diff --git a/CHANGES/pulp_file/+sync-optimization.misc b/CHANGES/pulp_file/+sync-optimization.misc new file mode 100644 index 00000000000..6c788f0abe6 --- /dev/null +++ b/CHANGES/pulp_file/+sync-optimization.misc @@ -0,0 +1,3 @@ +Add backend support for sync-optimization for the file plugin + +We can't actually use it yet due to zero-downtime upgrade restrictions diff --git a/pulp_file/app/migrations/0020_filerepository_last_sync_details.py b/pulp_file/app/migrations/0020_filerepository_last_sync_details.py new file mode 100644 index 00000000000..2e38bcc97f5 --- /dev/null +++ b/pulp_file/app/migrations/0020_filerepository_last_sync_details.py @@ -0,0 +1,16 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("file", "0019_add_filegitremote"), + ] + + operations = [ + migrations.AddField( + model_name="filerepository", + name="last_sync_details", + field=models.JSONField(default=dict), + ), + ] diff --git a/pulp_file/app/models.py b/pulp_file/app/models.py index 074cd5d16bf..6c04ced79c1 100644 --- a/pulp_file/app/models.py +++ b/pulp_file/app/models.py @@ -91,6 +91,7 @@ class FileRepository(Repository, AutoAddObjPermsMixin): manifest = models.TextField(default="PULP_MANIFEST", null=True) autopublish = models.BooleanField(default=False) + last_sync_details = models.JSONField(default=dict) class Meta: default_related_name = "%(app_label)s_%(model_name)s" diff --git a/pulp_file/app/settings.py b/pulp_file/app/settings.py new file mode 100644 index 00000000000..93ef8b1b944 --- /dev/null +++ b/pulp_file/app/settings.py @@ -0,0 +1 @@ +FILE_SYNC_OPTIMIZATION = True diff --git a/pulp_file/app/tasks/synchronizing.py b/pulp_file/app/tasks/synchronizing.py index 1a8210ba09d..091252b01b2 100644 --- a/pulp_file/app/tasks/synchronizing.py +++ b/pulp_file/app/tasks/synchronizing.py @@ -3,6 +3,7 @@ import re import shutil import tempfile +from hashlib import sha256 from gettext import gettext as _ from urllib.parse import 
quote, urlparse, urlunparse @@ -10,6 +11,7 @@ import git as gitpython from gitdb.exc import BadName, BadObject from django.core.files import File +from django.conf import settings from pulpcore.plugin.exceptions import SyncError from pulpcore.plugin.models import Artifact, ProgressReport, Remote, PublishedMetadata @@ -36,7 +38,64 @@ metadata_files = [] -def synchronize(remote_pk, repository_pk, mirror, url=None): +def _get_sha256(file_path): + """Compute the SHA256 hex digest of a file.""" + with open(file_path, "rb") as f: + return sha256(f.read()).hexdigest() + + +def _should_optimize_sync(sync_details, last_sync_details, version=None): + """ + Check whether the sync can be skipped by comparing with the previous sync. + + Args: + sync_details (dict): Details about the current sync configuration. + last_sync_details (dict): Details about the previous sync configuration. + version: The current latest RepositoryVersion, used for artifact checks. + + Returns: + bool: True if sync can be skipped; False otherwise. + + """ + if not last_sync_details: + return False + + # If switching to immediate download, we may need to download content + if ( + last_sync_details.get("download_policy") != "immediate" + and sync_details["download_policy"] == "immediate" + ): + return False + + # If switching to mirror mode, we need to create a publication + if not last_sync_details.get("mirror") and sync_details["mirror"]: + return False + + if last_sync_details.get("remote_pk") != sync_details["remote_pk"]: + return False + + if last_sync_details.get("url") != sync_details["url"]: + return False + + if last_sync_details.get("most_recent_version") != sync_details["most_recent_version"]: + return False + + if last_sync_details.get("manifest_checksum") != sync_details["manifest_checksum"]: + return False + + # If immediate policy, check if any content is missing artifacts (e.g. 
after reclaim) + if sync_details["download_policy"] == "immediate" and version: + from pulpcore.plugin.models import ContentArtifact + + if ContentArtifact.objects.filter( + content__in=version.content, artifact__isnull=True + ).exists(): + return False + + return True + + +def synchronize(remote_pk, repository_pk, mirror, optimize=False, url=None, **kwargs): """ Sync content from the remote repository. @@ -46,6 +105,7 @@ def synchronize(remote_pk, repository_pk, mirror, url=None): remote_pk (str): The remote PK. repository_pk (str): The repository PK. mirror (bool): True for mirror mode, False for additive. + optimize (bool): Whether to skip sync if nothing has changed. url (str): The url to synchronize. If omitted, the url of the remote is used. Raises: @@ -55,6 +115,8 @@ def synchronize(remote_pk, repository_pk, mirror, url=None): remote = Remote.objects.get(pk=remote_pk).cast() repository = FileRepository.objects.get(pk=repository_pk) + optimize = optimize or settings.FILE_SYNC_OPTIMIZATION + if not remote.url: raise ValueError(_("A remote must have a url specified to synchronize.")) @@ -67,10 +129,45 @@ def synchronize(remote_pk, repository_pk, mirror, url=None): for stage in old_pipeline_stages(new_version) if not isinstance(stage, (RemoteArtifactSaver)) ] + rv = dv.create() else: - first_stage = FileFirstStage(remote, url) + sync_url = url or remote.url + version = repository.latest_version() + + # Download the manifest to compute its checksum for optimization + downloader = remote.get_downloader(url=sync_url) + manifest_result = downloader.fetch() + + sync_details = { + "remote_pk": str(remote.pk), + "url": remote.url, + "download_policy": remote.policy, + "mirror": mirror, + "most_recent_version": version.number, + "manifest_checksum": _get_sha256(manifest_result.path), + } + + if optimize and _should_optimize_sync( + sync_details, repository.last_sync_details, version=version + ): + with ProgressReport( + message="Skipping Sync (no change from previous 
sync)", + code="sync.was_skipped", + ) as pb: + pb.total = 1 + pb.done = 1 + return + + first_stage = FileFirstStage(remote, url, manifest_path=manifest_result.path) dv = DeclarativeVersion(first_stage, repository, mirror=mirror, acs=True) - rv = dv.create() + rv = dv.create() + + # Update last_sync_details after sync + if rv: + sync_details["most_recent_version"] = rv.number + repository.last_sync_details = sync_details + repository.save() + if rv and mirror: # TODO: this is awful, we really should rewrite the DeclarativeVersion API to # accomodate this use case @@ -98,18 +195,21 @@ class FileFirstStage(Stage): The first stage of a pulp_file sync pipeline. """ - def __init__(self, remote, url): + def __init__(self, remote, url, manifest_path=None): """ The first stage of a pulp_file sync pipeline. Args: remote (FileRemote): The remote data to be used when syncing url (str): The base url of custom remote + manifest_path (str): Path to an already-downloaded manifest file. If provided, + the manifest will not be downloaded again. 
class TestShouldOptimizeSync:
    """Unit tests covering every branch of _should_optimize_sync."""

    BASE_DETAILS = {
        "remote_pk": "00000000-0000-0000-0000-000000000001",
        "url": "http://example.com/PULP_MANIFEST",
        "download_policy": "on_demand",
        "mirror": False,
        "most_recent_version": 1,
        "manifest_checksum": "abc123",
    }

    def _details(self, **overrides):
        """Return a fresh sync-details dict: the baseline with *overrides* applied."""
        return {**self.BASE_DETAILS, **overrides}

    def test_no_previous_sync(self):
        """An empty last_sync_details (first sync) must never be skipped."""
        assert _should_optimize_sync(self._details(), {}) is False

    def test_identical_sync(self):
        """A sync identical to the previous one is skippable."""
        assert _should_optimize_sync(self._details(), self._details()) is True

    def test_manifest_checksum_changed(self):
        """A changed manifest checksum forces a real sync."""
        assert _should_optimize_sync(self._details(manifest_checksum="def456"), self._details()) is False

    def test_url_changed(self):
        """A changed remote URL forces a real sync."""
        assert _should_optimize_sync(self._details(url="http://other.com/PULP_MANIFEST"), self._details()) is False

    def test_repository_modified(self):
        """A repository version bump since the last sync forces a real sync."""
        prev = self._details(most_recent_version=1)
        curr = self._details(most_recent_version=2)
        assert _should_optimize_sync(curr, prev) is False

    def test_download_policy_to_immediate(self):
        """Moving from a deferred policy to immediate forces a real sync."""
        prev = self._details(download_policy="on_demand")
        curr = self._details(download_policy="immediate")
        assert _should_optimize_sync(curr, prev) is False

    def test_download_policy_immediate_to_on_demand(self):
        """Moving from immediate to on_demand needs no re-sync."""
        prev = self._details(download_policy="immediate")
        curr = self._details(download_policy="on_demand")
        assert _should_optimize_sync(curr, prev) is True

    def test_download_policy_stays_immediate(self):
        """An unchanged immediate policy is still skippable."""
        prev = self._details(download_policy="immediate")
        assert _should_optimize_sync(self._details(download_policy="immediate"), prev) is True

    def test_mirror_enabled(self):
        """Turning mirror mode on forces a real sync (a publication is needed)."""
        assert _should_optimize_sync(self._details(mirror=True), self._details(mirror=False)) is False

    def test_mirror_disabled(self):
        """Turning mirror mode off needs no re-sync."""
        assert _should_optimize_sync(self._details(mirror=False), self._details(mirror=True)) is True

    def test_mirror_stays_true(self):
        """An unchanged mirror=True is still skippable."""
        assert _should_optimize_sync(self._details(mirror=True), self._details(mirror=True)) is True

    def test_remote_pk_changed(self):
        """A different remote forces a real sync."""
        assert _should_optimize_sync(
            self._details(remote_pk="00000000-0000-0000-0000-000000000002"), self._details()
        ) is False