From 5bcbb7b6e49aa3202fae72cebbf3478dd146fe3d Mon Sep 17 00:00:00 2001 From: fortishield <161459699+FortiShield@users.noreply.github.com> Date: Thu, 16 Apr 2026 12:23:21 +0600 Subject: [PATCH 1/6] Delete .gitignore --- .gitignore | 1 - 1 file changed, 1 deletion(-) delete mode 100644 .gitignore diff --git a/.gitignore b/.gitignore deleted file mode 100644 index 3c3629e..0000000 --- a/.gitignore +++ /dev/null @@ -1 +0,0 @@ -node_modules From 6dacdd4f8f3a6a2722a9cde914904aa3ec2ef117 Mon Sep 17 00:00:00 2001 From: fortishield <161459699+FortiShield@users.noreply.github.com> Date: Thu, 16 Apr 2026 12:24:55 +0600 Subject: [PATCH 2/6] Add files via upload --- README.md | 71 ++++++ agents/example_agent.py | 48 ++++ agents/types/base_agent.py | 30 +++ agents/types/specialized.py | 45 ++++ infra/ansible/gpu-node.yml | 54 +++++ infra/k8s/crds/aijob.yaml | 78 +++++++ infra/k8s/gpu/device-plugin.yaml | 39 ++++ infra/k8s/ray/head.yaml | 46 ++++ infra/k8s/ray/worker.yaml | 34 +++ infra/k8s/scaling/keda.yaml | 20 ++ infra/terraform/cloud/main.tf | 24 ++ platform/ai/critic.py | 44 ++++ platform/ai/workflow_generator.py | 65 ++++++ platform/api/auth.py | 20 ++ platform/api/db.py | 70 ++++++ platform/api/main.py | 117 ++++++++++ platform/api/monitor.py | 45 ++++ platform/api/requirements.txt | 4 + platform/api/ws.py | 63 ++++++ platform/audit/observer.py | 45 ++++ platform/governance/policy_engine.py | 61 +++++ platform/governance/quotas.py | 43 ++++ platform/intelligence/adaptive_weights.py | 39 ++++ platform/memory/vector_store.py | 53 +++++ platform/operator/controller.py | 84 +++++++ platform/scheduler/autoscaler.py | 69 ++++++ platform/scheduler/cost_engine.py | 28 +++ platform/scheduler/heartbeat_monitor.py | 37 +++ platform/scheduler/lifecycle.py | 70 ++++++ platform/scheduler/locks.py | 50 ++++ platform/scheduler/resource_manager.py | 76 +++++++ platform/scheduler/scheduler.py | 96 ++++++++ platform/scheduler/state.py | 64 ++++++ runtime/dsl/dag_manager.py | 31 +++ runtime/dsl/engine.py | 77 +++++++ runtime/dsl/ray_adapter.py | 36 +++ runtime/ray/worker_task.py | 51 +++++ ui/index.html | 13 ++ ui/package.json | 30 +++ ui/postcss.config.js | 6 + ui/src/App.tsx | 11 + ui/src/components/AdaptivePanel.tsx | 50 ++++ ui/src/components/CostTimeline.tsx | 59 +++++ ui/src/components/DecisionTimeline.tsx | 72 ++++++ ui/src/components/GPUHeatmap.tsx | 52 +++++ ui/src/components/GridDashboard.tsx | 264 ++++++++++++++++++++++ ui/src/components/JobGantt.tsx | 54 +++++ ui/src/components/MemoryGraph.tsx | 62 +++++ ui/src/components/NodeDrillDown.tsx | 85 +++++++ ui/src/components/PolicyGuardStatus.tsx | 58 +++++ ui/src/components/UserQuotas.tsx | 66 ++++++ ui/src/index.css | 37 +++ ui/src/main.tsx | 10 + ui/tailwind.config.js | 22 ++ 54 files changed, 2878 insertions(+) create mode 100644 README.md create mode 100644 agents/example_agent.py create mode 100644 agents/types/base_agent.py create mode 100644 agents/types/specialized.py create mode 100644 infra/ansible/gpu-node.yml create mode 100644 infra/k8s/crds/aijob.yaml create mode 100644 infra/k8s/gpu/device-plugin.yaml create mode 100644 infra/k8s/ray/head.yaml create mode 100644 infra/k8s/ray/worker.yaml create mode 100644 infra/k8s/scaling/keda.yaml create mode 100644 infra/terraform/cloud/main.tf create mode 100644 platform/ai/critic.py create mode 100644 platform/ai/workflow_generator.py create mode 100644 platform/api/auth.py create mode 100644 platform/api/db.py create mode 100644 platform/api/main.py create mode 100644 platform/api/monitor.py create mode 100644 platform/api/requirements.txt create mode 100644 platform/api/ws.py create mode 100644 platform/audit/observer.py create mode 100644 platform/governance/policy_engine.py create mode 100644 platform/governance/quotas.py create mode 100644 platform/intelligence/adaptive_weights.py create mode 100644 platform/memory/vector_store.py create mode 100644 platform/operator/controller.py create mode 100644 platform/scheduler/autoscaler.py create mode 100644 platform/scheduler/cost_engine.py create mode 100644 platform/scheduler/heartbeat_monitor.py create mode 100644 platform/scheduler/lifecycle.py create mode 100644 platform/scheduler/locks.py create mode 100644 platform/scheduler/resource_manager.py create mode 100644 platform/scheduler/scheduler.py create mode 100644 platform/scheduler/state.py create mode 100644 runtime/dsl/dag_manager.py create mode 100644 runtime/dsl/engine.py create mode 100644 runtime/dsl/ray_adapter.py create mode 100644 runtime/ray/worker_task.py create mode 100644 ui/index.html create mode 100644 ui/package.json create mode 100644 ui/postcss.config.js create mode 100644 ui/src/App.tsx create mode 100644 ui/src/components/AdaptivePanel.tsx create mode 100644 ui/src/components/CostTimeline.tsx create mode 100644 ui/src/components/DecisionTimeline.tsx create mode 100644 ui/src/components/GPUHeatmap.tsx create mode 100644 ui/src/components/GridDashboard.tsx create mode 100644 ui/src/components/JobGantt.tsx create mode 100644 ui/src/components/MemoryGraph.tsx create mode 100644 ui/src/components/NodeDrillDown.tsx create mode 100644 ui/src/components/PolicyGuardStatus.tsx create mode 100644 ui/src/components/UserQuotas.tsx create mode 100644 ui/src/index.css create mode 100644 ui/src/main.tsx create mode 100644 ui/tailwind.config.js diff --git a/README.md b/README.md new file mode 100644 index 0000000..a7568dd --- /dev/null +++ b/README.md @@ -0,0 +1,71 @@ +# 🚀 KhulnaSoft AI Compute Grid + +A production-grade deployment pack for a hybrid GPU cluster (local + cloud) tailored for AI-native platforms and distributed pentesting pipelines. + +## 🌟 Features + +- **Hybrid GPU Scaling**: Routes jobs between local and cloud clusters based on availability and priority. +- **Ray-Based Compute**: Leverages [Ray](https://ray.io) for seamless distributed Python execution. +- **K8s & GPU Ready**: Includes manifests for NVIDIA Device Plugin and GPU-aware worker deployments. +- **Secure by Default**: Example security contexts and isolated runtime configurations. +- **Smart Control Plane**: FastAPI-based API with Redis job tracking and status monitoring. + +## 🏗️ Repository Structure + +```bash +infra/ + terraform/cloud/ # AWS/Cloud GPU provisioning + ansible/ # Node bootstrapping (Docker, NVIDIA drivers) + k8s/ # K8s manifests (GPU plugin, Ray cluster) +platform/ + api/ # FastAPI job submission API + scheduler/ # Smart routing logic +runtime/ + ray/ # AI task definitions +agents/ # Example client implementations +``` + +## 🚀 Quick Start + +### 1. Provision Cloud Infrastructure +```bash +cd infra/terraform/cloud +terraform init +terraform apply +``` + +### 2. Bootstrap Nodes (Local or Cloud) +```bash +cd infra/ansible +ansible-playbook -i inventory.ini gpu-node.yml +``` + +### 3. Deploy Kubernetes Cluster +```bash +kubectl apply -f infra/k8s/gpu/device-plugin.yaml +kubectl apply -f infra/k8s/ray/head.yaml +kubectl apply -f infra/k8s/ray/worker.yaml +``` + +### 4. Run the Platform +```bash +# Start API (Port 8000) +uvicorn platform.api.main:app --host 0.0.0.0 + +# Start Scheduler +python platform/scheduler/scheduler.py +``` + +### 5. Submit a Job +```bash +python agents/example_agent.py +``` + +## 🔐 Security & Observability + +- **Sandboxing**: Pods are configured with restricted security contexts. Advanced isolation via gVisor is recommended for multi-tenant environments. +- **Monitoring**: Integration with Prometheus & Grafana is supported via Ray's native exporters. + +## 🤝 Contributing + +This is a production blueprint. Please adapt configurations (AMIs, regions, instance types) to your specific requirements before final deployment. diff --git a/agents/example_agent.py b/agents/example_agent.py new file mode 100644 index 0000000..8c57e0e --- /dev/null +++ b/agents/example_agent.py @@ -0,0 +1,48 @@ +import requests +import time +import json +import logging + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("AIAgent") + +API_URL = "http://localhost:8000" + +def submit_and_wait(task_type, params): + payload = { + "task_type": task_type, + "params": params, + "priority": 1 + } + + try: + # Submit job + response = requests.post(f"{API_URL}/submit", json=payload) + response.raise_for_status() + job_id = response.json()["job_id"] + logger.info(f"Submitted {task_type} job. ID: {job_id}") + + # Poll for status + while True: + status_resp = requests.get(f"{API_URL}/job/{job_id}") + status_resp.raise_for_status() + job_data = status_resp.json() + + status = job_data.get("status") + logger.info(f"Job {job_id} status: {status}") + + if "running" in status or "completed" in status: + return job_data + + time.sleep(2) + + except Exception as e: + logger.error(f"Error communicating with Grid API: {str(e)}") + return None + +if __name__ == "__main__": + logger.info("Starting AI Agent example...") + result = submit_and_wait("CVE-Scanner", {"target": "khulnasoft.com", "depth": "advanced"}) + if result: + print(f"Final Job State: {json.dumps(result, indent=2)}") diff --git a/agents/types/base_agent.py b/agents/types/base_agent.py new file mode 100644 index 0000000..1ca74cd --- /dev/null +++ b/agents/types/base_agent.py @@ -0,0 +1,30 @@ +import abc +import logging +from typing import Dict, Any + +logger = logging.getLogger("GridAgent") + +class Agent(abc.ABC): + """ + Formalized Base Agent for the multi-agent orchestration system. + Every agent must implement the 'run' method with input_data and context. + """ + def __init__(self, agent_id: str, role: str, config: Dict[str, Any] = None): + self.id = agent_id + self.role = role + self.config = config or {} + self.status = "idle" + + @abc.abstractmethod + def run(self, input_data: Any, context: Dict[str, Any] = None) -> Dict[str, Any]: + """ + Main execution logic. + """ + pass + + def log_decision(self, action: str, rationale: str, data: Any = None): + """ + Helper to record autonomous decisions for the audit trail. + """ + logger.info(f"[{self.role}] ACTION: {action} | RATIONALE: {rationale}") + # In production, this would call the platform/audit/logger.py diff --git a/agents/types/specialized.py b/agents/types/specialized.py new file mode 100644 index 0000000..287122c --- /dev/null +++ b/agents/types/specialized.py @@ -0,0 +1,45 @@ +from typing import Dict, Any, List +from agents.types.base_agent import Agent + +class PlannerAgent(Agent): + """ + Agent specialized in decomposing complex goals into DSL workflows. + """ + def __init__(self, agent_id: str, config: Dict[str, Any] = None): + super().__init__(agent_id, "Planner", config) + + def run(self, goal: Any, context: Dict[str, Any] = None) -> Dict[str, Any]: + self.log_decision("Generate Workflow", f"Decomposing goal: {goal}") + # Simplified mock logic - would call workflow_generator.py + return { + "type": "autonomous_scan", + "workflow": [ + {"name": "Recon", "agent": "recon", "params": {"target": goal}} + ] + } + +class CriticAgent(Agent): + """ + Agent specialized in safety audit and efficiency review. + """ + def __init__(self, agent_id: str, config: Dict[str, Any] = None): + super().__init__(agent_id, "Critic", config) + + def run(self, plan: Any, context: Dict[str, Any] = None) -> Dict[str, Any]: + self.log_decision("Audit Plan", "Reviewing proposed workflow for safety breaches.") + # Logic to check for blocked agents or dangerous commands + return {"approved": True, "score": 0.95} + +class ObserverAgent(Agent): + """ + Agent specialized in real-time execution monitoring and anomaly detection. + """ + def __init__(self, agent_id: str, config: Dict[str, Any] = None): + super().__init__(agent_id, "Observer", config) + + def run(self, job_telemetry: Any, context: Dict[str, Any] = None) -> Dict[str, Any]: + runtime = job_telemetry.get("runtime", 0) + if runtime > 600: + self.log_decision("Alert Anomaly", f"Job runtime ({runtime}s) exceeds threshold.") + return {"anomaly": True, "action": "Kill"} + return {"anomaly": False} diff --git a/infra/ansible/gpu-node.yml b/infra/ansible/gpu-node.yml new file mode 100644 index 0000000..2d3dad6 --- /dev/null +++ b/infra/ansible/gpu-node.yml @@ -0,0 +1,54 @@ +--- +- name: Bootstrap KhulnaSoft AI GPU Node + hosts: gpu_nodes + become: yes + tasks: + - name: Update apt cache + apt: + update_cache: yes + + - name: Install baseline dependencies + apt: + name: + - curl + - ca-certificates + - gnupg + - lsb-release + state: present + + - name: Install Docker + apt: + name: docker.io + state: present + + - name: Install NVIDIA drivers + apt: + name: nvidia-driver-535 + state: present + + - name: Install NVIDIA Container Toolkit + shell: | + curl -s -L https://nvidia.github.io/libnvidia-container/gpgkey | apt-key add - + curl -s -L https://nvidia.github.io/libnvidia-container/$(. /etc/os-release;echo $ID$VERSION_ID)/libnvidia-container.list | tee /etc/apt/sources.list.d/nvidia-container-toolkit.list + apt-get update && apt-get install -y nvidia-container-toolkit + args: + creates: /usr/bin/nvidia-container-toolkit + + - name: Configure Docker to use NVIDIA runtime + copy: + content: | + { + "default-runtime": "nvidia", + "runtimes": { + "nvidia": { + "path": "nvidia-container-runtime", + "runtimeArgs": [] + } + } + } + dest: /etc/docker/daemon.json + + - name: Restart Docker + service: + name: docker + state: restarted diff --git a/infra/k8s/crds/aijob.yaml b/infra/k8s/crds/aijob.yaml new file mode 100644 index 0000000..65e2273 --- /dev/null +++ b/infra/k8s/crds/aijob.yaml @@ -0,0 +1,78 @@ +# infra/k8s/crds/aijob.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: aijobs.khulnasoft.ai +spec: + group: khulnasoft.ai + names: + kind: AIJob + plural: aijobs + singular: aijob + shortNames: + - aij + scope: Namespaced + versions: + - name: v1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + type: + type: string + enum: ["scanner", "inference", "training", "custom"] + resources: + type: object + properties: + gpu: + type: integer + vram_gb: + type: integer + cpu: + type: integer + workflow: + type: array + items: + type: object + properties: + name: + type: string + agent: + type: string + params: + type: object + x-kubernetes-preserve-unknown-fields: true + status: + type: object + properties: + phase: + type: string + progress: + type: string + node: + type: string + cost: + type: number + startTime: + type: string + format: date-time + completionTime: + type: string + format: date-time + subresources: + status: {} + additionalPrinterColumns: + - name: Status + type: string + jsonPath: .status.phase + - name: Progress + type: string + jsonPath: .status.progress + - name: Age + type: date + jsonPath: .metadata.creationTimestamp diff --git a/infra/k8s/gpu/device-plugin.yaml b/infra/k8s/gpu/device-plugin.yaml new file mode 100644 index 0000000..696ecd5 --- /dev/null +++ b/infra/k8s/gpu/device-plugin.yaml @@ -0,0 +1,39 @@ +# infra/k8s/gpu/device-plugin.yaml +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: nvidia-device-plugin + namespace: kube-system + labels: + app.kubernetes.io/name: nvidia-device-plugin +spec: + selector: + matchLabels: + name: nvidia-device-plugin-ds + template: + metadata: + labels: + name: nvidia-device-plugin-ds + spec: + tolerations: + - key: nvidia.com/gpu + operator: Exists + effect: NoSchedule + # Mark this pod as a critical add-on; when enabled, the critical add-on scheduler + # reserves resources for critical add-on pods so that they can be rescheduled after + # a failure. + priorityClassName: "system-node-critical" + containers: + - image: nvcr.io/nvidia/k8s-device-plugin:latest + name: nvidia-device-plugin + securityContext: + allowPrivilegeEscalation: false + capabilities: + drop: ["ALL"] + volumeMounts: + - name: device-plugin + mountPath: /var/lib/kubelet/device-plugins + volumes: + - name: device-plugin + hostPath: + path: /var/lib/kubelet/device-plugins diff --git a/infra/k8s/ray/head.yaml b/infra/k8s/ray/head.yaml new file mode 100644 index 0000000..e624541 --- /dev/null +++ b/infra/k8s/ray/head.yaml @@ -0,0 +1,46 @@ +# infra/k8s/ray/head.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ray-head +spec: + replicas: 1 + selector: + matchLabels: + component: ray-head + template: + metadata: + labels: + component: ray-head + spec: + containers: + - name: ray-head + image: rayproject/ray:latest + command: ["/bin/bash", "-c", "--"] + args: ["ray start --head --port=6379 --dashboard-host=0.0.0.0 --block"] + ports: + - containerPort: 6379 # Redis + - containerPort: 8265 # Dashboard + - containerPort: 10001 # Client + resources: + requests: + cpu: 2 + memory: 4Gi + limits: + cpu: 4 + memory: 8Gi +--- +apiVersion: v1 +kind: Service +metadata: + name: ray-head +spec: + ports: + - name: redis + port: 6379 + - name: dashboard + port: 8265 + - name: client + port: 10001 + selector: + component: ray-head diff --git a/infra/k8s/ray/worker.yaml b/infra/k8s/ray/worker.yaml new file mode 100644 index 0000000..54cd324 --- /dev/null +++ b/infra/k8s/ray/worker.yaml @@ -0,0 +1,34 @@ +# infra/k8s/ray/worker.yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ray-worker +spec: + replicas: 2 + selector: + matchLabels: + component: ray-worker + template: + metadata: + labels: + component: ray-worker + spec: + containers: + - name: ray-worker + image: rayproject/ray:latest-gpu # Using GPU image for workers + command: ["/bin/bash", "-c", "--"] + args: ["ray start --address=ray-head:6379 --block"] + resources: + limits: + nvidia.com/gpu: 1 + cpu: 4 + memory: 8Gi + requests: + nvidia.com/gpu: 1 + cpu: 2 + memory: 4Gi + # Ensure workers run on nodes with GPUs + tolerations: + - key: "nvidia.com/gpu" + operator: "Exists" + effect: "NoSchedule" diff --git a/infra/k8s/scaling/keda.yaml b/infra/k8s/scaling/keda.yaml new file mode 100644 index 0000000..3461d32 --- /dev/null +++ b/infra/k8s/scaling/keda.yaml @@ -0,0 +1,20 @@ +# infra/k8s/scaling/keda.yaml +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: ray-worker-scaler + namespace: default +spec: + scaleTargetRef: + name: ray-worker # Should match the Deployment name in infra/k8s/ray/worker.yaml + minReplicaCount: 0 + maxReplicaCount: 10 + cooldownPeriod: 300 + pollingInterval: 30 + triggers: + - type: redis + metadata: + address: redis.default.svc.cluster.local:6379 + listName: job_queue + listLength: "5" + activationListLength: "1" diff --git a/infra/terraform/cloud/main.tf b/infra/terraform/cloud/main.tf new file mode 100644 index 0000000..22900fb --- /dev/null +++ b/infra/terraform/cloud/main.tf @@ -0,0 +1,24 @@ +provider "aws" { + region = "ap-southeast-1" +} + +resource "aws_instance" "gpu_node" { + ami = "ami-gpu-ubuntu-22" # placeholder for actual GPU AMI + instance_type = "g5.xlarge" # NVIDIA A10G GPU instance + + root_block_device { + volume_size = 200 + volume_type = "gp3" + } + + tags = { + Name = "khulnasoft-gpu-node" + Project = "AI Grid" + Environment = "production" + } +} + +# Output the IP address for Ansible usage +output "gpu_node_ip" { + value = aws_instance.gpu_node.public_ip +} diff --git a/platform/ai/critic.py b/platform/ai/critic.py new file mode 100644 index 0000000..272d858 --- /dev/null +++ b/platform/ai/critic.py @@ -0,0 +1,44 @@ +import logging +import json +from typing import Dict, Any + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("AICritic") + +class AICritic: + """ + Independent AI agent that reviews proposed workflows for efficiency and safety. + """ + def __init__(self): + self.role = "Security Auditor & Efficiency Optimizer" + + def review_plan(self, plan: Dict[str, Any]) -> Dict[str, Any]: + """ + Simulates an LLM-based review of an AIJob plan. + """ + logger.info("AICritic reviewing proposed workflow...") + + # Mocking a 'Critic' evaluation + workflow = plan.get("workflow", []) + + if len(workflow) > 10: + return { + "decision": "Warning", + "reason": "Workflow is unusually long. Optimization suggested.", + "adjustments": {"limit_steps": 10} + } + + for step in workflow: + if step.get("agent") == "exploit": + return { + "decision": "Manual Approval Required", + "reason": "Autonomous exploit agent detected. Higher risk level.", + "adjustments": {} + } + + return { + "decision": "Approved", + "reason": "Workflow aligns with safety best practices and operational efficiency.", + "adjustments": {} + } diff --git a/platform/ai/workflow_generator.py b/platform/ai/workflow_generator.py new file mode 100644 index 0000000..eaf90d5 --- /dev/null +++ b/platform/ai/workflow_generator.py @@ -0,0 +1,65 @@ +import json +import logging +import os +from typing import List, Dict, Any + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("AIWorkflowGen") + +from platform.ai.critic import AICritic +import uuid + +critic = AICritic() + +class AIWorkflowGenerator: + """ + Service to decompose high-level natural language goals into KhulnaSoft AIJobs. + Now uses a multi-agent Planner/Critic loop. + """ + def __init__(self, provider="mock"): + self.provider = provider + + def generate_workflow(self, goal: str) -> Dict[str, Any]: + logger.info(f"Generating workflow for goal: {goal}") + + # 1. Planner Phase + raw_plan = self._planner_mock(goal) + + # 2. Critic Phase + review = critic.review_plan(raw_plan) + + # 3. Decision Audit Log + decision_id = str(uuid.uuid4()) + self._log_decision(decision_id, goal, raw_plan, review) + + if review["decision"] == "Approved": + return {"job": raw_plan, "decision_id": decision_id} + else: + logger.warning(f"Workflow Critic issue: {review['reason']}") + # In a real system, we might re-plan or apply adjustments + return {"job": raw_plan, "decision_id": decision_id, "warning": review["reason"]} + + def _planner_mock(self, goal: str): + if "vulnerability" in goal.lower() or "scan" in goal.lower(): + return { + "type": "scanner", + "resources": {"gpu": 1, "vram_gb": 8}, + "workflow": [ + {"name": "Initial Recon", "agent": "recon", "params": {"target": "scope_auto"}}, + {"name": "Vuln Identification", "agent": "scanner", "params": {"intensity": "high"}}, + ] + } + return {"type": "custom", "workflow": [{"name": "Analyze", "agent": "base"}]} + + def _log_decision(self, d_id, goal, plan, review): + # Implementation for writing to SystemDecision DB table + logger.info(f"Audit Logged: Decision {d_id} | Status: {review['decision']}") + pass + + async def chat_with_grid(self, query: str): + """ + Interactive interface for operators to talk to the cluster. + """ + # Logic for responding to 'What is the current cluster load?' or 'Show me failed jobs' + pass diff --git a/platform/api/auth.py b/platform/api/auth.py new file mode 100644 index 0000000..cac8b08 --- /dev/null +++ b/platform/api/auth.py @@ -0,0 +1,20 @@ +from fastapi import Security, HTTPException, status +from fastapi.security.api_key import APIKeyHeader +import os + +API_KEY_NAME = "X-Grid-API-Key" +api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False) + +# In production, these would be in a database or vault +VALID_API_KEYS = { + os.getenv("GRID_ADMIN_KEY", "khulnasoft-admin-123"): "admin", + os.getenv("GRID_AGENT_KEY", "khulnasoft-agent-456"): "agent" +} + +async def get_api_key(api_key: str = Security(api_key_header)): + if api_key in VALID_API_KEYS: + return VALID_API_KEYS[api_key] + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Could not validate API Key" + ) diff --git a/platform/api/db.py b/platform/api/db.py new file mode 100644 index 0000000..513bd9a --- /dev/null +++ b/platform/api/db.py @@ -0,0 +1,70 @@ +from sqlalchemy import create_all, Column, String, Integer, Float, DateTime, JSON, ForeignKey +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker, relationship +from sqlalchemy import create_engine +import datetime +import os + +# Use SQLite as default for portability, Switch to PostgreSQL in production via DATABASE_URL +DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./grid_compute.db") + +engine = create_engine(DATABASE_URL) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) +Base = declarative_base() + +class JobRecord(Base): + __tablename__ = "jobs" + + id = Column(String, primary_key=True, index=True) + task_type = Column(String) + status = Column(String) + priority = Column(Integer) + node_id = Column(String, nullable=True) + cluster_type = Column(String) + params = Column(JSON) + created_at = Column(DateTime, default=datetime.datetime.utcnow) + completed_at = Column(DateTime, nullable=True) + cost = Column(Float, default=0.0) + user_scope = Column(String) + +class NodeRecord(Base): + __tablename__ = "nodes" + + id = Column(String, primary_key=True, index=True) + type = Column(String) # local | cloud + status = Column(String) + vram_gb = Column(Integer) + cpu_cores = Column(Integer) + first_seen = Column(DateTime, default=datetime.datetime.utcnow) + last_seen = Column(DateTime, default=datetime.datetime.utcnow) + +class CostLog(Base): + __tablename__ = "cost_logs" + + id = Column(Integer, primary_key=True, index=True) + timestamp = Column(DateTime, default=datetime.datetime.utcnow) + amount = Column(Float) + currency = Column(String, default="USD") + node_id = Column(String) + +class SystemDecision(Base): + __tablename__ = "system_decisions" + + id = Column(String, primary_key=True, index=True) + timestamp = Column(DateTime, default=datetime.datetime.utcnow) + component = Column(String) # Planner | Critic | Scheduler | Engine + action = Column(String) + rationale = Column(String) + input_data = Column(JSON) + output_data = Column(JSON) + status = Column(String) # Authorized | Blocked | Warning + +def init_db(): + Base.metadata.create_all(bind=engine) + +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() diff --git a/platform/api/main.py b/platform/api/main.py new file mode 100644 index 0000000..c0b4c74 --- /dev/null +++ b/platform/api/main.py @@ -0,0 +1,117 @@ +import uuid +import json +import logging +import time +from typing import Dict, Any +from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends, Request +from pydantic import BaseModel +import redis +from sqlalchemy.orm import Session + +from slowapi import Limiter, _rate_limit_exceeded_handler +from slowapi.util import get_remote_address +from slowapi.errors import RateLimitExceeded + +from platform.api.db import init_db, get_db, JobRecord +from platform.api.monitor import router as monitor_router +from platform.api.ws import router as ws_router +from platform.api.auth import get_api_key + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +limiter = Limiter(key_func=get_remote_address) +app = FastAPI(title="KhulnaSoft AI Compute Grid API") +app.state.limiter = limiter +app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) + +@app.on_event("startup") +def on_startup(): + init_db() + +# Include Routers +app.include_router(monitor_router) +app.include_router(ws_router) + +# Redis configuration +REDIS_HOST = "redis" +REDIS_PORT = 6379 +r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True) + +class JobPayload(BaseModel): + task_type: str + params: Dict[str, Any] + priority: int = 0 + resources: Dict[str, Any] = {} + +@app.get("/health") +def health_check(): + try: + r.ping() + return {"status": "healthy", "redis": "connected"} + except Exception as e: + return {"status": "degraded", "redis": str(e)} + +@app.post("/submit") +@limiter.limit("10/minute") +async def submit_job( + request: Request, + payload: JobPayload, + user_scope: str = Depends(get_api_key), + db: Session = Depends(get_db) +): + job_id = str(uuid.uuid4()) + job_data = { + "id": job_id, + "task_type": payload.task_type, + "params": payload.params, + "priority": payload.priority, + "resources_requested": payload.resources, + "status": "queued", + "created_at": time.time(), + "user_scope": user_scope, + "retries": 0 + } + + try: + # 1. Persistent Storage (PostgreSQL/SQLite) + db_job = JobRecord( + id=job_id, + task_type=payload.task_type, + status="queued", + priority=payload.priority, + params=payload.params, + user_scope=user_scope + ) + db.add(db_job) + db.commit() + + # 2. Real-time Queue (Redis) + # Determine priority queue + queue_name = "job_queue" + if payload.priority > 5: + queue_name = "high_priority_queue" + elif payload.priority < 0: + queue_name = "low_priority_queue" + + r.lpush(queue_name, json.dumps(job_data)) + r.set(f"job:{job_id}", json.dumps(job_data)) + + logger.info(f"Job {job_id} submitted by {user_scope} to {queue_name}") + return {"job_id": job_id, "status": "queued", "scope": user_scope} + + except Exception as e: + logger.error(f"Failed to submit job {job_id}: {str(e)}") + raise HTTPException(status_code=500, detail="Internal Server Error") + +@app.get("/job/{job_id}") +def get_job_status(job_id: str): + job_data = r.get(f"job:{job_id}") + if not job_data: + raise HTTPException(status_code=404, detail="Job not found") + return json.loads(job_data) + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/platform/api/monitor.py b/platform/api/monitor.py new file mode 100644 index 0000000..d0a3cdb --- /dev/null +++ b/platform/api/monitor.py @@ -0,0 +1,45 @@ +from fastapi import APIRouter, Depends +import redis +import json + +router = APIRouter(prefix="/api/monitor", tags=["monitoring"]) + +# Note: In production, use the same redis connection pool +REDIS_HOST = "redis" +REDIS_PORT = 6379 +r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True) + +from platform.scheduler.state import get_active_nodes + +@router.get("/metrics") +def get_metrics(): + """ + Returns high-level cluster metrics for the dashboard. + """ + try: + queue_size = r.llen("job_queue") + active_jobs = r.get("active_jobs_count") or 0 + + nodes = get_active_nodes() + + return { + "queue_size": int(queue_size), + "active_jobs": int(active_jobs), + "nodes": nodes, + "status": "healthy" + } + except Exception as e: + return {"status": "error", "message": str(e)} + +@router.get("/cost") +def get_cost_estimate(): + """ + Simulated cost tracking metrics. + """ + # In a real system, you'd calculate this based on cloud instance runtimes + total_spend = float(r.get("total_cloud_spend") or 0.0) + return { + "monthly_estimate": total_spend * 30, + "total_spend": total_spend, + "currency": "USD" + } diff --git a/platform/api/requirements.txt b/platform/api/requirements.txt new file mode 100644 index 0000000..c3dc069 --- /dev/null +++ b/platform/api/requirements.txt @@ -0,0 +1,4 @@ +fastapi +uvicorn +redis +pydantic diff --git a/platform/api/ws.py b/platform/api/ws.py new file mode 100644 index 0000000..1a4f65f --- /dev/null +++ b/platform/api/ws.py @@ -0,0 +1,63 @@ +import asyncio +import json +import logging +from fastapi import APIRouter, WebSocket, WebSocketDisconnect +import redis + +router = APIRouter(tags=["websockets"]) + +logger = logging.getLogger("WS") + +# Redis for metrics polling +REDIS_HOST = "redis" +REDIS_PORT = 6379 +r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True) + +class ConnectionManager: + def __init__(self): + self.active_connections: list[WebSocket] = [] + + async def connect(self, websocket: WebSocket): + await websocket.accept() + self.active_connections.append(websocket) + + def disconnect(self, websocket: WebSocket): + self.active_connections.remove(websocket) + + async def broadcast(self, message: str): + for connection in self.active_connections: + try: + await connection.send_text(message) + except Exception: + # Handle stale connections + pass + +manager = ConnectionManager() + +from platform.scheduler.state import get_active_nodes + +@router.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + await manager.connect(websocket) + try: + while True: + queue_size = r.llen("job_queue") + active_jobs = r.get("active_jobs_count") or 0 + nodes = get_active_nodes() + + data = { + "type": "metrics_update", + "queue_size": int(queue_size), + "active_jobs": int(active_jobs), + "nodes": nodes, + "timestamp": asyncio.get_event_loop().time() + } + + await websocket.send_json(data) + await asyncio.sleep(2) # Push update every 2 seconds + except WebSocketDisconnect: + manager.disconnect(websocket) + logger.info("WebSocket disconnected") + except Exception as e: + logger.error(f"WS Error: {str(e)}") + manager.disconnect(websocket) diff --git a/platform/audit/observer.py b/platform/audit/observer.py new file mode 100644 index 0000000..988f2e5 --- /dev/null +++ b/platform/audit/observer.py @@ -0,0 +1,45 @@ +import time +import json +import logging +import redis +from platform.scheduler.state import r as redis_conn + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("AnomalyObserver") + +MAX_RUNTIME_SECONDS = 3600 # 1 Hour hard limit +MAX_MUTATIONS_ALLOWED = 10 + +def audit_loop(): + logger.info("Anomaly Observer started. Monitoring job telemetry...") + while True: + try: + # Check all active jobs + all_job_keys = redis_conn.keys("job:*") + for key in all_job_keys: + raw_data = redis_conn.get(key) + if not raw_data: continue + + job_data = json.loads(raw_data) + if job_data.get("status") != "running": continue + + # 1. Runtime Anomaly + start_time = job_data.get("created_at", time.time()) + runtime = time.time() - start_time + if runtime > MAX_RUNTIME_SECONDS: + logger.warning(f"🚨 ANOMALY: Job {job_data['id']} runtime ({int(runtime)}s) exceeds safety limit. Triggering kill.") + # In production: trigger task termination via Ray + + # 2. Mutation Count Anomaly + mutations = job_data.get("mutations", 0) + if mutations > MAX_MUTATIONS_ALLOWED: + logger.warning(f"🚨 ANOMALY: Job {job_data['id']} mutations ({mutations}) spike detected. Investigation required.") + + except Exception as e: + logger.error(f"Error in anomaly audit loop: {str(e)}") + + time.sleep(20) + +if __name__ == "__main__": + audit_loop() diff --git a/platform/governance/policy_engine.py b/platform/governance/policy_engine.py new file mode 100644 index 0000000..31f76ee --- /dev/null +++ b/platform/governance/policy_engine.py @@ -0,0 +1,61 @@ +import logging +from typing import Dict, Any, List + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("PolicyEngine") + +# Hard-coded safety policies +ALLOWED_AGENTS = ["recon", "scanner", "base", "analyst"] +MAX_RECURSION_DEPTH = 5 +MAX_JOBS_PER_SESSION = 20 +RESTRICTED_PARAMS = ["--privileged", "rm -rf /", "chmod 777"] + +from platform.governance.quotas import QuotaManager + +quota_manager = QuotaManager() + +class PolicyEngine: + """ + Validates autonomous actions against security and safety policies. + """ + def __init__(self): + self.active_policies = ["agent_restriction", "recursion_guard", "param_sanitization"] + + def validate_workflow(self, workflow_spec: Dict[str, Any], user_id: str = "default") -> Dict[str, Any]: + """ + Validates an entire AIJob workflow including user quotas. + """ + issues = [] + + # 1. Quota Check + estimated_hours = 1.0 # Simple estimate based on step count later + if not quota_manager.check_quota(user_id, estimated_hours): + issues.append(f"Insufficient GPU quota for user '{user_id}'.") + + # 2. Agent/Param Checks + steps = workflow_spec.get("workflow", []) + for step in steps: + agent = step.get("agent") + if agent not in ALLOWED_AGENTS: + issues.append(f"Agent '{agent}' is not in the allowed list.") + + # Check params for dangerous strings + params_str = str(step.get("params", "")) + for restricted in RESTRICTED_PARAMS: + if restricted in params_str: + issues.append(f"Restricted parameter/command detected: {restricted}") + + if issues: + return {"status": "Blocked", "reason": "; ".join(issues)} + + return {"status": "Authorized", "reason": "All steps pass safety validation."} + + def validate_mutation(self, current_depth: int, total_mutations: int) -> bool: + """ + Checks if a dynamic mutation is allowed based on recursion guards. + """ + if current_depth >= MAX_RECURSION_DEPTH: + logger.warning(f"Recursion depth limit reached: {current_depth}") + return False + return True diff --git a/platform/governance/quotas.py b/platform/governance/quotas.py new file mode 100644 index 0000000..a0873a7 --- /dev/null +++ b/platform/governance/quotas.py @@ -0,0 +1,43 @@ +import redis +import json +import logging + +logger = logging.getLogger("QuotaManager") +r = redis.Redis(host="redis", port=6379, decode_responses=True) + +DEFAULT_QUOTAS = { + "gpu_hours": 24, + "max_cost": 500.0, + "rate_limit_rpm": 60 +} + +class QuotaManager: + """ + Manages per-user usage quotas and budget caps. + """ + def __init__(self): + self.key_prefix = "quota:user:" + + def check_quota(self, user_id: str, estimated_hours: float) -> bool: + """ + Verifies if the user has enough remaining quota for a new job. + """ + quota = self.get_user_quota(user_id) + current_usage = float(r.get(f"{self.key_prefix}{user_id}:usage") or 0.0) + + if (current_usage + estimated_hours) > quota["gpu_hours"]: + logger.warning(f"Quota exceeded for user {user_id}: {current_usage} + {estimated_hours} > {quota['gpu_hours']}") + return False + + return True + + def track_usage(self, user_id: str, actual_hours: float): + """ + Increments the user's recorded usage after job completion. + """ + r.incrbyfloat(f"{self.key_prefix}{user_id}:usage", actual_hours) + + def get_user_quota(self, user_id: str): + # In production, fetch from Postgres. Here using defaults. + data = r.get(f"{self.key_prefix}{user_id}:config") + return json.loads(data) if data else DEFAULT_QUOTAS diff --git a/platform/intelligence/adaptive_weights.py b/platform/intelligence/adaptive_weights.py new file mode 100644 index 0000000..9dbada2 --- /dev/null +++ b/platform/intelligence/adaptive_weights.py @@ -0,0 +1,39 @@ +import redis +import json +import logging + +logger = logging.getLogger("AdaptiveWeights") +r = redis.Redis(host="redis", port=6379, decode_responses=True) + +class AdaptiveOptimizer: + """ + Analyzes historical job performance to adjust scheduling weights. + """ + def __init__(self): + self.weights_key = "scheduler_adaptive_weights" + + def get_weights(self): + """ + Returns current weights for Local vs Cloud placement. + """ + weights = r.get(self.weights_key) + if weights: + return json.loads(weights) + + # Default weights: Prefer local (0.0 cost) + return {"local_preference": 1.5, "cloud_penalty": 0.8} + + def update_weights_from_history(self, job_results: list): + """ + Simple feedback loop: If local nodes succeed frequently, increase local_preference. + """ + current = self.get_weights() + success_rate = len([j for j in job_results if j['status'] == 'completed']) / len(job_results) + + if success_rate > 0.9: + current["local_preference"] += 0.1 + elif success_rate < 0.5: + current["local_preference"] -= 0.2 + + r.set(self.weights_key, json.dumps(current)) + logger.info(f"Adaptive weights updated: {current}") diff --git a/platform/memory/vector_store.py b/platform/memory/vector_store.py new file mode 100644 index 0000000..00fb208 --- /dev/null +++ b/platform/memory/vector_store.py @@ -0,0 +1,53 @@ +import logging +import json +import time + +# In a real environment, we'd use: import chromadb +# We'll mock the core vector store interface for this blueprint. + +logger = logging.getLogger("VectorStore") + +class KnowledgeStore: + """ + Enterprise Vector Memory for AI Agents. + Supports Namespaces (Multi-Tenancy) and Trust Scoring. + """ + def __init__(self, collection_name="grid_knowledge"): + self.collection_name = collection_name + self.mock_db = [] # In-memory mock for vector data + + def add_finding(self, agent_id: str, content: str, metadata: dict, namespace: str = "default"): + """ + Embeds and stores a finding within a user-specific namespace. + """ + entry = { + "id": f"{agent_id}_{int(time.time())}", + "content": content, + "metadata": { + **metadata, + "trust_score": 1.0, # Initial trust + "namespace": namespace, + "ttl": time.time() + (3600 * 24 * 30) # 30 day TTL + }, + "timestamp": time.time() + } + self.mock_db.append(entry) + logger.info(f"Memory stored [{namespace}]: {content[:50]}...") + + def query_context(self, query: str, namespace: str = "default", limit=3): + """ + Retrieves relevant findings ONLY from the authorized namespace. + """ + logger.info(f"Querying memory [{namespace}] for: {query}") + + # Filter mock DB by namespace + ns_results = [e for e in self.mock_db if e["metadata"]["namespace"] == namespace] + + # Return last 3 entries tagged with trust + return ns_results[-limit:] if ns_results else [] + + def get_all_embeddings(self): + """ + Used for the dashboard memory graph. + """ + return self.mock_db diff --git a/platform/operator/controller.py b/platform/operator/controller.py new file mode 100644 index 0000000..4c9d05e --- /dev/null +++ b/platform/operator/controller.py @@ -0,0 +1,84 @@ +import kopf +import kubernetes +import redis +import json +import logging +import uuid +import datetime + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("AIOperator") + +r = redis.Redis(host="redis", port=6379, decode_responses=True) + +@kopf.on.create('khulnasoft.ai', 'v1', 'aijobs') +def create_fn(spec, name, namespace, logger, **kwargs): + """ + Called when a new AIJob is created. + """ + logger.info(f"AIJob {name} detected. Syncing with compute grid...") + + job_id = str(uuid.uuid4()) + workflow = spec.get("workflow", []) + resources = spec.get("resources", {}) + job_type = spec.get("type", "custom") + + job_data = { + "id": job_id, + "k8s_name": name, + "type": job_type, + "workflow": workflow, + "resources_requested": resources, + "status": "queued", + "created_at": datetime.datetime.now().isoformat() + } + + # Push to our internal grid scheduler + r.lpush("job_queue", json.dumps(job_data)) + r.set(f"job:{job_id}", json.dumps(job_data)) + + # Store mapping between K8s name and Grid ID + r.set(f"k8s_job_map:{name}", job_id) + + return {'jobId': job_id, 'status': 'submitted'} + +@kopf.on.update('khulnasoft.ai', 'v1', 'aijobs') +def update_fn(spec, status, name, logger, **kwargs): + """ + Handles updates to the AIJob (e.g. spec changes). + """ + # implementation for handling spec updates (e.g. canceling/restarting) + pass + +@kopf.on.delete('khulnasoft.ai', 'v1', 'aijobs') +def delete_fn(name, logger, **kwargs): + """ + Cleans up grid resources when AIJob is deleted. + """ + job_id = r.get(f"k8s_job_map:{name}") + if job_id: + logger.info(f"AIJob {name} deleted. Cleaning up Grid ID {job_id}") + r.delete(f"job:{job_id}") + r.delete(f"k8s_job_map:{name}") + +def update_k8s_status(name, namespace, phase, progress, node=None): + """ + Utility to push grid updates back to K8s status. + """ + api = kubernetes.client.CustomObjectsApi() + status = { + "status": { + "phase": phase, + "progress": progress, + "node": node + } + } + api.patch_namespaced_custom_object_status( + group="khulnasoft.ai", + version="v1", + namespace=namespace, + plural="aijobs", + name=name, + body=status + ) diff --git a/platform/scheduler/autoscaler.py b/platform/scheduler/autoscaler.py new file mode 100644 index 0000000..7deb9d7 --- /dev/null +++ b/platform/scheduler/autoscaler.py @@ -0,0 +1,69 @@ +import time +import redis +import logging +import os +import requests + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("Autoscaler") + +# Config +REDIS_HOST = os.getenv("REDIS_HOST", "redis") +REDIS_PORT = int(os.getenv("REDIS_PORT", 6379)) +QUEUE_THRESHOLD = int(os.getenv("QUEUE_THRESHOLD", 5)) +CHECK_INTERVAL = int(os.getenv("AUTOSCALE_INTERVAL", 10)) + +from platform.scheduler.state import get_node_count, register_node, get_active_nodes + +r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True) + +def scale_up_cloud(): + """ + Triggers cloud node provisioning if we don't already have one in progress. + """ + active_cloud_nodes = get_node_count(node_type="cloud") + + if active_cloud_nodes < 5: # Max cloud nodes limit + logger.info(f"🚀 Queue pressure high. Current cloud nodes: {active_cloud_nodes}. Spawning new node...") + + # Simulate API call to provider + node_id = f"cloud-node-{uuid.uuid4().hex[:6]}" + register_node(node_id, "cloud", {"gpu": True, "vram_gb": 24}) # Simulated A10G + + # Increment total spend for monitoring + current_spend = float(r.get("total_cloud_spend") or 0.0) + r.set("total_cloud_spend", current_spend + 1.05) # Assume $1.05/hr base cost + else: + logger.warning("Cloud quota reached. Cannot scale up further.") + +def scale_down_cloud(): + """ + Cleans up idle cloud nodes from the registry. + """ + cloud_nodes = [n for n in get_active_nodes() if n["type"] == "cloud"] + if cloud_nodes: + node_to_kill = cloud_nodes[0] + logger.info(f"🛑 Cluster idle. Terminating cloud node {node_to_kill['id']}...") + r.hdel("grid_nodes", node_to_kill['id']) + r.delete(f"node_heartbeat:{node_to_kill['id']}") + +def autoscale_loop(): + logger.info(f"Autoscaler started. Threshold: {QUEUE_THRESHOLD}, Interval: {CHECK_INTERVAL}s") + while True: + try: + queue_size = r.llen("job_queue") + logger.debug(f"Current queue size: {queue_size}") + + if queue_size > QUEUE_THRESHOLD: + scale_up_cloud() + elif queue_size == 0: + scale_down_cloud() + + except Exception as e: + logger.error(f"Error in autoscaler loop: {str(e)}") + + time.sleep(CHECK_INTERVAL) + +if __name__ == "__main__": + autoscale_loop() diff --git a/platform/scheduler/cost_engine.py b/platform/scheduler/cost_engine.py new file mode 100644 index 0000000..c02b201 --- /dev/null +++ b/platform/scheduler/cost_engine.py @@ -0,0 +1,28 @@ +import time +import redis +import json + +r = redis.Redis(host="redis", port=6379, decode_responses=True) + +# Price per hour for cloud nodes (e.g., A10G on AWS) +PRICE_PER_HOUR = 1.05 + +def track_job_cost(job_id, start_time, cluster_type): + """ + Calculates cost for a completed job if it ran on cloud. + """ + if cluster_type != "cloud": + return 0.0 + + duration_seconds = time.time() - start_time + duration_hours = duration_seconds / 3600.0 + cost = duration_hours * PRICE_PER_HOUR + + # Update total spend in Redis + current_spend = float(r.get("total_cloud_spend") or 0.0) + r.set("total_cloud_spend", current_spend + cost) + + return cost + +def get_total_cloud_spend(): + return float(r.get("total_cloud_spend") or 0.0) diff --git a/platform/scheduler/heartbeat_monitor.py b/platform/scheduler/heartbeat_monitor.py new file mode 100644 index 0000000..3c18075 --- /dev/null +++ b/platform/scheduler/heartbeat_monitor.py @@ -0,0 +1,37 @@ +import time +import json +import logging +import redis +from platform.scheduler.state import get_active_nodes, r as redis_conn +from platform.scheduler.lifecycle import fail_job + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("FaultRecovery") + +def recovery_loop(): + logger.info("Fault Recovery monitor started.") + while True: + try: + # get_active_nodes automatically cleans up stale heartbeats in our state.py + # But we need to handle jobs that were running on those nodes. + nodes = get_active_nodes() + active_node_ids = {n["id"] for n in nodes} + + # Check for jobs that are 'running' but their node is gone + all_job_keys = redis_conn.keys("job:*") + for key in all_job_keys: + job_data = json.loads(redis_conn.get(key)) + if job_data.get("status") == "running": + node_id = job_data.get("node_id") + if node_id and node_id not in active_node_ids: + logger.warning(f"Job {job_data['id']} was running on lost node {node_id}. Triggering recovery...") + fail_job(job_data['id'], f"Node {node_id} heartbeat timeout") + + except Exception as e: + logger.error(f"Error in recovery loop: {str(e)}") + + time.sleep(30) + +if __name__ == "__main__": + recovery_loop() diff --git a/platform/scheduler/lifecycle.py b/platform/scheduler/lifecycle.py new file mode 100644 index 0000000..5bad9e7 --- /dev/null +++ b/platform/scheduler/lifecycle.py @@ -0,0 +1,70 @@ +import json +import time +import logging +import redis +import uuid + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("JobLifecycle") + +r = redis.Redis(host="redis", port=6379, decode_responses=True) + +MAX_RETRIES = 3 + +def transition_job(job_id, new_status, metadata=None): + """ + State machine for job lifecycle. + Statuses: queued, pending, running, completed, failed + """ + job_key = f"job:{job_id}" + job_str = r.get(job_key) + + if not job_str: + logger.error(f"Job {job_id} not found in store.") + return False + + job_data = json.loads(job_str) + old_status = job_data.get("status") + + job_data["status"] = new_status + job_data["updated_at"] = time.time() + + if metadata: + job_data.update(metadata) + + r.set(job_key, json.dumps(job_data)) + logger.info(f"Job {job_id} transitioned: {old_status} -> {new_status}") + + # Push event to Redis PubSub for real-time dashboard updates + r.publish("job_events", json.dumps({"job_id": job_id, "status": new_status})) + return True + +def fail_job(job_id, reason): + """ + Handles job failure with optional retry logic. + """ + job_key = f"job:{job_id}" + job_data = json.loads(r.get(job_key)) + + retries = job_data.get("retries", 0) + + if retries < MAX_RETRIES: + logger.warning(f"Job {job_id} failed. Retrying ({retries + 1}/{MAX_RETRIES})...") + job_data["retries"] = retries + 1 + job_data["status"] = "queued" + job_data["last_error"] = reason + r.set(job_key, json.dumps(job_data)) + # Requeue job + r.lpush("job_queue", json.dumps(job_data)) + r.publish("job_events", json.dumps({"job_id": job_id, "status": "retrying"})) + else: + logger.error(f"Job {job_id} failed after {MAX_RETRIES} retries. Final Status: FAILED") + transition_job(job_id, "failed", {"error": reason}) + +def monitor_stuck_jobs(): + """ + Scans for jobs that have been 'running' or 'pending' for too long. + """ + # implementation for periodic cleanup/recovery script + pass diff --git a/platform/scheduler/locks.py b/platform/scheduler/locks.py new file mode 100644 index 0000000..7ad7641 --- /dev/null +++ b/platform/scheduler/locks.py @@ -0,0 +1,50 @@ +import time +import redis +import logging +import uuid +from contextlib import contextmanager + +logger = logging.getLogger("DistLocks") + +r = redis.Redis(host="redis", port=6379, decode_responses=True) + +class RedisLock: + """ + Simple distributed lock using Redis SET NX. + """ + def __init__(self, lock_name, timeout=10, expire=30): + self.lock_name = f"lock:{lock_name}" + self.timeout = timeout + self.expire = expire + self.token = str(uuid.uuid4()) + + def acquire(self): + end = time.time() + self.timeout + while time.time() < end: + if r.set(self.lock_name, self.token, nx=True, ex=self.expire): + return True + time.sleep(0.1) + return False + + def release(self): + # Only release if we still own it (via token) + script = """ + if redis.call("get", KEYS[1]) == ARGV[1] then + return redis.call("del", KEYS[1]) + else + return 0 + end + """ + r.eval(script, 1, self.lock_name, self.token) + +@contextmanager +def distributed_lock(name, timeout=5, expire=15): + lock = RedisLock(name, timeout, expire) + if lock.acquire(): + try: + yield + finally: + lock.release() + else: + logger.warning(f"Could not acquire lock: {name}") + yield False diff --git a/platform/scheduler/resource_manager.py b/platform/scheduler/resource_manager.py new file mode 100644 index 0000000..0daca2a --- /dev/null +++ b/platform/scheduler/resource_manager.py @@ -0,0 +1,76 @@ +import logging + +logger = logging.getLogger("ResourceManager") + +# Resource profiles for different AI/Pentest tasks +TASK_RESOURCES = { + "llm_inference": {"gpu": True, "vram_min_gb": 16, "cpu_cores": 4}, + "llm_training": {"gpu": True, "vram_min_gb": 40, "cpu_cores": 8}, + "cve_scanner": {"gpu": False, "cpu_cores": 2, "mem_gb": 4}, + "brute_force_hash": {"gpu": True, "vram_min_gb": 8, "cpu_cores": 2}, +} + +def get_required_resources(task_type): + """ + Returns the resource requirements for a given task type. + """ + return TASK_RESOURCES.get(task_type.lower(), {"gpu": False, "cpu_cores": 1, "mem_gb": 2}) + +def can_fit_on_node(resources_required, node_resources): + """ + Enhanced matching logic that respects granular availability (Bin-Packing). + """ + if resources_required.get("gpu") and not node_resources.get("gpu"): + return False + + # Check VRAM availability + available_vram = node_resources.get("vram_gb", 0) + if resources_required.get("vram_min_gb", 0) > available_vram: + return False + + # Check CPU availability + available_cpu = node_resources.get("cpu_cores", 0) + if resources_required.get("cpu_cores", 0) > available_cpu: + return False + + return True + +from platform.intelligence.adaptive_weights import AdaptiveOptimizer + +optimizer = AdaptiveOptimizer() + +def find_best_node(resources_required, available_nodes): + """ + Selects the best node using adaptive weights from the feedback loop. + """ + eligible_nodes = [n for n in available_nodes if can_fit_on_node(resources_required, n["resources"])] + + if not eligible_nodes: + return None + + weights = optimizer.get_weights() + scored_nodes = [] + + for node in eligible_nodes: + # Base score from VRAM compactness + vram_diff = node["resources"].get("vram_gb", 0) - resources_required.get("vram_min_gb", 0) + + # Apply adaptive weights + if node["type"] == "local": + type_multiplier = weights.get("local_preference", 1.0) + score = vram_diff / type_multiplier + else: + type_penalty = weights.get("cloud_penalty", 1.0) + score = (vram_diff + 1000) * type_penalty + + scored_nodes.append((score, node)) + + scored_nodes.sort(key=lambda x: x[0]) + return scored_nodes[0][1] + +def update_node_resources(node_id, resources_consumed): + """ + Updates the registry after a job is scheduled to reflect consumed resources. + """ + # implementation will interact with NodeRegistry to decrement vram/cpu + pass diff --git a/platform/scheduler/scheduler.py b/platform/scheduler/scheduler.py new file mode 100644 index 0000000..0b74566 --- /dev/null +++ b/platform/scheduler/scheduler.py @@ -0,0 +1,96 @@ +import redis +import json +import time +import logging +import random + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("Scheduler") + +r = redis.Redis(host="redis", port=6379, decode_responses=True) + +from platform.scheduler.state import get_active_nodes, update_node_status +from platform.scheduler.lifecycle import transition_job, fail_job +from platform.scheduler.resource_manager import get_required_resources, find_best_node + +r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True) + +def dispatch(cluster_type, job_data, node_id): + """ + Dispatches the job to a specific node with lifecycle tracking. + """ + job_id = job_data["id"] + + # Transition to 'pending' as we attempt to deploy + transition_job(job_id, "pending", {"node_id": node_id, "cluster": cluster_type}) + + try: + # Update node status in registry + update_node_status(node_id, "busy") + + # In a real system, trigger Ray task here + # Example: ray_task.remote(...) + + # Transition to 'running' + transition_job(job_id, "running") + logger.info(f"Job {job_id} successfully dispatched to node {node_id}") + + except Exception as e: + logger.error(f"Dispatch failed for job {job_id}: {str(e)}") + fail_job(job_id, str(e)) + update_node_status(node_id, "ready") + +from platform.scheduler.locks import distributed_lock + +PRIORITY_QUEUES = ["high_priority_queue", "job_queue", "low_priority_queue"] + +def schedule(): + logger.info("Elite Scheduler started. Monitoring priority queues...") + while True: + # Use distributed lock to prevent multiple schedulers from thrashing the same queues + with distributed_lock("scheduler_main") as acquired: + if not acquired: + time.sleep(1) + continue + + try: + # Poll priority queues in order + job = None + target_queue = None + for q in PRIORITY_QUEUES: + job = r.rpop(q) + if job: + target_queue = q + break + + if job: + job_data = json.loads(job) + job_id = job_data["id"] + + # Idempotency check: Ensure we haven't already dispatched this job + current_status = json.loads(r.get(f"job:{job_id}") or "{}").get("status") + if current_status not in ["queued", "retrying"]: + logger.warning(f"Job {job_id} is already {current_status}. Skipping duplicate dispatch.") + continue + + resources_req = get_required_resources(job_data.get("task_type")) + available_nodes = get_active_nodes() + ready_nodes = [n for n in available_nodes if n["status"] == "ready"] + + target_node = find_best_node(resources_req, ready_nodes) + + if target_node: + dispatch(target_node["type"], job_data, target_node["id"]) + else: + logger.debug(f"Pending resources for {job_id}. Requeueing to {target_queue}.") + r.lpush(target_queue, json.dumps(job_data)) + + except Exception as e: + logger.error(f"Error in locked schedule loop: {str(e)}") + + time.sleep(2) # Small gap between lock attempts + time.sleep(5) + +if __name__ == "__main__": + schedule() diff --git a/platform/scheduler/state.py b/platform/scheduler/state.py new file mode 100644 index 0000000..6d8c0f6 --- /dev/null +++ b/platform/scheduler/state.py @@ -0,0 +1,64 @@ +import json +import time +import logging +import redis + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("NodeRegistry") + +r = redis.Redis(host="redis", port=6379, decode_responses=True) + +NODE_EXPIRY_SECONDS = 60 # Heartbeat timeout + +def register_node(node_id, node_type, resources): + """ + Registers a node in the grid. + node_type: 'local' | 'cloud' + resources: {'gpu': bool, 'vram': int, ...} + """ + node_data = { + "id": node_id, + "type": node_type, + "resources": resources, + "last_heartbeat": time.time(), + "status": "ready" + } + + # Store in a Redis hash for fast lookup + r.hset("grid_nodes", node_id, json.dumps(node_data)) + # Pulse the node's individual key with an expiry + r.set(f"node_heartbeat:{node_id}", "alive", ex=NODE_EXPIRY_SECONDS) + + logger.info(f"Node registered: {node_id} ({node_type})") + +def get_active_nodes(): + """ + Returns a list of all nodes that haven't timed out. + """ + all_nodes = r.hgetall("grid_nodes") + active_nodes = [] + + for node_id, data_str in all_nodes.items(): + # Check if heartbeat key still exists + if r.exists(f"node_heartbeat:{node_id}"): + active_nodes.append(json.loads(data_str)) + else: + # Cleanup stale node + logger.warning(f"Node {node_id} timed out. Removing from registry.") + r.hdel("grid_nodes", node_id) + + return active_nodes + +def update_node_status(node_id, status): + node_data_str = r.hget("grid_nodes", node_id) + if node_data_str: + node_data = json.loads(node_data_str) + node_data["status"] = status + r.hset("grid_nodes", node_id, json.dumps(node_data)) + +def get_node_count(node_type=None): + nodes = get_active_nodes() + if node_type: + return len([n for n in nodes if n["type"] == node_type]) + return len(nodes) diff --git a/runtime/dsl/dag_manager.py b/runtime/dsl/dag_manager.py new file mode 100644 index 0000000..730f8ca --- /dev/null +++ b/runtime/dsl/dag_manager.py @@ -0,0 +1,31 @@ +import logging + +logger = logging.getLogger("DAGManager") + +class DAGManager: + """ + Handles Directed Acyclic Graph based workflow dependencies. + """ + def __init__(self, nodes, dependencies): + self.nodes = nodes # {name: step_config} + self.dependencies = dependencies # {child: [parents]} + + def get_execution_order(self): + """ + Simple topological sort to determine execution order. + """ + order = [] + visited = set() + + def visit(node): + if node in visited: + return + for parent in self.dependencies.get(node, []): + visit(parent) + visited.add(node) + order.append(node) + + for node in self.nodes: + visit(node) + + return order diff --git a/runtime/dsl/engine.py b/runtime/dsl/engine.py new file mode 100644 index 0000000..88490c2 --- /dev/null +++ b/runtime/dsl/engine.py @@ -0,0 +1,77 @@ +import logging +import time +import json +from enum import Enum +from platform.governance.policy_engine import PolicyEngine + +policy_engine = PolicyEngine() + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("DSLEngine") + +class StepStatus(Enum): + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + +class WorkflowEngine: + """ + Core engine for executing multi-step AI workflows. + """ + def __init__(self, job_id, steps): + self.job_id = job_id + self.steps = steps + self.context = {} # Shared data between steps + + def execute(self): + logger.info(f"Starting autonomous workflow for Job {self.job_id} with {len(self.steps)} steps.") + + current_step_idx = 0 + while current_step_idx < len(self.steps): + step = self.steps[current_step_idx] + step_name = step.get("name", f"step-{current_step_idx}") + agent_type = step.get("agent", "base") + params = step.get("params", {}) + + logger.info(f"Executing step {current_step_idx + 1}: {step_name} [{agent_type}]") + + try: + # Actual execution would happen on Ray + result = self.run_step(step_name, agent_type, params) + + # Check for Self-Mutation / Dynamic Feedback + if isinstance(result, dict) and "follow_up" in result: + new_steps = result["follow_up"] + + # GOVERNANCE CHECK: Validate mutation with Policy Engine + if policy_engine.validate_mutation(current_step_idx, len(self.steps)): + logger.info(f"💡 Policy Authorized: Injecting {len(new_steps)} follow-up steps.") + for i, new_step in enumerate(new_steps): + self.steps.insert(current_step_idx + 1 + i, new_step) + else: + logger.warning("🚨 Policy Blocked: Mutation limit reached. Skipping follow-up steps.") + + # Merge result into context + if isinstance(result, dict): + self.context.update(result) + + logger.info(f"Step {step_name} completed successfully.") + + except Exception as e: + logger.error(f"Step {step_name} failed: {str(e)}") + return {"status": "failed", "step": step_name, "error": str(e)} + + current_step_idx += 1 + + return {"status": "completed", "context": self.context} + + def run_step(self, name, agent, params): + """ + Mock execution of a step. + In production, this calls Ray tasks. + """ + # Simulate work + time.sleep(1) + return {"last_action": f"finished {name}", "data": f"processed_{name}"} diff --git a/runtime/dsl/ray_adapter.py b/runtime/dsl/ray_adapter.py new file mode 100644 index 0000000..1e8f93b --- /dev/null +++ b/runtime/dsl/ray_adapter.py @@ -0,0 +1,36 @@ +import ray +import logging + +logger = logging.getLogger("RayAdapter") + +@ray.remote +class AgentExecutor: + """ + Stateful Ray Actor to execute agent tasks. + """ + def __init__(self, agent_type, config): + self.agent_type = agent_type + self.config = config + self.history = [] + + def execute_task(self, task_name, params): + logger.info(f"Ray Actor executing {task_name}...") + # In production, this would load real Agent classes + # result = AgentFactory.get(self.agent_type).run(params) + + result = {"status": "success", "task": task_name, "node": ray.get_runtime_context().node_id.hex()} + self.history.append(result) + return result + +def run_workflow_step_on_ray(agent_type, step_name, params): + """ + Helper to dispatch a DSL step to a Ray Actor. + """ + try: + # Create a transient actor for the step + executor = AgentExecutor.remote(agent_type, params) + future = executor.execute_task.remote(step_name, params) + return ray.get(future) + except Exception as e: + logger.error(f"Ray execution error: {str(e)}") + raise diff --git a/runtime/ray/worker_task.py b/runtime/ray/worker_task.py new file mode 100644 index 0000000..432ee4c --- /dev/null +++ b/runtime/ray/worker_task.py @@ -0,0 +1,51 @@ +import ray +import os +import logging +import time + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("RayWorker") + +# Initialize Ray (address="auto" assumes we're running inside the K8s cluster or local Ray) +# In production, this would be set via environment variable +RAY_ADDRESS = os.getenv("RAY_ADDRESS", "auto") + +try: + ray.init(address=RAY_ADDRESS) + logger.info(f"Connected to Ray cluster at {RAY_ADDRESS}") +except Exception as e: + logger.error(f"Failed to connect to Ray: {str(e)}") + # Fallback to local execution for testing + ray.init() + +@ray.remote(num_gpus=1) +def run_ai_compute_task(payload): + """ + Core AI task execution. + Requires NVIDIA GPU and PyTorch/TensorFlow. + """ + logger.info(f"Starting AI task: {payload.get('task', 'unknown')}") + + # Simulate heavy computation + time.sleep(2) + + # Simulate GPU check + import torch + gpu_available = torch.cuda.is_available() + device_name = torch.cuda.get_device_name(0) if gpu_available else "CPU (Fallback)" + + return { + "status": "completed", + "result": f"Processed {payload.get('task')} on {device_name}", + "gpu_used": gpu_available + } + +def main(): + # Example usage + future = run_ai_compute_task.remote({"task": "LLM Inference", "model": "Llama-3"}) + result = ray.get(future) + print(f"Result: {result}") + +if __name__ == "__main__": + main() diff --git a/ui/index.html b/ui/index.html new file mode 100644 index 0000000..b847451 --- /dev/null +++ b/ui/index.html @@ -0,0 +1,13 @@ + + +
+ + + +Deterministic Replay Log
+"{decision.rationale}"
+{value}
++ Governance & Multi-Agent Control Plane • v5.5 (Enterprise) +
+Vector DB Clustering (ChromaDB)
++ Detailed Resource Inspection +
+Active Policy Enforcement
++ Automatic mutation throttle is ACTIVE. High-complexity workflows will require manual operator checkpoint. +
+Multi-Tenancy Enforcement
++ Your current tier allows for 60 requests/min across all grid namespaces. + Auto-refill occurs at 00:00 UTC. +
+