From 5c568e7e9e17bdfc268796d62f0fea07373c4aa4 Mon Sep 17 00:00:00 2001 From: inimaz <93inigo93@gmail.com> Date: Sun, 3 May 2026 14:35:14 +0200 Subject: [PATCH 1/2] feat: first version of telemetry --- .../carbonserver/api/domain/telemetry.py | 10 + .../infra/database/telemetry_sql_models.py | 104 ++++++++++ .../repositories/repository_telemetry.py | 28 +++ .../carbonserver/api/routers/telemetry.py | 31 +++ .../carbonserver/api/schemas_telemetry.py | 178 ++++++++++++++++++ .../api/services/telemetry_service.py | 16 ++ carbonserver/carbonserver/container.py | 12 ++ carbonserver/main.py | 6 +- .../tests/api/routers/test_telemetry.py | 80 ++++++++ .../api/service/test_telemetry_service.py | 29 +++ codecarbon/core/telemetry_client.py | 61 ++++++ codecarbon/core/telemetry_schemas.py | 156 +++++++++++++++ tests/test_telemetry_client.py | 104 ++++++++++ 13 files changed, 814 insertions(+), 1 deletion(-) create mode 100644 carbonserver/carbonserver/api/domain/telemetry.py create mode 100644 carbonserver/carbonserver/api/infra/database/telemetry_sql_models.py create mode 100644 carbonserver/carbonserver/api/infra/repositories/repository_telemetry.py create mode 100644 carbonserver/carbonserver/api/routers/telemetry.py create mode 100644 carbonserver/carbonserver/api/schemas_telemetry.py create mode 100644 carbonserver/carbonserver/api/services/telemetry_service.py create mode 100644 carbonserver/tests/api/routers/test_telemetry.py create mode 100644 carbonserver/tests/api/service/test_telemetry_service.py create mode 100644 codecarbon/core/telemetry_client.py create mode 100644 codecarbon/core/telemetry_schemas.py create mode 100644 tests/test_telemetry_client.py diff --git a/carbonserver/carbonserver/api/domain/telemetry.py b/carbonserver/carbonserver/api/domain/telemetry.py new file mode 100644 index 000000000..c69201e93 --- /dev/null +++ b/carbonserver/carbonserver/api/domain/telemetry.py @@ -0,0 +1,10 @@ +import abc +from uuid import UUID + +from carbonserver.api import schemas_telemetry + + +class Telemetry(abc.ABC): + @abc.abstractmethod + def add_telemetry(self, telemetry: schemas_telemetry.TelemetryCreate) -> UUID: + raise NotImplementedError diff --git a/carbonserver/carbonserver/api/infra/database/telemetry_sql_models.py b/carbonserver/carbonserver/api/infra/database/telemetry_sql_models.py new file mode 100644 index 000000000..243ae79ee --- /dev/null +++ b/carbonserver/carbonserver/api/infra/database/telemetry_sql_models.py @@ -0,0 +1,104 @@ +"""SQLAlchemy models for telemetry data in the CarbonServer API.""" + +import uuid + +from sqlalchemy import JSON, Boolean, Column, DateTime, Float, Integer, String +from sqlalchemy.dialects.postgresql import UUID + +from carbonserver.database.database import Base + + +class Telemetry(Base): + __tablename__ = "telemetry" + + id = Column(UUID(as_uuid=True), primary_key=True, index=True, default=uuid.uuid4) + timestamp = Column(DateTime, nullable=False) + telemetry_level = Column(String, nullable=False) + + os = Column(String, nullable=True) + country_name = Column(String, nullable=True) + country_iso_code = Column(String, nullable=True) + region = Column(String, nullable=True) + cloud_provider = Column(String, nullable=True) + cloud_region = Column(String, nullable=True) + longitude = Column(Float, nullable=True) + latitude = Column(Float, nullable=True) + + cpu_count = Column(Integer, nullable=True) + cpu_physical_count = Column(Integer, nullable=True) + cpu_model = Column(String, nullable=True) + cpu_architecture = Column(String, nullable=True) + gpu_count = Column(Integer, nullable=True) + gpu_model = Column(String, nullable=True) + gpu_driver_version = Column(String, nullable=True) + gpu_memory_total_gb = Column(Float, nullable=True) + ram_total_size_gb = Column(Float, nullable=True) + cuda_version = Column(String, nullable=True) + cudnn_version = Column(String, nullable=True) + + python_version = Column(String, nullable=True) + python_implementation = Column(String, nullable=True) + python_executable_hash = Column(String, nullable=True) + python_env_type = Column(String, nullable=True) + codecarbon_version = Column(String, nullable=True) + codecarbon_install_method = Column(String, nullable=True) + + total_emissions_kg = Column(Float, nullable=True) + emissions_rate_kg_per_sec = Column(Float, nullable=True) + energy_consumed_kwh = Column(Float, nullable=True) + cpu_energy_kwh = Column(Float, nullable=True) + gpu_energy_kwh = Column(Float, nullable=True) + ram_energy_kwh = Column(Float, nullable=True) + duration_seconds = Column(Float, nullable=True) + cpu_utilization_avg = Column(Float, nullable=True) + gpu_utilization_avg = Column(Float, nullable=True) + ram_utilization_avg = Column(Float, nullable=True) + + tracking_mode = Column(String, nullable=True) + api_mode = Column(String, nullable=True) + output_methods = Column(JSON, nullable=True) + hardware_tracked = Column(JSON, nullable=True) + task_tracking_used = Column(Boolean, nullable=True) + decorator_vs_context = Column(String, nullable=True) + measure_power_interval_secs = Column(Float, nullable=True) + + hardware_detection_success = Column(Boolean, nullable=True) + rapl_available = Column(Boolean, nullable=True) + gpu_detection_method = Column(String, nullable=True) + first_measurement_time_ms = Column(Float, nullable=True) + tracking_overhead_percent = Column(Float, nullable=True) + errors_encountered = Column(JSON, nullable=True) + warning_count = Column(Integer, nullable=True) + + ide_used = Column(String, nullable=True) + notebook_environment = Column(String, nullable=True) + ci_environment = Column(String, nullable=True) + python_package_manager = Column(String, nullable=True) + framework_detected = Column(String, nullable=True) + + has_torch = Column(Boolean, nullable=True) + torch_version = Column(String, nullable=True) + has_transformers = Column(Boolean, nullable=True) + transformers_version = Column(String, nullable=True) + has_diffusers = Column(Boolean, nullable=True) + diffusers_version = Column(String, nullable=True) + has_tensorflow = Column(Boolean, nullable=True) + tensorflow_version = Column(String, nullable=True) + has_keras = Column(Boolean, nullable=True) + keras_version = Column(String, nullable=True) + has_pytorch_lightning = Column(Boolean, nullable=True) + pytorch_lightning_version = Column(String, nullable=True) + has_fastai = Column(Boolean, nullable=True) + fastai_version = Column(String, nullable=True) + ml_framework_primary = Column(String, nullable=True) + + container_runtime = Column(String, nullable=True) + in_container = Column(Boolean, nullable=True) + host_machine_hash = Column(String, nullable=True) + + def __repr__(self): + return ( + f'' + ) diff --git a/carbonserver/carbonserver/api/infra/repositories/repository_telemetry.py b/carbonserver/carbonserver/api/infra/repositories/repository_telemetry.py new file mode 100644 index 000000000..b530cbdc8 --- /dev/null +++ b/carbonserver/carbonserver/api/infra/repositories/repository_telemetry.py @@ -0,0 +1,28 @@ +"""Repository implementation for telemetry data using SQLAlchemy.""" + +import uuid +from contextlib import AbstractContextManager +from uuid import UUID + +from dependency_injector.providers import Callable + +from carbonserver.api.domain.telemetry import Telemetry +from carbonserver.api.infra.database.telemetry_sql_models import ( + Telemetry as SqlModelTelemetry, +) +from carbonserver.api.schemas_telemetry import TelemetryCreate + + +class SqlAlchemyRepository(Telemetry): + def __init__(self, session_factory) -> Callable[..., AbstractContextManager]: + self.session_factory = session_factory + + def add_telemetry(self, telemetry: TelemetryCreate) -> UUID: + with self.session_factory() as session: + db_telemetry = SqlModelTelemetry( + id=uuid.uuid4(), + **telemetry.model_dump(), + ) + session.add(db_telemetry) + session.commit() + return db_telemetry.id diff --git a/carbonserver/carbonserver/api/routers/telemetry.py b/carbonserver/carbonserver/api/routers/telemetry.py new file mode 100644 index 000000000..a2e4da796 --- /dev/null +++ b/carbonserver/carbonserver/api/routers/telemetry.py @@ -0,0 +1,31 @@ +"""API router for handling telemetry data in the CarbonServer API.""" + +from uuid import UUID + +from dependency_injector.wiring import Provide, inject +from fastapi import APIRouter, Depends +from starlette import status + +from carbonserver.api.schemas_telemetry import TelemetryCreate +from carbonserver.api.services.telemetry_service import TelemetryService +from carbonserver.container import ServerContainer + +TELEMETRY_ROUTER_TAGS = ["Telemetry"] + +router = APIRouter() + + +@router.post( + "/telemetry", + tags=TELEMETRY_ROUTER_TAGS, + status_code=status.HTTP_201_CREATED, + response_model=UUID, +) +@inject +def add_telemetry( + telemetry: TelemetryCreate, + telemetry_service: TelemetryService = Depends( + Provide[ServerContainer.telemetry_service] + ), +) -> UUID: + return telemetry_service.add_telemetry(telemetry) diff --git a/carbonserver/carbonserver/api/schemas_telemetry.py b/carbonserver/carbonserver/api/schemas_telemetry.py new file mode 100644 index 000000000..5517c2ff4 --- /dev/null +++ b/carbonserver/carbonserver/api/schemas_telemetry.py @@ -0,0 +1,178 @@ +"""Schemas for telemetry data submitted to the CarbonServer API.""" + +from datetime import datetime +from enum import Enum +from typing import List, Optional + +from pydantic import BaseModel, ConfigDict, Field, model_validator + + +class TelemetryLevel(str, Enum): + disabled = "disabled" + minimal = "minimal" + extensive = "extensive" + + +class TelemetryBase(BaseModel): + model_config = ConfigDict( + extra="forbid", + use_enum_values=True, + json_schema_extra={ + "example": { + "timestamp": "2026-05-03T12:00:00+00:00", + "telemetry_level": "minimal", + "os": "Linux-5.10.0-x86_64", + "country_name": "France", + "country_iso_code": "FRA", + "cpu_count": 12, + "cpu_model": "Intel(R) Core(TM) i7-8850H CPU @ 2.60GHz", + "python_version": "3.11.5", + "codecarbon_version": "3.0.0", + } + }, + ) + + timestamp: datetime + telemetry_level: TelemetryLevel + + os: Optional[str] = None + country_name: Optional[str] = None + country_iso_code: Optional[str] = Field(default=None, min_length=2, max_length=3) + region: Optional[str] = None + cloud_provider: Optional[str] = None + cloud_region: Optional[str] = None + longitude: Optional[float] = Field(default=None, ge=-180, le=180) + latitude: Optional[float] = Field(default=None, ge=-90, le=90) + + cpu_count: Optional[int] = Field(default=None, ge=0) + cpu_physical_count: Optional[int] = Field(default=None, ge=0) + cpu_model: Optional[str] = None + cpu_architecture: Optional[str] = None + gpu_count: Optional[int] = Field(default=None, ge=0) + gpu_model: Optional[str] = None + gpu_driver_version: Optional[str] = None + gpu_memory_total_gb: Optional[float] = Field(default=None, ge=0) + ram_total_size_gb: Optional[float] = Field(default=None, ge=0) + cuda_version: Optional[str] = None + cudnn_version: Optional[str] = None + + python_version: Optional[str] = None + python_implementation: Optional[str] = None + python_executable_hash: Optional[str] = Field( + default=None, min_length=64, max_length=64 + ) + python_env_type: Optional[str] = None + codecarbon_version: Optional[str] = None + codecarbon_install_method: Optional[str] = None + + total_emissions_kg: Optional[float] = Field(default=None, ge=0) + emissions_rate_kg_per_sec: Optional[float] = Field(default=None, ge=0) + energy_consumed_kwh: Optional[float] = Field(default=None, ge=0) + cpu_energy_kwh: Optional[float] = Field(default=None, ge=0) + gpu_energy_kwh: Optional[float] = Field(default=None, ge=0) + ram_energy_kwh: Optional[float] = Field(default=None, ge=0) + duration_seconds: Optional[float] = Field(default=None, ge=0) + cpu_utilization_avg: Optional[float] = Field(default=None, ge=0, le=100) + gpu_utilization_avg: Optional[float] = Field(default=None, ge=0, le=100) + ram_utilization_avg: Optional[float] = Field(default=None, ge=0, le=100) + + tracking_mode: Optional[str] = None + api_mode: Optional[str] = None + output_methods: Optional[List[str]] = None + hardware_tracked: Optional[List[str]] = None + task_tracking_used: Optional[bool] = None + decorator_vs_context: Optional[str] = None + measure_power_interval_secs: Optional[float] = Field(default=None, ge=0) + + hardware_detection_success: Optional[bool] = None + rapl_available: Optional[bool] = None + gpu_detection_method: Optional[str] = None + first_measurement_time_ms: Optional[float] = Field(default=None, ge=0) + tracking_overhead_percent: Optional[float] = Field(default=None, ge=0) + errors_encountered: Optional[List[str]] = None + warning_count: Optional[int] = Field(default=None, ge=0) + + ide_used: Optional[str] = None + notebook_environment: Optional[str] = None + ci_environment: Optional[str] = None + python_package_manager: Optional[str] = None + framework_detected: Optional[str] = None + + has_torch: Optional[bool] = None + torch_version: Optional[str] = None + has_transformers: Optional[bool] = None + transformers_version: Optional[str] = None + has_diffusers: Optional[bool] = None + diffusers_version: Optional[str] = None + has_tensorflow: Optional[bool] = None + tensorflow_version: Optional[str] = None + has_keras: Optional[bool] = None + keras_version: Optional[str] = None + has_pytorch_lightning: Optional[bool] = None + pytorch_lightning_version: Optional[str] = None + has_fastai: Optional[bool] = None + fastai_version: Optional[str] = None + ml_framework_primary: Optional[str] = None + + container_runtime: Optional[str] = None + in_container: Optional[bool] = None + host_machine_hash: Optional[str] = None + + @model_validator(mode="after") + def validate_telemetry_level(self): + if self.telemetry_level == TelemetryLevel.disabled: + raise ValueError("Disabled telemetry must not be submitted") + + if self.telemetry_level == TelemetryLevel.minimal: + extensive_fields = set(type(self).model_fields) - MINIMAL_TELEMETRY_FIELDS + submitted_extensive_fields = [ + field + for field in extensive_fields + if getattr(self, field) not in (None, [], {}) + ] + if submitted_extensive_fields: + fields = ", ".join(sorted(submitted_extensive_fields)) + raise ValueError( + f"Minimal telemetry cannot include extensive fields: {fields}" + ) + + return self + + +MINIMAL_TELEMETRY_FIELDS = { + "timestamp", + "telemetry_level", + "os", + "country_name", + "country_iso_code", + "region", + "cloud_provider", + "cloud_region", + "longitude", + "latitude", + "cpu_count", + "cpu_physical_count", + "cpu_model", + "cpu_architecture", + "gpu_count", + "gpu_model", + "gpu_driver_version", + "gpu_memory_total_gb", + "ram_total_size_gb", + "cuda_version", + "cudnn_version", + "python_version", + "python_implementation", + "python_executable_hash", + "python_env_type", + "codecarbon_version", + "codecarbon_install_method", +} + + +class TelemetryCreate(TelemetryBase): + pass + + +class Telemetry(TelemetryBase): + id: str diff --git a/carbonserver/carbonserver/api/services/telemetry_service.py b/carbonserver/carbonserver/api/services/telemetry_service.py new file mode 100644 index 000000000..8e0161e83 --- /dev/null +++ b/carbonserver/carbonserver/api/services/telemetry_service.py @@ -0,0 +1,16 @@ +"""Service layer for handling telemetry data in the CarbonServer API.""" + +from uuid import UUID + +from carbonserver.api.infra.repositories.repository_telemetry import ( + SqlAlchemyRepository as TelemetrySqlRepository, +) +from carbonserver.api.schemas_telemetry import TelemetryCreate + + +class TelemetryService: + def __init__(self, telemetry_repository: TelemetrySqlRepository): + self._repository = telemetry_repository + + def add_telemetry(self, telemetry: TelemetryCreate) -> UUID: + return self._repository.add_telemetry(telemetry) diff --git a/carbonserver/carbonserver/container.py b/carbonserver/carbonserver/container.py index b730fb706..c09711495 100644 --- a/carbonserver/carbonserver/container.py +++ b/carbonserver/carbonserver/container.py @@ -8,6 +8,7 @@ repository_projects, repository_projects_tokens, repository_runs, + repository_telemetry, repository_users, ) from carbonserver.api.services.auth_context import AuthContext @@ -21,6 +22,7 @@ from carbonserver.api.services.project_token_service import ProjectTokenService from carbonserver.api.services.run_service import RunService from carbonserver.api.services.signup_service import SignUpService +from carbonserver.api.services.telemetry_service import TelemetryService from carbonserver.api.services.user_service import UserService from carbonserver.api.usecases.experiment.project_sum_by_experiment import ( ProjectSumsByExperimentUsecase, @@ -63,6 +65,11 @@ class ServerContainer(containers.DeclarativeContainer): session_factory=db.provided.session, ) + telemetry_repository = providers.Factory( + repository_telemetry.SqlAlchemyRepository, + session_factory=db.provided.session, + ) + experiment_repository = providers.Factory( repository_experiments.SqlAlchemyRepository, session_factory=db.provided.session, @@ -99,6 +106,11 @@ class ServerContainer(containers.DeclarativeContainer): emission_repository=emission_repository, ) + telemetry_service = providers.Factory( + TelemetryService, + telemetry_repository=telemetry_repository, + ) + experiment_service = providers.Factory( ExperimentService, auth_context=auth_context, diff --git a/carbonserver/main.py b/carbonserver/main.py index 8eb366059..b02119ecd 100644 --- a/carbonserver/main.py +++ b/carbonserver/main.py @@ -9,7 +9,7 @@ from starlette.responses import JSONResponse from carbonserver.api.errors import DBException, UserException, get_http_exception -from carbonserver.api.infra.database import sql_models +from carbonserver.api.infra.database import sql_models, telemetry_sql_models from carbonserver.api.routers import ( authenticate, emissions, @@ -18,6 +18,7 @@ project_api_tokens, projects, runs, + telemetry, users, ) from carbonserver.api.services import auth_service @@ -70,6 +71,7 @@ def init_container(): project_api_tokens, organizations, users, + telemetry, authenticate, auth_service, ] @@ -81,6 +83,7 @@ def init_db(container): db = container.db() db.create_database() sql_models.Base.metadata.create_all(bind=engine) + telemetry_sql_models.Base.metadata.create_all(bind=engine) def init_server(container): @@ -102,6 +105,7 @@ def init_server(container): server.include_router(experiments.router) server.include_router(runs.router) server.include_router(emissions.router) + server.include_router(telemetry.router) add_pagination(server) # Add CORS from env variable diff --git a/carbonserver/tests/api/routers/test_telemetry.py b/carbonserver/tests/api/routers/test_telemetry.py new file mode 100644 index 000000000..a405b59e8 --- /dev/null +++ b/carbonserver/tests/api/routers/test_telemetry.py @@ -0,0 +1,80 @@ +from unittest import mock +from uuid import UUID + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient +from starlette import status + +from carbonserver.api.infra.repositories.repository_telemetry import ( + SqlAlchemyRepository as TelemetryRepository, +) +from carbonserver.api.routers import telemetry +from carbonserver.container import ServerContainer + +TELEMETRY_ID = "f52fe339-164d-4c2b-a8c0-f562dfce066d" + +MINIMAL_TELEMETRY_TO_CREATE = { + "timestamp": "2026-05-03T12:00:00+00:00", + "telemetry_level": "minimal", + "os": "Linux-5.10.0-x86_64", + "country_name": "France", + "country_iso_code": "FRA", + "cpu_count": 12, + "python_version": "3.11.5", + "codecarbon_version": "3.2.6", +} + + +@pytest.fixture +def custom_test_server(): + container = ServerContainer() + container.wire(modules=[telemetry]) + app = FastAPI() + app.container = container + app.include_router(telemetry.router) + yield app + + +@pytest.fixture +def client(custom_test_server): + yield TestClient(custom_test_server) + + +def test_add_telemetry(client, custom_test_server): + repository_mock = mock.Mock(spec=TelemetryRepository) + repository_mock.add_telemetry.return_value = UUID(TELEMETRY_ID) + + with custom_test_server.container.telemetry_repository.override(repository_mock): + response = client.post("/telemetry", json=MINIMAL_TELEMETRY_TO_CREATE) + + assert response.status_code == status.HTTP_201_CREATED + assert response.json() == TELEMETRY_ID + + +def test_minimal_telemetry_rejects_extensive_fields(client, custom_test_server): + repository_mock = mock.Mock(spec=TelemetryRepository) + telemetry_with_extensive_field = { + **MINIMAL_TELEMETRY_TO_CREATE, + "total_emissions_kg": 0.42, + } + + with custom_test_server.container.telemetry_repository.override(repository_mock): + response = client.post("/telemetry", json=telemetry_with_extensive_field) + + assert response.status_code == 422 + repository_mock.add_telemetry.assert_not_called() + + +def test_disabled_telemetry_is_rejected(client, custom_test_server): + repository_mock = mock.Mock(spec=TelemetryRepository) + disabled_telemetry = { + **MINIMAL_TELEMETRY_TO_CREATE, + "telemetry_level": "disabled", + } + + with custom_test_server.container.telemetry_repository.override(repository_mock): + response = client.post("/telemetry", json=disabled_telemetry) + + assert response.status_code == 422 + repository_mock.add_telemetry.assert_not_called() diff --git a/carbonserver/tests/api/service/test_telemetry_service.py b/carbonserver/tests/api/service/test_telemetry_service.py new file mode 100644 index 000000000..08ef947e0 --- /dev/null +++ b/carbonserver/tests/api/service/test_telemetry_service.py @@ -0,0 +1,29 @@ +from unittest import mock + +from carbonserver.api.infra.repositories.repository_telemetry import ( + SqlAlchemyRepository, +) +from carbonserver.api.schemas_telemetry import TelemetryCreate +from carbonserver.api.services.telemetry_service import TelemetryService + +TELEMETRY_ID = "f52fe339-164d-4c2b-a8c0-f562dfce066d" + + +def test_telemetry_service_creates_telemetry(): + repository_mock: SqlAlchemyRepository = mock.Mock(spec=SqlAlchemyRepository) + telemetry_service = TelemetryService(repository_mock) + repository_mock.add_telemetry.return_value = TELEMETRY_ID + + telemetry_to_create = TelemetryCreate( + timestamp="2026-05-03T12:00:00+00:00", + telemetry_level="minimal", + os="Linux-5.10.0-x86_64", + cpu_count=12, + python_version="3.11.5", + codecarbon_version="3.2.6", + ) + + actual_saved_telemetry_id = telemetry_service.add_telemetry(telemetry_to_create) + + assert actual_saved_telemetry_id == TELEMETRY_ID + repository_mock.add_telemetry.assert_called_once_with(telemetry_to_create) diff --git a/codecarbon/core/telemetry_client.py b/codecarbon/core/telemetry_client.py new file mode 100644 index 000000000..8dfdad05a --- /dev/null +++ b/codecarbon/core/telemetry_client.py @@ -0,0 +1,61 @@ +import json +from typing import Optional, Union + +import requests + +from codecarbon.core.telemetry_schemas import TelemetryCreate +from codecarbon.external.logger import logger + + +class TelemetryClient: + """ + Client dedicated to sending CodeCarbon telemetry payloads. + """ + + def __init__( + self, + endpoint_url="https://api.codecarbon.io", + telemetry: Optional[Union[TelemetryCreate, dict]] = None, + ): + self.endpoint_url = endpoint_url.rstrip("/") + self.telemetry_url = self.endpoint_url + "/telemetry" + self.headers = {"Content-Type": "application/json"} + self.telemetry = self._validate_telemetry(telemetry) if telemetry else None + + def add_telemetry(self, telemetry: Optional[Union[TelemetryCreate, dict]] = None): + telemetry_payload = ( + self._validate_telemetry(telemetry) if telemetry else self.telemetry + ) + if telemetry_payload is None: + logger.error("TelemetryClient.add_telemetry() needs a telemetry payload") + return None + payload = telemetry_payload.model_dump(mode="json", exclude_none=True) + + try: + response = requests.post( + url=self.telemetry_url, + json=payload, + timeout=2, + headers=self.headers, + ) + if response.status_code != 201: + self._log_error(payload, response) + return None + return response.json() + except Exception as e: + logger.error(e, exc_info=True) + return None + + @staticmethod + def _validate_telemetry(telemetry: Union[TelemetryCreate, dict]) -> TelemetryCreate: + if isinstance(telemetry, TelemetryCreate): + return telemetry + return TelemetryCreate(**telemetry) + + def _log_error(self, payload, response): + logger.error( + f"TelemetryClient Error when calling the API on {self.telemetry_url} with : {json.dumps(payload)}" + ) + logger.error( + f"TelemetryClient API return http code {response.status_code} and answer : {response.text}" + ) diff --git a/codecarbon/core/telemetry_schemas.py b/codecarbon/core/telemetry_schemas.py new file mode 100644 index 000000000..ea6249b65 --- /dev/null +++ b/codecarbon/core/telemetry_schemas.py @@ -0,0 +1,156 @@ +from datetime import datetime +from enum import Enum +from typing import List, Optional + +from pydantic import BaseModel, ConfigDict, Field, model_validator + + +class TelemetryLevel(str, Enum): + disabled = "disabled" + minimal = "minimal" + extensive = "extensive" + + +class TelemetryBase(BaseModel): + model_config = ConfigDict(extra="forbid", use_enum_values=True) + + timestamp: datetime + telemetry_level: TelemetryLevel + + os: Optional[str] = None + country_name: Optional[str] = None + country_iso_code: Optional[str] = Field(default=None, min_length=2, max_length=3) + region: Optional[str] = None + cloud_provider: Optional[str] = None + cloud_region: Optional[str] = None + longitude: Optional[float] = Field(default=None, ge=-180, le=180) + latitude: Optional[float] = Field(default=None, ge=-90, le=90) + + cpu_count: Optional[int] = Field(default=None, ge=0) + cpu_physical_count: Optional[int] = Field(default=None, ge=0) + cpu_model: Optional[str] = None + cpu_architecture: Optional[str] = None + gpu_count: Optional[int] = Field(default=None, ge=0) + gpu_model: Optional[str] = None + gpu_driver_version: Optional[str] = None + gpu_memory_total_gb: Optional[float] = Field(default=None, ge=0) + ram_total_size_gb: Optional[float] = Field(default=None, ge=0) + cuda_version: Optional[str] = None + cudnn_version: Optional[str] = None + + python_version: Optional[str] = None + python_implementation: Optional[str] = None + python_executable_hash: Optional[str] = Field( + default=None, min_length=64, max_length=64 + ) + python_env_type: Optional[str] = None + codecarbon_version: Optional[str] = None + codecarbon_install_method: Optional[str] = None + + total_emissions_kg: Optional[float] = Field(default=None, ge=0) + emissions_rate_kg_per_sec: Optional[float] = Field(default=None, ge=0) + energy_consumed_kwh: Optional[float] = Field(default=None, ge=0) + cpu_energy_kwh: Optional[float] = Field(default=None, ge=0) + gpu_energy_kwh: Optional[float] = Field(default=None, ge=0) + ram_energy_kwh: Optional[float] = Field(default=None, ge=0) + duration_seconds: Optional[float] = Field(default=None, ge=0) + cpu_utilization_avg: Optional[float] = Field(default=None, ge=0, le=100) + gpu_utilization_avg: Optional[float] = Field(default=None, ge=0, le=100) + ram_utilization_avg: Optional[float] = Field(default=None, ge=0, le=100) + + tracking_mode: Optional[str] = None + api_mode: Optional[str] = None + output_methods: Optional[List[str]] = None + hardware_tracked: Optional[List[str]] = None + task_tracking_used: Optional[bool] = None + decorator_vs_context: Optional[str] = None + measure_power_interval_secs: Optional[float] = Field(default=None, ge=0) + + hardware_detection_success: Optional[bool] = None + rapl_available: Optional[bool] = None + gpu_detection_method: Optional[str] = None + first_measurement_time_ms: Optional[float] = Field(default=None, ge=0) + tracking_overhead_percent: Optional[float] = Field(default=None, ge=0) + errors_encountered: Optional[List[str]] = None + warning_count: Optional[int] = Field(default=None, ge=0) + + ide_used: Optional[str] = None + notebook_environment: Optional[str] = None + ci_environment: Optional[str] = None + python_package_manager: Optional[str] = None + framework_detected: Optional[str] = None + + has_torch: Optional[bool] = None + torch_version: Optional[str] = None + has_transformers: Optional[bool] = None + transformers_version: Optional[str] = None + has_diffusers: Optional[bool] = None + diffusers_version: Optional[str] = None + has_tensorflow: Optional[bool] = None + tensorflow_version: Optional[str] = None + has_keras: Optional[bool] = None + keras_version: Optional[str] = None + has_pytorch_lightning: Optional[bool] = None + pytorch_lightning_version: Optional[str] = None + has_fastai: Optional[bool] = None + fastai_version: Optional[str] = None + ml_framework_primary: Optional[str] = None + + container_runtime: Optional[str] = None + in_container: Optional[bool] = None + host_machine_hash: Optional[str] = None + + @model_validator(mode="after") + def validate_telemetry_level(self): + if self.telemetry_level == TelemetryLevel.disabled: + raise ValueError("Disabled telemetry must not be submitted") + + if self.telemetry_level == TelemetryLevel.minimal: + extensive_fields = set(type(self).model_fields) - MINIMAL_TELEMETRY_FIELDS + submitted_extensive_fields = [ + field + for field in extensive_fields + if getattr(self, field) not in (None, [], {}) + ] + if submitted_extensive_fields: + fields = ", ".join(sorted(submitted_extensive_fields)) + raise ValueError( + f"Minimal telemetry cannot include extensive fields: {fields}" + ) + + return self + + +MINIMAL_TELEMETRY_FIELDS = { + "timestamp", + "telemetry_level", + "os", + "country_name", + "country_iso_code", + "region", + "cloud_provider", + "cloud_region", + "longitude", + "latitude", + "cpu_count", + "cpu_physical_count", + "cpu_model", + "cpu_architecture", + "gpu_count", + "gpu_model", + "gpu_driver_version", + "gpu_memory_total_gb", + "ram_total_size_gb", + "cuda_version", + "cudnn_version", + "python_version", + "python_implementation", + "python_executable_hash", + "python_env_type", + "codecarbon_version", + "codecarbon_install_method", +} + + +class TelemetryCreate(TelemetryBase): + pass diff --git a/tests/test_telemetry_client.py b/tests/test_telemetry_client.py new file mode 100644 index 000000000..14ade8b7f --- /dev/null +++ b/tests/test_telemetry_client.py @@ -0,0 +1,104 @@ +import unittest + +import requests_mock +from pydantic import ValidationError + +from codecarbon.core.telemetry_client import TelemetryClient +from codecarbon.core.telemetry_schemas import TelemetryCreate + + +class TestTelemetryClient(unittest.TestCase): + def test_init_sets_up_client_without_calling_api(self): + with requests_mock.Mocker() as m: + client = TelemetryClient( + endpoint_url="http://test.com/", + telemetry={ + "timestamp": "2026-05-03T12:00:00+00:00", + "telemetry_level": "minimal", + }, + ) + + self.assertEqual(client.endpoint_url, "http://test.com") + self.assertEqual(client.telemetry_url, "http://test.com/telemetry") + self.assertIsInstance(client.telemetry, TelemetryCreate) + self.assertEqual(m.call_count, 0) + + def test_add_telemetry_posts_configured_payload(self): + telemetry = { + "timestamp": "2026-05-03T12:00:00+00:00", + "telemetry_level": "minimal", + "os": "Linux-5.10.0-x86_64", + } + + with requests_mock.Mocker() as m: + m.post( + "http://test.com/telemetry", + json="f52fe339-164d-4c2b-a8c0-f562dfce066d", + status_code=201, + ) + client = TelemetryClient( + endpoint_url="http://test.com", telemetry=telemetry + ) + + actual_telemetry_id = client.add_telemetry() + + self.assertEqual( + actual_telemetry_id, "f52fe339-164d-4c2b-a8c0-f562dfce066d" + ) + self.assertEqual(m.call_count, 1) + self.assertEqual( + m.last_request.json(), + { + **telemetry, + "timestamp": "2026-05-03T12:00:00Z", + }, + ) + + def test_add_telemetry_posts_call_payload(self): + telemetry = TelemetryCreate( + timestamp="2026-05-03T12:00:00+00:00", + telemetry_level="minimal", + os="Linux-5.10.0-x86_64", + ) + + with requests_mock.Mocker() as m: + m.post( + "http://test.com/telemetry", + json="f52fe339-164d-4c2b-a8c0-f562dfce066d", + status_code=201, + ) + client = TelemetryClient(endpoint_url="http://test.com") + + actual_telemetry_id = client.add_telemetry(telemetry) + + self.assertEqual( + actual_telemetry_id, "f52fe339-164d-4c2b-a8c0-f562dfce066d" + ) + self.assertEqual(m.call_count, 1) + self.assertEqual( + m.last_request.json(), + { + "timestamp": "2026-05-03T12:00:00Z", + "telemetry_level": "minimal", + "os": "Linux-5.10.0-x86_64", + }, + ) + + def test_init_rejects_invalid_telemetry_without_calling_api(self): + with requests_mock.Mocker() as m: + with self.assertRaises(ValidationError): + TelemetryClient( + endpoint_url="http://test.com", + telemetry={ + "timestamp": "2026-05-03T12:00:00+00:00", + "telemetry_level": "minimal", + "total_emissions_kg": 0.42, + }, + ) + + self.assertEqual(m.call_count, 0) + + def test_add_telemetry_returns_none_without_payload(self): + client = TelemetryClient(endpoint_url="http://test.com") + + self.assertIsNone(client.add_telemetry()) From 39eef0ba478f5c70508b727e469fc91ea9048c6d Mon Sep 17 00:00:00 2001 From: inimaz <93inigo93@gmail.com> Date: Sun, 3 May 2026 14:36:08 +0200 Subject: [PATCH 2/2] tests: add a test so that client and server do not differ --- .../tests/api/test_telemetry_schema_drift.py | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 carbonserver/tests/api/test_telemetry_schema_drift.py diff --git a/carbonserver/tests/api/test_telemetry_schema_drift.py b/carbonserver/tests/api/test_telemetry_schema_drift.py new file mode 100644 index 000000000..abc20d764 --- /dev/null +++ b/carbonserver/tests/api/test_telemetry_schema_drift.py @@ -0,0 +1,45 @@ +"""Test to ensure that the telemetry schema used by the CarbonServer API does not drift from the core telemetry schema defined in CodeCarbon.""" + +import importlib.util +from copy import deepcopy +from pathlib import Path + +from carbonserver.api.schemas_telemetry import TelemetryCreate as ServerTelemetryCreate + +REPO_ROOT = Path(__file__).resolve().parents[3] +CORE_TELEMETRY_SCHEMA_PATH = REPO_ROOT / "codecarbon" / "core" / "telemetry_schemas.py" + + +def _load_core_telemetry_create(): + spec = importlib.util.spec_from_file_location( + "core_telemetry_schemas", + CORE_TELEMETRY_SCHEMA_PATH, + ) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module.TelemetryCreate + + +def _contract_schema(schema): + contract = deepcopy(schema) + contract.pop("description", None) + contract.pop("example", None) + contract.pop("examples", None) + contract.pop("title", None) + contract.pop("$defs", None) + + for property_schema in contract.get("properties", {}).values(): + property_schema.pop("description", None) + property_schema.pop("example", None) + property_schema.pop("examples", None) + property_schema.pop("title", None) + + return contract + + +def test_core_and_server_telemetry_schemas_do_not_drift(): + CoreTelemetryCreate = _load_core_telemetry_create() + + assert _contract_schema( + CoreTelemetryCreate.model_json_schema() + ) == _contract_schema(ServerTelemetryCreate.model_json_schema())