diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index e1f18de74..794677227 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -75,6 +75,29 @@ jobs: run: | python -m pytest tests --ignore=tests/integrations/persisters + test-tracking-server-s3: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ['3.9', '3.10', '3.11', '3.12'] + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install -e ".[tests,tracking-client,tracking-server-s3]" + + - name: Run S3 tracking server tests + run: | + python -m pytest tests/tracking/test_bip0042_s3_buffering.py -v + validate-examples: runs-on: ubuntu-latest steps: diff --git a/.gitignore b/.gitignore index a92fd35b4..1982de051 100644 --- a/.gitignore +++ b/.gitignore @@ -193,3 +193,11 @@ burr/tracking/server/build examples/*/statemachine examples/*/*/statemachine .vscode + +# Terraform (see also examples/deployment/aws/terraform/.gitignore) +**/.terraform.lock.hcl +examples/deployment/aws/terraform/.terraform/ +examples/deployment/aws/terraform/*.tfstate +examples/deployment/aws/terraform/*.tfstate.* +examples/deployment/aws/terraform/.terraform.tfstate.lock.info +examples/deployment/aws/terraform/*.tfplan diff --git a/burr/tracking/server/backend.py b/burr/tracking/server/backend.py index e33cab9b8..904fe0019 100644 --- a/burr/tracking/server/backend.py +++ b/burr/tracking/server/backend.py @@ -162,6 +162,31 @@ def snapshot_interval_milliseconds(self) -> Optional[int]: pass +class EventDrivenBackendMixin(abc.ABC): + """Mixin for backends that support event-driven updates. + + Enables backends to receive real-time notifications instead of polling + for new files. 
+ """ + + @abc.abstractmethod + async def start_event_consumer(self): + """Start the event consumer for event-driven tracking. + + This method should run indefinitely, processing event notifications + from the configured message queue. + """ + pass + + @abc.abstractmethod + def is_event_driven(self) -> bool: + """Check if this backend is configured for event-driven updates. + + :return: True if event-driven mode is enabled and configured, False otherwise + """ + pass + + class BackendBase(abc.ABC): async def lifespan(self, app: FastAPI): """Quick tool to allow plugin to the app's lifecycle. diff --git a/burr/tracking/server/run.py b/burr/tracking/server/run.py index 0e5ce62b0..f0318ef20 100644 --- a/burr/tracking/server/run.py +++ b/burr/tracking/server/run.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +import asyncio import importlib import logging import os @@ -29,6 +30,7 @@ from burr.tracking.server.backend import ( AnnotationsBackendMixin, BackendBase, + EventDrivenBackendMixin, IndexingBackendMixin, SnapshottingBackendMixin, ) @@ -135,9 +137,20 @@ async def lifespan(app: FastAPI): await backend.lifespan(app).__anext__() await sync_index() # this will trigger the repeat every N seconds await save_snapshot() # this will trigger the repeat every N seconds + # Start event consumer for event-driven tracking when configured + event_consumer_task = None + if isinstance(backend, EventDrivenBackendMixin) and backend.is_event_driven(): + event_consumer_task = asyncio.create_task(backend.start_event_consumer()) global initialized initialized = True yield + # Graceful shutdown: cancel event consumer task + if event_consumer_task is not None: + event_consumer_task.cancel() + try: + await event_consumer_task + except asyncio.CancelledError: + pass await backend.lifespan(app).__anext__() @@ -172,17 +185,16 @@ def get_app_spec(): logger = logging.getLogger(__name__) if app_spec.indexing: - update_interval = 
backend.update_interval_milliseconds() / 1000 if app_spec.indexing else None - sync_index = repeat_every( - seconds=backend.update_interval_milliseconds() / 1000, - wait_first=True, - logger=logger, - )(sync_index) + # Only use polling when not in event-driven mode + _event_driven = isinstance(backend, EventDrivenBackendMixin) and backend.is_event_driven() + if not _event_driven: + sync_index = repeat_every( + seconds=backend.update_interval_milliseconds() / 1000, + wait_first=True, + logger=logger, + )(sync_index) if app_spec.snapshotting: - snapshot_interval = ( - backend.snapshot_interval_milliseconds() / 1000 if app_spec.snapshotting else None - ) save_snapshot = repeat_every( seconds=backend.snapshot_interval_milliseconds() / 1000, wait_first=True, diff --git a/burr/tracking/server/s3/README.md b/burr/tracking/server/s3/README.md index 0dbd7608f..62a6035b2 100644 --- a/burr/tracking/server/s3/README.md +++ b/burr/tracking/server/s3/README.md @@ -59,8 +59,9 @@ This will immediately start indexing your s3 bucket (or pick up from the last sn To track your data, you use the S3TrackingClient. You pass the tracker to the `ApplicationBuilder`: - ```python +from burr.tracking.s3client import S3TrackingClient + app = ( ApplicationBuilder() .with_graph(graph) diff --git a/burr/tracking/server/s3/backend.py b/burr/tracking/server/s3/backend.py index 7f48f88ad..706411fc4 100644 --- a/burr/tracking/server/s3/backend.py +++ b/burr/tracking/server/s3/backend.py @@ -15,14 +15,17 @@ # specific language governing permissions and limitations # under the License. 
+import asyncio import dataclasses import datetime +import enum import functools import itertools import json import logging import operator import os.path +import tempfile import uuid from collections import Counter from typing import List, Literal, Optional, Sequence, Tuple, Type, TypeVar, Union @@ -31,6 +34,7 @@ import pydantic from aiobotocore import session from fastapi import FastAPI +from pydantic import field_validator from pydantic_settings import BaseSettings from tortoise import functions, transactions from tortoise.contrib.fastapi import RegisterTortoise @@ -42,6 +46,7 @@ from burr.tracking.server.backend import ( BackendBase, BurrSettings, + EventDrivenBackendMixin, IndexingBackendMixin, SnapshottingBackendMixin, ) @@ -67,10 +72,33 @@ async def _query_s3_file( bucket: str, key: str, client: session.AioBaseClient, -) -> Union[ContentsModel, List[ContentsModel]]: + buffer_size_mb: int = 10, +) -> bytes: + """Query S3 file with buffering to handle large files. + + BIP-0042: Uses SpooledTemporaryFile to buffer content, spilling to disk + if the file exceeds buffer_size_mb. This ensures the returned bytes object + is seekable for pickle/json deserialization, fixing the UnsupportedOperation + error on large state files. 
+ + :param bucket: S3 bucket name + :param key: S3 object key + :param client: aiobotocore S3 client + :param buffer_size_mb: Max MB to hold in RAM before spilling to disk (default 10MB) + :return: File contents as bytes + """ response = await client.get_object(Bucket=bucket, Key=key) - body = await response["Body"].read() - return body + buffer_size = buffer_size_mb * 1024 * 1024 + + with tempfile.SpooledTemporaryFile(max_size=buffer_size, mode="w+b") as tmp: + async with response["Body"] as stream: + while True: + chunk = await stream.read(8192) + if not chunk: + break + tmp.write(chunk) + tmp.seek(0) + return tmp.read() @dataclasses.dataclass @@ -133,6 +161,13 @@ def from_path(cls, path: str, created_date: datetime.datetime) -> "DataFile": ) +class TrackingMode(str, enum.Enum): + """Tracking mode for S3 backend: polling or event-driven.""" + + POLLING = "POLLING" + EVENT_DRIVEN = "EVENT_DRIVEN" + + class S3Settings(BurrSettings): s3_bucket: str update_interval_milliseconds: int = 120_000 @@ -140,6 +175,20 @@ class S3Settings(BurrSettings): snapshot_interval_milliseconds: int = 3_600_000 load_snapshot_on_start: bool = True prior_snapshots_to_keep: int = 5 + # BIP-0042: Event-driven tracking settings + tracking_mode: TrackingMode = TrackingMode.POLLING + sqs_queue_url: Optional[str] = None + sqs_region: Optional[str] = None + sqs_wait_time_seconds: int = 20 # SQS long polling timeout + s3_buffer_size_mb: int = 10 # RAM buffer before spilling to disk + + @field_validator("tracking_mode", mode="before") + @classmethod + def coerce_tracking_mode(cls, v: object) -> object: + """Coerce legacy 'SQS' string to EVENT_DRIVEN for backward compatibility.""" + if v == "SQS": + return TrackingMode.EVENT_DRIVEN + return v def timestamp_to_reverse_alphabetical(timestamp: datetime) -> str: @@ -156,7 +205,40 @@ def timestamp_to_reverse_alphabetical(timestamp: datetime) -> str: return inverted_str + "-" + timestamp.isoformat() -class SQLiteS3Backend(BackendBase, 
IndexingBackendMixin, SnapshottingBackendMixin): +def _parse_sqs_message_events( + body: dict, +) -> Optional[List[Tuple[str, datetime.datetime]]]: + """Parse EventBridge-wrapped or native S3 notification bodies from SQS. + + Returns None if the format is not recognized. Multiple S3 records in one + message yield one tuple per record. + """ + if "detail" in body: + return [ + ( + body["detail"]["object"]["key"], + datetime.datetime.fromisoformat(body["time"].replace("Z", "+00:00")), + ) + ] + if "Records" in body: + out: List[Tuple[str, datetime.datetime]] = [] + for record in body["Records"]: + out.append( + ( + record["s3"]["object"]["key"], + datetime.datetime.fromisoformat(record["eventTime"].replace("Z", "+00:00")), + ) + ) + return out + return None + + +class SQLiteS3Backend( + BackendBase, + IndexingBackendMixin, + SnapshottingBackendMixin, + EventDrivenBackendMixin, +): def __init__( self, s3_bucket: str, @@ -165,6 +247,12 @@ def __init__( snapshot_interval_milliseconds: int, load_snapshot_on_start: bool, prior_snapshots_to_keep: int, + # BIP-0042: New parameters for event-driven tracking + tracking_mode: Union[TrackingMode, str] = TrackingMode.POLLING, + sqs_queue_url: Optional[str] = None, + sqs_region: Optional[str] = None, + sqs_wait_time_seconds: int = 20, + s3_buffer_size_mb: int = 10, ): self._backend_id = system.now().isoformat() + str(uuid.uuid4()) self._bucket = s3_bucket @@ -177,6 +265,17 @@ def __init__( self._load_snapshot_on_start = load_snapshot_on_start self._snapshot_key_history = [] self._prior_snapshots_to_keep = prior_snapshots_to_keep + # BIP-0042: Store event-driven tracking settings (normalize str to enum) + if isinstance(tracking_mode, TrackingMode): + self._tracking_mode = tracking_mode + elif tracking_mode == "SQS": + self._tracking_mode = TrackingMode.EVENT_DRIVEN + else: + self._tracking_mode = TrackingMode(tracking_mode) + self._sqs_queue_url = sqs_queue_url + self._sqs_region = sqs_region + self._sqs_wait_time_seconds = 
sqs_wait_time_seconds + self._s3_buffer_size_mb = s3_buffer_size_mb async def load_snapshot(self): if not self._load_snapshot_on_start: @@ -631,13 +730,22 @@ async def get_application_logs( "-created_at" ) async with self._session.create_client("s3") as client: - # Get all the files + # Get all the files (BIP-0042: use buffered reading for large files) files = await utils.gather_with_concurrency( 1, - _query_s3_file(self._bucket, application.graph_file_pointer, client), - # _query_s3_files(self.bucket, application.metadata_file_pointer, client), + _query_s3_file( + self._bucket, + application.graph_file_pointer, + client, + self._s3_buffer_size_mb, + ), *itertools.chain( - _query_s3_file(self._bucket, log_file.s3_path, client) + _query_s3_file( + self._bucket, + log_file.s3_path, + client, + self._s3_buffer_size_mb, + ) for log_file in application_logs ), ) @@ -656,6 +764,92 @@ async def get_application_logs( application=graph_data, ) + # BIP-0042: Event-driven tracking methods + async def _handle_s3_event(self, s3_key: str, event_time: datetime.datetime) -> None: + """Handle a single S3 event notification - index the file immediately. 
+ + :param s3_key: The S3 object key from the event + :param event_time: When the event occurred + """ + try: + data_file = DataFile.from_path(s3_key, created_date=event_time) + # Path structure: data/{project}/yyyy/mm/dd/hh/minutes/pk/app_id/filename + project_name = s3_key.split("/")[1] + + project = await Project.filter(name=project_name).first() + if project is None: + logger.info(f"Creating project {project_name} from S3 event") + project = await Project.create( + name=project_name, + uri=None, + created_at=event_time, + indexed_at=event_time, + updated_at=event_time, + ) + + all_applications = await self._ensure_applications_exist([data_file], project) + await self._update_all_applications(all_applications, [data_file]) + await self.update_log_files([data_file], all_applications) + + logger.info(f"Indexed S3 event: {s3_key}") + except Exception as e: + logger.error(f"Failed to handle S3 event {s3_key}: {e}") + raise # Re-raise so message stays in queue for retry / DLQ + + async def start_event_consumer(self) -> None: + """Start the event consumer for event-driven tracking. + + Runs indefinitely, processing S3 event notifications from the configured + message queue. Handles both EventBridge and direct S3 notification formats. 
+ """ + if self._tracking_mode != TrackingMode.EVENT_DRIVEN or not self._sqs_queue_url: + logger.info("Event consumer not configured, skipping") + return + + logger.info(f"Starting event consumer for queue: {self._sqs_queue_url}") + + async with self._session.create_client("sqs", region_name=self._sqs_region) as sqs_client: + try: + while True: + try: + response = await sqs_client.receive_message( + QueueUrl=self._sqs_queue_url, + MaxNumberOfMessages=10, + WaitTimeSeconds=self._sqs_wait_time_seconds, + VisibilityTimeout=300, + ) + + messages = response.get("Messages", []) + for message in messages: + try: + body = json.loads(message["Body"]) + events = _parse_sqs_message_events(body) + if events is None: + logger.warning("Unknown message format: %s", body) + continue + + for s3_key, event_time in events: + if s3_key and s3_key.endswith(".jsonl"): + await self._handle_s3_event(s3_key, event_time) + + await sqs_client.delete_message( + QueueUrl=self._sqs_queue_url, + ReceiptHandle=message["ReceiptHandle"], + ) + except Exception as e: + logger.error(f"Failed to process SQS message: {e}") + + except Exception as e: + logger.error(f"Event consumer error: {e}") + await asyncio.sleep(5) + except asyncio.CancelledError: + logger.info("Event consumer shutting down") + raise + + def is_event_driven(self) -> bool: + """Check if this backend is configured for event-driven updates.""" + return self._tracking_mode == TrackingMode.EVENT_DRIVEN and self._sqs_queue_url is not None + async def indexing_jobs( self, offset: int = 0, limit: int = 100, filter_empty: bool = True ) -> Sequence[schema.IndexingJob]: @@ -691,7 +885,6 @@ async def indexing_jobs( if __name__ == "__main__": os.environ["BURR_LOAD_SNAPSHOT_ON_START"] = "True" - import asyncio be = SQLiteS3Backend.from_settings(S3Settings()) # coro = be.snapshot() # save to s3 diff --git a/burr/version.py b/burr/version.py index 8555b4cce..bc7c5aa75 100644 --- a/burr/version.py +++ b/burr/version.py @@ -20,5 +20,9 @@ try: 
__version__ = importlib.metadata.version("apache-burr") except importlib.metadata.PackageNotFoundError: - # Fallback for older installations or development - __version__ = importlib.metadata.version("burr") + try: + # Fallback for older installations + __version__ = importlib.metadata.version("burr") + except importlib.metadata.PackageNotFoundError: + # Development / source tree: no package metadata + __version__ = "0.0.0.dev" diff --git a/examples/deployment/aws/terraform/.gitignore b/examples/deployment/aws/terraform/.gitignore new file mode 100644 index 000000000..00a20986e --- /dev/null +++ b/examples/deployment/aws/terraform/.gitignore @@ -0,0 +1,11 @@ +# Terraform +.terraform/ +.terraform.lock.hcl +*.tfstate +*.tfstate.* +.terraform.tfstate.lock.info +*.tfplan +crash.log +override.tf +override.tf.json +*.tfvars.backup diff --git a/examples/deployment/aws/terraform/dev.tfvars b/examples/deployment/aws/terraform/dev.tfvars new file mode 100644 index 000000000..86378ba96 --- /dev/null +++ b/examples/deployment/aws/terraform/dev.tfvars @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +# Development environment configuration +# Bucket name is auto-generated: burr-tracking-{env}-{region}-{account_id}-{random} +# account_id: leave empty to auto-fetch from AWS credentials, or set explicitly + +aws_region = "us-east-1" +environment = "dev" + +# account_id = "" # Optional. Empty = auto-fetch. Or set: account_id = "123456789012" + +sqs_queue_name = "burr-s3-events-dev" + +# S3 only (polling mode) - simpler for dev; set to true for event-driven +enable_sqs = false + +log_retention_days = 30 +snapshot_retention_days = 14 + +sqs_message_retention_seconds = 86400 +sqs_visibility_timeout_seconds = 120 +sqs_receive_wait_time_seconds = 20 +sqs_max_receive_count = 3 diff --git a/examples/deployment/aws/terraform/main.tf b/examples/deployment/aws/terraform/main.tf new file mode 100644 index 000000000..7c6b5bbc2 --- /dev/null +++ b/examples/deployment/aws/terraform/main.tf @@ -0,0 +1,183 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +terraform { + required_version = ">= 1.0" + + required_providers { + aws = { + source = "hashicorp/aws" + version = ">= 5.0" + } + random = { + source = "hashicorp/random" + version = ">= 3.0" + } + } +} + +provider "aws" { + region = var.aws_region +} + +data "aws_caller_identity" "current" {} +data "aws_region" "current" {} + +resource "random_id" "bucket_suffix" { + byte_length = 4 +} + +locals { + region_short = replace(data.aws_region.current.name, "-", "") + account_id = var.account_id != "" ? var.account_id : data.aws_caller_identity.current.account_id + auto_bucket = "burr-tracking-${var.environment}-${local.region_short}-${local.account_id}-${random_id.bucket_suffix.hex}" + bucket_name = var.s3_bucket_name != "" ? var.s3_bucket_name : local.auto_bucket +} + +module "s3" { + source = "./modules/s3" + + bucket_name = local.bucket_name + tags = local.common_tags + + lifecycle_rules = [ + { + id = "expire-old-logs" + prefix = "data/" + enabled = true + expiration_days = var.log_retention_days + noncurrent_days = 7 + }, + { + id = "expire-old-snapshots" + prefix = "snapshots/" + enabled = true + expiration_days = var.snapshot_retention_days + noncurrent_days = null + } + ] +} + +module "sqs" { + source = "./modules/sqs" + count = var.enable_sqs ? 1 : 0 + + queue_name = var.sqs_queue_name + message_retention_seconds = var.sqs_message_retention_seconds + visibility_timeout_seconds = var.sqs_visibility_timeout_seconds + receive_wait_time_seconds = var.sqs_receive_wait_time_seconds + max_receive_count = var.sqs_max_receive_count + tags = local.common_tags +} + +resource "aws_sqs_queue_policy" "s3_notifications" { + count = var.enable_sqs ? 
1 : 0 + + queue_url = module.sqs[0].queue_id + + policy = jsonencode({ + Version = "2012-10-17" + Statement = [ + { + Sid = "AllowS3Notifications" + Effect = "Allow" + Principal = { + Service = "s3.amazonaws.com" + } + Action = "sqs:SendMessage" + Resource = module.sqs[0].queue_arn + Condition = { + ArnLike = { + "aws:SourceArn" = module.s3.bucket_arn + } + } + } + ] + }) +} + +resource "aws_s3_bucket_notification" "burr_logs" { + count = var.enable_sqs ? 1 : 0 + + bucket = module.s3.bucket_id + + queue { + queue_arn = module.sqs[0].queue_arn + events = ["s3:ObjectCreated:*"] + filter_prefix = "data/" + filter_suffix = ".jsonl" + } + + depends_on = [aws_sqs_queue_policy.s3_notifications] +} + +resource "aws_sns_topic" "dlq_alarm" { + count = var.enable_sqs ? 1 : 0 + + name = "${var.environment}-burr-dlq-alarm" + display_name = "Burr DLQ Alarm - ${var.environment}" + tags = local.common_tags +} + +resource "aws_sns_topic_subscription" "dlq_alarm_email" { + count = var.enable_sqs && length(var.dlq_alarm_notification_emails) > 0 ? length(var.dlq_alarm_notification_emails) : 0 + + topic_arn = aws_sns_topic.dlq_alarm[0].arn + protocol = "email" + endpoint = var.dlq_alarm_notification_emails[count.index] +} + +resource "aws_cloudwatch_metric_alarm" "dlq_messages" { + count = var.enable_sqs ? 
1 : 0 + + alarm_name = "${var.environment}-burr-dlq-messages" + alarm_description = "Alarm when messages appear in Burr SQS dead letter queue" + comparison_operator = "GreaterThanThreshold" + evaluation_periods = 1 + metric_name = "ApproximateNumberOfMessagesVisible" + namespace = "AWS/SQS" + period = 60 + statistic = "Sum" + threshold = 0 + + alarm_actions = [aws_sns_topic.dlq_alarm[0].arn] + ok_actions = [aws_sns_topic.dlq_alarm[0].arn] + + dimensions = { + QueueName = module.sqs[0].dlq_name + } + + tags = local.common_tags +} + +module "iam" { + source = "./modules/iam" + + role_name = "${var.environment}-burr-server-role" + s3_bucket_arn = module.s3.bucket_arn + sqs_queue_arn = var.enable_sqs ? module.sqs[0].queue_arn : "" + enable_sqs = var.enable_sqs + tags = local.common_tags +} + +locals { + common_tags = { + Environment = var.environment + Project = "burr-tracking" + ManagedBy = "terraform" + } +} diff --git a/examples/deployment/aws/terraform/modules/iam/main.tf b/examples/deployment/aws/terraform/modules/iam/main.tf new file mode 100644 index 000000000..b63284f19 --- /dev/null +++ b/examples/deployment/aws/terraform/modules/iam/main.tf @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +data "aws_iam_policy_document" "assume_role" { + statement { + effect = "Allow" + actions = ["sts:AssumeRole"] + principals { + type = "Service" + identifiers = var.trusted_services + } + } +} + +resource "aws_iam_role" "burr_server" { + name = var.role_name + assume_role_policy = data.aws_iam_policy_document.assume_role.json + + tags = merge(var.tags, { + Name = var.role_name + }) +} + +data "aws_iam_policy_document" "s3_least_privilege" { + statement { + sid = "S3ListBucket" + effect = "Allow" + actions = [ + "s3:ListBucket", + "s3:GetBucketLocation" + ] + resources = [var.s3_bucket_arn] + } + + statement { + sid = "S3ObjectOperations" + effect = "Allow" + actions = [ + "s3:GetObject", + "s3:PutObject", + "s3:DeleteObject", + "s3:HeadObject" + ] + resources = ["${var.s3_bucket_arn}/*"] + } +} + +resource "aws_iam_role_policy" "s3" { + name = "${var.role_name}-s3" + role = aws_iam_role.burr_server.id + policy = data.aws_iam_policy_document.s3_least_privilege.json +} + +data "aws_iam_policy_document" "sqs_least_privilege" { + count = var.enable_sqs ? 1 : 0 + + statement { + sid = "SQSConsume" + effect = "Allow" + actions = [ + "sqs:ReceiveMessage", + "sqs:DeleteMessage", + "sqs:GetQueueAttributes" + ] + resources = [var.sqs_queue_arn] + } +} + +resource "aws_iam_role_policy" "sqs" { + count = var.enable_sqs ? 1 : 0 + name = "${var.role_name}-sqs" + role = aws_iam_role.burr_server.id + policy = data.aws_iam_policy_document.sqs_least_privilege[0].json +} + diff --git a/examples/deployment/aws/terraform/modules/iam/outputs.tf b/examples/deployment/aws/terraform/modules/iam/outputs.tf new file mode 100644 index 000000000..ccf3003e6 --- /dev/null +++ b/examples/deployment/aws/terraform/modules/iam/outputs.tf @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +output "role_arn" { + description = "ARN of the IAM role" + value = aws_iam_role.burr_server.arn +} + +output "role_name" { + description = "Name of the IAM role" + value = aws_iam_role.burr_server.name +} diff --git a/examples/deployment/aws/terraform/modules/iam/variables.tf b/examples/deployment/aws/terraform/modules/iam/variables.tf new file mode 100644 index 000000000..9a2e83cc9 --- /dev/null +++ b/examples/deployment/aws/terraform/modules/iam/variables.tf @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +variable "role_name" { + description = "Name of the IAM role for Burr server" + type = string +} + +variable "trusted_services" { + description = "List of AWS services that can assume this role" + type = list(string) + default = ["ecs-tasks.amazonaws.com", "ec2.amazonaws.com", "lambda.amazonaws.com"] +} + +variable "s3_bucket_arn" { + description = "ARN of the S3 bucket for least privilege access" + type = string +} + +variable "enable_sqs" { + description = "Enable SQS IAM permissions" + type = bool + default = true +} + +variable "sqs_queue_arn" { + description = "ARN of the SQS queue for least privilege access" + type = string + default = "" +} + +variable "tags" { + description = "Tags to apply to resources" + type = map(string) + default = {} +} diff --git a/examples/deployment/aws/terraform/modules/s3/main.tf b/examples/deployment/aws/terraform/modules/s3/main.tf new file mode 100644 index 000000000..67163ee09 --- /dev/null +++ b/examples/deployment/aws/terraform/modules/s3/main.tf @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +resource "aws_s3_bucket" "this" { + bucket = var.bucket_name + + tags = merge(var.tags, { + Name = var.bucket_name + }) +} + +resource "aws_s3_bucket_versioning" "this" { + bucket = aws_s3_bucket.this.id + + versioning_configuration { + status = "Enabled" + } +} + +resource "aws_s3_bucket_server_side_encryption_configuration" "this" { + bucket = aws_s3_bucket.this.id + + rule { + apply_server_side_encryption_by_default { + sse_algorithm = "AES256" + } + } +} + +resource "aws_s3_bucket_lifecycle_configuration" "this" { + bucket = aws_s3_bucket.this.id + + dynamic "rule" { + for_each = var.lifecycle_rules + content { + id = rule.value.id + status = rule.value.enabled ? "Enabled" : "Disabled" + + filter { + prefix = rule.value.prefix + } + + expiration { + days = rule.value.expiration_days + } + + dynamic "noncurrent_version_expiration" { + for_each = try(rule.value.noncurrent_days, null) != null ? [1] : [] + content { + noncurrent_days = rule.value.noncurrent_days + } + } + } + } +} + +resource "aws_s3_bucket_public_access_block" "this" { + bucket = aws_s3_bucket.this.id + + block_public_acls = true + block_public_policy = true + ignore_public_acls = true + restrict_public_buckets = true +} diff --git a/examples/deployment/aws/terraform/modules/s3/outputs.tf b/examples/deployment/aws/terraform/modules/s3/outputs.tf new file mode 100644 index 000000000..5ffc964b6 --- /dev/null +++ b/examples/deployment/aws/terraform/modules/s3/outputs.tf @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +output "bucket_id" { + description = "ID of the S3 bucket" + value = aws_s3_bucket.this.id +} + +output "bucket_arn" { + description = "ARN of the S3 bucket" + value = aws_s3_bucket.this.arn +} diff --git a/examples/deployment/aws/terraform/modules/s3/variables.tf b/examples/deployment/aws/terraform/modules/s3/variables.tf new file mode 100644 index 000000000..580cc967d --- /dev/null +++ b/examples/deployment/aws/terraform/modules/s3/variables.tf @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +variable "bucket_name" { + description = "Name of the S3 bucket" + type = string +} + +variable "lifecycle_rules" { + description = "List of lifecycle rules for the bucket" + type = list(object({ + id = string + prefix = string + enabled = bool + expiration_days = number + noncurrent_days = optional(number) + })) +} + +variable "tags" { + description = "Tags to apply to resources" + type = map(string) + default = {} +} diff --git a/examples/deployment/aws/terraform/modules/sqs/main.tf b/examples/deployment/aws/terraform/modules/sqs/main.tf new file mode 100644 index 000000000..4eb3bea9d --- /dev/null +++ b/examples/deployment/aws/terraform/modules/sqs/main.tf @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +resource "aws_sqs_queue" "main" { + name = var.queue_name + message_retention_seconds = var.message_retention_seconds + visibility_timeout_seconds = var.visibility_timeout_seconds + receive_wait_time_seconds = var.receive_wait_time_seconds + + tags = merge(var.tags, { + Name = var.queue_name + }) +} + +resource "aws_sqs_queue" "dlq" { + name = "${var.queue_name}-dlq" + message_retention_seconds = var.dlq_message_retention_seconds + + tags = merge(var.tags, { + Name = "${var.queue_name}-dlq" + }) +} + +resource "aws_sqs_queue_redrive_policy" "main" { + queue_url = aws_sqs_queue.main.id + redrive_policy = jsonencode({ + deadLetterTargetArn = aws_sqs_queue.dlq.arn + maxReceiveCount = var.max_receive_count + }) +} diff --git a/examples/deployment/aws/terraform/modules/sqs/outputs.tf b/examples/deployment/aws/terraform/modules/sqs/outputs.tf new file mode 100644 index 000000000..5b7ccd098 --- /dev/null +++ b/examples/deployment/aws/terraform/modules/sqs/outputs.tf @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +output "queue_id" { + description = "URL of the SQS queue" + value = aws_sqs_queue.main.id +} + +output "queue_url" { + description = "URL of the SQS queue" + value = aws_sqs_queue.main.url +} + +output "queue_arn" { + description = "ARN of the SQS queue" + value = aws_sqs_queue.main.arn +} + +output "dlq_url" { + description = "URL of the dead letter queue" + value = aws_sqs_queue.dlq.url +} + +output "dlq_arn" { + description = "ARN of the dead letter queue" + value = aws_sqs_queue.dlq.arn +} + +output "dlq_name" { + description = "Name of the dead letter queue (for CloudWatch dimensions)" + value = aws_sqs_queue.dlq.name +} diff --git a/examples/deployment/aws/terraform/modules/sqs/variables.tf b/examples/deployment/aws/terraform/modules/sqs/variables.tf new file mode 100644 index 000000000..47e67f3ba --- /dev/null +++ b/examples/deployment/aws/terraform/modules/sqs/variables.tf @@ -0,0 +1,57 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +variable "queue_name" { + description = "Name of the SQS queue" + type = string +} + +variable "message_retention_seconds" { + description = "Message retention period in seconds" + type = number + default = 1209600 +} + +variable "visibility_timeout_seconds" { + description = "Visibility timeout for messages in seconds" + type = number + default = 300 +} + +variable "receive_wait_time_seconds" { + description = "Long polling wait time in seconds" + type = number + default = 20 +} + +variable "dlq_message_retention_seconds" { + description = "DLQ message retention period in seconds" + type = number + default = 1209600 +} + +variable "max_receive_count" { + description = "Max receive count before message moves to DLQ" + type = number + default = 3 +} + +variable "tags" { + description = "Tags to apply to resources" + type = map(string) + default = {} +} diff --git a/examples/deployment/aws/terraform/outputs.tf b/examples/deployment/aws/terraform/outputs.tf new file mode 100644 index 000000000..627a98bc0 --- /dev/null +++ b/examples/deployment/aws/terraform/outputs.tf @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +output "s3_bucket_name" { + description = "Name of the S3 bucket for Burr logs" + value = module.s3.bucket_id +} + +output "s3_bucket_arn" { + description = "ARN of the S3 bucket" + value = module.s3.bucket_arn +} + +output "sqs_queue_url" { + description = "URL of the SQS queue for S3 events" + value = var.enable_sqs ? module.sqs[0].queue_url : null +} + +output "sqs_queue_arn" { + description = "ARN of the SQS queue" + value = var.enable_sqs ? module.sqs[0].queue_arn : null +} + +output "sqs_dlq_url" { + description = "URL of the dead letter queue" + value = var.enable_sqs ? module.sqs[0].dlq_url : null +} + +output "dlq_alarm_arn" { + description = "ARN of the CloudWatch alarm for DLQ messages" + value = var.enable_sqs ? aws_cloudwatch_metric_alarm.dlq_messages[0].arn : null +} + +output "dlq_alarm_sns_topic_arn" { + description = "ARN of the SNS topic for DLQ alarm notifications" + value = var.enable_sqs ? aws_sns_topic.dlq_alarm[0].arn : null +} + +output "iam_role_arn" { + description = "ARN of the IAM role for Burr server" + value = module.iam.role_arn +} + +output "iam_role_name" { + description = "Name of the IAM role for Burr server" + value = module.iam.role_name +} + +output "burr_environment_variables" { + description = "Environment variables to configure Burr server" + value = var.enable_sqs ? 
{ + BURR_S3_BUCKET = module.s3.bucket_id + BURR_TRACKING_MODE = "EVENT_DRIVEN" + BURR_SQS_QUEUE_URL = module.sqs[0].queue_url + BURR_SQS_REGION = data.aws_region.current.name + BURR_SQS_WAIT_TIME_SECONDS = "20" + BURR_S3_BUFFER_SIZE_MB = "10" + } : { + BURR_S3_BUCKET = module.s3.bucket_id + BURR_TRACKING_MODE = "POLLING" + BURR_SQS_QUEUE_URL = "" + BURR_SQS_REGION = data.aws_region.current.name + BURR_SQS_WAIT_TIME_SECONDS = "20" + BURR_S3_BUFFER_SIZE_MB = "10" + } +} diff --git a/examples/deployment/aws/terraform/prod.tfvars b/examples/deployment/aws/terraform/prod.tfvars new file mode 100644 index 000000000..43b9e8a95 --- /dev/null +++ b/examples/deployment/aws/terraform/prod.tfvars @@ -0,0 +1,40 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Production environment configuration +# Bucket name is auto-generated: burr-tracking-{env}-{region}-{account_id}-{random} +# account_id: leave empty to auto-fetch from AWS credentials, or set explicitly + +aws_region = "us-east-1" +environment = "prod" + +# account_id = "" # Optional. Empty = auto-fetch. 
Or set: account_id = "123456789012" + +sqs_queue_name = "burr-s3-events-prod" + +enable_sqs = true + +log_retention_days = 90 +snapshot_retention_days = 30 + +sqs_message_retention_seconds = 1209600 +sqs_visibility_timeout_seconds = 300 +sqs_receive_wait_time_seconds = 20 +sqs_max_receive_count = 3 + +# Optional: receive email when messages land in DLQ +# dlq_alarm_notification_emails = ["ops@example.com"] diff --git a/examples/deployment/aws/terraform/tutorial.md b/examples/deployment/aws/terraform/tutorial.md new file mode 100644 index 000000000..93883f90d --- /dev/null +++ b/examples/deployment/aws/terraform/tutorial.md @@ -0,0 +1,212 @@ +# Apache Burr AWS Tracking Infrastructure Tutorial + +This tutorial explains how to deploy Apache Burr tracking infrastructure on AWS using Terraform. All Terraform code lives in `examples/deployment/aws/terraform/`. It covers deployment with S3 only (polling mode), with S3 and SQS (event-driven mode), and local development without AWS. + +## Quick Start + +```bash +cd examples/deployment/aws/terraform +terraform init +terraform apply -var-file=dev.tfvars # S3 only, polling mode +# or +terraform apply -var-file=prod.tfvars # S3 + SQS, event-driven + DLQ alarm +``` + +Bucket names are auto-generated. After apply, run `terraform output burr_environment_variables` and set those on your Burr server. + +## Overview + +The Terraform configuration provisions: + +- **S3 bucket**: Stores Burr application logs and database snapshots. Name is auto-generated (`burr-tracking-{env}-{region}-{account_id}-{random}`) when not specified. 
+- **SQS queue** (optional): Receives S3 event notifications for real-time tracking; controlled by `enable_sqs` +- **CloudWatch alarm + SNS**: Alerts when messages land in the dead letter queue; optional email subscriptions +- **IAM role**: Least-privilege permissions for the Burr server + +## Directory Structure + +All code is in `examples/deployment/aws/terraform/`: + +``` +examples/deployment/aws/terraform/ +├── main.tf # Root module: S3, SQS, CloudWatch alarm, SNS, IAM +├── variables.tf # Input variables +├── outputs.tf # Output values +├── dev.tfvars # Development: S3 only (enable_sqs = false) +├── prod.tfvars # Production: S3 + SQS + DLQ alarm (enable_sqs = true) +├── tutorial.md # This file +└── modules/ + ├── s3/ # S3 bucket with versioning, encryption, lifecycle + ├── sqs/ # SQS queue with DLQ and redrive policy + └── iam/ # IAM role with least-privilege policies +``` + +## Prerequisites + +- Terraform >= 1.0 +- AWS CLI configured with credentials + +No manual bucket naming required; names are auto-generated. `account_id` is fetched from AWS credentials when not set. For a custom bucket name, set `s3_bucket_name` in your tfvars. + +## Using tfvars Files + +| File | Mode | enable_sqs | Resources created | +|-------------|-------------------|------------|--------------------------------------------------------| +| dev.tfvars | S3 only (polling) | false | S3 bucket, IAM role | +| prod.tfvars | S3 + SQS (event) | true | S3 bucket, SQS queue, DLQ, CloudWatch alarm, SNS, IAM | + +### Development (dev.tfvars) - S3 Only + +Uses S3 polling mode (no SQS). Bucket name is auto-generated (`burr-tracking-{env}-{region}-{account_id}-{random}`). Override with `s3_bucket_name = "my-bucket"` in tfvars if needed. + +Deploy: + +```bash +cd examples/deployment/aws/terraform +terraform init +terraform plan -var-file=dev.tfvars +terraform apply -var-file=dev.tfvars +``` + +### Production (prod.tfvars) - S3 + SQS + +Uses event-driven mode with SQS. 
Bucket name is auto-generated (`burr-tracking-{env}-{region}-{account_id}-{random}`). A CloudWatch alarm fires when messages land in the DLQ. + +Deploy: + +```bash +terraform plan -var-file=prod.tfvars +terraform apply -var-file=prod.tfvars +``` + +### Override Mode in Any tfvars + +To deploy with SQS using dev.tfvars, override: `terraform apply -var-file=dev.tfvars -var="enable_sqs=true"`. To deploy S3-only with prod.tfvars: `terraform apply -var-file=prod.tfvars -var="enable_sqs=false"`. + +## Deployment Modes + +### With S3 and SQS (Event-Driven Mode) + +Default configuration. Provides near-instant telemetry updates (~200ms latency). + +1. Set `enable_sqs = true` in your tfvars (e.g. prod.tfvars). +2. Deploy with `terraform apply -var-file=prod.tfvars`. +3. Configure the Burr server with the output environment variables: + +```bash +terraform output burr_environment_variables +``` + +4. Set these on your Burr server (ECS task, EC2, etc.): + +- BURR_S3_BUCKET +- BURR_TRACKING_MODE=EVENT_DRIVEN +- BURR_SQS_QUEUE_URL +- BURR_SQS_REGION +- BURR_SQS_WAIT_TIME_SECONDS +- BURR_S3_BUFFER_SIZE_MB + +### With S3 Only (Polling Mode) + +Use when you prefer simpler infrastructure or cannot use SQS. Burr polls S3 periodically (default 120 seconds). + +1. Set `enable_sqs = false` in your tfvars. +2. Deploy: + +```bash +terraform apply -var-file=dev.tfvars +``` + +3. Configure the Burr server: + +- BURR_S3_BUCKET +- BURR_TRACKING_MODE=POLLING +- BURR_SQS_QUEUE_URL="" (leave empty) +- BURR_SQS_REGION +- BURR_S3_BUFFER_SIZE_MB + +The Terraform will create only the S3 bucket and IAM role. No SQS queue or S3 event notifications. + +### Without S3 and SQS (Local Mode) + +For local development, no Terraform deployment is needed. Burr uses the local filesystem for tracking. + +1. Run the Burr server locally: + +```bash +burr --no-open +``` + +2. Use `LocalTrackingClient` in your application instead of `S3TrackingClient`. + +3. Data is stored in `~/.burr` by default. 
+ +## Key Variables + +| Variable | Description | Default | +|----------|-------------|---------| +| aws_region | AWS region | us-east-1 | +| environment | Environment name (dev, prod) | dev | +| account_id | AWS account ID. Empty = auto-fetch from credentials | "" | +| s3_bucket_name | S3 bucket name. Empty = auto-generated (env, region, account_id, random) | "" | +| enable_sqs | Create SQS for event-driven tracking | true | +| sqs_queue_name | Name of the SQS queue | burr-s3-events | +| log_retention_days | Days to retain logs in S3 | 90 | +| snapshot_retention_days | Days to retain DB snapshots | 30 | +| dlq_alarm_notification_emails | Emails to notify when DLQ has messages (confirm via AWS email) | [] | + +## CloudWatch DLQ Alarm and SNS Notifications + +When SQS is enabled, a CloudWatch alarm fires when messages appear in the dead letter queue. An SNS topic is created for notifications. To receive email alerts, add your addresses to `dlq_alarm_notification_emails` in your tfvars: + +``` +dlq_alarm_notification_emails = ["ops@example.com", "oncall@example.com"] +``` + +Each email will receive a confirmation request from AWS; you must confirm the subscription before alerts are delivered. To use Slack or other endpoints, subscribe them to the SNS topic ARN (see `terraform output dlq_alarm_sns_topic_arn`) after apply. 
+## Outputs
+
+After apply, useful outputs:
+
+```bash
+terraform output s3_bucket_name
+terraform output sqs_queue_url
+terraform output sqs_dlq_url
+terraform output dlq_alarm_arn
+terraform output dlq_alarm_sns_topic_arn
+terraform output burr_environment_variables
+```
+
+## IAM Least Privilege
+
+The IAM role grants only:
+
+- **S3**: ListBucket, GetBucketLocation, GetObject, PutObject, DeleteObject, HeadObject on the specific bucket
+- **SQS** (when enabled): ReceiveMessage, DeleteMessage, GetQueueAttributes on the specific queue
+
+## Cleanup
+
+To destroy all resources:
+
+```bash
+terraform destroy -var-file=dev.tfvars
+```
+
+For S3 buckets with versioning, you may need to empty the bucket first:
+
+```bash
+aws s3api list-object-versions --bucket BUCKET_NAME --output json | jq -r '.Versions[]?,.DeleteMarkers[]?|.Key+" "+.VersionId' | while read -r key vid; do aws s3api delete-object --bucket BUCKET_NAME --key "$key" --version-id "$vid"; done
+```
+
+## Troubleshooting
+
+**S3 bucket name already exists**: S3 bucket names are globally unique. With auto-generation, each apply gets a new random suffix. For a fixed name, set `s3_bucket_name` explicitly.
+
+**SQS policy errors**: Ensure the S3 bucket notification depends on the queue policy. The Terraform handles this with `depends_on`.
+
+**Burr server not receiving events**: Verify BURR_SQS_QUEUE_URL is set and the IAM role has sqs:ReceiveMessage. Check CloudWatch for the SQS consumer.
+
+**DLQ alarm firing**: Messages in the DLQ mean the Burr server failed to process S3 events (e.g. crashed, timeout). Check the DLQ in the AWS Console, inspect failed messages, and fix the root cause. Confirm SNS email subscriptions via the link AWS sends.
+
+**No email from DLQ alarm**: Check your spam folder for the SNS confirmation email. Subscriptions are pending until confirmed. 
diff --git a/examples/deployment/aws/terraform/variables.tf b/examples/deployment/aws/terraform/variables.tf new file mode 100644 index 000000000..0af4960ab --- /dev/null +++ b/examples/deployment/aws/terraform/variables.tf @@ -0,0 +1,94 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +variable "aws_region" { + description = "AWS region for resources" + type = string + default = "us-east-1" +} + +variable "environment" { + description = "Environment name (dev, staging, prod)" + type = string + default = "dev" +} + +variable "account_id" { + description = "AWS account ID for bucket name. Leave empty to auto-fetch from AWS credentials." + type = string + default = "" +} + +variable "s3_bucket_name" { + description = "Name of the S3 bucket for Burr logs. If empty, auto-generated from environment, region, and random suffix." + type = string + default = "" +} + +variable "enable_sqs" { + description = "Enable SQS for event-driven tracking. When false, Burr uses S3 polling mode." 
+ type = bool + default = true +} + +variable "sqs_queue_name" { + description = "Name of the SQS queue for S3 events" + type = string + default = "burr-s3-events" +} + +variable "log_retention_days" { + description = "Days to retain log files in S3" + type = number + default = 90 +} + +variable "snapshot_retention_days" { + description = "Days to retain database snapshots in S3" + type = number + default = 30 +} + +variable "sqs_message_retention_seconds" { + description = "SQS message retention period in seconds" + type = number + default = 1209600 +} + +variable "sqs_visibility_timeout_seconds" { + description = "SQS visibility timeout in seconds" + type = number + default = 300 +} + +variable "sqs_receive_wait_time_seconds" { + description = "SQS long polling wait time in seconds" + type = number + default = 20 +} + +variable "sqs_max_receive_count" { + description = "Max receive count before message moves to DLQ" + type = number + default = 3 +} + +variable "dlq_alarm_notification_emails" { + description = "Email addresses to notify when messages land in the DLQ. Empty = no email subscriptions." + type = list(string) + default = [] +} diff --git a/tests/tracking/test_bip0042_s3_buffering.py b/tests/tracking/test_bip0042_s3_buffering.py new file mode 100644 index 000000000..4cbd151e2 --- /dev/null +++ b/tests/tracking/test_bip0042_s3_buffering.py @@ -0,0 +1,205 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""BIP-0042: Tests for S3 buffering, settings, and SQS message parsing.""" + +import inspect +from unittest.mock import AsyncMock, MagicMock + +import pytest + +pytest.importorskip("aiobotocore") + +from burr.tracking.server.backend import EventDrivenBackendMixin +from burr.tracking.server.s3.backend import ( + S3Settings, + SQLiteS3Backend, + TrackingMode, + _parse_sqs_message_events, + _query_s3_file, +) + + +def _minimal_backend(**kwargs): + defaults = dict( + s3_bucket="test-bucket", + update_interval_milliseconds=60_000, + aws_max_concurrency=10, + snapshot_interval_milliseconds=3_600_000, + load_snapshot_on_start=False, + prior_snapshots_to_keep=1, + tracking_mode=TrackingMode.POLLING, + sqs_queue_url=None, + sqs_region=None, + ) + defaults.update(kwargs) + return SQLiteS3Backend(**defaults) + + +class TestS3Settings: + """S3Settings BIP-0042 fields and coercion.""" + + def test_s3_settings_has_tracking_mode(self): + assert "tracking_mode" in S3Settings.model_fields + assert S3Settings.model_fields["tracking_mode"].default == TrackingMode.POLLING + + def test_s3_settings_has_sqs_queue_url(self): + assert "sqs_queue_url" in S3Settings.model_fields + assert S3Settings.model_fields["sqs_queue_url"].default is None + + def test_s3_settings_has_sqs_region(self): + assert "sqs_region" in S3Settings.model_fields + assert S3Settings.model_fields["sqs_region"].default is None + + def test_s3_settings_has_sqs_wait_time_seconds(self): + assert "sqs_wait_time_seconds" in S3Settings.model_fields + assert S3Settings.model_fields["sqs_wait_time_seconds"].default 
== 20 + + def test_s3_settings_has_s3_buffer_size_mb(self): + assert "s3_buffer_size_mb" in S3Settings.model_fields + assert S3Settings.model_fields["s3_buffer_size_mb"].default == 10 + + def test_s3_settings_coerces_sqs_string_to_event_driven(self): + settings = S3Settings(s3_bucket="test", tracking_mode="SQS") + assert settings.tracking_mode == TrackingMode.EVENT_DRIVEN + + +class TestSQLiteS3BackendInit: + """SQLiteS3Backend constructor and mixins.""" + + def test_backend_accepts_new_parameters(self): + sig = inspect.signature(SQLiteS3Backend.__init__) + params = list(sig.parameters.keys()) + assert "tracking_mode" in params + assert "sqs_queue_url" in params + assert "sqs_region" in params + assert "sqs_wait_time_seconds" in params + assert "s3_buffer_size_mb" in params + + def test_backend_has_event_driven_methods(self): + assert hasattr(SQLiteS3Backend, "_handle_s3_event") + assert hasattr(SQLiteS3Backend, "start_event_consumer") + assert hasattr(SQLiteS3Backend, "is_event_driven") + assert callable(getattr(SQLiteS3Backend, "_handle_s3_event")) + assert callable(getattr(SQLiteS3Backend, "start_event_consumer")) + assert callable(getattr(SQLiteS3Backend, "is_event_driven")) + + +class TestIsEventDriven: + def test_true_when_event_driven_and_queue_url_set(self): + b = _minimal_backend( + tracking_mode=TrackingMode.EVENT_DRIVEN, + sqs_queue_url="https://sqs.us-east-1.amazonaws.com/123/test", + ) + assert b.is_event_driven() is True + + def test_false_when_polling(self): + b = _minimal_backend(tracking_mode=TrackingMode.POLLING, sqs_queue_url=None) + assert b.is_event_driven() is False + + def test_false_when_event_driven_but_no_queue_url(self): + b = _minimal_backend(tracking_mode=TrackingMode.EVENT_DRIVEN, sqs_queue_url=None) + assert b.is_event_driven() is False + + +class TestParseSqsMessageEvents: + def test_eventbridge_wrapped_s3(self): + body = { + "detail": {"object": {"key": "data/proj/2024/01/01/00/00/pk/app/log.jsonl"}}, + "time": 
"2024-06-01T12:34:56Z", + } + events = _parse_sqs_message_events(body) + assert events is not None + assert len(events) == 1 + key, t = events[0] + assert key.endswith(".jsonl") + assert t.tzinfo is not None + + def test_native_s3_notification_multiple_records(self): + body = { + "Records": [ + { + "s3": {"object": {"key": "data/a.jsonl"}}, + "eventTime": "2024-01-01T00:00:00.000Z", + }, + { + "s3": {"object": {"key": "data/b.jsonl"}}, + "eventTime": "2024-01-02T00:00:00.000Z", + }, + ] + } + events = _parse_sqs_message_events(body) + assert events is not None + assert len(events) == 2 + assert events[0][0].endswith("a.jsonl") + assert events[1][0].endswith("b.jsonl") + + def test_unknown_format_returns_none(self): + assert _parse_sqs_message_events({"foo": "bar"}) is None + + +class TestEventDrivenBackendMixin: + def test_mixin_exists(self): + assert EventDrivenBackendMixin is not None + + def test_mixin_has_abstract_methods(self): + import abc + + assert issubclass(EventDrivenBackendMixin, abc.ABC) + assert hasattr(EventDrivenBackendMixin, "start_event_consumer") + assert hasattr(EventDrivenBackendMixin, "is_event_driven") + + def test_sqlite_s3_backend_inherits_mixin(self): + assert issubclass(SQLiteS3Backend, EventDrivenBackendMixin) + + +class TestQueryS3FileBuffering: + def test_query_s3_file_has_buffer_param(self): + sig = inspect.signature(_query_s3_file) + assert "buffer_size_mb" in sig.parameters + assert sig.parameters["buffer_size_mb"].default == 10 + + @pytest.mark.asyncio + async def test_query_s3_file_reads_via_buffer(self): + chunk = b"x" * 4096 + stream = AsyncMock() + stream.read = AsyncMock(side_effect=[chunk, chunk, b""]) + + body_cm = MagicMock() + body_cm.__aenter__ = AsyncMock(return_value=stream) + body_cm.__aexit__ = AsyncMock(return_value=None) + + client = AsyncMock() + client.get_object = AsyncMock(return_value={"Body": body_cm}) + + data = await _query_s3_file("bucket", "key", client, buffer_size_mb=1) + assert data == chunk + chunk + 
client.get_object.assert_called_once_with(Bucket="bucket", Key="key") + + +class TestHandleS3Event: + def test_handle_s3_event_method_exists(self): + assert hasattr(SQLiteS3Backend, "_handle_s3_event") + method = getattr(SQLiteS3Backend, "_handle_s3_event") + assert inspect.iscoroutinefunction(method) + + def test_handle_s3_event_signature(self): + sig = inspect.signature(SQLiteS3Backend._handle_s3_event) + params = list(sig.parameters.keys()) + assert "self" in params + assert "s3_key" in params + assert "event_time" in params