diff --git a/README.md b/README.md
index 88d366a..7a25f71 100644
--- a/README.md
+++ b/README.md
@@ -45,6 +45,8 @@ These tests are run regularly against our public infrastructure as well as our i
 | | [test_ping](./test_load_balancer.py#L855) | default |
 | **Nested Virtualization** | [test_virtualization_support](./test_nested_virtualization.py#L14) | default |
 | | [test_run_nested_vm](./test_nested_virtualization.py#L40) | default |
+| **Objects** | [test_bucket_urls](./test_objects.py#L21) | default |
+| | [test_notifications](./test_objects.py#L65) | default |
 | **Private Network** | [test_private_ip_address_on_all_images](./test_private_network.py#L14) | all |
 | | [test_private_network_connectivity_on_all_images](./test_private_network.py#L35) | all |
 | | [test_multiple_private_network_interfaces](./test_private_network.py#L88) | default |
diff --git a/api.py b/api.py
index b1811ec..3659435 100644
--- a/api.py
+++ b/api.py
@@ -1,3 +1,5 @@
+import boto3
+import re
 import requests
 import time
 
@@ -11,6 +13,12 @@ from filelock import FileLock
 from requests.adapters import HTTPAdapter
 from requests.packages.urllib3.util.retry import Retry
 
+from resources import ObjectsUser
+from urllib.parse import urlparse
+
+
+# Contains delete handlers, see delete_handler below
+DELETE_HANDLERS = {}
 
 
 class CloudscaleHTTPAdapter(HTTPAdapter):
@@ -80,6 +88,7 @@ def __init__(self, scope, zone=None, read_only=False):
         self.hooks = {'response': self.on_response}
         self.scope = scope
         self.read_only = read_only
+        self.zone = zone
 
         # 8 Retries @ 2.5 backoff_factor = 10.6 minutes
         retry_strategy = RetryStrategy(
@@ -93,6 +102,10 @@ def __init__(self, scope, zone=None, read_only=False):
 
         self.mount("https://", adapter)
 
+        # This is None when running "invoke cleanup"
+        if self.zone:
+            self.objects_endpoint = self.objects_endpoint_for(self.zone)
+
     def post(self, url, data=None, json=None, add_tags=True, **kwargs):
 
         assert not data, "Please only use json, not data"
@@ -101,34 +114,13 @@ def post(self, url, data=None, json=None, add_tags=True, **kwargs):
             'runner': RUNNER_ID,
             'process': PROCESS_ID,
             'scope': self.scope,
+            'zone': self.zone,
         }
 
         return super().post(url, data=data, json=json, **kwargs)
 
     def delete(self, url):
-        super().delete(url)
-
-        # Wait for snapshots to be deleted
-        if 'volume-snapshots' in url:
-            timeout = time.monotonic() + 60
-
-            while time.monotonic() < timeout:
-                time.sleep(1)
-
-                try:
-                    snapshot = self.get(url)
-                except requests.exceptions.HTTPError as e:
-                    if e.response.status_code == 404:
-                        # The snapshot is gone, stop waiting
-                        break
-                    else:
-                        raise e
-
-            else:
-                raise Timeout(
-                    f'Snapshot failed to delete within 60 seconds. Status '
-                    f'is still "{snapshot.json()["status"]}".'
-                )
+        delete_handler_for_url(url)(api=self, url=url)
 
     def on_response(self, response, *args, **kwargs):
         trigger('request.after', request=response.request, response=response)
@@ -172,6 +164,7 @@ def resources(path):
         yield from resources('/networks')
         yield from resources('/server-groups')
         yield from resources('/custom-images')
+        yield from resources('/objects-users')
 
     def cleanup(self, limit_to_scope=True, limit_to_process=True):
         """ Deletes resources created by this API object.
""" @@ -194,3 +187,116 @@ def cleanup(self, limit_to_scope=True, limit_to_process=True): if exceptions: raise ExceptionGroup("Failures during cleanup.", exceptions) + + def objects_endpoint_for(self, zone): + netloc = urlparse(self.api_url).netloc + + if netloc.startswith("api"): + prefix = "" + tld = "ch" + else: + prefix = f"{netloc.split('-')[0]}-" + tld = "zone" + + return f"{prefix}objects.{zone.rstrip('012345679')}.cloudscale.{tld}" + + +def delete_handler(path): + """ Registers the decorated function as delete handler for the given + path. The path is treated as regular expression pattern, used to search + the path (not the whole URL). + + The decorated function is supposed to delete the given URL. + + """ + + def delete_handler_decorator(fn): + DELETE_HANDLERS[path] = fn + return fn + return delete_handler_decorator + + +def delete_handler_for_url(url): + """ Evaluates the registered delete handlers and picks the first matching + one, or a default. + + The order of the evaluation is not strictly defined, handlers are + expected to not overlap. + + """ + path = urlparse(url).path + + for pattern, fn in DELETE_HANDLERS.items(): + if re.fullmatch(pattern, path): + return fn + + # Use the low-level method, as we are downstream from api.delete and cannot + # call api.delete, lest we want an infinite loop. + return lambda api, url: api.request("DELETE", url) + + +@delete_handler(path='/v1/volume-snapshots/.+') +def delete_volume_snapshots(api, url): + """ When deleting volume-snapshots, we need to wait for the snapshots to + be deleted, or we won't be able to delete the servers later. + + """ + + # Delete the snapshot first + api.request("DELETE", url) + + # Wait for snapshots to be deleted + timeout = time.monotonic() + 60 + + while time.monotonic() < timeout: + time.sleep(1) + + try: + snapshot = api.get(url) + except requests.exceptions.HTTPError as e: + if e.response.status_code == 404: + # The snapshot is gone, stop waiting + break + else: + raise e + + else: + raise Timeout( + f'Snapshot failed to delete within 60 seconds. Status ' + f'is still "{snapshot.json()["status"]}".' + ) + + +@delete_handler(path='/v1/objects-users/.+') +def delete_objects_users(api, url): + """ Before deleting an objects user, we have to delete owned buckets. 
""" + + user = ObjectsUser.from_href(None, api, url, name="") + user.wait_for_access() + + session = boto3.Session( + aws_access_key_id=user.keys[0]['access_key'], + aws_secret_access_key=user.keys[0]['secret_key'], + ) + + objects_endpoint = api.objects_endpoint_for(zone=user.tags['zone']) + s3 = session.resource('s3', endpoint_url=f"https://{objects_endpoint}") + + for bucket in s3.buckets.all(): + bucket.objects.all().delete() + bucket.delete() + + sns = boto3.client( + 'sns', + endpoint_url=f"https://{objects_endpoint}", + aws_access_key_id=user.keys[0]['access_key'], + aws_secret_access_key=user.keys[0]['secret_key'], + region_name='default', + ) + + for topic in sns.list_topics().get('Topics', ()): + arn = topic["TopicArn"] + assert re.match(r'arn:aws:sns:(rma|lpg)::at-.+', arn) + sns.delete_topic(TopicArn=arn) + + api.request("DELETE", url) diff --git a/conftest.py b/conftest.py index e739e45..244231b 100644 --- a/conftest.py +++ b/conftest.py @@ -1,3 +1,4 @@ +import boto3 import os import pytest import random @@ -21,6 +22,7 @@ from resources import FloatingIP from resources import LoadBalancer from resources import Network +from resources import ObjectsUser from resources import Server from resources import ServerGroup from resources import Volume @@ -829,3 +831,69 @@ def factory(num_backends, return load_balancer, listener, pool, backends, private_network return factory + + +@pytest.fixture(scope='session') +def create_objects_user(request, session_api, objects_endpoint): + """ Factory to create an objects user. """ + + factory = ObjectsUser.factory( + request=request, + api=session_api, + ) + + def wrapper(*args, **kwargs): + user = factory(*args, **kwargs) + + # We need to wait for a moment for the key to become available. + user.wait_for_access() + + return user + + return wrapper + + +@pytest.fixture(scope='session') +def objects_user(create_objects_user): + """ An object user that can be used with objects_endpoint. """ + + return create_objects_user(name=f'at-{secrets.token_hex(8)}') + + +@pytest.fixture(scope='session') +def objects_endpoint(session_api): + """ An objects endpoint of the given zone. """ + + return session_api.objects_endpoint + + +@pytest.fixture(scope='session') +def access_key(objects_user): + """ An S3 access key for the objects endpoint. """ + + return objects_user.keys[0]["access_key"] + + +@pytest.fixture(scope='session') +def secret_key(objects_user): + """ An S3 secret key for the objects endpoint. """ + + return objects_user.keys[0]["secret_key"] + + +@pytest.fixture(scope='function') +def bucket(objects_user, objects_endpoint): + """ A bucket wrapped in a boto3.S3.Bucket class. 
""" + + session = boto3.Session( + aws_access_key_id=objects_user.keys[0]['access_key'], + aws_secret_access_key=objects_user.keys[0]['secret_key'], + ) + + s3 = session.resource('s3', endpoint_url=f"https://{objects_endpoint}") + + bucket = s3.create_bucket(Bucket=f"at-{secrets.token_hex(8)}") + yield bucket + + bucket.objects.all().delete() + bucket.delete() diff --git a/requirements.txt b/requirements.txt index d6b9c99..74d9a83 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +boto3 dnspython filelock flake8 diff --git a/resources.py b/resources.py index 647474f..b7e7e04 100644 --- a/resources.py +++ b/resources.py @@ -1,3 +1,5 @@ +import boto3 +import botocore import re import secrets import textwrap @@ -1454,3 +1456,40 @@ def verify_backend(self, prober, backend, count=1, port=None): for i in range(count): assert (prober.http_get(self.build_url(url='/hostname', port=port)) == backend.name) + + +class ObjectsUser(CloudscaleResource): + + def __init__(self, request, api, name): + super().__init__(request, api) + + self.spec = { + 'display_name': f'{RESOURCE_NAME_PREFIX}-{name}', + } + + @with_trigger('objects-user.create') + def create(self): + self.info = self.api.post('/objects-users', json=self.spec).json() + + def wait_for_access(self, timeout=30): + objects_endpoint = self.api.objects_endpoint_for(self.tags['zone']) + + s3 = boto3.client( + 's3', + endpoint_url=f"https://{objects_endpoint}", + aws_access_key_id=self.keys[0]["access_key"], + aws_secret_access_key=self.keys[0]["secret_key"], + ) + + timeout = time.monotonic() + 30 + exception = None + + while time.monotonic() < timeout: + try: + s3.list_buckets() + break + except botocore.exceptions.ClientError as e: + exception = e + time.sleep(1) + else: + raise TimeoutError from exception diff --git a/test_objects.py b/test_objects.py new file mode 100644 index 0000000..4e6be07 --- /dev/null +++ b/test_objects.py @@ -0,0 +1,158 @@ +""" + +Objects Storage +=============== + +Using an S3 compatible API, customers may store data in our Object Storage +service and serve it without the need to manage their own storage system. + +""" + +import boto3 +import json +import requests +import secrets +import time + +from util import flatten +from util import oneliner + + +def test_bucket_urls(objects_endpoint, access_key, secret_key): + """ We can create a bucket using the official AWS SDK for Python, upload + an object, and make it available publicly. It is then available as follows: + + - .objects.rma|lpg.cloudscale.ch/ + - objects.rma|lpg.cloudscale.ch// + + """ + + # Establish a connection using the official AWS SDK for Python + s3 = boto3.client( + 's3', + endpoint_url=f"https://{objects_endpoint}", + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + ) + + # Generate a bucket name that has not been used yet (bucket names are + # global across all our object storage endpoints). 
+ bucket = f"at-{secrets.token_hex(8)}" + + # Create the bucket + s3.create_bucket(Bucket=bucket) + + # Upload an object + s3.put_object( + Bucket=bucket, + Key='key.txt', + Body=b'test', + ACL='public-read', + ) + + # Read the expected URLs using an anonymous HTTP client + urls = ( + f"https://{objects_endpoint}/{bucket}/key.txt", + f"https://{bucket}.{objects_endpoint}/key.txt", + ) + + for url in urls: + response = requests.get(url) + assert response.status_code == 200 + assert response.text == "test" + + +def test_notifications( + bucket, + access_key, + secret_key, + objects_endpoint, + server, + region, +): + """ Using S3 SNS (Simple Notification Service) we can be informed via + webhooks, when something changes on a bucket. + + """ + + # Run a service that can act as a webhook endpoint + server.assert_run(oneliner(""" + sudo apt update; + sudo apt install -y podman jq; + sudo systemd-run --unit webhook.service + podman run + --name webhook + --publish 80:8080 + --env LOG_WITHOUT_NEWLINE=true + --env DISABLE_REQUEST_LOGS=true + docker.io/mendhak/http-https-echo:38 + """)) + + # Get an SNS client (Simple Notification Service) + sns = boto3.client( + 'sns', + endpoint_url=f"https://{objects_endpoint}", + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + region_name='default', + ) + + # Create a test topic + name = f"at-{secrets.token_hex(8)}" + + topic = sns.create_topic(Name=name, Attributes={ + "push-endpoint": f"http://{server.ip('public', 4)}", + }) + + # Get notified whenever an object is created + bucket.Notification().put(NotificationConfiguration={ + "TopicConfigurations": [ + { + "Id": name, + "TopicArn": topic['TopicArn'], + "Events": ["s3:ObjectCreated:*"] + } + ] + }) + + # We have to wait a moment for the configuration to propagate + timeout = time.monotonic() + 30 + + while time.monotonic() < timeout: + bucket.put_object(Key='pre-check', Body=b'') + + count = int(server.output_of(oneliner(""" + sudo journalctl CONTAINER_NAME=webhook | grep pre-check | wc -l + """))) + + if count: + break + + time.sleep(1) + + # Generate multiple notifications + for i in range(3): + bucket.put_object(Key=f'count-{i}', Body=b'') + + # Ensure they were received (excluding the pre-check objects from above) + messages = [json.loads(m)['json'] for m in server.output_of(oneliner(r""" + sudo journalctl CONTAINER_NAME=webhook -o json + | jq -r .MESSAGE + | grep -v pre-check + | grep -E '^\{' + """)).splitlines()] + + # A single message may contain multiple records + records = flatten(m['Records'] for m in messages) + assert len(records) == 3 + + # The records are sent in order + assert records[0]['s3']['object']['key'] == 'count-0' + assert records[1]['s3']['object']['key'] == 'count-1' + assert records[2]['s3']['object']['key'] == 'count-2' + + # They all share some properties + for record in records: + assert record['eventName'] == 'ObjectCreated:Put' + assert record['awsRegion'] == region + assert record['s3']['bucket']['name'] == bucket.name diff --git a/util.py b/util.py index ca3a791..2d8225b 100644 --- a/util.py +++ b/util.py @@ -26,6 +26,7 @@ from hashlib import blake2b from ipaddress import ip_address from ipaddress import ip_network +from itertools import chain from paramiko import SSHClient, AutoAddPolicy from paramiko.ssh_exception import ChannelException from paramiko.ssh_exception import NoValidConnectionsError @@ -856,3 +857,9 @@ def skip_test_when(match, reason=None): pytest.skip(reason) raise + + +def flatten(list_of_lists): + """ Flatten one level of nesting. 
""" + + return list(chain.from_iterable(list_of_lists))