From e420f025e3302f8c233645a334ca8ded91d01cc5 Mon Sep 17 00:00:00 2001 From: Daniel Alley Date: Fri, 3 Apr 2026 13:34:15 -0400 Subject: [PATCH] Add sync optimization support to pulp_file NOTE: we cannot actually enable this for a while due to zero downtime upgrade restrictions and possible breaking changes. When the manifest hasn't changed relative to the previous sync, we skip the sync, so long as the sync modes are compatible and such. This requires storing a bit of hidden metadata on the repository object. --- CHANGES/pulp_file/+sync-optimization.misc | 3 + .../0020_filerepository_last_sync_details.py | 16 +++ pulp_file/app/models.py | 1 + pulp_file/app/settings.py | 1 + pulp_file/app/tasks/synchronizing.py | 120 ++++++++++++++++-- .../tests/unit/test_sync_optimization.py | 87 +++++++++++++ 6 files changed, 220 insertions(+), 8 deletions(-) create mode 100644 CHANGES/pulp_file/+sync-optimization.misc create mode 100644 pulp_file/app/migrations/0020_filerepository_last_sync_details.py create mode 100644 pulp_file/app/settings.py create mode 100644 pulp_file/tests/unit/test_sync_optimization.py diff --git a/CHANGES/pulp_file/+sync-optimization.misc b/CHANGES/pulp_file/+sync-optimization.misc new file mode 100644 index 00000000000..6c788f0abe6 --- /dev/null +++ b/CHANGES/pulp_file/+sync-optimization.misc @@ -0,0 +1,3 @@ +Add backend support for sync-optimization for the file plugin + +We can't actually use it yet due to zero-downtime upgrade restrictions diff --git a/pulp_file/app/migrations/0020_filerepository_last_sync_details.py b/pulp_file/app/migrations/0020_filerepository_last_sync_details.py new file mode 100644 index 00000000000..2e38bcc97f5 --- /dev/null +++ b/pulp_file/app/migrations/0020_filerepository_last_sync_details.py @@ -0,0 +1,16 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("file", "0019_add_filegitremote"), + ] + + operations = [ + migrations.AddField( + 
model_name="filerepository", + name="last_sync_details", + field=models.JSONField(default=dict), + ), + ] diff --git a/pulp_file/app/models.py b/pulp_file/app/models.py index 074cd5d16bf..6c04ced79c1 100644 --- a/pulp_file/app/models.py +++ b/pulp_file/app/models.py @@ -91,6 +91,7 @@ class FileRepository(Repository, AutoAddObjPermsMixin): manifest = models.TextField(default="PULP_MANIFEST", null=True) autopublish = models.BooleanField(default=False) + last_sync_details = models.JSONField(default=dict) class Meta: default_related_name = "%(app_label)s_%(model_name)s" diff --git a/pulp_file/app/settings.py b/pulp_file/app/settings.py new file mode 100644 index 00000000000..93ef8b1b944 --- /dev/null +++ b/pulp_file/app/settings.py @@ -0,0 +1 @@ +FILE_SYNC_OPTIMIZATION = True diff --git a/pulp_file/app/tasks/synchronizing.py b/pulp_file/app/tasks/synchronizing.py index 1a8210ba09d..091252b01b2 100644 --- a/pulp_file/app/tasks/synchronizing.py +++ b/pulp_file/app/tasks/synchronizing.py @@ -3,6 +3,7 @@ import re import shutil import tempfile +from hashlib import sha256 from gettext import gettext as _ from urllib.parse import quote, urlparse, urlunparse @@ -10,6 +11,7 @@ import git as gitpython from gitdb.exc import BadName, BadObject from django.core.files import File +from django.conf import settings from pulpcore.plugin.exceptions import SyncError from pulpcore.plugin.models import Artifact, ProgressReport, Remote, PublishedMetadata @@ -36,7 +38,64 @@ metadata_files = [] -def synchronize(remote_pk, repository_pk, mirror, url=None): +def _get_sha256(file_path): + """Compute the SHA256 hex digest of a file.""" + with open(file_path, "rb") as f: + return sha256(f.read()).hexdigest() + + +def _should_optimize_sync(sync_details, last_sync_details, version=None): + """ + Check whether the sync can be skipped by comparing with the previous sync. + + Args: + sync_details (dict): Details about the current sync configuration. 
+ last_sync_details (dict): Details about the previous sync configuration. + version: The current latest RepositoryVersion, used for artifact checks. + + Returns: + bool: True if sync can be skipped; False otherwise. + + """ + if not last_sync_details: + return False + + # If switching to immediate download, we may need to download content + if ( + last_sync_details.get("download_policy") != "immediate" + and sync_details["download_policy"] == "immediate" + ): + return False + + # If switching to mirror mode, we need to create a publication + if not last_sync_details.get("mirror") and sync_details["mirror"]: + return False + + if last_sync_details.get("remote_pk") != sync_details["remote_pk"]: + return False + + if last_sync_details.get("url") != sync_details["url"]: + return False + + if last_sync_details.get("most_recent_version") != sync_details["most_recent_version"]: + return False + + if last_sync_details.get("manifest_checksum") != sync_details["manifest_checksum"]: + return False + + # If immediate policy, check if any content is missing artifacts (e.g. after reclaim) + if sync_details["download_policy"] == "immediate" and version: + from pulpcore.plugin.models import ContentArtifact + + if ContentArtifact.objects.filter( + content__in=version.content, artifact__isnull=True + ).exists(): + return False + + return True + + +def synchronize(remote_pk, repository_pk, mirror, optimize=False, url=None, **kwargs): """ Sync content from the remote repository. @@ -46,6 +105,7 @@ def synchronize(remote_pk, repository_pk, mirror, url=None): remote_pk (str): The remote PK. repository_pk (str): The repository PK. mirror (bool): True for mirror mode, False for additive. + optimize (bool): Whether to skip sync if nothing has changed. url (str): The url to synchronize. If omitted, the url of the remote is used. 
Raises: @@ -55,6 +115,8 @@ def synchronize(remote_pk, repository_pk, mirror, url=None): remote = Remote.objects.get(pk=remote_pk).cast() repository = FileRepository.objects.get(pk=repository_pk) + optimize = optimize or settings.FILE_SYNC_OPTIMIZATION + if not remote.url: raise ValueError(_("A remote must have a url specified to synchronize.")) @@ -67,10 +129,45 @@ def synchronize(remote_pk, repository_pk, mirror, url=None): for stage in old_pipeline_stages(new_version) if not isinstance(stage, (RemoteArtifactSaver)) ] + rv = dv.create() else: - first_stage = FileFirstStage(remote, url) + sync_url = url or remote.url + version = repository.latest_version() + + # Download the manifest to compute its checksum for optimization + downloader = remote.get_downloader(url=sync_url) + manifest_result = downloader.fetch() + + sync_details = { + "remote_pk": str(remote.pk), + "url": remote.url, + "download_policy": remote.policy, + "mirror": mirror, + "most_recent_version": version.number, + "manifest_checksum": _get_sha256(manifest_result.path), + } + + if optimize and _should_optimize_sync( + sync_details, repository.last_sync_details, version=version + ): + with ProgressReport( + message="Skipping Sync (no change from previous sync)", + code="sync.was_skipped", + ) as pb: + pb.total = 1 + pb.done = 1 + return + + first_stage = FileFirstStage(remote, url, manifest_path=manifest_result.path) dv = DeclarativeVersion(first_stage, repository, mirror=mirror, acs=True) - rv = dv.create() + rv = dv.create() + + # Update last_sync_details after sync + if rv: + sync_details["most_recent_version"] = rv.number + repository.last_sync_details = sync_details + repository.save() + if rv and mirror: # TODO: this is awful, we really should rewrite the DeclarativeVersion API to # accomodate this use case @@ -98,18 +195,21 @@ class FileFirstStage(Stage): The first stage of a pulp_file sync pipeline. 
""" - def __init__(self, remote, url): + def __init__(self, remote, url, manifest_path=None): """ The first stage of a pulp_file sync pipeline. Args: remote (FileRemote): The remote data to be used when syncing url (str): The base url of custom remote + manifest_path (str): Path to an already-downloaded manifest file. If provided, + the manifest will not be downloaded again. """ super().__init__() self.remote = remote self.url = url if url else remote.url + self.manifest_path = manifest_path async def run(self): """ @@ -123,15 +223,19 @@ async def run(self): ) as pb: parsed_url = urlparse(self.url) root_dir = os.path.dirname(parsed_url.path) - downloader = self.remote.get_downloader(url=self.url) - result = await downloader.run() + if self.manifest_path: + result_path = self.manifest_path + else: + downloader = self.remote.get_downloader(url=self.url) + result = await downloader.run() + result_path = result.path await pb.aincrement() - metadata_files.append((result.path, self.url.split("/")[-1])) + metadata_files.append((result_path, self.url.split("/")[-1])) async with ProgressReport( message="Parsing Metadata Lines", code="sync.parsing.metadata" ) as pb: - manifest = Manifest(result.path) + manifest = Manifest(result_path) entries = list(manifest.read()) pb.total = len(entries) diff --git a/pulp_file/tests/unit/test_sync_optimization.py b/pulp_file/tests/unit/test_sync_optimization.py new file mode 100644 index 00000000000..a5197ed8ef7 --- /dev/null +++ b/pulp_file/tests/unit/test_sync_optimization.py @@ -0,0 +1,87 @@ +from pulp_file.app.tasks.synchronizing import _should_optimize_sync + + +class TestShouldOptimizeSync: + """Tests for the _should_optimize_sync function.""" + + BASE_DETAILS = { + "remote_pk": "00000000-0000-0000-0000-000000000001", + "url": "http://example.com/PULP_MANIFEST", + "download_policy": "on_demand", + "mirror": False, + "most_recent_version": 1, + "manifest_checksum": "abc123", + } + + def _details(self, **overrides): + d = 
self.BASE_DETAILS.copy() + d.update(overrides) + return d + + def test_no_previous_sync(self): + """First sync should never be skipped.""" + assert _should_optimize_sync(self._details(), {}) is False + + def test_identical_sync(self): + """Sync with identical details should be skipped.""" + assert _should_optimize_sync(self._details(), self._details()) is True + + def test_manifest_checksum_changed(self): + """Sync should not be skipped if the manifest checksum changed.""" + last = self._details() + current = self._details(manifest_checksum="def456") + assert _should_optimize_sync(current, last) is False + + def test_url_changed(self): + """Sync should not be skipped if the remote URL changed.""" + last = self._details() + current = self._details(url="http://other.com/PULP_MANIFEST") + assert _should_optimize_sync(current, last) is False + + def test_repository_modified(self): + """Sync should not be skipped if the repository was modified since last sync.""" + last = self._details(most_recent_version=1) + current = self._details(most_recent_version=2) + assert _should_optimize_sync(current, last) is False + + def test_download_policy_to_immediate(self): + """Sync should not be skipped when switching from deferred to immediate.""" + last = self._details(download_policy="on_demand") + current = self._details(download_policy="immediate") + assert _should_optimize_sync(current, last) is False + + def test_download_policy_immediate_to_on_demand(self): + """Switching from immediate to on_demand does not require re-sync.""" + last = self._details(download_policy="immediate") + current = self._details(download_policy="on_demand") + assert _should_optimize_sync(current, last) is True + + def test_download_policy_stays_immediate(self): + """Staying on immediate should allow optimization.""" + last = self._details(download_policy="immediate") + current = self._details(download_policy="immediate") + assert _should_optimize_sync(current, last) is True + + def 
test_mirror_enabled(self): + """Sync should not be skipped when switching to mirror mode.""" + last = self._details(mirror=False) + current = self._details(mirror=True) + assert _should_optimize_sync(current, last) is False + + def test_mirror_disabled(self): + """Switching from mirror to additive does not require re-sync.""" + last = self._details(mirror=True) + current = self._details(mirror=False) + assert _should_optimize_sync(current, last) is True + + def test_mirror_stays_true(self): + """Staying in mirror mode should allow optimization.""" + last = self._details(mirror=True) + current = self._details(mirror=True) + assert _should_optimize_sync(current, last) is True + + def test_remote_pk_changed(self): + """Sync should not be skipped if a different remote is used.""" + last = self._details() + current = self._details(remote_pk="00000000-0000-0000-0000-000000000002") + assert _should_optimize_sync(current, last) is False