Merged
Changes from 10 commits
46 commits
44474e6
refactor: Standardize main.go style
JiangJiaWei1103 May 1, 2026
c5904c7
fix: Surface configuration errors clearly
JiangJiaWei1103 May 1, 2026
1c1f383
Refactor: Switch to context-based shutdown via signal.NotifyContext
JiangJiaWei1103 May 1, 2026
3ff2245
feat: Add ProcessSingleSession for synchronous per-session loading
JiangJiaWei1103 May 1, 2026
3540f14
feat: Lazy session processing via Supervisor with isDead probe
JiangJiaWei1103 May 2, 2026
69e5ab7
docs: Add smoke tests
JiangJiaWei1103 May 2, 2026
c617b08
test: Cover Supervisor error propagation and no-retry contracts
JiangJiaWei1103 May 2, 2026
fad772e
docs: Add missing sa
JiangJiaWei1103 May 2, 2026
aab5dad
fix: Surface storage outage error in ProcessSingleSession
JiangJiaWei1103 May 3, 2026
1641ab6
fix: Handle isDead for RayCluster recreation with the same namespaced…
JiangJiaWei1103 May 3, 2026
1bff9d1
fix: Avoid setting cookie before a successful load or a live-skip
JiangJiaWei1103 May 3, 2026
6294d0e
fix: Use live sentinel to avoid querying empty in-mem state
JiangJiaWei1103 May 3, 2026
ea41c98
refactor: Skip singleflight for already-loaded sessions
JiangJiaWei1103 May 3, 2026
b2ad15e
refactor: Reuse k8s client
JiangJiaWei1103 May 3, 2026
3c54fb4
fix: Surface errors of two subsystems
JiangJiaWei1103 May 4, 2026
fec4ba5
Merge branch 'my-master' into hs-beta-lazy-loading
JiangJiaWei1103 May 4, 2026
93626b0
docs: Remove redundant comments
JiangJiaWei1103 May 6, 2026
536115b
refactor: Rename to SessionLoader and LoadSession
JiangJiaWei1103 May 6, 2026
caea59d
refactor: Rename to SessionProcessor for clarity
JiangJiaWei1103 May 6, 2026
949dea0
refactor: Address PR review nits for cleanness
JiangJiaWei1103 May 6, 2026
6d44bae
docs: Clean up logs and comments
JiangJiaWei1103 May 7, 2026
ca30acb
docs: Clean up docstrings
JiangJiaWei1103 May 7, 2026
68c8ad5
refactor: Rename to SessionStatusClusterStateUnknown to indicate stat…
JiangJiaWei1103 May 7, 2026
23caa56
refactor: Push ctx into ProcessSingleSession to avoid dup ctx err pol…
JiangJiaWei1103 May 7, 2026
557aa3d
refactor: Consolidate ProcessSingleSession test into eventserver_test.go
JiangJiaWei1103 May 7, 2026
49105e1
docs: Clean up more comments
JiangJiaWei1103 May 7, 2026
cb031d7
docs: Address more nits for clarity
JiangJiaWei1103 May 8, 2026
3ac6576
fix: Fix assertion string
JiangJiaWei1103 May 8, 2026
d42a2be
fix: Add session input validation and resolve Nary's nits
JiangJiaWei1103 May 11, 2026
0e67b0b
fix: Add timeout for a single cold-load to avoid resource leakage
JiangJiaWei1103 May 11, 2026
8b14840
feat: Add HTTPLiveSessionResolver for determining session name of liv…
JiangJiaWei1103 May 12, 2026
8a3914c
refactor: Remove client-side redundant timeout setup
JiangJiaWei1103 May 12, 2026
ae6ba95
fix: Enable cross-ns dns lookup
JiangJiaWei1103 May 12, 2026
fd7e413
refactor: Fix nits
JiangJiaWei1103 May 15, 2026
96a12be
Merge branch 'my-master' into hs-beta-lazy-loading
JiangJiaWei1103 May 15, 2026
8caa9a1
docs: Clarify comments
JiangJiaWei1103 May 15, 2026
0b90fc9
refactor: Improve error msg, comments and fix nits
JiangJiaWei1103 May 15, 2026
9d0098f
refactor: Session key as explicit param of storeEvent without mutatin…
JiangJiaWei1103 May 15, 2026
ab80886
refactor: Rename process session timeout and make it configurable
JiangJiaWei1103 May 15, 2026
1a55a2a
refactor: Remove all dead code
JiangJiaWei1103 May 15, 2026
70c7ee2
refactor: Remove HTTPLiveSessionResolver to simplify live-session det…
JiangJiaWei1103 May 17, 2026
824d1d5
fix: Add zero-value session status as a defensive guard
JiangJiaWei1103 May 17, 2026
8337ebb
refactor: Fix nits
JiangJiaWei1103 May 22, 2026
b6cf319
Merge branch 'my-master' into hs-beta-lazy-loading
JiangJiaWei1103 May 22, 2026
b76f5b9
refactor: Reuse sess key gen fn
JiangJiaWei1103 May 22, 2026
cac9a7b
docs: Clarify known limitation
JiangJiaWei1103 May 22, 2026
83 changes: 82 additions & 1 deletion historyserver/README.md
@@ -11,7 +11,7 @@ It provides a web interface to explore the history of Ray jobs, tasks, actors, a
The History Server consists of two main components:

1. **Collector**: Runs as a sidecar container in Ray clusters to collect logs and metadata
2. **History Server**: Central service that aggregates data from collectors and provides a web UI
2. **History Server**: Serves a Ray Dashboard-shaped HTTP API and ingests cluster sessions' events on demand

## Building

@@ -139,6 +139,87 @@ To run lint checks:
make alllint
```

## Smoke Tests

### 1. Deploy History Server

Apply MinIO and the History Server manifests:

```bash
kubectl apply -f historyserver/config/minio.yaml
kubectl apply -f historyserver/config/service_account.yaml
kubectl apply -f historyserver/config/historyserver.yaml
```

Port-forward the HS service:

```bash
kubectl port-forward svc/historyserver 8080:30080
```

### 2. Generate a Dead Session

Deploy the sample RayCluster, run a deterministic workload, then delete the CR:

```bash
kubectl apply -f historyserver/config/raycluster.yaml
kubectl wait pod -l ray.io/node-type=head --for=condition=Ready --timeout=180s

# Run a workload so events are written to MinIO
kubectl exec $(kubectl get pod -l ray.io/node-type=head -o name) \
-c ray-head -- python -c "
import ray
ray.init(address='auto')

@ray.remote
def add(x, y): return x + y

print('tasks:', ray.get([add.remote(i, i) for i in range(5)]))
"

# Delete the cluster — produces a 'dead' session
kubectl delete -f historyserver/config/raycluster.yaml

# Discover the session name. /clusters lists both live and dead sessions;
# dead sessions carry the `session_*` name you'll feed into /enter_cluster.
curl -sS http://localhost:8080/clusters
```

### 3. Cold Path (first visit)

Trigger the lazy load synchronously. Replace `<session>` with the session name from §2:

```bash
time curl -s -o /dev/null \
http://localhost:8080/enter_cluster/default/raycluster-historyserver/<session>
```

> [!NOTE]
> Cold path runs synchronously: K8s probe + event parse. Expect this to take seconds. Subsequent endpoint calls
> (`/api/v0/jobs`, `/api/v0/tasks/...`) read from in-memory state populated by this call.

### 4. Warm Path (subsequent visits on same replica)

Re-enter the same cluster:

```bash
time curl -s -o /dev/null \
http://localhost:8080/enter_cluster/default/raycluster-historyserver/<session>
```

> [!NOTE]
> Warm path returns immediately — Supervisor's loaded-session set fast-paths the request without re-parsing. If the HS
> process restarts, the next visit returns to cold-path latency.

### 5. Tail Logs

```bash
kubectl logs -f -l app=historyserver --tail=50
```

You should see one `current eventFileList for cluster ...` line (and the per-file `Reading event file: ...` entries)
per first-time cold-path call. Warm-path calls produce no parse log lines.

## Deployment

History Server can be deployed in Kubernetes using the manifests in the `config/samples/` directory.
143 changes: 98 additions & 45 deletions historyserver/cmd/historyserver/main.go
@@ -1,57 +1,79 @@
// Package main is the entrypoint for the History Server HTTP daemon.
// It exposes Ray Dashboard-shaped API endpoints over HTTP and drives
// per-session event processing on demand via a Supervisor when
// /enter_cluster hits a dead session.
package main

import (
"context"
"encoding/json"
"flag"
"os"
"os/signal"
"sync"
"syscall"

"github.com/sirupsen/logrus"

"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/client"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"

"github.com/ray-project/kuberay/historyserver/pkg/collector"
"github.com/ray-project/kuberay/historyserver/pkg/collector/types"
"github.com/ray-project/kuberay/historyserver/pkg/eventserver"
"github.com/ray-project/kuberay/historyserver/pkg/historyserver"
"github.com/sirupsen/logrus"
)

func main() {
runtimeClassName := ""
rayRootDir := ""
kubeconfigs := ""
runtimeClassConfigPath := "/var/collector-config/data"
dashboardDir := ""
useKubernetesProxy := false
flag.StringVar(&runtimeClassName, "runtime-class-name", "", "")
flag.StringVar(&rayRootDir, "ray-root-dir", "", "")
flag.StringVar(&kubeconfigs, "kubeconfigs", "", "")
flag.StringVar(&dashboardDir, "dashboard-dir", "/dashboard", "")
flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "") //"/var/collector-config/data"
flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false, "")
// ===== Flags =====
var (
runtimeClassName string
rayRootDir string
kubeconfigs string
dashboardDir string
runtimeClassConfigPath string
useKubernetesProxy bool
)
flag.StringVar(&runtimeClassName, "runtime-class-name", "", "Storage backend: s3 / gcs / azureblob / aliyunoss / localtest")
flag.StringVar(&rayRootDir, "ray-root-dir", "", "Root dir inside the bucket")
flag.StringVar(&kubeconfigs, "kubeconfigs", "", "Kubeconfig path; empty = in-cluster")
flag.StringVar(&dashboardDir, "dashboard-dir", "/dashboard", "Path to Ray Dashboard static assets")
flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "Path to backend config JSON")
flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false, "Use local kubeconfig instead of in-cluster config")
flag.Parse()

if runtimeClassName == "" {
logrus.Fatal("--runtime-class-name is required")
}

// ===== ClientManager =====
cliMgr, err := historyserver.NewClientManager(kubeconfigs, useKubernetesProxy)
if err != nil {
logrus.Errorf("Failed to create client manager: %v", err)
os.Exit(1)
logrus.Fatalf("client manager: %v", err)
}

// ===== Backend config =====
jsonData := make(map[string]interface{})
if runtimeClassConfigPath != "" {
data, err := os.ReadFile(runtimeClassConfigPath)
if err != nil {
panic("Failed to read runtime class config " + err.Error())
logrus.Fatalf("read runtime-class-config: %v", err)
}
err = json.Unmarshal(data, &jsonData)
if err != nil {
panic("Failed to parse runtime class config: " + err.Error())
if err := json.Unmarshal(data, &jsonData); err != nil {
logrus.Fatalf("parse runtime-class-config: %v", err)
}
}

// ===== Reader factory =====
registry := collector.GetReaderRegistry()
factory, ok := registry[runtimeClassName]
if !ok {
panic("Not supported runtime class name: " + runtimeClassName + ".")
logrus.Fatalf("unsupported runtime-class-name for reader: %s", runtimeClassName)
}

globalConfig := types.RayHistoryServerConfig{
@@ -60,50 +82,81 @@ func main() {

reader, err := factory(&globalConfig, jsonData)
if err != nil {
panic("Failed to create reader for runtime class name: " + runtimeClassName + ".")
logrus.Fatalf("create reader: %v", err)
}

// Create EventHandler with storage reader
// ===== EventHandler =====
eventHandler := eventserver.NewEventHandler(reader)

// WaitGroup to track goroutine completion
var wg sync.WaitGroup
// ===== Server context =====
serverCtx, serverCancel := signal.NotifyContext(
context.Background(),
syscall.SIGINT, syscall.SIGTERM,
)
defer serverCancel()

sigChan := make(chan os.Signal, 1)
stop := make(chan struct{}, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// ===== K8s client (for Pipeline.isDead) =====
k8sClient, err := buildK8sClient(kubeconfigs, useKubernetesProxy)
if err != nil {
logrus.Fatalf("build k8s client: %v", err)
}

// Start EventHandler in background goroutine
wg.Add(1)
// ===== Pipeline & Supervisor =====
pipeline := historyserver.NewPipeline(eventHandler, k8sClient)
supervisor := historyserver.NewSupervisor(pipeline, serverCtx)

// ===== Shutdown signaling =====
// Bridge serverCtx into the legacy stop channel that ServerHandler.Run
// consumes; the existing chan-based API is preserved.
var wg sync.WaitGroup
stop := make(chan struct{})
go func() {
defer wg.Done()
logrus.Info("Starting EventHandler in background...")
if err := eventHandler.Run(stop, 2); err != nil {
logrus.Errorf("EventHandler stopped with error: %v", err)
}
logrus.Info("EventHandler shutdown complete")
<-serverCtx.Done()
logrus.Info("Received shutdown signal, initiating graceful shutdown...")
close(stop)
}()
Comment on lines +87 to 90
Member

Can we add an issue to use serverCtx and replace/remove the “stop channel”?
Thank you!
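
One possible shape for that follow-up, as a sketch only: `runServer` below is a hypothetical stand-in for a context-aware `ServerHandler.Run`, so the bridging goroutine and the `stop` channel are no longer needed. The function names, the loopback listener, and the 5-second drain timeout are assumptions for illustration, not part of this PR. In `main.go` the context passed in would be the `serverCtx` returned by `signal.NotifyContext`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"time"
)

// runServer is a hypothetical replacement for a chan-based Run(stop):
// it serves until ctx is cancelled, then drains in-flight requests.
func runServer(ctx context.Context, ln net.Listener, h http.Handler) error {
	srv := &http.Server{Handler: h}

	errCh := make(chan error, 1)
	go func() { errCh <- srv.Serve(ln) }()

	select {
	case err := <-errCh:
		// Serve failed before any shutdown was requested.
		return err
	case <-ctx.Done():
	}

	// Use a fresh context: ctx is already cancelled at this point.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := srv.Shutdown(shutdownCtx); err != nil {
		return err
	}
	// Serve returns http.ErrServerClosed after a clean Shutdown.
	if err := <-errCh; !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}

// demoShutdown runs the server on a loopback port and lets a short
// timeout play the role of SIGINT/SIGTERM.
func demoShutdown() error {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return runServer(ctx, ln, http.NotFoundHandler())
}

func main() {
	fmt.Println("shutdown error:", demoShutdown())
}
```

With this shape the caller only needs `wg.Add(1)` plus one goroutine that calls the context-aware run function; cancellation of `serverCtx` is the single shutdown signal.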


handler, err := historyserver.NewServerHandler(&globalConfig, dashboardDir, reader, cliMgr, eventHandler, useKubernetesProxy)
// ===== ServerHandler =====
handler, err := historyserver.NewServerHandler(&globalConfig, dashboardDir, reader, cliMgr, eventHandler, supervisor, useKubernetesProxy)
if err != nil {
logrus.Errorf("Failed to create server handler: %v", err)
os.Exit(1)
logrus.Fatalf("create server handler: %v", err)
}

// ===== Run HTTP server =====
wg.Add(1)
go func() {
defer wg.Done()
handler.Run(stop)
logrus.Info("HTTP server shutdown complete")
}()

<-sigChan
logrus.Info("Received shutdown signal, initiating graceful shutdown...")

// Stop both the server and the event handler
close(stop)

// Wait for both goroutines to complete
// ===== Wait for graceful shutdown =====
wg.Wait()
logrus.Info("Graceful shutdown complete")
}

// buildK8sClient constructs a controller-runtime client.Client with the
// rayv1 scheme registered. Used by Pipeline.isDead.
func buildK8sClient(kubeconfigs string, useKubeProxy bool) (client.Client, error) {
var cfg *rest.Config
var err error

switch {
case kubeconfigs != "":
cfg, err = clientcmd.BuildConfigFromFlags("", kubeconfigs)
case useKubeProxy:
loading := clientcmd.NewDefaultClientConfigLoadingRules()
cfg, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loading, &clientcmd.ConfigOverrides{}).ClientConfig()
default:
cfg, err = rest.InClusterConfig()
}
if err != nil {
return nil, err
}
cfg.QPS = 50
cfg.Burst = 100

scheme := runtime.NewScheme()
utilruntime.Must(rayv1.AddToScheme(scheme))
return client.New(cfg, client.Options{Scheme: scheme})
}
77 changes: 77 additions & 0 deletions historyserver/pkg/eventserver/eventserver.go
@@ -1531,3 +1531,80 @@ func extractActorIDFromTaskID(taskIDHex string) string {

return actorPortion + jobPortion
}

// ProcessSingleSession reads all event files for a single session synchronously
// and populates the handler's in-memory maps.
//
// Per-file failure handling:
// - GetContent/ReadAll failures are likely transient storage errors:
// skip this file.
// - json.Unmarshal/storeEvent failures are treated as corrupt-file:
// don't count as a hard failure since retrying won't help.
Contributor

ProcessSingleSession can return nil as long as at least one event file is successfully read. Downstream treats a successful cold load as “ready”, but a nil error does not guarantee that the ingested data is 100% identical to what is in object storage.
Should we add a short note on ProcessSingleSession and in the README clarifying that success means “pass completed without hard failure”, not “all objects parsed”, with 1–2 concrete examples (partial storage read outage, malformed JSON file, log-events path failing while Ray events succeed)?

Member Author

Good catch, and that works for me. I think we can solve this in the follow-up.

Since this issue has been raised multiple times, I put a detailed explanation here. Thank you!
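
To make the "pass completed" versus "all objects parsed" distinction concrete, here is a minimal sketch of a per-pass summary. `LoadSummary`, `summarize`, and the convention that a nil byte slice stands in for a failed storage read are all hypothetical; the PR itself only tracks `rayEventsAttempted` and `rayEventsSucceeded`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// LoadSummary is a hypothetical report of one cold-load pass.
type LoadSummary struct {
	Attempted int // event files listed
	ReadOK    int // files whose bytes were fetched
	ParsedOK  int // files that also unmarshalled cleanly
}

// Complete reports whether every listed file was read and parsed.
func (s LoadSummary) Complete() bool {
	return s.ReadOK == s.Attempted && s.ParsedOK == s.Attempted
}

// HardFailure mirrors the rule in this PR: only "files were listed but
// none could be read" is surfaced as an error.
func (s LoadSummary) HardFailure() bool {
	return s.Attempted > 0 && s.ReadOK == 0
}

// summarize classifies each file. In this sketch a nil slice models a
// failed storage read and invalid JSON models a corrupt file.
func summarize(files [][]byte) LoadSummary {
	s := LoadSummary{Attempted: len(files)}
	for _, b := range files {
		if b == nil {
			continue // transient read failure: skip the file
		}
		s.ReadOK++
		var events []map[string]any
		if err := json.Unmarshal(b, &events); err != nil {
			continue // corrupt file: retrying will not help
		}
		s.ParsedOK++
	}
	return s
}

func main() {
	s := summarize([][]byte{
		[]byte(`[{"eventType":"TASK"}]`),
		nil,
		[]byte(`{not json`),
	})
	fmt.Printf("%+v complete=%v hardFailure=%v\n", s, s.Complete(), s.HardFailure())
}
```

In the example in `main`, one file is unreadable and one is malformed, yet `HardFailure` is false, which is exactly the case where a nil error does not mean the in-memory state matches object storage.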

//
// TODO(jwj): Empty event file list vs ListFiles outage is ambiguous without
// StorageReader interface surfacing errors.
func (h *EventHandler) ProcessSingleSession(clusterInfo utils.ClusterInfo) error {
clusterNameNamespace := clusterInfo.Name + "_" + clusterInfo.Namespace
clusterSessionKey := utils.BuildClusterSessionKey(clusterInfo.Name, clusterInfo.Namespace, clusterInfo.SessionName)

// Read Log Events from logs/{nodeId}/events/event_*.log
// This is the format used by Ray Dashboard's /events API.
logEventReader := NewLogEventReader(h.reader)
logEventErr := logEventReader.ReadLogEvents(clusterInfo, clusterSessionKey, h.ClusterLogEventMap)
if logEventErr != nil {
logrus.Errorf("Failed to read Log Events for %s: %v", clusterSessionKey, logEventErr)
}

ProcessSingleSession ignores context during log event reading

Low Severity

ProcessSingleSession calls ReadLogEvents before checking ctx.Err(), and ReadLogEvents does not accept a context parameter. If the 2-minute loadTimeout or server shutdown fires during log event reading, the cancellation won't be observed until ReadLogEvents completes and the ray events loop begins its first iteration. For sessions with many nodes and log files, this can delay graceful shutdown and exceed the intended per-load timeout contract.


Reviewed by Cursor Bugbot for commit 96a12be.

Member Author

Fine-grained ctx management will be resolved in the follow-up.


// Read RayEvents (Export Events) from node_events/ and job_events/.
// These are used for task/actor/job/node data APIs.
eventFileList := append(h.getAllJobEventFiles(clusterInfo), h.getAllNodeEventFiles(clusterInfo)...)
logrus.Infof("current eventFileList for cluster %s is: %v", clusterInfo.Name, eventFileList)

rayEventsAttempted := len(eventFileList)

P1: Retry sessions when listing event directories fails

When the storage backend has a transient directory-listing error, this path can still return nil with zero attempted RayEvent files, so Supervisor.runOnce marks the session as loaded with empty in-memory state and future /enter_cluster calls will never retry. This is distinct from the per-file failure handling: the S3/GCS/Azure ListFiles implementations log listing errors and return an empty/nil slice, and the new lazy path treats that the same as a legitimately empty session here.


Member Author

Confirmed. The root cause is StorageReader.ListFiles not surfacing errors. When a storage backend has a transient outage, it logs internally and returns nil, which is indistinguishable from a legitimately empty session.

Adding error returns to the StorageReader interface (including 5 backends + all callers) seems out of scope here. I'll open a follow-up issue + PR to fix it.
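
A rough sketch of what the follow-up could look like, under stated assumptions: `fileLister` is a hypothetical error-returning variant of `ListFiles` (the real `StorageReader` signature is not shown in this diff), and `classify`, `errListing`, and `fakeLister` exist only for illustration. The point is that a listing error and an empty listing take different branches, so an outage is never recorded as a loaded, empty session.

```go
package main

import (
	"errors"
	"fmt"
)

// errListing is a hypothetical sentinel for a failed directory listing.
var errListing = errors.New("list event files: storage unavailable")

// fileLister is a hypothetical error-returning variant of ListFiles.
type fileLister interface {
	ListFiles(clusterID, dir string) ([]string, error)
}

// classify decides what the caller should do with a session:
// "retry" on a listing error, "empty" when the listing succeeded but
// found nothing, and "load" when there are files to ingest.
func classify(l fileLister, clusterID, dir string) (string, error) {
	files, err := l.ListFiles(clusterID, dir)
	switch {
	case err != nil:
		return "retry", fmt.Errorf("session %s: %w", clusterID, err)
	case len(files) == 0:
		return "empty", nil
	default:
		return "load", nil
	}
}

// fakeLister is a test double returning canned results.
type fakeLister struct {
	files []string
	err   error
}

func (f fakeLister) ListFiles(_, _ string) ([]string, error) {
	return f.files, f.err
}

func main() {
	for _, l := range []fakeLister{
		{err: errListing},
		{},
		{files: []string{"job_events/events_0.json"}},
	} {
		action, err := classify(l, "raycluster_default", "job_events")
		fmt.Println(action, err)
	}
}
```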

rayEventsSucceeded := 0
for _, eventFile := range eventFileList {
logrus.Infof("Reading event file: %s", eventFile)

eventioReader := h.reader.GetContent(clusterNameNamespace, eventFile)
if eventioReader == nil {
logrus.Errorf("Failed to get content for event file: %s, skipping", eventFile)
continue
}
eventbytes, err := io.ReadAll(eventioReader)
if err != nil {
logrus.Errorf("Failed to read event file: %v", err)
continue
}
rayEventsSucceeded++

var eventList []map[string]any
if err := json.Unmarshal(eventbytes, &eventList); err != nil {
logrus.Errorf("Failed to unmarshal event: %v", err)
continue
}

for _, event := range eventList {
Collaborator

Should we also check ctx.Err() inside the inner loop? A single event file can contain many events, and right now SIGTERM cancellation won't be observed until the file finishes

Member Author

Good catch.

Since GetContent / io.ReadAll / Unmarshal are also not context-aware, and in the interest of keeping things simple per Andrew's comments (#4795 (comment)), can we land all four changes (including the per-event check) together as one coherent follow-up?

On the LoadSession timeout: the 2-minute bound does limit resource leaks. That said, I think the worst case is the configured timeout + the duration of any in-progress storage I/O, unmarshal, or large-file processing.
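
For reference, the per-file and per-event checks discussed in this thread could look roughly like the sketch below. `processFiles` is a hypothetical, simplified stand-in for the RayEvents loop: files are modeled as already-decoded slices of event names, and `store` replaces `storeEvent`. It does not make storage reads or unmarshalling cancellable, which is the part that would still need context-aware I/O.

```go
package main

import (
	"context"
	"fmt"
)

// processFiles checks ctx before each file and before each event, so a
// cancellation is observed at the next event boundary instead of at
// the end of the current file. It returns how many events were stored.
func processFiles(ctx context.Context, files [][]string, store func(string)) (int, error) {
	stored := 0
	for _, events := range files {
		if err := ctx.Err(); err != nil {
			return stored, err
		}
		for _, ev := range events {
			if err := ctx.Err(); err != nil {
				return stored, err
			}
			store(ev)
			stored++
		}
	}
	return stored, nil
}

// demoCancelMidFile cancels the context while the second event of a
// single four-event file is being stored.
func demoCancelMidFile() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	seen := 0
	return processFiles(ctx, [][]string{{"e1", "e2", "e3", "e4"}}, func(string) {
		seen++
		if seen == 2 {
			cancel()
		}
	})
}

func main() {
	n, err := demoCancelMidFile()
	fmt.Println("stored:", n, "err:", err)
}
```

Here the loop stops after two of the four events rather than finishing the file, which bounds the overshoot past the timeout to one event's worth of work in this simplified model.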

if event == nil {
continue
}
event["clusterName"] = clusterSessionKey
Contributor

maybe use clusterNameSessionKey instead of just clusterName?

Member Author

Thanks. This works for me.

Could we change it in a single follow-up rename PR? There are still lots of naming conflicts, and that keeps this PR clean at this stage.

if err := h.storeEvent(event); err != nil {
logrus.Errorf("Failed to store event: %v", err)
continue
}
}
}

// If every attempted file failed, treat as transient outage and surface
// so the session is not marked loaded.
if rayEventsAttempted > 0 && rayEventsSucceeded == 0 {
return fmt.Errorf("ingested 0 of %d RayEvent files for %s: likely transient storage outage",
rayEventsAttempted, clusterSessionKey)
}

if logEventErr != nil {
return fmt.Errorf("read log events for %s: %w", clusterSessionKey, logEventErr)
}

return nil
}