diff --git a/.github/workflows/integration-test-sync-deprovision.yml b/.github/workflows/integration-test-sync-deprovision.yml
new file mode 100644
index 000000000..7eb12414b
--- /dev/null
+++ b/.github/workflows/integration-test-sync-deprovision.yml
@@ -0,0 +1,97 @@
+name: integration-test-sync-deprovision
+on: [push]
+concurrency: transfer-test
+env:
+  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
+  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
+  SKYPLANE_USAGE_STATS_ENABLED: 0
+jobs:
+  integration:
+    runs-on: ubuntu-latest
+    strategy:
+      max-parallel: 1
+      matrix:
+        pairs:
+          # AWS to AWS
+          - aws:us-east-1 aws:us-east-1 --multipart
+          - aws:us-east-2 aws:us-west-2 --multipart
+          # GCP to GCP
+          - gcp:us-central1-a gcp:us-central1-a --multipart
+          - gcp:us-west1-a gcp:us-east1-a --multipart
+          # Azure to Azure
+          - azure:westus azure:westus
+          - azure:eastus azure:westus
+          # Cross-cloud tests
+          - aws:us-west-1 gcp:us-west2-a --multipart
+          - gcp:us-west2-a aws:us-west-1 --multipart
+          - aws:us-west-1 azure:westus
+          - azure:westus aws:us-west-1
+          - gcp:us-west2-a azure:westus
+          - azure:westus gcp:us-west2-a
+    timeout-minutes: 40
+    env:
+      STRATEGY_UUID: itest-${{ github.run_id }}-${{ github.run_attempt }}-${{ strategy.job-index }}
+    steps:
+      - uses: actions/checkout@v1
+      - name: Install poetry
+        run: pipx install poetry
+      - name: Set up Python 3.10
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.10"
+          cache: "poetry"
+      - name: Set Poetry config
+        run: |
+          poetry config virtualenvs.in-project false
+          poetry config virtualenvs.path ~/.virtualenvs
+      - name: Install Dependencies
+        run: poetry install -E aws -E azure -E gcp
+        if: steps.cache.outputs.cache-hit != 'true'
+      - id: 'auth'
+        uses: 'google-github-actions/auth@v1'
+        with:
+          credentials_json: '${{ secrets.GCP_CREDENTIALS_JSON }}'
+      - name: Log into Azure
+        uses: azure/login@v1
+        with:
+          creds: '{"clientId":"${{ secrets.AZURE_CLIENT_ID }}","clientSecret":"${{ secrets.AZURE_CLIENT_SECRET }}","subscriptionId":"${{ secrets.AZURE_SUBSCRIPTION_ID }}","tenantId":"${{ secrets.AZURE_TENANT_ID }}"}'
+      - name: Skyplane init
+        run: |
+          poetry run skyplane config set gcp_service_account_name ${{ env.STRATEGY_UUID }}
+          poetry run skyplane config set native_cmd_enabled false
+          cat ~/.skyplane/config
+          poetry run skyplane init -y --disable-config-cloudflare
+          poetry run skyplane config set usage_stats false
+      - name: 1000000 small files test
+        run: poetry run python tests/integration/sync.py ${{ matrix.pairs }} --n-files 1000000 --file-size-kb 1 --autoshutdown 1
+      - name: Cleanup GCP service account
+        if: always()
+        run: gcloud iam service-accounts delete --quiet ${{ env.STRATEGY_UUID }}@${{ secrets.GCP_PROJECT_ID }}.iam.gserviceaccount.com
+  deprovision:
+    runs-on: ubuntu-latest
+    if: ${{ always() }}
+    needs: [integration]
+    env:
+      STRATEGY_UUID: itest-d-${{ github.run_id }}-${{ github.run_attempt }}
+    steps:
+      - uses: actions/checkout@v1
+      - name: Set up Python 3.10
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.10"
+      - name: Install skyplane from pypi
+        run: pip install skyplane[aws,azure,gcp]
+      - id: 'auth'
+        uses: 'google-github-actions/auth@v1'
+        with:
+          credentials_json: '${{ secrets.GCP_CREDENTIALS_JSON }}'
+      - name: Skyplane init
+        run: |
+          skyplane config set gcp_service_account_name ${{ env.STRATEGY_UUID }}
+          skyplane init -y --disable-config-azure
+          skyplane config set usage_stats false
+      - name: Deprovision
+        run: skyplane deprovision
+      - name: Cleanup GCP service account
+        if: always()
+        run: gcloud iam service-accounts delete --quiet ${{ env.STRATEGY_UUID }}@${{ secrets.GCP_PROJECT_ID }}.iam.gserviceaccount.com
diff --git a/skyplane/api/config.py b/skyplane/api/config.py
index 4e4979815..6c853d4b6 100644
--- a/skyplane/api/config.py
+++ b/skyplane/api/config.py
@@ -63,6 +63,8 @@ def make_auth_provider(self) -> compute.IBMCloudAuthentication:
 
 @dataclass(frozen=True)
 class TransferConfig:
+    """Configuration for a specific transfer"""
+
     autoterminate_minutes: int = 15
     requester_pays: bool = False
diff --git a/skyplane/api/transfer_job.py b/skyplane/api/transfer_job.py
index ecfbaa086..4858114dd 100644
--- a/skyplane/api/transfer_job.py
+++ b/skyplane/api/transfer_job.py
@@ -170,25 +170,6 @@ def _run_multipart_chunk_thread(
                 mime_type = None
             raise NotImplementedError("Multipart not implement for non-object store interfaces")
 
-    # def to_chunk_requests(self, gen_in: Generator[Chunk, None, None]) -> Generator[ChunkRequest, None, None]:
-    #     """Converts a generator of chunks to a generator of chunk requests.
-
-    #     :param gen_in: generator that generates chunk requests
-    #     :type gen_in: Generator
-    #     """
-    #     src_region = self.src_iface.region_tag()
-    #     src_bucket = self.src_iface.bucket()
-    #     for chunk in gen_in:
-    #         yield ChunkRequest(
-    #             chunk=chunk,
-    #             src_region=src_region,
-    #             #dst_region=dest_region,
-    #             src_object_store_bucket=src_bucket,
-    #             #dst_object_store_bucket=dest_bucket,
-    #             src_type="object_store",
-    #             #dst_type="object_store",
-    #         )
-
     @staticmethod
     def map_object_key_prefix(source_prefix: str, source_key: str, dest_prefix: str, recursive: bool = False):
         """
@@ -244,6 +225,7 @@ def transfer_pair_generator(
         self,
         src_prefix: str,
         dst_prefixes: List[str],
+        dataplane: "Dataplane",
         recursive: bool,
         prefilter_fn: Optional[Callable[[ObjectStoreObject], bool]] = None,  # TODO: change to StorageObject
     ) -> Generator[TransferPair, None, None]:
@@ -251,8 +233,10 @@ def transfer_pair_generator(
 
         :param src_prefix: source bucket folder prefix
        :type src_prefix: string
-        :param dst_prefix: destination bucket folder prefix
-        :type dst_prefix: string
+        :param dst_prefixes: destination bucket folder prefixes
+        :type dst_prefixes: List[str]
+        :param dataplane: dataplane for the transfer
+        :type dataplane: skyplane.api.Dataplane
         :param recursive: if true, will copy objects at folder prefix recursively
         :type recursive: bool
         :param prefilter_fn: filters out objects whose prefixes do not match the filter function (default: None)
@@ -273,6 +257,8 @@
         logger.fs.debug(f"Querying objects in {self.src_iface.path()}")
         n_objs = 0
         for obj in self.src_iface.list_objects(src_prefix):
+            if dataplane.bound_nodes:
+                do_parallel(lambda i: i.run_command("echo 1"), dataplane.bound_nodes.values(), n=8)
             if prefilter_fn is None or prefilter_fn(obj):
                 # collect list of destination objects
                 dest_objs = {}
@@ -586,6 +572,7 @@ def http_pool(self):
     def gen_transfer_pairs(
         self,
         chunker: Optional[Chunker] = None,
+        dataplane: Optional["Dataplane"] = None,
         transfer_config: Optional[TransferConfig] = field(init=False, default_factory=lambda: TransferConfig()),
     ) -> Generator[TransferPair, None, None]:
         """Generate transfer pairs for the transfer job.
@@ -595,7 +582,7 @@ def gen_transfer_pairs(
         """
         if chunker is None:  # used for external access to transfer pair list
             chunker = Chunker(self.src_iface, self.dst_ifaces, transfer_config)  # TODO: should read in existing transfer config
-        yield from chunker.transfer_pair_generator(self.src_prefix, self.dst_prefixes, self.recursive, self._pre_filter_fn)
+        yield from chunker.transfer_pair_generator(self.src_prefix, self.dst_prefixes, dataplane, self.recursive, self._pre_filter_fn)
 
     def dispatch(
         self,
@@ -613,7 +600,7 @@ def dispatch(
         :type dispatch_batch_size: int
         """
         chunker = Chunker(self.src_iface, self.dst_ifaces, transfer_config)
-        transfer_pair_generator = self.gen_transfer_pairs(chunker)  # returns TransferPair objects
+        transfer_pair_generator = self.gen_transfer_pairs(chunker, dataplane)  # returns TransferPair objects
         gen_transfer_list = chunker.tail_generator(transfer_pair_generator, self.transfer_list)
         chunks = chunker.chunk(gen_transfer_list)
         batches = chunker.batch_generator(
@@ -779,6 +766,7 @@ def __init__(self, src_path: str, dst_paths: List[str] or str, requester_pays: b
     def gen_transfer_pairs(
         self,
         chunker: Optional[Chunker] = None,
+        dataplane: Optional["Dataplane"] = None,
         transfer_config: Optional[TransferConfig] = field(init=False, default_factory=lambda: TransferConfig()),
     ) -> Generator[TransferPair, None, None]:
         """Generate transfer pairs for the transfer job.
@@ -788,7 +776,9 @@ def gen_transfer_pairs(
         """
         if chunker is None:  # used for external access to transfer pair list
             chunker = Chunker(self.src_iface, self.dst_ifaces, transfer_config)
-        transfer_pair_gen = chunker.transfer_pair_generator(self.src_prefix, self.dst_prefixes, self.recursive, self._pre_filter_fn)
+        transfer_pair_gen = chunker.transfer_pair_generator(
+            self.src_prefix, self.dst_prefixes, dataplane, self.recursive, self._pre_filter_fn
+        )
 
         # only single destination supported
         assert len(self.dst_ifaces) == 1, "Only single destination supported for sync job"
diff --git a/tests/integration/sync.py b/tests/integration/sync.py
new file mode 100644
index 000000000..6cb393cfc
--- /dev/null
+++ b/tests/integration/sync.py
@@ -0,0 +1,109 @@
+import argparse
+import os
+import tempfile
+import uuid
+from skyplane.utils.definitions import KB
+from skyplane.obj_store.object_store_interface import ObjectStoreInterface
+from skyplane.cli.cli import sync
+from skyplane.config_paths import cloud_config
+from skyplane.utils import logger
+
+
+def setup_buckets(src_region, dest_region, n_files=1, file_size_kb=1):
+    src_provider, src_zone = src_region.split(":")
+    dest_provider, dest_zone = dest_region.split(":")
+    if src_provider == "azure":
+        src_bucket_name = f"integration{src_zone}/{str(uuid.uuid4()).replace('-', '')}"
+    else:
+        src_bucket_name = f"integration{src_zone}-{str(uuid.uuid4())[:8]}"
+    if dest_provider == "azure":
+        dest_bucket_name = f"integration{dest_zone}/{str(uuid.uuid4()).replace('-', '')}"
+    else:
+        dest_bucket_name = f"skyplane-integration-{dest_zone}-{str(uuid.uuid4())[:8]}"
+    logger.debug(f"creating buckets {src_bucket_name} and {dest_bucket_name}")
+    src_interface = ObjectStoreInterface.create(src_region, src_bucket_name)
+    dest_interface = ObjectStoreInterface.create(dest_region, dest_bucket_name)
+    src_interface.create_bucket(src_zone)
+    dest_interface.create_bucket(dest_zone)
+
+    src_prefix = f"src_{uuid.uuid4()}"
+    dest_prefix = f"dest_{uuid.uuid4()}"
+    with tempfile.NamedTemporaryFile() as tmp:
+        fpath = tmp.name
+        with open(fpath, "wb+") as f:
+            f.write(os.urandom(int(file_size_kb * KB)))
+        for i in range(n_files):
+            src_interface.upload_object(fpath, f"{src_prefix}/{i}", mime_type="text/plain")
+
+    return src_bucket_name, dest_bucket_name, src_prefix, dest_prefix
+
+
+def run(src_region, dest_region, n_files=1, file_size_kb=1, multipart=False, autoshutdown_minutes=15):
+    logger.info(
+        f"Running skyplane sync integration test with config "
+        + f"src_region={src_region}, "
+        + f"dest_region={dest_region}, "
+        + f"n_files={n_files}, "
+        + f"file_size_kb={file_size_kb}, "
+        + f"multipart={multipart}, "
+        + f"autoshutdown_minutes={autoshutdown_minutes}"
+    )
+    cloud_config.set_flag("autoshutdown_minutes", autoshutdown_minutes)
+    src_bucket_name, dest_bucket_name, src_prefix, dest_prefix = setup_buckets(
+        src_region, dest_region, n_files=n_files, file_size_kb=file_size_kb
+    )
+
+    def map_path(region, bucket, prefix):
+        provider, _ = region.split(":")
+        if provider == "aws":
+            return f"s3://{bucket}/{prefix}"
+        elif provider == "azure":
+            storage_account, container = bucket.split("/")
+            return f"https://{storage_account}.blob.core.windows.net/{container}/{prefix}"
+        elif provider == "gcp":
+            return f"gs://{bucket}/{prefix}"
+        else:
+            raise Exception(f"Unknown provider {provider}")
+
+    return_code = sync(
+        map_path(src_region, src_bucket_name, src_prefix),
+        map_path(dest_region, dest_bucket_name, dest_prefix),
+        debug=False,
+        multipart=multipart,
+        confirm=True,
+        max_instances=1,
+        max_connections=1,
+        solver="direct",
+        solver_required_throughput_gbits=1,
+    )
+
+    # clean up path
+    src_interface = ObjectStoreInterface.create(src_region, src_bucket_name)
+    dest_interface = ObjectStoreInterface.create(dest_region, dest_bucket_name)
+    src_interface.delete_objects([f"{src_prefix}/{i}" for i in range(n_files)])
+    dest_interface.delete_objects([f"{dest_prefix}/{i}" for i in range(n_files)])
+    src_interface.delete_bucket()
+    dest_interface.delete_bucket()
+
+    return return_code
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("src", help="source region")
+    parser.add_argument("dest", help="destination region")
+    parser.add_argument("--n-files", type=int, default=1)
+    parser.add_argument("--file-size-kb", type=int, default=1)
+    parser.add_argument("--multipart", action="store_true")
+    parser.add_argument("--autoshutdown", type=int, default=15)
+    args = parser.parse_args()
+
+    return_code = run(
+        args.src,
+        args.dest,
+        n_files=args.n_files,
+        file_size_kb=args.file_size_kb,
+        multipart=args.multipart,
+        autoshutdown_minutes=args.autoshutdown,
+    )
+    exit(return_code)
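
Note: for local debugging, the new integration test can also be driven directly from Python instead of through the workflow's CLI invocation above. The snippet below is an illustrative sketch only, not part of the diff: it assumes the repository root is on sys.path so that tests.integration.sync is importable, that cloud credentials are already configured via skyplane init, and it uses an arbitrary region pair and a deliberately small file count.

    # Illustrative local driver for tests/integration/sync.py (assumptions noted above).
    import sys

    sys.path.insert(0, ".")  # run from the repository root so the tests package resolves

    from tests.integration.sync import run

    exit_code = run(
        "aws:us-east-1",      # source provider:region, same format as the workflow matrix entries
        "gcp:us-central1-a",  # destination provider:region
        n_files=100,          # keep small for a quick local smoke test
        file_size_kb=1,
        multipart=True,
        autoshutdown_minutes=15,
    )
    sys.exit(exit_code)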