78 lines
2.6 KiB
Python
78 lines
2.6 KiB
Python
import pytest
|
|
import uuid
|
|
from channels.testing import WebsocketCommunicator
|
|
from project.config.asgi import application
|
|
from project.remotectl.models import RemoteHost
|
|
|
|
class DummyProc:
|
|
def __init__(self, chunks, exit_status=0):
|
|
self._chunks = chunks
|
|
self._exit_status_value = exit_status
|
|
self.stdout_iter = self._iter_stream('stdout')
|
|
self.stderr_iter = self._iter_stream('stderr')
|
|
self.stdout = self._aiter(self.stdout_iter)
|
|
self.stderr = self._aiter(self.stderr_iter)
|
|
self._done = False
|
|
|
|
def _iter_stream(self, which):
|
|
for stream, data in self._chunks:
|
|
if stream == which:
|
|
yield data
|
|
|
|
async def _aiter(self, it):
|
|
for item in it:
|
|
yield item
|
|
self._done = True
|
|
|
|
@property
|
|
def exit_status(self):
|
|
return self._exit_status_value if self._done else None
|
|
|
|
def terminate(self):
|
|
self._done = True
|
|
|
|
def kill(self):
|
|
self._done = True
|
|
|
|
class DummyConn:
|
|
def __init__(self, procs):
|
|
self.procs = procs
|
|
async def create_process(self, command):
|
|
return self.procs.pop(0)
|
|
def close(self):
|
|
pass
|
|
|
|
@pytest.mark.django_db(transaction=True)
|
|
@pytest.mark.asyncio
|
|
async def test_run_simple_command(monkeypatch, django_user_model):
|
|
user = await django_user_model.objects.acreate(username='user')
|
|
host = await RemoteHost.objects.acreate(name='h', hostname='localhost', username='u')
|
|
|
|
from project.remotectl import consumers
|
|
async def fake_open_connection(rh):
|
|
return DummyConn([DummyProc([('stdout', 'line1\n'), ('stdout', 'line2\n')], exit_status=0)])
|
|
monkeypatch.setattr(consumers, 'open_connection', fake_open_connection)
|
|
|
|
session_id = uuid.uuid4()
|
|
communicator = WebsocketCommunicator(application, f"/ws/ssh/{session_id}/stream/")
|
|
communicator.scope['user'] = user
|
|
connected, _ = await communicator.connect()
|
|
assert connected
|
|
msg = await communicator.receive_json_from()
|
|
assert msg['event'] == 'connected'
|
|
|
|
await communicator.send_json_to({'action':'start','host_id': host.id, 'command':'echo hi'})
|
|
started_or_error = await communicator.receive_json_from()
|
|
assert started_or_error['event'] == 'started'
|
|
|
|
chunks = []
|
|
while True:
|
|
msg = await communicator.receive_json_from()
|
|
if msg['event'] == 'chunk':
|
|
chunks.append(msg['data'].strip())
|
|
elif msg['event'] == 'completed':
|
|
assert msg['status'] == 'ok'
|
|
break
|
|
assert 'line1' in chunks and 'line2' in chunks
|
|
await communicator.disconnect()
|