diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 9b3e066..4f420b2 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -1,4 +1,4 @@ -name: Build and Test +name: Test on: push: @@ -10,36 +10,22 @@ jobs: test: runs-on: ${{ matrix.os }} strategy: + fail-fast: false # This ensures all matrix jobs run even if one fails matrix: - os: [ubuntu-latest, windows-latest] - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"] - exclude: - - os: windows-latest - python-version: "3.8" # Windows doesn't support Python 3.8 - - os: windows-latest - python-version: "3.9" # Windows doesn't support Python 3.9 + os: [ubuntu-latest, windows-latest, macos-latest] + python-version: ['3.8', '3.9', '3.10', '3.11', '3.12'] steps: - - uses: actions/checkout@v4 - + - uses: actions/checkout@v3 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} - - - name: Install build dependencies + - name: Install dependencies run: | python -m pip install --upgrade pip - pip install setuptools wheel build - - - name: Install package and test dependencies - run: | - pip install -e ".[test]" - + pip install pytest pytest-cov + pip install -e . - name: Run tests run: | - pytest tests/ - - - name: Build package - run: | - python setup.py sdist bdist_wheel \ No newline at end of file + pytest --cov=jbang tests/ -o log_cli_level=DEBUG -o log_cli=true \ No newline at end of file diff --git a/.gitignore b/.gitignore index 4948bdb..446fe4d 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ dist jbang.egg-info jbang/__pycache__ __pycache__ +.coverage diff --git a/.justfile b/.justfile index b3145bf..18a95eb 100644 --- a/.justfile +++ b/.justfile @@ -1,13 +1,12 @@ -default: - echo 'Hello, world!' - test: - source venv/bin/activate - pip3 install -e . - pip install -e ".[test]" - python -m pytest + source .venv/bin/activate + #pip3 install -e . + uv pip install -e ".[test]" + + echo Running tests with no jbang in PATH + PATH=$(echo $PATH | tr ':' '\n' | grep -v "\.jbang/bin" | tr '\n' ':' | sed 's/:$//') .venv/bin/python -m pytest -o log_cli_level=DEBUG -o log_cli=true release: - source venv/bin/activate - pip3 install setuptools + source .venv/bin/activate + uv pip install setuptools gh release create `python3 setup.py --version` --generate-notes diff --git a/jbang/__init__.py b/jbang/__init__.py index 98302bb..b08f380 100644 --- a/jbang/__init__.py +++ b/jbang/__init__.py @@ -1,310 +1,6 @@ -import subprocess -import os -import platform -import logging -import sys -import signal -from typing import Optional, Dict, Any +from .jbang import exec, spawnSync, main, quote -log = logging.getLogger(__name__) - -class JbangExecutionError(Exception): - """Custom exception to capture Jbang execution errors with exit code.""" - def __init__(self, message, exit_code): - super().__init__(message) - self.exit_code = exit_code - -def _get_jbang_path() -> Optional[str]: - """Get the path to jbang executable.""" - for cmd in ['jbang', './jbang.cmd' if platform.system() == 'Windows' else None, './jbang']: - if cmd: - result = subprocess.run(f"which {cmd}", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - if result.returncode == 0: - return cmd - return None - -def _get_installer_command() -> Optional[str]: - """Get the appropriate installer command based on available tools.""" - if subprocess.run("which curl", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).returncode == 0 and \ - subprocess.run("which bash", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).returncode == 0: - return "curl -Ls https://sh.jbang.dev | bash -s -" - elif subprocess.run("which powershell", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).returncode == 0: - return 'iex "& { $(iwr -useb https://ps.jbang.dev) } $args"' - return None - -def _setup_subprocess_args(capture_output: bool = False) -> Dict[str, Any]: - """Setup subprocess arguments with proper terminal interaction.""" - args = { - "shell": False, - "universal_newlines": True, - "start_new_session": False, # Changed to False to ensure proper signal propagation - "preexec_fn": os.setpgrp if platform.system() != "Windows" else None # Create new process group - } - - if capture_output: - args.update({ - "stdout": subprocess.PIPE, - "stderr": subprocess.PIPE, - "stdin": subprocess.PIPE - }) - else: - # Try to connect to actual terminal if available - try: - if hasattr(sys.stdin, 'fileno'): - args["stdin"] = sys.stdin - except (IOError, OSError): - args["stdin"] = subprocess.PIPE - - try: - if hasattr(sys.stdout, 'fileno'): - args["stdout"] = sys.stdout - except (IOError, OSError): - args["stdout"] = subprocess.PIPE - - try: - if hasattr(sys.stderr, 'fileno'): - args["stderr"] = sys.stderr - except (IOError, OSError): - args["stderr"] = subprocess.PIPE - - return args - -def _handle_signal(signum, frame): - """Handle signals and propagate them to child processes.""" - if hasattr(frame, 'f_globals') and 'process' in frame.f_globals: - process = frame.f_globals['process'] - if process and process.poll() is None: # Process is still running - if platform.system() == "Windows": - process.terminate() - else: - # Send signal to the entire process group - os.killpg(os.getpgid(process.pid), signum) - process.wait() - sys.exit(0) - -def _exec_library(*args: str, capture_output: bool = False) -> Any: - """Execute jbang command for library usage.""" - arg_line = " ".join(args) - jbang_path = _get_jbang_path() - installer_cmd = _get_installer_command() - - if not jbang_path and not installer_cmd: - raise JbangExecutionError( - f"Unable to pre-install jbang: {arg_line}. Please install jbang manually.", - 1 - ) - - subprocess_args = { - "shell": False, - "universal_newlines": True, - "stdout": subprocess.PIPE, - "stderr": subprocess.PIPE, - "stdin": subprocess.PIPE - } - - try: - if jbang_path: - process = subprocess.Popen( - [jbang_path] + list(args), - **subprocess_args - ) - else: - if "curl" in installer_cmd: - process = subprocess.Popen( - f"{installer_cmd} {arg_line}", - shell=True, - **{k: v for k, v in subprocess_args.items() if k != "shell"} - ) - else: - # PowerShell case - temp_script = os.path.join(os.environ.get('TEMP', '/tmp'), 'jbang.ps1') - with open(temp_script, 'w') as f: - f.write(installer_cmd) - process = subprocess.Popen( - ["powershell", "-Command", f"{temp_script} {arg_line}"], - **subprocess_args - ) - - stdout, stderr = process.communicate() - - if process.returncode != 0: - raise JbangExecutionError( - f"Command failed with code {process.returncode}: {arg_line}", - process.returncode - ) - - result = type('CommandResult', (), { - 'returncode': process.returncode, - 'stdout': stdout, - 'stderr': stderr - }) - - if not capture_output: - if stdout: - print(stdout, end='', flush=True) - if stderr: - print(stderr, end='', flush=True, file=sys.stderr) - - return result - - except Exception as e: - if isinstance(e, JbangExecutionError): - raise - raise JbangExecutionError(str(e), 1) - -def _exec_cli(*args: str, capture_output: bool = False) -> Any: - """Execute jbang command for CLI usage.""" - arg_line = " ".join(args) - jbang_path = _get_jbang_path() - installer_cmd = _get_installer_command() - - if not jbang_path and not installer_cmd: - raise JbangExecutionError( - f"Unable to pre-install jbang: {arg_line}. Please install jbang manually.", - 1 - ) - - subprocess_args = _setup_subprocess_args(capture_output) - - try: - if jbang_path: - process = subprocess.Popen( - [jbang_path] + list(args), - **subprocess_args - ) - else: - if "curl" in installer_cmd: - process = subprocess.Popen( - f"{installer_cmd} {arg_line}", - shell=True, - **{k: v for k, v in subprocess_args.items() if k != "shell"} - ) - else: - # PowerShell case - temp_script = os.path.join(os.environ.get('TEMP', '/tmp'), 'jbang.ps1') - with open(temp_script, 'w') as f: - f.write(installer_cmd) - process = subprocess.Popen( - ["powershell", "-Command", f"{temp_script} {arg_line}"], - **subprocess_args - ) - - # Store process in globals for signal handler - globals()['process'] = process - - try: - process.wait() - if process.returncode != 0: - raise JbangExecutionError( - f"Command failed with code {process.returncode}: {arg_line}", - process.returncode - ) - return type('CommandResult', (), {'returncode': process.returncode}) - except KeyboardInterrupt: - if platform.system() == "Windows": - process.terminate() - else: - os.killpg(os.getpgid(process.pid), signal.SIGINT) - process.wait() - raise - except Exception as e: - if isinstance(e, JbangExecutionError): - raise - raise JbangExecutionError(str(e), 1) - finally: - # Clean up globals - if 'process' in globals(): - del globals()['process'] - -def exec(*args: str, capture_output: bool = False) -> Any: - """Execute jbang command for library usage.""" - arg_line = " ".join(args) - jbang_path = _get_jbang_path() - installer_cmd = _get_installer_command() - - if not jbang_path and not installer_cmd: - raise JbangExecutionError( - f"Unable to pre-install jbang: {arg_line}. Please install jbang manually.", - 1 - ) - - subprocess_args = { - "shell": False, - "universal_newlines": True, - "stdout": subprocess.PIPE, - "stderr": subprocess.PIPE, - "stdin": subprocess.PIPE - } - - try: - if jbang_path: - process = subprocess.Popen( - [jbang_path] + list(args), - **subprocess_args - ) - else: - if "curl" in installer_cmd: - process = subprocess.Popen( - f"{installer_cmd} {arg_line}", - shell=True, - **{k: v for k, v in subprocess_args.items() if k != "shell"} - ) - else: - # PowerShell case - temp_script = os.path.join(os.environ.get('TEMP', '/tmp'), 'jbang.ps1') - with open(temp_script, 'w') as f: - f.write(installer_cmd) - process = subprocess.Popen( - ["powershell", "-Command", f"{temp_script} {arg_line}"], - **subprocess_args - ) - - stdout, stderr = process.communicate() - - if process.returncode != 0: - raise JbangExecutionError( - f"Command failed with code {process.returncode}: {arg_line}", - process.returncode - ) - - result = type('CommandResult', (), { - 'returncode': process.returncode, - 'stdout': stdout, - 'stderr': stderr - }) - - if not capture_output: - if stdout: - print(stdout, end='', flush=True) - if stderr: - print(stderr, end='', flush=True, file=sys.stderr) - - return result - - except Exception as e: - if isinstance(e, JbangExecutionError): - raise - raise JbangExecutionError(str(e), 1) - -def main(): - """Command-line entry point for jbang-python.""" - - # Register signal handlers - signal.signal(signal.SIGINT, _handle_signal) - signal.signal(signal.SIGTERM, _handle_signal) - signal.signal(signal.SIGHUP, _handle_signal) - signal.signal(signal.SIGQUIT, _handle_signal) - - try: - result = _exec_cli(*sys.argv[1:], capture_output=False) - sys.exit(result.returncode) - except KeyboardInterrupt: - sys.exit(0) - except JbangExecutionError as e: - sys.exit(e.exit_code) - except Exception as e: - print(f"Error: {str(e)}", file=sys.stderr) - sys.exit(1) +__all__ = ['exec', 'spawnSync', 'main', 'quote'] if __name__ == "__main__": main() \ No newline at end of file diff --git a/jbang/jbang.py b/jbang/jbang.py new file mode 100644 index 0000000..06d6702 --- /dev/null +++ b/jbang/jbang.py @@ -0,0 +1,153 @@ +import logging +import os +import platform +import shutil +import subprocess +import sys +from typing import Any, List, Optional, Union + +# Configure logging based on environment variable +debug_enabled = 'jbang' in os.environ.get('DEBUG', '') +if debug_enabled: + logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s %(levelname)s %(name)s: %(message)s' + ) + +log = logging.getLogger(__name__) + +## used shell quote before but it is +## not working for Windows so ported from jbang + +def escapeCmdArgument(arg: str) -> str: + cmdSafeChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789.,_+=:;@()-\\" + if not all(c in cmdSafeChars for c in arg): + # Windows quoting is just weird + arg = ''.join('^' + c if c in '()!^<>&|% ' else c for c in arg) + arg = arg.replace('"', '\\"') + arg = '^"' + arg + '^"' + return arg + +def escapeBashArgument(arg: str) -> str: + shellSafeChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789._+=:@%/-" + if not all(c in shellSafeChars for c in arg): + arg = arg.replace("'", "'\\''") + arg = "'" + arg + "'" + return arg + +def quote(xs): + if platform.system() == 'Windows': + return ' '.join(escapeCmdArgument(s) for s in xs) + return ' '.join(escapeBashArgument(s) for s in xs) + + +def _getCommandLine(args: Union[str, List[str]]) -> Optional[str]: + """Get the jbang command line with arguments, using no-install option if needed.""" + log.debug("Searching for jbang executable...") + + # If args is a string, parse it into a list + if isinstance(args, str): + log.debug("args is a string, use as is") + argLine = args; + else: # else it is already a list and we need to quote each argument before joining them + log.debug("args is a list, quoting each argument") + argLine = quote(args) + + log.debug(f"argLine: {argLine}") + # Try different possible jbang locations + path = None + for cmd in ['./jbang.cmd' if platform.system() == 'Windows' else None, + 'jbang', + os.path.join(os.path.expanduser('~'), '.jbang', 'bin', 'jbang.cmd') if platform.system() == 'Windows' else None, + os.path.join(os.path.expanduser('~'), '.jbang', 'bin', 'jbang')]: + if cmd: + if shutil.which(cmd): + path = cmd + break + + if path: + log.debug(f"found existing jbang installation at: {path}") + return " ".join([path, argLine]) + + # Try no-install options + if shutil.which('curl') and shutil.which('bash'): + log.debug("running jbang using curl and bash") + return " ".join(["curl -Ls https://sh.jbang.dev | bash -s -", argLine]) + elif shutil.which('powershell'): + log.debug("running jbang using PowerShell") + return 'powershell -Command iex "& { $(iwr -useb https://ps.jbang.dev) } $argLine"' + else: + log.debug("no jbang installation found") + return None + +def exec(args: Union[str, List[str]]) -> Any: + log.debug(f"try to execute async command: {args} of type {type(args)}") + + cmdLine = _getCommandLine(args) + + if cmdLine: + log.debug("executing command: '%s'", cmdLine); + + result = subprocess.run( + cmdLine, + shell=True, + capture_output=True, + text=True, + check=False + ) + result = type('CommandResult', (), { + 'stdout': result.stdout, + 'stderr': result.stderr, + 'exitCode': result.returncode + }) + log.debug(f"result: {result.__dict__}") + return result + else: + print("Could not locate a way to run jbang. Try install jbang manually and try again.") + raise Exception( + "Could not locate a way to run jbang. Try install jbang manually and try again.", + 2 + ) + +def spawnSync(args: Union[str, List[str]]) -> Any: + log.debug(f"try to execute sync command: {args}") + + cmdLine = _getCommandLine(args) + + if cmdLine: + log.debug("spawning sync command: '%s'", cmdLine); + result = subprocess.run( + cmdLine, + shell=True, + stdin=sys.stdin, + stdout=sys.stdout, + stderr=sys.stderr, + check=False + ) + tuple = type('CommandResult', (), { + 'stdout': result.stdout, + 'stderr': result.stderr, + 'exitCode': result.returncode + }) + log.debug(f"result: {tuple.__dict__}") + return tuple + else: + print("Could not locate a way to run jbang. Try install jbang manually and try again.") + raise Exception( + "Could not locate a way to run jbang. Try install jbang manually and try again.", + 2 + ) + +def main(): + """Command-line entry point for jbang-python.""" + log.debug("Starting jbang-python CLI") + + try: + result = spawnSync(sys.argv[1:]) + sys.exit(result.exitCode) + except KeyboardInterrupt: + log.debug("Keyboard interrupt") + sys.exit(130) + except Exception as e: + log.error(f"Unexpected error: {str(e)}", exc_info=True) + sys.exit(1) \ No newline at end of file diff --git a/setup.py b/setup.py index 12a2ad1..0d549b6 100755 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ long_description = (this_directory / "README.md").read_text() setup(name='jbang', - version='0.5.6', + version='0.5.7', description='Python for JBang - Java Script in your Python', long_description=long_description, long_description_content_type='text/markdown', diff --git a/tests/test_jbang.py b/tests/test_jbang.py index 47e212b..043aba8 100644 --- a/tests/test_jbang.py +++ b/tests/test_jbang.py @@ -1,12 +1,16 @@ +import sys + import pytest + import jbang + def test_version_command(): """Test version command.""" print("\nTesting version command...") try: - out = jbang.exec('--version',capture_output=True) - assert out.returncode == 0 + out = jbang.exec('--version') + assert out.exitCode == 0 print("✓ Version command works") except Exception as e: pytest.fail(f"✗ Version command failed: {e}") @@ -15,7 +19,8 @@ def test_catalog_script(): """Test catalog script execution.""" print("\nTesting catalog script...") try: - jbang.exec('properties@jbangdev') + out = jbang.exec('properties@jbangdev') + assert out.exitCode == 0 print("✓ Catalog script works") except Exception as e: pytest.fail(f"✗ Catalog script failed: {e}") @@ -23,6 +28,66 @@ def test_catalog_script(): def test_error_handling(): """Test error handling.""" print("\nTesting error handling...") - with pytest.raises(Exception): - jbang.exec('nonexistent-script-name') - print("✓ Error handling works") \ No newline at end of file + out = jbang.exec('nonexistent-script-name') + assert out.exitCode == 2 + print("✓ Error handling works") + +def test_multiple_argument_as_string(): + """Test multiple arguments as string.""" + print("\nTesting multiple arguments...") + out = jbang.exec('-Dx="funky string" properties@jbangdev') + assert out.exitCode == 0 + assert 'funky string' in out.stdout + +def test_multiple_argument_as_list(): + """Test multiple arguments as list.""" + print("\nTesting multiple arguments...") + out = jbang.exec(['-Dx=funky list', 'properties@jbangdev']) + assert out.exitCode == 0 + assert 'funky list' in out.stdout + +def test_java_version_specification(): + """Test Java version specification.""" + print("\nTesting Java version specification...") + out = jbang.exec(['--java', '8+', 'properties@jbangdev', 'java.version']) + assert out.exitCode == 0 + assert any(char.isdigit() for char in out.stdout), "Expected version number in output" + +def test_invalid_java_version(): + """Test invalid Java version handling.""" + print("\nTesting invalid Java version handling...") + out = jbang.exec('--java invalid properties@jbangdev java.version') + assert 'Invalid version' in out.stderr + +@pytest.mark.skipif(sys.platform == 'win32', reason="Quote tests behave differently on Windows") +class TestQuoting: + def test_quote_empty_string(self): + """Test quoting empty string.""" + assert jbang.quote(['']) == "" + + def test_quote_simple_string(self): + """Test quoting simple string without special chars.""" + assert jbang.quote(['hello']) == 'hello' + + def test_quote_string_with_spaces(self): + """Test quoting string containing spaces.""" + assert jbang.quote(['hello world']) == "'hello world'" + + def test_quote_string_with_double_quotes(self): + """Test quoting string containing double quotes.""" + assert jbang.quote(['hello "world"']) == "'hello \"world\"'" + + def test_quote_string_with_single_quotes(self): + """Test quoting string containing single quotes.""" + assert jbang.quote(["hello'world"]) == "'hello'\\''world'" + + def test_quote_string_with_special_chars(self): + """Test quoting string containing special characters.""" + assert jbang.quote(['hello$world']) == "'hello$world'" + assert jbang.quote(['hello!world']) == "'hello!world'" + assert jbang.quote(['hello#world']) == "'hello#world'" + + def test_quote_multiple_strings(self): + """Test quoting multiple strings.""" + assert jbang.quote(['hello world']) == "'hello world'" + assert jbang.quote(["hello 'big world'"]) == "'hello '\\''big world'\\'''"