Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions memorystore/redis/client_side_metrics/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import os
import time
import redis
from redis.exceptions import ConnectionError, TimeoutError
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.cloud_monitoring import CloudMonitoringMetricsExporter
from opentelemetry.instrumentation.redis import RedisInstrumentor

# 1. Initialize Tracing
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(BatchSpanProcessor(CloudTraceSpanExporter()))
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer("redis.client")

# 2. Initialize Metrics
metrics_exporter = CloudMonitoringMetricsExporter()
metric_reader = PeriodicExportingMetricReader(metrics_exporter, export_interval_millis=10000)
meter_provider = MeterProvider(metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)
meter = metrics.get_meter("redis.metrics")

rtt_hist = meter.create_histogram("redis_client_rtt", unit="ms")
client_block_hist = meter.create_histogram("redis_client_blocking_latency", unit="ms")
app_block_hist = meter.create_histogram("redis_application_blocking_latency", unit="ms")
retry_counter = meter.create_counter("redis_retry_count")
conn_error_counter = meter.create_counter("redis_connectivity_error_count")

retry_counter.add(0, {"operation": "startup"})
conn_error_counter.add(0, {"operation": "startup"})

# 3. Setup Redis
RedisInstrumentor().instrument()
redis_host = os.environ.get("REDISHOST", "localhost")
redis_port = int(os.environ.get("REDISPORT", 6379))

pool = redis.ConnectionPool(host=redis_host, port=redis_port, max_connections=10, decode_responses=True)
r = redis.Redis(connection_pool=pool)

def smart_redis_call(operation_name, func, *args, **kwargs):
max_retries = 3
attempt = 0

pool_start = time.time()
try:
conn = pool.get_connection('PING')
pool.release(conn)
except Exception:
pass
client_block_hist.record((time.time() - pool_start) * 1000, {"operation": operation_name})

while attempt < max_retries:
Comment thread
fosky94 marked this conversation as resolved.
try:
req_start = time.time()
response = func(*args, **kwargs)
rtt_hist.record((time.time() - req_start) * 1000, {"operation": operation_name})

app_start = time.time()
_ = str(response)
app_block_hist.record((time.time() - app_start) * 1000, {"operation": operation_name})

return response
Comment thread
fosky94 marked this conversation as resolved.

except (ConnectionError, TimeoutError) as e:
attempt += 1
conn_error_counter.add(1, {"operation": operation_name})
retry_counter.add(1, {"operation": operation_name})
if attempt >= max_retries:
raise e
time.sleep((2 ** attempt) * 0.1)

if __name__ == "__main__":
with tracer.start_as_current_span("process_user_span"):
try:
# Simple write and read operations
smart_redis_call("set_user", r.set, "user:123", "active")

result = smart_redis_call("get_user", r.get, "user:123")
print(f"Retrieved: {result}")
except Exception as e:
print(f"Error: {e}")

tracer_provider.force_flush()
meter_provider.force_flush()