Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 91 additions & 6 deletions concore.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,98 @@
logging.getLogger('requests').setLevel(logging.WARNING)


# if windows, create script to kill this process
# because batch files don't provide easy way to know pid of last command
# ignored for posix != windows, because "concorepid" is handled by script
# ignored for docker (linux != windows), because handled by docker stop
# ===================================================================
# Windows PID Registry (Issue #391)
# ===================================================================
# Previous implementation wrote a single PID to concorekill.bat at
# import time, causing race conditions when multiple Python nodes
# started simultaneously (each overwrote the file). Only the last
# node's PID survived, and stale PIDs could kill unrelated processes.
#
# New approach:
# - Append each node's PID to concorekill_pids.txt (one per line)
# - Generate concorekill.bat that validates each PID before killing
# - Use atexit to remove the PID on graceful shutdown
# - Backward compatible: users still run concorekill.bat
# ===================================================================

_PID_REGISTRY_FILE = "concorekill_pids.txt"
_KILL_SCRIPT_FILE = "concorekill.bat"


def _register_pid():
"""Append the current process PID to the registry file."""
pid = os.getpid()
try:
with open(_PID_REGISTRY_FILE, "a") as f:
f.write(str(pid) + "\n")
except OSError:
pass # Non-fatal: best-effort registration


def _cleanup_pid():
"""Remove the current process PID from the registry on exit."""
pid = str(os.getpid())
try:
if not os.path.exists(_PID_REGISTRY_FILE):
return
with open(_PID_REGISTRY_FILE, "r") as f:
pids = [line.strip() for line in f if line.strip()]
remaining = [p for p in pids if p != pid]
if remaining:
with open(_PID_REGISTRY_FILE, "w") as f:
for p in remaining:
f.write(p + "\n")
else:
# No PIDs left — clean up both files
try:
os.remove(_PID_REGISTRY_FILE)
except OSError:
pass
try:
os.remove(_KILL_SCRIPT_FILE)
except OSError:
pass
except OSError:
pass # Non-fatal: best-effort cleanup


def _write_kill_script():
"""Generate concorekill.bat that validates PIDs before killing.

The script reads concorekill_pids.txt and for each PID:
1. Checks if the process still exists
2. Verifies it is a Python process
3. Only then issues taskkill
After killing, the registry file is deleted.
"""
try:
script = "@echo off\r\n"
script += "if not exist \"%~dp0" + _PID_REGISTRY_FILE + "\" (\r\n"
script += " echo No PID registry found. Nothing to kill.\r\n"
script += " exit /b 0\r\n"
script += ")\r\n"
script += "for /f \"tokens=*\" %%p in (%~dp0" + _PID_REGISTRY_FILE + ") do (\r\n"
script += " tasklist /FI \"PID eq %%p\" 2>nul | find /i \"python\" >nul\r\n"
script += " if not errorlevel 1 (\r\n"
script += " echo Killing Python process %%p\r\n"
script += " taskkill /F /PID %%p >nul 2>&1\r\n"
script += " ) else (\r\n"
script += " echo Skipping PID %%p - not a Python process or not running\r\n"
script += " )\r\n"
script += ")\r\n"
script += "del /q \"%~dp0" + _PID_REGISTRY_FILE + "\" 2>nul\r\n"
script += "del /q \"%~dp0" + _KILL_SCRIPT_FILE + "\" 2>nul\r\n"
with open(_KILL_SCRIPT_FILE, "w") as f:
f.write(script)
except OSError:
pass # Non-fatal: best-effort script generation


if hasattr(sys, 'getwindowsversion'):
with open("concorekill.bat","w") as fpid:
fpid.write("taskkill /F /PID "+str(os.getpid())+"\n")
_register_pid()
_write_kill_script()
atexit.register(_cleanup_pid)

ZeroMQPort = concore_base.ZeroMQPort
convert_numpy_to_python = concore_base.convert_numpy_to_python
Expand Down
134 changes: 134 additions & 0 deletions tests/test_concore.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,141 @@
import pytest
import os
import sys
import numpy as np


# ===================================================================
# PID Registry Tests (Issue #391)
# ===================================================================

class TestPidRegistry:
"""Tests for the Windows PID registry mechanism that replaces the
old single-overwrite concorekill.bat approach."""

@pytest.fixture(autouse=True)
def use_temp_dir(self, temp_dir, monkeypatch):
"""Run each test in an isolated temp directory."""
self.temp_dir = temp_dir
monkeypatch.chdir(temp_dir)

def test_register_pid_creates_registry_file(self):
"""_register_pid should create concorekill_pids.txt with current PID."""
from concore import _register_pid, _PID_REGISTRY_FILE
_register_pid()
assert os.path.exists(_PID_REGISTRY_FILE)
with open(_PID_REGISTRY_FILE) as f:
pids = [line.strip() for line in f if line.strip()]
assert str(os.getpid()) in pids

def test_register_pid_appends_not_overwrites(self):
"""Multiple calls to _register_pid should append, not overwrite."""
from concore import _register_pid, _PID_REGISTRY_FILE
# Simulate two different PIDs by writing manually then registering
with open(_PID_REGISTRY_FILE, "w") as f:
f.write("11111\n")
f.write("22222\n")
_register_pid()
with open(_PID_REGISTRY_FILE) as f:
pids = [line.strip() for line in f if line.strip()]
assert "11111" in pids
assert "22222" in pids
assert str(os.getpid()) in pids
assert len(pids) == 3

def test_cleanup_pid_removes_current_pid(self):
"""_cleanup_pid should remove only the current PID from the registry."""
from concore import _cleanup_pid, _PID_REGISTRY_FILE
current_pid = str(os.getpid())
with open(_PID_REGISTRY_FILE, "w") as f:
f.write("99999\n")
f.write(current_pid + "\n")
f.write("88888\n")
_cleanup_pid()
with open(_PID_REGISTRY_FILE) as f:
pids = [line.strip() for line in f if line.strip()]
assert current_pid not in pids
assert "99999" in pids
assert "88888" in pids

def test_cleanup_pid_deletes_files_when_last_pid(self):
"""When the current PID is the only one left, cleanup should
remove both the registry file and the kill script."""
from concore import _cleanup_pid, _PID_REGISTRY_FILE, _KILL_SCRIPT_FILE
current_pid = str(os.getpid())
with open(_PID_REGISTRY_FILE, "w") as f:
f.write(current_pid + "\n")
# Create a dummy kill script to verify it gets cleaned up
with open(_KILL_SCRIPT_FILE, "w") as f:
f.write("@echo off\n")
_cleanup_pid()
assert not os.path.exists(_PID_REGISTRY_FILE)
assert not os.path.exists(_KILL_SCRIPT_FILE)

def test_cleanup_pid_handles_missing_registry(self):
"""_cleanup_pid should not crash when registry file doesn't exist."""
from concore import _cleanup_pid, _PID_REGISTRY_FILE
assert not os.path.exists(_PID_REGISTRY_FILE)
_cleanup_pid() # Should not raise

def test_write_kill_script_generates_bat_file(self):
"""_write_kill_script should create concorekill.bat with validation logic."""
from concore import _write_kill_script, _KILL_SCRIPT_FILE, _PID_REGISTRY_FILE
_write_kill_script()
assert os.path.exists(_KILL_SCRIPT_FILE)
with open(_KILL_SCRIPT_FILE) as f:
content = f.read()
# Script should reference the PID registry file
assert _PID_REGISTRY_FILE in content
# Script should validate processes before killing
assert "tasklist" in content
assert "taskkill" in content
assert "python" in content.lower()

def test_multi_node_registration(self):
"""Simulate 3 nodes registering PIDs — all should be present."""
from concore import _register_pid, _PID_REGISTRY_FILE
fake_pids = ["1204", "1932", "8120"]
with open(_PID_REGISTRY_FILE, "w") as f:
for pid in fake_pids:
f.write(pid + "\n")
_register_pid() # Current process is the 4th
with open(_PID_REGISTRY_FILE) as f:
pids = [line.strip() for line in f if line.strip()]
for pid in fake_pids:
assert pid in pids
assert str(os.getpid()) in pids
assert len(pids) == 4

def test_cleanup_preserves_other_pids(self):
"""After cleanup, only the current process PID should be removed."""
from concore import _cleanup_pid, _PID_REGISTRY_FILE
current_pid = str(os.getpid())
other_pids = ["1111", "2222", "3333"]
with open(_PID_REGISTRY_FILE, "w") as f:
for pid in other_pids:
f.write(pid + "\n")
f.write(current_pid + "\n")
_cleanup_pid()
with open(_PID_REGISTRY_FILE) as f:
pids = [line.strip() for line in f if line.strip()]
assert len(pids) == 3
assert current_pid not in pids
for pid in other_pids:
assert pid in pids

@pytest.mark.skipif(not hasattr(sys, 'getwindowsversion'),
reason="Windows-only test")
def test_import_registers_pid_on_windows(self):
"""On Windows, importing concore should register the PID."""
from concore import _PID_REGISTRY_FILE
# The import already happened, so just verify the registry exists
# in our temp dir (we can't easily test the import-time side effect
# since concore was already imported — we test the functions directly)
from concore import _register_pid
_register_pid()
assert os.path.exists(_PID_REGISTRY_FILE)


class TestSafeLiteralEval:

def test_reads_dictionary_from_file(self, temp_dir):
Expand Down