Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,31 @@ class TcpSpec extends StreamSpec("""
binding.unbind()
}

"not throw a NullPointerException when full-close is requested and upstream finishes before the connection is established (#3255)" in {
val binding =
Tcp()
.bind("127.0.0.1", 0)
.toMat(Sink.foreach { conn =>
conn.flow.join(Flow[ByteString]).run()
})(Keep.left)
.run()
.futureValue

// `Source.empty` completes synchronously during materialization, i.e. before the outbound
// connection is established (the connection ActorRef is still null at that point). With
// half-close disabled the upstream-finished handler used to dereference that null connection
// (`connection ! Close`) and fail the stage with a NullPointerException. The fix defers the
// close until the connection is established, so the stream must complete normally here.
val result = Source
.empty[ByteString]
.via(Tcp().outgoingConnection(binding.localAddress, halfClose = false))
.runWith(Sink.ignore)

result.futureValue should ===(Done)

binding.unbind()
}

"Echo should work even if server is in full close mode" in {
val serverAddress = temporaryServerAddress()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,12 @@ private[stream] object ConnectionSourceStage {
stageActor.watch(connection)
connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false)
if (isAvailable(bytesOut)) connection ! ResumeReading
if (isClosed(bytesIn)) connection ! ConfirmedClose
if (isClosed(bytesIn))
// Upstream finished before the connection was established. Honor the half-close
// setting: ConfirmedClose only closes the write side (half-close), while Close
// tears down the whole connection. Always sending ConfirmedClose here would keep
// the read side open even when half-close is disabled (issue #3255).
connection ! (if (role.halfClose) ConfirmedClose else Close)
else pull(bytesIn)
case other => log.warning("Unexpected message to connecting TcpStage: [{}]", other.getClass)
}
Expand Down Expand Up @@ -396,23 +401,26 @@ private[stream] object ConnectionSourceStage {
}

private def closeConnectionUpstreamFinished(): Unit = {
if (isClosed(bytesOut) || !role.halfClose) {
if (connection eq null) {
// This is an outbound connection for which upstream finished before the connection
// was even established. The close is deferred until the connection is established;
// the `connecting` handler inspects `isClosed(bytesIn)` and closes the connection
// honoring the half-close setting. Dereferencing `connection` here would otherwise
// throw a NullPointerException when half-close is disabled (issue #3255).
} else if (isClosed(bytesOut) || !role.halfClose) {
// Reading has stopped before, either because of cancel, or PeerClosed, so just Close now
// (or half-close is turned off)
if (writeInProgress)
connectionClosePending = true // will continue when WriteAck is received and writeBuffer drained
else
connection ! Close
} else if (connection ne null) {
} else {
// We still read, so we only close the write side
if (writeInProgress)
connectionClosePending = true // will continue when WriteAck is received and writeBuffer drained
else
connection ! ConfirmedClose
}
// Otherwise, this is an outbound connection with half-close enabled for which upstream finished
// before the connection was even established.
// In that case we half-close the connection as soon as it's connected
}

private def closeConnectionDownstreamFinished(): Unit = {
Expand Down