import uuid

import pytest
from channels.testing import WebsocketCommunicator

from project.config.asgi import application
from project.remotectl.models import RemoteHost


class DummyProc:
    """In-memory stand-in for the process object returned by ``create_process``.

    ``chunks`` is a sequence of ``(stream_name, data)`` pairs, where
    ``stream_name`` is ``'stdout'`` or ``'stderr'``.  Each stream is exposed
    as an async iterator that yields only the data tagged for it, in order.
    """

    def __init__(self, chunks, exit_status: int = 0):
        self._chunks = chunks
        self._exit_status_value = exit_status
        # Plain (sync) generators filtering the shared chunk list per stream.
        self.stdout_iter = self._iter_stream('stdout')
        self.stderr_iter = self._iter_stream('stderr')
        # Async wrappers, so the streams can be consumed with ``async for``.
        self.stdout = self._aiter(self.stdout_iter)
        self.stderr = self._aiter(self.stderr_iter)
        self._done = False

    def _iter_stream(self, which: str):
        """Yield the data of every chunk tagged with stream name ``which``."""
        for stream, data in self._chunks:
            if stream == which:
                yield data

    async def _aiter(self, it):
        """Re-yield the items of ``it`` asynchronously, then mark the process done."""
        for item in it:
            yield item
        # NOTE: the flag is shared by both streams, so ``exit_status`` turns
        # non-None as soon as *either* stream has been fully consumed.
        self._done = True

    @property
    def exit_status(self):
        """Return the configured exit status once done, otherwise ``None``."""
        return self._exit_status_value if self._done else None

    def terminate(self) -> None:
        """Mark the process as finished."""
        self._done = True

    def kill(self) -> None:
        """Mark the process as finished."""
        self._done = True


class DummyConn:
    """Fake connection that hands out pre-built processes in FIFO order."""

    def __init__(self, procs):
        self.procs = procs

    async def create_process(self, command):
        """Return the next queued process; ``command`` is ignored."""
        return self.procs.pop(0)

    def close(self) -> None:
        """No-op: there is nothing to release."""
        pass


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_run_simple_command(monkeypatch, django_user_model):
    """Start a command over the websocket and check its streamed output.

    The consumer's ``open_connection`` is replaced with a fake returning a
    ``DummyConn`` whose single process emits two stdout lines and exits 0.
    The test expects the event sequence ``connected`` -> ``started`` ->
    ``chunk``... -> ``completed`` (with status ``'ok'``).
    """
    user = await django_user_model.objects.acreate(username='user')
    host = await RemoteHost.objects.acreate(name='h', hostname='localhost', username='u')

    from project.remotectl import consumers

    async def fake_open_connection(rh):
        return DummyConn([
            DummyProc([('stdout', 'line1\n'), ('stdout', 'line2\n')], exit_status=0),
        ])

    monkeypatch.setattr(consumers, 'open_connection', fake_open_connection)

    session_id = uuid.uuid4()
    communicator = WebsocketCommunicator(application, f"/ws/ssh/{session_id}/stream/")
    communicator.scope['user'] = user
    connected, _ = await communicator.connect()
    try:
        assert connected

        msg = await communicator.receive_json_from()
        assert msg['event'] == 'connected'

        await communicator.send_json_to({'action':'start','host_id': host.id, 'command':'echo hi'})
        started_or_error = await communicator.receive_json_from()
        assert started_or_error['event'] == 'started'

        chunks = []
        # Collect chunk payloads until completion; other events are skipped.
        # ``receive_json_from`` raises on timeout, so this loop cannot hang
        # forever if ``completed`` never arrives.
        while True:
            msg = await communicator.receive_json_from()
            if msg['event'] == 'chunk':
                chunks.append(msg['data'].strip())
            elif msg['event'] == 'completed':
                assert msg['status'] == 'ok'
                break

        assert 'line1' in chunks and 'line2' in chunks
    finally:
        # Always shut the application task down, even when an assertion or a
        # receive timeout aborts the test, so it does not leak into later tests.
        await communicator.disconnect()