Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ repos:
- types-requests
- types-python-dateutil
- types-croniter
- types-aiobotocore[essential]
- boto3-stubs[essential]
exclude: ^(diracx-client/src/diracx/client/_generated|diracx-[a-z]+/tests/|diracx-testing/|build|extensions/gubbins/gubbins-client/src/gubbins/client/_generated)

- repo: https://github.com/hukkin/mdformat
Expand Down
6 changes: 1 addition & 5 deletions diracx-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ classifiers = [
"Topic :: System :: Distributed Computing",
]
dependencies = [
"aiobotocore>=2.15",
"botocore>=1.35",
"cachetools",
"email_validator",
"gitpython",
Expand All @@ -23,6 +21,7 @@ dependencies = [
"pydantic-settings",
"pyyaml",
"sh",
"signurlarity >=0.2.2",
"diraccommon >=9.0.18",
]
dynamic = ["version"]
Expand All @@ -33,9 +32,6 @@ testing = [
"moto[server]",
]
types = [
"botocore-stubs",
"types-aiobotocore[essential]",
"types-aiobotocore-s3",
"types-cachetools",
"types-PyYAML",
]
Expand Down
48 changes: 39 additions & 9 deletions diracx-core/src/diracx/core/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
import base64
from typing import TYPE_CHECKING, TypedDict, cast

from botocore.errorfactory import ClientError
from signurlarity.exceptions import NoSuchBucketError, PresignError

from .models.sandbox import ChecksumAlgorithm

if TYPE_CHECKING:
from typing import TypedDict

from types_aiobotocore_s3.client import S3Client
from signurlarity.aio.client import AsyncClient

class S3Object(TypedDict):
Key: str
Expand All @@ -31,29 +31,59 @@ class S3PresignedPostInfo(TypedDict):
fields: dict[str, str]


async def s3_bucket_exists(s3_client: S3Client, bucket_name: str) -> bool:
async def s3_bucket_exists(s3_client: AsyncClient, bucket_name: str) -> bool:
"""Check if a bucket exists in S3."""
return await _s3_exists(s3_client.head_bucket, Bucket=bucket_name)


async def s3_object_exists(s3_client: S3Client, bucket_name: str, key: str) -> bool:
async def s3_object_exists(s3_client: AsyncClient, bucket_name: str, key: str) -> bool:
"""Check if an object exists in an S3 bucket."""
return await _s3_exists(s3_client.head_object, Bucket=bucket_name, Key=key)


async def _s3_exists(method, **kwargs: str) -> bool:
try:
await method(**kwargs)
except ClientError as e:
if e.response["Error"]["Code"] != "404":
raise
except (NoSuchBucketError, PresignError):
# if e.response["Error"]["Code"] != "404":
# raise
return False
else:
return True


async def generate_presigned_upload(
s3_client: S3Client,
s3_client: AsyncClient,
bucket_name: str,
key: str,
checksum_algorithm: ChecksumAlgorithm,
checksum: str,
size: int,
validity_seconds: int,
) -> S3PresignedPostInfo:
"""Generate a presigned URL and fields for uploading a file to S3.

The signature is restricted to only accept data with the given checksum and size.
"""
fields = {
"x-amz-checksum-algorithm": checksum_algorithm,
f"x-amz-checksum-{checksum_algorithm}": b16_to_b64(checksum),
}
conditions = [["content-length-range", size, size]] + [
{k: v} for k, v in fields.items()
]
result = await s3_client.generate_presigned_post(
Bucket=bucket_name,
Key=key,
Fields=fields,
Conditions=conditions,
ExpiresIn=validity_seconds,
)
return cast(S3PresignedPostInfo, result)


async def generate_presigned_download(
s3_client: AsyncClient,
bucket_name: str,
key: str,
checksum_algorithm: ChecksumAlgorithm,
Expand Down Expand Up @@ -126,7 +156,7 @@ async def _s3_delete_chunk_with_retry(
Bucket=bucket,
Delete={"Objects": remaining, "Quiet": True},
)
except ClientError:
except NoSuchBucketError:
if attempt == max_attempts:
return {obj["Key"] for obj in remaining}
await asyncio.sleep(delay)
Expand Down
20 changes: 7 additions & 13 deletions diracx-core/src/diracx/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, Self, TypeVar, cast

from aiobotocore.session import get_session
from botocore.config import Config
from botocore.errorfactory import ClientError
from cryptography.fernet import Fernet
from joserfc.jwk import KeySet, KeySetSerialization
from pydantic import (
Expand All @@ -35,9 +32,11 @@
model_validator,
)
from pydantic_settings import BaseSettings, SettingsConfigDict
from signurlarity.aio.client import AsyncClient
from signurlarity.exceptions import NoSuchBucketError

if TYPE_CHECKING:
from types_aiobotocore_s3.client import S3Client
from signurlarity.aio.client import AsyncClient

T = TypeVar("T")

Expand Down Expand Up @@ -324,17 +323,12 @@ class SandboxStoreSettings(ServiceSettingsBase):
Controls parallelism of database DELETE operations.
"""

_client: S3Client = PrivateAttr()
_client: AsyncClient = PrivateAttr()

@contextlib.asynccontextmanager
async def lifetime_function(self) -> AsyncIterator[None]:
async with get_session().create_client(
"s3",
async with AsyncClient(
**self.s3_client_kwargs,
config=Config(
signature_version="v4",
max_pool_connections=self.s3_max_pool_connections,
),
) as self._client: # type: ignore
if not await s3_bucket_exists(self._client, self.bucket_name):
if not self.auto_create_bucket:
Expand All @@ -343,15 +337,15 @@ async def lifetime_function(self) -> AsyncIterator[None]:
)
try:
await self._client.create_bucket(Bucket=self.bucket_name)
except ClientError as e:
except NoSuchBucketError as e:
raise ValueError(
f"Failed to create bucket {self.bucket_name}"
) from e

yield

@property
def s3_client(self) -> S3Client:
def s3_client(self) -> AsyncClient:
if self._client is None:
raise RuntimeError("S3 client accessed before lifetime function")
return self._client
30 changes: 22 additions & 8 deletions diracx-core/tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import httpx
import pytest
from aiobotocore.session import get_session
from signurlarity.aio.client import AsyncClient

from diracx.core.s3 import (
b16_to_b64,
Expand Down Expand Up @@ -52,7 +52,7 @@ async def moto_s3(aio_moto):
Note that this is not a complete S3 backend, in particular authentication
and validation of requests is not implemented.
"""
async with get_session().create_client("s3", **aio_moto) as client:
async with AsyncClient(**aio_moto) as client:
await client.create_bucket(Bucket=BUCKET_NAME)
await client.create_bucket(Bucket=OTHER_BUCKET_NAME)
yield client
Expand Down Expand Up @@ -92,15 +92,23 @@ async def test_presigned_upload_moto(moto_s3):
assert r.status_code == 204, r.text

# Make sure the object is actually there
obj = await moto_s3.get_object(Bucket=BUCKET_NAME, Key=key)
assert (await obj["Body"].read()) == file_content
url = await moto_s3.generate_presigned_url(
"get_object",
Params={"Bucket": BUCKET_NAME, "Key": key},
ExpiresIn=3600,
)
async with httpx.AsyncClient() as client:
r = await client.get(
url,
)

assert r.content == file_content


@pytest.fixture(scope="function")
async def minio_client(demo_urls):
"""Create a S3 client that uses minio from the demo as backend."""
async with get_session().create_client(
"s3",
async with AsyncClient(
endpoint_url=demo_urls["minio"],
aws_access_key_id="console",
aws_secret_access_key="console123",
Expand All @@ -115,8 +123,14 @@ async def test_bucket(minio_client):
await minio_client.create_bucket(Bucket=bucket_name)
yield bucket_name
objects = await minio_client.list_objects(Bucket=bucket_name)
for obj in objects.get("Contents", []):
await minio_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
if objects.get("Contents", []):
await minio_client.delete_objects(
Bucket=bucket_name,
Delete={
"Objects": [{"Key": obj["Key"]} for obj in objects.get("Contents", [])]
},
)

await minio_client.delete_bucket(Bucket=bucket_name)


Expand Down
7 changes: 5 additions & 2 deletions diracx-logic/tests/jobs/test_sandboxes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
from io import BytesIO
from typing import AsyncGenerator, Generator

import botocore.exceptions
import freezegun
import httpx
import pytest
import signurlarity.exceptions
import sqlalchemy

from diracx.core.exceptions import SandboxNotFoundError
Expand Down Expand Up @@ -142,7 +142,10 @@ async def test_upload_and_clean(
await clean_sandboxes(sandbox_metadata_db, sandbox_settings)

# Check that the sandbox was actually removed from the bucket
with pytest.raises(botocore.exceptions.ClientError, match="Not Found"):
with pytest.raises(
signurlarity.exceptions.PresignError,
match="does not exist or is not accessible",
):
await sandbox_settings.s3_client.head_object(
Bucket=sandbox_settings.bucket_name, Key=key
)
Expand Down
3 changes: 0 additions & 3 deletions extensions/gubbins/gubbins-logic/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ dynamic = ["version"]
[project.optional-dependencies]
testing = []
types = [
"boto3-stubs",
"types-aiobotocore[essential]",
"types-aiobotocore-s3",
"types-cachetools",
"types-python-dateutil",
"types-PyYAML",
Expand Down
3 changes: 0 additions & 3 deletions extensions/gubbins/gubbins-routers/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ dynamic = ["version"]
[project.optional-dependencies]
testing = ["diracx-testing", "moto[server]", "pytest-httpx"]
types = [
"boto3-stubs",
"types-aiobotocore[essential]",
"types-aiobotocore-s3",
"types-cachetools",
"types-python-dateutil",
"types-PyYAML",
Expand Down
Loading
Loading