-
Notifications
You must be signed in to change notification settings - Fork 763
[1/N] [History Server Beta] [Feat] Lazy loading #4795
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 38 commits
44474e6
c5904c7
1c1f383
3ff2245
3540f14
69e5ab7
c617b08
fad772e
aab5dad
1641ab6
1bff9d1
6294d0e
ea41c98
b2ad15e
3c54fb4
fec4ba5
93626b0
536115b
caea59d
949dea0
6d44bae
ca30acb
68c8ad5
23caa56
557aa3d
49105e1
cb031d7
3ac6576
d42a2be
0e67b0b
8b14840
8a3914c
ae6ba95
fd7e413
96a12be
8caa9a1
0b90fc9
9d0098f
ab80886
1a55a2a
70c7ee2
824d1d5
8337ebb
b6cf319
b76f5b9
cac9a7b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,57 +1,61 @@ | ||
| package main | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "flag" | ||
| "os" | ||
| "os/signal" | ||
| "sync" | ||
| "syscall" | ||
|
|
||
| "github.com/sirupsen/logrus" | ||
|
|
||
| "github.com/ray-project/kuberay/historyserver/pkg/collector" | ||
| "github.com/ray-project/kuberay/historyserver/pkg/collector/types" | ||
| "github.com/ray-project/kuberay/historyserver/pkg/eventserver" | ||
| "github.com/ray-project/kuberay/historyserver/pkg/historyserver" | ||
| "github.com/sirupsen/logrus" | ||
| ) | ||
|
|
||
| func main() { | ||
| runtimeClassName := "" | ||
| rayRootDir := "" | ||
| kubeconfigs := "" | ||
| runtimeClassConfigPath := "/var/collector-config/data" | ||
| runtimeClassConfigPath := "" | ||
| dashboardDir := "" | ||
| useKubernetesProxy := false | ||
| flag.StringVar(&runtimeClassName, "runtime-class-name", "", "") | ||
| flag.StringVar(&rayRootDir, "ray-root-dir", "", "") | ||
| flag.StringVar(&kubeconfigs, "kubeconfigs", "", "") | ||
| flag.StringVar(&dashboardDir, "dashboard-dir", "/dashboard", "") | ||
| flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "") //"/var/collector-config/data" | ||
| flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false, "") | ||
| flag.StringVar(&runtimeClassName, "runtime-class-name", "", "Storage backend: s3 / gcs / azureblob / aliyunoss / localtest") | ||
| flag.StringVar(&rayRootDir, "ray-root-dir", "", "Root dir inside the bucket") | ||
| flag.StringVar(&kubeconfigs, "kubeconfigs", "", "Kubeconfig path; empty = in-cluster") | ||
| flag.StringVar(&dashboardDir, "dashboard-dir", "/dashboard", "Path to Ray Dashboard static assets") | ||
| flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "Path to backend config JSON") | ||
|
JiangJiaWei1103 marked this conversation as resolved.
|
||
| flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false, "Use local kubeconfig instead of in-cluster config") | ||
| flag.Parse() | ||
|
|
||
| if runtimeClassName == "" { | ||
| logrus.Fatal("--runtime-class-name is required") | ||
| } | ||
|
|
||
| cliMgr, err := historyserver.NewClientManager(kubeconfigs, useKubernetesProxy) | ||
| if err != nil { | ||
| logrus.Errorf("Failed to create client manager: %v", err) | ||
| os.Exit(1) | ||
| logrus.Fatalf("Failed to create client manager: %v", err) | ||
| } | ||
|
|
||
| jsonData := make(map[string]interface{}) | ||
| if runtimeClassConfigPath != "" { | ||
| data, err := os.ReadFile(runtimeClassConfigPath) | ||
| if err != nil { | ||
| panic("Failed to read runtime class config " + err.Error()) | ||
| logrus.Fatalf("Failed to read runtime-class-config from %s: %v", runtimeClassConfigPath, err) | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
| } | ||
| err = json.Unmarshal(data, &jsonData) | ||
| if err != nil { | ||
| panic("Failed to parse runtime class config: " + err.Error()) | ||
| if err := json.Unmarshal(data, &jsonData); err != nil { | ||
| logrus.Fatalf("Failed to parse runtime-class-config from %s: %v", runtimeClassConfigPath, err) | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
| } | ||
| } | ||
|
|
||
| registry := collector.GetReaderRegistry() | ||
| factory, ok := registry[runtimeClassName] | ||
| if !ok { | ||
| panic("Not supported runtime class name: " + runtimeClassName + ".") | ||
| logrus.Fatalf("Unsupported runtime-class-name for reader: %s", runtimeClassName) | ||
| } | ||
|
|
||
| globalConfig := types.RayHistoryServerConfig{ | ||
|
|
@@ -60,34 +64,36 @@ func main() { | |
|
|
||
| reader, err := factory(&globalConfig, jsonData) | ||
| if err != nil { | ||
| panic("Failed to create reader for runtime class name: " + runtimeClassName + ".") | ||
| logrus.Fatalf("Failed to create reader for runtime class name %s: %v", runtimeClassName, err) | ||
| } | ||
|
|
||
| // Create EventHandler with storage reader | ||
| eventHandler := eventserver.NewEventHandler(reader) | ||
|
|
||
| // WaitGroup to track goroutine completion | ||
| var wg sync.WaitGroup | ||
| serverCtx, serverCancel := signal.NotifyContext( | ||
| context.Background(), | ||
| syscall.SIGINT, syscall.SIGTERM, | ||
| ) | ||
| defer serverCancel() | ||
|
|
||
| sigChan := make(chan os.Signal, 1) | ||
| stop := make(chan struct{}, 1) | ||
| signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) | ||
| resolver, err := historyserver.NewHTTPLiveSessionResolver(cliMgr, useKubernetesProxy) | ||
| if err != nil { | ||
| logrus.Fatalf("Failed to create live session resolver: %v", err) | ||
| } | ||
| processor := historyserver.NewSessionProcessor(eventHandler, cliMgr.Client(), resolver) | ||
| sessionLoader := historyserver.NewSessionLoader(processor, serverCtx) | ||
|
|
||
| // Start EventHandler in background goroutine | ||
| wg.Add(1) | ||
| // ServerHandler.Run consumes a stop chan; bridge serverCtx into it. | ||
| var wg sync.WaitGroup | ||
| stop := make(chan struct{}) | ||
| go func() { | ||
| defer wg.Done() | ||
| logrus.Info("Starting EventHandler in background...") | ||
| if err := eventHandler.Run(stop, 2); err != nil { | ||
| logrus.Errorf("EventHandler stopped with error: %v", err) | ||
| } | ||
| logrus.Info("EventHandler shutdown complete") | ||
| <-serverCtx.Done() | ||
| logrus.Info("Received shutdown signal, initiating graceful shutdown...") | ||
| close(stop) | ||
| }() | ||
|
Comment on lines
+87
to
90
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add an issue to use serverCtx and replace/remove the “stop channel”? |
||
|
|
||
| handler, err := historyserver.NewServerHandler(&globalConfig, dashboardDir, reader, cliMgr, eventHandler, useKubernetesProxy) | ||
| handler, err := historyserver.NewServerHandler(&globalConfig, dashboardDir, reader, cliMgr, eventHandler, sessionLoader, useKubernetesProxy) | ||
| if err != nil { | ||
| logrus.Errorf("Failed to create server handler: %v", err) | ||
| os.Exit(1) | ||
| logrus.Fatalf("Failed to create server handler: %v", err) | ||
| } | ||
|
|
||
| wg.Add(1) | ||
|
|
@@ -97,13 +103,6 @@ func main() { | |
| logrus.Info("HTTP server shutdown complete") | ||
| }() | ||
|
|
||
| <-sigChan | ||
| logrus.Info("Received shutdown signal, initiating graceful shutdown...") | ||
|
|
||
| // Stop both the server and the event handler | ||
| close(stop) | ||
|
|
||
| // Wait for both goroutines to complete | ||
| wg.Wait() | ||
| logrus.Info("Graceful shutdown complete") | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -76,7 +76,12 @@ func (h *EventHandler) ProcessEvents(ctx context.Context, ch <-chan map[string]a | |
| logrus.Warnf("Channel was closed") | ||
| return nil | ||
| } | ||
| if err := h.storeEvent(currEventData); err != nil { | ||
| clusterSessionKey, ok := currEventData["clusterName"].(string) | ||
| if !ok { | ||
| logrus.Errorf("event missing or non-string 'clusterName' field") | ||
| continue | ||
| } | ||
| if err := h.storeEvent(clusterSessionKey, currEventData); err != nil { | ||
| logrus.Errorf("Failed to store event: %v", err) | ||
| continue | ||
| } | ||
|
|
@@ -208,8 +213,8 @@ func (h *EventHandler) Run(stop chan struct{}, numOfEventProcessors int) error { | |
| return nil | ||
| } | ||
|
|
||
| // storeEvent unmarshals the event map into the correct actor/task struct and then stores it into the corresonding list | ||
| func (h *EventHandler) storeEvent(eventMap map[string]any) error { | ||
| // storeEvent unmarshals the event map into the correct actor/task struct and then stores it into the corresonding list. | ||
| func (h *EventHandler) storeEvent(clusterSessionKey string, eventMap map[string]any) error { | ||
| eventTypeVal, ok := eventMap["eventType"] | ||
| if !ok { | ||
| return fmt.Errorf("event missing 'eventType' field") | ||
|
|
@@ -220,17 +225,6 @@ func (h *EventHandler) storeEvent(eventMap map[string]any) error { | |
| } | ||
| eventType := types.EventType(eventTypeStr) | ||
|
|
||
| // clusterSessionKey is injected during event reading (Run function) and contains | ||
| // the full key: "{clusterName}_{namespace}_{sessionName}" | ||
| clusterSessionKeyVal, ok := eventMap["clusterName"] | ||
| if !ok { | ||
| return fmt.Errorf("event missing 'clusterName' field (clusterSessionKey)") | ||
| } | ||
| clusterSessionKey, ok := clusterSessionKeyVal.(string) | ||
| if !ok { | ||
| return fmt.Errorf("clusterName is not a string, got %T", clusterSessionKeyVal) | ||
| } | ||
|
|
||
| logrus.Infof("current eventType: %v", eventType) | ||
| switch eventType { | ||
| case types.TASK_DEFINITION_EVENT: | ||
|
|
@@ -667,7 +661,7 @@ func (h *EventHandler) GetTasks(clusterSessionKey string) []types.Task { | |
|
|
||
| taskMap, ok := h.ClusterTaskMap.ClusterTaskMap[clusterSessionKey] | ||
| if !ok { | ||
| // TODO(jwj): Add error handling. | ||
| // TODO(jiangjiawei1103): Add error handling. | ||
| logrus.Errorf("Task map not found for cluster session: %s", clusterSessionKey) | ||
| return []types.Task{} | ||
| } | ||
|
|
@@ -938,7 +932,7 @@ func (h *EventHandler) handleTaskLifecycleEvent(eventMap map[string]any, cluster | |
| } | ||
| normalizeTaskIDsToHex(&currTask) | ||
|
|
||
| // TODO(jwj): Clarify if there must be at least one state transition. Can one task have more than one state transition? | ||
| // TODO(jiangjiawei1103): Clarify if there must be at least one state transition. Can one task have more than one state transition? | ||
| if len(currTask.StateTransitions) == 0 { | ||
| return fmt.Errorf("TASK_LIFECYCLE_EVENT must have at least one state transition") | ||
| } | ||
|
|
@@ -974,7 +968,7 @@ func (h *EventHandler) handleTaskLifecycleEvent(eventMap map[string]any, cluster | |
| return | ||
| } | ||
|
|
||
| // TODO(jwj): Before beta, the lifecycle-related fields are overwritten. | ||
| // TODO(jiangjiawei1103): Before beta, the lifecycle-related fields are overwritten. | ||
| // In beta, the complete historical replay will be supported. | ||
| task.RayErrorInfo = currTask.RayErrorInfo | ||
| if currTask.JobID != "" { | ||
|
|
@@ -1531,3 +1525,71 @@ func extractActorIDFromTaskID(taskIDHex string) string { | |
|
|
||
| return actorPortion + jobPortion | ||
| } | ||
|
|
||
| // ProcessSingleSession reads all event files for a single session synchronously | ||
| // and populates the handler's in-memory maps. | ||
| // | ||
| // TODO(jiangjiawei1103): Empty event file list vs ListFiles outage is ambiguous without | ||
| // StorageReader interface surfacing errors. | ||
| func (h *EventHandler) ProcessSingleSession(ctx context.Context, clusterInfo utils.ClusterInfo) error { | ||
| clusterNameNamespace := clusterInfo.Name + "_" + clusterInfo.Namespace | ||
| clusterSessionKey := utils.BuildClusterSessionKey(clusterInfo.Name, clusterInfo.Namespace, clusterInfo.SessionName) | ||
|
|
||
| // ClusterLogEventMap backs only the /events endpoint, so log event read failures must not | ||
| // block marking the session as loaded and force subsequent Ray event re-processing. | ||
| logEventReader := NewLogEventReader(h.reader) | ||
| if err := logEventReader.ReadLogEvents(clusterInfo, clusterSessionKey, h.ClusterLogEventMap); err != nil { | ||
| logrus.Errorf("Incomplete Log Events read for %s: %v. /events endpoint may serve partial data.", | ||
| clusterSessionKey, err) | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ProcessSingleSession ignores context during log event readingLow Severity
Additional Locations (1)Reviewed by Cursor Bugbot for commit 96a12be. Configure here.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fine-grained ctx management will be resolved in the follow-up. |
||
|
|
||
| eventFileList := append(h.getAllJobEventFiles(clusterInfo), h.getAllNodeEventFiles(clusterInfo)...) | ||
| logrus.Debugf("current eventFileList for cluster %s is: %v", clusterInfo.Name, eventFileList) | ||
|
|
||
| rayEventsTotal := len(eventFileList) | ||
| rayEventsRead := 0 | ||
| for _, eventFile := range eventFileList { | ||
| if err := ctx.Err(); err != nil { | ||
| return err | ||
| } | ||
| logrus.Debugf("Reading event file: %s", eventFile) | ||
|
|
||
| // GetContent and io.ReadAll failures are treated as transient storage errors: | ||
| // skip this file and continue. | ||
| eventioReader := h.reader.GetContent(clusterNameNamespace, eventFile) | ||
| if eventioReader == nil { | ||
| logrus.Errorf("Failed to get content for event file: %s, skipping", eventFile) | ||
| continue | ||
| } | ||
| eventbytes, err := io.ReadAll(eventioReader) | ||
| if err != nil { | ||
| logrus.Errorf("Failed to read events for file %s: %v", eventFile, err) | ||
| continue | ||
| } | ||
| rayEventsRead++ | ||
|
|
||
| // json.Unmarshal and storeEvent failures are treated as corrupt-data errors: | ||
| // retrying won't fix bad bytes, accepting partial loss. | ||
| var eventList []map[string]any | ||
| if err := json.Unmarshal(eventbytes, &eventList); err != nil { | ||
| logrus.Errorf("Failed to unmarshal events for file %s: %v", eventFile, err) | ||
| continue | ||
| } | ||
|
|
||
| for _, event := range eventList { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we also check ctx.Err() inside the inner loop? A single event file can contain many events, and right now
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch. Since On the |
||
| if event == nil { | ||
| continue | ||
| } | ||
| if err := h.storeEvent(clusterSessionKey, event); err != nil { | ||
| logrus.Errorf("Failed to store events for file %s: %v", eventFile, err) | ||
| continue | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if rayEventsTotal > 0 && rayEventsRead == 0 { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment here, should this just check rayEventsTotal != rayEventsRead?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question. For log events, For ray events, errors propagate to The proper fix for ray events might be to surface "loaded but incomplete" as a richer marker instead of a bool. I think it is a non-trivial change because it touches Would it make sense to defer richer error handling to follow-up? Thanks! |
||
| return fmt.Errorf("read 0 of %d event files for %s: likely transient storage outage", | ||
| rayEventsTotal, clusterSessionKey) | ||
| } | ||
| return nil | ||
|
JiangJiaWei1103 marked this conversation as resolved.
|
||
| } | ||
|
cursor[bot] marked this conversation as resolved.
|
||


Uh oh!
There was an error while loading. Please reload this page.