
[1/N] [History Server Beta] [Feat] Lazy loading #4795

Merged
andrewsykim merged 46 commits into ray-project:master from JiangJiaWei1103:hs-beta-lazy-loading
May 22, 2026


@JiangJiaWei1103
Member

@JiangJiaWei1103 JiangJiaWei1103 commented May 1, 2026

Why are these changes needed?

Alpha Problems

  • Long data warm-up window: Dead sessions were not fully available until startup processing finished
  • Duplicated processing: EventHandler re-parsed all raw events every hour (bounded by O(#sessions))

A process restart wiped all in-memory maps, and the next startup repeated the entire warm-up cycle.

Beta Strategy

In this PR, we remove the startup processing and the hourly ticker, and introduce lazy loading: raw events are processed on user request, and processed dead sessions are recorded in in-memory maps (inherited from alpha).

Note

The unbounded memory growth problem will be solved in the next PR via caching and the third PR's dead code cleanup.

Overview

System Architecture

                        ┌──────────────────┐
                        │   Dashboard UI   │ ◄── user browser
                        └────────┬─────────┘
                                 │ HTTP
                                 ▼
                        ┌──────────────────┐
                        │   Load Balancer  │
                        └────────┬─────────┘
                                 │
                                 ▼
   ┌── Ray Cluster ────────┐  ┌── History Server (replicas, single binary) ─────┐
   │                       │  │                                                 │
   │  Head pod             │  │   HTTP Router                                   │
   │  ray-head+collector   │  │   /enter_cluster (blocking on dead)             │
   │           │           │  │     │                                           │
   │           │           │  │     ▼                                           │
   │           │           │  │   SessionLoader                                 │
   │           │           │  │   (singleflight + loaded set)                   │
   │           │ raw       │  │     │                                           │
   │           │ events    │  │   ┌─┴──────────┐                                │
   │           │ (PUT)     │  │   ▼            ▼                                │     
   │           ▼           │  │ EventHandler  SessionProcessor ── isDead? (RayCluster) ┼──┐
   └───────────┼───────────┘  │ in-mem maps   (parse only)        HTTPLiveSessionResolver │
               │              │  (Tasks/        │                               │         ▼
               │              │   Actors/       │ ProcessSingleSession          │    ┌─────────┐
               │              │   Jobs/         │ (writes events into maps)     │    │  K8s    │
               │              │   Nodes/        │                               │    │  API    │
               │              │   LogEvents)    │                               │    └─────────┘
               │              │     ▲           │                               │
               │              │     └───────────┘                               │
               │              └─────────────────────────────────────────────────┘
               │                               ▲
               ▼                               │  raw events (GET)
   ┌──────────────────────────────────────────────────────────┐
   │  Object Storage  (S3 / GCS / Azure / Aliyun OSS)         │
   │                                                          │
   │  {rootDir}/{cluster}_{ns}/{session}/                     │
   │   node_events/, job_events/, logs/, fetched_endpoints/   │
   └──────────────────────────────────────────────────────────┘

Request Data Flow

sequenceDiagram
  actor User as Winner
  actor User2 as Follower
  participant HS as HS handler
  participant Loader as SessionLoader (singleflight + loaded set)
  participant Proc as SessionProcessor
  participant Resolver as HTTPLiveSessionResolver
  participant K8s as K8s API
  participant Dash as Ray Dashboard
  participant S3 as Object Storage
  participant EH as EventHandler maps

  User->>HS: GET /enter_cluster (cold, dead session)
  User2->>HS: GET /enter_cluster (concurrent, same session)
  HS->>Loader: LoadSession (winner)
  HS->>Loader: LoadSession (follower)
  Note over Loader: same singleflight key →<br/>follower joins winner's group
  Loader->>Loader: loaded set check → not found
  Loader->>Proc: ProcessSession(serverCtx)
  Note over Proc,Loader: serverCtx (NOT reqCtx) —<br/>winner disconnect cannot cancel<br/>work followers still depend on

  Proc->>K8s: Get RayCluster
  alt CR NotFound
    K8s-->>Proc: NotFound → dead
  else CR exists
    K8s-->>Proc: RayCluster (Status.Head.ServiceName set)
    Proc->>Resolver: FetchSessionName(ns, headSvcName)
    Resolver->>Dash: GET /api/version
    Dash-->>Resolver: { session_name: "<live_session>" }
    Resolver-->>Proc: "<live_session>"
    Note over Proc: queried != live → dead
  end

  Proc->>S3: List + GET event files
  Proc->>EH: ProcessSingleSession (writes into Tasks/Actors/Jobs/Nodes/LogEvents)
  Proc-->>Loader: SessionStatusProcessed, nil
  Loader->>Loader: mark session in loaded set
  Loader-->>HS: nil (winner)
  Loader-->>HS: nil (follower, shared result)
  HS-->>User: 200 OK
  HS-->>User2: 200 OK

  Note over User,EH: subsequent /api/v0/* calls read from EventHandler maps directly<br/>repeat /enter_cluster returns immediately via loaded set fast-path<br/>(no re-parse on this replica)

Components and Design Decisions

SessionProcessor

Determine session liveness by comparing the queried session name against the one currently running on the live RayCluster; execute ProcessSingleSession for dead sessions.

Design Decisions
Probe-based comparison replaces the old timestamp comparison, which could not distinguish multiple sessions in the same CR (head-pod restarts) and had a time zone alignment issue in ParseSessionTimestamp.

Trade-off: One HTTP round-trip per cold load; the hot path is unaffected because it is served from the loaded set.
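
The liveness decision described above can be sketched as a small pure function. This is a minimal illustration, not the PR's actual isDead code: the clusterProbe type and its fields are hypothetical names that summarize the two facts the processor gathers (whether the RayCluster CR exists, and which session the dashboard reports as live).

```go
package main

import "fmt"

// clusterProbe is a hypothetical summary of what was learned about the
// RayCluster: whether the CR exists and, if so, which session the dashboard
// currently reports.
type clusterProbe struct {
	crFound     bool
	liveSession string
}

// isDead reports whether the queried session should be treated as dead.
// A missing CR means dead; otherwise the session is dead only when its name
// differs from the live one.
func isDead(queried string, p clusterProbe) bool {
	if !p.crFound {
		return true
	}
	return queried != p.liveSession
}

func main() {
	fmt.Println(isDead("session_a", clusterProbe{crFound: false}))
	fmt.Println(isDead("session_a", clusterProbe{crFound: true, liveSession: "session_b"}))
	fmt.Println(isDead("session_b", clusterProbe{crFound: true, liveSession: "session_b"}))
}
```

Comparing names rather than timestamps is what lets two sessions under the same CR (for example after a head-pod restart) be told apart.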

SessionLoader

Gate /enter_cluster until session events are in memory.

Design Decisions

  • singleflight per session key prevents redundant storage scans when concurrent callers race on the same dead session
  • 2-min per-load timeout prevents goroutine leak

Trade-off: First-request cold-load latency; subsequent hits are O(1).

HTTPLiveSessionResolver

Call /api/version on the Ray dashboard and return the cluster's current session_name.

Design Decisions
Fresh implementation rather than reusing RayDashboardClient since the existing interface lacked GetVersion() and its factory required manager.Manager, which is unavailable in HS.

Trade-off: URL-construction logic is duplicated with redirectRequest.

ProcessSingleSession

Process ray-events and log-events into in-memory maps for one session.

Design Decisions
ray-events failure blocks the loaded marker; log-events failure is logged but does not block. log-events feeds only /events, so coupling both error paths would let a transient log outage block all other dead-session APIs.

Trade-off: Silent data loss on /events if log-events fails; no retry until HS restart.

/enter_cluster

Route requests to SessionLoader (dead session) or dashboard proxy (live session).

Design Decisions
Session-name validation lives at the router level rather than inside isDead, following the KubeRay caller-validates convention.

Trade-off:

Related issue number

Part of #4709.

How to test

echo "=== Build ===" && \
go build -o /tmp/hs-bin ./cmd/historyserver/ && rm -f /tmp/hs-bin && echo "BUILD OK"

echo && echo "=== Vet ===" && go vet ./... && echo "VET OK"

# Directly affected packages
echo && echo "=== Tests in packages touched by this PR (must PASS) ===" && \
go test -race -cover ./pkg/historyserver/...

# Verbose run of NEW tests added by this PR
echo && echo "=== New tests (verbose) ===" && \
go test -race -v -run "TestEnsure_PipelineError" ./pkg/historyserver/

echo && echo "=== eventserver tests (must PASS) ===" && \
go test -race ./pkg/eventserver/types/... && \
go test -race ./pkg/eventserver/ 2>&1 | tail -5 || true

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

- Add package doc
- Move flag declarations into a var() block
- Add description strings to each flag
- Add section dividers for readability
- Group imports into stdlib / third-party / project
- Remove dead initialization of runtimeClassConfigPath (overwritten by flag default)

No behavior change.

Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
- Use logrus.Fatalf(...) for consistent log formatting
- Add required-flag check on --runtime-class-name for fast fail

Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
- Replace manual sigChan + signal.Notify with signal.NotifyContext
- Add bridge goroutine that closes the legacy stop ch when serverCtx fires

No behavior change.

Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
- Add a Supervisor as /enter_cluster broker
  - Use in-process loaded set to record processed sessions
- Add a Pipeline that wraps the parse steps
- Remove eager processing on startup and ticker

Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
@JiangJiaWei1103 JiangJiaWei1103 changed the title from "[WIP] [Feat] [History Server] [Beta] Lazy loading" to "[1/N] [History Server Beta] [Feat] Lazy loading" May 2, 2026
@JiangJiaWei1103
Member Author

JiangJiaWei1103 commented May 2, 2026

Alpha E2E tests pass, and the following demonstrates the manual test:

History Server Beta - Lazy Loading

@JiangJiaWei1103 JiangJiaWei1103 marked this pull request as ready for review May 2, 2026 04:37
Comment thread historyserver/pkg/eventserver/eventserver.go
Comment thread historyserver/pkg/historyserver/session_processor.go
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
@Future-Outlier
Member

@codex review

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: fad772ebf8

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread historyserver/pkg/historyserver/session_processor.go
Comment thread historyserver/pkg/eventserver/eventserver.go
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Comment thread historyserver/pkg/historyserver/router.go
…name

Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Comment thread historyserver/pkg/historyserver/router.go
Comment thread historyserver/cmd/historyserver/main.go Outdated
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Comment thread historyserver/pkg/historyserver/session_loader.go
Comment thread historyserver/pkg/eventserver/eventserver.go Outdated
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Comment thread historyserver/pkg/historyserver/session_processor.go Outdated
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
@Future-Outlier
Member

@codex review

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: fec4ba535c


Comment thread historyserver/pkg/historyserver/session_processor.go Outdated
Member

@Future-Outlier Future-Outlier left a comment

will do more review today.

Comment thread historyserver/cmd/historyserver/main.go Outdated
if err := logEventReader.ReadLogEvents(clusterInfo, clusterSessionKey, h.ClusterLogEventMap); err != nil {
	logrus.Errorf("Incomplete Log Events read for %s: %v. /events endpoint may serve partial data.",
		clusterSessionKey, err)
}

ProcessSingleSession ignores context during log event reading

Low Severity

ProcessSingleSession calls ReadLogEvents before checking ctx.Err(), and ReadLogEvents does not accept a context parameter. If the 2-minute loadTimeout or server shutdown fires during log event reading, the cancellation won't be observed until ReadLogEvents completes and the ray events loop begins its first iteration. For sessions with many nodes and log files, this can delay graceful shutdown and exceed the intended per-load timeout contract.

Reviewed by Cursor Bugbot for commit 96a12be.

Member Author

Fine-grained ctx management will be resolved in the follow-up.

}
rayEventsRead++

// Corrupt file: don't count as a hard failure since retrying won't help. Accept partial loss.
Member

should this comment be inside the err != nil block below?

Member Author

Also fixed in 8caa9a1 to explain how the following two errors are handled and why.

}
}

if rayEventsTotal > 0 && rayEventsRead == 0 {
Member

same comment here, should this just check rayEventsTotal != rayEventsRead?

Member Author

Good question. For log events, total != read is essentially free: errors do not propagate past ProcessSingleSession, so loaded is still set, and the next /enter_cluster hits the fast-path. The only effect is better visibility.

For ray events, errors propagate to doLoadSession, which blocks loaded from being set. So rayEventsTotal != rayEventsRead here means every subsequent /enter_cluster call re-runs the full reload until the partial-failure condition heals.

The proper fix for ray events might be to surface "loaded but incomplete" as a richer marker instead of a bool. I think it is a non-trivial change because it touches loaded semantics in SessionLoader and ProcessSession's returns.

Would it make sense to defer richer error handling to follow-up? Thanks!

Comment thread historyserver/pkg/historyserver/router.go Outdated
Comment thread historyserver/pkg/historyserver/session_loader.go Outdated
}

// TODO(jwj): Graceful drain if needed. Currently SIGTERM immediately cancels
// in-flight work. If 500-during-shutdown becomes a customer pain point, switch
Member

This comment is confusing, what is "500-during-shutdown"?

Member Author

@JiangJiaWei1103 JiangJiaWei1103 May 15, 2026

serverCtx is signal.NotifyContext(SIGINT, SIGTERM), which cancels the moment the signal arrives (no grace window). loadCtx derives from serverCtx, so any in-flight doLoadSession cancels with it and surfaces as HTTP 500 to clients via router.go.

Clarified comments in 0b90fc9.

Comment thread historyserver/pkg/eventserver/eventserver.go Outdated
@andrewsykim
Member

andrewsykim commented May 15, 2026

Is there unused code in eventserver.go we can delete now? Like this method?

// Run will start numOfEventProcessors (default to 5) processing functions and the event reader. The event reader will run once an hr,
// which is currently how often the collector flushes.
func (h *EventHandler) Run(stop chan struct{}, numOfEventProcessors int) error {
	var wg sync.WaitGroup
	if numOfEventProcessors == 0 {
		numOfEventProcessors = 5
	}
	eventProcessorChannels := make([]chan map[string]any, numOfEventProcessors)
	cctx := make([]context.CancelFunc, numOfEventProcessors)
	for i := range numOfEventProcessors {
		eventProcessorChannels[i] = make(chan map[string]any, 100)
	}
	for i, currEventChannel := range eventProcessorChannels {
		wg.Add(1)
		ctx, cancel := context.WithCancel(context.Background())
		cctx[i] = cancel
		go func() {
			defer wg.Done()
			var processor EventProcessor[map[string]any] = h
			err := processor.ProcessEvents(ctx, currEventChannel)
			if err == ctx.Err() {
				logrus.Warnf("Event processor go routine %d is now closed", i)
				return
			}
			if err != nil {
				logrus.Errorf("event processor %d go routine failed %v", i, err)
				return
			}
		}()
	}
	// Start reading files and sending events for processing
	wg.Add(1)
	go func() {
		defer wg.Done()
		logrus.Info("Starting event file reader loop")
		// Create a LogEventReader for reading logs/events/event_*.log files
		logEventReader := NewLogEventReader(h.reader)
		// Helper function to process all events
		processAllEvents := func() {
			clusterList := h.reader.List()
			for _, clusterInfo := range clusterList {
				clusterNameNamespace := clusterInfo.Name + "_" + clusterInfo.Namespace
				clusterSessionKey := utils.BuildClusterSessionKey(clusterInfo.Name, clusterInfo.Namespace, clusterInfo.SessionName)
				// Read Log Events from logs/{nodeId}/events/event_*.log
				// This is the format used by Ray Dashboard's /events API
				if err := logEventReader.ReadLogEvents(clusterInfo, clusterSessionKey, h.ClusterLogEventMap); err != nil {
					logrus.Errorf("Failed to read Log Events for %s: %v", clusterSessionKey, err)
				}
				// Also read RayEvents (Export Events) from node_events/ and job_events/ for backward compatibility
				// These are used for task/actor/job/node data APIs
				eventFileList := append(h.getAllJobEventFiles(clusterInfo), h.getAllNodeEventFiles(clusterInfo)...)
				logrus.Infof("current eventFileList for cluster %s is: %v", clusterInfo.Name, eventFileList)
				for _, eventFile := range eventFileList {
					// TODO: Filter out ones that have already been read
					logrus.Infof("Reading event file: %s", eventFile)
					eventioReader := h.reader.GetContent(clusterNameNamespace, eventFile)
					if eventioReader == nil {
						logrus.Errorf("Failed to get content for event file: %s, skipping", eventFile)
						continue
					}
					eventbytes, err := io.ReadAll(eventioReader)
					if err != nil {
						logrus.Errorf("Failed to read event file: %v", err)
						continue
					}
					var eventList []map[string]any
					if err := json.Unmarshal(eventbytes, &eventList); err != nil {
						logrus.Errorf("Failed to unmarshal event: %v", err)
						continue
					}
					// Evenly distribute events to each channel
					for i, curr := range eventList {
						// Skip nil events (can occur with corrupted event files containing null elements)
						if curr == nil {
							continue
						}
						curr["clusterName"] = clusterSessionKey
						eventProcessorChannels[i%numOfEventProcessors] <- curr
					}
				}
			}
		}
		// Process events immediately on startup
		processAllEvents()
		// Create a ticker for hourly processing
		ticker := time.NewTicker(1 * time.Hour)
		defer ticker.Stop()
		for {
			logrus.Info("Finished reading files, waiting for next cycle...")
			select {
			case <-stop:
				// Received stop signal, clean up and exit
				for i, currChan := range eventProcessorChannels {
					close(currChan)
					cctx[i]()
				}
				logrus.Info("Event processor received stop signal, exiting.")
				return
			case <-ticker.C:
				// Process events every hour
				processAllEvents()
			}
		}
	}()
	wg.Wait()
	return nil
}

Comment thread historyserver/pkg/historyserver/session_loader.go Outdated
Comment on lines +15 to +16
const defaultSessionLoadTimeout = 2 * time.Minute

Member

  1. rename defaultSessionLoadTimeout -> defaultSessionProcessTimeout
  2. change to 10 minutes

Member

Update: after reading Aaron's benchmark test, I think the timeout can stay at 2 minutes; it's fine.


Member Author

Renamed and made it configurable (keeping 2-min default) in ab80886.

Member

@Future-Outlier Future-Outlier left a comment

we should add a flag for user to configure their session process timeout

Member

@Future-Outlier Future-Outlier left a comment

  1. TODO: Open an issue to discuss whether a live session resolver is necessary for the live-cluster dead-session check. We can delete the live session resolver first.
  2. Overall LGTM, and I’ll check the details more closely soon.

Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
…g event obj

Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Comment thread historyserver/pkg/historyserver/session_loader.go
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
reader: reader,
clientManager: clientManager,
eventHandler: eventHandler,
sessionLoader: sessionLoader,

Cold-load timeout exceeds HTTP server WriteTimeout

High Severity

The enter_cluster handler now blocks synchronously on LoadSession, which can run for up to DefaultSessionProcessTimeout (2 minutes). However, the HTTP server's WriteTimeout is only 35 seconds. Any cold-load exceeding 35 seconds causes the server to close the TCP connection before the handler writes a response, resulting in a client-visible error even though the singleflight processing completes successfully in the background.

Reviewed by Cursor Bugbot for commit ab80886.

Member Author

@JiangJiaWei1103 JiangJiaWei1103 May 15, 2026

This is intentional for now. Processing continues in the background, so users can return later to see the result.

Better UX would be showing a message in the frontend to set expectations around processing time.

Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
@JiangJiaWei1103
Member Author

Is there unused code in eventserver.go we can delete now? Like this method?

All dead code (including tests) is deleted in 1a55a2a!

Comment thread historyserver/pkg/historyserver/session_processor.go
…ection

Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Member

@andrewsykim andrewsykim left a comment

LGTM, just some minor nits, can merge after those are resolved

Comment thread historyserver/cmd/historyserver/main.go Outdated
Comment thread historyserver/cmd/historyserver/main.go Outdated
Comment thread historyserver/cmd/historyserver/main.go Outdated
// isDead determines if the RayCluster CR is absent.
func (p *SessionProcessor) isDead(ctx context.Context, session utils.ClusterInfo) (bool, error) {
	rc := &rayv1.RayCluster{}
	err := p.k8sClient.Get(ctx, k8stypes.NamespacedName{
Member

This is a cache read right?

Member Author

No, it is uncached now. The K8s pressure is bounded by the number of distinct dead sessions ever accessed, not by request volume. singleflight + the loaded fast-path absorb repeated reads of the same session.

Switching to a cached client (informer) would still be cleaner (failed-load retries stop hitting the API, and reads become O(1) in-memory). Do you prefer introducing this here? Thanks!

Member

I'm not sure I'm following you, I assumed the controller-runtime client creates an informer by default and does a cache read from informer, is that not the case here?

If we are doing an API GET request here every time that's probably problematic

Member

ah, even if it's a GET request every time, it's only once per cluster when we load the session, so it's probably fine

Member Author

@JiangJiaWei1103 JiangJiaWei1103 May 22, 2026

For the current impl, it is client.New(...) (not a Manager-backed client) so no informer, every isDead is a real API GET. Let me switch to ctrl.NewManager + mgr.GetClient().

Sorry for missing your latest reply! Would you prefer to make this switch in the current PR, or handle it separately? Either works for me. Thx.

Member

separate PR is fine, thanks for checking

Member Author

SG, thanks!

Comment thread historyserver/README.md Outdated
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
Member

@Future-Outlier Future-Outlier left a comment

LGTM. I only reviewed the main logic and didn’t review the test code.

Please also resolve the TODOs I listed. Thank you!

Comment on lines +87 to 90
	<-serverCtx.Done()
	logrus.Info("Received shutdown signal, initiating graceful shutdown...")
	close(stop)
}()
Member

Can we add an issue to use serverCtx and replace/remove the “stop channel”?
Thank you!

Comment on lines +24 to +27
// Client returns the primary controller-runtime client.
func (c *ClientManager) Client() client.Client {
	return c.clients[0]
}
Member

todo: we should only have 1 client.

Comment on lines +29 to +30
serverCtx context.Context
processTimeout time.Duration
Member

Please do this in a follow-up PR, thanks!

Do not store Contexts inside a struct type; instead, pass a Context explicitly to each function that needs it. This is discussed further in https://go.dev/blog/context-and-structs.

Comment thread historyserver/pkg/historyserver/session_processor.go
Comment on lines +22 to +38
const (
	// SessionStatusUnknown is the zero value, reserved as a defensive guard.
	SessionStatusUnknown SessionStatus = iota
	// SessionStatusLive means the RayCluster CR is still present and the
	// session is intentionally skipped.
	SessionStatusLive
	// SessionStatusProcessed means events were ingested into EventHandler's
	// in-memory state.
	SessionStatusProcessed
	// SessionStatusClusterStateUnknown means the cluster state could not be
	// determined (e.g., transient API failure).
	SessionStatusClusterStateUnknown
	// SessionStatusEventsErr means event parsing failed.
	SessionStatusEventsErr
	// SessionStatusCanceled means ctx was canceled mid-processing.
	SessionStatusCanceled
)
Member

SessionStatusUnknown and SessionStatusClusterStateUnknown feel ambiguous to me. We can add a follow-up issue to solve this.

Comment thread historyserver/pkg/historyserver/session_loader.go Outdated
Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
@Future-Outlier
Member

@codex review

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: b76f5b9979

Comment on lines +84 to +90
if apierrors.IsNotFound(err) {
	return true, nil
}
if err != nil {
	return false, err
}
return false, nil

P1: Distinguish queried session from current live session

isDead treats any existing RayCluster CR as live, but /enter_cluster/{namespace}/{name}/{session} now relies on this result to decide whether to load historical events or rewrite the cookie to live. When a cluster is still running but the user selects an older session_* from storage (common after head restarts), this path incorrectly returns live, skips ProcessSingleSession, and prevents access to that dead session’s data. The dead/live check needs to compare the requested session against the cluster’s current live session_name, not just CR existence.

Member Author

We focus on main use cases only and might solve this in the follow-up.

Signed-off-by: JiangJiaWei1103 <waynechuang97@gmail.com>
@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 3 total unresolved issues (including 2 from previous reviews).

Reviewed by Cursor Bugbot for commit cac9a7b.

Comment thread historyserver/pkg/historyserver/session_loader.go
@andrewsykim andrewsykim merged commit 2444257 into ray-project:master May 22, 2026
32 checks passed
@KunWuLuan
Contributor

lgtm

10 participants