Skip to content

Commit 4921c48

Browse files
committed
use existing cereal bridge better in webrtcd, clean up support files
1 parent ba6f8df commit 4921c48

7 files changed

Lines changed: 78 additions & 390 deletions

File tree

cereal/log.capnp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2360,6 +2360,15 @@ struct SoundRequest {
23602360
sound @0 :Car.CarControl.HUDControl.AudibleAlert;
23612361
}
23622362

2363+
struct LiveStreamCamera {
2364+
camera @0 :CameraType;
2365+
2366+
enum CameraType {
2367+
driver @0;
2368+
wideRoad @1;
2369+
}
2370+
}
2371+
23632372
struct Touch {
23642373
sec @0 :Int64;
23652374
usec @1 :Int64;
@@ -2479,7 +2488,7 @@ struct Event {
24792488
livestreamDriverEncodeData @122 :EncodeData;
24802489

24812490
soundRequest @151 :SoundRequest;
2482-
webrtcAudioData @152 :AudioData;
2491+
liveStreamCamera @152 :LiveStreamCamera;
24832492

24842493
# *********** Custom: reserved for forks ***********
24852494

cereal/services.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def __init__(self, should_log: bool, frequency: float, decimation: Optional[int]
8686
"bookmarkButton": (True, 0., 1),
8787
"audioFeedback": (True, 0., 1),
8888
"soundRequest": (False, 0.),
89+
"liveStreamCamera": (False, 0.),
8990
"webrtcAudioData": (False, 0.),
9091
"roadEncodeData": (False, 20., None, QueueSize.BIG),
9192
"driverEncodeData": (False, 20., None, QueueSize.BIG),

selfdrive/ui/soundd.py

Lines changed: 6 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
from collections import deque
21
import math
32
import numpy as np
4-
import threading
53
import time
64
import wave
75

@@ -17,8 +15,6 @@
1715

1816
SAMPLE_RATE = 48000
1917
SAMPLE_BUFFER = 4096 # (approx 100ms)
20-
MAX_WEBRTC_BUFFER_SAMPLES = SAMPLE_RATE
21-
WEBRTC_START_BUFFER_SAMPLES = SAMPLE_BUFFER + (SAMPLE_RATE // 50) # keep headroom over 20ms WebRTC chunks
2218
MAX_VOLUME = 1.0
2319
MIN_VOLUME = 0.1
2420
ALERT_RAMP_TIME = 4 # seconds to ramp to max volume for warningImmediate
@@ -79,11 +75,6 @@ def __init__(self):
7975
self.selfdrive_timeout_alert = False
8076

8177
self.spl_filter_weighted = FirstOrderFilter(0, 2.5, FILTER_DT, initialized=False)
82-
self.webrtc_buffer: deque[np.ndarray] = deque()
83-
self.webrtc_buffer_offset = 0
84-
self.webrtc_buffer_size = 0
85-
self.webrtc_playing = False
86-
self.webrtc_lock = threading.Lock()
8778

8879
def load_sounds(self):
8980
self.loaded_sounds: dict[int, np.ndarray] = {}
@@ -121,85 +112,10 @@ def get_sound_data(self, frames): # get "frames" worth of data from the current
121112

122113
return ret * self.current_volume
123114

124-
def _trim_webrtc_buffer(self):
125-
overflow = self.webrtc_buffer_size - MAX_WEBRTC_BUFFER_SAMPLES
126-
while overflow > 0 and self.webrtc_buffer:
127-
chunk = self.webrtc_buffer[0]
128-
available = chunk.size - self.webrtc_buffer_offset
129-
drop = min(overflow, available)
130-
self.webrtc_buffer_offset += drop
131-
self.webrtc_buffer_size -= drop
132-
overflow -= drop
133-
134-
if self.webrtc_buffer_offset >= chunk.size:
135-
self.webrtc_buffer.popleft()
136-
self.webrtc_buffer_offset = 0
137-
138-
def _clear_webrtc_buffer_locked(self):
139-
self.webrtc_buffer.clear()
140-
self.webrtc_buffer_offset = 0
141-
self.webrtc_buffer_size = 0
142-
143-
def add_webrtc_audio(self, audio_data: bytes, sample_rate: int):
144-
if sample_rate != SAMPLE_RATE:
145-
cloudlog.warning(f"soundd dropping webrtc audio with unexpected sample rate: {sample_rate}")
146-
return
147-
if not audio_data:
148-
return
149-
150-
samples = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / (2**15)
151-
if samples.size == 0:
152-
return
153-
154-
with self.webrtc_lock:
155-
self.webrtc_buffer.append(samples)
156-
self.webrtc_buffer_size += samples.size
157-
self._trim_webrtc_buffer()
158-
159-
def get_webrtc_audio(self, frames: int) -> np.ndarray:
160-
out = np.zeros(frames, dtype=np.float32)
161-
162-
with self.webrtc_lock:
163-
if not self.webrtc_playing:
164-
if self.webrtc_buffer_size < max(frames, WEBRTC_START_BUFFER_SAMPLES):
165-
return out
166-
self.webrtc_playing = True
167-
168-
if self.webrtc_buffer_size < frames:
169-
self._clear_webrtc_buffer_locked()
170-
self.webrtc_playing = False
171-
return out
172-
173-
written = 0
174-
while written < frames and self.webrtc_buffer:
175-
chunk = self.webrtc_buffer[0]
176-
available = chunk.size - self.webrtc_buffer_offset
177-
take = min(frames - written, available)
178-
out[written:written + take] = chunk[self.webrtc_buffer_offset:self.webrtc_buffer_offset + take]
179-
written += take
180-
self.webrtc_buffer_offset += take
181-
182-
if self.webrtc_buffer_offset >= chunk.size:
183-
self.webrtc_buffer.popleft()
184-
self.webrtc_buffer_offset = 0
185-
186-
self.webrtc_buffer_size -= written
187-
188-
return out
189-
190-
def webrtc_audio_thread(self, sock) -> None:
191-
while True:
192-
for msg in messaging.drain_sock(sock, wait_for_one=True):
193-
audio = msg.webrtcAudioData
194-
self.add_webrtc_audio(audio.data, audio.sampleRate)
195-
196115
def callback(self, data_out: np.ndarray, frames: int, time, status) -> None:
197116
if status:
198117
cloudlog.warning(f"soundd stream over/underflow: {status}")
199-
sound = self.get_sound_data(frames)
200-
sound += self.get_webrtc_audio(frames)
201-
np.clip(sound, -1.0, 1.0, out=sound)
202-
data_out[:frames, 0] = sound
118+
data_out[:frames, 0] = self.get_sound_data(frames)
203119

204120
def update_alert(self, new_alert):
205121
current_alert_played_once = self.current_alert == AudibleAlert.none or self.current_sound_frame > len(self.loaded_sounds[self.current_alert])
@@ -211,14 +127,7 @@ def update_alert(self, new_alert):
211127
self.current_sound_frame = 0
212128

213129
def get_audible_alert(self, sm):
214-
sound_request_updated = False
215-
if sm.updated['soundRequest']:
216-
new_alert = sm['soundRequest'].sound.raw
217-
if new_alert != AudibleAlert.none:
218-
self.update_alert(new_alert)
219-
sound_request_updated = True
220-
221-
if sm.updated['selfdriveState'] and not sound_request_updated:
130+
if sm.updated['selfdriveState']:
222131
new_alert = sm['selfdriveState'].alertSound.raw
223132
self.update_alert(new_alert)
224133
elif check_selfdrive_timeout_alert(sm):
@@ -227,6 +136,10 @@ def get_audible_alert(self, sm):
227136
elif self.selfdrive_timeout_alert:
228137
self.update_alert(AudibleAlert.none)
229138
self.selfdrive_timeout_alert = False
139+
elif sm.updated['soundRequest']:
140+
new_alert = sm['soundRequest'].sound.raw
141+
if new_alert != AudibleAlert.none:
142+
self.update_alert(new_alert)
230143

231144
def calculate_volume(self, weighted_db):
232145
volume = ((weighted_db - AMBIENT_DB) / DB_SCALE) * (MAX_VOLUME - MIN_VOLUME) + MIN_VOLUME
@@ -244,8 +157,6 @@ def soundd_thread(self):
244157
import sounddevice as sd
245158

246159
sm = messaging.SubMaster(['selfdriveState', 'soundPressure', 'soundRequest'])
247-
webrtc_audio_sock = messaging.sub_sock('webrtcAudioData', conflate=False)
248-
threading.Thread(target=self.webrtc_audio_thread, args=(webrtc_audio_sock,), daemon=True).start()
249160

250161
with self.get_stream(sd) as stream:
251162
rk = Ratekeeper(20)

system/athena/athenad.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -571,7 +571,7 @@ def getNetworks():
571571
@dispatcher.add_method
572572
def startJoystickStream(sdp: str) -> dict:
573573
from openpilot.system.webrtc.webrtcd import StreamRequestBody
574-
body = StreamRequestBody(sdp, ["driver"], ["testJoystick"], ["carState"])
574+
body = StreamRequestBody(sdp, ["driver"], ["testJoystick", "soundRequest", "liveStreamCamera"], ["carState"])
575575
try:
576576
resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream",
577577
json=asdict(body), timeout=10)

system/loggerd/encoderd.cc

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -151,15 +151,12 @@ void encoderd_thread(const LogCameraInfo (&cameras)[N]) {
151151
}
152152
}
153153

154-
// Map param value to stream camera config
155-
const LogCameraInfo *find_stream_camera(const std::string &name) {
156-
if (name == "driver") return &stream_driver_camera_info;
157-
return &stream_wide_road_camera_info; // default
154+
const LogCameraInfo *find_stream_camera(cereal::LiveStreamCamera::CameraType type) {
155+
if (type == cereal::LiveStreamCamera::CameraType::DRIVER) return &stream_driver_camera_info;
156+
return &stream_wide_road_camera_info;
158157
}
159158

160159
void stream_encoderd_thread() {
161-
Params params;
162-
163160
// Wait for cameras to be available
164161
std::set<VisionStreamType> available_streams;
165162
while (!do_exit) {
@@ -168,16 +165,17 @@ void stream_encoderd_thread() {
168165
util::sleep_for(100);
169166
}
170167

171-
std::string active_camera = params.get("LivestreamCamera");
172-
if (active_camera.empty()) active_camera = "driver";
168+
SubMaster sm({"liveStreamCamera"});
169+
170+
auto active_camera = cereal::LiveStreamCamera::CameraType::DRIVER;
173171

174172
while (!do_exit) {
175173
const LogCameraInfo *cam_info = find_stream_camera(active_camera);
176174

177175
// Check that the requested camera stream is available
178176
if (available_streams.find(cam_info->stream_type) == available_streams.end()) {
179-
LOGE("stream encoder: camera %s not available, falling back", active_camera.c_str());
180-
active_camera = "wideRoad";
177+
LOGE("stream encoder: requested camera not available, falling back to wideRoad");
178+
active_camera = cereal::LiveStreamCamera::CameraType::WIDE_ROAD;
181179
cam_info = find_stream_camera(active_camera);
182180
}
183181

@@ -188,7 +186,7 @@ void stream_encoderd_thread() {
188186
}
189187

190188
const VisionBuf &buf_info = vipc_client.buffers[0];
191-
LOGW("stream encoder init %s %zux%zu", active_camera.c_str(), buf_info.width, buf_info.height);
189+
LOGW("stream encoder init %zux%zu", buf_info.width, buf_info.height);
192190
assert(buf_info.width > 0 && buf_info.height > 0);
193191

194192
const auto &encoder_info = cam_info->encoder_infos[0];
@@ -197,11 +195,14 @@ void stream_encoderd_thread() {
197195

198196
while (!do_exit) {
199197
// Check for camera switch request
200-
std::string requested = params.get("LivestreamCamera");
201-
if (!requested.empty() && requested != active_camera) {
202-
LOGW("stream encoder switching from %s to %s", active_camera.c_str(), requested.c_str());
203-
active_camera = requested;
204-
break; // break to reinit encoder with new camera
198+
sm.update(0);
199+
if (sm.updated("liveStreamCamera")) {
200+
auto requested = sm["liveStreamCamera"].getLiveStreamCamera().getCamera();
201+
if (requested != active_camera) {
202+
LOGW("stream encoder switching camera");
203+
active_camera = requested;
204+
break; // break to reinit encoder with new camera
205+
}
205206
}
206207

207208
VisionIpcBufExtra extra;

0 commit comments

Comments
 (0)