-
Notifications
You must be signed in to change notification settings - Fork 10.9k
Expand file tree
/
Copy pathtest_stream_session.py
More file actions
87 lines (66 loc) · 2.99 KB
/
test_stream_session.py
File metadata and controls
87 lines (66 loc) · 2.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import asyncio
import json
import time
# for aiortc and its dependencies
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning) # TODO: remove this when google-crc32c publish a python3.12 wheel
from aiortc import RTCDataChannel
from aiortc.mediastreams import VIDEO_CLOCK_RATE, VIDEO_TIME_BASE
import capnp
from cereal import messaging, log
from openpilot.system.webrtc.webrtcd import CerealOutgoingMessageProxy, CerealIncomingMessageProxy
from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack
class TestStreamSession:
def setup_method(self):
self.loop = asyncio.new_event_loop()
def teardown_method(self):
self.loop.stop()
self.loop.close()
def test_outgoing_proxy(self, mocker):
test_msg = log.Event.new_message()
test_msg.logMonoTime = 123
test_msg.valid = True
test_msg.customReservedRawData0 = b"test"
expected_dict = {"type": "customReservedRawData0", "logMonoTime": 123, "valid": True, "data": "test"}
expected_json = json.dumps(expected_dict).encode()
channel = mocker.Mock(spec=RTCDataChannel)
mocked_submaster = messaging.SubMaster(["customReservedRawData0"])
def mocked_update(t):
mocked_submaster.update_msgs(0, [test_msg])
mocker.patch.object(messaging.SubMaster, "update", side_effect=mocked_update)
proxy = CerealOutgoingMessageProxy(mocked_submaster)
proxy.add_channel(channel)
proxy.update()
channel.send.assert_called_once_with(expected_json)
def test_incoming_proxy(self, mocker):
tested_msgs = [
{"type": "customReservedRawData0", "data": "test"}, # primitive
{"type": "can", "data": [{"address": 0, "dat": "", "src": 0}]}, # list
{"type": "testJoystick", "data": {"axes": [0, 0], "buttons": [False]}}, # dict
]
mocked_pubmaster = mocker.MagicMock(spec=messaging.PubMaster)
proxy = CerealIncomingMessageProxy(mocked_pubmaster)
for msg in tested_msgs:
proxy.send(json.dumps(msg).encode())
mocked_pubmaster.send.assert_called_once()
mt, md = mocked_pubmaster.send.call_args.args
assert mt == msg["type"]
assert isinstance(md, capnp._DynamicStructBuilder)
assert hasattr(md, msg["type"])
mocked_pubmaster.reset_mock()
def test_livestream_track(self, mocker):
fake_msg = messaging.new_message("livestreamCameraEncodeData")
config = {"receive.return_value": fake_msg.to_bytes()}
mocker.patch("msgq.SubSocket", spec=True, **config)
track = LiveStreamVideoStreamTrack("driver")
assert track.id.startswith("driver")
assert track.codec_preference() == "H264"
for i in range(5):
packet = self.loop.run_until_complete(track.recv())
assert packet.time_base == VIDEO_TIME_BASE
if i == 0:
start_ns = time.monotonic_ns()
start_pts = packet.pts
assert abs(i + packet.pts - (start_pts + (((time.monotonic_ns() - start_ns) * VIDEO_CLOCK_RATE) // 1_000_000_000))) < 450 #5ms
assert packet.size == 0