Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 105 additions & 46 deletions add-static-analysis-workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
- Documents which repositories were updated vs. skipped in tracking logs
"""

import os
import sys
import subprocess
import yaml
Expand Down Expand Up @@ -221,65 +220,99 @@ def write_workflow_file(self, content: str) -> bool:
logger.error(f"Error writing workflow file: {e}")
return False

def add_workflow(self) -> dict:
"""Add or update the static analysis workflow.
def _create_result_dict(self, success=False, action=None, reason=None, default_branch=None) -> dict:
"""Create a standardized result dictionary.

Args:
success: Whether the operation was successful
action: Action taken (created/updated/skipped)
reason: Reason for the action or failure
default_branch: Default branch name

Returns:
Dictionary with result information
"""
result = {
return {
"repository": str(self.repo_path),
"timestamp": datetime.now().isoformat(),
"success": False,
"action": None,
"reason": None,
"default_branch": None
"success": success,
"action": action,
"reason": reason,
"default_branch": default_branch
}

def _prepare_workflow_content(self, result: dict) -> Optional[str]:
"""Prepare and validate workflow content.

# Detect default branch
Args:
result: Result dictionary to update on failure

Returns:
Workflow content if successful, None otherwise
"""
default_branch = self.detect_default_branch()
if not default_branch:
result["reason"] = "Could not detect default branch"
logger.error(result["reason"])
return result
return None

result["default_branch"] = default_branch

# Generate workflow content
workflow_content = self.generate_workflow_content(default_branch)

# Validate YAML
if not self.validate_yaml(workflow_content):
result["reason"] = "Generated workflow has invalid YAML syntax"
logger.error(result["reason"])
return result
return None

return workflow_content

def _write_workflow_if_needed(self, workflow_content: str, result: dict) -> bool:
"""Write workflow file if update is needed.

# Check if update is needed
Args:
workflow_content: Content to write
result: Result dictionary to update

Returns:
True if successful, False otherwise
"""
should_update, reason = self.should_update_workflow(workflow_content)
result["reason"] = reason

if not should_update:
result["success"] = True
result["action"] = "skipped"
logger.info(f"Skipping: {reason}")
return result
return True

# Create directory if needed
if not self.create_workflows_directory():
result["reason"] = "Failed to create workflows directory"
logger.error(result["reason"])
return result
return False

# Write workflow file
if not self.write_workflow_file(workflow_content):
result["reason"] = "Failed to write workflow file"
logger.error(result["reason"])
return result
return False

result["success"] = True
result["action"] = "updated" if self.workflow_file.exists() else "created"
logger.info(f"Successfully {result['action']} workflow file")
return True

def add_workflow(self) -> dict:
"""Add or update the static analysis workflow.

Returns:
Dictionary with result information
"""
result = self._create_result_dict()

workflow_content = self._prepare_workflow_content(result)
if not workflow_content:
return result

self._write_workflow_if_needed(workflow_content, result)
return result

def save_tracking_log(self, result: dict, log_file: Optional[str] = None):
Expand Down Expand Up @@ -314,10 +347,13 @@ def save_tracking_log(self, result: dict, log_file: Optional[str] = None):
logger.error(f"Error saving tracking log: {e}")


def main():
"""Main entry point."""
import argparse
def _create_argument_parser():
"""Create and configure argument parser.

Returns:
Configured ArgumentParser instance
"""
import argparse
parser = argparse.ArgumentParser(
description="Add standardized static analysis workflow to repositories"
)
Expand All @@ -336,35 +372,43 @@ def main():
action="store_true",
help="Show what would be done without making changes"
)
return parser


def _run_dry_run(manager):
"""Execute dry run mode.

args = parser.parse_args()

# Initialize manager
manager = StaticAnalysisWorkflowManager(args.repo_path)
Args:
manager: StaticAnalysisWorkflowManager instance

Returns:
Exit code (0 for success)
"""
logger.info("DRY RUN MODE - No changes will be made")
default_branch = manager.detect_default_branch()

if args.dry_run:
logger.info("DRY RUN MODE - No changes will be made")
default_branch = manager.detect_default_branch()
if default_branch:
workflow_content = manager.generate_workflow_content(default_branch)
print("\nGenerated workflow content:")
print("-" * 80)
print(workflow_content)
print("-" * 80)
should_update, reason = manager.should_update_workflow(workflow_content)
print(f"\nAction needed: {'Yes' if should_update else 'No'}")
print(f"Reason: {reason}")
else:
print("ERROR: Could not detect default branch")
if not default_branch:
print("ERROR: Could not detect default branch")
return 0

# Add workflow
result = manager.add_workflow()
workflow_content = manager.generate_workflow_content(default_branch)
print("\nGenerated workflow content:")
print("-" * 80)
print(workflow_content)
print("-" * 80)

# Save tracking log
manager.save_tracking_log(result, args.log_file)
should_update, reason = manager.should_update_workflow(workflow_content)
print(f"\nAction needed: {'Yes' if should_update else 'No'}")
print(f"Reason: {reason}")
return 0


def _print_summary(result):
"""Print operation summary.

# Print summary
Args:
result: Result dictionary from add_workflow
"""
print("\n" + "=" * 80)
print("SUMMARY")
print("=" * 80)
Expand All @@ -374,6 +418,21 @@ def main():
print(f"Success: {result['success']}")
print(f"Reason: {result['reason']}")
print("=" * 80)


def main():
"""Main entry point."""
parser = _create_argument_parser()
args = parser.parse_args()

manager = StaticAnalysisWorkflowManager(args.repo_path)

if args.dry_run:
return _run_dry_run(manager)

result = manager.add_workflow()
manager.save_tracking_log(result, args.log_file)
_print_summary(result)

return 0 if result['success'] else 1

Expand Down
Loading