[1/N] [History Server Beta] [Feat] Lazy loading #4795
Merged: andrewsykim merged 46 commits into ray-project:master from JiangJiaWei1103:hs-beta-lazy-loading on May 22, 2026
Changes from 45 commits
Commits
44474e6
refactor: Standardize main.go style
JiangJiaWei1103 c5904c7
fix: Surface configuration errors clearly
JiangJiaWei1103 1c1f383
Refactor: Switch to context-based shutdown via signal.NotifyContext
JiangJiaWei1103 3ff2245
feat: Add ProcessSingleSession for synchronous per-session loading
JiangJiaWei1103 3540f14
feat: Lazy session processing via Supervisor with isDead probe
JiangJiaWei1103 69e5ab7
docs: Add smoke tests
JiangJiaWei1103 c617b08
test: Cover Supervisor error propagation and no-retry contracts
JiangJiaWei1103 fad772e
docs: Add missing sa
JiangJiaWei1103 aab5dad
fix: Surface storage outage error in ProcessSingleSession
JiangJiaWei1103 1641ab6
fix: Handle isDead for RayCluster recreation with the same namespaced…
JiangJiaWei1103 1bff9d1
fix: Avoid setting cookie before a successful load or a live-skip
JiangJiaWei1103 6294d0e
fix: Use live sentinel to avoid querying empty in-mem state
JiangJiaWei1103 ea41c98
refactor: Skip singleflight for already-loaded sessions
JiangJiaWei1103 b2ad15e
refactor: Reuse k8s client
JiangJiaWei1103 3c54fb4
fix: Surface errors of two subsystems
JiangJiaWei1103 fec4ba5
Merge branch 'my-master' into hs-beta-lazy-loading
JiangJiaWei1103 93626b0
docs: Remove redundant comments
JiangJiaWei1103 536115b
refactor: Rename to SessionLoader and LoadSession
JiangJiaWei1103 caea59d
refactor: Rename to SessionProcessor for clarity
JiangJiaWei1103 949dea0
refactor: Address PR review nits for cleanness
JiangJiaWei1103 6d44bae
docs: Clean up logs and comments
JiangJiaWei1103 ca30acb
docs: Clean up docstrings
JiangJiaWei1103 68c8ad5
refactor: Rename to SessionStatusClusterStateUnknown to indicate stat…
JiangJiaWei1103 23caa56
refactor: Push ctx into ProcessSingleSession to avoid dup ctx err pol…
JiangJiaWei1103 557aa3d
refactor: Consolidate ProcessSingleSession test into eventserver_test.go
JiangJiaWei1103 49105e1
docs: Clean up more comments
JiangJiaWei1103 cb031d7
docs: Address more nits for clarity
JiangJiaWei1103 3ac6576
fix: Fix assertion string
JiangJiaWei1103 d42a2be
fix: Add session input validation and resolve Nary's nits
JiangJiaWei1103 0e67b0b
fix: Add timeout for a single cold-load to avoid resource leakage
JiangJiaWei1103 8b14840
feat: Add HTTPLiveSessionResolver for determining session name of liv…
JiangJiaWei1103 8a3914c
refactor: Remove client-side redundant timeout setup
JiangJiaWei1103 ae6ba95
fix: Enable cross-ns dns lookup
JiangJiaWei1103 fd7e413
refactor: Fix nits
JiangJiaWei1103 96a12be
Merge branch 'my-master' into hs-beta-lazy-loading
JiangJiaWei1103 8caa9a1
docs: Clarify comments
JiangJiaWei1103 0b90fc9
refactor: Improve error msg, comments and fix nits
JiangJiaWei1103 9d0098f
refactor: Session key as explicit param of storeEvent without mutatin…
JiangJiaWei1103 ab80886
refactor: Rename process session timeout and make it configurable
JiangJiaWei1103 1a55a2a
refactor: Remove all dead code
JiangJiaWei1103 70c7ee2
refactor: Remove HTTPLiveSessionResolver to simplify live-session det…
JiangJiaWei1103 824d1d5
fix: Add zero-value session status as a defensive guard
JiangJiaWei1103 8337ebb
refactor: Fix nits
JiangJiaWei1103 b6cf319
Merge branch 'my-master' into hs-beta-lazy-loading
JiangJiaWei1103 b76f5b9
refactor: Reuse sess key gen fn
JiangJiaWei1103 cac9a7b
docs: Clarify known limitation
JiangJiaWei1103
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,57 +1,63 @@ | ||
| package main | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "flag" | ||
| "os" | ||
| "os/signal" | ||
| "sync" | ||
| "syscall" | ||
|
|
||
| "github.com/sirupsen/logrus" | ||
|
|
||
| "github.com/ray-project/kuberay/historyserver/pkg/collector" | ||
| "github.com/ray-project/kuberay/historyserver/pkg/collector/types" | ||
| "github.com/ray-project/kuberay/historyserver/pkg/eventserver" | ||
| "github.com/ray-project/kuberay/historyserver/pkg/historyserver" | ||
| "github.com/sirupsen/logrus" | ||
| ) | ||
|
|
||
| func main() { | ||
| runtimeClassName := "" | ||
| rayRootDir := "" | ||
| kubeconfigs := "" | ||
| runtimeClassConfigPath := "/var/collector-config/data" | ||
| runtimeClassConfigPath := "" | ||
| dashboardDir := "" | ||
| useKubernetesProxy := false | ||
| flag.StringVar(&runtimeClassName, "runtime-class-name", "", "") | ||
| flag.StringVar(&rayRootDir, "ray-root-dir", "", "") | ||
| flag.StringVar(&kubeconfigs, "kubeconfigs", "", "") | ||
| flag.StringVar(&dashboardDir, "dashboard-dir", "/dashboard", "") | ||
| flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "") //"/var/collector-config/data" | ||
| flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false, "") | ||
| sessionProcessTimeout := historyserver.DefaultSessionProcessTimeout | ||
| flag.StringVar(&runtimeClassName, "runtime-class-name", "", "Storage backend: s3 / gcs / azureblob / aliyunoss / localtest") | ||
| flag.StringVar(&rayRootDir, "ray-root-dir", "", "Root dir inside the bucket") | ||
| flag.StringVar(&kubeconfigs, "kubeconfigs", "", "Kubeconfig path; empty = in-cluster") | ||
| flag.StringVar(&dashboardDir, "dashboard-dir", "/dashboard", "Path to Ray Dashboard static assets") | ||
| flag.StringVar(&runtimeClassConfigPath, "runtime-class-config-path", "", "Path to backend config JSON") | ||
| flag.BoolVar(&useKubernetesProxy, "use-kubernetes-proxy", false, "Use local kubeconfig instead of in-cluster config") | ||
| flag.DurationVar(&sessionProcessTimeout, "session-process-timeout", historyserver.DefaultSessionProcessTimeout, "Timeout duration for processing and loading a single Ray cluster session.") | ||
| flag.Parse() | ||
|
|
||
| if runtimeClassName == "" { | ||
| logrus.Fatal("--runtime-class-name is required") | ||
| } | ||
|
|
||
| cliMgr, err := historyserver.NewClientManager(kubeconfigs, useKubernetesProxy) | ||
| if err != nil { | ||
| logrus.Errorf("Failed to create client manager: %v", err) | ||
| os.Exit(1) | ||
| logrus.Fatalf("Failed to create client manager: %v", err) | ||
| } | ||
|
|
||
| jsonData := make(map[string]interface{}) | ||
| if runtimeClassConfigPath != "" { | ||
| data, err := os.ReadFile(runtimeClassConfigPath) | ||
| if err != nil { | ||
| panic("Failed to read runtime class config " + err.Error()) | ||
| logrus.Fatalf("Failed to read runtime-class-config-path from %s: %v", runtimeClassConfigPath, err) | ||
| } | ||
| err = json.Unmarshal(data, &jsonData) | ||
| if err != nil { | ||
| panic("Failed to parse runtime class config: " + err.Error()) | ||
| if err := json.Unmarshal(data, &jsonData); err != nil { | ||
| logrus.Fatalf("Failed to parse runtime-class-config-path from %s: %v", runtimeClassConfigPath, err) | ||
| } | ||
| } | ||
|
|
||
| registry := collector.GetReaderRegistry() | ||
| factory, ok := registry[runtimeClassName] | ||
| if !ok { | ||
| panic("Not supported runtime class name: " + runtimeClassName + ".") | ||
| logrus.Fatalf("Unsupported runtime-class-name for reader: %s", runtimeClassName) | ||
| } | ||
|
|
||
| globalConfig := types.RayHistoryServerConfig{ | ||
|
|
@@ -60,34 +66,32 @@ func main() { | |
|
|
||
| reader, err := factory(&globalConfig, jsonData) | ||
| if err != nil { | ||
| panic("Failed to create reader for runtime class name: " + runtimeClassName + ".") | ||
| logrus.Fatalf("Failed to create reader for runtime class name %s: %v", runtimeClassName, err) | ||
| } | ||
|
|
||
| // Create EventHandler with storage reader | ||
| eventHandler := eventserver.NewEventHandler(reader) | ||
|
|
||
| // WaitGroup to track goroutine completion | ||
| var wg sync.WaitGroup | ||
| serverCtx, serverCancel := signal.NotifyContext( | ||
| context.Background(), | ||
| syscall.SIGINT, syscall.SIGTERM, | ||
| ) | ||
| defer serverCancel() | ||
|
|
||
| sigChan := make(chan os.Signal, 1) | ||
| stop := make(chan struct{}, 1) | ||
| signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) | ||
| processor := historyserver.NewSessionProcessor(eventHandler, cliMgr.Client()) | ||
| sessionLoader := historyserver.NewSessionLoader(processor, serverCtx, sessionProcessTimeout) | ||
|
|
||
| // Start EventHandler in background goroutine | ||
| wg.Add(1) | ||
| // ServerHandler.Run consumes a stop chan; bridge serverCtx into it. | ||
| var wg sync.WaitGroup | ||
| stop := make(chan struct{}) | ||
| go func() { | ||
| defer wg.Done() | ||
| logrus.Info("Starting EventHandler in background...") | ||
| if err := eventHandler.Run(stop, 2); err != nil { | ||
| logrus.Errorf("EventHandler stopped with error: %v", err) | ||
| } | ||
| logrus.Info("EventHandler shutdown complete") | ||
| <-serverCtx.Done() | ||
| logrus.Info("Received shutdown signal, initiating graceful shutdown...") | ||
| close(stop) | ||
| }() | ||
|
Comment on lines +87 to 90
Member: Can we add an issue to use serverCtx and replace/remove the “stop channel”?
||
|
|
||
| handler, err := historyserver.NewServerHandler(&globalConfig, dashboardDir, reader, cliMgr, eventHandler, useKubernetesProxy) | ||
| handler, err := historyserver.NewServerHandler(&globalConfig, dashboardDir, reader, cliMgr, eventHandler, sessionLoader, useKubernetesProxy) | ||
| if err != nil { | ||
| logrus.Errorf("Failed to create server handler: %v", err) | ||
| os.Exit(1) | ||
| logrus.Fatalf("Failed to create server handler: %v", err) | ||
| } | ||
|
|
||
| wg.Add(1) | ||
|
|
@@ -97,13 +101,6 @@ func main() { | |
| logrus.Info("HTTP server shutdown complete") | ||
| }() | ||
|
|
||
| <-sigChan | ||
| logrus.Info("Received shutdown signal, initiating graceful shutdown...") | ||
|
|
||
| // Stop both the server and the event handler | ||
| close(stop) | ||
|
|
||
| // Wait for both goroutines to complete | ||
| wg.Wait() | ||
| logrus.Info("Graceful shutdown complete") | ||
| } | ||
This file was deleted.