Skip to content
This repository was archived by the owner on May 26, 2026. It is now read-only.

Commit 1074e05

Browse files
committed
implement in-memory streamview counter
1 parent cf81a79 commit 1074e05

7 files changed

Lines changed: 854 additions & 0 deletions

File tree

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Usage
2+
```
3+
log_format json_logs escape=json '{'
4+
'"time_local": "$time_local",'
5+
'"remote_addr": "$remote_addr",' # client IP
6+
'"method": "$request_method",' # request method, usually “GET” or “POST”
7+
'"protocol": "$server_protocol",' # request protocol, usually “HTTP/1.0”, “HTTP/1.1”, “HTTP/2.0”, or “HTTP/3.0”
8+
'"uri": "$uri",' # current URI in request
9+
'"status": "$status",' # response status code
10+
'"bytes_sent": "$bytes_sent", ' # the number of bytes sent to a client
11+
'"request_length": "$request_length", ' # request length (including headers and body)
12+
'"connection_requests": "$connection_requests",' # number of requests made in connection
13+
'"upstream": "$upstream_addr", ' # upstream backend server for proxied requests
14+
'"upstream_connect_time": "$upstream_connect_time", ' # upstream handshake time incl. TLS
15+
'"upstream_header_time": "$upstream_header_time", ' # time spent receiving upstream headers
16+
'"upstream_response_time": "$upstream_response_time", ' # time spend receiving upstream body
17+
'"upstream_response_length": "$upstream_response_length", ' # upstream response length
18+
'"upstream_cache_status": "$upstream_cache_status", ' # cache HIT/MISS where applicable
19+
'"ssl_protocol": "$ssl_protocol", ' # TLS protocol
20+
'"ssl_cipher": "$ssl_cipher", ' # TLS cipher
21+
'"scheme": "$scheme", ' # http or https
22+
'"user_agent": "$http_user_agent"'
23+
'}';
24+
25+
access_log syslog:server=unix:/var/log/relay.sock json_logs;
26+
```
27+
28+
# Customization
29+
The code must be able to derive the sub playlist from the request path.
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"flag"
6+
"fmt"
7+
"log/slog"
8+
"net"
9+
"net/http"
10+
"os"
11+
"os/signal"
12+
"time"
13+
14+
"github.com/prometheus/client_golang/prometheus"
15+
"github.com/prometheus/client_golang/prometheus/collectors"
16+
"github.com/prometheus/client_golang/prometheus/promhttp"
17+
"github.com/voc/stream-api/logreceiver"
18+
"golang.org/x/sys/unix"
19+
)
20+
21+
// maxSegmentDuration is the maximum duration of a segment to expect
22+
const maxSegmentDuration = time.Second * 5
23+
24+
func main() {
25+
if err := run(); err != nil {
26+
slog.Error("collector failed", "err", err)
27+
os.Exit(1)
28+
}
29+
}
30+
31+
func run() error {
32+
slidingWindow := flag.Duration("sliding-window-duration", time.Second*30, "duration of the sliding window for the counting")
33+
prometheusListen := flag.String("prometheus-listen", ":9273", "listen address of the prometheus endpoint")
34+
socket := flag.String("socket", "/var/log/relay.sock", "syslog socket")
35+
debug := flag.Bool("debug", false, "enable debug mode")
36+
flag.Parse()
37+
38+
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, unix.SIGTERM)
39+
defer cancel()
40+
41+
logOpts := &slog.HandlerOptions{
42+
Level: slog.LevelInfo,
43+
}
44+
if *debug {
45+
logOpts.Level = slog.LevelDebug
46+
}
47+
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, logOpts)))
48+
49+
reg := prometheus.NewPedanticRegistry()
50+
51+
// Add the standard process and Go metrics to the custom registry.
52+
reg.MustRegister(
53+
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
54+
collectors.NewGoCollector(),
55+
)
56+
57+
_, err := logreceiver.NewParser(ctx, logreceiver.ParserConfig{
58+
SocketPath: *socket,
59+
MetricsRegisterer: reg,
60+
SlidingWindow: *slidingWindow,
61+
MinSegments: int(*slidingWindow/maxSegmentDuration) / 2,
62+
})
63+
if err != nil {
64+
return err
65+
}
66+
67+
// serve metrics
68+
mux := http.NewServeMux()
69+
mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
70+
server := http.Server{
71+
Handler: mux,
72+
}
73+
74+
listener, err := net.Listen("tcp", *prometheusListen)
75+
if err != nil {
76+
return fmt.Errorf("failed to listen on %s: %w", *prometheusListen, err)
77+
}
78+
defer listener.Close()
79+
slog.Info("serving metrics on", "addr", *prometheusListen)
80+
81+
go func() {
82+
if err := server.Serve(listener); err != nil && err != http.ErrServerClosed {
83+
slog.Error("failed to serve prometheus metrics", "err", err)
84+
cancel()
85+
}
86+
}()
87+
<-ctx.Done()
88+
return nil
89+
}

logreceiver/dash.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package logreceiver
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"log"
7+
"net/http"
8+
"net/url"
9+
"path/filepath"
10+
"sync"
11+
"time"
12+
13+
"github.com/zencoder/go-dash/mpd"
14+
)
15+
16+
type Streams struct {
17+
cache sync.Map
18+
}
19+
20+
type DashManifest struct {
21+
LastRefresh time.Time
22+
Qualities map[string]string
23+
}
24+
25+
func (s *Streams) getManifest(segmentPath string) DashManifest {
26+
base := filepath.Dir(segmentPath)
27+
val, ok := s.cache.LoadOrStore(base, DashManifest{Qualities: make(map[string]string)})
28+
if !ok {
29+
// not present in Cache getInBackground
30+
go func(b string) {
31+
err := s.getManifestInBackground(b)
32+
if err == nil {
33+
return
34+
}
35+
36+
log.Println(err.Error())
37+
s.cache.Delete(b)
38+
}(base)
39+
}
40+
41+
return val.(DashManifest)
42+
}
43+
44+
func (s *Streams) getManifestInBackground(base string) error {
45+
path, err := url.JoinPath("http://127.0.0.1/", base, "manifest.mpd")
46+
47+
log.Printf("try to get %s\n", path)
48+
49+
if err != nil {
50+
return fmt.Errorf("failed to get manifest path: %s", err)
51+
}
52+
53+
resp, err := http.Get(path)
54+
if err != nil {
55+
return fmt.Errorf("failed to get manifest: %s", err.Error())
56+
}
57+
58+
if resp.StatusCode != http.StatusOK {
59+
return fmt.Errorf("failed to get manifest: return status is %d", resp.StatusCode)
60+
}
61+
62+
content, err := io.ReadAll(resp.Body)
63+
if err != nil {
64+
return fmt.Errorf("failed to get manifest body: %s", err.Error())
65+
}
66+
67+
manifest, err := mpd.ReadFromString(string(content))
68+
if err != nil {
69+
return fmt.Errorf("failed to decode manifest: %s", err.Error())
70+
}
71+
72+
mD := DashManifest{}
73+
mD.Qualities = make(map[string]string)
74+
if len(manifest.Periods) != 1 {
75+
return fmt.Errorf("not exactly one period")
76+
}
77+
78+
for _, a := range manifest.Periods[0].AdaptationSets {
79+
if a == nil {
80+
continue
81+
}
82+
83+
if a.ContentType == nil || *a.ContentType != "video" {
84+
continue
85+
}
86+
87+
for _, r := range a.Representations {
88+
if r == nil || r.Height == nil || r.ID == nil {
89+
continue
90+
}
91+
92+
mD.Qualities[*r.ID] = fmt.Sprintf("%d", *r.Height)
93+
}
94+
}
95+
96+
mD.LastRefresh = time.Now()
97+
s.cache.Store(base, mD)
98+
return nil
99+
}
100+
101+
func (s *Streams) RefreshRoutine() {
102+
ticker := time.NewTicker(time.Minute * 30)
103+
for {
104+
<-ticker.C
105+
106+
s.cache.Range(func(key, value any) bool {
107+
m := value.(DashManifest)
108+
base := key.(string)
109+
if time.Since(m.LastRefresh) < time.Hour {
110+
return true
111+
}
112+
113+
err := s.getManifestInBackground(base)
114+
if err != nil {
115+
log.Println(err.Error())
116+
s.cache.Delete(base)
117+
}
118+
119+
return true
120+
})
121+
}
122+
}

0 commit comments

Comments
 (0)