diff --git a/asyncua/server/reverse_connect.py b/asyncua/server/reverse_connect.py new file mode 100644 index 000000000..b0ca150e7 --- /dev/null +++ b/asyncua/server/reverse_connect.py @@ -0,0 +1,364 @@ +""" +OPC UA Reverse Connect support for the asyncua server. + +Implements the server-side of OPC UA Reverse Connect as specified in +OPC UA Part 6 §7.1.3 and Part 2 §6.14. + +In Reverse Connect the *server* dials an outgoing TCP connection to a +pre-configured client URI, then immediately sends a ``ReverseHello`` +message. The client uses that message to identify the connecting server +and, if desired, drives the rest of the OPC UA handshake exactly as if +*it* had connected (OpenSecureChannel → CreateSession → ActivateSession). + +Usage example:: + + from asyncua import Server + from asyncua.server.reverse_connect import ReverseConnectConfig, ReverseConnectClientEntry + + server = Server() + await server.init() + server.reverse_connect = ReverseConnectConfig( + clients=[ + ReverseConnectClientEntry(endpoint_url="opc.tcp://192.168.1.10:4840"), + ], + connect_interval=15_000, # retry every 15 s + ) + async with server: + await asyncio.sleep(3600) +""" +from __future__ import annotations + +import asyncio +import logging +from dataclasses import dataclass, field + +from asyncua import ua +from asyncua.ua.ua_binary import uatcp_to_binary + +from ..common.connection import TransportLimits +from ..common.utils import Buffer, NotEnoughData +from ..ua.ua_binary import header_from_binary +from .internal_server import InternalServer +from .uaprocessor import UaProcessor + +_logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Configuration dataclasses +# --------------------------------------------------------------------------- + + +@dataclass +class ReverseConnectClientEntry: + """Configuration for a single reverse-connect target (one client endpoint).""" + + endpoint_url: str + """OPC UA endpoint URL of the client listener, e.g. ``opc.tcp://host:4840``.""" + + timeout: int = 30_000 + """Connection timeout in milliseconds (per-attempt).""" + + max_session_count: int = 0 + """Maximum simultaneous sessions over this reverse connection (0 = unlimited).""" + + enabled: bool = True + """Set to ``False`` to disable this entry without removing it from the config.""" + + +@dataclass +class ReverseConnectConfig: + """Server-side Reverse Connect configuration.""" + + clients: list[ReverseConnectClientEntry] = field(default_factory=list) + """Ordered list of client endpoints the server should dial.""" + + connect_interval: int = 15_000 + """Milliseconds to wait between connection attempts (or between retries on failure).""" + + connect_timeout: int = 30_000 + """Milliseconds to wait for the TCP connect+ReverseHello to succeed.""" + + reject_timeout: int = 60_000 + """Milliseconds to wait before retrying after the remote actively refused the connection.""" + + +# --------------------------------------------------------------------------- +# Protocol for outgoing reverse connections +# --------------------------------------------------------------------------- + + +class OPCUAReverseProtocol(asyncio.Protocol): + """ + asyncio Protocol used for *outgoing* reverse-connect TCP connections. + + Behaviour is nearly identical to :class:`OPCUAProtocol` (the inbound + server protocol), with two differences: + + 1. In ``connection_made`` the protocol immediately writes a + ``ReverseHello`` message so the remote client can identify the + server. + 2. ``connection_lost`` sets an internal ``asyncio.Event`` that the + owning :class:`ReverseConnectManager` task awaits to know when to + schedule a reconnect. + """ + + def __init__( + self, + iserver: InternalServer, + policies: list, + clients: list, + closing_tasks: list, + limits: TransportLimits, + server_uri: str, + server_endpoint_url: str, + ) -> None: + self.peer_name = None + self.transport: asyncio.Transport | None = None + self.processor: UaProcessor | None = None + self._buffer = b"" + self.iserver: InternalServer = iserver + self.policies = policies + self.clients = clients + self.closing_tasks = closing_tasks + self.messages: asyncio.Queue = asyncio.Queue() + self.limits = limits + self.server_uri = server_uri + self.server_endpoint_url = server_endpoint_url + # Signal fired in connection_lost so the manager loop can reschedule. + self.closed_event: asyncio.Event = asyncio.Event() + self._task: asyncio.Task | None = None + + def __str__(self) -> str: + return f"OPCUAReverseProtocol({self.peer_name})" + + __repr__ = __str__ + + def connection_made(self, transport: asyncio.Transport) -> None: # type: ignore[override] + self.peer_name = transport.get_extra_info("peername") + _logger.info("Reverse connection established to %s", self.peer_name) + self.transport = transport + self.processor = UaProcessor(self.iserver, self.transport, self.limits) + self.processor.set_policies(self.policies) + self.iserver.asyncio_transports.append(transport) + self.clients.append(self) + self._task = asyncio.create_task(self._process_received_message_loop()) + + # --- Send ReverseHello immediately -------------------------------- + rhel = ua.ReverseHello() + rhel.ServerUri = self.server_uri + rhel.EndpointUrl = self.server_endpoint_url + transport.write(uatcp_to_binary(ua.MessageType.ReverseHello, rhel)) + _logger.debug( + "Sent ReverseHello(ServerUri=%s, EndpointUrl=%s) to %s", + self.server_uri, + self.server_endpoint_url, + self.peer_name, + ) + + def connection_lost(self, exc: Exception | None) -> None: + _logger.info("Reverse connection to %s closed (%s)", self.peer_name, exc) + assert self.transport is not None + assert self.processor is not None + self.transport.close() + if self.transport in self.iserver.asyncio_transports: + self.iserver.asyncio_transports.remove(self.transport) + closing_task = asyncio.create_task(self.processor.close()) + self.closing_tasks.append(closing_task) + if self in self.clients: + self.clients.remove(self) + self.messages.put_nowait((None, None)) + if self._task is not None: + self._task.cancel() + self.closed_event.set() + + def data_received(self, data: bytes) -> None: + assert self.transport is not None + self._buffer += data + while self._buffer: + try: + buf = Buffer(self._buffer) + try: + header = header_from_binary(buf) + except NotEnoughData: + return + if header.header_size + header.body_size <= header.header_size: + _logger.error("Got malformed header %s from %s", header, self.peer_name) + self.transport.close() + return + if len(buf) < header.body_size: + _logger.debug( + "Not enough data from %s: need %s, got %s", + self.peer_name, + header.body_size, + len(buf), + ) + return + self.messages.put_nowait((header, buf)) + self._buffer = self._buffer[(header.header_size + header.body_size) :] + except Exception: + _logger.exception("Exception while parsing message from %s", self.peer_name) + return + + async def _process_received_message_loop(self) -> None: + while True: + header, buf = await self.messages.get() + if header is None and buf is None: + break + try: + await self._process_one_msg(header, buf) + except Exception: + _logger.exception("Exception while processing message from %s", self.peer_name) + + async def _process_one_msg(self, header, buf) -> None: + assert self.processor is not None + assert self.transport is not None + _logger.debug("_process_received_message %s %s", header.body_size, len(buf)) + ret = await self.processor.process(header, buf) + if not ret: + _logger.info("Processor returned False; closing reverse connection to %s", self.peer_name) + self.transport.close() + + +# --------------------------------------------------------------------------- +# Manager +# --------------------------------------------------------------------------- + + +class ReverseConnectManager: + """ + Manages outgoing reverse-connect TCP connections on behalf of the server. + + For each enabled :class:`ReverseConnectClientEntry` a long-running + asyncio task is created that: + + * dials the target URI, + * sends a ``ReverseHello``, + * waits until the connection closes, + * then sleeps for ``connect_interval`` ms before retrying. + + The manager is started/stopped by :class:`~asyncua.server.Server`. + """ + + def __init__( + self, + iserver: InternalServer, + policies: list, + closing_tasks: list, + limits: TransportLimits, + server_uri: str, + server_endpoint_url: str, + config: ReverseConnectConfig, + ) -> None: + self.iserver = iserver + self.policies = policies + self.closing_tasks = closing_tasks + self.limits = limits + self.server_uri = server_uri + self.server_endpoint_url = server_endpoint_url + self.config = config + self._clients: list = [] + self._tasks: list[asyncio.Task] = [] + + async def start(self) -> None: + """Start one reconnect loop per enabled client entry.""" + for entry in self.config.clients: + if not entry.enabled: + _logger.info("ReverseConnect: skipping disabled entry %s", entry.endpoint_url) + continue + task = asyncio.create_task( + self._connect_loop(entry), + name=f"rc-{entry.endpoint_url}", + ) + self._tasks.append(task) + _logger.info("ReverseConnect: started loop for %s", entry.endpoint_url) + + async def stop(self) -> None: + """Cancel all reconnect tasks and wait for them to finish.""" + for task in self._tasks: + task.cancel() + if self._tasks: + await asyncio.gather(*self._tasks, return_exceptions=True) + self._tasks.clear() + _logger.info("ReverseConnect: all loops stopped") + + async def _connect_loop(self, entry: ReverseConnectClientEntry) -> None: + """ + Infinite retry loop for a single reverse-connect target. + + The loop: + 1. Resolves host/port from the entry URL. + 2. Dials the client. + 3. Waits for the connection to close. + 4. Sleeps ``connect_interval`` ms and goes back to 1. + """ + from urllib.parse import urlparse + + url = urlparse(entry.endpoint_url) + host = url.hostname + port = url.port or 4840 + connect_timeout_s = (entry.timeout or self.config.connect_timeout) / 1000.0 + interval_s = self.config.connect_interval / 1000.0 + reject_timeout_s = self.config.reject_timeout / 1000.0 + + while True: + try: + _logger.info("ReverseConnect: attempting connection to %s:%s", host, port) + loop = asyncio.get_running_loop() + + closed_event = asyncio.Event() + + def protocol_factory(): + proto = OPCUAReverseProtocol( + iserver=self.iserver, + policies=self.policies, + clients=self._clients, + closing_tasks=self.closing_tasks, + limits=self.limits, + server_uri=self.server_uri, + server_endpoint_url=self.server_endpoint_url, + ) + # Bind the event so we can await it below. + closed_event.__setattr__ # access to check it's there + proto.closed_event = closed_event + return proto + + _transport, _protocol = await asyncio.wait_for( + loop.create_connection(protocol_factory, host, port), + timeout=connect_timeout_s, + ) + _logger.info("ReverseConnect: connected to %s:%s - waiting for session to end", host, port) + # Wait until the protocol's connection_lost fires. + await closed_event.wait() + _logger.info("ReverseConnect: connection to %s:%s ended; will retry in %.1f s", host, port, interval_s) + + except asyncio.CancelledError: + _logger.debug("ReverseConnect: loop for %s cancelled", entry.endpoint_url) + return + + except ConnectionRefusedError: + _logger.warning( + "ReverseConnect: connection to %s:%s was refused; retrying in %.1f s", + host, + port, + reject_timeout_s, + ) + try: + await asyncio.sleep(reject_timeout_s) + except asyncio.CancelledError: + return + continue + + except (OSError, asyncio.TimeoutError) as exc: + _logger.warning( + "ReverseConnect: could not connect to %s:%s (%s); retrying in %.1f s", + host, + port, + exc, + interval_s, + ) + + try: + await asyncio.sleep(interval_s) + except asyncio.CancelledError: + return diff --git a/asyncua/server/server.py b/asyncua/server/server.py index bf0c455e9..2047ee43f 100644 --- a/asyncua/server/server.py +++ b/asyncua/server/server.py @@ -32,6 +32,7 @@ from .binary_server_asyncio import BinaryServer from .event_generator import EventGenerator from .internal_server import InternalServer +from .reverse_connect import ReverseConnectConfig, ReverseConnectManager _logger = logging.getLogger(__name__) @@ -96,6 +97,8 @@ def __init__(self, iserver: InternalServer = None, user_manager=None): self.iserver: InternalServer = iserver if iserver else InternalServer(user_manager=user_manager) self.bserver: BinaryServer | None = None self.socket_address: tuple[str, int] | None = None + self.reverse_connect: ReverseConnectConfig | None = None + self._reverse_connect_manager: ReverseConnectManager | None = None self._discovery_clients = {} self._discovery_period = 60 self._discovery_handle = None @@ -504,6 +507,18 @@ async def start(self): else: _logger.debug("%s server started", self) + if self.reverse_connect is not None: + self._reverse_connect_manager = ReverseConnectManager( + iserver=self.iserver, + policies=self._policies, + closing_tasks=self.bserver.closing_tasks, + limits=self.limits, + server_uri=self._application_uri, + server_endpoint_url=self.endpoint.geturl(), + config=self.reverse_connect, + ) + await self._reverse_connect_manager.start() + def _get_bind_socket_info(self) -> tuple[str | None, int | None]: if self.socket_address is not None: return self.socket_address @@ -519,6 +534,8 @@ async def stop(self): if self._discovery_clients: await asyncio.gather(*[client.disconnect() for client in self._discovery_clients.values()]) + if self._reverse_connect_manager is not None: + await self._reverse_connect_manager.stop() await self.bserver.stop() await self.iserver.stop() if self._pubsub is not None: diff --git a/asyncua/ua/uaprotocol_hand.py b/asyncua/ua/uaprotocol_hand.py index ff2f99800..e16b880d9 100644 --- a/asyncua/ua/uaprotocol_hand.py +++ b/asyncua/ua/uaprotocol_hand.py @@ -22,11 +22,24 @@ class Hello: EndpointUrl: uatypes.String = "" +@dataclass(slots=True) +class ReverseHello: + """ + OPC UA ReverseHello message (Part 6 §7.1.3). + Sent by the server immediately after establishing a reverse TCP connection to a client. + The client uses ServerUri / EndpointUrl to decide whether to accept and open a Secure Channel. + """ + + ServerUri: uatypes.String = "" + EndpointUrl: uatypes.String = "" + + class MessageType: Invalid: bytes = b"INV" # FIXME: check value Hello: bytes = b"HEL" Acknowledge: bytes = b"ACK" Error: bytes = b"ERR" + ReverseHello: bytes = b"RHE" SecureOpen: bytes = b"OPN" SecureClose: bytes = b"CLO" SecureMessage: bytes = b"MSG" diff --git a/tests/test_reverse_connect.py b/tests/test_reverse_connect.py new file mode 100644 index 000000000..a6753f925 --- /dev/null +++ b/tests/test_reverse_connect.py @@ -0,0 +1,335 @@ +""" +Tests for OPC UA Reverse Connect (server-side outgoing connections). + +Covers: + - UA protocol type additions (ReverseHello dataclass, MessageType.ReverseHello) + - Binary serialisation / deserialisation of ReverseHello + - OPCUAReverseProtocol sends the correct ReverseHello frame + - ReverseConnectManager dials configured URIs and retries on failure + - Manager stops cleanly under cancellation +""" +from __future__ import annotations + +import asyncio +import socket +import struct +from contextlib import closing + +from asyncua import ua +from asyncua.common.connection import TransportLimits +from asyncua.common.utils import Buffer +from asyncua.server.internal_server import InternalServer +from asyncua.server.reverse_connect import ( + OPCUAReverseProtocol, + ReverseConnectClientEntry, + ReverseConnectConfig, + ReverseConnectManager, +) +from asyncua.ua.ua_binary import header_from_binary, struct_from_binary, uatcp_to_binary + +# --------------------------------------------------------------------------- +# helpers +# --------------------------------------------------------------------------- + + +def find_free_port() -> int: + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + s.bind(("", 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return s.getsockname()[1] + + +def make_limits() -> TransportLimits: + return TransportLimits( + max_recv_buffer=65535, + max_send_buffer=65535, + max_chunk_count=100, + max_message_size=10 * 1024 * 1024, + ) + + +# --------------------------------------------------------------------------- +# Unit tests - UA type layer +# --------------------------------------------------------------------------- + + +def test_message_type_reverse_hello(): + """MessageType.ReverseHello must equal the on-wire byte sequence 'RHE'.""" + assert ua.MessageType.ReverseHello == b"RHE" + + +def test_reverse_hello_dataclass(): + """ReverseHello dataclass has the expected fields with defaults.""" + rhel = ua.ReverseHello() + assert rhel.ServerUri == "" + assert rhel.EndpointUrl == "" + + rhel2 = ua.ReverseHello( + ServerUri="urn:test:server", + EndpointUrl="opc.tcp://127.0.0.1:4840", + ) + assert rhel2.ServerUri == "urn:test:server" + assert rhel2.EndpointUrl == "opc.tcp://127.0.0.1:4840" + + +def test_reverse_hello_serialise_deserialise(): + """ReverseHello round-trips through uatcp_to_binary / header_from_binary.""" + server_uri = "urn:opcua:testserver" + endpoint_url = "opc.tcp://localhost:4840/test" + + rhel = ua.ReverseHello(ServerUri=server_uri, EndpointUrl=endpoint_url) + raw = uatcp_to_binary(ua.MessageType.ReverseHello, rhel) + + # Check the header bytes + buf = Buffer(raw) + hdr = header_from_binary(buf) + assert hdr.MessageType == b"RHE" + assert hdr.ChunkType == b"F" + + # Decode body + decoded = struct_from_binary(ua.ReverseHello, buf) + assert decoded.ServerUri == server_uri + assert decoded.EndpointUrl == endpoint_url + + +# --------------------------------------------------------------------------- +# Unit tests - config dataclasses +# --------------------------------------------------------------------------- + + +def test_reverse_connect_config_defaults(): + cfg = ReverseConnectConfig() + assert cfg.clients == [] + assert cfg.connect_interval == 15_000 + assert cfg.connect_timeout == 30_000 + assert cfg.reject_timeout == 60_000 + + +def test_reverse_connect_client_entry_defaults(): + entry = ReverseConnectClientEntry(endpoint_url="opc.tcp://host:4840") + assert entry.enabled is True + assert entry.max_session_count == 0 + assert entry.timeout == 30_000 + + +# --------------------------------------------------------------------------- +# Integration: OPCUAReverseProtocol sends ReverseHello +# --------------------------------------------------------------------------- + + +async def _read_reverse_hello(reader: asyncio.StreamReader) -> ua.ReverseHello: + """Read one OPC UA message frame from *reader* and decode it as ReverseHello.""" + # Read fixed 8-byte header + header_bytes = await asyncio.wait_for(reader.readexactly(8), timeout=5.0) + msg_type, chunk_type, packet_size = struct.unpack("<3scI", header_bytes) + body_size = packet_size - 8 + body_bytes = await asyncio.wait_for(reader.readexactly(body_size), timeout=5.0) + + assert msg_type == b"RHE", f"Expected RHE, got {msg_type!r}" + assert chunk_type == b"F" + + buf = Buffer(body_bytes) + return struct_from_binary(ua.ReverseHello, buf) + + +async def test_protocol_sends_reverse_hello(): + """ + OPCUAReverseProtocol must send a correctly formatted ReverseHello immediately + after the TCP connection is established. + """ + port = find_free_port() + received: list[ua.ReverseHello] = [] + server_ready = asyncio.Event() + + async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + rhel = await _read_reverse_hello(reader) + received.append(rhel) + writer.close() + await writer.wait_closed() + + srv = await asyncio.start_server(handle_client, "127.0.0.1", port) + server_ready.set() + + async with srv: + # Create an InternalServer (minimal, without full init) + iserver = InternalServer() + limits = make_limits() + + close_ev = asyncio.Event() + + def factory(): + proto = OPCUAReverseProtocol( + iserver=iserver, + policies=[], + clients=[], + closing_tasks=[], + limits=limits, + server_uri="urn:test:server", + server_endpoint_url="opc.tcp://127.0.0.1:4840/testserver", + ) + proto.closed_event = close_ev + return proto + + loop = asyncio.get_running_loop() + _transport, _protocol = await loop.create_connection(factory, "127.0.0.1", port) + + # Wait for the server handler to finish reading + await asyncio.wait_for(close_ev.wait(), timeout=5.0) + + assert len(received) == 1 + assert received[0].ServerUri == "urn:test:server" + assert received[0].EndpointUrl == "opc.tcp://127.0.0.1:4840/testserver" + + +# --------------------------------------------------------------------------- +# Integration: ReverseConnectManager dials out and retries +# --------------------------------------------------------------------------- + + +async def test_manager_dials_and_sends_reverse_hello(): + """ + ReverseConnectManager must dial the configured URI and send ReverseHello. + """ + port = find_free_port() + received: list[ua.ReverseHello] = [] + first_connection_ev = asyncio.Event() + + async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + try: + rhel = await _read_reverse_hello(reader) + received.append(rhel) + finally: + first_connection_ev.set() + writer.close() + + srv = await asyncio.start_server(handle_client, "127.0.0.1", port) + async with srv: + iserver = InternalServer() + config = ReverseConnectConfig( + clients=[ReverseConnectClientEntry(endpoint_url=f"opc.tcp://127.0.0.1:{port}")], + connect_interval=2_000, + ) + manager = ReverseConnectManager( + iserver=iserver, + policies=[], + closing_tasks=[], + limits=make_limits(), + server_uri="urn:myserver", + server_endpoint_url="opc.tcp://127.0.0.1:4840", + config=config, + ) + await manager.start() + try: + await asyncio.wait_for(first_connection_ev.wait(), timeout=5.0) + finally: + await manager.stop() + + assert len(received) >= 1 + assert received[0].ServerUri == "urn:myserver" + + +async def test_manager_retries_on_refused(): + """ + When the target is not reachable, the manager should retry after reject_timeout + and eventually connect when the listener comes up. + """ + port = find_free_port() + connected_ev = asyncio.Event() + + async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + await _read_reverse_hello(reader) + connected_ev.set() + writer.close() + + iserver = InternalServer() + config = ReverseConnectConfig( + clients=[ReverseConnectClientEntry(endpoint_url=f"opc.tcp://127.0.0.1:{port}")], + connect_interval=200, # quick retry for tests + reject_timeout=200, + connect_timeout=1_000, + ) + manager = ReverseConnectManager( + iserver=iserver, + policies=[], + closing_tasks=[], + limits=make_limits(), + server_uri="urn:retry:server", + server_endpoint_url="opc.tcp://127.0.0.1:4840", + config=config, + ) + await manager.start() + + # Give manager a moment to fail a couple of times (port not yet open) + await asyncio.sleep(0.5) + + # Now open the listener - manager should connect on its next retry + srv = await asyncio.start_server(handle_client, "127.0.0.1", port) + async with srv: + try: + await asyncio.wait_for(connected_ev.wait(), timeout=5.0) + finally: + await manager.stop() + + assert connected_ev.is_set() + + +async def test_manager_disabled_entry(): + """Disabled entries must never cause a connection attempt.""" + port = find_free_port() + connection_count = 0 + + async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + nonlocal connection_count + connection_count += 1 + writer.close() + + srv = await asyncio.start_server(handle_client, "127.0.0.1", port) + async with srv: + iserver = InternalServer() + config = ReverseConnectConfig( + clients=[ + ReverseConnectClientEntry( + endpoint_url=f"opc.tcp://127.0.0.1:{port}", + enabled=False, + ) + ], + ) + manager = ReverseConnectManager( + iserver=iserver, + policies=[], + closing_tasks=[], + limits=make_limits(), + server_uri="urn:disabled:server", + server_endpoint_url="opc.tcp://127.0.0.1:4840", + config=config, + ) + await manager.start() + await asyncio.sleep(0.3) + await manager.stop() + + assert connection_count == 0, "Disabled entry should not have connected" + + +async def test_manager_stop_cancels_tasks(): + """ReverseConnectManager.stop() must complete without hanging.""" + port = find_free_port() # nothing listening on this port + + iserver = InternalServer() + config = ReverseConnectConfig( + clients=[ReverseConnectClientEntry(endpoint_url=f"opc.tcp://127.0.0.1:{port}")], + connect_interval=60_000, + reject_timeout=60_000, + connect_timeout=10_000, + ) + manager = ReverseConnectManager( + iserver=iserver, + policies=[], + closing_tasks=[], + limits=make_limits(), + server_uri="urn:stop:server", + server_endpoint_url="opc.tcp://127.0.0.1:4840", + config=config, + ) + await manager.start() + # Should return quickly even though connect_interval is huge + await asyncio.wait_for(manager.stop(), timeout=3.0)