Skip to content

Commit f630f73

Browse files
gijzelaerrclaude
andcommitted
Fix heartbeat race condition with RLock on _send_receive
The heartbeat thread holds _reconnect_lock while calling get_cpu_state(), which calls _send_receive(). Adding the lock to _send_receive() caused a deadlock with threading.Lock. Switch to RLock (reentrant) so the same thread can acquire the lock multiple times without deadlocking. This also protects _send_receive_parallel() with the lock to prevent conflicts between parallel dispatch and heartbeat probes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 95062aa commit f630f73

1 file changed

Lines changed: 52 additions & 47 deletions

File tree

snap7/client.py

Lines changed: 52 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,8 @@ def __init__(
167167
self._heartbeat_stop_event = threading.Event()
168168
self._is_alive = False
169169

170-
# Lock for thread safety during reconnection
171-
self._reconnect_lock = threading.Lock()
170+
# Lock for thread safety during reconnection and heartbeat
171+
self._reconnect_lock = threading.RLock()
172172

173173
logger.info("S7Client initialized (pure Python implementation)")
174174

@@ -194,6 +194,8 @@ def _send_receive(self, request: bytes, max_stale_retries: int = 3) -> dict[str,
194194
195195
Wraps the repeated send_data -> receive_data -> parse_response pattern
196196
with PDU reference validation and automatic retry on stale packets.
197+
Acquires ``_reconnect_lock`` to prevent conflicts with the heartbeat
198+
thread.
197199
198200
Args:
199201
request: Complete S7 PDU to send.
@@ -207,20 +209,22 @@ def _send_receive(self, request: bytes, max_stale_retries: int = 3) -> dict[str,
207209
S7ProtocolError: If all retries are exhausted or other protocol error.
208210
"""
209211
conn = self._get_connection()
210-
conn.send_data(request)
211212

212-
for attempt in range(max_stale_retries + 1):
213-
response_data = conn.receive_data()
214-
response = self.protocol.parse_response(response_data)
213+
with self._reconnect_lock:
214+
conn.send_data(request)
215215

216-
try:
217-
self.protocol.validate_pdu_reference(response["sequence"])
218-
return response
219-
except S7StalePacketError:
220-
if attempt < max_stale_retries:
221-
logger.warning(f"Stale packet (attempt {attempt + 1}/{max_stale_retries}), retrying receive")
222-
continue
223-
raise S7ProtocolError(f"Max stale packet retries ({max_stale_retries}) exceeded")
216+
for attempt in range(max_stale_retries + 1):
217+
response_data = conn.receive_data()
218+
response = self.protocol.parse_response(response_data)
219+
220+
try:
221+
self.protocol.validate_pdu_reference(response["sequence"])
222+
return response
223+
except S7StalePacketError:
224+
if attempt < max_stale_retries:
225+
logger.warning(f"Stale packet (attempt {attempt + 1}/{max_stale_retries}), retrying receive")
226+
continue
227+
raise S7ProtocolError(f"Max stale packet retries ({max_stale_retries}) exceeded")
224228

225229
raise S7ProtocolError("Failed to receive valid response") # Should not reach here
226230

@@ -798,39 +802,40 @@ def _send_receive_parallel(
798802
"""
799803
conn = self._get_connection()
800804

801-
# Build seq_num → packet_index lookup
802-
pending: dict[int, int] = {}
803-
for packet_index, pdu in requests:
804-
seq = struct.unpack(">H", pdu[4:6])[0]
805-
pending[seq] = packet_index
806-
807-
# Send all requests back-to-back
808-
for _, pdu in requests:
809-
conn.send_data(pdu)
810-
811-
# Receive responses, matching by sequence number
812-
results: dict[int, dict[str, Any]] = {}
813-
remaining = len(requests)
814-
deadline = time.monotonic() + conn.timeout
815-
816-
while remaining > 0:
817-
wait_time = deadline - time.monotonic()
818-
if wait_time <= 0:
819-
raise S7TimeoutError(f"Timeout waiting for {remaining} parallel response(s)")
820-
821-
if not conn.data_available(timeout=wait_time):
822-
raise S7TimeoutError(f"Timeout waiting for {remaining} parallel response(s)")
823-
824-
response_data = conn.receive_data()
825-
response = self.protocol.parse_response(response_data)
826-
resp_seq = response["sequence"]
827-
828-
if resp_seq in pending:
829-
packet_index = pending.pop(resp_seq)
830-
results[packet_index] = response
831-
remaining -= 1
832-
else:
833-
logger.warning(f"Discarding unexpected response with sequence {resp_seq}")
805+
with self._reconnect_lock:
806+
# Build seq_num → packet_index lookup
807+
pending: dict[int, int] = {}
808+
for packet_index, pdu in requests:
809+
seq = struct.unpack(">H", pdu[4:6])[0]
810+
pending[seq] = packet_index
811+
812+
# Send all requests back-to-back
813+
for _, pdu in requests:
814+
conn.send_data(pdu)
815+
816+
# Receive responses, matching by sequence number
817+
results: dict[int, dict[str, Any]] = {}
818+
remaining = len(requests)
819+
deadline = time.monotonic() + conn.timeout
820+
821+
while remaining > 0:
822+
wait_time = deadline - time.monotonic()
823+
if wait_time <= 0:
824+
raise S7TimeoutError(f"Timeout waiting for {remaining} parallel response(s)")
825+
826+
if not conn.data_available(timeout=wait_time):
827+
raise S7TimeoutError(f"Timeout waiting for {remaining} parallel response(s)")
828+
829+
response_data = conn.receive_data()
830+
response = self.protocol.parse_response(response_data)
831+
resp_seq = response["sequence"]
832+
833+
if resp_seq in pending:
834+
packet_index = pending.pop(resp_seq)
835+
results[packet_index] = response
836+
remaining -= 1
837+
else:
838+
logger.warning(f"Discarding unexpected response with sequence {resp_seq}")
834839

835840
return results
836841

0 commit comments

Comments
 (0)