Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion historyserver/pkg/eventserver/eventserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ func (h *EventHandler) Run(stop chan struct{}, numOfEventProcessors int) error {
// TODO: Filter out ones that have already been read
logrus.Infof("Reading event file: %s", eventFile)

eventioReader := h.reader.GetContent(clusterNameNamespace, eventFile)
eventioReader, err := h.reader.GetContent(clusterNameNamespace, eventFile)
if err != nil {
logrus.Errorf("Failed to get content for event file: %s, skipping: %v", eventFile, err)
continue
}
if eventioReader == nil {
logrus.Errorf("Failed to get content for event file: %s, skipping", eventFile)
continue
Expand Down
5 changes: 4 additions & 1 deletion historyserver/pkg/eventserver/log_event_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ func (r *LogEventReader) ReadLogEvents(clusterInfo utils.ClusterInfo, clusterSes
// Lines exceeding maxLineLengthLimit are drained and skipped without accumulating
// in memory, matching Ray Dashboard's _read_file() behavior in event_utils.py.
func (r *LogEventReader) readEventFile(clusterID, filePath string, jobEventMap *types.JobEventMap) error {
ioReader := r.reader.GetContent(clusterID, filePath)
ioReader, err := r.reader.GetContent(clusterID, filePath)
if err != nil {
return fmt.Errorf("failed to get content for %s: %w", filePath, err)
}
if ioReader == nil {
return fmt.Errorf("failed to get content for %s", filePath)
}
Expand Down
6 changes: 3 additions & 3 deletions historyserver/pkg/eventserver/log_event_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ func (m *logEventMockReader) addDir(clusterID, dirPath string, entries []string)

func (m *logEventMockReader) List() []utils.ClusterInfo { return nil }

func (m *logEventMockReader) GetContent(clusterID string, fileName string) io.Reader {
func (m *logEventMockReader) GetContent(clusterID string, fileName string) (io.Reader, error) {
if cd, ok := m.files[clusterID]; ok {
if content, ok := cd[fileName]; ok {
return strings.NewReader(content)
return strings.NewReader(content), nil
}
}
return nil
return nil, nil
}

func (m *logEventMockReader) ListFiles(clusterID string, dir string) []string {
Expand Down
15 changes: 11 additions & 4 deletions historyserver/pkg/historyserver/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *ServerHandler) listClusters(limit int) []utils.ClusterInfo {
return clusters
}

func (s *ServerHandler) _getNodeLogs(rayClusterNameNamespace, sessionId, nodeId, folder, glob string) ([]byte, error) {
func (s *ServerHandler) getNodeLogs(rayClusterNameNamespace, sessionId, nodeId, folder, glob string) ([]byte, error) {
logPath := path.Join(sessionId, utils.RAY_SESSIONDIR_LOGDIR_NAME, nodeId)
if folder != "" {
logPath = path.Join(logPath, folder)
Expand Down Expand Up @@ -170,7 +170,7 @@ func categorizeLogFiles(files []string) map[string][]string {
return result
}

func (s *ServerHandler) _getNodeLogFile(rayClusterNameNamespace, sessionID string, options GetLogFileOptions) ([]byte, error) {
func (s *ServerHandler) getNodeLogFile(rayClusterNameNamespace, sessionID string, options GetLogFileOptions) ([]byte, error) {
// Resolve node_id and filename based on options
nodeID, filename, err := s.resolveLogFilename(rayClusterNameNamespace, sessionID, options)
if err != nil {
Expand All @@ -184,7 +184,10 @@ func (s *ServerHandler) _getNodeLogFile(rayClusterNameNamespace, sessionID strin

// Build log path
logPath := path.Join(sessionID, utils.RAY_SESSIONDIR_LOGDIR_NAME, nodeID, filename)
reader := s.reader.GetContent(rayClusterNameNamespace, logPath)
reader, err := s.reader.GetContent(rayClusterNameNamespace, logPath)
if err != nil {
return nil, utils.NewHTTPError(fmt.Errorf("failed to get log file %s: %w", logPath, err), http.StatusNotFound)
}

if reader == nil {
return nil, utils.NewHTTPError(fmt.Errorf("log file not found: %s", logPath), http.StatusNotFound)
Expand Down Expand Up @@ -520,7 +523,11 @@ func (s *ServerHandler) ipToNodeId(rayClusterNameNamespace, sessionID, nodeIP st
// searchNodeIDHexInEventFile searches for a node with the given IP in a single event file.
// Returns (nodeIDHex, true) if found, ("", false) otherwise.
func (s *ServerHandler) searchNodeIDHexInEventFile(rayClusterNameNamespace, filePath, nodeIP string) (string, bool) {
reader := s.reader.GetContent(rayClusterNameNamespace, filePath)
reader, err := s.reader.GetContent(rayClusterNameNamespace, filePath)
if err != nil {
logrus.Warnf("Failed to get node event file %s: %v", filePath, err)
return "", false
}
if reader == nil {
return "", false
}
Expand Down
40 changes: 31 additions & 9 deletions historyserver/pkg/historyserver/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func routerAPI(s *ServerHandler) {
Doc("get cluster metadata (Ray version, Python version, etc.)").
Writes("")) // Placeholder for specific return type

ws.Route(ws.GET("/v0/logs").To(s.getNodeLogs).Filter(s.CookieHandle).
ws.Route(ws.GET("/v0/logs").To(s.handleGetNodeLogs).Filter(s.CookieHandle).
Doc("get logs").
Param(ws.QueryParameter("node_id", "node_id")).
Param(ws.QueryParameter("glob", "glob pattern")).
Expand Down Expand Up @@ -874,7 +874,11 @@ func (s *ServerHandler) buildFormattedClusterStatus(clusterName, clusterNamespac
for _, nodeID := range nodeIDs {
debugStatePath := path.Join(logsPath, nodeID, "debug_state.txt")

reader := s.reader.GetContent(clusterNameID, debugStatePath)
reader, err := s.reader.GetContent(clusterNameID, debugStatePath)
if err != nil {
logrus.Debugf("No debug_state.txt found for node %s: %v", nodeID, err)
continue
}
if reader == nil {
logrus.Debugf("No debug_state.txt found for node %s", nodeID)
continue
Expand Down Expand Up @@ -927,7 +931,12 @@ func (s *ServerHandler) getClusterMetadata(req *restful.Request, resp *restful.R
clusterNameID := clusterName + "_" + clusterNamespace
storageKey := utils.EndpointPathToStorageKey("/api/v0/cluster_metadata")
endpointPath := path.Join(sessionName, utils.RAY_SESSIONDIR_FETCHED_ENDPOINTS_NAME, storageKey)
reader := s.reader.GetContent(clusterNameID, endpointPath)
reader, err := s.reader.GetContent(clusterNameID, endpointPath)
if err != nil {
logrus.Errorf("Failed to get cluster metadata: %v", err)
resp.WriteErrorString(http.StatusNotFound, "Cluster metadata not found")
return
}
if reader == nil {
resp.WriteErrorString(http.StatusNotFound, "Cluster metadata not found")
return
Expand Down Expand Up @@ -967,7 +976,20 @@ func (s *ServerHandler) getAdditionalEndpoint(req *restful.Request, resp *restfu
// RequestURI() includes query params when present, and equals URL.Path when absent.
storageKey := utils.EndpointPathToStorageKey(req.Request.URL.RequestURI())
endpointPath := path.Join(sessionName, utils.RAY_SESSIONDIR_FETCHED_ENDPOINTS_NAME, storageKey)
reader := s.reader.GetContent(clusterNameID, endpointPath)
reader, err := s.reader.GetContent(clusterNameID, endpointPath)
if err != nil {
logrus.Errorf("Failed to get additional endpoint data for %s: %v", req.Request.URL.Path, err)
// For known frontend endpoints, return empty but valid JSON responses instead of 404.
// This prevents the frontend from showing error states for endpoints that may not have been
// collected (e.g., Serve was not enabled on the cluster).
if emptyResp := emptyResponseForEndpoint(req.Request.URL.Path); emptyResp != nil {
resp.Header().Set("Content-Type", "application/json")
resp.Write(emptyResp)
return
}
resp.WriteErrorString(http.StatusNotFound, "Endpoint data not found in storage")
return
Comment thread
dentiny marked this conversation as resolved.
}
if reader == nil {
// For known frontend endpoints, return empty but valid JSON responses instead of 404.
// This prevents the frontend from showing error states for endpoints that may not have been
Expand Down Expand Up @@ -1061,7 +1083,7 @@ func ensurePlacementGroupFields(data []byte) []byte {
return patched
}

func (s *ServerHandler) getNodeLogs(req *restful.Request, resp *restful.Response) {
func (s *ServerHandler) handleGetNodeLogs(req *restful.Request, resp *restful.Response) {
clusterNameID := req.Attribute(COOKIE_CLUSTER_NAME_KEY).(string)
clusterNamespace := req.Attribute(COOKIE_CLUSTER_NAMESPACE_KEY).(string)
sessionName := req.Attribute(COOKIE_SESSION_NAME_KEY).(string)
Expand Down Expand Up @@ -1091,7 +1113,7 @@ func (s *ServerHandler) getNodeLogs(req *restful.Request, resp *restful.Response
folder = base
}
}
data, err := s._getNodeLogs(clusterNameID+"_"+clusterNamespace, sessionName, nodeID, folder, glob)
data, err := s.getNodeLogs(clusterNameID+"_"+clusterNamespace, sessionName, nodeID, folder, glob)
if err != nil {
logrus.Errorf("Error: %v", err)
resp.WriteError(400, err)
Expand Down Expand Up @@ -1217,7 +1239,7 @@ func (s *ServerHandler) getLogicalActor(req *restful.Request, resp *restful.Resp
resp.Write(actData)
}

func (s *ServerHandler) getNodeLogFile(req *restful.Request, resp *restful.Response) {
func (s *ServerHandler) handleGetNodeLogFile(req *restful.Request, resp *restful.Response) {
clusterNameID := req.Attribute(COOKIE_CLUSTER_NAME_KEY).(string)
clusterNamespace := req.Attribute(COOKIE_CLUSTER_NAMESPACE_KEY).(string)
sessionName := req.Attribute(COOKIE_SESSION_NAME_KEY).(string)
Expand Down Expand Up @@ -1269,7 +1291,7 @@ func (s *ServerHandler) getNodeLogFile(req *restful.Request, resp *restful.Respo
options.NodeID = nodeID
}

content, err := s._getNodeLogFile(clusterNameID+"_"+clusterNamespace, sessionName, options)
content, err := s.getNodeLogFile(clusterNameID+"_"+clusterNamespace, sessionName, options)
if err != nil {
var httpErr *utils.HTTPError
if errors.As(err, &httpErr) {
Expand Down Expand Up @@ -1399,7 +1421,7 @@ func (s *ServerHandler) getNodeLog(req *restful.Request, resp *restful.Response)
mediaType := req.PathParameter("media_type")
switch mediaType {
case "file":
s.getNodeLogFile(req, resp)
s.handleGetNodeLogFile(req, resp)
case "stream":
s.getNodeLogStream(req, resp)
default:
Expand Down
8 changes: 7 additions & 1 deletion historyserver/pkg/historyserver/timezone.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ func (s *ServerHandler) getTimezone(req *restful.Request, resp *restful.Response

storageKey := utils.EndpointPathToStorageKey(timezoneEndpoint)
endpointPath := path.Join(sessionName, utils.RAY_SESSIONDIR_FETCHED_ENDPOINTS_NAME, storageKey)
reader := s.reader.GetContent(clusterNameID, endpointPath)
reader, err := s.reader.GetContent(clusterNameID, endpointPath)
if err != nil {
logrus.Errorf("Failed to get timezone metadata: %v", err)
resp.Header().Set("Content-Type", "application/json")
resp.Write([]byte(`{"offset":"","value":""}`))
return
}
if reader == nil {
resp.Header().Set("Content-Type", "application/json")
resp.Write([]byte(`{"offset":"","value":""}`))
Expand Down
10 changes: 5 additions & 5 deletions historyserver/pkg/storage/aliyunoss/ray/ray.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (r *RayLogsHandler) List() (res []utils.ClusterInfo) {
return clusters
}

func (r *RayLogsHandler) GetContent(clusterId string, fileName string) io.Reader {
func (r *RayLogsHandler) GetContent(clusterId string, fileName string) (io.Reader, error) {
ctx := context.TODO()
logrus.Infof("Prepare to get object %s info ...", fileName)
result, err := r.OssClient.GetObject(ctx, &oss.GetObjectRequest{
Expand All @@ -214,25 +214,25 @@ func (r *RayLogsHandler) GetContent(clusterId string, fileName string) io.Reader
})
if err != nil {
logrus.Errorf("Failed to get object %s: %v", f, err)
return nil
return nil, err
}
found = true
break
}
}
if !found {
logrus.Errorf("Failed to get object by list all files %s", fileName)
return nil
return nil, fmt.Errorf("failed to get object by listing all files %s", fileName)
}
}
defer result.Body.Close()

data, err := io.ReadAll(result.Body)
if err != nil {
logrus.Errorf("Failed to read all data from object %s : %v", fileName, err)
return nil
return nil, err
}
return bytes.NewReader(data)
return bytes.NewReader(data), nil
}

func NewReader(c *types.RayHistoryServerConfig, jd map[string]interface{}) (storage.StorageReader, error) {
Expand Down
14 changes: 7 additions & 7 deletions historyserver/pkg/storage/azureblob/azureblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (r *RayLogsHandler) List() (res []utils.ClusterInfo) {
return clusters
}

func (r *RayLogsHandler) GetContent(clusterId string, fileName string) io.Reader {
func (r *RayLogsHandler) GetContent(clusterId string, fileName string) (io.Reader, error) {
fullPath := path.Join(r.RootDir, clusterId, fileName)
logrus.Infof("Prepare to get blob %s info ...", fullPath)

Expand Down Expand Up @@ -251,30 +251,30 @@ func (r *RayLogsHandler) GetContent(clusterId string, fileName string) io.Reader
resp.Body.Close()
}
logrus.Errorf("Failed to get blob %s: %v", f, err)
return nil
return nil, err
}
// Read body before cancelling context to avoid incomplete stream
defer retryCancel()
defer resp.Body.Close()
data, err := io.ReadAll(resp.Body)
if err != nil {
logrus.Errorf("Failed to read all data from blob %s: %v", fileName, err)
return nil
return nil, err
}
return bytes.NewReader(data)
return bytes.NewReader(data), nil
}
}
logrus.Errorf("Failed to get blob by listing all files %s", fileName)
return nil
return nil, fmt.Errorf("failed to get blob by listing all files %s", fileName)
}
defer resp.Body.Close()

data, err := io.ReadAll(resp.Body)
if err != nil {
logrus.Errorf("Failed to read all data from blob %s: %v", fileName, err)
return nil
return nil, err
}
return bytes.NewReader(data)
return bytes.NewReader(data), nil
}

func NewReader(c *types.RayHistoryServerConfig, jd map[string]interface{}) (storage.StorageReader, error) {
Expand Down
12 changes: 6 additions & 6 deletions historyserver/pkg/storage/gcs/gcs_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (h *RayLogsHandler) List() []utils.ClusterInfo {
return clusterList
}

func (h *RayLogsHandler) GetContent(clusterId string, fileName string) io.Reader {
func (h *RayLogsHandler) GetContent(clusterId string, fileName string) (io.Reader, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

Expand All @@ -201,27 +201,27 @@ func (h *RayLogsHandler) GetContent(clusterId string, fileName string) io.Reader
fileAttrs, err := objectIterator.Next()
if err == gIterator.Done {
logrus.Errorf("File %s was not found in bucket for cluster %s", fileName, clusterId)
return nil
return nil, fmt.Errorf("file %s was not found in bucket for cluster %s", fileName, clusterId)
}
if err != nil {
logrus.Errorf("Failed when searching for file %v", err)
return nil
return nil, err
}

reader, err := h.StorageClient.Bucket(h.GCSBucket).Object(fileAttrs.Name).NewReader(ctx)
if err != nil {
logrus.Errorf("Failed to create reader for file: %s in cluster: %s", fileName, clusterId)
return nil
return nil, err
}
defer reader.Close()
// TODO(chiayi): ReadAll can potentially cause OOM error depending on the size of the file.
// Change into bufio.Scanner if needed or limit the size of the read
data, err := io.ReadAll(reader)
if err != nil {
logrus.Errorf("Failed to get all content of the file: %s, %v", fileName, err)
return nil
return nil, err
}
return bytes.NewReader(data)
return bytes.NewReader(data), nil
}

func NewReader(c *types.RayHistoryServerConfig, jd map[string]interface{}) (storage.StorageReader, error) {
Expand Down
5 changes: 4 additions & 1 deletion historyserver/pkg/storage/gcs/gcs_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,10 @@ func TestGetContent(t *testing.T) {
_, client, bucketName := setupFakeGCS(t, initialObjects...)
handler := createRayLogsHandler(client, bucketName)

reader := handler.GetContent(clusterID, fileName)
reader, err := handler.GetContent(clusterID, fileName)
if err != nil {
t.Fatalf("GetContent(%q, %q) returned error: %v", clusterID, fileName, err)
}
if reader == nil {
t.Fatalf("GetContent(%q, %q) returned nil reader, expected non-nil", clusterID, fileName)
}
Expand Down
2 changes: 1 addition & 1 deletion historyserver/pkg/storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type StorageReader interface {
// for log, pass filename to this func
// for metadata, fileName is generated by the historyserver, which follows the same rule as the collector to make sure the historyserver can read the right file.
//
GetContent(clusterId string, fileName string) io.Reader
GetContent(clusterId string, fileName string) (io.Reader, error)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @dentiny!

We will discuss the interface change in the next meeting.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure — PoAn is working on an OpenDAL-based storage interface; while reviewing his PR I was surprised to find that quite a few APIs don't propagate errors, not sure why 🤷


ListFiles(clusterId string, dir string) []string
}
6 changes: 3 additions & 3 deletions historyserver/pkg/storage/localtest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ func (r *MockReader) List() []utils.ClusterInfo {
}

// GetContent returns content for a specific file
func (r *MockReader) GetContent(clusterId string, fileName string) io.Reader {
func (r *MockReader) GetContent(clusterId string, fileName string) (io.Reader, error) {
if clusterData, ok := r.data[clusterId]; ok {
if content, ok := clusterData[fileName]; ok {
return strings.NewReader(content)
return strings.NewReader(content), nil
}
}
return strings.NewReader("")
return strings.NewReader(""), nil
}

func (r *MockReader) ListFiles(clusterId string, dir string) []string {
Expand Down
Loading
Loading