From 01dbb5011f95483e5b3e4bd0040983e59a96f86b Mon Sep 17 00:00:00 2001 From: Gerrod Ubben Date: Fri, 15 May 2026 15:48:34 -0400 Subject: [PATCH] Add ability to sync by digests fixes: #1909 Assisted by: claude-sonnet-4.6 https://redhat.atlassian.net/browse/PULP-1740 --- CHANGES/+include_exclude_tags.removal | 1 + CHANGES/1909.feature | 1 + .../0048_containerremote_includes_excludes.py | 52 +++++++ pulp_container/app/models.py | 11 +- pulp_container/app/serializers.py | 53 ++++++-- pulp_container/app/tasks/sync_stages.py | 104 +++++++------- .../tests/functional/api/test_sync.py | 127 ++++++++++++++++++ 7 files changed, 290 insertions(+), 59 deletions(-) create mode 100644 CHANGES/+include_exclude_tags.removal create mode 100644 CHANGES/1909.feature create mode 100644 pulp_container/app/migrations/0048_containerremote_includes_excludes.py diff --git a/CHANGES/+include_exclude_tags.removal b/CHANGES/+include_exclude_tags.removal new file mode 100644 index 000000000..18c387e08 --- /dev/null +++ b/CHANGES/+include_exclude_tags.removal @@ -0,0 +1 @@ +Renamed `include_tags` and `exclude_tags` to `includes` and `excludes` on the remote. diff --git a/CHANGES/1909.feature b/CHANGES/1909.feature new file mode 100644 index 000000000..c45c998bd --- /dev/null +++ b/CHANGES/1909.feature @@ -0,0 +1 @@ +Added support for syncing manifests by digest through new `includes` field on the remote. 
import django.contrib.postgres.fields
from django.db import migrations, models


def _copy_filter_fields(apps, field_pairs):
    """
    Copy list-valued filter fields on every ContainerRemote row.

    Args:
        apps: The historical app registry handed to ``RunPython`` callables.
        field_pairs: Sequence of ``(source_field, target_field)`` names. For each
            row the value of ``source_field`` is written to ``target_field``; an
            empty or NULL source is stored as ``None``.
    """
    remote_model = apps.get_model("container", "ContainerRemote")
    target_names = [target for _source, target in field_pairs]
    for row in remote_model.objects.all():
        for source, target in field_pairs:
            # ``or None`` normalises an empty list to NULL, like a missing value.
            setattr(row, target, getattr(row, source) or None)
        row.save(update_fields=target_names)


def migrate_to_includes_excludes(apps, schema_editor):
    """Forward data step: include_tags -> includes and exclude_tags -> excludes."""
    _copy_filter_fields(
        apps,
        (("include_tags", "includes"), ("exclude_tags", "excludes")),
    )


def down_migrate_to_include_exclude_tags(apps, schema_editor):
    """Reverse data step: includes -> include_tags and excludes -> exclude_tags."""
    _copy_filter_fields(
        apps,
        (("includes", "include_tags"), ("excludes", "exclude_tags")),
    )


def _nullable_text_array():
    """Build the nullable array-of-text column definition shared by both new fields."""
    return django.contrib.postgres.fields.ArrayField(
        base_field=models.TextField(null=True),
        null=True,
        size=None,
    )


class Migration(migrations.Migration):
    """Add ``includes``/``excludes`` to ContainerRemote and seed them from the old tag fields."""

    dependencies = [
        ("container", "0047_containernamespace_pulp_labels"),
    ]

    operations = [
        # Step 1: add the new columns.
        migrations.AddField(
            model_name="containerremote",
            name="includes",
            field=_nullable_text_array(),
        ),
        migrations.AddField(
            model_name="containerremote",
            name="excludes",
            field=_nullable_text_array(),
        ),
        # Step 2: copy the existing data (reversible).
        migrations.RunPython(
            migrate_to_includes_excludes,
            down_migrate_to_include_exclude_tags,
        ),
    ]
def validate(self, data):
    """
    Validate the remote and fold the deprecated tag filters into the new fields.

    The parent serializer's object-level validation runs first so that checks
    inherited from the base class are not bypassed by this override. After
    that, the deprecated write-only ``include_tags`` / ``exclude_tags`` values
    are removed from ``data`` and, when non-empty, appended to ``includes`` /
    ``excludes`` respectively.

    Args:
        data (dict): The field values that passed field-level validation.

    Returns:
        dict: The validated data, without the ``include_tags`` and
        ``exclude_tags`` keys.
    """
    data = super().validate(data)

    legacy_to_current = (
        ("include_tags", "includes"),
        ("exclude_tags", "excludes"),
    )
    for legacy_name, current_name in legacy_to_current:
        # Always drop the deprecated key so it is never written to the model.
        legacy_values = data.pop(legacy_name, None)
        if legacy_values:
            # Values given through the new field come first, then the legacy ones.
            data[current_name] = list(data.get(current_name) or []) + list(legacy_values)

    return data
@@ -337,6 +368,8 @@ class ContainerRemoteSerializer(RemoteSerializer): class Meta: fields = RemoteSerializer.Meta.fields + ( "upstream_name", + "includes", + "excludes", "include_tags", "exclude_tags", "sigstore", diff --git a/pulp_container/app/tasks/sync_stages.py b/pulp_container/app/tasks/sync_stages.py index 244b8ba52..026ab078e 100644 --- a/pulp_container/app/tasks/sync_stages.py +++ b/pulp_container/app/tasks/sync_stages.py @@ -3,6 +3,7 @@ import hashlib import json import logging +from collections import defaultdict from urllib.parse import urljoin, urlparse, urlunparse import aiohttp @@ -62,7 +63,7 @@ def __init__(self, remote, signed_only): self.manifest_list_dcs = [] self.manifest_dcs = [] self.signature_dcs = [] - self._synced_digests = set() + self._synced_digests = defaultdict(list) self._full_tag_list = [] self._cosign_tags = [] @@ -78,10 +79,12 @@ async def _download_manifest_data(self, manifest_url): return content_data, raw_text_data, response - async def _check_for_existing_manifest(self, download_tag): - response = await download_tag + async def _check_for_existing_manifest(self, head_manifest_task): + response = await head_manifest_task digest = response.headers.get("docker-content-digest") + url = response.url + original_reference = url.split("/")[-1] if manifest := await Manifest.objects.filter( digest=digest, pulp_domain=get_domain() @@ -89,9 +92,13 @@ async def _check_for_existing_manifest(self, download_tag): raw_text_data = manifest.data content_data = json.loads(raw_text_data) else: - content_data, raw_text_data, response = await self._download_manifest_data(response.url) + if not original_reference.startswith("sha256:") and digest: + # Fetch the tag with its digest + url = url.rsplit(original_reference, 1)[0] + digest - return content_data, raw_text_data, response + content_data, raw_text_data, response = await self._download_manifest_data(url) + + return content_data, raw_text_data, response, original_reference async def run(self): """ 
@@ -105,31 +112,33 @@ async def run(self): repo_name = self.remote.namespaced_upstream_name tag_list_url = "/v2/{name}/tags/list".format(name=repo_name) self._full_tag_list = await self.get_paginated_tag_list(tag_list_url, repo_name) - self._cosign_tags = filter_resources( - self._full_tag_list, ["sha256-*"], self.remote.exclude_tags - ) - if self.remote.include_tags or self.remote.exclude_tags: + includes = self.remote.includes or [] + excludes = self.remote.excludes or [] + + digest_includes = [i for i in includes if i.startswith("sha256:")] + self._cosign_tags = filter_resources(self._full_tag_list, ["sha256-*"], excludes) + + if includes or excludes: # Split sync into two parts, first all non-cosign tags, then cosign tags - exclude_tags_and_cosign = (self.remote.exclude_tags or []) + ["sha256-*"] - tag_list = filter_resources( - self._full_tag_list, self.remote.include_tags, exclude_tags_and_cosign - ) + exclude_and_cosign = excludes + ["sha256-*"] + filtered_tags = filter_resources(self._full_tag_list, includes, exclude_and_cosign) + manifest_list = filtered_tags + digest_includes else: - tag_list = self._full_tag_list + manifest_list = self._full_tag_list await pb.aincrement() - await self._process_tags(tag_list, signature_source) + await self._process_manifests(manifest_list, signature_source, "Processing Manifests") - if self.remote.include_tags or self.remote.exclude_tags: - # Process cosign companion tags after all non-cosign tags are synced + if includes or excludes: + # Process cosign companion tags after all primary content is synced companion_tags = self._find_cosign_companion_tags() if companion_tags: log.info( "Syncing %d cosign companion tag(s) for filtered images", len(companion_tags), ) - await self._process_tags( - companion_tags, signature_source, msg="Processing Cosign Companion Tags" + await self._process_manifests( + companion_tags, signature_source, "Processing Cosign Companion Tags" ) def _find_cosign_companion_tags(self): @@ -143,41 
+152,39 @@ def _find_cosign_companion_tags(self): companion_tags.append(tag) return companion_tags - async def _process_tags(self, tag_list, signature_source, msg="Processing Tags"): - """Download and process a batch of tags, creating declarative content objects.""" + async def _process_manifests(self, manifests, signature_source, msg): + """Download and process a batch of manifests, creating declarative content objects.""" BATCH_SIZE = 500 to_download = [] - for tag_name in tag_list: - relative_url = "/v2/{name}/manifests/{tag}".format( - name=self.remote.namespaced_upstream_name, tag=tag_name - ) - tag_url = urljoin(self.remote.url, relative_url) - downloader = self.remote.get_downloader(url=tag_url) + for reference in manifests: + relative_url = f"/v2/{self.remote.namespaced_upstream_name}/manifests/{reference}" + manifest_url = urljoin(self.remote.url, relative_url) + downloader = self.remote.get_downloader(url=manifest_url) to_download.append( downloader.run(extra_data={"headers": V2_ACCEPT_HEADERS, "http_method": "head"}) ) async with ProgressReport( message=msg, - code="sync.processing.tag", - total=len(tag_list), - ) as pb_parsed_tags: + code="sync.processing.manifest", + total=len(manifests), + ) as pb_parsed_manifests: to_download_artifact = [ - self._check_for_existing_manifest(download_tag) - for download_tag in asyncio.as_completed(to_download) + self._check_for_existing_manifest(download_manifest) + for download_manifest in asyncio.as_completed(to_download) ] for artifact in asyncio.as_completed(to_download_artifact): - content_data, raw_text_data, response = await artifact + content_data, raw_text_data, response, manifest_ref = await artifact digest = calculate_digest(raw_text_data) - tag_name = response.url.split("/")[-1] + is_tag = not manifest_ref.startswith("sha256:") media_type = determine_media_type(content_data, response) if self.signed_only and not signature_source: if not ( - self._is_cosign_companion_tag(tag_name, media_type, content_data) 
+ self._is_cosign_companion_tag(manifest_ref, media_type, content_data) or await self._has_cosign_signature(digest) ): log.info( @@ -185,12 +192,13 @@ async def _process_tags(self, tag_list, signature_source, msg="Processing Tags") "due to a requirement to sync signed content " "only.".format(digest=digest) ) - await pb_parsed_tags.aincrement() + await pb_parsed_manifests.aincrement() continue validate_manifest(content_data, media_type, digest) - tag_dc = DeclarativeContent(Tag(name=tag_name)) + if is_tag: + tag_dc = DeclarativeContent(Tag(name=manifest_ref)) if media_type in (MEDIA_TYPE.MANIFEST_LIST, MEDIA_TYPE.INDEX_OCI): list_dc = self.create_manifest_list( @@ -214,7 +222,7 @@ async def _process_tags(self, tag_list, signature_source, msg="Processing Tags") "The whole manifest list is skipped.".format( img_digest=man_dc.content.digest, ml_digest=list_dc.content.digest, - tag=tag_name, + tag=manifest_ref, ) ) break @@ -224,20 +232,23 @@ async def _process_tags(self, tag_list, signature_source, msg="Processing Tags") else: # Manifest indices can be signed too. It is not mandatory. # If signature is available mirror it. 
- self._synced_digests.add(digest) + self._synced_digests[digest].append(manifest_ref) if signature_source is not None: list_sig_dcs = await self.create_signatures(list_dc, signature_source) if list_sig_dcs: self.signature_dcs.extend(list_sig_dcs) - tag_dc.extra_data["tagged_manifest_dc"] = list_dc for listed_manifest in list_dc.extra_data["listed_manifests"]: - self._synced_digests.add(listed_manifest["manifest_dc"].content.digest) + self._synced_digests[ + listed_manifest["manifest_dc"].content.digest + ].append(manifest_ref) await self.handle_blobs( listed_manifest["manifest_dc"], listed_manifest["content_data"] ) self.manifest_dcs.append(listed_manifest["manifest_dc"]) self.manifest_list_dcs.append(list_dc) - self.tag_dcs.append(tag_dc) + if is_tag: + tag_dc.extra_data["tagged_manifest_dc"] = list_dc + self.tag_dcs.append(tag_dc) else: # Simple tagged manifest @@ -249,14 +260,15 @@ async def _process_tags(self, tag_list, signature_source, msg="Processing Tags") if self.signed_only and not man_sig_dcs: continue self.signature_dcs.extend(man_sig_dcs) - self._synced_digests.add(digest) - tag_dc.extra_data["tagged_manifest_dc"] = man_dc + self._synced_digests[digest].append(manifest_ref) await self.handle_blobs(man_dc, content_data) - self.tag_dcs.append(tag_dc) + if is_tag: + tag_dc.extra_data["tagged_manifest_dc"] = man_dc + self.tag_dcs.append(tag_dc) self.manifest_dcs.append(man_dc) # Count the skipped tasks as parsed too. - await pb_parsed_tags.aincrement() + await pb_parsed_manifests.aincrement() # Flush the queues to prevent overly excessive memory usage. # This will cap the number of in flight high level objects to about BATCH_SIZE. 
@pytest.mark.parallel
def test_sync_by_digest_includes(
    container_repository_factory,
    container_remote_factory,
    container_repository_api,
    container_sync,
    container_tag_api,
    container_manifest_api,
):
    """Test that a digest entry in 'includes' syncs a manifest directly without a tag."""
    remote = container_remote_factory(
        upstream_name=PULP_FIXTURE_1,
        includes=[PULP_FIXTURE_1_MANIFEST_A_DIGEST],
    )
    repository = container_repository_factory()
    container_sync(repository, remote)
    repo_version = container_repository_api.read(repository.pulp_href).latest_version_href

    tags = container_tag_api.list(repository_version=repo_version)
    manifests = container_manifest_api.list(repository_version=repo_version)

    # No tags — digest entries in includes sync manifests without tag association
    assert tags.count == 0
    assert manifests.count == 1
    assert manifests.results[0].digest == PULP_FIXTURE_1_MANIFEST_A_DIGEST


@pytest.mark.parallel
@pytest.mark.parametrize(
    "digest, companion_suffixes, expected_manifest_count",
    [
        # manifest_a: two v2 cosign companions (.sig, .att).
        # manifest_a itself + .sig manifest + .att manifest
        pytest.param(
            PULP_COSIGN_TAGS_MANIFEST_A_DIGEST,
            (".sig", ".att"),
            3,
            id="manifest_a",
        ),
        # manifest_b: one v2 companion (.sig) + one v3 companion (bare sha256- tag
        # pointing at a manifest list with 2 sub-manifests).
        # manifest_b + .sig manifest + v3 manifest list + 2 v3 sub-manifests
        pytest.param(
            PULP_COSIGN_TAGS_MANIFEST_B_DIGEST,
            (".sig", ""),
            5,
            id="manifest_b",
        ),
    ],
)
def test_sync_digest_includes_pulls_cosign_companions(
    container_repository_factory,
    container_remote_factory,
    container_repository_api,
    container_sync,
    container_tag_api,
    container_manifest_api,
    digest,
    companion_suffixes,
    expected_manifest_count,
):
    """Test that digest entries in 'includes' also pull in cosign companion tags.

    pulp/cosign-tags and pulp/test-fixture-1 share the same manifest_a/b/c/d digests.
    Each parametrized case syncs a single digest and checks the companion tags
    and the total number of manifests that come along with it, so one scenario
    failing does not hide the result of the other.
    """
    companion_prefix = _cosign_registry_tag_name(digest)

    remote = container_remote_factory(
        upstream_name=PULP_COSIGN_COMPANION_TAGS,
        includes=[digest],
    )
    repository = container_repository_factory()
    container_sync(repository, remote)
    version = container_repository_api.read(repository.pulp_href).latest_version_href

    tags = container_tag_api.list(repository_version=version)
    manifests = container_manifest_api.list(repository_version=version)

    # The primary manifest has no tag; companions get tags as usual
    assert tags.count == len(companion_suffixes)
    assert {t.name for t in tags.results} == {
        f"{companion_prefix}{suffix}" for suffix in companion_suffixes
    }
    assert manifests.count == expected_manifest_count


@pytest.mark.parallel
def test_sync_mixed_tags_and_digests_in_includes(
    container_repository_factory,
    container_remote_factory,
    container_repository_api,
    container_sync,
    container_tag_api,
    container_manifest_api,
):
    """Test that 'includes' can mix tag patterns and digests in a single sync."""
    cr = _cosign_registry_tag_name

    # Include manifest_b by tag name and manifest_a by digest.
    # From pulp/cosign-tags: manifest_b has a .sig companion and a V3 companion.
    # manifest_a has a .sig and .att companion.
    remote = container_remote_factory(
        upstream_name=PULP_COSIGN_COMPANION_TAGS,
        includes=["manifest_b", PULP_COSIGN_TAGS_MANIFEST_A_DIGEST],
    )
    repository = container_repository_factory()
    container_sync(repository, remote)
    ver = container_repository_api.read(repository.pulp_href).latest_version_href

    tags = container_tag_api.list(repository_version=ver)
    manifests = container_manifest_api.list(repository_version=ver)

    # manifest_b gets a tag (synced by tag name); manifest_a does not (synced by digest).
    # Cosign companions for both are pulled in as tagged content.
    assert {t.name for t in tags.results} == {
        "manifest_b",
        f"{cr(PULP_COSIGN_TAGS_MANIFEST_B_DIGEST)}.sig",
        cr(PULP_COSIGN_TAGS_MANIFEST_B_DIGEST),
        f"{cr(PULP_COSIGN_TAGS_MANIFEST_A_DIGEST)}.sig",
        f"{cr(PULP_COSIGN_TAGS_MANIFEST_A_DIGEST)}.att",
    }
    # manifest_a + manifest_b + b's .sig + b's V3 list + 2 V3 sub-manifests
    # + a's .sig + a's .att = 8
    assert manifests.count == 8