From 3dc857360655cf1602a656f2868a94b5e4f26346 Mon Sep 17 00:00:00 2001 From: chengxi Date: Sun, 22 Feb 2026 00:38:29 -0500 Subject: [PATCH 01/22] feat: surface response data when receiving an unexpected status code and content-type --- internal/transport/client_stream.go | 79 ++++++++++++++++++- internal/transport/http2_client.go | 49 ++++++++++-- test/end2end_test.go | 113 ++++++++++++++++++++++++++++ 3 files changed, 235 insertions(+), 6 deletions(-) diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index cd8152ef13c7..485c6cc96211 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -19,7 +19,10 @@ package transport import ( + "strconv" + "sync" "sync/atomic" + "time" "golang.org/x/net/http2" "google.golang.org/grpc/mem" @@ -28,6 +31,21 @@ import ( "google.golang.org/grpc/status" ) +// NonGRPCDataMaxLen is the maximum length of nonGRPCDataBuf. +const NonGRPCDataMaxLen = 1024 + +// nonGRPCDataCollectionTimeout is the timeout for collecting non-gRPC data. +const nonGRPCDataCollectionTimeout = 3 * time.Second + +// nonGRPCDataCollectionState indicates the stage of non-gRPC data collection. +type nonGRPCDataCollectionState int + +const ( + none nonGRPCDataCollectionState = iota + collecting + stopped +) + // ClientStream implements streaming functionality for a gRPC client. type ClientStream struct { Stream // Embed for common stream functionality. @@ -46,7 +64,13 @@ type ClientStream struct { // headerValid indicates whether a valid header was received. Only // meaningful after headerChan is closed (always call waitOnHeader() before // reading its value). - headerValid bool + headerValid bool + + collectionMu sync.Mutex + collectionState nonGRPCDataCollectionState // indicates the stage of non-gRPC data collection. + collectionTimer *time.Timer // used to limit the time spent on collecting non-gRPC error details. + nonGRPCDataBuf []byte // stores the data of a non-gRPC response. + noHeaders bool // set if the client never received headers (set only after the stream is done). headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream @@ -54,6 +78,57 @@ type ClientStream struct { statsHandler stats.Handler // nil for internal streams (e.g., health check, ORCA) where telemetry is not supported. } +func (s *ClientStream) startNonGRPCDataCollection(st *status.Status, onTimeout func()) { + s.collectionMu.Lock() + defer s.collectionMu.Unlock() + if s.collectionState != none { + return + } + s.status = st + s.collectionState = collecting + s.nonGRPCDataBuf = make([]byte, 0, NonGRPCDataMaxLen) + s.collectionTimer = time.AfterFunc(nonGRPCDataCollectionTimeout, onTimeout) +} + +// tryHandleNonGRPCData tries to collect non-gRPC body from the given data frame. +// It returns two booleans: +// handle indicates whether the frame should be handled as a non-gRPC response body, +// end indicates whether the stream should be closed after handling this frame. +func (s *ClientStream) tryHandleNonGRPCData(f *parsedDataFrame) (handle bool, end bool) { + s.collectionMu.Lock() + defer s.collectionMu.Unlock() + switch s.collectionState { + case none: + return false, false + case stopped: + return true, true + case collecting: + // Continue to collect data. + } + + n := min(f.data.Len(), NonGRPCDataMaxLen-len(s.nonGRPCDataBuf)) + s.nonGRPCDataBuf = append(s.nonGRPCDataBuf, f.data.ReadOnlyData()[0:n]...) + if len(s.nonGRPCDataBuf) >= NonGRPCDataMaxLen || f.StreamEnded() { + return true, true + } + return true, false +} + +// stopNonGRPCBodyCollection stops collecting non-gRPC body and appends the collected. +// Should only be called in closeStream. +func (s *ClientStream) stopNonGRPCDataCollectionLocked() { + if s.collectionState != collecting { + return + } + s.collectionState = stopped + if s.collectionTimer != nil { + s.collectionTimer.Stop() + s.collectionTimer = nil + } + data := "\ndata: " + strconv.Quote(string(s.nonGRPCDataBuf)) + s.status = status.New(s.status.Code(), s.status.Message()+data) +} + // Read reads an n byte message from the input stream. func (s *ClientStream) Read(n int) (mem.BufferSlice, error) { b, err := s.Stream.read(n) @@ -126,6 +201,8 @@ func (s *ClientStream) Header() (metadata.MD, error) { s.waitOnHeader() if !s.headerValid || s.noHeaders { + s.collectionMu.Lock() + defer s.collectionMu.Unlock() return nil, s.status.Err() } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index c943503f3590..691fc6ba65ad 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -934,6 +934,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr, handler s return s, nil } +func (t *http2Client) closeStreamWithNonGRPCStatus(s *ClientStream) { + t.closeStream(s, nil, true, http2.ErrCodeProtocol, nil, nil, true) +} + func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) { // Set stream status to done. if s.swapState(streamDone) == streamDone { @@ -942,10 +946,19 @@ func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode <-s.done return } - // status and trailers can be updated here without any synchronization because the stream goroutine will - // only read it after it sees an io.EOF error from read or write and we'll write those errors - // only after updating this. + s.collectionMu.Lock() + if s.collectionState == collecting { + // If the stream is still in the collecting state, but stream is being closed, it means + // the stream is being closed in a non-graceful way, so we force stop collecting to release + // resource and finalize the status. + s.stopNonGRPCDataCollectionLocked() + } + if s.status != nil { + st = s.status + err = st.Err() + } s.status = st + s.collectionMu.Unlock() if len(mdata) > 0 { s.trailer = mdata } @@ -1222,6 +1235,21 @@ func (t *http2Client) handleData(f *parsedDataFrame) { t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false) return } + + handle, end := s.tryHandleNonGRPCData(f) + if handle { + if w := s.fc.onRead(size); w > 0 { + t.controlBuf.put(&outgoingWindowUpdate{ + streamID: s.id, + increment: w, + }) + } + if end { + t.closeStreamWithNonGRPCStatus(s) + } + return + } + dataLen := f.data.Len() if f.Header().Flags.Has(http2.FlagDataPadded) { if w := s.fc.onRead(size - uint32(dataLen)); w > 0 { @@ -1561,8 +1589,19 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { errs = append(errs, contentTypeErr) } - se := status.New(grpcErrorCode, strings.Join(errs, "; ")) - t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) + errMsg := strings.Join(errs, "; ") + se := status.New(grpcErrorCode, errMsg) + if endStream { + t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, true) + return + } + + s.startNonGRPCDataCollection(se, func() { + t.closeStreamWithNonGRPCStatus(s) + }) + if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { + close(s.headerChan) + } return } diff --git a/test/end2end_test.go b/test/end2end_test.go index 3859881f3f56..5f6daec90d9d 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -34,6 +34,7 @@ import ( "os" "reflect" "runtime" + "strconv" "strings" "sync" "sync/atomic" @@ -6787,6 +6788,118 @@ func (s) TestAuthorityHeader(t *testing.T) { } } +func (s) TestHTTPServerSendsNonGRPCHeaderSurfaceFurtherData(t *testing.T) { + tests := []struct { + name string + responses []httpServerResponse + wantCode codes.Code + wantErr string + }{ + { + name: "non-gRPC content-type without payload", + responses: []httpServerResponse{ + { + headers: [][]string{ + { + ":status", "200", + "content-type", "text/html", + }, + }, + // payload: nil + }, + }, + wantCode: codes.Unknown, + wantErr: `rpc error: code = Unknown desc = unexpected HTTP status code received from server: 200 (OK); transport: received unexpected content-type "text/html" +data: ""`, + }, + { + name: "non-gRPC content-type with payload", + responses: []httpServerResponse{ + { + headers: [][]string{ + { + ":status", "200", + "content-type", "text/html", + }, + }, + payload: []byte(`Hello World`), + }, + }, + wantCode: codes.Unknown, + wantErr: `rpc error: code = Unknown desc = unexpected HTTP status code received from server: 200 (OK); transport: received unexpected content-type "text/html" +data: "Hello World"`, + }, + { + name: "non-gRPC content-type with bytes payload length more than transport.NonGRPCDataMaxLen", + responses: []httpServerResponse{ + { + headers: [][]string{ + { + ":status", "200", + "content-type", "text/html", + }, + }, + payload: bytes.Repeat([]byte("a"), transport.NonGRPCDataMaxLen+1), + }, + }, + wantCode: codes.Unknown, + wantErr: `rpc error: code = Unknown desc = unexpected HTTP status code received from server: 200 (OK); transport: received unexpected content-type "text/html" +data: ` + strconv.Quote(strings.Repeat("a", transport.NonGRPCDataMaxLen)), + }, + { + name: "content-type not provided", + responses: []httpServerResponse{ + { + headers: [][]string{{ + ":status", "502", + }}, + payload: []byte("hello"), + }, + }, + wantCode: codes.Unavailable, + wantErr: `rpc error: code = Unavailable desc = unexpected HTTP status code received from server: 502 (Bad Gateway); malformed header: missing HTTP content-type +data: "hello"`, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + defer lis.Close() + + hs := &httpServer{responses: test.responses} + hs.start(t, lis) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient() failed: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + _, err = client.EmptyCall(ctx, &testpb.Empty{}) + + if err == nil { + t.Fatalf("EmptyCall() = nil; want non-nil error due to non-gRPC response") + } + + if got, want := status.Code(err), test.wantCode; got != want { + t.Fatalf("Unexpected error code: got %v, want %v\nfull error:\n%v", got, want, err) + } + + if err.Error() != test.wantErr { + t.Errorf("Unexpected error message: got\n %v, want\n %v", err.Error(), test.wantErr) + } + }) + } +} + // wrapCloseListener tracks Accepts/Closes and maintains a counter of the // number of open connections. type wrapCloseListener struct { From 3cd1bf544deb3de3a96478828da4c80fed6da150 Mon Sep 17 00:00:00 2001 From: chengxi Date: Sun, 22 Feb 2026 21:24:34 -0500 Subject: [PATCH 02/22] refactor: remove nonGRPCDataCollectionState enum, use collecting(bool) instead to simplify the code. --- internal/transport/client_stream.go | 29 +++++++---------------------- internal/transport/http2_client.go | 6 ++---- 2 files changed, 9 insertions(+), 26 deletions(-) diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index 485c6cc96211..a45d5ba6a70c 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -37,15 +37,6 @@ const NonGRPCDataMaxLen = 1024 // nonGRPCDataCollectionTimeout is the timeout for collecting non-gRPC data. const nonGRPCDataCollectionTimeout = 3 * time.Second -// nonGRPCDataCollectionState indicates the stage of non-gRPC data collection. -type nonGRPCDataCollectionState int - -const ( - none nonGRPCDataCollectionState = iota - collecting - stopped -) - // ClientStream implements streaming functionality for a gRPC client. type ClientStream struct { Stream // Embed for common stream functionality. @@ -67,9 +58,9 @@ type ClientStream struct { headerValid bool collectionMu sync.Mutex - collectionState nonGRPCDataCollectionState // indicates the stage of non-gRPC data collection. - collectionTimer *time.Timer // used to limit the time spent on collecting non-gRPC error details. - nonGRPCDataBuf []byte // stores the data of a non-gRPC response. + collecting bool // indicates if stream entered the stage of non-gRPC data collection. + collectionTimer *time.Timer // used to limit the time spent on collecting non-gRPC error details. + nonGRPCDataBuf []byte // stores the data of a non-gRPC response. noHeaders bool // set if the client never received headers (set only after the stream is done). headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. @@ -81,11 +72,11 @@ type ClientStream struct { func (s *ClientStream) startNonGRPCDataCollection(st *status.Status, onTimeout func()) { s.collectionMu.Lock() defer s.collectionMu.Unlock() - if s.collectionState != none { + if s.collecting { return } s.status = st - s.collectionState = collecting + s.collecting = true s.nonGRPCDataBuf = make([]byte, 0, NonGRPCDataMaxLen) s.collectionTimer = time.AfterFunc(nonGRPCDataCollectionTimeout, onTimeout) } @@ -97,13 +88,8 @@ func (s *ClientStream) startNonGRPCDataCollection(st *status.Status, onTimeout f func (s *ClientStream) tryHandleNonGRPCData(f *parsedDataFrame) (handle bool, end bool) { s.collectionMu.Lock() defer s.collectionMu.Unlock() - switch s.collectionState { - case none: + if !s.collecting { return false, false - case stopped: - return true, true - case collecting: - // Continue to collect data. } n := min(f.data.Len(), NonGRPCDataMaxLen-len(s.nonGRPCDataBuf)) @@ -117,10 +103,9 @@ func (s *ClientStream) tryHandleNonGRPCData(f *parsedDataFrame) (handle bool, en // stopNonGRPCBodyCollection stops collecting non-gRPC body and appends the collected. // Should only be called in closeStream. func (s *ClientStream) stopNonGRPCDataCollectionLocked() { - if s.collectionState != collecting { + if !s.collecting { return } - s.collectionState = stopped if s.collectionTimer != nil { s.collectionTimer.Stop() s.collectionTimer = nil diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 691fc6ba65ad..68c7cb15d6df 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -947,10 +947,8 @@ func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode return } s.collectionMu.Lock() - if s.collectionState == collecting { - // If the stream is still in the collecting state, but stream is being closed, it means - // the stream is being closed in a non-graceful way, so we force stop collecting to release - // resource and finalize the status. + if s.collecting { + // If the stream is collecting data for non-gRPC, stop collection to finalize status s.stopNonGRPCDataCollectionLocked() } if s.status != nil { From 038dd9929a7fd960203776b4f57220827252d1af Mon Sep 17 00:00:00 2001 From: chengxi Date: Thu, 26 Feb 2026 00:48:49 -0500 Subject: [PATCH 03/22] refactor: stop exporting constant nonGRPCDataMaxLen --- internal/transport/client_stream.go | 13 ++++++++----- test/end2end_test.go | 7 ++++--- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index a45d5ba6a70c..02fc4c38b09e 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -31,8 +31,11 @@ import ( "google.golang.org/grpc/status" ) -// NonGRPCDataMaxLen is the maximum length of nonGRPCDataBuf. -const NonGRPCDataMaxLen = 1024 +// nonGRPCDataMaxLen is the maximum length of nonGRPCDataBuf. +// +// NOTE: If changed this value, you MUST update the corresponding test in: +// - /test/end2end_test.go:TestHTTPServerSendsNonGRPCHeaderSurfaceFurtherData +const nonGRPCDataMaxLen = 1024 // nonGRPCDataCollectionTimeout is the timeout for collecting non-gRPC data. const nonGRPCDataCollectionTimeout = 3 * time.Second @@ -77,7 +80,7 @@ func (s *ClientStream) startNonGRPCDataCollection(st *status.Status, onTimeout f } s.status = st s.collecting = true - s.nonGRPCDataBuf = make([]byte, 0, NonGRPCDataMaxLen) + s.nonGRPCDataBuf = make([]byte, 0, nonGRPCDataMaxLen) s.collectionTimer = time.AfterFunc(nonGRPCDataCollectionTimeout, onTimeout) } @@ -92,9 +95,9 @@ func (s *ClientStream) tryHandleNonGRPCData(f *parsedDataFrame) (handle bool, en return false, false } - n := min(f.data.Len(), NonGRPCDataMaxLen-len(s.nonGRPCDataBuf)) + n := min(f.data.Len(), nonGRPCDataMaxLen-len(s.nonGRPCDataBuf)) s.nonGRPCDataBuf = append(s.nonGRPCDataBuf, f.data.ReadOnlyData()[0:n]...) - if len(s.nonGRPCDataBuf) >= NonGRPCDataMaxLen || f.StreamEnded() { + if len(s.nonGRPCDataBuf) >= nonGRPCDataMaxLen || f.StreamEnded() { return true, true } return true, false diff --git a/test/end2end_test.go b/test/end2end_test.go index 5f6daec90d9d..3869a097cd81 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -6789,6 +6789,7 @@ func (s) TestAuthorityHeader(t *testing.T) { } func (s) TestHTTPServerSendsNonGRPCHeaderSurfaceFurtherData(t *testing.T) { + const nonGRPCDataMaxLen = 1024 tests := []struct { name string responses []httpServerResponse @@ -6830,7 +6831,7 @@ data: ""`, data: "Hello World"`, }, { - name: "non-gRPC content-type with bytes payload length more than transport.NonGRPCDataMaxLen", + name: "non-gRPC content-type with bytes payload length more than nonGRPCDataMaxLen", responses: []httpServerResponse{ { headers: [][]string{ @@ -6839,12 +6840,12 @@ data: "Hello World"`, "content-type", "text/html", }, }, - payload: bytes.Repeat([]byte("a"), transport.NonGRPCDataMaxLen+1), + payload: bytes.Repeat([]byte("a"), nonGRPCDataMaxLen+1), }, }, wantCode: codes.Unknown, wantErr: `rpc error: code = Unknown desc = unexpected HTTP status code received from server: 200 (OK); transport: received unexpected content-type "text/html" -data: ` + strconv.Quote(strings.Repeat("a", transport.NonGRPCDataMaxLen)), +data: ` + strconv.Quote(strings.Repeat("a", nonGRPCDataMaxLen)), }, { name: "content-type not provided", From 6dd9c4f6ecfab4dc63eaa3ec4eb2acfe4172d489 Mon Sep 17 00:00:00 2001 From: chengxi Date: Thu, 26 Feb 2026 01:51:25 -0500 Subject: [PATCH 04/22] refactor: remove unnecessary timer --- internal/transport/client_stream.go | 18 ++++-------------- internal/transport/http2_client.go | 4 +--- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index 02fc4c38b09e..8e2432f36296 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -22,7 +22,6 @@ import ( "strconv" "sync" "sync/atomic" - "time" "golang.org/x/net/http2" "google.golang.org/grpc/mem" @@ -37,9 +36,6 @@ import ( // - /test/end2end_test.go:TestHTTPServerSendsNonGRPCHeaderSurfaceFurtherData const nonGRPCDataMaxLen = 1024 -// nonGRPCDataCollectionTimeout is the timeout for collecting non-gRPC data. -const nonGRPCDataCollectionTimeout = 3 * time.Second - // ClientStream implements streaming functionality for a gRPC client. type ClientStream struct { Stream // Embed for common stream functionality. @@ -60,10 +56,9 @@ type ClientStream struct { // reading its value). headerValid bool - collectionMu sync.Mutex - collecting bool // indicates if stream entered the stage of non-gRPC data collection. - collectionTimer *time.Timer // used to limit the time spent on collecting non-gRPC error details. - nonGRPCDataBuf []byte // stores the data of a non-gRPC response. + collectionMu sync.Mutex + collecting bool // indicates if stream entered the stage of non-gRPC data collection. + nonGRPCDataBuf []byte // stores the data of a non-gRPC response. noHeaders bool // set if the client never received headers (set only after the stream is done). headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. @@ -72,7 +67,7 @@ type ClientStream struct { statsHandler stats.Handler // nil for internal streams (e.g., health check, ORCA) where telemetry is not supported. } -func (s *ClientStream) startNonGRPCDataCollection(st *status.Status, onTimeout func()) { +func (s *ClientStream) startNonGRPCDataCollection(st *status.Status) { s.collectionMu.Lock() defer s.collectionMu.Unlock() if s.collecting { @@ -81,7 +76,6 @@ func (s *ClientStream) startNonGRPCDataCollection(st *status.Status, onTimeout f s.status = st s.collecting = true s.nonGRPCDataBuf = make([]byte, 0, nonGRPCDataMaxLen) - s.collectionTimer = time.AfterFunc(nonGRPCDataCollectionTimeout, onTimeout) } // tryHandleNonGRPCData tries to collect non-gRPC body from the given data frame. @@ -109,10 +103,6 @@ func (s *ClientStream) stopNonGRPCDataCollectionLocked() { if !s.collecting { return } - if s.collectionTimer != nil { - s.collectionTimer.Stop() - s.collectionTimer = nil - } data := "\ndata: " + strconv.Quote(string(s.nonGRPCDataBuf)) s.status = status.New(s.status.Code(), s.status.Message()+data) } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 68c7cb15d6df..3aad02e50a1f 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1594,9 +1594,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } - s.startNonGRPCDataCollection(se, func() { - t.closeStreamWithNonGRPCStatus(s) - }) + s.startNonGRPCDataCollection(se) if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { close(s.headerChan) } From 4e270af0986f2eeb1b40db381ce7099ffa609801 Mon Sep 17 00:00:00 2001 From: chengxi Date: Thu, 26 Feb 2026 02:20:00 -0500 Subject: [PATCH 05/22] refactor: replace unnecessary mutex with atmoic.Bool --- internal/transport/client_stream.go | 21 ++++++--------------- internal/transport/http2_client.go | 6 ++---- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index 8e2432f36296..4a3832eb592d 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -20,7 +20,6 @@ package transport import ( "strconv" - "sync" "sync/atomic" "golang.org/x/net/http2" @@ -56,9 +55,8 @@ type ClientStream struct { // reading its value). headerValid bool - collectionMu sync.Mutex - collecting bool // indicates if stream entered the stage of non-gRPC data collection. - nonGRPCDataBuf []byte // stores the data of a non-gRPC response. + collecting atomic.Bool // indicates if stream entered the stage of non-gRPC data collection. + nonGRPCDataBuf []byte // stores the data of a non-gRPC response. noHeaders bool // set if the client never received headers (set only after the stream is done). headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. @@ -68,13 +66,10 @@ type ClientStream struct { } func (s *ClientStream) startNonGRPCDataCollection(st *status.Status) { - s.collectionMu.Lock() - defer s.collectionMu.Unlock() - if s.collecting { + if !s.collecting.CompareAndSwap(false, true) { return } s.status = st - s.collecting = true s.nonGRPCDataBuf = make([]byte, 0, nonGRPCDataMaxLen) } @@ -83,9 +78,7 @@ func (s *ClientStream) startNonGRPCDataCollection(st *status.Status) { // handle indicates whether the frame should be handled as a non-gRPC response body, // end indicates whether the stream should be closed after handling this frame. func (s *ClientStream) tryHandleNonGRPCData(f *parsedDataFrame) (handle bool, end bool) { - s.collectionMu.Lock() - defer s.collectionMu.Unlock() - if !s.collecting { + if !s.collecting.Load() { return false, false } @@ -99,8 +92,8 @@ func (s *ClientStream) tryHandleNonGRPCData(f *parsedDataFrame) (handle bool, en // stopNonGRPCBodyCollection stops collecting non-gRPC body and appends the collected. // Should only be called in closeStream. -func (s *ClientStream) stopNonGRPCDataCollectionLocked() { - if !s.collecting { +func (s *ClientStream) stopNonGRPCDataCollection() { + if !s.collecting.Load() { return } data := "\ndata: " + strconv.Quote(string(s.nonGRPCDataBuf)) @@ -179,8 +172,6 @@ func (s *ClientStream) Header() (metadata.MD, error) { s.waitOnHeader() if !s.headerValid || s.noHeaders { - s.collectionMu.Lock() - defer s.collectionMu.Unlock() return nil, s.status.Err() } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 3aad02e50a1f..f103c3a67e34 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -946,17 +946,15 @@ func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode <-s.done return } - s.collectionMu.Lock() - if s.collecting { + if s.collecting.Load() { // If the stream is collecting data for non-gRPC, stop collection to finalize status - s.stopNonGRPCDataCollectionLocked() + s.stopNonGRPCDataCollection() } if s.status != nil { st = s.status err = st.Err() } s.status = st - s.collectionMu.Unlock() if len(mdata) > 0 { s.trailer = mdata } From b59203a567079b0f77643d918d465c0c2799903c Mon Sep 17 00:00:00 2001 From: chengxi Date: Thu, 26 Feb 2026 02:50:54 -0500 Subject: [PATCH 06/22] style: inline errMsg to simplify error message joining --- internal/transport/http2_client.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index f103c3a67e34..68eb684d5da4 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1585,8 +1585,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { errs = append(errs, contentTypeErr) } - errMsg := strings.Join(errs, "; ") - se := status.New(grpcErrorCode, errMsg) + se := status.New(grpcErrorCode, strings.Join(errs, "; ")) if endStream { t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, true) return From 685f4ef47a225b4c6e10e51b592f14f1658636e0 Mon Sep 17 00:00:00 2001 From: chengxi Date: Thu, 26 Feb 2026 03:23:26 -0500 Subject: [PATCH 07/22] Revert "refactor: replace unnecessary mutex with atmoic.Bool" This reverts commit 4e270af0986f2eeb1b40db381ce7099ffa609801. --- internal/transport/client_stream.go | 21 +++++++++++++++------ internal/transport/http2_client.go | 6 ++++-- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index 4a3832eb592d..8e2432f36296 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -20,6 +20,7 @@ package transport import ( "strconv" + "sync" "sync/atomic" "golang.org/x/net/http2" @@ -55,8 +56,9 @@ type ClientStream struct { // reading its value). headerValid bool - collecting atomic.Bool // indicates if stream entered the stage of non-gRPC data collection. - nonGRPCDataBuf []byte // stores the data of a non-gRPC response. + collectionMu sync.Mutex + collecting bool // indicates if stream entered the stage of non-gRPC data collection. + nonGRPCDataBuf []byte // stores the data of a non-gRPC response. noHeaders bool // set if the client never received headers (set only after the stream is done). headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. @@ -66,10 +68,13 @@ type ClientStream struct { } func (s *ClientStream) startNonGRPCDataCollection(st *status.Status) { - if !s.collecting.CompareAndSwap(false, true) { + s.collectionMu.Lock() + defer s.collectionMu.Unlock() + if s.collecting { return } s.status = st + s.collecting = true s.nonGRPCDataBuf = make([]byte, 0, nonGRPCDataMaxLen) } @@ -78,7 +83,9 @@ func (s *ClientStream) startNonGRPCDataCollection(st *status.Status) { // handle indicates whether the frame should be handled as a non-gRPC response body, // end indicates whether the stream should be closed after handling this frame. func (s *ClientStream) tryHandleNonGRPCData(f *parsedDataFrame) (handle bool, end bool) { - if !s.collecting.Load() { + s.collectionMu.Lock() + defer s.collectionMu.Unlock() + if !s.collecting { return false, false } @@ -92,8 +99,8 @@ func (s *ClientStream) tryHandleNonGRPCData(f *parsedDataFrame) (handle bool, en // stopNonGRPCBodyCollection stops collecting non-gRPC body and appends the collected. // Should only be called in closeStream. -func (s *ClientStream) stopNonGRPCDataCollection() { - if !s.collecting.Load() { +func (s *ClientStream) stopNonGRPCDataCollectionLocked() { + if !s.collecting { return } data := "\ndata: " + strconv.Quote(string(s.nonGRPCDataBuf)) @@ -172,6 +179,8 @@ func (s *ClientStream) Header() (metadata.MD, error) { s.waitOnHeader() if !s.headerValid || s.noHeaders { + s.collectionMu.Lock() + defer s.collectionMu.Unlock() return nil, s.status.Err() } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 68eb684d5da4..7ac39ed5e49f 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -946,15 +946,17 @@ func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode <-s.done return } - if s.collecting.Load() { + s.collectionMu.Lock() + if s.collecting { // If the stream is collecting data for non-gRPC, stop collection to finalize status - s.stopNonGRPCDataCollection() + s.stopNonGRPCDataCollectionLocked() } if s.status != nil { st = s.status err = st.Err() } s.status = st + s.collectionMu.Unlock() if len(mdata) > 0 { s.trailer = mdata } From e60496ce42ab7462a1072669eb3ae5f5ccefd473 Mon Sep 17 00:00:00 2001 From: chengxi Date: Tue, 31 Mar 2026 17:35:12 -0400 Subject: [PATCH 08/22] doc: update stale comment --- internal/transport/client_stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index 8e2432f36296..06f81698759c 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -97,8 +97,8 @@ func (s *ClientStream) tryHandleNonGRPCData(f *parsedDataFrame) (handle bool, en return true, false } -// stopNonGRPCBodyCollection stops collecting non-gRPC body and appends the collected. -// Should only be called in closeStream. +// stopNonGRPCDataCollectionLocked stops collecting non-gRPC data and appends the collected data to the status message. +// Must be called with collectionMu held. Should only be called in closeStream. func (s *ClientStream) stopNonGRPCDataCollectionLocked() { if !s.collecting { return From 8725f9f644295e5c4cb8edd8647a07ea34d7956c Mon Sep 17 00:00:00 2001 From: chengxi Date: Tue, 31 Mar 2026 18:05:16 -0400 Subject: [PATCH 09/22] fix: hold collectionMu in Status() --- internal/transport/client_stream.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index 06f81698759c..47468ec0e50c 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -199,6 +199,8 @@ func (s *ClientStream) TrailersOnly() bool { // Status can be read safely only after the stream has ended, // that is, after Done() is closed. func (s *ClientStream) Status() *status.Status { + s.collectionMu.Lock() + defer s.collectionMu.Unlock() return s.status } From a83d60e9295aa79119200824f5bc157129f98952 Mon Sep 17 00:00:00 2001 From: chengxi Date: Thu, 9 Apr 2026 14:07:56 -0400 Subject: [PATCH 10/22] Revert "fix: hold collectionMu in Status()" This reverts commit 8725f9f644295e5c4cb8edd8647a07ea34d7956c. --- internal/transport/client_stream.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index 47468ec0e50c..06f81698759c 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -199,8 +199,6 @@ func (s *ClientStream) TrailersOnly() bool { // Status can be read safely only after the stream has ended, // that is, after Done() is closed. func (s *ClientStream) Status() *status.Status { - s.collectionMu.Lock() - defer s.collectionMu.Unlock() return s.status } From 6e579f20d465642ea62e2da69f2fc349323c046f Mon Sep 17 00:00:00 2001 From: chengxi Date: Thu, 9 Apr 2026 15:12:49 -0400 Subject: [PATCH 11/22] refactor: refactor the process to finalize the data collected from non-grpc message. --- internal/transport/client_stream.go | 14 ++++++++++---- internal/transport/http2_client.go | 9 +++------ 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index 06f81698759c..bd8da7f89fef 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -97,14 +97,20 @@ func (s *ClientStream) tryHandleNonGRPCData(f *parsedDataFrame) (handle bool, en return true, false } -// stopNonGRPCDataCollectionLocked stops collecting non-gRPC data and appends the collected data to the status message. -// Must be called with collectionMu held. Should only be called in closeStream. -func (s *ClientStream) stopNonGRPCDataCollectionLocked() { +// finalizeNonGRPCDataCollectionLocked stops collecting non-gRPC data and +// appends the collected data to the status message. It returns the finalized +// status, or nil if the stream was not collecting. It should only be called +// in closeStream. +// +// Must be called with collectionMu held. +func (s *ClientStream) finalizeNonGRPCDataCollectionLocked() *status.Status { if !s.collecting { - return + return nil } + s.collecting = false data := "\ndata: " + strconv.Quote(string(s.nonGRPCDataBuf)) s.status = status.New(s.status.Code(), s.status.Message()+data) + return s.status } // Read reads an n byte message from the input stream. diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 525d8ad2fbc9..021e1eeacac3 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -949,12 +949,9 @@ func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode return } s.collectionMu.Lock() - if s.collecting { - // If the stream is collecting data for non-gRPC, stop collection to finalize status - s.stopNonGRPCDataCollectionLocked() - } - if s.status != nil { - st = s.status + // Finalize non-gRPC data collection and use the resulting status if available. + if finalized := s.finalizeNonGRPCDataCollectionLocked(); finalized != nil { + st = finalized err = st.Err() } s.status = st From 7895aefee4731fc181fb5ea521a093193fb3c73b Mon Sep 17 00:00:00 2001 From: chengxi Date: Thu, 9 Apr 2026 16:14:21 -0400 Subject: [PATCH 12/22] doc: add comment for collectionMu --- internal/transport/client_stream.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index bd8da7f89fef..103e2a17c199 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -56,9 +56,9 @@ type ClientStream struct { // reading its value). headerValid bool - collectionMu sync.Mutex - collecting bool // indicates if stream entered the stage of non-gRPC data collection. - nonGRPCDataBuf []byte // stores the data of a non-gRPC response. + collectionMu sync.Mutex // protects collecting, nonGRPCDataBuf, and status during the non-gRPC data collection lifecycle. + collecting bool // indicates if stream entered the stage of non-gRPC data collection. + nonGRPCDataBuf []byte // stores the data of a non-gRPC response. noHeaders bool // set if the client never received headers (set only after the stream is done). headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. From e3d1434a682a59ef7eb5e768c6d77718430007f0 Mon Sep 17 00:00:00 2001 From: chengxi Date: Fri, 24 Apr 2026 21:26:36 -0400 Subject: [PATCH 13/22] refactor: introduce nonGRPCStatus field and inline finalization in closeStream Replace the `collecting` bool and `finalizeNonGRPCDataCollectionLocked` method with a `nonGRPCStatus` field on ClientStream. The non-gRPC status is now stored separately from the stream's `status` field and finalized inline in closeStream, which eliminates the race on `status` between Header() readers and closeStream writers. --- internal/transport/client_stream.go | 33 ++++++++-------------------- internal/transport/http2_client.go | 10 +++++---- internal/transport/transport_test.go | 11 +++++++++- 3 files changed, 25 insertions(+), 29 deletions(-) diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index 103e2a17c199..ca57212978c0 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -19,7 +19,6 @@ package transport import ( - "strconv" "sync" "sync/atomic" @@ -56,9 +55,9 @@ type ClientStream struct { // reading its value). headerValid bool - collectionMu sync.Mutex // protects collecting, nonGRPCDataBuf, and status during the non-gRPC data collection lifecycle. - collecting bool // indicates if stream entered the stage of non-gRPC data collection. - nonGRPCDataBuf []byte // stores the data of a non-gRPC response. + collectionMu sync.Mutex // protects nonGRPCStatus and nonGRPCDataBuf during the non-gRPC data collection lifecycle. + nonGRPCStatus *status.Status // the initial status from the non-gRPC response header, finalized with collected data before closing. + nonGRPCDataBuf []byte // stores the data of a non-gRPC response. noHeaders bool // set if the client never received headers (set only after the stream is done). headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. @@ -70,11 +69,12 @@ type ClientStream struct { func (s *ClientStream) startNonGRPCDataCollection(st *status.Status) { s.collectionMu.Lock() defer s.collectionMu.Unlock() - if s.collecting { + if s.nonGRPCStatus != nil { + // If nonGRPCStatus is already set, it means the stream is already in + // the non-gRPC data collection lifecycle. return } - s.status = st - s.collecting = true + s.nonGRPCStatus = st s.nonGRPCDataBuf = make([]byte, 0, nonGRPCDataMaxLen) } @@ -85,7 +85,8 @@ func (s *ClientStream) startNonGRPCDataCollection(st *status.Status) { func (s *ClientStream) tryHandleNonGRPCData(f *parsedDataFrame) (handle bool, end bool) { s.collectionMu.Lock() defer s.collectionMu.Unlock() - if !s.collecting { + if s.nonGRPCStatus == nil { + // if is not in the non-gRPC data collection lifecycle, do not handle this frame. return false, false } @@ -97,22 +98,6 @@ func (s *ClientStream) tryHandleNonGRPCData(f *parsedDataFrame) (handle bool, en return true, false } -// finalizeNonGRPCDataCollectionLocked stops collecting non-gRPC data and -// appends the collected data to the status message. It returns the finalized -// status, or nil if the stream was not collecting. It should only be called -// in closeStream. -// -// Must be called with collectionMu held. -func (s *ClientStream) finalizeNonGRPCDataCollectionLocked() *status.Status { - if !s.collecting { - return nil - } - s.collecting = false - data := "\ndata: " + strconv.Quote(string(s.nonGRPCDataBuf)) - s.status = status.New(s.status.Code(), s.status.Message()+data) - return s.status -} - // Read reads an n byte message from the input stream. func (s *ClientStream) Read(n int) (mem.BufferSlice, error) { b, err := s.Stream.read(n) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 021e1eeacac3..4a60ca1945cc 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -949,13 +949,15 @@ func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode return } s.collectionMu.Lock() - // Finalize non-gRPC data collection and use the resulting status if available. - if finalized := s.finalizeNonGRPCDataCollectionLocked(); finalized != nil { - st = finalized + if s.nonGRPCStatus != nil { + data := "\ndata: " + strconv.Quote(string(s.nonGRPCDataBuf)) + st = status.New(s.nonGRPCStatus.Code(), s.nonGRPCStatus.Message()+data) err = st.Err() + // Clear the nonGRPCStatus to indicate the non-grpc data collection is done. + s.nonGRPCStatus = nil } - s.status = st s.collectionMu.Unlock() + s.status = st if len(mdata) > 0 { s.trailer = mdata } diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 66c80387c03c..9f202510ba7d 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2685,6 +2685,7 @@ func (s) TestClientDecodeHeader(t *testing.T) { name string metaHeaderFrame *http2.MetaHeadersFrame wantStatus *status.Status + isNonGRPCStatus bool }{ { name: "valid_header", @@ -2708,6 +2709,7 @@ func (s) TestClientDecodeHeader(t *testing.T) { codes.Unknown, "unexpected HTTP status code received from server: 200 (OK); malformed header: missing HTTP content-type", ), + isNonGRPCStatus: true, }, { name: "invalid_grpc_status", @@ -2734,6 +2736,7 @@ func (s) TestClientDecodeHeader(t *testing.T) { codes.Internal, "malformed header: missing HTTP status; transport: received unexpected content-type \"application/json\"", ), + isNonGRPCStatus: true, }, { name: "invalid_content_type_with_http_status_504", @@ -2747,6 +2750,7 @@ func (s) TestClientDecodeHeader(t *testing.T) { codes.Unavailable, "unexpected HTTP status code received from server: 504 (Gateway Timeout); transport: received unexpected content-type \"application/json\"", ), + isNonGRPCStatus: true, }, { name: "http_fallback_and_invalid_http_status", @@ -2803,7 +2807,12 @@ func (s) TestClientDecodeHeader(t *testing.T) { } s.operateHeaders(tc.metaHeaderFrame) - got := cs.status + var got *status.Status + if tc.isNonGRPCStatus { + got = cs.nonGRPCStatus + } else { + got = cs.status + } want := tc.wantStatus if got.Code() != want.Code() || got.Message() != want.Message() { t.Errorf("operateHeaders(%v) got status %q, want %q", tc.metaHeaderFrame, got, want) From 2069ce956cc3d2346476804924f2d4035fc3200c Mon Sep 17 00:00:00 2001 From: chengxi Date: Fri, 24 Apr 2026 21:26:54 -0400 Subject: [PATCH 14/22] refactor: defer headerChan closure until stream closes for non-gRPC responses Do not close headerChan in operateHeaders when a non-gRPC response is received. Instead, let closeStream close it when the stream finishes. This ensures Header() only returns after the status is finalized, eliminating the need for collectionMu in Header(). --- internal/transport/client_stream.go | 2 -- internal/transport/http2_client.go | 3 --- 2 files changed, 5 deletions(-) diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index ca57212978c0..683369ece7a0 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -170,8 +170,6 @@ func (s *ClientStream) Header() (metadata.MD, error) { s.waitOnHeader() if !s.headerValid || s.noHeaders { - s.collectionMu.Lock() - defer s.collectionMu.Unlock() return nil, s.status.Err() } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 4a60ca1945cc..1f95fab2efe2 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1599,9 +1599,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { } s.startNonGRPCDataCollection(se) - if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) { - close(s.headerChan) - } return } From 15e67d00625cd4c4fd7462c7863ec7db1f146224 Mon Sep 17 00:00:00 2001 From: chengxi Date: Fri, 24 Apr 2026 21:27:09 -0400 Subject: [PATCH 15/22] test: split headerChan test into non-gRPC response and malformed header cases The original TestHeaderChanClosedAfterReceivingAnInvalidHeader was actually testing the non-gRPC response path (!isGRPC), not the malformed header path (headerError). Split into two tests: - TestHeaderChanClosedAfterReceivingNonGRPCResponse: verifies headerChan closes when the stream closes (via context timeout) - TestHeaderChanClosedAfterReceivingAnInvalidHeader: verifies headerChan closes immediately via closeStream on the headerError path --- internal/transport/transport_test.go | 51 +++++++++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 9f202510ba7d..939a310068db 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -112,6 +112,7 @@ const ( misbehaved encodingRequiredStatus invalidHeaderField + malformedHeader delayRead pingpong ) @@ -230,6 +231,19 @@ func (h *testStreamHandler) handleStreamInvalidHeaderField(s *ServerStream) { }) } +func (h *testStreamHandler) handleStreamMalformedHeader(s *ServerStream) { + headerFields := []hpack.HeaderField{ + {Name: ":status", Value: "200"}, + {Name: "content-type", Value: "application/grpc"}, + {Name: "x-bad-bin", Value: "!!!invalid-base64!!!"}, + } + h.t.controlBuf.put(&headerFrame{ + streamID: s.id, + hf: headerFields, + endStream: false, + }) +} + // handleStreamDelayRead delays reads so that the other side has to halt on // stream-level flow control. // This handler assumes dynamic flow control is turned off and assumes window @@ -436,6 +450,17 @@ func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hT }) wg.Done() }() + case malformedHeader: + go func() { + transport.HandleStreams(ctx, func(s *ServerStream) { + wg.Add(1) + go func() { + h.handleStreamMalformedHeader(s) + wg.Done() + }() + }) + wg.Done() + }() case delayRead: h.notify = make(chan struct{}) h.getNotified = make(chan struct{}) @@ -1660,7 +1685,7 @@ func (s) TestInvalidHeaderField(t *testing.T) { server.stop() } -func (s) TestHeaderChanClosedAfterReceivingAnInvalidHeader(t *testing.T) { +func (s) TestHeaderChanClosedAfterReceivingNonGRPCResponse(t *testing.T) { server, ct, cancel := setUp(t, 0, invalidHeaderField) defer cancel() defer server.stop() @@ -1671,6 +1696,30 @@ func (s) TestHeaderChanClosedAfterReceivingAnInvalidHeader(t *testing.T) { if err != nil { t.Fatalf("failed to create the stream") } + // The server sends a non-gRPC response without ending the stream, so the + // stream enters data collection mode. headerChan is not closed until the + // stream itself closes (via context timeout here). + if _, err := s.Header(); err == nil { + t.Fatalf("Header() succeeded, want error") + } + select { + case <-s.headerChan: + default: + t.Errorf("s.headerChan: got open, want closed") + } +} + +func (s) TestHeaderChanClosedAfterReceivingAnInvalidHeader(t *testing.T) { + server, ct, cancel := setUp(t, 0, malformedHeader) + defer cancel() + defer server.stop() + defer ct.Close(fmt.Errorf("closed manually by test")) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + s, err := ct.NewStream(ctx, &CallHdr{Host: "localhost", Method: "foo"}, nil) + if err != nil { + t.Fatalf("failed to create the stream") + } timer := time.NewTimer(time.Second) defer timer.Stop() select { From db6ea10041eb68a1e0031145773879adc8d59adc Mon Sep 17 00:00:00 2001 From: chengxi Date: Fri, 24 Apr 2026 21:45:18 -0400 Subject: [PATCH 16/22] chore: change invalidHeaderField to invalidContentType for readability --- internal/transport/transport_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 939a310068db..4ee7ecff6415 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -111,7 +111,7 @@ const ( notifyCall misbehaved encodingRequiredStatus - invalidHeaderField + invalidContentType malformedHeader delayRead pingpong @@ -221,7 +221,7 @@ func (h *testStreamHandler) handleStreamEncodingRequiredStatus(s *ServerStream) s.Read(math.MaxInt) } -func (h *testStreamHandler) handleStreamInvalidHeaderField(s *ServerStream) { +func (h *testStreamHandler) handleStreamInvalidContentType(s *ServerStream) { headerFields := []hpack.HeaderField{} headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: expectedInvalidHeaderField}) h.t.controlBuf.put(&headerFrame{ @@ -439,12 +439,12 @@ func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hT }) wg.Done() }() - case invalidHeaderField: + case invalidContentType: go func() { transport.HandleStreams(ctx, func(s *ServerStream) { wg.Add(1) go func() { - h.handleStreamInvalidHeaderField(s) + h.handleStreamInvalidContentType(s) wg.Done() }() }) @@ -1663,8 +1663,8 @@ func (s) TestEncodingRequiredStatus(t *testing.T) { s.Read(math.MaxInt) } -func (s) TestInvalidHeaderField(t *testing.T) { - server, ct, cancel := setUp(t, 0, invalidHeaderField) +func (s) TestInvalidContentType(t *testing.T) { + server, ct, cancel := setUp(t, 0, invalidContentType) defer cancel() callHdr := &CallHdr{ Host: "localhost", @@ -1686,7 +1686,7 @@ func (s) TestInvalidHeaderField(t *testing.T) { } func (s) TestHeaderChanClosedAfterReceivingNonGRPCResponse(t *testing.T) { - server, ct, cancel := setUp(t, 0, invalidHeaderField) + server, ct, cancel := setUp(t, 0, invalidContentType) defer cancel() defer server.stop() defer ct.Close(fmt.Errorf("closed manually by test")) @@ -1698,7 +1698,7 @@ func (s) TestHeaderChanClosedAfterReceivingNonGRPCResponse(t *testing.T) { } // The server sends a non-gRPC response without ending the stream, so the // stream enters data collection mode. headerChan is not closed until the - // stream itself closes (via context timeout here). + // stream itself closes. if _, err := s.Header(); err == nil { t.Fatalf("Header() succeeded, want error") } From 6afa8f8b0fa635f9b44a627f316ee7f3ebe52aa9 Mon Sep 17 00:00:00 2001 From: chengxi Date: Fri, 24 Apr 2026 22:02:31 -0400 Subject: [PATCH 17/22] refactor: inline closeNonGRPCStream and provide comment to clarify --- internal/transport/http2_client.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 1f95fab2efe2..b9cd3e9e44dd 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -936,10 +936,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr, handler s return s, nil } -func (t *http2Client) closeStreamWithNonGRPCStatus(s *ClientStream) { - t.closeStream(s, nil, true, http2.ErrCodeProtocol, nil, nil, true) -} - func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) { // Set stream status to done. if s.swapState(streamDone) == streamDone { @@ -1244,7 +1240,9 @@ func (t *http2Client) handleData(f *parsedDataFrame) { }) } if end { - t.closeStreamWithNonGRPCStatus(s) + // closeStream will finalize the nonGRPCStatus and nonGRPCDataBuf, + // and provide them as err and st. + t.closeStream(s, nil, true, http2.ErrCodeProtocol, nil, nil, true) } return } From df12e443c73b6466660990d42d68429a282f8084 Mon Sep 17 00:00:00 2001 From: chengxi Date: Fri, 24 Apr 2026 22:04:07 -0400 Subject: [PATCH 18/22] docs: provide explanation for nonGRPCStatus finalize --- internal/transport/http2_client.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index b9cd3e9e44dd..dbe83bfffe12 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -944,6 +944,11 @@ func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode <-s.done return } + + // If the stream is in the non-gRPC data collection lifecycle, use the + // nonGRPCStatus and nonGRPCDataBuf to construct the final status and + // error to return to the user. This is to ensure that non-gRPC data + // collected is included in the final status message returned to the user. s.collectionMu.Lock() if s.nonGRPCStatus != nil { data := "\ndata: " + strconv.Quote(string(s.nonGRPCDataBuf)) @@ -953,6 +958,7 @@ func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode s.nonGRPCStatus = nil } s.collectionMu.Unlock() + s.status = st if len(mdata) > 0 { s.trailer = mdata @@ -1240,7 +1246,7 @@ func (t *http2Client) handleData(f *parsedDataFrame) { }) } if end { - // closeStream will finalize the nonGRPCStatus and nonGRPCDataBuf, + // Close the stream; closeStream will finalize the nonGRPCStatus and nonGRPCDataBuf, // and provide them as err and st. t.closeStream(s, nil, true, http2.ErrCodeProtocol, nil, nil, true) } From f5c04f23be579a0f4f62166f94fc22834650988c Mon Sep 17 00:00:00 2001 From: chengxi Date: Fri, 24 Apr 2026 22:44:56 -0400 Subject: [PATCH 19/22] typo: if is not -> if not --- internal/transport/client_stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/transport/client_stream.go b/internal/transport/client_stream.go index 683369ece7a0..961f35dc5de1 100644 --- a/internal/transport/client_stream.go +++ b/internal/transport/client_stream.go @@ -86,7 +86,7 @@ func (s *ClientStream) tryHandleNonGRPCData(f *parsedDataFrame) (handle bool, en s.collectionMu.Lock() defer s.collectionMu.Unlock() if s.nonGRPCStatus == nil { - // if is not in the non-gRPC data collection lifecycle, do not handle this frame. + // if not in the non-gRPC data collection lifecycle, do not handle this frame. return false, false } From 44e4144ccc37e8cb503a88f3aa2d79c30f3f482b Mon Sep 17 00:00:00 2001 From: chengxi Date: Tue, 5 May 2026 21:19:08 -0400 Subject: [PATCH 20/22] chore: nix this newline --- test/end2end_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 200b5accd96d..784c27b8c54e 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -6896,7 +6896,6 @@ data: "hello"`, client := testgrpc.NewTestServiceClient(cc) _, err = client.EmptyCall(ctx, &testpb.Empty{}) - if err == nil { t.Fatalf("EmptyCall() = nil; want non-nil error due to non-gRPC response") } From 0d23730a20f9f9ca2a3813aea9cde690eb693a27 Mon Sep 17 00:00:00 2001 From: chengxi Date: Tue, 5 May 2026 21:28:03 -0400 Subject: [PATCH 21/22] style: avoid checking the code again in wantErr --- test/end2end_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 784c27b8c54e..9456088e3eb5 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -6821,7 +6821,7 @@ func (s) TestHTTPServerSendsNonGRPCHeaderSurfaceFurtherData(t *testing.T) { }, }, wantCode: codes.Unknown, - wantErr: `rpc error: code = Unknown desc = unexpected HTTP status code received from server: 200 (OK); transport: received unexpected content-type "text/html" + wantErr: `unexpected HTTP status code received from server: 200 (OK); transport: received unexpected content-type "text/html" data: ""`, }, { @@ -6838,7 +6838,7 @@ data: ""`, }, }, wantCode: codes.Unknown, - wantErr: `rpc error: code = Unknown desc = unexpected HTTP status code received from server: 200 (OK); transport: received unexpected content-type "text/html" + wantErr: `unexpected HTTP status code received from server: 200 (OK); transport: received unexpected content-type "text/html" data: "Hello World"`, }, { @@ -6855,7 +6855,7 @@ data: "Hello World"`, }, }, wantCode: codes.Unknown, - wantErr: `rpc error: code = Unknown desc = unexpected HTTP status code received from server: 200 (OK); transport: received unexpected content-type "text/html" + wantErr: `unexpected HTTP status code received from server: 200 (OK); transport: received unexpected content-type "text/html" data: ` + strconv.Quote(strings.Repeat("a", nonGRPCDataMaxLen)), }, { @@ -6869,7 +6869,7 @@ data: ` + strconv.Quote(strings.Repeat("a", nonGRPCDataMaxLen)), }, }, wantCode: codes.Unavailable, - wantErr: `rpc error: code = Unavailable desc = unexpected HTTP status code received from server: 502 (Bad Gateway); malformed header: missing HTTP content-type + wantErr: `unexpected HTTP status code received from server: 502 (Bad Gateway); malformed header: missing HTTP content-type data: "hello"`, }, } @@ -6904,8 +6904,8 @@ data: "hello"`, t.Fatalf("Unexpected error code: got %v, want %v\nfull error:\n%v", got, want, err) } - if err.Error() != test.wantErr { - t.Errorf("Unexpected error message: got\n %v, want\n %v", err.Error(), test.wantErr) + if got := status.Convert(err).Message(); got != test.wantErr { + t.Errorf("Unexpected error message: got\n %v, want\n %v", got, test.wantErr) } }) } From df306a0130cddd9e89ee31fe5945b1a06ac9b0ac Mon Sep 17 00:00:00 2001 From: chengxi Date: Tue, 5 May 2026 21:52:37 -0400 Subject: [PATCH 22/22] style: improve the Errorf output --- test/end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 9456088e3eb5..fb492fcb93b3 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -6905,7 +6905,7 @@ data: "hello"`, } if got := status.Convert(err).Message(); got != test.wantErr { - t.Errorf("Unexpected error message: got\n %v, want\n %v", got, test.wantErr) + t.Errorf("Unexpected error message: \ngot:\n%v\nwant:\n%v", got, test.wantErr) } }) }