Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 14 additions & 33 deletions internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,39 +956,16 @@ func (l *loopyWriter) processData() (bool, error) {
// from data is copied to h to make as big as the maximum possible HTTP2 frame
// size.

if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame
// Client sends out empty data frame with endStream = true
if err := l.framer.writeData(dataItem.streamID, dataItem.endStream, nil); err != nil {
return false, err
}
str.itl.dequeue() // remove the empty data item from stream
reader.Close()
if str.itl.isEmpty() {
str.state = empty
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
return false, err
}
if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
return false, err
}
} else {
l.activeStreams.enqueue(str)
}
return false, nil
}

isEmpty := len(dataItem.h) == 0 && reader.Remaining() == 0
// Figure out the maximum size we can send
maxSize := http2MaxFrameLen
if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
strQuota := int(l.oiws) - str.bytesOutStanding
if strQuota <= 0 && !isEmpty { // stream-level flow control.
str.state = waitingOnStreamQuota
return false, nil
} else if maxSize > strQuota {
maxSize = strQuota
}
if maxSize > int(l.sendQuota) { // connection-level flow control.
maxSize = int(l.sendQuota)
}
maxSize = min(maxSize, max(strQuota, 0))
maxSize = min(maxSize, int(l.sendQuota)) // connection-level flow control.
// Compute how much of the header and data we can send within quota and max frame length
hSize := min(maxSize, len(dataItem.h))
dSize := min(maxSize-hSize, reader.Remaining())
Expand Down Expand Up @@ -1018,7 +995,7 @@ func (l *loopyWriter) processData() (bool, error) {
if dataItem.endStream && remainingBytes == 0 {
endStream = true
}
if dataItem.onEachWrite != nil {
if dataItem.onEachWrite != nil && !isEmpty {
dataItem.onEachWrite()
}
err := l.framer.writeData(dataItem.streamID, endStream, l.writeBuf)
Expand All @@ -1039,19 +1016,23 @@ func (l *loopyWriter) processData() (bool, error) {
reader.Close()
str.itl.dequeue()
}
return false, l.updateStreamAfterWrite(str)
}

func (l *loopyWriter) updateStreamAfterWrite(str *outStream) error {
if str.itl.isEmpty() {
str.state = empty
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
return false, err
return err
}
if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
return false, err
return err
}
} else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
str.state = waitingOnStreamQuota
} else { // Otherwise add it back to the list of active streams.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please leave the comment here?

l.activeStreams.enqueue(str)
}
return false, nil
return nil
}
Loading