diff --git a/examples/browse_tags.py b/examples/browse_tags.py new file mode 100644 index 00000000..f6173954 --- /dev/null +++ b/examples/browse_tags.py @@ -0,0 +1,320 @@ +""" +browse_tags.py — Read symbolic I/Q/M tags from an S7-1200 FW V4.5 PLC. + +Technique: EXPLORE + decompression with a partial preset dictionary +(Adler-32 0xce9b821b, 594 of 32768 bytes reconstructed via oracle analysis). +The same FDICT is used for all three areas (I, Q, M) — confirmed on +independent Wireshark pcapng captures. + +Results on S7-1200 CPU 1212C DC/DC/DC, FW V4.5 (40-tag project): + I area (RID=80): 13/13 complete + Q area (RID=81): 11/11 complete + M area (RID=82): 9/15 — 6 structural gaps (see below) +Score vs TIA Portal export: 33/40 correct, 6 gap, 0 wrong. + +Prerequisites: + - python-snap7 S7CommPlus branch with Patches 1, 5, 6 applied + (SequenceNumber field, _collect_explore_frames, session key) + - No password, no TLS (adjust connect() call if needed) + +Usage: + python browse_tags.py # all areas + python browse_tags.py I # I area only + python browse_tags.py Q M # Q and M + +Structural limit — M area Byte/Word tags: + The EXPLORE blob uses an identical deflate sequence for %MB and %MW + addresses. It is not possible to distinguish them without external + information (e.g. a TIA Portal export). The 6 affected tags show + LogicalAddress='?' but always have correct ByteOffset values. + +Tested on: + Siemens S7-1200 CPU 1212C DC/DC/DC, firmware V4.5, IP 192.168.5.11 +""" + +import re +import sys +import zlib +from unittest import mock + +PLC_HOST = '192.168.5.11' +PLC_PORT = 102 + +AREAS = {'I': 80, 'Q': 81, 'M': 82} + + +# --------------------------------------------------------------------------- +# Partial preset dictionary (594 of 32768 positions mapped, Adler-32 0xce9b821b) +# Reconstructed via oracle technique: inflate the same blob 4 times with +# DICT_ZERO, DICT_FF, DICT_A (i%256), DICT_B (i>>8). Identical bytes are +# literals; differing bytes reveal FDICT position (B_out<<8)|A_out. +# --------------------------------------------------------------------------- + +def _build_fdict() -> bytes: + d = bytearray(32768) + + def s(p, t): + for i, c in enumerate(t.encode('latin-1')): + d[p + i] = c + + # Root / element structure + s(0x7ff1, 'ControllerTags>') + s(0x7cc3, '" />') + d[0x7cc7] = ord('<') + s(0x7cc8, 'Tags') + d[0x7ccc] = ord(' ') + s(0x7ccd, 'HmiVisible="') + s(0x7c0d, '/> bytes | None: + """Connect to PLC, send EXPLORE for the given RID, decompress the blob.""" + try: + from s7._s7commplus_client import _build_explore_payload_v3 + from s7.connection import S7CommPlusConnection + from s7.protocol import FunctionCode + except ImportError: + print('Error: python-snap7 not found. pip install python-snap7') + sys.exit(1) + + with mock.patch.object(S7CommPlusConnection, '_post_auth_legitimation', + return_value=None): + conn = S7CommPlusConnection(host=PLC_HOST, port=PLC_PORT) + conn.connect(use_tls=False, password='', timeout=5.0) + try: + resp = conn.send_request(FunctionCode.EXPLORE, + _build_explore_payload_v3(rid)) + full = conn._collect_explore_frames(resp) + finally: + try: + conn.disconnect() + except Exception: + pass + + p = full.find(b'\x78\x7d') + if p < 0: + return None + try: + return zlib.decompressobj(wbits=-15, zdict=fdict).decompress(full[p + 6:]) + except zlib.error: + return None + + +# --------------------------------------------------------------------------- +# Tag extraction +# --------------------------------------------------------------------------- + +def _normalize_name(raw: str) -> str: + m = re.match(r'^(Tag_)0+([0-9]+)$', raw) + if m: + return m.group(1) + m.group(2) + return raw + + +def _extract_tags(data: bytes, area_prefix: str = '') -> list[dict]: + """Extract tags from the decompressed XML blob. + + Unknown FDICT positions produce null bytes, shown as '?' after decoding. + Extraction anchors on always-literal ID values, which are never stored + in the preset dictionary and are therefore always visible in the output. + + Byte-type fallback for I/Q areas: + Bool -> garbled %I43.X in blob -> reconstruct %{A}{bo}.{bit} + Word -> %IW/%QW visible in blob -> append ByteOffset + Byte -> only remaining type -> reconstruct %{A}B{ByteOffset} + """ + text = data.replace(b'\x00', b'?').decode('latin-1') + tags: list[dict] = [] + seen_id: set[str] = set() + seen_name: set[str] = set() + _synthetic = 0 + + for m in re.finditer(r'ID="([0-9?]{1,6})[?"]', text): + raw_id = m.group(1) + leading = re.match(r'^([0-9]+)', raw_id) + if leading: + tag_id = leading.group(1) + if tag_id in seen_id: + continue + seen_id.add(tag_id) + else: + tag_id = f'?{_synthetic}' + _synthetic += 1 + + pos = m.start() + anchors = [a.start() for a in re.finditer(r'> Area {area} (RID={rid}) ... ', end='', flush=True) + + data = _fetch_area(rid, fdict) + if data is None: + print('ERROR: compressed blob not found or decompression failed') + continue + + tags = _extract_tags(data, area_prefix='%' + area) + total += len(tags) + print(f'{len(tags)} tags found') + + if tags: + print(f' {"Name":<22} {"Type":<8} {"Address":<18} {"Offset":<8} ID') + print(f' {"-"*22} {"-"*8} {"-"*18} {"-"*8} --') + for t in tags: + name = t.get('Name', '?') + dtype = t.get('DataType', '?') + addr = t.get('LogicalAddress', '?') + offset = t.get('ByteOffset', '?') + tid = t['ID'] + print(f' {name:<22} {dtype:<8} {addr:<18} {offset:<8} {tid}') + print() + + print(f'Total: {total} tags') + print() + print('Note: 6 M-area tags (Byte/Word type) show LogicalAddress="?" —') + print('structural limit of the S7CommPlus EXPLORE protocol (see module docstring).') + + +if __name__ == '__main__': + main()