remoteadmin/project/remotectl/tests/test_ws_execution.py
2025-10-15 17:41:05 +02:00

78 lines
2.6 KiB
Python

import pytest
import uuid
from channels.testing import WebsocketCommunicator
from project.config.asgi import application
from project.remotectl.models import RemoteHost
class DummyProc:
    """Scripted stand-in for a remote process object.

    ``chunks`` is an iterable of ``(stream_name, data)`` pairs. Entries tagged
    ``'stdout'`` are replayed through ``stdout`` and entries tagged
    ``'stderr'`` through ``stderr``; both attributes are async generators.
    ``exit_status`` reports ``None`` until one of the streams has been read to
    the end, or until ``terminate()`` / ``kill()`` is called.
    """

    def __init__(self, chunks, exit_status=0):
        self._done = False
        self._exit_status_value = exit_status
        self._chunks = chunks
        # All four objects below are lazy generators: nothing is consumed
        # from ``chunks`` until a reader actually iterates a stream.
        self.stdout_iter = self._iter_stream('stdout')
        self.stderr_iter = self._iter_stream('stderr')
        self.stdout = self._aiter(self.stdout_iter)
        self.stderr = self._aiter(self.stderr_iter)

    def _iter_stream(self, which):
        """Lazily yield the data of every chunk whose stream tag is ``which``."""
        yield from (payload for tag, payload in self._chunks if tag == which)

    async def _aiter(self, it):
        """Re-expose the sync iterable ``it`` as an async generator.

        Once ``it`` is exhausted the process is flagged as finished, which
        makes ``exit_status`` start returning the configured value.
        """
        for piece in it:
            yield piece
        self._done = True

    @property
    def exit_status(self):
        """Configured exit status once finished, otherwise ``None``."""
        if not self._done:
            return None
        return self._exit_status_value

    def terminate(self):
        """Mark the fake process as finished."""
        self._done = True

    def kill(self):
        """Mark the fake process as finished (same effect as ``terminate``)."""
        self._done = True
class DummyConn:
    """Fake connection that hands out pre-built process objects in order."""

    def __init__(self, procs):
        # Kept by reference: create_process() consumes from this same list.
        self.procs = procs

    async def create_process(self, command):
        """Return the next queued process; ``command`` is accepted but unused."""
        next_proc = self.procs.pop(0)
        return next_proc

    def close(self):
        """Do nothing; the fake holds no resources to release."""
        return None
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_run_simple_command(monkeypatch, django_user_model):
    """Run a command over the SSH websocket and check the streamed output.

    ``consumers.open_connection`` is patched to return a ``DummyConn`` whose
    single ``DummyProc`` emits two stdout lines and exits with status 0. The
    test then expects the event sequence ``connected`` -> ``started`` ->
    ``chunk``... -> ``completed`` (with ``status == 'ok'``).
    """
    from project.remotectl import consumers

    user = await django_user_model.objects.acreate(username='user')
    host = await RemoteHost.objects.acreate(name='h', hostname='localhost', username='u')

    async def fake_open_connection(rh):
        return DummyConn([DummyProc([('stdout', 'line1\n'), ('stdout', 'line2\n')], exit_status=0)])

    monkeypatch.setattr(consumers, 'open_connection', fake_open_connection)

    session_id = uuid.uuid4()
    communicator = WebsocketCommunicator(application, f"/ws/ssh/{session_id}/stream/")
    communicator.scope['user'] = user
    connected, _ = await communicator.connect()
    assert connected

    # From here on the application task is live; always disconnect, even when
    # an assertion or a receive timeout fires, so the task is not left dangling.
    try:
        msg = await communicator.receive_json_from()
        assert msg['event'] == 'connected', msg

        await communicator.send_json_to({'action':'start','host_id': host.id, 'command':'echo hi'})
        started_or_error = await communicator.receive_json_from()
        # Include the payload so a rejected start shows its reason.
        assert started_or_error['event'] == 'started', started_or_error

        chunks = []
        while True:
            msg = await communicator.receive_json_from()
            event = msg['event']
            if event == 'chunk':
                chunks.append(msg['data'].strip())
            elif event == 'completed':
                assert msg['status'] == 'ok', msg
                break
            else:
                # NOTE(review): presumes failures are reported as an 'error'
                # event -- confirm against the consumer. Failing here beats
                # waiting for the receive timeout with no context.
                assert event != 'error', msg

        assert 'line1' in chunks and 'line2' in chunks, chunks
    finally:
        await communicator.disconnect()