Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed .coverage
Binary file not shown.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,4 @@ coverage.xml
data/data/
real_cve_*.json
terraform.tfvars
.coverage
10 changes: 5 additions & 5 deletions agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
from design-time to runtime, supporting all languages.
"""

from agents.core.agent_framework import AgentFramework, AgentConfig
from agents.core.agent_framework import AgentConfig, AgentFramework
from agents.core.agent_orchestrator import AgentOrchestrator
from agents.design_time.code_repo_agent import CodeRepoAgent
from agents.runtime.container_agent import ContainerAgent
from agents.language.python_agent import PythonAgent
from agents.language.javascript_agent import JavaScriptAgent
from agents.language.java_agent import JavaAgent
from agents.language.go_agent import GoAgent
from agents.language.java_agent import JavaAgent
from agents.language.javascript_agent import JavaScriptAgent
from agents.language.python_agent import PythonAgent
from agents.runtime.container_agent import ContainerAgent

__all__ = [
"AgentFramework",
Expand Down
105 changes: 53 additions & 52 deletions agents/core/agent_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

class AgentType(Enum):
"""Agent type categories."""

DESIGN_TIME = "design_time" # Code repos, CI/CD, design tools
RUNTIME = "runtime" # Containers, cloud, APIs
LANGUAGE = "language" # Language-specific agents
Expand All @@ -28,7 +28,7 @@ class AgentType(Enum):

class AgentStatus(Enum):
"""Agent status."""

IDLE = "idle"
CONNECTING = "connecting"
MONITORING = "monitoring"
Expand All @@ -41,7 +41,7 @@ class AgentStatus(Enum):
@dataclass
class AgentConfig:
"""Agent configuration."""

agent_id: str
agent_type: AgentType
name: str
Expand All @@ -57,7 +57,7 @@ class AgentConfig:
@dataclass
class AgentData:
"""Data collected by agent."""

agent_id: str
timestamp: datetime
data_type: str # sarif, sbom, cve, design_context, runtime_metrics, etc.
Expand All @@ -67,7 +67,7 @@ class AgentData:

class BaseAgent(ABC):
"""Base class for all FixOps agents."""

def __init__(self, config: AgentConfig, fixops_api_url: str, fixops_api_key: str):
"""Initialize agent."""
self.config = config
Expand All @@ -80,77 +80,80 @@ def __init__(self, config: AgentConfig, fixops_api_url: str, fixops_api_key: str
self.collection_count = 0
self.push_count = 0
self._stop_requested = False

@abstractmethod
async def connect(self) -> bool:
"""Connect to target system."""
pass

@abstractmethod
async def disconnect(self):
"""Disconnect from target system."""
pass

@abstractmethod
async def collect_data(self) -> List[AgentData]:
"""Collect data from target system."""
pass

async def push_data(self, data: List[AgentData]) -> bool:
"""Push data to FixOps API."""
import aiohttp

try:
self.status = AgentStatus.PUSHING

async with aiohttp.ClientSession() as session:
for agent_data in data:
# Push to appropriate FixOps endpoint
endpoint = self._get_endpoint(agent_data.data_type)
url = f"{self.fixops_api_url}{endpoint}"

headers = {
"X-API-Key": self.fixops_api_key,
"Content-Type": "application/json",
}

payload = {
"agent_id": agent_data.agent_id,
"timestamp": agent_data.timestamp.isoformat(),
"data_type": agent_data.data_type,
"data": agent_data.data,
"metadata": agent_data.metadata,
}

async with session.post(url, json=payload, headers=headers) as response:

async with session.post(
url, json=payload, headers=headers
) as response:
if response.status not in [200, 201]:
error_text = await response.text()
logger.error(
f"Failed to push {agent_data.data_type} from {self.config.agent_id}: "
f"{response.status} - {error_text}"
)
return False

self.push_count += 1
self.last_push = datetime.now(timezone.utc)

logger.info(
f"Successfully pushed {len(data)} data items from {self.config.agent_id}"
)
return True

except Exception as e:
logger.error(f"Error pushing data from {self.config.agent_id}: {e}")
self.error_count += 1
return False

finally:
if not self._stop_requested:
self.status = AgentStatus.MONITORING

def request_stop(self):
"""Signal the agent to stop after the current iteration."""
self._stop_requested = True

def _get_endpoint(self, data_type: str) -> str:
"""Get FixOps API endpoint for data type."""
endpoints = {
Expand All @@ -165,53 +168,53 @@ def _get_endpoint(self, data_type: str) -> str:
"iac_scan": "/api/v1/ingest/iac-scan",
}
return endpoints.get(data_type, "/api/v1/ingest/data")

async def run(self):
"""Main agent loop."""
if not self.config.enabled:
logger.info(f"Agent {self.config.agent_id} is disabled")
return

try:
# Connect
self.status = AgentStatus.CONNECTING
if not await self.connect():
self.status = AgentStatus.ERROR
logger.error(f"Failed to connect agent {self.config.agent_id}")
return

self.status = AgentStatus.MONITORING

# Main monitoring loop
while not self._stop_requested and self.status != AgentStatus.DISCONNECTED:
while not self._stop_requested and self.status != AgentStatus.DISCONNECTED:
try:
# Collect data
self.status = AgentStatus.COLLECTING
data = await self.collect_data()
self.last_collection = datetime.now(timezone.utc)
self.collection_count += len(data)

if data:
# Push data
success = await self.push_data(data)
if not success:
self.error_count += 1
if self._stop_requested:
break
self.status = AgentStatus.MONITORING

if self._stop_requested:
break

self.status = AgentStatus.MONITORING

# Wait for next polling interval
await asyncio.sleep(self.config.polling_interval)
if self._stop_requested:
break
await asyncio.sleep(self.config.polling_interval)
if self._stop_requested:
break

except Exception as e:
logger.error(f"Error in agent {self.config.agent_id} loop: {e}")
self.error_count += 1
self.status = AgentStatus.ERROR

# Retry logic
if self.error_count < self.config.retry_count:
await asyncio.sleep(self.config.retry_delay)
Expand All @@ -221,15 +224,15 @@ async def run(self):
f"Agent {self.config.agent_id} exceeded retry count, stopping"
)
break

except Exception as e:
logger.error(f"Fatal error in agent {self.config.agent_id}: {e}")
self.status = AgentStatus.ERROR

finally:
await self.disconnect()
self.status = AgentStatus.DISCONNECTED

def get_status(self) -> Dict[str, Any]:
"""Get agent status."""
return {
Expand All @@ -241,9 +244,7 @@ def get_status(self) -> Dict[str, Any]:
"last_collection": (
self.last_collection.isoformat() if self.last_collection else None
),
"last_push": (
self.last_push.isoformat() if self.last_push else None
),
"last_push": (self.last_push.isoformat() if self.last_push else None),
"collection_count": self.collection_count,
"push_count": self.push_count,
"error_count": self.error_count,
Expand All @@ -252,41 +253,41 @@ def get_status(self) -> Dict[str, Any]:

class AgentFramework:
"""FixOps Agent Framework - Manages all agents."""

def __init__(self, fixops_api_url: str, fixops_api_key: str):
"""Initialize agent framework."""
self.fixops_api_url = fixops_api_url
self.fixops_api_key = fixops_api_key
self.agents: Dict[str, BaseAgent] = {}
self.running = False

def register_agent(self, agent: BaseAgent):
"""Register an agent."""
self.agents[agent.config.agent_id] = agent
logger.info(f"Registered agent: {agent.config.agent_id}")

async def start_all(self):
"""Start all enabled agents."""
self.running = True

tasks = []
for agent in self.agents.values():
if agent.config.enabled:
task = asyncio.create_task(agent.run())
tasks.append(task)

logger.info(f"Started {len(tasks)} agents")
await asyncio.gather(*tasks, return_exceptions=True)

async def stop_all(self):
"""Stop all agents."""
self.running = False

for agent in self.agents.values():
agent.request_stop()

logger.info("Stopped all agents")

def get_all_status(self) -> List[Dict[str, Any]]:
"""Get status of all agents."""
return [agent.get_status() for agent in self.agents.values()]
Loading
Loading