diff --git a/s7/_s7commplus_async_client.py b/s7/_s7commplus_async_client.py index 3c6cf1cf..19eaaab6 100644 --- a/s7/_s7commplus_async_client.py +++ b/s7/_s7commplus_async_client.py @@ -40,12 +40,13 @@ _parse_write_response, _build_area_read_payload, _build_area_write_payload, + _build_symbolic_read_payload, _build_explore_payload, _build_invoke_payload, _build_explore_request, _parse_explore_datablocks, - _parse_explore_fields, ) +from . import typeinfo from .protocol import Ids logger = logging.getLogger(__name__) @@ -482,39 +483,135 @@ async def set_plc_operating_state(self, state: int) -> None: payload = _build_invoke_payload(state) await self._send_request(FunctionCode.INVOKE, payload) + async def read_symbolic(self, access_area: int, lids: list[int], symbol_crc: int = 0) -> bytes: + """Read a variable using S7CommPlus symbolic (LID-based) access. + + .. warning:: This method is **experimental** and may change. + """ + payload = _build_symbolic_read_payload(access_area, lids, symbol_crc) + response = await self._send_request(FunctionCode.GET_MULTI_VARIABLES, payload) + results = _parse_read_response(response) + if not results or results[0] is None: + raise RuntimeError("Symbolic read failed") + return results[0] + async def list_datablocks(self) -> list[dict[str, Any]]: """List all datablocks on the PLC via EXPLORE. .. warning:: This method is **experimental** and may change. """ payload = _build_explore_request(Ids.NATIVE_THE_PLC_PROGRAM_RID, [Ids.OBJECT_VARIABLE_TYPE_NAME, Ids.BLOCK_BLOCK_NUMBER]) - response = await self._send_request(FunctionCode.EXPLORE, payload) + response = await self._send_request(FunctionCode.EXPLORE, payload, integrity_tail=5, reassemble=True) return _parse_explore_datablocks(response) async def browse(self) -> list[dict[str, Any]]: - """Browse the PLC symbol table via EXPLORE. + """Browse the full per-tag symbol tree via EXPLORE + the type-info container. .. warning:: This method is **experimental** and may change. 
+ + Returns a flat list of variable dicts with keys ``name``, ``access_sequence`` + (the dot-separated hex LID path usable with :meth:`read_tag`), ``data_type``, + and the optimized/non-optimized byte+bit offsets. Steps: enumerate DBs, resolve + each DB's type-info RID via a LID=1 read, explore the OMS type-info container, + then recombine into the symbol tree. + + Returns: + List of variable info dicts. """ - dbs = await self.list_datablocks() - variables: list[dict[str, Any]] = [] - for db_info in dbs: - db_rid = db_info.get("rid", 0) - if db_rid == 0: + # Phase A: enumerate data blocks. Phase B/C: resolve each DB's type-info RID + # (a LID=1 read — needed for instance DBs whose TI is not their own RID) and seed + # a root node per DB. + root_nodes: list[typeinfo.Node] = [] + for db_info in await self.list_datablocks(): + if db_info.get("number", 0) <= 0 or db_info.get("rid", 0) == 0: continue - payload = _build_explore_request(db_rid, [Ids.OBJECT_VARIABLE_TYPE_NAME]) + ti_rid = await self._read_typeinfo_rid(db_info["rid"]) + if ti_rid == 0: + continue # load-memory-only DB, skip + root_nodes.append( + typeinfo.Node( + node_type=typeinfo.NodeType.ROOT, name=db_info["name"], access_id=db_info["rid"], relation_id=ti_rid + ) + ) + + # Add the native process areas with their known synthetic type-info ids. + for name, access_rid, ti_rid in ( + ("IArea", Ids.NATIVE_THE_I_AREA_RID, 0x90010000), + ("QArea", Ids.NATIVE_THE_Q_AREA_RID, 0x90020000), + ("MArea", Ids.NATIVE_THE_M_AREA_RID, 0x90030000), + ("S7Timers", Ids.NATIVE_THE_S7_TIMERS_RID, 0x90050000), + ("S7Counters", Ids.NATIVE_THE_S7_COUNTERS_RID, 0x90060000), + ): + root_nodes.append( + typeinfo.Node(node_type=typeinfo.NodeType.ROOT, name=name, access_id=access_rid, relation_id=ti_rid) + ) + + # Phase D: explore the OMS type-info container (a large, multi-fragment PDU). + type_objects = await self._explore_type_info_container() + + # Phase E: recombine type-info with the DB/area nodes and flatten. 
+ typeinfo.build_tree(root_nodes, type_objects) + variables: list[dict[str, Any]] = [] + for v in typeinfo.build_flat_list(root_nodes): try: - response = await self._send_request(FunctionCode.EXPLORE, payload) - fields = _parse_explore_fields(response, db_info["number"], db_info["name"]) - variables.extend(fields) - except Exception: - continue + data_type = typeinfo.Softdatatype(v.softdatatype).name + except ValueError: + data_type = str(v.softdatatype) + variables.append( + { + "name": v.name, + "access_sequence": v.access_sequence, + "data_type": data_type, + "opt_address": v.opt_address, + "opt_bitoffset": v.opt_bitoffset, + "nonopt_address": v.nonopt_address, + "nonopt_bitoffset": v.nonopt_bitoffset, + } + ) return variables + async def _read_typeinfo_rid(self, db_rid: int) -> int: + """Read LID=1 of a DB to get its type-info RID (0 if the DB has no readable value).""" + try: + raw = await self.read_symbolic(db_rid, [1], 0) + except Exception: + return 0 + return struct.unpack(">I", raw[:4])[0] if len(raw) >= 4 else 0 + + async def _explore_type_info_container(self) -> list["typeinfo.PObject"]: + """EXPLORE the OMS type-info container and return its per-type objects.""" + payload = _build_explore_request(Ids.OBJECT_OMS_TYPE_INFO_CONTAINER, []) + response = await self._send_request(FunctionCode.EXPLORE, payload, integrity_tail=5, reassemble=True) + return typeinfo.extract_type_info_objects(response) + # -- Internal methods -- - async def _send_request(self, function_code: int, payload: bytes) -> bytes: - """Send an S7CommPlus request and receive the response.""" + # Sanity caps for fragment reassembly — generous vs. any real PLC EXPLORE response, + # but bounded so a malformed/adversarial stream can't drive unbounded allocation. 
+ _MAX_REASSEMBLED_BYTES = 16 * 1024 * 1024 + _MAX_REASSEMBLED_FRAGMENTS = 4096 + + async def _send_request( + self, + function_code: int, + payload: bytes, + integrity_tail: int = 4, + reassemble: bool = False, + ) -> bytes: + """Send an S7CommPlus request and receive the response. + + Args: + function_code: S7CommPlus function code. + payload: Request payload (after the 14-byte request header). + integrity_tail: number of trailing payload bytes the V2 IntegrityId is + inserted *before* — 4 for GetMultiVariables/SetMultiVariables (a + trailing UInt32), 5 for Explore (a trailing UInt32 + filler byte). + reassemble: when True, concatenate a multi-fragment response (e.g. Explore) + before returning its payload. + + Returns: + Response payload (after the 10-byte response header). + """ async with self._lock: if not self._connected or self._writer is None or self._reader is None: raise RuntimeError("Not connected") @@ -529,7 +626,8 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes: 0x0000, seq_num, self._session_id, - 0x34 if function_code == FunctionCode.GET_MULTI_VARIABLES else 0x36, + # Transport flags: 0x34 for GetMultiVariables and Explore, 0x36 otherwise. + 0x34 if function_code in (FunctionCode.GET_MULTI_VARIABLES, FunctionCode.EXPLORE) else 0x36, ) integrity_id_bytes = b"" @@ -538,10 +636,10 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes: integrity_id = self._integrity_id_read if is_read else self._integrity_id_write integrity_id_bytes = encode_uint32_vlq(integrity_id) - # For V2+ the IntegrityId is spliced in just before the payload's trailing - # UInt32 (i.e. at the end), not right after the header. - if integrity_id_bytes and len(payload) >= 4: - request = request_header + payload[:-4] + integrity_id_bytes + payload[-4:] + # The IntegrityId is spliced in just before the payload's trailing fill bytes + # (integrity_tail of them), not right after the header. 
+ if integrity_id_bytes and len(payload) >= integrity_tail: + request = request_header + payload[:-integrity_tail] + integrity_id_bytes + payload[-integrity_tail:] else: request = request_header + integrity_id_bytes + payload @@ -555,6 +653,13 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes: else: self._integrity_id_write = (self._integrity_id_write + 1) & 0xFFFFFFFF + # Large responses (e.g. Explore) are split across several S7CommPlus PDUs. + if reassemble: + data = await self._recv_reassembled_payload() + if len(data) < 10: + raise RuntimeError("Response too short") + return bytes(data[10:]) + response_data = await self._recv_cotp_dt() version, data_length, consumed = decode_header(response_data) @@ -568,6 +673,48 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes: # IntegrityId travels at the END of the payload and is ignored by the parsers. return response[10:] + async def _recv_reassembled_payload(self) -> bytes: + """Receive a possibly-fragmented S7CommPlus response, returning its data section. + + A large response is split into several S7CommPlus PDUs. Each fragment is + ``0x72 <ver> <len16> <data>`` with no trailer; only the final fragment is + followed by the ``0x72 <ver> 0x0000`` trailer. We concatenate the data parts + of every fragment until the trailer is seen. Works for single-PDU responses + too (one fragment immediately followed by the trailer). 
+ """ + buf = bytearray() + + async def ensure(n: int) -> None: + while len(buf) < n: + chunk = await self._recv_cotp_dt() + if not chunk: + raise RuntimeError("Connection closed during response reassembly") + buf.extend(chunk) + + data = bytearray() + fragments = 0 + while True: + await ensure(4) + if buf[0] != 0x72: + raise RuntimeError("Expected S7CommPlus fragment header (0x72)") + frag_len = (buf[2] << 8) | buf[3] + del buf[:4] + if frag_len == 0: + break # standalone trailer (defensive) + await ensure(frag_len) + data.extend(buf[:frag_len]) + del buf[:frag_len] + fragments += 1 + if fragments > self._MAX_REASSEMBLED_FRAGMENTS or len(data) > self._MAX_REASSEMBLED_BYTES: + raise RuntimeError(f"Reassembled response exceeds limits ({len(data)} bytes, {fragments} fragments)") + # The next 4 bytes are either the trailer (0x72 ver 0x0000) or the next + # fragment's header (0x72 ver len>0). + await ensure(4) + if buf[0] == 0x72 and buf[2] == 0 and buf[3] == 0: + del buf[:4] # consume trailer — last fragment + break + return bytes(data) + async def _cotp_connect(self, local_tsap: int, remote_tsap: bytes) -> None: """Perform COTP Connection Request / Confirm handshake.""" if self._writer is None or self._reader is None: diff --git a/s7/_s7commplus_client.py b/s7/_s7commplus_client.py index 0246eb17..3be77384 100644 --- a/s7/_s7commplus_client.py +++ b/s7/_s7commplus_client.py @@ -11,6 +11,7 @@ import struct from typing import Any, Optional +from . import typeinfo from .connection import S7CommPlusConnection from .protocol import FunctionCode, Ids, ElementID, DataType, ObjectId from .vlq import encode_uint32_vlq, decode_uint32_vlq, decode_uint64_vlq @@ -370,14 +371,15 @@ def list_datablocks(self) -> list[dict[str, Any]]: return _parse_explore_datablocks(response) def browse(self) -> list[dict[str, Any]]: - """Browse the PLC symbol table via EXPLORE. + """Browse the full per-tag symbol tree via EXPLORE + the type-info container. .. 
warning:: This method is **experimental** and may change. - Returns a flat list of variable info dicts with keys: - ``name``, ``db_number``, ``byte_offset``, ``data_type``, ``bit_size``. - Results can be converted to :class:`~snap7.tags.Tag` objects for use - with :meth:`~s7.client.Client.read_tag`. + Returns a flat list of variable dicts with keys ``name``, ``access_sequence`` + (the dot-separated hex LID path usable with :meth:`read_tag`), ``data_type``, + and the optimized/non-optimized byte+bit offsets. Steps: enumerate DBs, resolve + each DB's type-info RID via a LID=1 read, explore the OMS type-info container, + then recombine into the symbol tree. Returns: List of variable info dicts. @@ -385,26 +387,72 @@ def browse(self) -> list[dict[str, Any]]: if self._connection is None: raise RuntimeError("Not connected") - # Step 1: list datablocks - dbs = self.list_datablocks() - - # Step 2: for each DB, explore its type info to get field layout - variables: list[dict[str, Any]] = [] - for db_info in dbs: - db_rid = db_info.get("rid", 0) - if db_rid == 0: + # Phase A: enumerate data blocks. Phase B/C: resolve each DB's type-info RID + # (a LID=1 read — needed for instance DBs whose TI is not their own RID) and seed + # a root node per DB. + root_nodes: list[typeinfo.Node] = [] + for db_info in self.list_datablocks(): + if db_info.get("number", 0) <= 0 or db_info.get("rid", 0) == 0: continue - payload = _build_explore_request(db_rid, [Ids.OBJECT_VARIABLE_TYPE_NAME]) + ti_rid = self._read_typeinfo_rid(db_info["rid"]) + if ti_rid == 0: + continue # load-memory-only DB, skip + root_nodes.append( + typeinfo.Node( + node_type=typeinfo.NodeType.ROOT, name=db_info["name"], access_id=db_info["rid"], relation_id=ti_rid + ) + ) + + # Add the native process areas with their known synthetic type-info ids. 
+ for name, access_rid, ti_rid in ( + ("IArea", Ids.NATIVE_THE_I_AREA_RID, 0x90010000), + ("QArea", Ids.NATIVE_THE_Q_AREA_RID, 0x90020000), + ("MArea", Ids.NATIVE_THE_M_AREA_RID, 0x90030000), + ("S7Timers", Ids.NATIVE_THE_S7_TIMERS_RID, 0x90050000), + ("S7Counters", Ids.NATIVE_THE_S7_COUNTERS_RID, 0x90060000), + ): + root_nodes.append( + typeinfo.Node(node_type=typeinfo.NodeType.ROOT, name=name, access_id=access_rid, relation_id=ti_rid) + ) + + # Phase D: explore the OMS type-info container (a large, multi-fragment PDU). + type_objects = self._explore_type_info_container() + + # Phase E: recombine type-info with the DB/area nodes and flatten. + typeinfo.build_tree(root_nodes, type_objects) + variables: list[dict[str, Any]] = [] + for v in typeinfo.build_flat_list(root_nodes): try: - response = self._connection.send_request(FunctionCode.EXPLORE, payload, integrity_tail=5, reassemble=True) - fields = _parse_explore_fields(response, db_info["number"], db_info["name"]) - variables.extend(fields) - except Exception: - logger.debug(f"Failed to explore DB {db_info['name']} (rid={db_rid:#x})") - continue - + data_type = typeinfo.Softdatatype(v.softdatatype).name + except ValueError: + data_type = str(v.softdatatype) + variables.append( + { + "name": v.name, + "access_sequence": v.access_sequence, + "data_type": data_type, + "opt_address": v.opt_address, + "opt_bitoffset": v.opt_bitoffset, + "nonopt_address": v.nonopt_address, + "nonopt_bitoffset": v.nonopt_bitoffset, + } + ) return variables + def _read_typeinfo_rid(self, db_rid: int) -> int: + """Read LID=1 of a DB to get its type-info RID (0 if the DB has no readable value).""" + try: + raw = self.read_symbolic(db_rid, [1], 0) + except Exception: + return 0 + return struct.unpack(">I", raw[:4])[0] if len(raw) >= 4 else 0 + + def _explore_type_info_container(self) -> list["typeinfo.PObject"]: + """EXPLORE the OMS type-info container and return its per-type objects.""" + payload = 
_build_explore_request(Ids.OBJECT_OMS_TYPE_INFO_CONTAINER, []) + response = self._connection.send_request(FunctionCode.EXPLORE, payload, integrity_tail=5, reassemble=True) + return typeinfo.extract_type_info_objects(response) + def create_subscription(self, items: list[tuple[int, int, int]], cycle_ms: int = 0) -> int: """Create a data change subscription. @@ -751,7 +799,7 @@ def _build_explore_request(explore_id: int, attribute_ids: list[int]) -> bytes: payload += struct.pack(">I", explore_id) # ExploreId (fixed UInt32, not VLQ) payload += encode_uint32_vlq(0) # ExploreRequestId (0 = none) payload += bytes([1]) # ExploreChildsRecursive - payload += bytes([1]) # unknown (the C# reference driver always sends 1 here) + payload += bytes([1]) # unknown flag — the protocol always carries 1 here payload += bytes([0]) # ExploreParents payload += bytes([0]) # number of following filter objects (none) payload += encode_uint32_vlq(len(attribute_ids)) # AddressList count @@ -770,7 +818,7 @@ def _parse_explore_datablocks(response: bytes) -> list[dict[str, Any]]: stack of ``[relation_id, class_id, name]``. A DataBlock is an object whose ClassId is ``DB_CLASS_RID`` and whose RelationId is a DB area id (``relid >> 16 == 0x8A0E``); its number is ``relid & 0xFFFF`` and its name comes from the ObjectVariableTypeName - attribute. Mirrors the C# reference driver's Browse step 1. + attribute (the first step of the symbol-tree browse). Returns: List of dicts: ``{"name": str, "number": int, "rid": int}`` @@ -844,106 +892,6 @@ def _parse_explore_datablocks(response: bytes) -> list[dict[str, Any]]: return datablocks -def _parse_explore_fields(response: bytes, db_number: int, db_name: str) -> list[dict[str, Any]]: - """Parse an EXPLORE response for a single DB to extract field layout. - - Returns: - List of dicts with keys: - ``name``, ``db_number``, ``byte_offset``, ``data_type``, ``lid``, - ``symbol_crc``. ``lid`` and ``symbol_crc`` enable symbolic access - for optimized DBs. 
- """ - from .vlq import decode_uint32_vlq as _vlq32 - - fields: list[dict[str, Any]] = [] - offset = 0 - field_name = "" - byte_offset = 0 - field_lid = 0 - field_crc = 0 - - # Skip return code VLQ at start of response - if offset < len(response): - _, consumed = _vlq32(response, offset) - offset += consumed - - while offset < len(response): - tag = response[offset] - offset += 1 - - if tag == 0xA1: # START_OF_OBJECT - if offset + 4 > len(response): - break - # The RID bytes serve as the LID for symbolic access - field_lid = struct.unpack(">I", response[offset : offset + 4])[0] - offset += 4 - for _ in range(3): - if offset >= len(response): - break - _, consumed = _vlq32(response, offset) - offset += consumed - field_name = "" - byte_offset = 0 - field_crc = 0 - - elif tag == 0xA2: # TERMINATING_OBJECT - if field_name: - fields.append( - { - "name": f"{db_name}.{field_name}", - "db_number": db_number, - "byte_offset": byte_offset, - "data_type": "BYTE", # default; refined by type info - "lid": field_lid, - "symbol_crc": field_crc, - } - ) - - elif tag == 0xA3: # ATTRIBUTE - if offset >= len(response): - break - attr_id, consumed = _vlq32(response, offset) - offset += consumed - if offset + 2 > len(response): - break - flags = response[offset] - datatype = response[offset + 1] - offset += 2 - - if attr_id == Ids.OBJECT_VARIABLE_TYPE_NAME and datatype == 0x13: - if offset >= len(response): - break - str_len, consumed = _vlq32(response, offset) - offset += consumed - if offset + str_len <= len(response): - try: - field_name = response[offset : offset + str_len].decode("utf-16-be", errors="replace") - except Exception: - field_name = "" - offset += str_len - continue - - # Skip attribute value - if flags & 0x10: - if offset >= len(response): - break - count, consumed = _vlq32(response, offset) - offset += consumed - offset += count - else: - if offset >= len(response): - break - _, consumed = _vlq32(response, offset) - offset += consumed - - elif tag == 0x00: - 
continue - else: - continue - - return fields - - # --------------------------------------------------------------------------- # Subscription helpers (experimental) # --------------------------------------------------------------------------- diff --git a/s7/client.py b/s7/client.py index 5ebdb74b..e932d10a 100644 --- a/s7/client.py +++ b/s7/client.py @@ -289,12 +289,13 @@ def browse(self) -> list[dict[str, Any]]: .. warning:: This method is **experimental** and may change. - Returns a flat list of variable info dicts. Can be converted to - :class:`~snap7.tags.Tag` objects:: + Returns a flat list of variable info dicts, one per tag, with keys + ``name``, ``access_sequence`` (the dot-separated hex LID path), ``data_type`` + and the optimized/non-optimized byte+bit offsets:: - from snap7 import Tag variables = client.browse() - tags = {v["name"]: Tag(Area.DB, v["db_number"], v["byte_offset"], v["data_type"]) for v in variables} + for v in variables: + print(v["name"], v["access_sequence"], v["data_type"]) Requires S7CommPlus connection. 
""" diff --git a/s7/codec.py b/s7/codec.py index f29f0485..2ddde01e 100644 --- a/s7/codec.py +++ b/s7/codec.py @@ -261,10 +261,14 @@ def encode_typed_value(datatype: int, value: Any) -> bytes: return tag + struct.pack(">B", value) elif datatype == DataType.UINT or datatype == DataType.WORD: return tag + struct.pack(">H", value) - elif datatype == DataType.UDINT or datatype == DataType.DWORD: + elif datatype == DataType.UDINT: return tag + encode_uint32_vlq(value) - elif datatype == DataType.ULINT or datatype == DataType.LWORD: + elif datatype == DataType.DWORD: + return tag + struct.pack(">I", value) # fixed 4-byte, not VLQ + elif datatype == DataType.ULINT: return tag + encode_uint64_vlq(value) + elif datatype == DataType.LWORD: + return tag + struct.pack(">Q", value) # fixed 8-byte, not VLQ elif datatype == DataType.SINT: return tag + struct.pack(">b", value) elif datatype == DataType.INT: @@ -414,10 +418,13 @@ def decode_pvalue_to_bytes(data: bytes, offset: int) -> tuple[bytes, int]: return data[offset + consumed : offset + consumed + 1], consumed + 1 elif datatype in (DataType.UINT, DataType.WORD, DataType.INT): return data[offset + consumed : offset + consumed + 2], consumed + 2 - elif datatype in (DataType.UDINT, DataType.DWORD): + elif datatype == DataType.UDINT: val, c = decode_uint32_vlq(data, offset + consumed) consumed += c return struct.pack(">I", val), consumed + elif datatype == DataType.DWORD: + # ValueDWord is a fixed 4-byte big-endian value, not VLQ. 
+ return data[offset + consumed : offset + consumed + 4], consumed + 4 elif datatype in (DataType.DINT,): # Signed VLQ from .vlq import decode_int32_vlq @@ -429,10 +436,13 @@ def decode_pvalue_to_bytes(data: bytes, offset: int) -> tuple[bytes, int]: return data[offset + consumed : offset + consumed + 4], consumed + 4 elif datatype == DataType.LREAL: return data[offset + consumed : offset + consumed + 8], consumed + 8 - elif datatype in (DataType.ULINT, DataType.LWORD): + elif datatype == DataType.ULINT: val, c = decode_uint64_vlq(data, offset + consumed) consumed += c return struct.pack(">Q", val), consumed + elif datatype == DataType.LWORD: + # ValueLWord is a fixed 8-byte big-endian value, not VLQ. + return data[offset + consumed : offset + consumed + 8], consumed + 8 elif datatype in (DataType.LINT,): from .vlq import decode_int64_vlq @@ -556,12 +566,16 @@ def skip_typed_value(data: bytes, offset: int, datatype: int, flags: int) -> int return offset + 1 elif datatype in (DataType.UINT, DataType.WORD, DataType.INT): return offset + 2 - elif datatype in (DataType.UDINT, DataType.DWORD, DataType.AID, DataType.DINT): + elif datatype in (DataType.UDINT, DataType.AID, DataType.DINT): _, consumed = decode_uint32_vlq(data, offset) return offset + consumed - elif datatype in (DataType.ULINT, DataType.LWORD, DataType.LINT): + elif datatype == DataType.DWORD: + return offset + 4 # fixed 4-byte, not VLQ + elif datatype in (DataType.ULINT, DataType.LINT): _, consumed = decode_uint64_vlq(data, offset) return offset + consumed + elif datatype == DataType.LWORD: + return offset + 8 # fixed 8-byte, not VLQ elif datatype == DataType.REAL: return offset + 4 elif datatype == DataType.LREAL: diff --git a/s7/typeinfo.py b/s7/typeinfo.py new file mode 100644 index 00000000..8d27e2ff --- /dev/null +++ b/s7/typeinfo.py @@ -0,0 +1,777 @@ +"""S7CommPlus type-info parsing and symbol-tree reconstruction. 
+ +Decodes the type-information container returned by an S7-1500's EXPLORE response +and flattens it into a list of readable tags. The structures below follow the +S7CommPlus wire protocol (PVartypeList, POffsetInfoType, the PObject tree and the +VarnameList) as observed on an S7-1500 and documented publicly. + +The end product is a flat list of :class:`VarInfo` records, each carrying the +symbolic name, the access sequence (used to address the tag), the software +datatype and the optimized / non-optimized byte (and bit) offsets. +""" + +from __future__ import annotations + +import struct +from dataclasses import dataclass, field +from enum import IntEnum + +from .codec import decode_pvalue_to_bytes +from .protocol import DataType # noqa: F401 (re-exported convenience for callers) +from .vlq import decode_uint32_vlq + +# -- Element ID tags in the PObject stream -- + +START_OF_OBJECT = 0xA1 +TERMINATING_OBJECT = 0xA2 +ATTRIBUTE = 0xA3 +VARTYPE_LIST = 0xAB +VARNAME_LIST = 0xAC + +# Class id of the container object holding the per-type objects. +CLASS_OMS_TYPE_INFO_CONTAINER = 534 + +# Attribute id carrying a struct/UDT byte size (TI_TComSize), stored as big-endian u32. +TI_TCOM_SIZE = 1502 + + +# --------------------------------------------------------------------------- +# 1. 
Software datatypes +# --------------------------------------------------------------------------- + + +class Softdatatype(IntEnum): + """PLC "software datatype" ids (distinct from the PValue wire DataType).""" + + VOID = 0 + BOOL = 1 + BYTE = 2 + CHAR = 3 + WORD = 4 + INT = 5 + DWORD = 6 + DINT = 7 + REAL = 8 + DATE = 9 + TIMEOFDAY = 10 + TIME = 11 + S5TIME = 12 + DATEANDTIME = 14 + INTERNETTIME = 15 + ARRAY = 16 + STRUCT = 17 + ENDSTRUCT = 18 + STRING = 19 + POINTER = 20 + MULTIFB = 21 + ANY = 22 + BLOCKFB = 23 + BLOCKFC = 24 + BLOCKDB = 25 + BLOCKSDB = 26 + COUNTER = 28 + TIMER = 29 + BBOOL = 40 + LREAL = 48 + ULINT = 49 + LINT = 50 + LWORD = 51 + USINT = 52 + UINT = 53 + UDINT = 54 + SINT = 55 + WCHAR = 61 + WSTRING = 62 + VARIANT = 63 + LTIME = 64 + LTOD = 65 + LDT = 66 + DTL = 67 + REMOTE = 96 + AOMIDENT = 128 + + +# Leaf datatypes that browse() emits as readable tags. Containers and markers +# (VOID, ARRAY, STRUCT, ENDSTRUCT, MULTIFB, VARIANT, DTL, system ids) are not +# leaves — their members are emitted instead. 
+SUPPORTED_SOFTDATATYPES = frozenset( + { + Softdatatype.BOOL, + Softdatatype.BYTE, + Softdatatype.CHAR, + Softdatatype.WORD, + Softdatatype.INT, + Softdatatype.DWORD, + Softdatatype.DINT, + Softdatatype.REAL, + Softdatatype.DATE, + Softdatatype.TIMEOFDAY, + Softdatatype.TIME, + Softdatatype.S5TIME, + Softdatatype.DATEANDTIME, + Softdatatype.STRING, + Softdatatype.POINTER, + Softdatatype.ANY, + Softdatatype.BBOOL, + Softdatatype.LREAL, + Softdatatype.ULINT, + Softdatatype.LINT, + Softdatatype.LWORD, + Softdatatype.USINT, + Softdatatype.UINT, + Softdatatype.UDINT, + Softdatatype.SINT, + Softdatatype.WCHAR, + Softdatatype.WSTRING, + Softdatatype.LTIME, + Softdatatype.LTOD, + Softdatatype.LDT, + } +) + + +def is_softdatatype_supported(code: int) -> bool: + """True if ``code`` is a leaf datatype browse() emits as a readable tag.""" + return code in SUPPORTED_SOFTDATATYPES + + +_SIZE_1 = { + Softdatatype.BOOL, + Softdatatype.BYTE, + Softdatatype.CHAR, + Softdatatype.USINT, + Softdatatype.SINT, + Softdatatype.BBOOL, +} +_SIZE_2 = { + Softdatatype.WORD, + Softdatatype.INT, + Softdatatype.UINT, + Softdatatype.DATE, + Softdatatype.S5TIME, + Softdatatype.WCHAR, +} +_SIZE_4 = { + Softdatatype.DWORD, + Softdatatype.DINT, + Softdatatype.REAL, + Softdatatype.TIMEOFDAY, + Softdatatype.TIME, + Softdatatype.UDINT, +} +_SIZE_8 = { + Softdatatype.LREAL, + Softdatatype.ULINT, + Softdatatype.LINT, + Softdatatype.LWORD, + Softdatatype.DATEANDTIME, + Softdatatype.LTIME, + Softdatatype.LTOD, + Softdatatype.LDT, +} + + +def datatype_size(code: int, *, string_len: int = 0) -> int: + """Element byte stride for a software datatype. + + ``string_len`` is the array offset-info's ``unspecified1`` and is only used + for STRING / WSTRING (whose stride is ``string_len + 2``). 
+ """ + if code in _SIZE_1: + return 1 + if code in _SIZE_2: + return 2 + if code in _SIZE_4: + return 4 + if code in _SIZE_8: + return 8 + if code == Softdatatype.DTL: + return 12 + if code == Softdatatype.POINTER: + return 6 + if code in (Softdatatype.ANY, Softdatatype.REMOTE): + return 10 + if code in (Softdatatype.STRING, Softdatatype.WSTRING): + return string_len + 2 + return 0 + + +# --------------------------------------------------------------------------- +# 3. POffsetInfoType +# --------------------------------------------------------------------------- + + +@dataclass +class OffsetInfo: + """Union of the address/dimension info attached to a vartype element.""" + + code: int = 0 + opt_addr: int = 0 + nonopt_addr: int = 0 + unspecified1: int = 0 + unspecified2: int = 0 + array_lower_bound: int = 0 + array_element_count: int = 0 + mdim_lower_bounds: list[int] = field(default_factory=lambda: [0] * 6) + mdim_element_count: list[int] = field(default_factory=lambda: [0] * 6) + nonopt_struct_size: int = 0 + opt_struct_size: int = 0 + relation_id: int = 0 + is_1dim: bool = False + is_mdim: bool = False + has_relation: bool = False + + +def parse_offset_info(data: bytes, offset: int, offsetinfotype: int) -> tuple[OffsetInfo, int]: + """Parse a POffsetInfoType selected by ``offsetinfotype`` (0..15).""" + oi = OffsetInfo(code=offsetinfotype) + + if offsetinfotype in (1, 8): # Std + a, b = struct.unpack_from(" int: + """The 4-bit POffsetInfoType selector held in AttributeFlags bits 12..15.""" + return (self.attribute_flags >> 12) & 0x0F + + @property + def attribute_bitoffset(self) -> int: + """Optimized bit offset carried in the low 3 bits of AttributeFlags.""" + return self.attribute_flags & 0x07 + + @property + def nonopt_bitoffset(self) -> int: + return (self.bitoffsetinfo_flags & 0x70) >> 4 + + @property + def opt_bitoffset(self) -> int: + return self.bitoffsetinfo_flags & 0x07 + + @property + def classic(self) -> bool: + return bool(self.bitoffsetinfo_flags & 
0x08) + + +def parse_vartype_element(data: bytes, offset: int) -> tuple[VartypeListElement, int]: + """Parse one VartypeListElement and its embedded POffsetInfoType.""" + lid, symbol_crc = struct.unpack_from("H", data, offset) + offset += 2 + bitoffsetinfo_flags = data[offset] + offset += 1 + + el = VartypeListElement( + lid=lid, + symbol_crc=symbol_crc, + softdatatype=softdatatype, + attribute_flags=attribute_flags, + bitoffsetinfo_flags=bitoffsetinfo_flags, + ) + el.offset_info, offset = parse_offset_info(data, offset, el.offsetinfotype) + return el, offset + + +def parse_vartype_list(data: bytes, offset: int) -> tuple[list[VartypeListElement], int]: + """Parse a PVartypeList. + + Framing: a sequence of one or more blocks, terminated by a zero-length block. + Each block opens with a BE-u16 block length (``block_end = offset_after_len + + block_length``). Only the very first block carries a leading LE-u32 FirstId + (a starting index, counted inside that block's length) before its elements. + Elements are read until ``offset >= block_end`` (at least one per block). A + following BE-u16 length > 0 is a further block of elements (no FirstId); a + length of 0 terminates the list (its 2 bytes are consumed). + + Real PLCs split a long element list across multiple blocks, so the loop is + mandatory — treating it as a single block plus a 2-byte terminator misaligns + the cursor and corrupts the subsequent VarnameList parse. + """ + elements: list[VartypeListElement] = [] + first_block = True + while True: + (block_len,) = struct.unpack_from(">H", data, offset) + offset += 2 + if block_len == 0: + # Zero-length block terminates the list. + break + block_end = offset + block_len + if first_block: + offset += 4 # leading LE-u32 FirstId — a starting index, not a count + first_block = False + # At least one element per block; read until the block is consumed. 
# ---------------------------------------------------------------------------
# 5/6. PObject tree & VarnameList
# ---------------------------------------------------------------------------


def parse_varname_list(data: bytes, offset: int) -> tuple[list[str], int]:
    """Parse a VarnameList starting at ``offset``.

    The list is a run of blocks, each introduced by a big-endian u16 byte
    length; a zero-length block terminates the list. Inside a block every
    entry is a 1-byte length, that many name bytes and one trailing zero byte.
    Names are decoded as UTF-8 with undecodable bytes replaced.

    Returns:
        ``(names, new_offset)``; ``new_offset`` points just past the
        zero-length terminator block.
    """
    names: list[str] = []
    while True:
        (block_len,) = struct.unpack_from(">H", data, offset)
        offset += 2
        if block_len == 0:
            break
        end = offset + block_len
        while offset < end:
            name_len = data[offset]
            offset += 1
            raw = data[offset : offset + name_len]
            offset += name_len + 1  # name bytes plus the trailing zero byte
            names.append(raw.decode("utf-8", errors="replace"))
    return names, offset


@dataclass
class PObject:
    """A node of the S7CommPlus object tree (one type / container / nested object)."""

    # Big-endian u32 that follows the start-of-object tag.
    relation_id: int = 0
    # VLQ-encoded class id that follows the relation id.
    class_id: int = 0
    # Attribute id -> raw value bytes as returned by decode_pvalue_to_bytes.
    attributes: dict[int, bytes] = field(default_factory=dict)
    # Elements of the object's vartype list (empty when the object has none).
    vartype_list: list[VartypeListElement] = field(default_factory=list)
    # Names from the object's varname list (empty when the object has none).
    varname_list: list[str] = field(default_factory=list)
    # Nested child objects, in stream order.
    objects: list["PObject"] = field(default_factory=list)


def parse_object(data: bytes, offset: int) -> tuple[PObject, int]:
    """Parse a single PObject starting at a 0xA1 tag, up to its 0xA2 terminator.

    After the start tag the header holds a big-endian u32 relation id, a VLQ
    class id and two further VLQs (ClassFlags, AttributeId) that are skipped.
    The body is then read tag by tag: attributes, a vartype list, a varname
    list and nested objects are collected; unknown tag bytes are skipped.
    Parsing also stops, without error, when ``data`` ends before a terminator.

    Returns:
        ``(obj, new_offset)``; ``new_offset`` points just past the terminator
        (or at ``len(data)`` if the stream ended first).

    Raises:
        ValueError: if ``offset`` is outside ``data`` or does not point at a
            start-of-object tag.
    """
    # Validate explicitly instead of with ``assert``: assertions are removed
    # under ``python -O`` and the parser would then silently misread the stream.
    if not 0 <= offset < len(data):
        raise ValueError(f"parse_object: offset {offset} is outside the {len(data)}-byte buffer")
    if data[offset] != START_OF_OBJECT:
        raise ValueError(f"parse_object: expected start-of-object tag at offset {offset}, got 0x{data[offset]:02X}")
    offset += 1
    (relation_id,) = struct.unpack_from(">I", data, offset)
    offset += 4
    class_id, consumed = decode_uint32_vlq(data, offset)
    offset += consumed
    for _ in range(2):  # ClassFlags, AttributeId
        _, consumed = decode_uint32_vlq(data, offset)
        offset += consumed

    obj = PObject(relation_id=relation_id, class_id=class_id)

    while offset < len(data):
        tag = data[offset]
        if tag == TERMINATING_OBJECT:
            offset += 1
            break
        if tag == ATTRIBUTE:
            offset += 1
            attr_id, consumed = decode_uint32_vlq(data, offset)
            offset += consumed
            value, consumed = decode_pvalue_to_bytes(data, offset)
            offset += consumed
            obj.attributes[attr_id] = value
        elif tag == VARTYPE_LIST:
            offset += 1
            obj.vartype_list, offset = parse_vartype_list(data, offset)
        elif tag == VARNAME_LIST:
            offset += 1
            obj.varname_list, offset = parse_varname_list(data, offset)
        elif tag == START_OF_OBJECT:
            # The recursive call consumes the child's own start tag.
            child, offset = parse_object(data, offset)
            obj.objects.append(child)
        else:
            offset += 1  # unknown tag — skip defensively
    return obj, offset


def parse_object_list(data: bytes, offset: int) -> tuple[list[PObject], int]:
    """Parse a sequence of sibling PObjects (consecutive 0xA1 blocks).

    Stops at the first byte that is not a start-of-object tag or at the end of
    ``data``. Returns ``(objects, new_offset)``.
    """
    objects: list[PObject] = []
    while offset < len(data) and data[offset] == START_OF_OBJECT:
        obj, offset = parse_object(data, offset)
        objects.append(obj)
    return objects, offset


def _find_container(objects: list[PObject], class_id: int) -> PObject | None:
    """Return the first object with ``class_id``, searching depth-first, or ``None``."""
    for obj in objects:
        if obj.class_id == class_id:
            return obj
        found = _find_container(obj.objects, class_id)
        if found is not None:
            return found
    return None


def extract_type_info_objects(response: bytes) -> list[PObject]:
    """Return the per-type objects from an EXPLORE(type-info) response.

    Skips the leading ReturnValue VLQ and any preamble, parses the object stream,
    and returns the children of the type-info container object (or ``[]``).
    """
    _, consumed = decode_uint32_vlq(response, 0)
    # Jump to the first start-of-object tag after the return value; find()
    # yields -1 when the response contains no object at all.
    offset = response.find(START_OF_OBJECT, consumed)
    if offset < 0:
        return []
    objects, _ = parse_object_list(response, offset)
    container = _find_container(objects, CLASS_OMS_TYPE_INFO_CONTAINER)
    return container.objects if container is not None else []


# ---------------------------------------------------------------------------
# 7.
# Tree model & builder
# ---------------------------------------------------------------------------


class NodeType(IntEnum):
    """Kind of a symbol-tree node; decides how names and access ids are joined."""

    UNDEFINED = 0
    ROOT = 1
    VAR = 2
    ARRAY = 3
    STRUCT_ARRAY = 4


@dataclass
class Node:
    """One node of the symbol tree built from type-info objects."""

    node_type: NodeType = NodeType.UNDEFINED
    name: str = ""
    # Member nodes carry the vartype element's LID; array element nodes carry
    # the element index.
    access_id: int = 0
    softdatatype: int = 0
    # Relation id of the type object this node expands to (0 when unused).
    relation_id: int = 0
    vte: VartypeListElement | None = None
    # Offset of an array element from the array start (element index * stride).
    array_adr_offset_opt: int = 0
    array_adr_offset_nonopt: int = 0
    children: list["Node"] = field(default_factory=list)


@dataclass
class VarInfo:
    """A flattened, readable tag."""

    name: str = ""
    access_sequence: str = ""
    softdatatype: int = 0
    opt_address: int = 0
    opt_bitoffset: int = 0
    nonopt_address: int = 0
    nonopt_bitoffset: int = 0


def _tcom_size(obj: PObject | None) -> int:
    """Return the object's ``TI_TCOM_SIZE`` attribute as an int.

    The raw attribute bytes are read as a big-endian unsigned integer of
    whatever width was stored. Returns 0 when ``obj`` is ``None`` or the
    attribute is missing or empty.
    """
    if obj is None:
        return 0
    raw = obj.attributes.get(TI_TCOM_SIZE)
    if not raw:
        return 0
    # int.from_bytes handles short and wide values alike; unpacking a fixed
    # ">I" would read only the first four bytes of a wider value.
    return int.from_bytes(raw, "big")


def _find_type_object(objects: list[PObject], relation_id: int) -> PObject | None:
    """Return the first object whose ``relation_id`` matches, or ``None``."""
    return next((obj for obj in objects if obj.relation_id == relation_id), None)


def build_tree(root_nodes: list[Node], type_objects: list[PObject]) -> None:
    """Expand each ROOT node against the matching type object (in place).

    Nodes that are not ROOT, and roots whose ``relation_id`` matches no type
    object, are left without children.
    """
    for node in root_nodes:
        if node.node_type != NodeType.ROOT:
            continue
        obj = _find_type_object(type_objects, node.relation_id)
        if obj is not None:
            _add_subnodes(node, obj, type_objects)


def _add_1dim_elements(subnode: Node, vte: VartypeListElement, objects: list[PObject]) -> None:
    """Append one child per element of a one-dimensional array member."""
    oi = vte.offset_info
    if oi.array_element_count <= 0:
        return
    # The element type and stride are the same for every element, so resolve
    # them once instead of once per element.
    if oi.has_relation:
        struct_type = _find_type_object(objects, oi.relation_id)
        stride = _tcom_size(struct_type)
        node_type = NodeType.STRUCT_ARRAY
        relation_id = oi.relation_id
    else:
        struct_type = None
        stride = datatype_size(vte.softdatatype, string_len=oi.unspecified1)
        node_type = NodeType.ARRAY
        relation_id = 0

    for elem in range(oi.array_element_count):
        elem_off = elem * stride
        arr_node = Node(
            node_type=node_type,
            name=f"[{elem + oi.array_lower_bound}]",
            access_id=elem,
            softdatatype=vte.softdatatype,
            relation_id=relation_id,
            vte=vte,
            array_adr_offset_opt=elem_off,
            array_adr_offset_nonopt=elem_off,
        )
        subnode.children.append(arr_node)
        if struct_type is not None:
            _add_subnodes(arr_node, struct_type, objects)


def _add_subnodes(node: Node, obj: PObject, objects: list[PObject]) -> None:
    """Append one child to ``node`` per vartype entry of ``obj``, recursively.

    Each entry is paired by index with ``obj.varname_list``; entries without a
    matching name get ``""``. One-dimensional and multi-dimensional arrays get
    one child per element, members with a relation are expanded against the
    related type object, and everything else stays a childless leaf.
    """
    for i, vte in enumerate(obj.vartype_list):
        name = obj.varname_list[i] if i < len(obj.varname_list) else ""
        subnode = Node(
            node_type=NodeType.UNDEFINED,
            name=name,
            access_id=vte.lid,
            softdatatype=vte.softdatatype,
            vte=vte,
        )
        node.children.append(subnode)
        oi = vte.offset_info

        if oi.is_1dim:
            _add_1dim_elements(subnode, vte, objects)
        elif oi.is_mdim:
            _add_mdim_subnodes(subnode, vte, oi, objects)
        elif oi.has_relation:
            struct_type = _find_type_object(objects, oi.relation_id)
            if struct_type is not None:
                _add_subnodes(subnode, struct_type, objects)
        # else: scalar leaf, no children.
def _add_mdim_subnodes(subnode: Node, vte: VartypeListElement, oi: OffsetInfo, objects: list[PObject]) -> None:
    """Append one child per element of a multi-dimensional array member.

    Elements are generated in storage order with axis 0 varying fastest. The
    label lists the highest active dimension first, each index shifted by its
    lower bound. Address offsets are linear (element number * stride). Access
    ids also count up by one per element, except that for BBOOL arrays each
    row along axis 0 is padded up to a multiple of 8.

    Struct elements (``oi.has_relation``) become STRUCT_ARRAY nodes and are
    expanded against the related type object when it is found; all other
    elements become ARRAY leaves.
    """
    counts = oi.mdim_element_count
    lowers = oi.mdim_lower_bounds
    actdimensions = sum(1 for c in counts if c > 0)

    # Element type, stride and node flavour are identical for every element.
    if oi.has_relation:
        struct_type = _find_type_object(objects, oi.relation_id)
        stride = _tcom_size(struct_type)
        node_type = NodeType.STRUCT_ARRAY
        relation_id = oi.relation_id
    else:
        struct_type = None
        stride = datatype_size(vte.softdatatype, string_len=oi.unspecified1)
        node_type = NodeType.ARRAY
        relation_id = 0

    # Size the odometer from the dimension list instead of assuming six
    # entries, so shorter lists do not raise IndexError during the carry step.
    ndims = len(counts)
    first_count = counts[0] if counts else 0
    xx = [0] * max(ndims, 1)
    arr_id = 0
    for n in range(oi.array_element_count):
        indices = (str(xx[j] + lowers[j]) for j in range(actdimensions - 1, -1, -1))
        label = "[" + ",".join(indices) + "]"
        elem_off = n * stride

        arr_node = Node(
            node_type=node_type,
            name=label,
            access_id=arr_id,
            softdatatype=vte.softdatatype,
            relation_id=relation_id,
            vte=vte,
            array_adr_offset_opt=elem_off,
            array_adr_offset_nonopt=elem_off,
        )
        subnode.children.append(arr_node)
        if struct_type is not None:
            _add_subnodes(arr_node, struct_type, objects)

        # Odometer step (axis 0 fastest).
        xx[0] += 1
        if vte.softdatatype == Softdatatype.BBOOL and xx[0] >= first_count and first_count % 8 != 0:
            arr_id += 8 - (xx[0] % 8)  # rows of bits pad up to a byte
        for dim in range(ndims - 1):
            if xx[dim] >= counts[dim]:
                xx[dim] = 0
                xx[dim + 1] += 1
        arr_id += 1


# ---------------------------------------------------------------------------
# 8.
Flatten +# --------------------------------------------------------------------------- + + +def build_flat_list(root_nodes: list[Node]) -> list[VarInfo]: + """Walk each populated ROOT and produce the flat list of readable tags.""" + result: list[VarInfo] = [] + for root in root_nodes: + if not root.children: + continue + _walk(root, "", "", 0, 0, result) + return result + + +def _walk(node: Node, names: str, access_ids: str, opt_off: int, nonopt_off: int, result: list[VarInfo]) -> None: + # Accumulate this node's name and access-id contribution. + if node.node_type == NodeType.ROOT: + names = names + node.name + access_ids = access_ids + f"{node.access_id:X}" + elif node.node_type == NodeType.ARRAY: + names = names + node.name # "[..]" index label, no dot + access_ids = access_ids + "." + f"{node.access_id:X}" + elif node.node_type == NodeType.STRUCT_ARRAY: + names = names + node.name + access_ids = access_ids + "." + f"{node.access_id:X}" + ".1" + else: # UNDEFINED / VAR member + names = names + "." + node.name + access_ids = access_ids + "." + f"{node.access_id:X}" + + if node.children: + # Descend into a branch — advance the running byte offsets. + if node.node_type == NodeType.ARRAY: + assert node.vte is not None + opt_off = node.vte.offset_info.opt_addr + nonopt_off = node.vte.offset_info.nonopt_addr + elif node.node_type == NodeType.STRUCT_ARRAY: + opt_off += node.array_adr_offset_opt + nonopt_off += node.array_adr_offset_nonopt + elif node.vte is not None: + opt_off += node.vte.offset_info.opt_addr + nonopt_off += node.vte.offset_info.nonopt_addr + + for child in node.children: + child_opt, child_nonopt = opt_off, nonopt_off + if child.node_type == NodeType.ARRAY: + child_opt += child.array_adr_offset_opt + child_nonopt += child.array_adr_offset_nonopt + _walk(child, names, access_ids, child_opt, child_nonopt, result) + return + + # Leaf node — emit if the datatype is a readable leaf. 
+ if not is_softdatatype_supported(node.softdatatype): + return + + info = VarInfo(name=names, access_sequence=access_ids, softdatatype=node.softdatatype) + if node.node_type == NodeType.ARRAY: + # Basic-array element: offset already includes the element stride. + info.opt_address = opt_off + info.nonopt_address = nonopt_off + else: + assert node.vte is not None + info.opt_address = opt_off + node.vte.offset_info.opt_addr + info.nonopt_address = nonopt_off + node.vte.offset_info.nonopt_addr + + vte = node.vte + if node.softdatatype == Softdatatype.BOOL and vte is not None: + info.opt_bitoffset = vte.attribute_bitoffset + info.nonopt_bitoffset = vte.nonopt_bitoffset if vte.classic else vte.attribute_bitoffset + elif node.softdatatype == Softdatatype.BBOOL and vte is not None: + info.opt_bitoffset = vte.opt_bitoffset + + result.append(info) diff --git a/tests/test_s7_codec.py b/tests/test_s7_codec.py index fedc65cc..7234abdd 100644 --- a/tests/test_s7_codec.py +++ b/tests/test_s7_codec.py @@ -469,10 +469,11 @@ def test_udint(self) -> None: assert result == struct.pack(">I", 100000) def test_dword(self) -> None: - vlq = encode_uint32_vlq(0xDEADBEEF) - data = bytes([0x00, DataType.DWORD]) + vlq + # ValueDWord is fixed 4-byte big-endian (not VLQ). + data = bytes([0x00, DataType.DWORD]) + struct.pack(">I", 0xDEADBEEF) result, consumed = decode_pvalue_to_bytes(data, 0) assert result == struct.pack(">I", 0xDEADBEEF) + assert consumed == 6 def test_dint_positive(self) -> None: vlq = encode_int32_vlq(12345) @@ -505,10 +506,11 @@ def test_ulint(self) -> None: assert result == struct.pack(">Q", 2**40) def test_lword(self) -> None: - vlq = encode_uint64_vlq(0xCAFEBABE12345678) - data = bytes([0x00, DataType.LWORD]) + vlq + # ValueLWord is fixed 8-byte big-endian (not VLQ). 
+ data = bytes([0x00, DataType.LWORD]) + struct.pack(">Q", 0xCAFEBABE12345678) result, consumed = decode_pvalue_to_bytes(data, 0) assert result == struct.pack(">Q", 0xCAFEBABE12345678) + assert consumed == 10 def test_lint_positive(self) -> None: vlq = encode_int64_vlq(2**50) @@ -627,6 +629,12 @@ def test_struct_id_boundary_is_normal(self) -> None: assert result == bytes([0x07]), f"id {boundary:#010x} must parse as a normal struct" assert consumed == len(data) + def test_udint_stays_vlq(self) -> None: + # UDInt remains VLQ-encoded (regression guard for the DWORD split). + data = bytes([0x00, DataType.UDINT]) + encode_uint32_vlq(300) + result, consumed = decode_pvalue_to_bytes(data, 0) + assert result == struct.pack(">I", 300) + def test_unsupported_type(self) -> None: data = bytes([0x00, 0xFF]) with pytest.raises(ValueError, match="Unsupported PValue datatype"): diff --git a/tests/test_s7_unit.py b/tests/test_s7_unit.py index 7800114f..6e8b509a 100644 --- a/tests/test_s7_unit.py +++ b/tests/test_s7_unit.py @@ -244,9 +244,9 @@ def test_udint(self) -> None: assert new_offset == len(vlq) def test_dword(self) -> None: - vlq = encode_uint32_vlq(0xDEADBEEF) - new_offset = skip_typed_value(vlq, 0, DataType.DWORD, 0x00) - assert new_offset == len(vlq) + # DWORD is fixed 4-byte (not VLQ). + data = struct.pack(">I", 0xDEADBEEF) + assert skip_typed_value(data, 0, DataType.DWORD, 0x00) == 4 def test_aid(self) -> None: vlq = encode_uint32_vlq(306) @@ -264,9 +264,9 @@ def test_ulint(self) -> None: assert new_offset == len(vlq) def test_lword(self) -> None: - vlq = encode_uint64_vlq(0xCAFE) - new_offset = skip_typed_value(vlq, 0, DataType.LWORD, 0x00) - assert new_offset == len(vlq) + # LWORD is fixed 8-byte (not VLQ). 
+ data = struct.pack(">Q", 0xCAFE) + assert skip_typed_value(data, 0, DataType.LWORD, 0x00) == 8 def test_lint(self) -> None: from s7.vlq import encode_int64_vlq diff --git a/tests/test_typeinfo.py b/tests/test_typeinfo.py new file mode 100644 index 00000000..a558bec7 --- /dev/null +++ b/tests/test_typeinfo.py @@ -0,0 +1,599 @@ +"""Tests for the S7CommPlus type-info parser (PVartypeList / POffsetInfoType / VarnameList). + +Pins the on-wire layouts and the tree-building behaviour browse() relies on to +reconstruct the per-tag symbol tree from a PLC's EXPLORE type-info response. +""" + +import struct + +from s7 import typeinfo as ti +from s7.protocol import DataType +from s7.vlq import encode_uint32_vlq + + +def _vartype_list_bytes(*elements: bytes) -> bytes: + block = b"".join(elements) + return struct.pack(">H", len(block) + 4) + struct.pack("H", 0) + + +def _multiblock_vartype_list_bytes(*blocks: tuple[bytes, ...]) -> bytes: + """Frame several blocks of elements as a real multi-block PVartypeList. + + The first block carries the leading LE-u32 FirstId (counted in its length); + subsequent blocks carry only elements. A BE-u16 zero block terminates the list. + """ + out = b"" + for idx, elements in enumerate(blocks): + block = b"".join(elements) + if idx == 0: + # FirstId is counted inside the first block's length. 
+ out += struct.pack(">H", len(block) + 4) + struct.pack("H", len(block)) + block + out += struct.pack(">H", 0) # zero-length terminator block + return out + + +def _varname_list_bytes(*names: str) -> bytes: + block = b"".join(bytes([len(n.encode())]) + n.encode() + b"\x00" for n in names) + return struct.pack(">H", len(block)) + block + struct.pack(">H", 0) + + +class TestSoftdatatype: + def test_known_values(self) -> None: + assert ti.Softdatatype.BOOL == 1 + assert ti.Softdatatype.BYTE == 2 + assert ti.Softdatatype.INT == 5 + assert ti.Softdatatype.REAL == 8 + assert ti.Softdatatype.LREAL == 48 + assert ti.Softdatatype.BBOOL == 40 + assert ti.Softdatatype.WSTRING == 62 + assert ti.Softdatatype.DTL == 67 + + def test_supported_includes_scalars(self) -> None: + for sdt in (ti.Softdatatype.BOOL, ti.Softdatatype.INT, ti.Softdatatype.REAL, ti.Softdatatype.LREAL): + assert ti.is_softdatatype_supported(int(sdt)) + + def test_supported_excludes_containers(self) -> None: + # VOID(0), ARRAY(16), STRUCT(17), VARIANT(63) are container/marker types, not leaves. 
+ for sdt in (0, 16, 17, 63): + assert not ti.is_softdatatype_supported(sdt) + + +class TestDatatypeSize: + def test_one_byte_types(self) -> None: + for sdt in ( + ti.Softdatatype.BOOL, + ti.Softdatatype.BYTE, + ti.Softdatatype.CHAR, + ti.Softdatatype.USINT, + ti.Softdatatype.SINT, + ti.Softdatatype.BBOOL, + ): + assert ti.datatype_size(int(sdt)) == 1 + + def test_two_byte_types(self) -> None: + for sdt in (ti.Softdatatype.WORD, ti.Softdatatype.INT, ti.Softdatatype.UINT, ti.Softdatatype.DATE): + assert ti.datatype_size(int(sdt)) == 2 + + def test_four_byte_types(self) -> None: + for sdt in (ti.Softdatatype.DWORD, ti.Softdatatype.DINT, ti.Softdatatype.REAL, ti.Softdatatype.UDINT): + assert ti.datatype_size(int(sdt)) == 4 + + def test_eight_byte_types(self) -> None: + for sdt in (ti.Softdatatype.LREAL, ti.Softdatatype.LINT, ti.Softdatatype.LWORD, ti.Softdatatype.ULINT): + assert ti.datatype_size(int(sdt)) == 8 + + def test_dtl_is_twelve(self) -> None: + assert ti.datatype_size(int(ti.Softdatatype.DTL)) == 12 + + def test_string_uses_unspecified_length(self) -> None: + # STRING/WSTRING stride comes from the array offset-info's first unspecified field + 2. 
+ assert ti.datatype_size(int(ti.Softdatatype.STRING), string_len=10) == 12 + + def test_unknown_is_zero(self) -> None: + assert ti.datatype_size(0) == 0 + + +def _name_entry(s: str) -> bytes: + raw = s.encode("utf-8") + return bytes([len(raw)]) + raw + b"\x00" # 1-byte len + UTF-8 bytes + null terminator + + +class TestVarnameList: + def test_single_block_two_names(self) -> None: + block = _name_entry("Foo") + _name_entry("Ab") # 5 + 4 = 9 bytes + data = struct.pack(">H", len(block)) + block + struct.pack(">H", 0) # block + zero terminator + names, off = ti.parse_varname_list(data, 0) + assert names == ["Foo", "Ab"] + assert off == len(data) + + def test_empty_list(self) -> None: + data = struct.pack(">H", 0) # immediate zero blocklen + names, off = ti.parse_varname_list(data, 0) + assert names == [] + assert off == 2 + + def test_with_offset(self) -> None: + block = _name_entry("X") + data = b"\xff\xff" + struct.pack(">H", len(block)) + block + struct.pack(">H", 0) + names, off = ti.parse_varname_list(data, 2) + assert names == ["X"] + assert off == len(data) + + +class TestOffsetInfo: + def test_std_new_code8_opt_then_nonopt(self) -> None: + # New Std (8): first u16 = optimized, second = non-optimized. + data = struct.pack(" None: + # Legacy StructElemStd (1): order swapped — first u16 = non-optimized. + data = struct.pack(" None: + # String (9): u16 unspec1, u16 unspec2, u32 opt, u32 nonopt. + data = struct.pack(" None: + # Array1Dim (10): u16,u16,u32 opt,u32 nonopt,i32 lower,u32 count. + data = struct.pack(" None: + # Struct (12): u16,u16,u32 opt,u32 nonopt,u32 relid, then 4x u32 structinfo. + data = struct.pack(" None: + # Struct1Dim (13): u16,u16,u32,u32,i32 lower,u32 count,u32 nonoptsize,u32 optsize,u32 relid,4xu32. + data = struct.pack(" None: + # ArrayMDim (11): base 20 + 6 i32 lower bounds + 6 u32 counts = 68 bytes. + data = ( + struct.pack(" None: + # StructMDim (14): 68 (mdim) + nonoptsize,optsize,relid + 4x structinfo = 96 bytes. 
+ data = ( + struct.pack(" None: + # FbSfb (15): u16,u16,u32,u32,u32 relid,4x info,u32 retainoff,u32 volatileoff = 40 bytes. + data = struct.pack(" None: + # FbArray (0): 12 base + relid+4info(20) + 6 section u32 (24) + 6 i32 + 6 u32 = 104 bytes. + data = ( + struct.pack(" bytes: + # LID (u32 LE) + SymbolCrc (u32 LE) + Softdatatype (1B) + AttributeFlags (u16 BE!) + + # BitoffsetinfoFlags (1B) + offset-info bytes. + return struct.pack("H", attr_flags) + bytes([bitoff_flags]) + offset_info + + +class TestVartypeElement: + def test_scalar_int_std(self) -> None: + # offsetinfotype = 8 (Std) lives in AttributeFlags bits 12..15 → 0x8000. + data = _element(5, 0, int(ti.Softdatatype.INT), 0x8000, 0x00, struct.pack(" None: + # AttributeFlags low 3 bits carry the optimized bit offset for a BOOL. + data = _element(7, 0, int(ti.Softdatatype.BOOL), 0x8000 | 0x0003, 0x00, struct.pack(" None: + # BitoffsetinfoFlags: 0x70 nonopt bitoffset (>>4), 0x08 classic, 0x07 opt bitoffset. + data = _element(1, 0, int(ti.Softdatatype.BOOL), 0x8000, 0x58, struct.pack("> 4 + assert el.classic is True # 0x58 & 0x08 + assert el.opt_bitoffset == 0 # 0x58 & 0x07 + + +class TestVartypeList: + def test_two_elements_one_block(self) -> None: + e1 = _element(1, 0, int(ti.Softdatatype.INT), 0x8000, 0, struct.pack("H", len(block) + 4) + struct.pack("H", 0) + elements, off = ti.parse_vartype_list(data, 0) + assert [e.lid for e in elements] == [1, 2] + assert elements[1].softdatatype == int(ti.Softdatatype.REAL) + assert off == len(data) + + def test_multi_block_returns_all_elements(self) -> None: + # A real PLC splits a long element list across multiple blocks. Only the + # first block has the leading FirstId; later blocks are pure elements. + e1 = _element(1, 0, int(ti.Softdatatype.INT), 0x8000, 0, struct.pack(" None: + # The bug left the cursor mid-stream on multi-block lists, corrupting the + # next field. Verify the returned offset cleanly points at trailing data. 
+ e1 = _element(1, 0, int(ti.Softdatatype.INT), 0x8000, 0, struct.pack(" bytes: + return bytes([0xA1]) + struct.pack(">I", relid) + encode_uint32_vlq(class_id) + encode_uint32_vlq(0) + encode_uint32_vlq(0) + + +class TestParseObject: + def test_header_attribute_vartype_varname(self) -> None: + el = _element(10, 0, int(ti.Softdatatype.INT), 0x8000, 0, struct.pack(" None: + child = _obj_header(0x00000002, 511) + bytes([0xA2]) + parent = _obj_header(0x00000001, 534) + child + bytes([0xA2]) + obj, off = ti.parse_object(parent, 0) + assert obj.relation_id == 1 and obj.class_id == 534 + assert len(obj.objects) == 1 + assert obj.objects[0].relation_id == 2 and obj.objects[0].class_id == 511 + assert off == len(parent) + + def test_object_list_two_siblings(self) -> None: + a = _obj_header(1, 511) + bytes([0xA2]) + b = _obj_header(2, 511) + bytes([0xA2]) + objs, off = ti.parse_object_list(a + b, 0) + assert [o.relation_id for o in objs] == [1, 2] + assert off == len(a + b) + + +def _vte(lid: int, sdt, oi: "ti.OffsetInfo", attr_flags: int = 0, bitoff: int = 0) -> "ti.VartypeListElement": + return ti.VartypeListElement( + lid=lid, symbol_crc=0, softdatatype=int(sdt), attribute_flags=attr_flags, bitoffsetinfo_flags=bitoff, offset_info=oi + ) + + +def _root(name: str, relid_db: int, ti_relid: int) -> "ti.Node": + return ti.Node(node_type=ti.NodeType.ROOT, name=name, access_id=relid_db, relation_id=ti_relid) + + +class TestTreeBuilder: + def test_scalar_leaf(self) -> None: + root = _root("DB1", 0x8A0E0001, 0x100) + type_obj = ti.PObject( + relation_id=0x100, + vartype_list=[_vte(10, ti.Softdatatype.INT, ti.OffsetInfo(code=8, opt_addr=2, nonopt_addr=4))], + varname_list=["Speed"], + ) + ti.build_tree([root], [type_obj]) + infos = ti.build_flat_list([root]) + assert len(infos) == 1 + v = infos[0] + assert v.name == "DB1.Speed" + assert v.access_sequence == "8A0E0001.A" # DB relid + LID 0xA + assert v.softdatatype == int(ti.Softdatatype.INT) + assert v.opt_address == 2 and 
v.nonopt_address == 4 + + def test_nested_struct_offsets_accumulate(self) -> None: + root = _root("DB1", 0x8A0E0001, 0x100) + outer = ti.PObject( + relation_id=0x100, + vartype_list=[ + _vte( + 5, + ti.Softdatatype.STRUCT, + ti.OffsetInfo(code=12, opt_addr=10, nonopt_addr=100, relation_id=0x200, has_relation=True), + ) + ], + varname_list=["Motor"], + ) + inner = ti.PObject( + relation_id=0x200, + vartype_list=[_vte(3, ti.Softdatatype.INT, ti.OffsetInfo(code=8, opt_addr=6, nonopt_addr=8))], + varname_list=["Rpm"], + ) + ti.build_tree([root], [outer, inner]) + infos = ti.build_flat_list([root]) + assert len(infos) == 1 + v = infos[0] + assert v.name == "DB1.Motor.Rpm" + assert v.access_sequence == "8A0E0001.5.3" + assert v.opt_address == 16 # 10 (Motor) + 6 (Rpm) + assert v.nonopt_address == 108 + + def test_basic_array_elements(self) -> None: + root = _root("DB1", 0x8A0E0001, 0x100) + type_obj = ti.PObject( + relation_id=0x100, + vartype_list=[ + _vte( + 9, + ti.Softdatatype.INT, + ti.OffsetInfo(code=10, opt_addr=20, nonopt_addr=40, array_element_count=3, array_lower_bound=0, is_1dim=True), + ) + ], + varname_list=["Vals"], + ) + ti.build_tree([root], [type_obj]) + infos = ti.build_flat_list([root]) + assert [v.name for v in infos] == ["DB1.Vals[0]", "DB1.Vals[1]", "DB1.Vals[2]"] + assert [v.access_sequence for v in infos] == ["8A0E0001.9.0", "8A0E0001.9.1", "8A0E0001.9.2"] + assert [v.opt_address for v in infos] == [20, 22, 24] # base + i*2 (INT stride) + + def test_struct_array_inserts_extra_one(self) -> None: + root = _root("DB1", 0x8A0E0001, 0x100) + outer = ti.PObject( + relation_id=0x100, + vartype_list=[ + _vte( + 7, + ti.Softdatatype.STRUCT, + ti.OffsetInfo( + code=13, + opt_addr=0, + nonopt_addr=0, + array_element_count=2, + array_lower_bound=0, + relation_id=0x300, + has_relation=True, + is_1dim=True, + ), + ) + ], + varname_list=["Items"], + ) + item_type = ti.PObject( + relation_id=0x300, + attributes={1502: struct.pack(">I", 8)}, # TI_TComSize = 8-byte 
struct stride + vartype_list=[_vte(2, ti.Softdatatype.REAL, ti.OffsetInfo(code=8, opt_addr=0, nonopt_addr=0))], + varname_list=["X"], + ) + ti.build_tree([root], [outer, item_type]) + infos = ti.build_flat_list([root]) + assert [v.name for v in infos] == ["DB1.Items[0].X", "DB1.Items[1].X"] + # StructArray inserts a ".1" between the array index id and the member LID. + assert [v.access_sequence for v in infos] == ["8A0E0001.7.0.1.2", "8A0E0001.7.1.1.2"] + assert [v.opt_address for v in infos] == [0, 8] # element stride 8 + + +class TestMDimArrays: + def test_basic_mdim_array_ordering_bounds_and_offsets(self) -> None: + # ARRAY[1..3, 10..11] of INT — dim0 (fastest, in-memory) has 3 elements, dim1 has 2. + # The name lists the highest dimension first: [dim1, dim0]. + root = _root("DB1", 0x8A0E0001, 0x100) + type_obj = ti.PObject( + relation_id=0x100, + vartype_list=[ + _vte( + 9, + ti.Softdatatype.INT, + ti.OffsetInfo( + code=11, + opt_addr=20, + nonopt_addr=40, + array_element_count=6, + mdim_element_count=[3, 2, 0, 0, 0, 0], + mdim_lower_bounds=[1, 10, 0, 0, 0, 0], + is_mdim=True, + ), + ) + ], + varname_list=["Grid"], + ) + ti.build_tree([root], [type_obj]) + infos = ti.build_flat_list([root]) + assert [v.name for v in infos] == [ + "DB1.Grid[10,1]", + "DB1.Grid[10,2]", + "DB1.Grid[10,3]", + "DB1.Grid[11,1]", + "DB1.Grid[11,2]", + "DB1.Grid[11,3]", + ] + assert [v.access_sequence for v in infos] == [ + "8A0E0001.9.0", + "8A0E0001.9.1", + "8A0E0001.9.2", + "8A0E0001.9.3", + "8A0E0001.9.4", + "8A0E0001.9.5", + ] + assert [v.opt_address for v in infos] == [20, 22, 24, 26, 28, 30] # base + (n-1)*2 + assert [v.nonopt_address for v in infos] == [40, 42, 44, 46, 48, 50] + + def test_bbool_mdim_access_id_aligns_to_byte(self) -> None: + # ARRAY[0..2, 0..1] of BOOL stored as BBOOL: each row of 3 bits rounds up to a byte, + # so the access-id of the second row starts at 8, not 3. Offsets stay linear (n-1). 
+ root = _root("DB1", 0x8A0E0001, 0x100) + type_obj = ti.PObject( + relation_id=0x100, + vartype_list=[ + _vte( + 7, + ti.Softdatatype.BBOOL, + ti.OffsetInfo( + code=11, + opt_addr=0, + nonopt_addr=0, + array_element_count=6, + mdim_element_count=[3, 2, 0, 0, 0, 0], + mdim_lower_bounds=[0, 0, 0, 0, 0, 0], + is_mdim=True, + ), + ) + ], + varname_list=["Flags"], + ) + ti.build_tree([root], [type_obj]) + infos = ti.build_flat_list([root]) + assert [v.name for v in infos] == [ + "DB1.Flags[0,0]", + "DB1.Flags[0,1]", + "DB1.Flags[0,2]", + "DB1.Flags[1,0]", + "DB1.Flags[1,1]", + "DB1.Flags[1,2]", + ] + assert [v.access_sequence for v in infos] == [ + "8A0E0001.7.0", + "8A0E0001.7.1", + "8A0E0001.7.2", + "8A0E0001.7.8", + "8A0E0001.7.9", + "8A0E0001.7.A", + ] + + def test_struct_mdim_array_inserts_extra_one(self) -> None: + # ARRAY[0..1, 0..1] of — StructArray nodes insert ".1" and stride by TComSize. + root = _root("DB1", 0x8A0E0001, 0x100) + outer = ti.PObject( + relation_id=0x100, + vartype_list=[ + _vte( + 7, + ti.Softdatatype.STRUCT, + ti.OffsetInfo( + code=14, + opt_addr=0, + nonopt_addr=0, + array_element_count=4, + mdim_element_count=[2, 2, 0, 0, 0, 0], + mdim_lower_bounds=[0, 0, 0, 0, 0, 0], + relation_id=0x300, + has_relation=True, + is_mdim=True, + ), + ) + ], + varname_list=["Cells"], + ) + item_type = ti.PObject( + relation_id=0x300, + attributes={1502: struct.pack(">I", 8)}, # TI_TComSize = 8-byte struct stride + vartype_list=[_vte(2, ti.Softdatatype.REAL, ti.OffsetInfo(code=8, opt_addr=0, nonopt_addr=0))], + varname_list=["X"], + ) + ti.build_tree([root], [outer, item_type]) + infos = ti.build_flat_list([root]) + assert [v.name for v in infos] == [ + "DB1.Cells[0,0].X", + "DB1.Cells[0,1].X", + "DB1.Cells[1,0].X", + "DB1.Cells[1,1].X", + ] + assert [v.access_sequence for v in infos] == [ + "8A0E0001.7.0.1.2", + "8A0E0001.7.1.1.2", + "8A0E0001.7.2.1.2", + "8A0E0001.7.3.1.2", + ] + assert [v.opt_address for v in infos] == [0, 8, 16, 24] # (n-1) * TComSize + + 
+class TestExtractTypeInfoObjects: + def test_finds_container_children(self) -> None: + child1 = _obj_header(0x100, 511) + bytes([0xA2]) + child2 = _obj_header(0x200, 511) + bytes([0xA2]) + container = _obj_header(0x537, ti.CLASS_OMS_TYPE_INFO_CONTAINER) + child1 + child2 + bytes([0xA2]) + response = bytes([0x00]) + container # leading return-value VLQ + the container object + objs = ti.extract_type_info_objects(response) + assert [o.relation_id for o in objs] == [0x100, 0x200] + + def test_no_container_returns_empty(self) -> None: + response = bytes([0x00]) + _obj_header(1, 999) + bytes([0xA2]) + assert ti.extract_type_info_objects(response) == [] + + def test_skips_preamble_before_first_object(self) -> None: + # Real EXPLORE(537) responses carry several preamble bytes between the return-value + # and the first StartOfObject (0xA1) — the parser must skip to it. + child = _obj_header(0x100, 511) + bytes([0xA2]) + container = _obj_header(0x219, ti.CLASS_OMS_TYPE_INFO_CONTAINER) + child + bytes([0xA2]) + response = bytes([0x00, 0x00, 0x00, 0x00, 0xC9, 0x4F]) + container # 6-byte preamble + objs = ti.extract_type_info_objects(response) + assert [o.relation_id for o in objs] == [0x100]