diff --git a/Cargo.toml b/Cargo.toml index e07d7a7..9d79448 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ members = [ ] [workspace.dependencies] -savant_core = { git = "https://github.com/insight-platform/savant-rs", tag = "0.3.5" } +savant_core = { git = "https://github.com/insight-platform/savant-rs", branch = "opentelemetry-update" } savant-protobuf = { git = "https://github.com/insight-platform/savant-protobuf", tag = "0.2.0" } serde_json = "1" serde_yaml = "0.9.34-deprecated" @@ -22,8 +22,12 @@ env_logger = "0.11" actix-web = { version = "4", features = ["openssl"] } actix-protobuf = "0.10" actix-web-httpauth = "0.8.2" +http = "1.1.0" openssl = "0.10" mockall = "0.12" +opentelemetry = "0.24.0" +opentelemetry-http = "0.13.0" +parking_lot = "0.12.3" [workspace.package] version = "0.2.0" diff --git a/media_gateway_client/Cargo.toml b/media_gateway_client/Cargo.toml index 06c361a..ee619d4 100644 --- a/media_gateway_client/Cargo.toml +++ b/media_gateway_client/Cargo.toml @@ -21,6 +21,7 @@ tokio = { workspace = true, features = ["rt-multi-thread", "signal", "sync"] } log = { workspace = true } env_logger = { workspace = true } actix-web = { workspace = true } +opentelemetry = { workspace = true } media_gateway_common = { path = "../media_gateway_common" } diff --git a/media_gateway_client/src/client.rs b/media_gateway_client/src/client.rs index 4891b61..e0e3cb5 100644 --- a/media_gateway_client/src/client.rs +++ b/media_gateway_client/src/client.rs @@ -1,14 +1,17 @@ //! The media gateway client. //! //! The module provides [`GatewayClient`] and [`ForwardResult`]. +use std::fmt::{Display, Formatter}; use std::fs; use anyhow::anyhow; use http_auth_basic::Credentials; +use opentelemetry::Context; use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE}; use reqwest::{Certificate, Client, Identity, StatusCode}; use media_gateway_common::model::Media; +use media_gateway_common::telemetry::{get_propagated_context, propagate_header_context}; use crate::configuration::GatewayClientConfiguration; @@ -25,6 +28,12 @@ pub enum ForwardResult { AckTimeout, } +impl Display for ForwardResult { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("{:?}", self)) + } +} + /// The client for the media gateway server. /// /// The recommended way to create a new instance is via [`GatewayClientConfiguration`]. @@ -64,12 +73,20 @@ impl GatewayClient { } /// Receives the messages using [`SyncReader`] and sends it to the media gateway server. - pub async fn forward_message(&self, media: &Media) -> anyhow::Result { + pub async fn forward_message(&self, media: &mut Media) -> anyhow::Result { + let ctx = Context::current(); + + media.update_context(get_propagated_context(&ctx)); let data = media.to_proto()?; + + let mut headers = HeaderMap::new(); + propagate_header_context(&mut headers, &ctx); + let send_result = self .client .post(&self.url) .body(data) + .headers(headers) .header(CONTENT_TYPE, "application/protobuf") .send() .await; @@ -190,7 +207,7 @@ mod tests { let message = Message::unknown("message".to_string()); let topic = "topic"; let data: Vec<&[u8]> = vec![&[1]]; - let media = Media { + let mut media = Media { message: Option::from(savant_protobuf::generated::Message::from(&message)), topic: topic.as_bytes().to_vec(), data: data.iter().map(|e| e.to_vec()).collect::>>(), @@ -218,7 +235,7 @@ mod tests { let client = GatewayClient::new(Client::default(), gateway_url); - let actual_result = client.forward_message(&media).await; + let actual_result = client.forward_message(&mut media).await; match expected_result { Ok(expected_forward_result) => { diff --git a/media_gateway_client/src/configuration.rs b/media_gateway_client/src/configuration.rs index e8f83ac..966dca7 100644 --- a/media_gateway_client/src/configuration.rs +++ b/media_gateway_client/src/configuration.rs @@ -3,6 +3,7 @@ //! The module provides [`GatewayClientConfiguration`]. use std::time::Duration; +use savant_core::telemetry::TelemetryConfiguration; use savant_core::transport::zeromq::{NonBlockingReader, ReaderConfigBuilder}; use serde::{Deserialize, Serialize}; use twelf::{config, Layer}; @@ -43,6 +44,8 @@ pub struct GatewayClientConfiguration { pub auth: Option, /// Statistics settings pub statistics: Option, + /// OpenTelemetry settings + pub telemetry: Option, } impl GatewayClientConfiguration { diff --git a/media_gateway_client/src/main.rs b/media_gateway_client/src/main.rs index 962a62c..72e0a65 100644 --- a/media_gateway_client/src/main.rs +++ b/media_gateway_client/src/main.rs @@ -29,6 +29,7 @@ use tokio::signal::{ctrl_c, unix}; use media_gateway_common::api::health; use media_gateway_common::health::HealthService; +use media_gateway_common::telemetry; use crate::configuration::GatewayClientConfiguration; use crate::service::GatewayClientService; @@ -58,6 +59,10 @@ async fn main() -> Result<()> { let conf = GatewayClientConfiguration::new(&conf_arg)?; let bind_address = (conf.ip.as_str(), conf.port); + if let Some(telemetry_conf) = conf.telemetry.as_ref() { + telemetry::init(telemetry_conf); + } + let health_service = web::Data::new(HealthService::new()); let service = Arc::new(GatewayClientService::try_from(&conf)?); let service_to_stop = service.clone(); @@ -79,7 +84,7 @@ async fn main() -> Result<()> { tokio::spawn(async move { service.run().await }); - HttpServer::new(move || { + let result = HttpServer::new(move || { App::new() .app_data(health_service.clone()) .route("/health", web::get().to(health)) @@ -87,5 +92,9 @@ async fn main() -> Result<()> { .bind(bind_address)? .run() .await - .map_err(anyhow::Error::from) + .map_err(anyhow::Error::from); + + telemetry::shutdown(); + + result } diff --git a/media_gateway_client/src/service.rs b/media_gateway_client/src/service.rs index 2893777..12eb535 100644 --- a/media_gateway_client/src/service.rs +++ b/media_gateway_client/src/service.rs @@ -2,12 +2,15 @@ use std::sync::{Arc, OnceLock}; use std::time::Duration; use anyhow::{anyhow, bail, Result}; +use opentelemetry::trace::{FutureExt, Status, TraceContextExt}; +use opentelemetry::KeyValue; use savant_core::transport::zeromq::{NonBlockingReader, ReaderResult}; use tokio::sync::{mpsc, Mutex}; use tokio_timerfd::sleep; use media_gateway_common::model::Media; use media_gateway_common::statistics::StatisticsService; +use media_gateway_common::telemetry::{get_context_with_span, get_message_context}; use crate::client::{ForwardResult, GatewayClient}; use crate::configuration::GatewayClientConfiguration; @@ -85,6 +88,11 @@ impl GatewayClientService { .. } => { log::debug!("Success while reading message"); + let parent_ctx = get_message_context(&message); + let ctx = get_context_with_span("process", &parent_ctx); + let queue_ctx = get_context_with_span("queue", &ctx); + let queue_span = queue_ctx.span(); + let id = match reader_statistics_service.as_ref() { Some(service) => match service.register_message_start() { Ok(id) => Some(id), @@ -105,10 +113,20 @@ impl GatewayClientService { topic, data, }; - if let Err(e) = sender.send((id, media)).await { + if let Err(e) = sender.send((id, media, ctx)).await { log::warn!("Error while sharing message: {:?}", e); + if queue_span.is_recording() { + queue_span.record_error(&e); + queue_span + .set_status(Status::error("error while sharing message")); + } + queue_span.end(); break; } + if queue_span.is_recording() { + queue_span.set_status(Status::Ok); + } + queue_span.end(); } ReaderResult::Timeout => { log::debug!( @@ -136,10 +154,17 @@ impl GatewayClientService { let sender_task: tokio::task::JoinHandle> = tokio::spawn(async move { log::info!("Message sending is started"); - while let Some((id, media)) = receiver.recv().await { + while let Some((id, mut media, ctx)) = receiver.recv().await { + let ctx = get_context_with_span("forward", &ctx); + let span = ctx.span(); + let mut retry: Option = None; loop { - let forward_result = client.forward_message(&media).await; + let retry_number = retry.as_ref().map_or(0, |e| e.number()); + let forward_result = client + .forward_message(&mut media) + .with_context(ctx.clone()) + .await; match forward_result { Ok(ForwardResult::Success) => { if let Some(stat_id) = id { @@ -160,21 +185,51 @@ impl GatewayClientService { } else { log::debug!("Success while sending message (retry=0)"); } + if span.is_recording() { + span.add_event( + "attempt", + vec![ + KeyValue::new("number", retry_number as i64), + KeyValue::new("result", ForwardResult::Success.to_string()), + ], + ); + span.set_status(Status::Ok); + } + span.end(); + ctx.span().end(); break; } Ok(result) => { log::warn!( "Failure while sending message (retry={}): {:?}", - retry.as_ref().map_or(0, |e| e.number()), + retry_number, result ); + if span.is_recording() { + span.add_event( + "attempt", + vec![ + KeyValue::new("number", retry_number as i64), + KeyValue::new("result", result.to_string()), + ], + ); + } } Err(e) => { log::warn!( "Error while sending message (retry={}): {:?}", - retry.as_ref().map_or(0, |e| e.number()), + retry_number, e - ) + ); + if span.is_recording() { + span.add_event( + "attempt", + vec![ + KeyValue::new("number", retry_number as i64), + KeyValue::new("error", e.to_string()), + ], + ); + } } } let next_retry = sender_retry_strategy.next_retry(retry); diff --git a/media_gateway_common/Cargo.toml b/media_gateway_common/Cargo.toml index f8e9184..a4d2bb6 100644 --- a/media_gateway_common/Cargo.toml +++ b/media_gateway_common/Cargo.toml @@ -17,6 +17,11 @@ anyhow = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } actix-web = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry-http = { workspace = true } +http = { workspace = true } +parking_lot = { workspace = true } +log = { workspace = true } prost = "0.12" prost-types = "0.12" diff --git a/media_gateway_common/src/lib.rs b/media_gateway_common/src/lib.rs index a96fa47..74fb091 100644 --- a/media_gateway_common/src/lib.rs +++ b/media_gateway_common/src/lib.rs @@ -8,3 +8,5 @@ pub mod health; pub mod api; pub mod statistics; + +pub mod telemetry; diff --git a/media_gateway_common/src/model.rs b/media_gateway_common/src/model.rs index 87b7030..1bb07e1 100644 --- a/media_gateway_common/src/model.rs +++ b/media_gateway_common/src/model.rs @@ -2,6 +2,8 @@ //! //! The module provides [`Media`] struct that can be converted from/to //! [protocol buffers](https://protobuf.dev/). + +use savant_core::otlp::PropagatedContext; use savant_protobuf::generated::Message; /// A struct that contains all information required to forward a message. @@ -34,6 +36,12 @@ impl Media { let media = Media::decode(bytes)?; Ok(media) } + + pub fn update_context(&mut self, ctx: PropagatedContext) { + if let Some(message) = self.message.as_mut() { + message.propagated_context = ctx.0; + } + } } #[cfg(test)] diff --git a/media_gateway_common/src/telemetry.rs b/media_gateway_common/src/telemetry.rs new file mode 100644 index 0000000..31036ed --- /dev/null +++ b/media_gateway_common/src/telemetry.rs @@ -0,0 +1,264 @@ +use std::borrow::Cow; +use std::cell::OnceCell; + +use http::HeaderMap; +use log::error; +use opentelemetry::global::Error; +use opentelemetry::trace::{SpanKind, TraceContextExt, Tracer}; +use opentelemetry::{global, Context}; +use opentelemetry_http::{HeaderExtractor, HeaderInjector}; +use parking_lot::Mutex; +use savant_core::message::Message; +use savant_core::otlp::PropagatedContext; +use savant_core::telemetry::{Configurator, TelemetryConfiguration}; + +static CONFIGURATOR: Mutex> = Mutex::new(OnceCell::new()); + +pub fn init(config: &TelemetryConfiguration) { + let configurator = CONFIGURATOR.lock(); + match configurator.get() { + Some(_) => panic!("Open Telemetry has been configured"), + None => { + let result = configurator.set(Configurator::new("media-gateway", config)); + if result.is_err() { + // should not happen + panic!("Error while configuring OpenTelemetry"); + } + global::set_error_handler(|e| match e { + Error::Propagation(pe) + if pe.to_string().contains( + "Cannot extract from invalid jaeger header format, JaegerPropagator", + ) => {} + _ => { + error!(target: "opentelemetry", "{}", e); + } + }) + .expect("Error while configuring OpenTelemetry error handler"); + } + } +} + +pub fn shutdown() { + let mut configurator = CONFIGURATOR.lock(); + if let Some(mut c) = configurator.take() { + c.shutdown() + } +} + +pub fn get_context_with_span(span_name: T, parent_ctx: &Context) -> Context +where + T: Into>, +{ + if !parent_ctx.span().span_context().is_valid() { + Context::default() + } else { + let tracer = global::tracer(""); + let span = tracer + .span_builder(span_name) + .with_kind(SpanKind::Internal) + .start_with_context(&tracer, parent_ctx); + Context::default().with_span(span) + } +} + +pub fn get_message_context(message: &Message) -> Context { + global::get_text_map_propagator(|p| p.extract(&message.meta().span_context)) +} + +pub fn get_header_context(headers: &HeaderMap) -> Context { + global::get_text_map_propagator(|p| p.extract(&HeaderExtractor(headers))) +} + +pub fn propagate_header_context(headers: &mut HeaderMap, ctx: &Context) { + global::get_text_map_propagator(|propagator| { + propagator.inject_context(ctx, &mut HeaderInjector(headers)) + }) +} + +pub fn get_propagated_context(ctx: &Context) -> PropagatedContext { + let mut propagated_ctx = PropagatedContext::new(); + global::get_text_map_propagator(|propagator| { + propagator.inject_context(ctx, &mut propagated_ctx) + }); + propagated_ctx +} + +#[cfg(test)] +mod tests { + use std::sync::Once; + + use http::{HeaderMap, HeaderName, HeaderValue}; + use opentelemetry::trace::{TraceContextExt, TraceFlags, Tracer}; + use opentelemetry::{global, Context}; + use savant_core::message::Message; + use savant_core::telemetry::TelemetryConfiguration; + + use crate::telemetry::{ + get_context_with_span, get_header_context, get_message_context, get_propagated_context, + init, propagate_header_context, + }; + + const TRACE_ID: &str = "d4532e7a51d51c29c2166a58b7e0916a"; + const SPAN_ID: &str = "12ba53d3c291b8f1"; + const TRACE_FLAGS: u8 = 1; + const VENDOR_KEY: &str = "vendor1_key"; + const VENDOR_VALUE: &str = "vendor1_value"; + const W3C_TRACE_PARENT_KEY: &str = "traceparent"; + const W3C_TRACE_PARENT_VALUE: &str = "00-d4532e7a51d51c29c2166a58b7e0916a-12ba53d3c291b8f1-01"; + const W3C_TRACE_STATE_KEY: &str = "tracestate"; + const W3C_TRACE_STATE_VALUE: &str = "vendor1_key=vendor1_value"; + + static INIT: Once = Once::new(); + + fn init_telemetry() { + INIT.call_once(|| init(&TelemetryConfiguration::no_op())) + } + + #[test] + fn test_get_message_context() { + init_telemetry(); + + let mut message = Message::unknown("message".to_string()); + let meta = message.meta_mut(); + meta.span_context.0.insert( + W3C_TRACE_PARENT_KEY.to_string(), + W3C_TRACE_PARENT_VALUE.to_string(), + ); + meta.span_context.0.insert( + W3C_TRACE_STATE_KEY.to_string(), + W3C_TRACE_STATE_VALUE.to_string(), + ); + + let result = get_message_context(&message); + let result_span = result.span(); + let result_span_context = result_span.span_context(); + + assert_eq!(result_span_context.trace_id().to_string(), TRACE_ID); + assert_eq!(result_span_context.span_id().to_string(), SPAN_ID); + assert_eq!( + result_span_context.trace_flags(), + TraceFlags::new(TRACE_FLAGS) + ); + assert_eq!( + result_span_context.trace_state().get(VENDOR_KEY), + Some(VENDOR_VALUE) + ); + } + + #[test] + fn test_get_header_context() { + init_telemetry(); + + let mut headers = HeaderMap::new(); + headers.insert( + HeaderName::try_from(W3C_TRACE_PARENT_KEY).unwrap(), + HeaderValue::try_from(W3C_TRACE_PARENT_VALUE).unwrap(), + ); + headers.insert( + HeaderName::try_from(W3C_TRACE_STATE_KEY).unwrap(), + HeaderValue::try_from(W3C_TRACE_STATE_VALUE).unwrap(), + ); + + let result = get_header_context(&headers); + let result_span = result.span(); + let result_span_context = result_span.span_context(); + + assert_eq!(result_span_context.trace_id().to_string(), TRACE_ID); + assert_eq!(result_span_context.span_id().to_string(), SPAN_ID); + assert_eq!( + result_span_context.trace_flags(), + TraceFlags::new(TRACE_FLAGS) + ); + assert_eq!( + result_span_context.trace_state().get(VENDOR_KEY), + Some(VENDOR_VALUE) + ); + } + + #[test] + fn test_propagate_header_context() { + init_telemetry(); + + let mut headers = HeaderMap::new(); + + let (trace_id, span_id, trace_flags) = global::tracer("").in_span("test", |ctx| { + let span = ctx.span(); + let span_context = span.span_context(); + + propagate_header_context(&mut headers, &ctx); + + ( + span_context.trace_id(), + span_context.span_id(), + span_context.trace_flags(), + ) + }); + + assert_eq!(headers.len(), 2); + assert_eq!( + headers + .get(W3C_TRACE_STATE_KEY) + .map(|e| e.to_str().unwrap()), + Some("") + ); + assert_eq!( + headers + .get(W3C_TRACE_PARENT_KEY) + .map(|e| e.to_str().unwrap()), + Some(format!("00-{}-{}-{:02x}", trace_id, span_id, trace_flags.to_u8()).as_str()) + ); + } + + #[test] + fn test_get_propagated_context() { + init_telemetry(); + + let (trace_id, span_id, trace_flags, result) = global::tracer("").in_span("test", |ctx| { + let span = ctx.span(); + let span_context = span.span_context(); + + ( + span_context.trace_id(), + span_context.span_id(), + span_context.trace_flags(), + get_propagated_context(&ctx), + ) + }); + + assert_eq!(result.0.len(), 2); + assert_eq!( + result.0.get(W3C_TRACE_STATE_KEY).map(|e| e.as_str()), + Some("") + ); + assert_eq!( + result.0.get(W3C_TRACE_PARENT_KEY).map(|e| e.as_str()), + Some(format!("00-{}-{}-{:02x}", trace_id, span_id, trace_flags.to_u8()).as_str()) + ); + } + + #[test] + fn test_get_context_with_span_invalid_parent() { + init_telemetry(); + + let result = get_context_with_span("test", &Context::default()); + + assert_eq!(result.span().span_context().is_valid(), false); + } + + #[test] + fn test_get_context_with_span_valid_parent() { + init_telemetry(); + + let (trace_id, result) = global::tracer("").in_span("parent", |ctx| { + let child_ctx = get_context_with_span("child", &ctx); + + (ctx.span().span_context().trace_id(), child_ctx) + }); + + let result_span = result.span(); + let result_span_context = result_span.span_context(); + + assert_eq!(result_span_context.is_valid(), true); + assert_eq!(result_span_context.trace_id(), trace_id); + } +} diff --git a/media_gateway_server/Cargo.toml b/media_gateway_server/Cargo.toml index e7f4589..704a791 100644 --- a/media_gateway_server/Cargo.toml +++ b/media_gateway_server/Cargo.toml @@ -23,8 +23,10 @@ env_logger = { workspace = true } actix-web = { workspace = true } actix-protobuf = { workspace = true } actix-web-httpauth = { workspace = true } +http = { workspace = true } openssl = { workspace = true } mockall = { workspace = true } +opentelemetry = { workspace = true } media_gateway_common = { path = "../media_gateway_common" } diff --git a/media_gateway_server/src/main.rs b/media_gateway_server/src/main.rs index 7412c16..31ba8c0 100644 --- a/media_gateway_server/src/main.rs +++ b/media_gateway_server/src/main.rs @@ -66,6 +66,7 @@ use tokio::sync::Mutex; use media_gateway_common::api::health; use media_gateway_common::configuration::Credentials; use media_gateway_common::health::HealthService; +use media_gateway_common::telemetry; use server::configuration::GatewayConfiguration; use crate::server::api::gateway; @@ -107,6 +108,13 @@ fn main() -> Result<()> { info!("Configuration: {}", conf_arg); let conf = GatewayConfiguration::new(&conf_arg)?; + + if let Some(telemetry_conf) = conf.telemetry.as_ref() { + runtime.block_on(async { + telemetry::init(telemetry_conf); + }); + } + let bind_address = (conf.ip.as_str(), conf.port); let gateway_service = web::Data::new(Mutex::new(GatewayService::try_from(&conf)?)); let health_service = web::Data::new(HealthService::new()); @@ -224,5 +232,9 @@ fn main() -> Result<()> { http_server.bind(bind_address).unwrap() }; - runtime.block_on(async { http_server.run().await.map_err(anyhow::Error::from) }) + let result = runtime.block_on(async { http_server.run().await.map_err(anyhow::Error::from) }); + + telemetry::shutdown(); + + result } diff --git a/media_gateway_server/src/server/api.rs b/media_gateway_server/src/server/api.rs index 37a0fbf..75cdccb 100644 --- a/media_gateway_server/src/server/api.rs +++ b/media_gateway_server/src/server/api.rs @@ -1,17 +1,40 @@ use actix_protobuf::ProtoBuf; use actix_web::web::{Data, ReqData}; -use actix_web::Responder; -use media_gateway_common::model::Media; +use actix_web::{HttpRequest, Responder}; +use http::{HeaderMap, HeaderName, HeaderValue}; +use opentelemetry::trace::TraceContextExt; use tokio::sync::Mutex; +use media_gateway_common::model::Media; +use media_gateway_common::telemetry::{get_context_with_span, get_header_context}; + use crate::server::service::gateway::GatewayService; use crate::server::service::user::UserData; pub async fn gateway( service: Data>, + request: HttpRequest, media: ProtoBuf, user_data: Option>, ) -> impl Responder { + let mut headers = HeaderMap::new(); + request + .headers() + .into_iter() + .map(|e| e.to_owned()) + .for_each(|e| { + headers.insert( + HeaderName::try_from(e.0.as_str()).unwrap(), + HeaderValue::try_from(e.1.as_bytes()).unwrap(), + ); + }); + let parent_ctx = get_header_context(&headers); + let ctx = get_context_with_span("store", &parent_ctx); + let gateway_service = service.lock().await; - gateway_service.process(media, user_data) + let result = gateway_service.process(media, user_data); + + ctx.span().end(); + + result } diff --git a/media_gateway_server/src/server/configuration.rs b/media_gateway_server/src/server/configuration.rs index cbebf96..013d062 100644 --- a/media_gateway_server/src/server/configuration.rs +++ b/media_gateway_server/src/server/configuration.rs @@ -1,6 +1,7 @@ use std::num::NonZeroUsize; use std::time::Duration; +use savant_core::telemetry::TelemetryConfiguration; use savant_core::transport::zeromq::{SyncWriter, WriterConfigBuilder}; use serde::{Deserialize, Serialize}; use twelf::{config, Layer}; @@ -18,6 +19,7 @@ pub struct GatewayConfiguration { pub(crate) out_stream: SinkConfiguration, pub(crate) auth: Option, pub(crate) statistics: Option, + pub(crate) telemetry: Option, } impl GatewayConfiguration { diff --git a/media_gateway_server/src/server/service/gateway.rs b/media_gateway_server/src/server/service/gateway.rs index d7ab9a9..4c26f89 100644 --- a/media_gateway_server/src/server/service/gateway.rs +++ b/media_gateway_server/src/server/service/gateway.rs @@ -2,6 +2,8 @@ use actix_protobuf::ProtoBuf; use actix_web::web::ReqData; use actix_web::HttpResponse; use log::{debug, error, info}; +use opentelemetry::trace::{Status, TraceContextExt}; +use opentelemetry::Context; use savant_core::message::Message; use savant_core::transport::zeromq::{SyncWriter, WriterResult}; @@ -30,17 +32,23 @@ impl GatewayService { media: ProtoBuf, user_data: Option>, ) -> HttpResponse { + let ctx = Context::current(); + let span = ctx.span(); + let topic_result = std::str::from_utf8(&media.topic); if topic_result.is_err() { + span.set_status(Status::error("invalid topic")); return HttpResponse::BadRequest().finish(); } let topic = topic_result.unwrap(); if media.message.is_none() { + span.set_status(Status::error("no message")); return HttpResponse::BadRequest().finish(); } let message_result = Message::try_from(media.message.as_ref().unwrap()); if message_result.is_err() { + span.set_status(Status::error("invalid message")); return HttpResponse::BadRequest().finish(); } let id = match self.statistics_service.as_ref() { @@ -71,6 +79,7 @@ impl GatewayService { .unwrap() .matches(&message.meta().routing_labels) { + span.set_status(Status::error("forbidden by user allowed routing labels")); return HttpResponse::Unauthorized().finish(); } } @@ -82,14 +91,26 @@ impl GatewayService { .collect::>(); let result = self.writer.send_message(topic, &message, &data); - let response = match result { - Ok(WriterResult::SendTimeout) => HttpResponse::GatewayTimeout().finish(), - Ok(WriterResult::AckTimeout(_)) => HttpResponse::BadGateway().finish(), - Ok(WriterResult::Ack { .. }) => HttpResponse::Ok().finish(), - Ok(WriterResult::Success { .. }) => HttpResponse::Ok().finish(), + let (response, status) = match result { + Ok(WriterResult::SendTimeout) => ( + HttpResponse::GatewayTimeout().finish(), + Status::error(format!("{:?}", result)), + ), + Ok(WriterResult::AckTimeout(_)) => ( + HttpResponse::BadGateway().finish(), + Status::error(format!("{:?}", result)), + ), + Ok(WriterResult::Ack { .. }) => (HttpResponse::Ok().finish(), Status::Ok), + Ok(WriterResult::Success { .. }) => (HttpResponse::Ok().finish(), Status::Ok), Err(e) => { error!("Failed to send a message: {:?}", e); - HttpResponse::InternalServerError().finish() + if span.is_recording() { + span.record_error(e.as_ref()); + } + ( + HttpResponse::InternalServerError().finish(), + Status::error("store failure"), + ) } }; if let Some(stat_id) = id { @@ -102,6 +123,7 @@ impl GatewayService { log::warn!("Error while ending message statistics: {:?}", e) } } + span.set_status(status); response } }