Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 168 additions & 21 deletions s7/_s7commplus_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@
_parse_write_response,
_build_area_read_payload,
_build_area_write_payload,
_build_symbolic_read_payload,
_build_explore_payload,
_build_invoke_payload,
_build_explore_request,
_parse_explore_datablocks,
_parse_explore_fields,
)
from . import typeinfo
from .protocol import Ids

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -482,39 +483,135 @@ async def set_plc_operating_state(self, state: int) -> None:
payload = _build_invoke_payload(state)
await self._send_request(FunctionCode.INVOKE, payload)

async def read_symbolic(self, access_area: int, lids: list[int], symbol_crc: int = 0) -> bytes:
"""Read a variable using S7CommPlus symbolic (LID-based) access.

.. warning:: This method is **experimental** and may change.
"""
payload = _build_symbolic_read_payload(access_area, lids, symbol_crc)
response = await self._send_request(FunctionCode.GET_MULTI_VARIABLES, payload)
results = _parse_read_response(response)
if not results or results[0] is None:
raise RuntimeError("Symbolic read failed")
return results[0]

async def list_datablocks(self) -> list[dict[str, Any]]:
"""List all datablocks on the PLC via EXPLORE.

.. warning:: This method is **experimental** and may change.
"""
payload = _build_explore_request(Ids.NATIVE_THE_PLC_PROGRAM_RID, [Ids.OBJECT_VARIABLE_TYPE_NAME, Ids.BLOCK_BLOCK_NUMBER])
response = await self._send_request(FunctionCode.EXPLORE, payload)
response = await self._send_request(FunctionCode.EXPLORE, payload, integrity_tail=5, reassemble=True)
return _parse_explore_datablocks(response)

async def browse(self) -> list[dict[str, Any]]:
"""Browse the PLC symbol table via EXPLORE.
"""Browse the full per-tag symbol tree via EXPLORE + the type-info container.

.. warning:: This method is **experimental** and may change.

Returns a flat list of variable dicts with keys ``name``, ``access_sequence``
(the dot-separated hex LID path usable with :meth:`read_tag`), ``data_type``,
and the optimized/non-optimized byte+bit offsets. Steps: enumerate DBs, resolve
each DB's type-info RID via a LID=1 read, explore the OMS type-info container,
then recombine into the symbol tree.

Returns:
List of variable info dicts.
"""
dbs = await self.list_datablocks()
variables: list[dict[str, Any]] = []
for db_info in dbs:
db_rid = db_info.get("rid", 0)
if db_rid == 0:
# Phase A: enumerate data blocks. Phase B/C: resolve each DB's type-info RID
# (a LID=1 read — needed for instance DBs whose TI is not their own RID) and seed
# a root node per DB.
root_nodes: list[typeinfo.Node] = []
for db_info in await self.list_datablocks():
if db_info.get("number", 0) <= 0 or db_info.get("rid", 0) == 0:
continue
payload = _build_explore_request(db_rid, [Ids.OBJECT_VARIABLE_TYPE_NAME])
ti_rid = await self._read_typeinfo_rid(db_info["rid"])
if ti_rid == 0:
continue # load-memory-only DB, skip
root_nodes.append(
typeinfo.Node(
node_type=typeinfo.NodeType.ROOT, name=db_info["name"], access_id=db_info["rid"], relation_id=ti_rid
)
)

# Add the native process areas with their known synthetic type-info ids.
for name, access_rid, ti_rid in (
("IArea", Ids.NATIVE_THE_I_AREA_RID, 0x90010000),
("QArea", Ids.NATIVE_THE_Q_AREA_RID, 0x90020000),
("MArea", Ids.NATIVE_THE_M_AREA_RID, 0x90030000),
("S7Timers", Ids.NATIVE_THE_S7_TIMERS_RID, 0x90050000),
("S7Counters", Ids.NATIVE_THE_S7_COUNTERS_RID, 0x90060000),
):
root_nodes.append(
typeinfo.Node(node_type=typeinfo.NodeType.ROOT, name=name, access_id=access_rid, relation_id=ti_rid)
)

# Phase D: explore the OMS type-info container (a large, multi-fragment PDU).
type_objects = await self._explore_type_info_container()

# Phase E: recombine type-info with the DB/area nodes and flatten.
typeinfo.build_tree(root_nodes, type_objects)
variables: list[dict[str, Any]] = []
for v in typeinfo.build_flat_list(root_nodes):
try:
response = await self._send_request(FunctionCode.EXPLORE, payload)
fields = _parse_explore_fields(response, db_info["number"], db_info["name"])
variables.extend(fields)
except Exception:
continue
data_type = typeinfo.Softdatatype(v.softdatatype).name
except ValueError:
data_type = str(v.softdatatype)
variables.append(
{
"name": v.name,
"access_sequence": v.access_sequence,
"data_type": data_type,
"opt_address": v.opt_address,
"opt_bitoffset": v.opt_bitoffset,
"nonopt_address": v.nonopt_address,
"nonopt_bitoffset": v.nonopt_bitoffset,
}
)
return variables

async def _read_typeinfo_rid(self, db_rid: int) -> int:
"""Read LID=1 of a DB to get its type-info RID (0 if the DB has no readable value)."""
try:
raw = await self.read_symbolic(db_rid, [1], 0)
except Exception:
return 0
return struct.unpack(">I", raw[:4])[0] if len(raw) >= 4 else 0

async def _explore_type_info_container(self) -> list["typeinfo.PObject"]:
"""EXPLORE the OMS type-info container and return its per-type objects."""
payload = _build_explore_request(Ids.OBJECT_OMS_TYPE_INFO_CONTAINER, [])
response = await self._send_request(FunctionCode.EXPLORE, payload, integrity_tail=5, reassemble=True)
return typeinfo.extract_type_info_objects(response)

# -- Internal methods --

async def _send_request(self, function_code: int, payload: bytes) -> bytes:
"""Send an S7CommPlus request and receive the response."""
# Sanity caps for fragment reassembly — generous vs. any real PLC EXPLORE response,
# but bounded so a malformed/adversarial stream can't drive unbounded allocation.
_MAX_REASSEMBLED_BYTES = 16 * 1024 * 1024
_MAX_REASSEMBLED_FRAGMENTS = 4096

async def _send_request(
self,
function_code: int,
payload: bytes,
integrity_tail: int = 4,
reassemble: bool = False,
) -> bytes:
"""Send an S7CommPlus request and receive the response.

Args:
function_code: S7CommPlus function code.
payload: Request payload (after the 14-byte request header).
integrity_tail: number of trailing payload bytes the V2 IntegrityId is
inserted *before* — 4 for GetMultiVariables/SetMultiVariables (a
trailing UInt32), 5 for Explore (a trailing UInt32 + filler byte).
reassemble: when True, concatenate a multi-fragment response (e.g. Explore)
before returning its payload.

Returns:
Response payload (after the 10-byte response header).
"""
async with self._lock:
if not self._connected or self._writer is None or self._reader is None:
raise RuntimeError("Not connected")
Expand All @@ -529,7 +626,8 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes:
0x0000,
seq_num,
self._session_id,
0x34 if function_code == FunctionCode.GET_MULTI_VARIABLES else 0x36,
# Transport flags: 0x34 for GetMultiVariables and Explore, 0x36 otherwise.
0x34 if function_code in (FunctionCode.GET_MULTI_VARIABLES, FunctionCode.EXPLORE) else 0x36,
)

integrity_id_bytes = b""
Expand All @@ -538,10 +636,10 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes:
integrity_id = self._integrity_id_read if is_read else self._integrity_id_write
integrity_id_bytes = encode_uint32_vlq(integrity_id)

# For V2+ the IntegrityId is spliced in just before the payload's trailing
# UInt32 (i.e. at the end), not right after the header.
if integrity_id_bytes and len(payload) >= 4:
request = request_header + payload[:-4] + integrity_id_bytes + payload[-4:]
# The IntegrityId is spliced in just before the payload's trailing fill bytes
# (integrity_tail of them), not right after the header.
if integrity_id_bytes and len(payload) >= integrity_tail:
request = request_header + payload[:-integrity_tail] + integrity_id_bytes + payload[-integrity_tail:]
else:
request = request_header + integrity_id_bytes + payload

Expand All @@ -555,6 +653,13 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes:
else:
self._integrity_id_write = (self._integrity_id_write + 1) & 0xFFFFFFFF

# Large responses (e.g. Explore) are split across several S7CommPlus PDUs.
if reassemble:
data = await self._recv_reassembled_payload()
if len(data) < 10:
raise RuntimeError("Response too short")
return bytes(data[10:])

response_data = await self._recv_cotp_dt()

version, data_length, consumed = decode_header(response_data)
Expand All @@ -568,6 +673,48 @@ async def _send_request(self, function_code: int, payload: bytes) -> bytes:
# IntegrityId travels at the END of the payload and is ignored by the parsers.
return response[10:]

async def _recv_reassembled_payload(self) -> bytes:
"""Receive a possibly-fragmented S7CommPlus response, returning its data section.

A large response is split into several S7CommPlus PDUs. Each fragment is
``0x72 <ver> <len:2> <data:len>`` with no trailer; only the final fragment is
followed by the ``0x72 <ver> 0x0000`` trailer. We concatenate the data parts
of every fragment until the trailer is seen. Works for single-PDU responses
too (one fragment immediately followed by the trailer).
"""
buf = bytearray()

async def ensure(n: int) -> None:
while len(buf) < n:
chunk = await self._recv_cotp_dt()
if not chunk:
raise RuntimeError("Connection closed during response reassembly")
buf.extend(chunk)

data = bytearray()
fragments = 0
while True:
await ensure(4)
if buf[0] != 0x72:
raise RuntimeError("Expected S7CommPlus fragment header (0x72)")
frag_len = (buf[2] << 8) | buf[3]
del buf[:4]
if frag_len == 0:
break # standalone trailer (defensive)
await ensure(frag_len)
data.extend(buf[:frag_len])
del buf[:frag_len]
fragments += 1
if fragments > self._MAX_REASSEMBLED_FRAGMENTS or len(data) > self._MAX_REASSEMBLED_BYTES:
raise RuntimeError(f"Reassembled response exceeds limits ({len(data)} bytes, {fragments} fragments)")
# The next 4 bytes are either the trailer (0x72 ver 0x0000) or the next
# fragment's header (0x72 ver len>0).
await ensure(4)
if buf[0] == 0x72 and buf[2] == 0 and buf[3] == 0:
del buf[:4] # consume trailer — last fragment
break
return bytes(data)

async def _cotp_connect(self, local_tsap: int, remote_tsap: bytes) -> None:
"""Perform COTP Connection Request / Confirm handshake."""
if self._writer is None or self._reader is None:
Expand Down
Loading
Loading