-
Notifications
You must be signed in to change notification settings - Fork 763
[1/N] [History Server Beta] [Feat] Lazy loading #4795
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
44474e6
c5904c7
1c1f383
3ff2245
3540f14
69e5ab7
c617b08
fad772e
aab5dad
1641ab6
1bff9d1
6294d0e
ea41c98
b2ad15e
3c54fb4
fec4ba5
93626b0
536115b
caea59d
949dea0
6d44bae
ca30acb
68c8ad5
23caa56
557aa3d
49105e1
cb031d7
3ac6576
d42a2be
0e67b0b
8b14840
8a3914c
ae6ba95
fd7e413
96a12be
8caa9a1
0b90fc9
9d0098f
ab80886
1a55a2a
70c7ee2
824d1d5
8337ebb
b6cf319
b76f5b9
cac9a7b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,57 +1,79 @@ | ||
| // Package main is the entrypoint for the History Server HTTP daemon. | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
| // It exposes Ray Dashboard-shaped API endpoints over HTTP and drives | ||
| // per-session event processing on demand via a Supervisor when | ||
| // /enter_cluster hits a dead session. | ||
| package main | ||
|
|
||
|
Future-Outlier marked this conversation as resolved.
Outdated
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "flag" | ||
| "os" | ||
| "os/signal" | ||
| "sync" | ||
| "syscall" | ||
|
|
||
| "github.com/sirupsen/logrus" | ||
|
|
||
| "k8s.io/apimachinery/pkg/runtime" | ||
| utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||
| "k8s.io/client-go/rest" | ||
| "k8s.io/client-go/tools/clientcmd" | ||
| "sigs.k8s.io/controller-runtime/pkg/client" | ||
|
|
||
| rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" | ||
|
|
||
| "github.com/ray-project/kuberay/historyserver/pkg/collector" | ||
| "github.com/ray-project/kuberay/historyserver/pkg/collector/types" | ||
| "github.com/ray-project/kuberay/historyserver/pkg/eventserver" | ||
| "github.com/ray-project/kuberay/historyserver/pkg/historyserver" | ||
| "github.com/sirupsen/logrus" | ||
| ) | ||
|
|
||
| func main() { | ||
| runtimeClassName := "" | ||
| rayRootDir := "" | ||
| kubeconfigs := "" | ||
| runtimeClassConfigPath := "/var/collector-config/data" | ||
| dashboardDir := "" | ||
| useKubernetesProxy := false | ||
| flag.StringVar(&runtimeClassName, "runtime-class-name", "", "") | ||
| flag.StringVar(&rayRootDir, "ray-root-dir", "", "") | ||
| flag.StringVar(&kubeconfigs, "kubeconfigs", "", "") | ||
| flag.StringVar(&dashboardDir, "dashboard-dir", "/dashboard", "") | ||
| flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "") //"/var/collector-config/data" | ||
| flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false, "") | ||
| // ===== Flags ===== | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
| var ( | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
| runtimeClassName string | ||
| rayRootDir string | ||
| kubeconfigs string | ||
| dashboardDir string | ||
| runtimeClassConfigPath string | ||
| useKubernetesProxy bool | ||
| ) | ||
| flag.StringVar(&runtimeClassName, "runtime-class-name", "", "Storage backend: s3 / gcs / azureblob / aliyunoss / localtest") | ||
| flag.StringVar(&rayRootDir, "ray-root-dir", "", "Root dir inside the bucket") | ||
| flag.StringVar(&kubeconfigs, "kubeconfigs", "", "Kubeconfig path; empty = in-cluster") | ||
| flag.StringVar(&dashboardDir, "dashboard-dir", "/dashboard", "Path to Ray Dashboard static assets") | ||
| flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "Path to backend config JSON") | ||
|
JiangJiaWei1103 marked this conversation as resolved.
|
||
| flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false, "Use local kubeconfig instead of in-cluster config") | ||
| flag.Parse() | ||
|
|
||
| if runtimeClassName == "" { | ||
| logrus.Fatal("--runtime-class-name is required") | ||
| } | ||
|
|
||
| // ===== ClientManager ===== | ||
| cliMgr, err := historyserver.NewClientManager(kubeconfigs, useKubernetesProxy) | ||
| if err != nil { | ||
| logrus.Errorf("Failed to create client manager: %v", err) | ||
| os.Exit(1) | ||
| logrus.Fatalf("client manager: %v", err) | ||
| } | ||
|
|
||
| // ===== Backend config ===== | ||
| jsonData := make(map[string]interface{}) | ||
| if runtimeClassConfigPath != "" { | ||
| data, err := os.ReadFile(runtimeClassConfigPath) | ||
| if err != nil { | ||
| panic("Failed to read runtime class config " + err.Error()) | ||
| logrus.Fatalf("read runtime-class-config: %v", err) | ||
| } | ||
| err = json.Unmarshal(data, &jsonData) | ||
| if err != nil { | ||
| panic("Failed to parse runtime class config: " + err.Error()) | ||
| if err := json.Unmarshal(data, &jsonData); err != nil { | ||
| logrus.Fatalf("parse runtime-class-config: %v", err) | ||
| } | ||
| } | ||
|
|
||
| // ===== Reader factory ===== | ||
| registry := collector.GetReaderRegistry() | ||
| factory, ok := registry[runtimeClassName] | ||
| if !ok { | ||
| panic("Not supported runtime class name: " + runtimeClassName + ".") | ||
| logrus.Fatalf("unsupported runtime-class-name for reader: %s", runtimeClassName) | ||
| } | ||
|
|
||
| globalConfig := types.RayHistoryServerConfig{ | ||
|
|
@@ -60,50 +82,81 @@ func main() { | |
|
|
||
| reader, err := factory(&globalConfig, jsonData) | ||
| if err != nil { | ||
| panic("Failed to create reader for runtime class name: " + runtimeClassName + ".") | ||
| logrus.Fatalf("create reader: %v", err) | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| // Create EventHandler with storage reader | ||
| // ===== EventHandler ===== | ||
| eventHandler := eventserver.NewEventHandler(reader) | ||
|
|
||
| // WaitGroup to track goroutine completion | ||
| var wg sync.WaitGroup | ||
| // ===== Server context ===== | ||
| serverCtx, serverCancel := signal.NotifyContext( | ||
| context.Background(), | ||
| syscall.SIGINT, syscall.SIGTERM, | ||
| ) | ||
| defer serverCancel() | ||
|
|
||
| sigChan := make(chan os.Signal, 1) | ||
| stop := make(chan struct{}, 1) | ||
| signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) | ||
| // ===== K8s client (for Pipeline.isDead) ===== | ||
| k8sClient, err := buildK8sClient(kubeconfigs, useKubernetesProxy) | ||
| if err != nil { | ||
| logrus.Fatalf("build k8s client: %v", err) | ||
| } | ||
|
|
||
| // Start EventHandler in background goroutine | ||
| wg.Add(1) | ||
| // ===== Pipeline & Supervisor ===== | ||
| pipeline := historyserver.NewPipeline(eventHandler, k8sClient) | ||
| supervisor := historyserver.NewSupervisor(pipeline, serverCtx) | ||
|
|
||
| // ===== Shutdown signaling ===== | ||
| // Bridge serverCtx into the legacy stop channel that ServerHandler.Run | ||
| // consumes; the existing chan-based API is preserved. | ||
| var wg sync.WaitGroup | ||
| stop := make(chan struct{}) | ||
| go func() { | ||
| defer wg.Done() | ||
| logrus.Info("Starting EventHandler in background...") | ||
| if err := eventHandler.Run(stop, 2); err != nil { | ||
| logrus.Errorf("EventHandler stopped with error: %v", err) | ||
| } | ||
| logrus.Info("EventHandler shutdown complete") | ||
| <-serverCtx.Done() | ||
| logrus.Info("Received shutdown signal, initiating graceful shutdown...") | ||
| close(stop) | ||
| }() | ||
|
Comment on lines
+87
to
90
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add an issue to use serverCtx and replace/remove the “stop channel”? |
||
|
|
||
| handler, err := historyserver.NewServerHandler(&globalConfig, dashboardDir, reader, cliMgr, eventHandler, useKubernetesProxy) | ||
| // ===== ServerHandler ===== | ||
| handler, err := historyserver.NewServerHandler(&globalConfig, dashboardDir, reader, cliMgr, eventHandler, supervisor, useKubernetesProxy) | ||
| if err != nil { | ||
| logrus.Errorf("Failed to create server handler: %v", err) | ||
| os.Exit(1) | ||
| logrus.Fatalf("create server handler: %v", err) | ||
| } | ||
|
|
||
| // ===== Run HTTP server ===== | ||
| wg.Add(1) | ||
| go func() { | ||
| defer wg.Done() | ||
| handler.Run(stop) | ||
| logrus.Info("HTTP server shutdown complete") | ||
| }() | ||
|
|
||
| <-sigChan | ||
| logrus.Info("Received shutdown signal, initiating graceful shutdown...") | ||
|
|
||
| // Stop both the server and the event handler | ||
| close(stop) | ||
|
|
||
| // Wait for both goroutines to complete | ||
| // ===== Wait for graceful shutdown ===== | ||
| wg.Wait() | ||
| logrus.Info("Graceful shutdown complete") | ||
| } | ||
|
|
||
| // buildK8sClient constructs a controller-runtime client.Client with the | ||
| // rayv1 scheme registered. Used by Pipeline.isDead. | ||
| func buildK8sClient(kubeconfigs string, useKubeProxy bool) (client.Client, error) { | ||
| var cfg *rest.Config | ||
| var err error | ||
|
|
||
| switch { | ||
| case kubeconfigs != "": | ||
| cfg, err = clientcmd.BuildConfigFromFlags("", kubeconfigs) | ||
| case useKubeProxy: | ||
| loading := clientcmd.NewDefaultClientConfigLoadingRules() | ||
| cfg, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loading, &clientcmd.ConfigOverrides{}).ClientConfig() | ||
| default: | ||
| cfg, err = rest.InClusterConfig() | ||
| } | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| cfg.QPS = 50 | ||
| cfg.Burst = 100 | ||
|
|
||
| scheme := runtime.NewScheme() | ||
| utilruntime.Must(rayv1.AddToScheme(scheme)) | ||
| return client.New(cfg, client.Options{Scheme: scheme}) | ||
| } | ||
|
cursor[bot] marked this conversation as resolved.
Outdated
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1531,3 +1531,66 @@ func extractActorIDFromTaskID(taskIDHex string) string { | |
|
|
||
| return actorPortion + jobPortion | ||
| } | ||
|
|
||
| // ProcessSingleSession reads all event files for a single session synchronously | ||
| // and populates the handler's in-memory maps. It does not touch other sessions | ||
| // and does not use the channel fan-out that Run() uses. | ||
| // | ||
| // This is a fresh, synchronous implementation — intentionally does NOT share | ||
| // code with the processAllEvents closure inside Run(), because that closure's | ||
| // channel-based fan-out is v1-specific and would couple v1 to v2's lifecycle. | ||
| // | ||
| // Errors during individual file reads are logged but do not abort the whole | ||
| // session — matching v1's error tolerance in processAllEvents. | ||
| func (h *EventHandler) ProcessSingleSession(clusterInfo utils.ClusterInfo) error { | ||
| clusterNameNamespace := clusterInfo.Name + "_" + clusterInfo.Namespace | ||
| clusterSessionKey := utils.BuildClusterSessionKey(clusterInfo.Name, clusterInfo.Namespace, clusterInfo.SessionName) | ||
|
|
||
| // Read Log Events from logs/{nodeId}/events/event_*.log | ||
| // This is the format used by Ray Dashboard's /events API. | ||
| logEventReader := NewLogEventReader(h.reader) | ||
| if err := logEventReader.ReadLogEvents(clusterInfo, clusterSessionKey, h.ClusterLogEventMap); err != nil { | ||
| logrus.Errorf("Failed to read Log Events for %s: %v", clusterSessionKey, err) | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ProcessSingleSession ignores context during log event readingLow Severity
Additional Locations (1)Reviewed by Cursor Bugbot for commit 96a12be. Configure here.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fine-grained ctx management will be resolved in the follow-up. |
||
|
|
||
| // Read RayEvents (Export Events) from node_events/ and job_events/. | ||
| // These are used for task/actor/job/node data APIs. | ||
| eventFileList := append(h.getAllJobEventFiles(clusterInfo), h.getAllNodeEventFiles(clusterInfo)...) | ||
| logrus.Infof("current eventFileList for cluster %s is: %v", clusterInfo.Name, eventFileList) | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
|
||
|
|
||
| for _, eventFile := range eventFileList { | ||
| logrus.Infof("Reading event file: %s", eventFile) | ||
|
JiangJiaWei1103 marked this conversation as resolved.
Outdated
cursor[bot] marked this conversation as resolved.
Outdated
|
||
|
|
||
| eventioReader := h.reader.GetContent(clusterNameNamespace, eventFile) | ||
| if eventioReader == nil { | ||
| logrus.Errorf("Failed to get content for event file: %s, skipping", eventFile) | ||
| continue | ||
| } | ||
| eventbytes, err := io.ReadAll(eventioReader) | ||
| if err != nil { | ||
| logrus.Errorf("Failed to read event file: %v", err) | ||
| continue | ||
| } | ||
|
|
||
| var eventList []map[string]any | ||
| if err := json.Unmarshal(eventbytes, &eventList); err != nil { | ||
| logrus.Errorf("Failed to unmarshal event: %v", err) | ||
| continue | ||
| } | ||
|
|
||
| for _, curr := range eventList { | ||
| // Skip nil events (can occur with corrupted event files containing null elements). | ||
| // Matches Run()'s nil-tolerance at line 172. | ||
| if curr == nil { | ||
| continue | ||
| } | ||
| curr["clusterName"] = clusterSessionKey | ||
| if err := h.storeEvent(curr); err != nil { | ||
| logrus.Errorf("Failed to store event: %v", err) | ||
| continue | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
|
JiangJiaWei1103 marked this conversation as resolved.
|
||
| } | ||
|
cursor[bot] marked this conversation as resolved.
|
||


Uh oh!
There was an error while loading. Please reload this page.