diff --git a/concore_base.py b/concore_base.py index f6b3e1b..476a6ed 100644 --- a/concore_base.py +++ b/concore_base.py @@ -45,8 +45,7 @@ def send_json_with_retry(self, message): except zmq.Again: logger.warning(f"Send timeout (attempt {attempt + 1}/5)") time.sleep(0.5) - logger.error("Failed to send after retries.") - return + raise TimeoutError(f"ZMQ send failed after 5 retries on {self.address}") def recv_json_with_retry(self): """Receive JSON message with retries if timeout occurs.""" @@ -56,8 +55,7 @@ def recv_json_with_retry(self): except zmq.Again: logger.warning(f"Receive timeout (attempt {attempt + 1}/5)") time.sleep(0.5) - logger.error("Failed to receive after retries.") - return None + raise TimeoutError(f"ZMQ recv failed after 5 retries on {self.address}") def init_zmq_port(mod, port_name, port_type, address, socket_type_str): @@ -221,6 +219,9 @@ def read(mod, port_identifier, name, initstr_val): mod.simtime = max(mod.simtime, first_element) return message[1:] return message + except TimeoutError as e: + logger.error(f"ZMQ recv timeout on port {port_identifier} (name: {name}): {e}. Returning default.") + return default_return_val except zmq.error.ZMQError as e: logger.error(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.") return default_return_val @@ -304,6 +305,8 @@ def write(mod, port_identifier, name, val, delta=0): # Mutation breaks cross-language determinism (see issue #385). else: zmq_p.send_json_with_retry(zmq_val) + except TimeoutError as e: + logger.error(f"ZMQ send timeout on port {port_identifier} (name: {name}): {e}") except zmq.error.ZMQError as e: logger.error(f"ZMQ write error on port {port_identifier} (name: {name}): {e}") except Exception as e: diff --git a/tests/test_concore.py b/tests/test_concore.py index fd89729..d4bae02 100644 --- a/tests/test_concore.py +++ b/tests/test_concore.py @@ -419,3 +419,91 @@ def test_write_timestamp_matches_cpp_semantics(self, temp_dir): "After 3 writes with delta=1 simtime must remain 0 " "(matching C++/MATLAB/Verilog); got %s" % concore.simtime ) + + +# =================================================================== +# ZMQ Retry Exhaustion Tests (Issue #393) +# =================================================================== + +class TestZMQRetryExhaustion: + """Tests for issue #393 — TimeoutError on retry exhaustion.""" + + @pytest.fixture(autouse=True) + def reset_zmq_ports(self): + import concore + original_ports = concore.zmq_ports.copy() + yield + concore.zmq_ports.clear() + concore.zmq_ports.update(original_ports) + + @pytest.fixture(autouse=True) + def reset_simtime(self): + import concore + old_simtime = concore.simtime + yield + concore.simtime = old_simtime + + def test_recv_json_with_retry_raises_timeout_error(self): + """recv_json_with_retry must raise TimeoutError after 5 failed attempts.""" + from concore import ZeroMQPort + from unittest.mock import MagicMock, patch + import zmq + + with patch.object(ZeroMQPort, '__init__', lambda self, *a, **kw: None): + port = ZeroMQPort.__new__(ZeroMQPort) + port.socket = MagicMock() + port.socket.recv_json.side_effect = zmq.Again() + port.address = "tcp://test:5555" + + with pytest.raises(TimeoutError, match="ZMQ recv failed after 5 retries"): + port.recv_json_with_retry() + + assert port.socket.recv_json.call_count == 5 + + def test_send_json_with_retry_raises_timeout_error(self): + """send_json_with_retry must raise TimeoutError after 5 failed attempts.""" + from concore import ZeroMQPort + from unittest.mock import MagicMock, patch + import zmq + + with patch.object(ZeroMQPort, '__init__', lambda self, *a, **kw: None): + port = ZeroMQPort.__new__(ZeroMQPort) + port.socket = MagicMock() + port.socket.send_json.side_effect = zmq.Again() + port.address = "tcp://test:5555" + + with pytest.raises(TimeoutError, match="ZMQ send failed after 5 retries"): + port.send_json_with_retry({"test": "data"}) + + assert port.socket.send_json.call_count == 5 + + def test_read_returns_default_on_zmq_timeout(self): + """read() must return default_return_val when recv exhausts retries, not None.""" + import concore + + class MockZMQPort: + def recv_json_with_retry(self): + raise TimeoutError("ZMQ recv failed after 5 retries on tcp://test:5555") + + concore.zmq_ports["test_timeout_port"] = MockZMQPort() + concore.simtime = 0 + + result = concore.read("test_timeout_port", "test_name", "[1.0, 2.0]") + + assert result == [1.0, 2.0], ( + "read() must return default_return_val on TimeoutError, got %s" % result + ) + + def test_write_does_not_crash_on_zmq_send_timeout(self): + """write() must handle TimeoutError from send gracefully.""" + import concore + + class MockZMQPort: + def send_json_with_retry(self, message): + raise TimeoutError("ZMQ send failed after 5 retries on tcp://test:5555") + + concore.zmq_ports["test_timeout_port"] = MockZMQPort() + concore.simtime = 0 + + # Should not raise — just log the error + concore.write("test_timeout_port", "test_name", [1.0, 2.0]) diff --git a/tests/test_concoredocker.py b/tests/test_concoredocker.py index c374391..3592171 100644 --- a/tests/test_concoredocker.py +++ b/tests/test_concoredocker.py @@ -235,3 +235,57 @@ def recv_json_with_retry(self): result = concoredocker.read("roundtrip", "data", "[]") assert result == original + + +# =================================================================== +# ZMQ Retry Exhaustion Tests (Issue #393) +# =================================================================== + +class TestZMQRetryExhaustion: + """Tests for issue #393 — TimeoutError on retry exhaustion via concoredocker.""" + + @pytest.fixture(autouse=True) + def reset_zmq_ports(self): + import concoredocker + original_ports = concoredocker.zmq_ports.copy() + yield + concoredocker.zmq_ports.clear() + concoredocker.zmq_ports.update(original_ports) + + @pytest.fixture(autouse=True) + def reset_simtime(self): + import concoredocker + old_simtime = concoredocker.simtime + yield + concoredocker.simtime = old_simtime + + def test_read_returns_default_on_zmq_timeout(self): + """read() must return default_return_val when recv exhausts retries, not None.""" + import concoredocker + + class MockZMQPort: + def recv_json_with_retry(self): + raise TimeoutError("ZMQ recv failed after 5 retries on tcp://test:5555") + + concoredocker.zmq_ports["test_timeout_port"] = MockZMQPort() + concoredocker.simtime = 0 + + result = concoredocker.read("test_timeout_port", "test_name", "[1.0, 2.0]") + + assert result == [1.0, 2.0], ( + "read() must return default_return_val on TimeoutError, got %s" % result + ) + + def test_write_does_not_crash_on_zmq_send_timeout(self): + """write() must handle TimeoutError from send gracefully.""" + import concoredocker + + class MockZMQPort: + def send_json_with_retry(self, message): + raise TimeoutError("ZMQ send failed after 5 retries on tcp://test:5555") + + concoredocker.zmq_ports["test_timeout_port"] = MockZMQPort() + concoredocker.simtime = 0 + + # Should not raise — just log the error + concoredocker.write("test_timeout_port", "test_name", [1.0, 2.0])