Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 120 additions & 7 deletions s7/_s7commplus_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from typing import Any, Optional

from .connection import S7CommPlusConnection
from .protocol import FunctionCode, Ids, ElementID, DataType, ObjectId
from .protocol import FunctionCode, Ids, ElementID, DataType, ObjectId, ProtocolVersion
from .vlq import encode_uint32_vlq, decode_uint32_vlq, decode_uint64_vlq
from .codec import (
encode_item_address,
Expand Down Expand Up @@ -365,6 +365,15 @@ def list_datablocks(self) -> list[dict[str, Any]]:
if self._connection is None:
raise RuntimeError("Not connected")

if self._connection._protocol_version >= ProtocolVersion.V3:
# V3 PLCs (FW >= V4.5): EXPLORE 0x8A11FFFF returns a multi-frame
# zlib-compressed PlcContentInfo XML blob. The existing reassemble
# path does not strip V3 HMAC prefixes, so we collect frames manually.
payload = _build_explore_payload_v3(0x8A11FFFF)
first_response = self._connection.send_request(FunctionCode.EXPLORE, payload, integrity_tail=5)
response = self._connection.collect_explore_frames(first_response)
return _parse_explore_datablocks_xml(response)

payload = _build_explore_request(Ids.NATIVE_THE_PLC_PROGRAM_RID, [Ids.OBJECT_VARIABLE_TYPE_NAME, Ids.BLOCK_BLOCK_NUMBER])
response = self._connection.send_request(FunctionCode.EXPLORE, payload, integrity_tail=5, reassemble=True)
return _parse_explore_datablocks(response)
Expand Down Expand Up @@ -394,9 +403,17 @@ def browse(self) -> list[dict[str, Any]]:
db_rid = db_info.get("rid", 0)
if db_rid == 0:
continue
payload = _build_explore_request(db_rid, [Ids.OBJECT_VARIABLE_TYPE_NAME])
is_v3 = self._connection._protocol_version >= ProtocolVersion.V3
if is_v3:
payload = _build_explore_payload_v3(db_rid)
else:
payload = _build_explore_request(db_rid, [Ids.OBJECT_VARIABLE_TYPE_NAME])
try:
response = self._connection.send_request(FunctionCode.EXPLORE, payload, integrity_tail=5, reassemble=True)
if is_v3:
first_response = self._connection.send_request(FunctionCode.EXPLORE, payload, integrity_tail=5)
response = self._connection.collect_explore_frames(first_response)
else:
response = self._connection.send_request(FunctionCode.EXPLORE, payload, integrity_tail=5, reassemble=True)
fields = _parse_explore_fields(response, db_info["number"], db_info["name"])
variables.extend(fields)
except Exception:
Expand Down Expand Up @@ -763,6 +780,80 @@ def _build_explore_request(explore_id: int, attribute_ids: list[int]) -> bytes:
return bytes(payload)


def _build_explore_payload_v3(explore_id: int) -> bytes:
    """Encode the body of a V3-style EXPLORE request for a single RID.

    Unlike the fixed big-endian layout produced by
    ``_build_explore_request()``, V3 PLCs (FW >= V4.5) take the RID as a
    compact VLQ-encoded value. Exploring RID ``0x8A11FFFF`` makes the PLC
    answer with a zlib-compressed (magic ``78 DA``) ``PlcContentInfo`` XML
    blob that is spread over several TPKT frames.

    Args:
        explore_id: RID to explore (e.g. ``0x8A11FFFF`` for all blocks).

    Returns:
        The encoded EXPLORE payload: VLQ RID followed by five zero bytes.
    """
    rid_field = encode_uint32_vlq(explore_id)
    # Zero UInt32 fill plus one filler byte -- the same tail that
    # _build_explore_request() appends.
    zero_tail = struct.pack(">IB", 0, 0)
    return bytes(rid_field) + zero_tail


def _parse_explore_datablocks_xml(response: bytes) -> list[dict[str, Any]]:
    """Parse a V3 EXPLORE response containing a zlib-compressed PlcContentInfo XML blob.

    On V3 PLCs the ``0x8A11FFFF`` EXPLORE returns a ``PlcContentInfo`` XML
    document compressed with standard zlib (magic ``78 DA``) embedded inside a
    large BLOB attribute that spans multiple TPKT frames. This parser locates
    the zlib stream in the concatenated response, decompresses it, and
    extracts the DB entities.

    The two magic bytes may also appear by coincidence in the binary framing
    that precedes the blob, so every ``78 DA`` occurrence is tried in order
    until one inflates successfully.

    Falls back to :func:`_parse_explore_datablocks` when no ``78 DA`` magic is
    found so that V1/V2 responses are handled transparently.

    Args:
        response: Concatenated EXPLORE response payload.

    Returns:
        List of dicts: ``{"name": str, "number": int, "rid": int}``. Empty
        when no candidate stream decompresses or the XML cannot be parsed.
    """
    import zlib
    import xml.etree.ElementTree as ET

    zlib_magic = b"\x78\xda"
    zlib_pos = response.find(zlib_magic)
    if zlib_pos < 0:
        logger.debug("_parse_explore_datablocks_xml: no zlib magic, falling back to PObject parser")
        return _parse_explore_datablocks(response)

    # Try each candidate header; a false-positive match fails fast inside
    # zlib. The memoryview avoids copying the tail for every attempt.
    view = memoryview(response)
    xml_bytes = None
    last_error = None
    while zlib_pos >= 0:
        try:
            xml_bytes = zlib.decompress(view[zlib_pos:])
            break
        except zlib.error as exc:
            last_error = exc
            zlib_pos = response.find(zlib_magic, zlib_pos + 1)
    if xml_bytes is None:
        logger.debug(f"_parse_explore_datablocks_xml: zlib error {last_error}")
        return []

    # Security assumption: the XML originates from the PLC and is parsed with
    # the stdlib ElementTree parser, which does not expand external entities
    # (an unresolved entity reference surfaces as ParseError, handled below).
    # Protection against entity-expansion DoS depends on the Expat version the
    # interpreter is linked against.
    try:
        root = ET.fromstring(xml_bytes.decode("utf-8"))
    except (ValueError, ET.ParseError) as exc:
        # ValueError covers UnicodeDecodeError from the explicit decode.
        logger.debug(f"_parse_explore_datablocks_xml: XML parse error {exc}")
        return []

    datablocks: list[dict[str, Any]] = []
    for entity in root.findall('.//Entity[@Id="Block"]'):
        header = entity.find("Header")
        # Only data blocks are of interest; other block types are skipped.
        if header is None or header.get("Type") != "DB":
            continue
        name = header.get("Name", "")
        try:
            number = int(header.get("Number", "0"))
            rid = int(entity.get("Rid", "0"))
        except ValueError:
            # Non-numeric Number/Rid attribute: ignore this entity.
            continue
        if name and number > 0:
            datablocks.append({"name": name, "number": number, "rid": rid})

    logger.debug(f"_parse_explore_datablocks_xml: found {len(datablocks)} DB(s)")
    return datablocks


def _parse_explore_datablocks(response: bytes) -> list[dict[str, Any]]:
"""Parse an EXPLORE(thePLCProgram) response to extract datablock info.

Expand Down Expand Up @@ -910,26 +1001,48 @@ def _parse_explore_fields(response: bytes, db_number: int, db_name: str) -> list
datatype = response[offset + 1]
offset += 2

if attr_id == Ids.OBJECT_VARIABLE_TYPE_NAME and datatype == 0x13:
if attr_id == Ids.OBJECT_VARIABLE_TYPE_NAME and datatype in (0x13, 0x15): # S7String / WSTRING
if offset >= len(response):
break
str_len, consumed = _vlq32(response, offset)
offset += consumed
if offset + str_len <= len(response):
raw_str = response[offset : offset + str_len]
try:
field_name = response[offset : offset + str_len].decode("utf-16-be", errors="replace")
# V3 PLCs send UTF-8; V1/V2 send UTF-16-BE.
# UTF-16-BE always contains null bytes for ASCII names;
# UTF-8 ASCII names never do — use that as the discriminator.
if b"\x00" in raw_str:
field_name = raw_str.decode("utf-16-be", errors="replace").rstrip("\x00")
else:
field_name = raw_str.decode("utf-8", errors="replace")
except Exception:
field_name = ""
offset += str_len
continue

# Skip attribute value
if flags & 0x10:
# Skip attribute value. V3 PLCs insert an extra 0x00 byte before
# the VLQ length of BLOB (0x14) attributes; WSTRING (0x15) skip
# must also advance past the string data bytes.
if flags & 0x10: # array
if offset >= len(response):
break
count, consumed = _vlq32(response, offset)
offset += consumed
offset += count

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The offset += 1 for the extra 0x00 byte before BLOB VLQ length is V3-specific, but this code runs for all protocol versions. If a V1/V2 EXPLORE response contains a BLOB attribute, this will skip one byte too many and corrupt all subsequent parsing. Guard with a V3 check, or pass the protocol version into this function.

elif datatype == 0x14: # BLOB — V3 adds an extra 0x00 before VLQ length
if offset >= len(response):
break
offset += 1 # extra 0x00 byte present in V3 encoding
if offset >= len(response):
break
blob_len, consumed = _vlq32(response, offset)
offset += consumed + blob_len
elif datatype in (0x13, 0x15): # S7String / WSTRING not matched above
if offset >= len(response):
break
str_len, consumed = _vlq32(response, offset)
offset += consumed + str_len
else:
if offset >= len(response):
break
Expand Down
50 changes: 50 additions & 0 deletions s7/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,56 @@ def _send_legitimation_legacy(self, response: bytes) -> None:
raise S7ConnectionError(f"Legacy legitimation rejected by PLC: return_value={return_value}")
logger.debug(f"Legacy legitimation return_value={return_value}")

def collect_explore_frames(
    self,
    first_payload: bytes,
    *,
    max_fragments: int = 4096,
    max_bytes: int = 16 * 1024 * 1024,
) -> bytes:
    """Collect multi-fragment EXPLORE continuation frames for V3 PLCs.

    On V3 PLCs (FW >= V4.5) a large EXPLORE response (e.g. RID 0x8A11FFFF)
    spans multiple TPKT frames. The first frame is the normal response
    (already stripped of its 10-byte header by send_request). Continuation
    frames carry **no** response header -- they are raw BLOB data protected
    only by a V3 HMAC prefix. This method receives and concatenates them so
    the caller can parse the result as one buffer.

    Detection of the last fragment is a heuristic: a frame whose body (after
    HMAC strip) is more than 5 bytes shorter than the first frame's on-wire
    body is treated as the last fragment. NOTE(review): if the PLC ever sends
    a legitimately shorter intermediate frame, collection stops early and the
    result is truncated.

    Collection is best-effort: a receive error, an empty or malformed frame,
    or reaching one of the caps ends collection and returns what has been
    gathered so far.

    Args:
        first_payload: First EXPLORE response payload, already returned by
            send_request() (10-byte response header already stripped).
        max_fragments: Upper bound on the number of continuation frames read,
            so a misbehaving peer cannot keep the loop running forever.
        max_bytes: Upper bound on the total size of the returned buffer.

    Returns:
        All fragment payloads concatenated (first_payload + continuations).
    """
    # The first frame body (already header-stripped) was originally
    # len(first_payload) + 10 bytes on the wire (10-byte response header).
    # Continuation frames of the same "full" size will be that long after
    # HMAC strip; a shorter body signals the last fragment.
    reference_size = len(first_payload) + 10
    strip_hmac = self._protocol_version >= ProtocolVersion.V3

    # Accumulate in a list and join once; repeated bytes += is quadratic.
    fragments = [first_payload]
    total_bytes = len(first_payload)

    for _ in range(max_fragments):
        try:
            raw = self._recv_s7_data()
        except Exception as exc:
            # Best-effort: a timeout or transport error ends collection, but
            # leave a trace instead of swallowing it silently.
            logger.debug(f"collect_explore_frames: receive failed, stopping collection: {exc}")
            break
        if not raw:
            break
        # Strip the 4-byte S7CommPlus fragment header (0x72 ver len:2)
        if len(raw) < 4 or raw[0] != 0x72:
            break
        frag_len = (raw[2] << 8) | raw[3]
        body = raw[4 : 4 + frag_len]
        # V3 non-TLS: strip the HMAC prefix ([hash_len][hash_bytes])
        if strip_hmac and len(body) > 33:
            hash_len = body[0]
            body = body[1 + hash_len :]
        if not body:
            break
        if total_bytes + len(body) > max_bytes:
            logger.warning(f"collect_explore_frames: response exceeds {max_bytes} bytes, stopping collection")
            break
        fragments.append(body)
        total_bytes += len(body)
        if len(body) < reference_size - 5:
            break  # last fragment
    else:
        logger.warning(f"collect_explore_frames: reached {max_fragments} fragments, stopping collection")

    return b"".join(fragments)

def disconnect(self) -> None:
"""Disconnect from PLC."""
if self._connected and self._session_id:
Expand Down