Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ members = [
]

[workspace.dependencies]
savant_core = { git = "https://github.com/insight-platform/savant-rs", tag = "0.3.5" }
savant_core = { git = "https://github.com/insight-platform/savant-rs", branch = "opentelemetry-update" }
savant-protobuf = { git = "https://github.com/insight-platform/savant-protobuf", tag = "0.2.0" }
serde_json = "1"
serde_yaml = "0.9.34-deprecated"
Expand All @@ -22,8 +22,12 @@ env_logger = "0.11"
actix-web = { version = "4", features = ["openssl"] }
actix-protobuf = "0.10"
actix-web-httpauth = "0.8.2"
http = "1.1.0"
openssl = "0.10"
mockall = "0.12"
opentelemetry = "0.24.0"
opentelemetry-http = "0.13.0"
parking_lot = "0.12.3"

[workspace.package]
version = "0.2.0"
Expand Down
1 change: 1 addition & 0 deletions media_gateway_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ tokio = { workspace = true, features = ["rt-multi-thread", "signal", "sync"] }
log = { workspace = true }
env_logger = { workspace = true }
actix-web = { workspace = true }
opentelemetry = { workspace = true }

media_gateway_common = { path = "../media_gateway_common" }

Expand Down
23 changes: 20 additions & 3 deletions media_gateway_client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
//! The media gateway client.
//!
//! The module provides [`GatewayClient`] and [`ForwardResult`].
use std::fmt::{Display, Formatter};
use std::fs;

use anyhow::anyhow;
use http_auth_basic::Credentials;
use opentelemetry::Context;
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE};
use reqwest::{Certificate, Client, Identity, StatusCode};

use media_gateway_common::model::Media;
use media_gateway_common::telemetry::{get_propagated_context, propagate_header_context};

use crate::configuration::GatewayClientConfiguration;

Expand All @@ -25,6 +28,12 @@ pub enum ForwardResult {
AckTimeout,
}

impl Display for ForwardResult {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("{:?}", self))
}
}

/// The client for the media gateway server.
///
/// The recommended way to create a new instance is via [`GatewayClientConfiguration`].
Expand Down Expand Up @@ -64,12 +73,20 @@ impl GatewayClient {
}

/// Receives the messages using [`SyncReader`] and sends it to the media gateway server.
pub async fn forward_message(&self, media: &Media) -> anyhow::Result<ForwardResult> {
pub async fn forward_message(&self, media: &mut Media) -> anyhow::Result<ForwardResult> {
let ctx = Context::current();

media.update_context(get_propagated_context(&ctx));
let data = media.to_proto()?;

let mut headers = HeaderMap::new();
propagate_header_context(&mut headers, &ctx);

let send_result = self
.client
.post(&self.url)
.body(data)
.headers(headers)
.header(CONTENT_TYPE, "application/protobuf")
.send()
.await;
Expand Down Expand Up @@ -190,7 +207,7 @@ mod tests {
let message = Message::unknown("message".to_string());
let topic = "topic";
let data: Vec<&[u8]> = vec![&[1]];
let media = Media {
let mut media = Media {
message: Option::from(savant_protobuf::generated::Message::from(&message)),
topic: topic.as_bytes().to_vec(),
data: data.iter().map(|e| e.to_vec()).collect::<Vec<Vec<u8>>>(),
Expand Down Expand Up @@ -218,7 +235,7 @@ mod tests {

let client = GatewayClient::new(Client::default(), gateway_url);

let actual_result = client.forward_message(&media).await;
let actual_result = client.forward_message(&mut media).await;

match expected_result {
Ok(expected_forward_result) => {
Expand Down
3 changes: 3 additions & 0 deletions media_gateway_client/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! The module provides [`GatewayClientConfiguration`].
use std::time::Duration;

use savant_core::telemetry::TelemetryConfiguration;
use savant_core::transport::zeromq::{NonBlockingReader, ReaderConfigBuilder};
use serde::{Deserialize, Serialize};
use twelf::{config, Layer};
Expand Down Expand Up @@ -43,6 +44,8 @@ pub struct GatewayClientConfiguration {
pub auth: Option<AuthConfiguration>,
/// Statistics settings
pub statistics: Option<StatisticsConfiguration>,
/// OpenTelemetry settings
pub telemetry: Option<TelemetryConfiguration>,
}

impl GatewayClientConfiguration {
Expand Down
13 changes: 11 additions & 2 deletions media_gateway_client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use tokio::signal::{ctrl_c, unix};

use media_gateway_common::api::health;
use media_gateway_common::health::HealthService;
use media_gateway_common::telemetry;

use crate::configuration::GatewayClientConfiguration;
use crate::service::GatewayClientService;
Expand Down Expand Up @@ -58,6 +59,10 @@ async fn main() -> Result<()> {
let conf = GatewayClientConfiguration::new(&conf_arg)?;
let bind_address = (conf.ip.as_str(), conf.port);

if let Some(telemetry_conf) = conf.telemetry.as_ref() {
telemetry::init(telemetry_conf);
}

let health_service = web::Data::new(HealthService::new());
let service = Arc::new(GatewayClientService::try_from(&conf)?);
let service_to_stop = service.clone();
Expand All @@ -79,13 +84,17 @@ async fn main() -> Result<()> {

tokio::spawn(async move { service.run().await });

HttpServer::new(move || {
let result = HttpServer::new(move || {
App::new()
.app_data(health_service.clone())
.route("/health", web::get().to(health))
})
.bind(bind_address)?
.run()
.await
.map_err(anyhow::Error::from)
.map_err(anyhow::Error::from);

telemetry::shutdown();

result
}
67 changes: 61 additions & 6 deletions media_gateway_client/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ use std::sync::{Arc, OnceLock};
use std::time::Duration;

use anyhow::{anyhow, bail, Result};
use opentelemetry::trace::{FutureExt, Status, TraceContextExt};
use opentelemetry::KeyValue;
use savant_core::transport::zeromq::{NonBlockingReader, ReaderResult};
use tokio::sync::{mpsc, Mutex};
use tokio_timerfd::sleep;

use media_gateway_common::model::Media;
use media_gateway_common::statistics::StatisticsService;
use media_gateway_common::telemetry::{get_context_with_span, get_message_context};

use crate::client::{ForwardResult, GatewayClient};
use crate::configuration::GatewayClientConfiguration;
Expand Down Expand Up @@ -85,6 +88,11 @@ impl GatewayClientService {
..
} => {
log::debug!("Success while reading message");
let parent_ctx = get_message_context(&message);
let ctx = get_context_with_span("process", &parent_ctx);
let queue_ctx = get_context_with_span("queue", &ctx);
let queue_span = queue_ctx.span();

let id = match reader_statistics_service.as_ref() {
Some(service) => match service.register_message_start() {
Ok(id) => Some(id),
Expand All @@ -105,10 +113,20 @@ impl GatewayClientService {
topic,
data,
};
if let Err(e) = sender.send((id, media)).await {
if let Err(e) = sender.send((id, media, ctx)).await {
log::warn!("Error while sharing message: {:?}", e);
if queue_span.is_recording() {
queue_span.record_error(&e);
queue_span
.set_status(Status::error("error while sharing message"));
}
queue_span.end();
break;
}
if queue_span.is_recording() {
queue_span.set_status(Status::Ok);
}
queue_span.end();
}
ReaderResult::Timeout => {
log::debug!(
Expand Down Expand Up @@ -136,10 +154,17 @@ impl GatewayClientService {

let sender_task: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
log::info!("Message sending is started");
while let Some((id, media)) = receiver.recv().await {
while let Some((id, mut media, ctx)) = receiver.recv().await {
let ctx = get_context_with_span("forward", &ctx);
let span = ctx.span();

let mut retry: Option<Retry> = None;
loop {
let forward_result = client.forward_message(&media).await;
let retry_number = retry.as_ref().map_or(0, |e| e.number());
let forward_result = client
.forward_message(&mut media)
.with_context(ctx.clone())
.await;
match forward_result {
Ok(ForwardResult::Success) => {
if let Some(stat_id) = id {
Expand All @@ -160,21 +185,51 @@ impl GatewayClientService {
} else {
log::debug!("Success while sending message (retry=0)");
}
if span.is_recording() {
span.add_event(
"attempt",
vec![
KeyValue::new("number", retry_number as i64),
KeyValue::new("result", ForwardResult::Success.to_string()),
],
);
span.set_status(Status::Ok);
}
span.end();
ctx.span().end();
break;
}
Ok(result) => {
log::warn!(
"Failure while sending message (retry={}): {:?}",
retry.as_ref().map_or(0, |e| e.number()),
retry_number,
result
);
if span.is_recording() {
span.add_event(
"attempt",
vec![
KeyValue::new("number", retry_number as i64),
KeyValue::new("result", result.to_string()),
],
);
}
}
Err(e) => {
log::warn!(
"Error while sending message (retry={}): {:?}",
retry.as_ref().map_or(0, |e| e.number()),
retry_number,
e
)
);
if span.is_recording() {
span.add_event(
"attempt",
vec![
KeyValue::new("number", retry_number as i64),
KeyValue::new("error", e.to_string()),
],
);
}
}
}
let next_retry = sender_retry_strategy.next_retry(retry);
Expand Down
5 changes: 5 additions & 0 deletions media_gateway_common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ anyhow = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
actix-web = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry-http = { workspace = true }
http = { workspace = true }
parking_lot = { workspace = true }
log = { workspace = true }

prost = "0.12"
prost-types = "0.12"
Expand Down
2 changes: 2 additions & 0 deletions media_gateway_common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ pub mod health;
pub mod api;

pub mod statistics;

pub mod telemetry;
8 changes: 8 additions & 0 deletions media_gateway_common/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
//!
//! The module provides [`Media`] struct that can be converted from/to
//! [protocol buffers](https://protobuf.dev/).

use savant_core::otlp::PropagatedContext;
use savant_protobuf::generated::Message;

/// A struct that contains all information required to forward a message.
Expand Down Expand Up @@ -34,6 +36,12 @@ impl Media {
let media = Media::decode(bytes)?;
Ok(media)
}

pub fn update_context(&mut self, ctx: PropagatedContext) {
if let Some(message) = self.message.as_mut() {
message.propagated_context = ctx.0;
}
}
}

#[cfg(test)]
Expand Down
Loading