diff --git a/add-static-analysis-workflow.py b/add-static-analysis-workflow.py index ab79328..2dec236 100644 --- a/add-static-analysis-workflow.py +++ b/add-static-analysis-workflow.py @@ -13,7 +13,6 @@ - Documents which repositories were updated vs. skipped in tracking logs """ -import os import sys import subprocess import yaml @@ -221,40 +220,62 @@ def write_workflow_file(self, content: str) -> bool: logger.error(f"Error writing workflow file: {e}") return False - def add_workflow(self) -> dict: - """Add or update the static analysis workflow. + def _create_result_dict(self, success=False, action=None, reason=None, default_branch=None) -> dict: + """Create a standardized result dictionary. + Args: + success: Whether the operation was successful + action: Action taken (created/updated/skipped) + reason: Reason for the action or failure + default_branch: Default branch name + Returns: Dictionary with result information """ - result = { + return { "repository": str(self.repo_path), "timestamp": datetime.now().isoformat(), - "success": False, - "action": None, - "reason": None, - "default_branch": None + "success": success, + "action": action, + "reason": reason, + "default_branch": default_branch } + + def _prepare_workflow_content(self, result: dict) -> Optional[str]: + """Prepare and validate workflow content. - # Detect default branch + Args: + result: Result dictionary to update on failure + + Returns: + Workflow content if successful, None otherwise + """ default_branch = self.detect_default_branch() if not default_branch: result["reason"] = "Could not detect default branch" logger.error(result["reason"]) - return result + return None result["default_branch"] = default_branch - - # Generate workflow content workflow_content = self.generate_workflow_content(default_branch) - # Validate YAML if not self.validate_yaml(workflow_content): result["reason"] = "Generated workflow has invalid YAML syntax" logger.error(result["reason"]) - return result + return None + + return workflow_content + + def _write_workflow_if_needed(self, workflow_content: str, result: dict) -> bool: + """Write workflow file if update is needed. - # Check if update is needed + Args: + workflow_content: Content to write + result: Result dictionary to update + + Returns: + True if successful, False otherwise + """ should_update, reason = self.should_update_workflow(workflow_content) result["reason"] = reason @@ -262,24 +283,36 @@ def add_workflow(self) -> dict: result["success"] = True result["action"] = "skipped" logger.info(f"Skipping: {reason}") - return result + return True - # Create directory if needed if not self.create_workflows_directory(): result["reason"] = "Failed to create workflows directory" logger.error(result["reason"]) - return result + return False - # Write workflow file if not self.write_workflow_file(workflow_content): result["reason"] = "Failed to write workflow file" logger.error(result["reason"]) - return result + return False result["success"] = True result["action"] = "updated" if self.workflow_file.exists() else "created" logger.info(f"Successfully {result['action']} workflow file") + return True + + def add_workflow(self) -> dict: + """Add or update the static analysis workflow. + + Returns: + Dictionary with result information + """ + result = self._create_result_dict() + + workflow_content = self._prepare_workflow_content(result) + if not workflow_content: + return result + self._write_workflow_if_needed(workflow_content, result) return result def save_tracking_log(self, result: dict, log_file: Optional[str] = None): @@ -314,10 +347,13 @@ def save_tracking_log(self, result: dict, log_file: Optional[str] = None): logger.error(f"Error saving tracking log: {e}") -def main(): - """Main entry point.""" - import argparse +def _create_argument_parser(): + """Create and configure argument parser. + Returns: + Configured ArgumentParser instance + """ + import argparse parser = argparse.ArgumentParser( description="Add standardized static analysis workflow to repositories" ) @@ -336,35 +372,43 @@ def main(): action="store_true", help="Show what would be done without making changes" ) + return parser + + +def _run_dry_run(manager): + """Execute dry run mode. - args = parser.parse_args() - - # Initialize manager - manager = StaticAnalysisWorkflowManager(args.repo_path) + Args: + manager: StaticAnalysisWorkflowManager instance + + Returns: + Exit code (0 for success) + """ + logger.info("DRY RUN MODE - No changes will be made") + default_branch = manager.detect_default_branch() - if args.dry_run: - logger.info("DRY RUN MODE - No changes will be made") - default_branch = manager.detect_default_branch() - if default_branch: - workflow_content = manager.generate_workflow_content(default_branch) - print("\nGenerated workflow content:") - print("-" * 80) - print(workflow_content) - print("-" * 80) - should_update, reason = manager.should_update_workflow(workflow_content) - print(f"\nAction needed: {'Yes' if should_update else 'No'}") - print(f"Reason: {reason}") - else: - print("ERROR: Could not detect default branch") + if not default_branch: + print("ERROR: Could not detect default branch") return 0 - # Add workflow - result = manager.add_workflow() + workflow_content = manager.generate_workflow_content(default_branch) + print("\nGenerated workflow content:") + print("-" * 80) + print(workflow_content) + print("-" * 80) - # Save tracking log - manager.save_tracking_log(result, args.log_file) + should_update, reason = manager.should_update_workflow(workflow_content) + print(f"\nAction needed: {'Yes' if should_update else 'No'}") + print(f"Reason: {reason}") + return 0 + + +def _print_summary(result): + """Print operation summary. - # Print summary + Args: + result: Result dictionary from add_workflow + """ print("\n" + "=" * 80) print("SUMMARY") print("=" * 80) @@ -374,6 +418,21 @@ def main(): print(f"Success: {result['success']}") print(f"Reason: {result['reason']}") print("=" * 80) + + +def main(): + """Main entry point.""" + parser = _create_argument_parser() + args = parser.parse_args() + + manager = StaticAnalysisWorkflowManager(args.repo_path) + + if args.dry_run: + return _run_dry_run(manager) + + result = manager.add_workflow() + manager.save_tracking_log(result, args.log_file) + _print_summary(result) return 0 if result['success'] else 1 diff --git a/batch-add-workflows.py b/batch-add-workflows.py index a43c2d5..2f8b8d5 100644 --- a/batch-add-workflows.py +++ b/batch-add-workflows.py @@ -7,13 +7,12 @@ report of all operations. """ -import os import sys import json import argparse import subprocess from pathlib import Path -from typing import List, Dict +from typing import List, Dict, Tuple, Optional from datetime import datetime import logging from concurrent.futures import ThreadPoolExecutor, as_completed @@ -42,50 +41,60 @@ def __init__(self, repos: List[str], parallel: bool = False, max_workers: int = self.max_workers = max_workers self.results = [] - def process_repository(self, repo_path: Path) -> Dict: - """Process a single repository. + def _create_result(self, repo_path: Path, success: bool = False, error: str = None) -> Dict: + """Create a standardized result dictionary. Args: repo_path: Path to the repository + success: Whether the operation was successful + error: Error message if any Returns: Dictionary with result information """ - logger.info(f"Processing repository: {repo_path}") - - result = { + return { "repository": str(repo_path), "timestamp": datetime.now().isoformat(), - "success": False, - "error": None + "success": success, + "error": error } + + def _validate_repository(self, repo_path: Path) -> Tuple[bool, Optional[str]]: + """Validate repository path and structure. - try: - # Check if path exists and is a directory - if not repo_path.exists(): - result["error"] = "Repository path does not exist" - logger.error(f"{repo_path}: {result['error']}") - return result - - if not repo_path.is_dir(): - result["error"] = "Repository path is not a directory" - logger.error(f"{repo_path}: {result['error']}") - return result - - # Check if it's a git repository - git_dir = repo_path / ".git" - if not git_dir.exists(): - result["error"] = "Not a git repository" - logger.error(f"{repo_path}: {result['error']}") - return result + Args: + repo_path: Path to validate - # Run the workflow script - cmd = [ - sys.executable, - str(Path(__file__).parent / "add-static-analysis-workflow.py"), - str(repo_path) - ] + Returns: + Tuple of (is_valid, error_message) + """ + if not repo_path.exists(): + return False, "Repository path does not exist" + + if not repo_path.is_dir(): + return False, "Repository path is not a directory" + + if not (repo_path / ".git").exists(): + return False, "Not a git repository" + + return True, None + + def _run_workflow_script(self, repo_path: Path) -> Tuple[bool, Optional[str]]: + """Run the workflow script on a repository. + + Args: + repo_path: Path to the repository + Returns: + Tuple of (success, error_message) + """ + cmd = [ + sys.executable, + str(Path(__file__).parent / "add-static-analysis-workflow.py"), + str(repo_path) + ] + + try: process_result = subprocess.run( cmd, capture_output=True, @@ -94,21 +103,41 @@ def process_repository(self, repo_path: Path) -> Dict: ) if process_result.returncode != 0: - result["error"] = f"Script failed: {process_result.stderr}" - logger.error(f"{repo_path}: {result['error']}") - return result + return False, f"Script failed: {process_result.stderr}" - result["success"] = True - logger.info(f"Successfully processed: {repo_path}") + return True, None except subprocess.TimeoutExpired: - result["error"] = "Script execution timed out" - logger.error(f"{repo_path}: {result['error']}") + return False, "Script execution timed out" except Exception as e: - result["error"] = f"Unexpected error: {str(e)}" - logger.error(f"{repo_path}: {result['error']}") + return False, f"Unexpected error: {str(e)}" + + def process_repository(self, repo_path: Path) -> Dict: + """Process a single repository. - return result + Args: + repo_path: Path to the repository + + Returns: + Dictionary with result information + """ + logger.info(f"Processing repository: {repo_path}") + + # Validate repository + is_valid, error = self._validate_repository(repo_path) + if not is_valid: + logger.error(f"{repo_path}: {error}") + return self._create_result(repo_path, success=False, error=error) + + # Run workflow script + success, error = self._run_workflow_script(repo_path) + + if success: + logger.info(f"Successfully processed: {repo_path}") + else: + logger.error(f"{repo_path}: {error}") + + return self._create_result(repo_path, success=success, error=error) def process_all(self) -> List[Dict]: """Process all repositories. @@ -161,6 +190,53 @@ def _process_parallel(self) -> List[Dict]: return results + def _get_action_from_log(self, repo_path: Path) -> str: + """Get the action from a repository's tracking log. + + Args: + repo_path: Path to the repository + + Returns: + Action string or None if not found + """ + log_file = repo_path / "tracking-log.json" + if not log_file.exists(): + return None + + try: + with open(log_file, 'r') as f: + logs = json.load(f) + + if not logs: + return None + + return logs[-1].get("action") + except Exception: + return None + + def _count_actions(self, results: List[Dict]) -> Dict[str, int]: + """Count actions taken across all repositories. + + Args: + results: List of result dictionaries + + Returns: + Dictionary of action counts + """ + actions = {"created": 0, "updated": 0, "skipped": 0} + + for result in results: + if not result["success"]: + continue + + repo_path = Path(result["repository"]) + action = self._get_action_from_log(repo_path) + + if action in actions: + actions[action] += 1 + + return actions + def generate_summary_report(self, results: List[Dict]) -> Dict: """Generate a summary report of all operations. @@ -173,36 +249,15 @@ def generate_summary_report(self, results: List[Dict]) -> Dict: total = len(results) successful = sum(1 for r in results if r["success"]) failed = total - successful + actions = self._count_actions(results) - # Load individual tracking logs to get detailed actions - actions = {"created": 0, "updated": 0, "skipped": 0} - - for result in results: - if result["success"]: - repo_path = Path(result["repository"]) - log_file = repo_path / "tracking-log.json" - if log_file.exists(): - try: - with open(log_file, 'r') as f: - logs = json.load(f) - if logs: - # Get the last entry for this repo - last_log = logs[-1] - action = last_log.get("action") - if action in actions: - actions[action] += 1 - except Exception: - pass - - summary = { + return { "total_repositories": total, "successful": successful, "failed": failed, "actions": actions, "timestamp": datetime.now().isoformat() } - - return summary def save_batch_report(self, results: List[Dict], output_file: str): """Save batch processing report to file. @@ -271,8 +326,12 @@ def read_repo_list(file_path: str) -> List[str]: return repos -def main(): - """Main entry point.""" +def _create_argument_parser(): + """Create and configure argument parser. + + Returns: + Configured ArgumentParser instance + """ parser = argparse.ArgumentParser( description="Batch add static analysis workflows to multiple repositories" ) @@ -305,31 +364,41 @@ def main(): help="Output file for batch report (default: batch-report.json)" ) - args = parser.parse_args() + return parser + + +def _get_repository_list(args): + """Get list of repositories from arguments. - # Get repository list + Args: + args: Parsed command-line arguments + + Returns: + List of repository paths + """ if args.repos: - repos = args.repos - else: - repos = read_repo_list(args.repo_file) + return args.repos + return read_repo_list(args.repo_file) + + +def main(): + """Main entry point.""" + parser = _create_argument_parser() + args = parser.parse_args() + repos = _get_repository_list(args) if not repos: logger.error("No repositories specified") return 1 logger.info(f"Processing {len(repos)} repositories") - # Process repositories manager = BatchWorkflowManager(repos, args.parallel, args.max_workers) results = manager.process_all() - # Save report manager.save_batch_report(results, args.output) - - # Print summary manager.print_summary(results) - # Return non-zero if any failed failed = sum(1 for r in results if not r["success"]) return 1 if failed > 0 else 0 diff --git a/integration_test.py b/integration_test.py index 5919e40..a9d386e 100644 --- a/integration_test.py +++ b/integration_test.py @@ -6,24 +6,10 @@ verifies the automation handles them correctly. """ -import os import sys import tempfile -import shutil -import subprocess from pathlib import Path - - -def run_command(cmd, cwd=None): - """Run a command and return result.""" - result = subprocess.run( - cmd, - cwd=cwd, - capture_output=True, - text=True, - shell=isinstance(cmd, str) - ) - return result.returncode, result.stdout, result.stderr +from test_utils import run_command def create_test_repo(path, branch_name="main"): diff --git a/test_utils.py b/test_utils.py new file mode 100644 index 0000000..ed7b6fc --- /dev/null +++ b/test_utils.py @@ -0,0 +1,26 @@ +""" +Shared utilities for test scripts. +""" + +import subprocess +from typing import Tuple + + +def run_command(cmd, cwd=None) -> Tuple[int, str, str]: + """Run a command and return the result. + + Args: + cmd: Command to run (string or list) + cwd: Working directory for the command + + Returns: + Tuple of (returncode, stdout, stderr) + """ + result = subprocess.run( + cmd, + cwd=cwd, + capture_output=True, + text=True, + shell=isinstance(cmd, str) + ) + return result.returncode, result.stdout, result.stderr diff --git a/test_workflow_implementation.py b/test_workflow_implementation.py index 507b70e..1ca83c4 100644 --- a/test_workflow_implementation.py +++ b/test_workflow_implementation.py @@ -3,23 +3,9 @@ Test script to verify the static analysis workflow implementation. """ -import os import sys -import subprocess -import tempfile -import shutil from pathlib import Path - -def run_command(cmd, cwd=None): - """Run a command and return the result.""" - result = subprocess.run( - cmd, - cwd=cwd, - capture_output=True, - text=True, - shell=True - ) - return result.returncode, result.stdout, result.stderr +from test_utils import run_command def test_script_exists(): """Test that the script exists."""