diff --git a/codecarbon/__init__.py b/codecarbon/__init__.py index f602f2635..15fc25cd0 100644 --- a/codecarbon/__init__.py +++ b/codecarbon/__init__.py @@ -8,6 +8,12 @@ OfflineEmissionsTracker, track_emissions, ) +from .output import OutputMethod -__all__ = ["EmissionsTracker", "OfflineEmissionsTracker", "track_emissions"] +__all__ = [ + "EmissionsTracker", + "OfflineEmissionsTracker", + "OutputMethod", + "track_emissions", +] __app_name__ = "codecarbon" diff --git a/codecarbon/emissions_tracker.py b/codecarbon/emissions_tracker.py index 862eba2b4..c2200fe66 100644 --- a/codecarbon/emissions_tracker.py +++ b/codecarbon/emissions_tracker.py @@ -9,6 +9,7 @@ import re import time import uuid +import warnings from abc import ABC, abstractmethod from datetime import datetime from functools import wraps @@ -32,12 +33,14 @@ from codecarbon.lock import Lock from codecarbon.output import ( BaseOutput, + BoAmpsOutput, CodeCarbonAPIOutput, EmissionsData, FileOutput, HTTPOutput, LogfireOutput, LoggerOutput, + OutputMethod, PrometheusOutput, ) @@ -162,6 +165,201 @@ def _set_from_conf( # return final value (why not?) return value + def _configure_multiple_runs(self, allow_multiple_runs) -> bool: + self._set_from_conf(allow_multiple_runs, "allow_multiple_runs", True, bool) + if self._allow_multiple_runs: + logger.warning( + "Multiple instances of codecarbon are allowed to run at the same time." + ) + return False + + try: + self._lock = Lock() + self._lock.acquire() + except FileExistsError: + logger.error( + f"Error: Another instance of codecarbon is probably running as we find `{self._lock.lockfile_path}`. Turn off the other instance to be able to run this one or use `allow_multiple_runs` or delete the file. Exiting." + ) + self._another_instance_already_running = True + return True + + return False + + def _configure_electricitymaps_token( + self, electricitymaps_api_token, co2_signal_api_token + ) -> None: + if co2_signal_api_token is not _sentinel: + logger.warning( + "Parameter 'co2_signal_api_token' is deprecated and will be removed in a future version. " + "Please use 'electricitymaps_api_token' instead." + ) + if electricitymaps_api_token is _sentinel: + electricitymaps_api_token = co2_signal_api_token + + self._set_from_conf(electricitymaps_api_token, "electricitymaps_api_token") + + if self._electricitymaps_api_token is not None: + return + + self._set_from_conf(_sentinel, "co2_signal_api_token", prevent_setter=True) + old_token = self._external_conf.get("co2_signal_api_token") + if old_token: + logger.warning( + "Configuration parameter 'co2_signal_api_token' is deprecated. " + "Please update your config to use 'electricitymaps_api_token' instead." + ) + self._electricitymaps_api_token = old_token + + def _resolve_output_methods( + self, + output_methods, + save_to_file, + save_to_api, + save_to_logger, + save_to_prometheus, + save_to_logfire, + ) -> None: + save_to_flags = { + "save_to_file": save_to_file, + "save_to_api": save_to_api, + "save_to_logger": save_to_logger, + "save_to_prometheus": save_to_prometheus, + "save_to_logfire": save_to_logfire, + } + if any(value is not _sentinel for value in save_to_flags.values()): + warnings.warn( + "The save_to_* parameters are deprecated and will be removed in a " + "future version. Use output_methods=[OutputMethod.CSV, ...] instead.", + DeprecationWarning, + stacklevel=2, + ) + + self._set_from_conf(output_methods, "output_methods") + if isinstance(self._output_methods, str): + self._output_methods = [ + OutputMethod(method.strip()) + for method in self._output_methods.split(",") + if method.strip() + ] + + if self._output_methods is not None and self._output_methods is not _sentinel: + self._save_to_file = OutputMethod.CSV in self._output_methods + self._save_to_api = OutputMethod.API in self._output_methods + self._save_to_logger = OutputMethod.LOGGER in self._output_methods + self._save_to_prometheus = OutputMethod.PROMETHEUS in self._output_methods + self._save_to_logfire = OutputMethod.LOGFIRE in self._output_methods + self._conf["save_to_file"] = self._save_to_file + self._conf["save_to_api"] = self._save_to_api + self._conf["save_to_logger"] = self._save_to_logger + self._conf["save_to_prometheus"] = self._save_to_prometheus + self._conf["save_to_logfire"] = self._save_to_logfire + return + + self._set_from_conf(save_to_api, "save_to_api", False, bool) + self._set_from_conf(save_to_file, "save_to_file", True, bool) + self._set_from_conf(save_to_logger, "save_to_logger", False, bool) + self._set_from_conf(save_to_prometheus, "save_to_prometheus", False, bool) + self._set_from_conf(save_to_logfire, "save_to_logfire", False, bool) + + self._output_methods = [] + if self._save_to_file: + self._output_methods.append(OutputMethod.CSV) + if self._save_to_api: + self._output_methods.append(OutputMethod.API) + if self._save_to_logger: + self._output_methods.append(OutputMethod.LOGGER) + if self._save_to_prometheus: + self._output_methods.append(OutputMethod.PROMETHEUS) + if self._save_to_logfire: + self._output_methods.append(OutputMethod.LOGFIRE) + + def _initialize_runtime_state(self) -> None: + self._start_time: Optional[float] = None + self._last_measured_time: float = time.perf_counter() + self._total_energy: Energy = Energy.from_energy(kWh=0) + self._total_emissions: float = 0.0 + self._last_energy_covered: Energy = Energy.from_energy(kWh=0) + self._total_water: Water = Water.from_litres(litres=0) + self._cpu_utilization_history: List[float] = [] + self._gpu_utilization_history: List[float] = [] + self._ram_utilization_history: List[float] = [] + self._ram_used_history: List[float] = [] + self._total_cpu_energy: Energy = Energy.from_energy(kWh=0) + self._total_gpu_energy: Energy = Energy.from_energy(kWh=0) + self._total_ram_energy: Energy = Energy.from_energy(kWh=0) + self._cpu_power: Power = Power.from_watts(watts=0) + self._gpu_power: Power = Power.from_watts(watts=0) + self._ram_power: Power = Power.from_watts(watts=0) + self._cpu_power_sum: float = 0.0 + self._gpu_power_sum: float = 0.0 + self._ram_power_sum: float = 0.0 + self._power_measurement_count: int = 0 + self._measure_occurrence: int = 0 + self._cloud = None + self._previous_emissions = None + self._geo = None + self._task_start_measurement_values = {} + self._task_stop_measurement_values = {} + self._tasks: Dict[str, Task] = {} + self._active_task: Optional[str] = None + self._active_task_emissions_at_start: Optional[EmissionsData] = None + self._hardware = [] + + def _populate_system_metadata(self) -> None: + self._conf["os"] = platform.platform() + self._conf["python_version"] = platform.python_version() + self._conf["cpu_count"] = count_cpus() + self._conf["cpu_physical_count"] = count_physical_cpus() + + def _initialize_hardware_tracking(self) -> None: + resource_tracker = ResourceTracker(self) + resource_tracker.set_CPU_GPU_ram_tracking() + self._conf["hardware"] = [item.description() for item in self._hardware] + + def _log_tracker_metadata(self) -> None: + logger.info(">>> Tracker's metadata:") + logger.info(f" Platform system: {self._conf.get('os')}") + logger.info(f" Python version: {self._conf.get('python_version')}") + logger.info(f" CodeCarbon version: {self._conf.get('codecarbon_version')}") + + hardware_info = self.get_detected_hardware() + logger.info(f" Available RAM : {hardware_info['ram_total_size']:.3f} GB") + logger.info( + f" CPU count: {hardware_info['cpu_count']} thread(s) in {hardware_info['cpu_physical_count']} physical CPU(s)" + ) + logger.info(f" CPU model: {hardware_info['cpu_model']}") + logger.info(f" GPU count: {hardware_info['gpu_count']}") + if self._gpu_ids: + logger.info( + f" GPU model: {hardware_info['gpu_model']} BUT only tracking these GPU ids : {hardware_info['gpu_ids']}" + ) + return + + logger.info(f" GPU model: {hardware_info['gpu_model']}") + + def _initialize_scheduler_state(self) -> None: + self._scheduler = PeriodicScheduler( + function=self._measure_power_and_energy, + interval=self._measure_power_secs, + ) + self._scheduler_monitor_power = PeriodicScheduler( + function=self._monitor_power, + interval=1, + ) + + def _initialize_emissions_context(self) -> None: + self._data_source = DataSource() + cloud: CloudMetadata = self._get_cloud_metadata() + self._geo = self._get_geo_metadata() + + if cloud.is_on_private_infra: + self._conf["longitude"] = self._geo.longitude + self._conf["latitude"] = self._geo.latitude + + self._conf["region"] = cloud.region + self._conf["provider"] = cloud.provider + self._emissions = Emissions(self._data_source, self._electricitymaps_api_token) + def __init__( self, project_name: Optional[str] = _sentinel, @@ -171,6 +369,7 @@ def __init__( api_key: Optional[str] = _sentinel, output_dir: Optional[str] = _sentinel, output_file: Optional[str] = _sentinel, + output_methods: Optional[List[OutputMethod]] = _sentinel, save_to_file: Optional[bool] = _sentinel, save_to_api: Optional[bool] = _sentinel, save_to_logger: Optional[bool] = _sentinel, @@ -215,17 +414,33 @@ def __init__( :param output_dir: Directory path to which the experiment details are logged, defaults to current directory. :param output_file: Name of the output CSV file, defaults to `emissions.csv`. - :param save_to_file: Indicates if the emission artifacts should be logged to a + :param output_methods: List of :class:`OutputMethod` enum values specifying where + to send emissions data. Example:: + + EmissionsTracker(output_methods=[OutputMethod.CSV, OutputMethod.API]) + + Available values: ``CSV``, ``API``, ``LOGGER``, + ``PROMETHEUS``, ``LOGFIRE``, ``BOAMPS``, ``HTTP``. + When provided, the individual ``save_to_*`` flags are + ignored. Defaults to ``[OutputMethod.CSV]``. + Can also be set in config as a comma-separated string: + ``output_methods=csv,api``. + :param save_to_file: [DEPRECATED] Use ``output_methods`` instead. + Indicates if the emission artifacts should be logged to a file, defaults to True. - :param save_to_api: Indicates if the emission artifacts should be sent to the + :param save_to_api: [DEPRECATED] Use ``output_methods`` instead. + Indicates if the emission artifacts should be sent to the CodeCarbon API, defaults to False. - :param save_to_logger: Indicates if the emission artifacts should be written + :param save_to_logger: [DEPRECATED] Use ``output_methods`` instead. + Indicates if the emission artifacts should be written to a dedicated logger, defaults to False. :param logging_logger: LoggerOutput object encapsulating a logging.logger or a Google Cloud logger. - :param save_to_prometheus: Indicates if the emission artifacts should be + :param save_to_prometheus: [DEPRECATED] Use ``output_methods`` instead. + Indicates if the emission artifacts should be pushed to prometheus, defaults to False. - :param save_to_logfire: Indicates if the emission artifacts should be written + :param save_to_logfire: [DEPRECATED] Use ``output_methods`` instead. + Indicates if the emission artifacts should be written to a logfire observability platform, defaults to False. :param prometheus_url: url of the prometheus server, defaults to `localhost:9091`. :param output_handlers: List of custom output handlers to use. Defaults to []. @@ -276,52 +491,15 @@ def __init__( # logger.info("base tracker init") self._external_conf = get_hierarchical_config() - self._set_from_conf(allow_multiple_runs, "allow_multiple_runs", True, bool) - if self._allow_multiple_runs: - logger.warning( - "Multiple instances of codecarbon are allowed to run at the same time." - ) - else: - # Acquire lock file to prevent multiple instances of codecarbon running - # at the same time - try: - self._lock = Lock() - self._lock.acquire() - except FileExistsError: - logger.error( - f"Error: Another instance of codecarbon is probably running as we find `{self._lock.lockfile_path}`. Turn off the other instance to be able to run this one or use `allow_multiple_runs` or delete the file. Exiting." - ) - # Do not continue if another instance of codecarbon is running - self._another_instance_already_running = True - return + if self._configure_multiple_runs(allow_multiple_runs): + return self._set_from_conf(api_call_interval, "api_call_interval", 8, int) self._set_from_conf(api_endpoint, "api_endpoint", "https://api.codecarbon.io") self._set_from_conf(api_key, "api_key", "api_key") - - # Handle backward compatibility for co2_signal_api_token - if co2_signal_api_token is not _sentinel: - logger.warning( - "Parameter 'co2_signal_api_token' is deprecated and will be removed in a future version. " - "Please use 'electricitymaps_api_token' instead." - ) - if electricitymaps_api_token is _sentinel: - electricitymaps_api_token = co2_signal_api_token - - self._set_from_conf(electricitymaps_api_token, "electricitymaps_api_token") - # Also check for old config name for backward compatibility - if ( - not hasattr(self, "_electricitymaps_api_token") - or self._electricitymaps_api_token is None - ): - self._set_from_conf(_sentinel, "co2_signal_api_token", prevent_setter=True) - old_token = self._external_conf.get("co2_signal_api_token") - if old_token: - logger.warning( - "Configuration parameter 'co2_signal_api_token' is deprecated. " - "Please update your config to use 'electricitymaps_api_token' instead." - ) - self._electricitymaps_api_token = old_token + self._configure_electricitymaps_token( + electricitymaps_api_token, co2_signal_api_token + ) self._set_from_conf(emissions_endpoint, "emissions_endpoint") self._set_from_conf(experiment_name, "experiment_name", "base") @@ -331,12 +509,16 @@ def __init__( self._set_from_conf(output_dir, "output_dir", ".") self._set_from_conf(output_file, "output_file", "emissions.csv") self._set_from_conf(project_name, "project_name", "codecarbon") - self._set_from_conf(save_to_api, "save_to_api", False, bool) - self._set_from_conf(save_to_file, "save_to_file", True, bool) - self._set_from_conf(save_to_logger, "save_to_logger", False, bool) + self._resolve_output_methods( + output_methods, + save_to_file, + save_to_api, + save_to_logger, + save_to_prometheus, + save_to_logfire, + ) + self._set_from_conf(logging_logger, "logging_logger") - self._set_from_conf(save_to_prometheus, "save_to_prometheus", False, bool) - self._set_from_conf(save_to_logfire, "save_to_logfire", False, bool) self._set_from_conf(prometheus_url, "prometheus_url", "localhost:9091") self._set_from_conf(output_handlers, "output_handlers", []) self._set_from_conf(tracking_mode, "tracking_mode", "machine") @@ -356,105 +538,21 @@ def __init__( assert self._tracking_mode in ["machine", "process"] set_logger_level(self._log_level) set_logger_format(self._logger_preamble) - - self._start_time: Optional[float] = None - self._last_measured_time: float = time.perf_counter() - self._total_energy: Energy = Energy.from_energy(kWh=0) - self._total_emissions: float = 0.0 - self._last_energy_covered: Energy = Energy.from_energy(kWh=0) - self._total_water: Water = Water.from_litres(litres=0) - # CPU and RAM utilization tracking - self._cpu_utilization_history: List[float] = [] - self._gpu_utilization_history: List[float] = [] - self._ram_utilization_history: List[float] = [] - self._ram_used_history: List[float] = [] - self._total_cpu_energy: Energy = Energy.from_energy(kWh=0) - self._total_gpu_energy: Energy = Energy.from_energy(kWh=0) - self._total_ram_energy: Energy = Energy.from_energy(kWh=0) - self._cpu_power: Power = Power.from_watts(watts=0) - self._gpu_power: Power = Power.from_watts(watts=0) - self._ram_power: Power = Power.from_watts(watts=0) - # Running average tracking for power - self._cpu_power_sum: float = 0.0 - self._gpu_power_sum: float = 0.0 - self._ram_power_sum: float = 0.0 - self._power_measurement_count: int = 0 - self._measure_occurrence: int = 0 - self._cloud = None - self._previous_emissions = None - self._conf["os"] = platform.platform() - self._conf["python_version"] = platform.python_version() - self._conf["cpu_count"] = count_cpus() - self._conf["cpu_physical_count"] = count_physical_cpus() - self._geo = None - self._task_start_measurement_values = {} - self._task_stop_measurement_values = {} - self._tasks: Dict[str, Task] = {} - self._active_task: Optional[str] = None - self._active_task_emissions_at_start: Optional[EmissionsData] = None - # Tracking mode detection - self._hardware = [] - resource_tracker = ResourceTracker(self) - resource_tracker.set_CPU_GPU_ram_tracking() - - self._conf["hardware"] = list(map(lambda x: x.description(), self._hardware)) - - logger.info(">>> Tracker's metadata:") - logger.info(f" Platform system: {self._conf.get('os')}") - logger.info(f" Python version: {self._conf.get('python_version')}") - logger.info(f" CodeCarbon version: {self._conf.get('codecarbon_version')}") - - hardware_info = self.get_detected_hardware() - logger.info(f" Available RAM : {hardware_info['ram_total_size']:.3f} GB") - logger.info( - f" CPU count: {hardware_info['cpu_count']} thread(s) in {hardware_info['cpu_physical_count']} physical CPU(s)" - ) - logger.info(f" CPU model: {hardware_info['cpu_model']}") - logger.info(f" GPU count: {hardware_info['gpu_count']}") - if self._gpu_ids: - logger.info( - f" GPU model: {hardware_info['gpu_model']} BUT only tracking these GPU ids : {hardware_info['gpu_ids']}" - ) - else: - logger.info(f" GPU model: {hardware_info['gpu_model']}") - - # Run `self._measure_power_and_energy` every `measure_power_secs` seconds in a - # background thread - self._scheduler = PeriodicScheduler( - function=self._measure_power_and_energy, - interval=self._measure_power_secs, - ) - self._scheduler_monitor_power = PeriodicScheduler( - function=self._monitor_power, - interval=1, - ) - - self._data_source = DataSource() - - cloud: CloudMetadata = self._get_cloud_metadata() - - # Always populate geo metadata as it's needed for fallback when cloud region is not found - self._geo = self._get_geo_metadata() - - if cloud.is_on_private_infra: - self._conf["longitude"] = self._geo.longitude - self._conf["latitude"] = self._geo.latitude - self._conf["region"] = cloud.region - self._conf["provider"] = cloud.provider - else: - self._conf["region"] = cloud.region - self._conf["provider"] = cloud.provider - - self._emissions: Emissions = Emissions( - self._data_source, self._electricitymaps_api_token - ) + self._initialize_runtime_state() + self._populate_system_metadata() + self._initialize_hardware_tracking() + self._log_tracker_metadata() + self._initialize_scheduler_state() + self._initialize_emissions_context() self._init_output_methods(api_key=self._api_key) def _init_output_methods(self, *, api_key: str = None): """ - Prepare the different output methods + Prepare the different output methods based on ``self._output_methods``. """ - if self._save_to_file: + methods = set(self._output_methods) if self._output_methods else set() + + if OutputMethod.CSV in methods: self._output_handlers.append( FileOutput( self._output_file, @@ -463,13 +561,13 @@ def _init_output_methods(self, *, api_key: str = None): ) ) - if self._save_to_logger: + if OutputMethod.LOGGER in methods: self._output_handlers.append(self._logging_logger) if self._emissions_endpoint: self._output_handlers.append(HTTPOutput(self._emissions_endpoint)) - if self._save_to_api: + if OutputMethod.API in methods: cc_api__out = CodeCarbonAPIOutput( endpoint_url=self._api_endpoint, experiment_id=self._experiment_id, @@ -481,7 +579,7 @@ def _init_output_methods(self, *, api_key: str = None): else: self.run_id = uuid.uuid4() - if self._save_to_prometheus: + if OutputMethod.PROMETHEUS in methods: self._output_handlers.append( PrometheusOutput( self._prometheus_url, @@ -493,9 +591,12 @@ def _init_output_methods(self, *, api_key: str = None): ) ) - if self._save_to_logfire: + if OutputMethod.LOGFIRE in methods: self._output_handlers.append(LogfireOutput()) + if OutputMethod.BOAMPS in methods: + self._output_handlers.append(BoAmpsOutput(output_dir=self._output_dir)) + def get_detected_hardware(self) -> Dict[str, Any]: """ Get the detected hardware. @@ -1280,6 +1381,7 @@ def track_emissions( api_key: Optional[str] = _sentinel, output_dir: Optional[str] = _sentinel, output_file: Optional[str] = _sentinel, + output_methods: Optional[List[OutputMethod]] = _sentinel, save_to_file: Optional[bool] = _sentinel, save_to_api: Optional[bool] = _sentinel, save_to_logger: Optional[bool] = _sentinel, @@ -1328,18 +1430,14 @@ def track_emissions( :param output_dir: Directory path to which the experiment details are logged, defaults to current directory. :param output_file: Name of output CSV file, defaults to `emissions.csv` - :param save_to_file: Indicates if the emission artifacts should be logged to a file, - defaults to True. - :param save_to_api: Indicates if the emission artifacts should be send to the - CodeCarbon API, defaults to False. - :param save_to_logger: Indicates if the emission artifacts should be written - to a dedicated logger, defaults to False. + :param output_methods: List of OutputMethod enum values. See BaseEmissionsTracker. + :param save_to_file: [DEPRECATED] Use output_methods instead. + :param save_to_api: [DEPRECATED] Use output_methods instead. + :param save_to_logger: [DEPRECATED] Use output_methods instead. :param logging_logger: LoggerOutput object encapsulating a logging.logger or a Google Cloud logger. - :param save_to_prometheus: Indicates if the emission artifacts should be - pushed to prometheus, defaults to False. - :param save_to_logfire: Indicates if the emission artifacts should be - pushed to logfire, defaults to False. + :param save_to_prometheus: [DEPRECATED] Use output_methods instead. + :param save_to_logfire: [DEPRECATED] Use output_methods instead. :param prometheus_url: url of the prometheus server, defaults to `localhost:9091`. :param output_handlers: List of output handlers to use. :param gpu_ids: User-specified known gpu ids to track. @@ -1425,6 +1523,7 @@ def wrapped_fn(*args, **kwargs): measure_power_secs=measure_power_secs, output_dir=output_dir, output_file=output_file, + output_methods=output_methods, save_to_file=save_to_file, save_to_logger=save_to_logger, logging_logger=logging_logger, @@ -1460,6 +1559,7 @@ def wrapped_fn(*args, **kwargs): api_key=api_key, output_dir=output_dir, output_file=output_file, + output_methods=output_methods, save_to_file=save_to_file, save_to_api=save_to_api, save_to_logger=save_to_logger, diff --git a/codecarbon/output.py b/codecarbon/output.py index b1b1d6d8d..e0f8c3e0b 100644 --- a/codecarbon/output.py +++ b/codecarbon/output.py @@ -2,7 +2,7 @@ Provides functionality for persistence of data """ -from codecarbon.output_methods.base_output import BaseOutput # noqa: F401 +from codecarbon.output_methods.base_output import BaseOutput, OutputMethod # noqa: F401 # Output to BoAmps format from codecarbon.output_methods.boamps import BoAmpsOutput # noqa: F401 diff --git a/codecarbon/output_methods/base_output.py b/codecarbon/output_methods/base_output.py index ff6ea1778..45fb1a953 100644 --- a/codecarbon/output_methods/base_output.py +++ b/codecarbon/output_methods/base_output.py @@ -1,8 +1,32 @@ +from enum import Enum from typing import List from codecarbon.output_methods.emissions_data import EmissionsData, TaskEmissionsData +class OutputMethod(str, Enum): + """ + Enum listing the available output methods. + + Usage:: + + tracker = EmissionsTracker( + output_methods=[OutputMethod.CSV, OutputMethod.API] + ) + + Available values: ``CSV``, ``API``, ``LOGGER``, ``PROMETHEUS``, + ``LOGFIRE``, ``BOAMPS``, ``HTTP``. + """ + + CSV = "csv" + API = "api" + LOGGER = "logger" + PROMETHEUS = "prometheus" + LOGFIRE = "logfire" + BOAMPS = "boamps" + HTTP = "http" + + class BaseOutput: """ An abstract class defining possible contracts for an output strategy, a strategy implementation can save emissions diff --git a/docs/reference/output.md b/docs/reference/output.md index 1fb8b97a8..435da9985 100644 --- a/docs/reference/output.md +++ b/docs/reference/output.md @@ -102,6 +102,31 @@ tracker.stop() The first time it will ask you to log in to Logfire. Once you log in and set the default Logfire project, the metrics will appear following the format `codecarbon_*`. +## BoAmps + +[BoAmps](https://github.com/Boavizta/BoAmps) is a standardized JSON format for reporting AI and ML energy consumption. + +### How to use it + +Run your EmissionsTracker as usual, with `save_to_boamps=True`: + +```python-skip +from codecarbon import OfflineEmissionsTracker + +tracker = OfflineEmissionsTracker( + project_name="my_project", + country_iso_code="USA", + save_to_boamps=True, +) +tracker.start() +# Your code here +tracker.stop() +``` + +CodeCarbon writes a final report named `boamps_report_.json` in `output_dir`. + +If you need to enrich the report with task metadata, datasets, or publisher information, use `BoAmpsOutput` directly through `output_handlers` or start from [examples/boamps_output.py](../../examples/boamps_output.py). + ## HTTP Output The HTTP Output allows calling a webhook with emission data when the tracker is stopped. Use the `emissions_endpoint` parameter to specify your endpoint. diff --git a/examples/boamps_output.py b/examples/boamps_output.py new file mode 100644 index 000000000..4571a4a98 --- /dev/null +++ b/examples/boamps_output.py @@ -0,0 +1,28 @@ +import multiprocessing + +from codecarbon import EmissionsTracker, OutputMethod + + +def cpu_load_task(number): + a = 0 + for i in range(5): + for i in range(int(1e6)): + a = a + i**number + + +tracker = EmissionsTracker( + measure_power_secs=10, + force_mode_cpu_load=False, + log_level="debug", + output_methods=[OutputMethod.BOAMPS], +) +try: + tracker.start() + with multiprocessing.Pool() as pool: + # call the function for each item in parallel + pool.map(cpu_load_task, [i for i in range(100)]) +finally: + emissions = tracker.stop() + +print(f"Emissions: {emissions} kg") +print(f"BoAmps report written to ./boamps_report_{tracker.run_id}.json") diff --git a/tests/test_emissions_tracker.py b/tests/test_emissions_tracker.py index ab4a0a275..c43aa8524 100644 --- a/tests/test_emissions_tracker.py +++ b/tests/test_emissions_tracker.py @@ -18,6 +18,7 @@ track_emissions, ) from codecarbon.external.geography import CloudMetadata +from codecarbon.output import BoAmpsOutput, OutputMethod from tests.fake_modules import pynvml as fake_pynvml from tests.testdata import ( GEO_METADATA_CANADA, @@ -212,6 +213,28 @@ def raise_exception(*args, **kwargs): tracker._measure_power = raise_exception tracker.stop() + def test_output_methods_boamps_adds_boamps_output_handler( + self, + mock_cli_setup, + mock_log_values, + mocked_get_gpu_details, + mocked_env_cloud_details, + mocked_is_gpu_details_available, + mocked_is_nvidia_system, + ): + tracker = EmissionsTracker( + output_dir=self.temp_path, + output_handlers=[], + output_methods=[OutputMethod.BOAMPS], + ) + + self.assertTrue( + any( + isinstance(handler, BoAmpsOutput) + for handler in tracker._output_handlers + ) + ) + @responses.activate def test_decorator_ONLINE_NO_ARGS( self, @@ -268,6 +291,35 @@ def dummy_train_model(): # THEN self.verify_output_file(self.emissions_file_path, 2) + def test_decorator_online_passes_output_methods( + self, + mock_cli_setup, + mock_log_values, + mocked_get_gpu_details, + mocked_env_cloud_details, + mocked_is_gpu_details_available, + mocked_is_nvidia_system, + ): + mocked_tracker = mock.Mock() + + with mock.patch( + "codecarbon.emissions_tracker.EmissionsTracker", + return_value=mocked_tracker, + ) as mocked_tracker_cls: + + @track_emissions(output_methods=[OutputMethod.BOAMPS]) + def dummy_train_model(): + return 42 + + self.assertEqual(dummy_train_model(), 42) + + self.assertEqual( + mocked_tracker_cls.call_args.kwargs["output_methods"], + [OutputMethod.BOAMPS], + ) + mocked_tracker.start.assert_called_once() + mocked_tracker.stop.assert_called_once() + def test_decorator_OFFLINE_NO_COUNTRY( self, mock_cli_setup,