diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TcpSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TcpSpec.scala index cd9b3a7367..3d5fb1954b 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TcpSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TcpSpec.scala @@ -519,6 +519,31 @@ class TcpSpec extends StreamSpec(""" binding.unbind() } + "not throw a NullPointerException when full-close is requested and upstream finishes before the connection is established (#3255)" in { + val binding = + Tcp() + .bind("127.0.0.1", 0) + .toMat(Sink.foreach { conn => + conn.flow.join(Flow[ByteString]).run() + })(Keep.left) + .run() + .futureValue + + // `Source.empty` completes synchronously during materialization, i.e. before the outbound + // connection is established (the connection ActorRef is still null at that point). With + // half-close disabled the upstream-finished handler used to dereference that null connection + // (`connection ! Close`) and fail the stage with a NullPointerException. The fix defers the + // close until the connection is established, so the stream must complete normally here. + val result = Source + .empty[ByteString] + .via(Tcp().outgoingConnection(binding.localAddress, halfClose = false)) + .runWith(Sink.ignore) + + result.futureValue should ===(Done) + + binding.unbind() + } + "Echo should work even if server is in full close mode" in { val serverAddress = temporaryServerAddress() diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala index 79c95e9383..3f6f2cac8c 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala @@ -322,7 +322,12 @@ private[stream] object ConnectionSourceStage { stageActor.watch(connection) connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false) if (isAvailable(bytesOut)) connection ! ResumeReading - if (isClosed(bytesIn)) connection ! ConfirmedClose + if (isClosed(bytesIn)) + // Upstream finished before the connection was established. Honor the half-close + // setting: ConfirmedClose only closes the write side (half-close), while Close + // tears down the whole connection. Always sending ConfirmedClose here would keep + // the read side open even when half-close is disabled (issue #3255). + connection ! (if (role.halfClose) ConfirmedClose else Close) else pull(bytesIn) case other => log.warning("Unexpected message to connecting TcpStage: [{}]", other.getClass) } @@ -396,23 +401,26 @@ private[stream] object ConnectionSourceStage { } private def closeConnectionUpstreamFinished(): Unit = { - if (isClosed(bytesOut) || !role.halfClose) { + if (connection eq null) { + // This is an outbound connection for which upstream finished before the connection + // was even established. The close is deferred until the connection is established; + // the `connecting` handler inspects `isClosed(bytesIn)` and closes the connection + // honoring the half-close setting. Dereferencing `connection` here would otherwise + // throw a NullPointerException when half-close is disabled (issue #3255). + } else if (isClosed(bytesOut) || !role.halfClose) { // Reading has stopped before, either because of cancel, or PeerClosed, so just Close now // (or half-close is turned off) if (writeInProgress) connectionClosePending = true // will continue when WriteAck is received and writeBuffer drained else connection ! Close - } else if (connection ne null) { + } else { // We still read, so we only close the write side if (writeInProgress) connectionClosePending = true // will continue when WriteAck is received and writeBuffer drained else connection ! ConfirmedClose } - // Otherwise, this is an outbound connection with half-close enabled for which upstream finished - // before the connection was even established. - // In that case we half-close the connection as soon as it's connected } private def closeConnectionDownstreamFinished(): Unit = {