-
Notifications
You must be signed in to change notification settings - Fork 763
[1/N] [History Server Beta] [Feat] Lazy loading #4795
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
44474e6
c5904c7
1c1f383
3ff2245
3540f14
69e5ab7
c617b08
fad772e
aab5dad
1641ab6
1bff9d1
6294d0e
ea41c98
b2ad15e
3c54fb4
fec4ba5
93626b0
536115b
caea59d
949dea0
6d44bae
ca30acb
68c8ad5
23caa56
557aa3d
49105e1
cb031d7
3ac6576
d42a2be
0e67b0b
8b14840
8a3914c
ae6ba95
fd7e413
96a12be
8caa9a1
0b90fc9
9d0098f
ab80886
1a55a2a
70c7ee2
824d1d5
8337ebb
b6cf319
b76f5b9
cac9a7b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,57 +1,79 @@ | ||
| // Package main is the entrypoint for the History Server HTTP daemon. | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
| // It exposes Ray Dashboard-shaped API endpoints over HTTP and drives | ||
| // per-session event processing on demand via a Supervisor when | ||
| // /enter_cluster hits a dead session. | ||
| package main | ||
|
|
||
|
Future-Outlier marked this conversation as resolved.
Outdated
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "flag" | ||
| "os" | ||
| "os/signal" | ||
| "sync" | ||
| "syscall" | ||
|
|
||
| "github.com/sirupsen/logrus" | ||
|
|
||
| "k8s.io/apimachinery/pkg/runtime" | ||
| utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||
| "k8s.io/client-go/rest" | ||
| "k8s.io/client-go/tools/clientcmd" | ||
| "sigs.k8s.io/controller-runtime/pkg/client" | ||
|
|
||
| rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" | ||
|
|
||
| "github.com/ray-project/kuberay/historyserver/pkg/collector" | ||
| "github.com/ray-project/kuberay/historyserver/pkg/collector/types" | ||
| "github.com/ray-project/kuberay/historyserver/pkg/eventserver" | ||
| "github.com/ray-project/kuberay/historyserver/pkg/historyserver" | ||
| "github.com/sirupsen/logrus" | ||
| ) | ||
|
|
||
| func main() { | ||
| runtimeClassName := "" | ||
| rayRootDir := "" | ||
| kubeconfigs := "" | ||
| runtimeClassConfigPath := "/var/collector-config/data" | ||
| dashboardDir := "" | ||
| useKubernetesProxy := false | ||
| flag.StringVar(&runtimeClassName, "runtime-class-name", "", "") | ||
| flag.StringVar(&rayRootDir, "ray-root-dir", "", "") | ||
| flag.StringVar(&kubeconfigs, "kubeconfigs", "", "") | ||
| flag.StringVar(&dashboardDir, "dashboard-dir", "/dashboard", "") | ||
| flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "") //"/var/collector-config/data" | ||
| flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false, "") | ||
| // ===== Flags ===== | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
| var ( | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
| runtimeClassName string | ||
| rayRootDir string | ||
| kubeconfigs string | ||
| dashboardDir string | ||
| runtimeClassConfigPath string | ||
| useKubernetesProxy bool | ||
| ) | ||
| flag.StringVar(&runtimeClassName, "runtime-class-name", "", "Storage backend: s3 / gcs / azureblob / aliyunoss / localtest") | ||
| flag.StringVar(&rayRootDir, "ray-root-dir", "", "Root dir inside the bucket") | ||
| flag.StringVar(&kubeconfigs, "kubeconfigs", "", "Kubeconfig path; empty = in-cluster") | ||
| flag.StringVar(&dashboardDir, "dashboard-dir", "/dashboard", "Path to Ray Dashboard static assets") | ||
| flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "Path to backend config JSON") | ||
|
JiangJiaWei1103 marked this conversation as resolved.
|
||
| flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false, "Use local kubeconfig instead of in-cluster config") | ||
| flag.Parse() | ||
|
|
||
| if runtimeClassName == "" { | ||
| logrus.Fatal("--runtime-class-name is required") | ||
| } | ||
|
|
||
| // ===== ClientManager ===== | ||
| cliMgr, err := historyserver.NewClientManager(kubeconfigs, useKubernetesProxy) | ||
| if err != nil { | ||
| logrus.Errorf("Failed to create client manager: %v", err) | ||
| os.Exit(1) | ||
| logrus.Fatalf("client manager: %v", err) | ||
| } | ||
|
|
||
| // ===== Backend config ===== | ||
| jsonData := make(map[string]interface{}) | ||
| if runtimeClassConfigPath != "" { | ||
| data, err := os.ReadFile(runtimeClassConfigPath) | ||
| if err != nil { | ||
| panic("Failed to read runtime class config " + err.Error()) | ||
| logrus.Fatalf("read runtime-class-config: %v", err) | ||
| } | ||
| err = json.Unmarshal(data, &jsonData) | ||
| if err != nil { | ||
| panic("Failed to parse runtime class config: " + err.Error()) | ||
| if err := json.Unmarshal(data, &jsonData); err != nil { | ||
| logrus.Fatalf("parse runtime-class-config: %v", err) | ||
| } | ||
| } | ||
|
|
||
| // ===== Reader factory ===== | ||
| registry := collector.GetReaderRegistry() | ||
| factory, ok := registry[runtimeClassName] | ||
| if !ok { | ||
| panic("Not supported runtime class name: " + runtimeClassName + ".") | ||
| logrus.Fatalf("unsupported runtime-class-name for reader: %s", runtimeClassName) | ||
| } | ||
|
|
||
| globalConfig := types.RayHistoryServerConfig{ | ||
|
|
@@ -60,50 +82,81 @@ func main() { | |
|
|
||
| reader, err := factory(&globalConfig, jsonData) | ||
| if err != nil { | ||
| panic("Failed to create reader for runtime class name: " + runtimeClassName + ".") | ||
| logrus.Fatalf("create reader: %v", err) | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| // Create EventHandler with storage reader | ||
| // ===== EventHandler ===== | ||
| eventHandler := eventserver.NewEventHandler(reader) | ||
|
|
||
| // WaitGroup to track goroutine completion | ||
| var wg sync.WaitGroup | ||
| // ===== Server context ===== | ||
| serverCtx, serverCancel := signal.NotifyContext( | ||
| context.Background(), | ||
| syscall.SIGINT, syscall.SIGTERM, | ||
| ) | ||
| defer serverCancel() | ||
|
|
||
| sigChan := make(chan os.Signal, 1) | ||
| stop := make(chan struct{}, 1) | ||
| signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) | ||
| // ===== K8s client (for Pipeline.isDead) ===== | ||
| k8sClient, err := buildK8sClient(kubeconfigs, useKubernetesProxy) | ||
| if err != nil { | ||
| logrus.Fatalf("build k8s client: %v", err) | ||
| } | ||
|
|
||
| // Start EventHandler in background goroutine | ||
| wg.Add(1) | ||
| // ===== Pipeline & Supervisor ===== | ||
| pipeline := historyserver.NewPipeline(eventHandler, k8sClient) | ||
| supervisor := historyserver.NewSupervisor(pipeline, serverCtx) | ||
|
|
||
| // ===== Shutdown signaling ===== | ||
| // Bridge serverCtx into the legacy stop channel that ServerHandler.Run | ||
| // consumes; the existing chan-based API is preserved. | ||
| var wg sync.WaitGroup | ||
| stop := make(chan struct{}) | ||
| go func() { | ||
| defer wg.Done() | ||
| logrus.Info("Starting EventHandler in background...") | ||
| if err := eventHandler.Run(stop, 2); err != nil { | ||
| logrus.Errorf("EventHandler stopped with error: %v", err) | ||
| } | ||
| logrus.Info("EventHandler shutdown complete") | ||
| <-serverCtx.Done() | ||
| logrus.Info("Received shutdown signal, initiating graceful shutdown...") | ||
| close(stop) | ||
| }() | ||
|
Comment on lines
+87
to
90
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add an issue to use serverCtx and replace/remove the “stop channel”? |
||
|
|
||
| handler, err := historyserver.NewServerHandler(&globalConfig, dashboardDir, reader, cliMgr, eventHandler, useKubernetesProxy) | ||
| // ===== ServerHandler ===== | ||
| handler, err := historyserver.NewServerHandler(&globalConfig, dashboardDir, reader, cliMgr, eventHandler, supervisor, useKubernetesProxy) | ||
| if err != nil { | ||
| logrus.Errorf("Failed to create server handler: %v", err) | ||
| os.Exit(1) | ||
| logrus.Fatalf("create server handler: %v", err) | ||
| } | ||
|
|
||
| // ===== Run HTTP server ===== | ||
| wg.Add(1) | ||
| go func() { | ||
| defer wg.Done() | ||
| handler.Run(stop) | ||
| logrus.Info("HTTP server shutdown complete") | ||
| }() | ||
|
|
||
| <-sigChan | ||
| logrus.Info("Received shutdown signal, initiating graceful shutdown...") | ||
|
|
||
| // Stop both the server and the event handler | ||
| close(stop) | ||
|
|
||
| // Wait for both goroutines to complete | ||
| // ===== Wait for graceful shutdown ===== | ||
| wg.Wait() | ||
| logrus.Info("Graceful shutdown complete") | ||
| } | ||
|
|
||
| // buildK8sClient constructs a controller-runtime client.Client with the | ||
| // rayv1 scheme registered. Used by Pipeline.isDead. | ||
| func buildK8sClient(kubeconfigs string, useKubeProxy bool) (client.Client, error) { | ||
| var cfg *rest.Config | ||
| var err error | ||
|
|
||
| switch { | ||
| case kubeconfigs != "": | ||
| cfg, err = clientcmd.BuildConfigFromFlags("", kubeconfigs) | ||
| case useKubeProxy: | ||
| loading := clientcmd.NewDefaultClientConfigLoadingRules() | ||
| cfg, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loading, &clientcmd.ConfigOverrides{}).ClientConfig() | ||
| default: | ||
| cfg, err = rest.InClusterConfig() | ||
| } | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| cfg.QPS = 50 | ||
| cfg.Burst = 100 | ||
|
|
||
| scheme := runtime.NewScheme() | ||
| utilruntime.Must(rayv1.AddToScheme(scheme)) | ||
| return client.New(cfg, client.Options{Scheme: scheme}) | ||
| } | ||
|
cursor[bot] marked this conversation as resolved.
Outdated
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1531,3 +1531,80 @@ func extractActorIDFromTaskID(taskIDHex string) string { | |
|
|
||
| return actorPortion + jobPortion | ||
| } | ||
|
|
||
| // ProcessSingleSession reads all event files for a single session synchronously | ||
| // and populates the handler's in-memory maps. | ||
| // | ||
| // Per-file failure handling: | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
| // - GetContent/ReadAll failures are likely transient storage errors: | ||
| // skip this file. | ||
| // - json.Unmarshal/storeEvent failures are treated as corrupt-file: | ||
| // don't count as a hard failure since retrying won't help. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch, and that works for me. I think we can solve this in the follow-up. Since this issue has been raised multiple times, I put a detailed explanation here. Thank you! |
||
| // | ||
| // TODO(jwj): Empty event file list vs ListFiles outage is ambiguous without | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
| // StorageReader interface surfacing errors. | ||
| func (h *EventHandler) ProcessSingleSession(clusterInfo utils.ClusterInfo) error { | ||
| clusterNameNamespace := clusterInfo.Name + "_" + clusterInfo.Namespace | ||
| clusterSessionKey := utils.BuildClusterSessionKey(clusterInfo.Name, clusterInfo.Namespace, clusterInfo.SessionName) | ||
|
|
||
| // Read Log Events from logs/{nodeId}/events/event_*.log | ||
| // This is the format used by Ray Dashboard's /events API. | ||
| logEventReader := NewLogEventReader(h.reader) | ||
| logEventErr := logEventReader.ReadLogEvents(clusterInfo, clusterSessionKey, h.ClusterLogEventMap) | ||
| if logEventErr != nil { | ||
| logrus.Errorf("Failed to read Log Events for %s: %v", clusterSessionKey, logEventErr) | ||
|
andrewsykim marked this conversation as resolved.
Outdated
|
||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ProcessSingleSession ignores context during log event readingLow Severity
Additional Locations (1)Reviewed by Cursor Bugbot for commit 96a12be. Configure here.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fine-grained ctx management will be resolved in the follow-up. |
||
|
|
||
| // Read RayEvents (Export Events) from node_events/ and job_events/. | ||
| // These are used for task/actor/job/node data APIs. | ||
| eventFileList := append(h.getAllJobEventFiles(clusterInfo), h.getAllNodeEventFiles(clusterInfo)...) | ||
| logrus.Infof("current eventFileList for cluster %s is: %v", clusterInfo.Name, eventFileList) | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
|
|
||
| rayEventsAttempted := len(eventFileList) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When the storage backend has a transient directory-listing error, this path can still return Useful? React with 👍 / 👎.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Confirmed. The root cause is Adding error returns to the |
||
| rayEventsSucceeded := 0 | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
| for _, eventFile := range eventFileList { | ||
| logrus.Infof("Reading event file: %s", eventFile) | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
cursor[bot] marked this conversation as resolved.
Outdated
|
||
|
|
||
| eventioReader := h.reader.GetContent(clusterNameNamespace, eventFile) | ||
| if eventioReader == nil { | ||
| logrus.Errorf("Failed to get content for event file: %s, skipping", eventFile) | ||
| continue | ||
| } | ||
| eventbytes, err := io.ReadAll(eventioReader) | ||
| if err != nil { | ||
| logrus.Errorf("Failed to read event file: %v", err) | ||
| continue | ||
| } | ||
| rayEventsSucceeded++ | ||
|
cursor[bot] marked this conversation as resolved.
Outdated
|
||
|
|
||
| var eventList []map[string]any | ||
| if err := json.Unmarshal(eventbytes, &eventList); err != nil { | ||
| logrus.Errorf("Failed to unmarshal event: %v", err) | ||
| continue | ||
| } | ||
|
|
||
| for _, event := range eventList { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we also check ctx.Err() inside the inner loop? A single event file can contain many events, and right now
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch. Since On the |
||
| if event == nil { | ||
| continue | ||
| } | ||
| event["clusterName"] = clusterSessionKey | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe use
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks. This works for me. Could we modify it in a rename follow-up PR at once? There are still lots of naming conflicts and we can keep this PR clean at this stage.
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
| if err := h.storeEvent(event); err != nil { | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
| logrus.Errorf("Failed to store event: %v", err) | ||
| continue | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // If every attempted file failed, treat as transient outage and surface | ||
| // so the session is not marked loaded. | ||
| if rayEventsAttempted > 0 && rayEventsSucceeded == 0 { | ||
| return fmt.Errorf("ingested 0 of %d RayEvent files for %s: likely transient storage outage", | ||
| rayEventsAttempted, clusterSessionKey) | ||
| } | ||
|
|
||
| if logEventErr != nil { | ||
| return fmt.Errorf("read log events for %s: %w", clusterSessionKey, logEventErr) | ||
| } | ||
|
cursor[bot] marked this conversation as resolved.
Outdated
|
||
|
|
||
| return nil | ||
|
JiangJiaWei1103 marked this conversation as resolved.
|
||
| } | ||
|
cursor[bot] marked this conversation as resolved.
|
||


Uh oh!
There was an error while loading. Please reload this page.