Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions historyserver/pkg/historyserver/enter_cluster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package historyserver

import (
"net/http"
"net/http/httptest"
"sort"
"testing"

"github.com/emicklei/go-restful/v3"
"github.com/ray-project/kuberay/historyserver/pkg/utils"
)

func TestEnterCluster(t *testing.T) {
// 1. Create ServerHandler with a pre-populated clustersMap
handler := &ServerHandler{
maxClusters: 100,
clustersMap: make(map[utils.ClusterKey][]utils.ClusterInfo),
}

// Define test clusters
// A: Single session cluster
keyA := utils.ClusterKey{
Namespace: "default",
Name: "cluster-a",
}
handler.clustersMap[keyA] = []utils.ClusterInfo{
{
Namespace: "default",
Name: "cluster-a",
SessionName: "session-a",
OwnerKind: "RayJob",
OwnerName: "job-a",
},
}

// B: Multi-session cluster (past session AND live session)
keyB := utils.ClusterKey{
Namespace: "default",
Name: "cluster-b",
}
handler.clustersMap[keyB] = []utils.ClusterInfo{
{
Namespace: "default",
Name: "cluster-b",
SessionName: "session-b-past",
OwnerKind: "RayService",
OwnerName: "svc-b",
CreateTimeStamp: 1000, // Older
},
{
Namespace: "default",
Name: "cluster-b",
SessionName: "live",
OwnerKind: "RayService",
OwnerName: "svc-b",
CreateTimeStamp: 2000, // Newer (latest)
},
}

// Explicitly sort B to simulate listClusters post-sorting logic
sort.Sort(utils.ClusterInfoList(handler.clustersMap[keyB]))

// Register router using the actual router/helper logic
ws := new(restful.WebService)
ws.Path("/enter_cluster").Consumes(restful.MIME_JSON).Produces(restful.MIME_JSON)

enterHandler := func(namespace, name, session string, r2 *restful.Response) {
resolvedSession, found := handler.findSessionInMap(namespace, name, session)
if !found {
r2.WriteErrorString(http.StatusNotFound, "cluster not found")
return
}

http.SetCookie(r2, &http.Cookie{MaxAge: 600, Path: "/", Name: COOKIE_CLUSTER_NAME_KEY, Value: name})
http.SetCookie(r2, &http.Cookie{MaxAge: 600, Path: "/", Name: COOKIE_CLUSTER_NAMESPACE_KEY, Value: namespace})
http.SetCookie(r2, &http.Cookie{MaxAge: 600, Path: "/", Name: COOKIE_SESSION_NAME_KEY, Value: resolvedSession})
r2.WriteJson(map[string]interface{}{
"result": "success",
"name": name,
"namespace": namespace,
"session": resolvedSession,
}, "application/json")
}

ws.Route(ws.GET("/{namespace}/{name}/{session}").To(func(r1 *restful.Request, r2 *restful.Response) {
name := r1.PathParameter("name")
namespace := r1.PathParameter("namespace")
session := r1.PathParameter("session")
enterHandler(namespace, name, session, r2)
}))

ws.Route(ws.GET("/{namespace}/{name}").To(func(r1 *restful.Request, r2 *restful.Response) {
name := r1.PathParameter("name")
namespace := r1.PathParameter("namespace")
enterHandler(namespace, name, "latest", r2)
}))

container := restful.NewContainer()
container.Add(ws)

t.Run("Verify sorted order of multi-session slice puts latest first", func(t *testing.T) {
sessions := handler.clustersMap[keyB]
if len(sessions) != 2 {
t.Fatalf("Expected 2 sessions, got %d", len(sessions))
}
// Index 0 must be the newer session (live, timestamp 2000)
if sessions[0].SessionName != "live" {
t.Errorf("Expected latest session 'live' at index 0, got %s", sessions[0].SessionName)
}
})

t.Run("Enter existing single-session cluster with explicit session", func(t *testing.T) {
req := httptest.NewRequest("GET", "/enter_cluster/default/cluster-a/session-a", nil)
resp := httptest.NewRecorder()
container.ServeHTTP(resp, req)

if resp.Code != http.StatusOK {
t.Errorf("Expected status 200, got %d", resp.Code)
}
})

t.Run("Enter cluster with 'latest' keyword resolves to newest session in the list", func(t *testing.T) {
req := httptest.NewRequest("GET", "/enter_cluster/default/cluster-b/latest", nil)
resp := httptest.NewRecorder()
container.ServeHTTP(resp, req)

if resp.Code != http.StatusOK {
t.Errorf("Expected status 200, got %d", resp.Code)
}

// Verify cookies are set to the ACTUAL newest session name ("live") rather than literal "latest"
cookies := resp.Result().Cookies()
cookieMap := make(map[string]*http.Cookie)
for _, cookie := range cookies {
cookieMap[cookie.Name] = cookie
}

if c, ok := cookieMap[COOKIE_CLUSTER_NAME_KEY]; !ok || c.Value != "cluster-b" {
t.Errorf("Expected cookie %s to be 'cluster-b', got %v", COOKIE_CLUSTER_NAME_KEY, c)
}
if c, ok := cookieMap[COOKIE_CLUSTER_NAMESPACE_KEY]; !ok || c.Value != "default" {
t.Errorf("Expected cookie %s to be 'default', got %v", COOKIE_CLUSTER_NAMESPACE_KEY, c)
}
if c, ok := cookieMap[COOKIE_SESSION_NAME_KEY]; !ok || c.Value != "live" {
t.Errorf("Expected cookie %s to be 'live' (actual latest session name), got %v", COOKIE_SESSION_NAME_KEY, c)
}
})

t.Run("Enter cluster with NO session parameter defaults to latest", func(t *testing.T) {
req := httptest.NewRequest("GET", "/enter_cluster/default/cluster-b", nil)
resp := httptest.NewRecorder()
container.ServeHTTP(resp, req)

if resp.Code != http.StatusOK {
t.Errorf("Expected status 200, got %d", resp.Code)
}

// Verify cookies are set to the ACTUAL newest session name ("live") rather than literal "latest"
cookies := resp.Result().Cookies()
cookieMap := make(map[string]*http.Cookie)
for _, cookie := range cookies {
cookieMap[cookie.Name] = cookie
}

if c, ok := cookieMap[COOKIE_SESSION_NAME_KEY]; !ok || c.Value != "live" {
t.Errorf("Expected cookie %s to default to 'live' (actual latest session name), got %v", COOKIE_SESSION_NAME_KEY, c)
}
})
}
41 changes: 41 additions & 0 deletions historyserver/pkg/historyserver/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,50 @@ func (s *ServerHandler) listClusters(limit int) []utils.ClusterInfo {
clusters = clusters[:limit]
}
clusters = append(liveClusterInfos, clusters...)

clustersMap := make(map[utils.ClusterKey][]utils.ClusterInfo)
for _, c := range clusters {
key := utils.ClusterKey{
Namespace: c.Namespace,
Name: c.Name,
}
clustersMap[key] = append(clustersMap[key], c)
}

for key := range clustersMap {
sort.Sort(utils.ClusterInfoList(clustersMap[key]))
}

s.mu.Lock()
s.clustersMap = clustersMap
s.mu.Unlock()

return clusters
}

func (s *ServerHandler) findSessionInMap(namespace, name, session string) (string, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
key := utils.ClusterKey{
Namespace: namespace,
Name: name,
}
if list, ok := s.clustersMap[key]; ok {
if len(list) == 0 {
return "", false
}
if session == "latest" || session == "" {
return list[0].SessionName, true
}
for _, c := range list {
if c.SessionName == session {
return c.SessionName, true
}
}
}
return "", false
}

func (s *ServerHandler) _getNodeLogs(rayClusterNameNamespace, sessionId, nodeId, folder, glob string) ([]byte, error) {
logPath := path.Join(sessionId, utils.RAY_SESSIONDIR_LOGDIR_NAME, nodeId)
if folder != "" {
Expand Down
38 changes: 32 additions & 6 deletions historyserver/pkg/historyserver/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,25 +294,51 @@ func routerRayClusterSet(s *ServerHandler) {
defer restful.Add(ws)

ws.Path("/enter_cluster").Consumes(restful.MIME_JSON).Produces(restful.MIME_JSON).Filter(RequestLogFilter)
ws.Route(ws.GET("/{namespace}/{name}/{session}").To(func(r1 *restful.Request, r2 *restful.Response) {
name := r1.PathParameter("name")
namespace := r1.PathParameter("namespace")
session := r1.PathParameter("session")

enterHandler := func(namespace, name, session string, r2 *restful.Response) {
resolvedSession, found := s.findSessionInMap(namespace, name, session)
if !found {
s.listClusters(s.maxClusters)
resolvedSession, found = s.findSessionInMap(namespace, name, session)
}

if !found {
r2.WriteErrorString(http.StatusNotFound, fmt.Sprintf("cluster %s/%s with session %s not found", namespace, name, session))
return
}

http.SetCookie(r2, &http.Cookie{MaxAge: 600, Path: "/", Name: COOKIE_CLUSTER_NAME_KEY, Value: name})
http.SetCookie(r2, &http.Cookie{MaxAge: 600, Path: "/", Name: COOKIE_CLUSTER_NAMESPACE_KEY, Value: namespace})
http.SetCookie(r2, &http.Cookie{MaxAge: 600, Path: "/", Name: COOKIE_SESSION_NAME_KEY, Value: session})
http.SetCookie(r2, &http.Cookie{MaxAge: 600, Path: "/", Name: COOKIE_SESSION_NAME_KEY, Value: resolvedSession})
r2.WriteJson(map[string]interface{}{
"result": "success",
"name": name,
"namespace": namespace,
"session": session,
"session": resolvedSession,
}, "application/json")
}

ws.Route(ws.GET("/{namespace}/{name}/{session}").To(func(r1 *restful.Request, r2 *restful.Response) {
name := r1.PathParameter("name")
namespace := r1.PathParameter("namespace")
session := r1.PathParameter("session")
enterHandler(namespace, name, session, r2)
}).
Doc("set cookie for cluster").
Param(ws.PathParameter("namespace", "namespace")).
Param(ws.PathParameter("name", "name")).
Param(ws.PathParameter("session", "session")).
Writes("")) // Placeholder for specific return type

ws.Route(ws.GET("/{namespace}/{name}").To(func(r1 *restful.Request, r2 *restful.Response) {
name := r1.PathParameter("name")
namespace := r1.PathParameter("namespace")
enterHandler(namespace, name, "latest", r2)
}).
Doc("set cookie for cluster (defaults session to latest)").
Param(ws.PathParameter("namespace", "namespace")).
Param(ws.PathParameter("name", "name")).
Writes(""))
}

func (s *ServerHandler) RegisterRouter() {
Expand Down
5 changes: 5 additions & 0 deletions historyserver/pkg/historyserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"fmt"
"log"
"net/http"
"sync"
"time"

"github.com/ray-project/kuberay/historyserver/pkg/collector/types"
"github.com/ray-project/kuberay/historyserver/pkg/eventserver"
"github.com/ray-project/kuberay/historyserver/pkg/storage"
"github.com/ray-project/kuberay/historyserver/pkg/utils"
"github.com/sirupsen/logrus"
"k8s.io/client-go/transport"
)
Expand All @@ -25,6 +27,9 @@ type ServerHandler struct {
httpClient *http.Client

useKubernetesProxy bool

mu sync.RWMutex
clustersMap map[utils.ClusterKey][]utils.ClusterInfo
}

func NewServerHandler(c *types.RayHistoryServerConfig, dashboardDir string, reader storage.StorageReader, clientManager *ClientManager, eventHandler *eventserver.EventHandler, useKubernetesProxy bool) (*ServerHandler, error) {
Expand Down
9 changes: 9 additions & 0 deletions historyserver/pkg/utils/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ type ClusterInfo struct {
SessionName string `json:"sessionName"`
CreateTime string `json:"createTime"`
CreateTimeStamp int64 `json:"createTimeStamp"`
OwnerKind string `json:"ownerKind,omitempty"`
OwnerName string `json:"ownerName,omitempty"`
}

type ClusterKey struct {
Namespace string
Name string
OwnerKind string
OwnerName string
}

type ClusterInfoList []ClusterInfo
Expand Down
Loading