Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES/pulp_file/+sync-optimization.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add backend support for sync optimization to the file plugin.

It cannot actually be used yet due to zero-downtime upgrade restrictions.
16 changes: 16 additions & 0 deletions pulp_file/app/migrations/0020_filerepository_last_sync_details.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the ``last_sync_details`` JSON field to the ``filerepository`` model."""

    # This migration is ordered after migration 0019 of the "file" app.
    dependencies = [
        ("file", "0019_add_filegitremote"),
    ]

    operations = [
        # The new column holds a JSON document and defaults to an empty dict.
        migrations.AddField(
            model_name="filerepository",
            name="last_sync_details",
            field=models.JSONField(default=dict),
        ),
    ]
1 change: 1 addition & 0 deletions pulp_file/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class FileRepository(Repository, AutoAddObjPermsMixin):

manifest = models.TextField(default="PULP_MANIFEST", null=True)
autopublish = models.BooleanField(default=False)
last_sync_details = models.JSONField(default=dict)

class Meta:
default_related_name = "%(app_label)s_%(model_name)s"
Expand Down
1 change: 1 addition & 0 deletions pulp_file/app/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Plugin-level switch for the file sync optimization. The sync task reads it as
# ``settings.FILE_SYNC_OPTIMIZATION`` and ORs it with its own ``optimize`` argument, so a
# true value here enables the "skip if unchanged" check for every sync.
FILE_SYNC_OPTIMIZATION = True
120 changes: 112 additions & 8 deletions pulp_file/app/tasks/synchronizing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import re
import shutil
import tempfile
from hashlib import sha256

from gettext import gettext as _
from urllib.parse import quote, urlparse, urlunparse

import git as gitpython
from gitdb.exc import BadName, BadObject
from django.core.files import File
from django.conf import settings

from pulpcore.plugin.exceptions import SyncError
from pulpcore.plugin.models import Artifact, ProgressReport, Remote, PublishedMetadata
Expand All @@ -36,7 +38,64 @@
metadata_files = []


def synchronize(remote_pk, repository_pk, mirror, url=None):
def _get_sha256(file_path, chunk_size=1024 * 1024):
    """
    Compute the SHA256 hex digest of a file.

    The file is read in fixed-size chunks so that its full contents are never held in
    memory at once.

    Args:
        file_path (str): Path of the file to hash.
        chunk_size (int): Number of bytes to read per iteration. Must be positive.

    Returns:
        str: The hexadecimal SHA256 digest of the file's contents.

    Raises:
        ValueError: If ``chunk_size`` is not a positive integer.

    """
    if chunk_size <= 0:
        # read(0) always returns b"" and would silently hash nothing.
        raise ValueError("chunk_size must be a positive integer")

    digest = sha256()
    with open(file_path, "rb") as f:
        # iter() with a sentinel stops at the first empty read, i.e. at end of file.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def _should_optimize_sync(sync_details, last_sync_details, version=None):
    """
    Decide whether the current sync is redundant with the previous one.

    Args:
        sync_details (dict): Configuration of the sync about to run. The keys
            "download_policy", "mirror", "remote_pk", "url", "most_recent_version" and
            "manifest_checksum" are read.
        last_sync_details (dict): Configuration recorded by the previous sync; may be empty.
        version: The current latest RepositoryVersion. Only consulted when the download
            policy is "immediate", to look for content without artifacts.

    Returns:
        bool: True if the sync can be skipped; False if it has to run.

    """
    previous = last_sync_details
    if not previous:
        # Nothing recorded to compare against.
        return False

    # Moving to the immediate policy may require downloading content.
    was_immediate = previous.get("download_policy") == "immediate"
    if not was_immediate and sync_details["download_policy"] == "immediate":
        return False

    # Moving to mirror mode requires a publication to be created.
    was_mirror = previous.get("mirror")
    if not was_mirror and sync_details["mirror"]:
        return False

    # Any difference in these values means the previous result cannot be reused.
    for key in ("remote_pk", "url", "most_recent_version", "manifest_checksum"):
        if previous.get(key) != sync_details[key]:
            return False

    if sync_details["download_policy"] != "immediate" or not version:
        return True

    # With the immediate policy, content lacking artifacts (e.g. after reclaim) forces a sync.
    from pulpcore.plugin.models import ContentArtifact

    missing_artifacts = ContentArtifact.objects.filter(
        content__in=version.content, artifact__isnull=True
    )
    return not missing_artifacts.exists()


def synchronize(remote_pk, repository_pk, mirror, optimize=False, url=None, **kwargs):
"""
Sync content from the remote repository.

Expand All @@ -46,6 +105,7 @@ def synchronize(remote_pk, repository_pk, mirror, url=None):
remote_pk (str): The remote PK.
repository_pk (str): The repository PK.
mirror (bool): True for mirror mode, False for additive.
optimize (bool): Whether to skip sync if nothing has changed.
url (str): The url to synchronize. If omitted, the url of the remote is used.

Raises:
Expand All @@ -55,6 +115,8 @@ def synchronize(remote_pk, repository_pk, mirror, url=None):
remote = Remote.objects.get(pk=remote_pk).cast()
repository = FileRepository.objects.get(pk=repository_pk)

optimize = optimize or settings.FILE_SYNC_OPTIMIZATION

if not remote.url:
raise ValueError(_("A remote must have a url specified to synchronize."))

Expand All @@ -67,10 +129,45 @@ def synchronize(remote_pk, repository_pk, mirror, url=None):
for stage in old_pipeline_stages(new_version)
if not isinstance(stage, (RemoteArtifactSaver))
]
rv = dv.create()
else:
first_stage = FileFirstStage(remote, url)
sync_url = url or remote.url
version = repository.latest_version()

# Download the manifest to compute its checksum for optimization
downloader = remote.get_downloader(url=sync_url)
manifest_result = downloader.fetch()

sync_details = {
"remote_pk": str(remote.pk),
"url": remote.url,
"download_policy": remote.policy,
"mirror": mirror,
"most_recent_version": version.number,
"manifest_checksum": _get_sha256(manifest_result.path),
}

if optimize and _should_optimize_sync(
sync_details, repository.last_sync_details, version=version
):
with ProgressReport(
message="Skipping Sync (no change from previous sync)",
code="sync.was_skipped",
) as pb:
pb.total = 1
pb.done = 1
return

first_stage = FileFirstStage(remote, url, manifest_path=manifest_result.path)
dv = DeclarativeVersion(first_stage, repository, mirror=mirror, acs=True)
rv = dv.create()
rv = dv.create()

# Update last_sync_details after sync
if rv:
sync_details["most_recent_version"] = rv.number
repository.last_sync_details = sync_details
repository.save()

if rv and mirror:
# TODO: this is awful, we really should rewrite the DeclarativeVersion API to
# accommodate this use case
Expand Down Expand Up @@ -98,18 +195,21 @@ class FileFirstStage(Stage):
The first stage of a pulp_file sync pipeline.
"""

def __init__(self, remote, url):
def __init__(self, remote, url, manifest_path=None):
    """
    Set up the first stage of a pulp_file sync pipeline.

    Args:
        remote (FileRemote): The remote data to be used when syncing
        url (str): The base url of custom remote; when empty, the remote's own url is used
        manifest_path (str): Path to a manifest file that was already downloaded. When
            given, the stage uses it instead of downloading the manifest again.

    """
    super().__init__()
    self.manifest_path = manifest_path
    self.remote = remote
    # Fall back to the remote's url when no explicit url was passed.
    self.url = url or remote.url

async def run(self):
"""
Expand All @@ -123,15 +223,19 @@ async def run(self):
) as pb:
parsed_url = urlparse(self.url)
root_dir = os.path.dirname(parsed_url.path)
downloader = self.remote.get_downloader(url=self.url)
result = await downloader.run()
if self.manifest_path:
result_path = self.manifest_path
else:
downloader = self.remote.get_downloader(url=self.url)
result = await downloader.run()
result_path = result.path
await pb.aincrement()
metadata_files.append((result.path, self.url.split("/")[-1]))
metadata_files.append((result_path, self.url.split("/")[-1]))

async with ProgressReport(
message="Parsing Metadata Lines", code="sync.parsing.metadata"
) as pb:
manifest = Manifest(result.path)
manifest = Manifest(result_path)
entries = list(manifest.read())

pb.total = len(entries)
Expand Down
87 changes: 87 additions & 0 deletions pulp_file/tests/unit/test_sync_optimization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from pulp_file.app.tasks.synchronizing import _should_optimize_sync


class TestShouldOptimizeSync:
    """Unit tests for the skip / no-skip decisions made by ``_should_optimize_sync``."""

    BASE_DETAILS = {
        "remote_pk": "00000000-0000-0000-0000-000000000001",
        "url": "http://example.com/PULP_MANIFEST",
        "download_policy": "on_demand",
        "mirror": False,
        "most_recent_version": 1,
        "manifest_checksum": "abc123",
    }

    def _details(self, **overrides):
        # Fresh dict per call so individual tests never share mutable state.
        return {**self.BASE_DETAILS, **overrides}

    def test_no_previous_sync(self):
        """Without recorded details from an earlier sync, nothing may be skipped."""
        skip = _should_optimize_sync(self._details(), {})
        assert skip is False

    def test_identical_sync(self):
        """Matching details on both sides allow the sync to be skipped."""
        skip = _should_optimize_sync(self._details(), self._details())
        assert skip is True

    def test_manifest_checksum_changed(self):
        """A different manifest checksum forces a sync."""
        previous = self._details()
        incoming = self._details(manifest_checksum="def456")
        skip = _should_optimize_sync(incoming, previous)
        assert skip is False

    def test_url_changed(self):
        """A different remote URL forces a sync."""
        previous = self._details()
        incoming = self._details(url="http://other.com/PULP_MANIFEST")
        skip = _should_optimize_sync(incoming, previous)
        assert skip is False

    def test_repository_modified(self):
        """A repository version newer than the recorded one forces a sync."""
        previous = self._details(most_recent_version=1)
        incoming = self._details(most_recent_version=2)
        skip = _should_optimize_sync(incoming, previous)
        assert skip is False

    def test_download_policy_to_immediate(self):
        """Going from a deferred policy to immediate forces a sync."""
        previous = self._details(download_policy="on_demand")
        incoming = self._details(download_policy="immediate")
        skip = _should_optimize_sync(incoming, previous)
        assert skip is False

    def test_download_policy_immediate_to_on_demand(self):
        """Going from immediate to on_demand still allows skipping."""
        previous = self._details(download_policy="immediate")
        incoming = self._details(download_policy="on_demand")
        skip = _should_optimize_sync(incoming, previous)
        assert skip is True

    def test_download_policy_stays_immediate(self):
        """An unchanged immediate policy still allows skipping."""
        previous = self._details(download_policy="immediate")
        incoming = self._details(download_policy="immediate")
        skip = _should_optimize_sync(incoming, previous)
        assert skip is True

    def test_mirror_enabled(self):
        """Turning mirror mode on forces a sync."""
        previous = self._details(mirror=False)
        incoming = self._details(mirror=True)
        skip = _should_optimize_sync(incoming, previous)
        assert skip is False

    def test_mirror_disabled(self):
        """Turning mirror mode off still allows skipping."""
        previous = self._details(mirror=True)
        incoming = self._details(mirror=False)
        skip = _should_optimize_sync(incoming, previous)
        assert skip is True

    def test_mirror_stays_true(self):
        """Unchanged mirror mode still allows skipping."""
        previous = self._details(mirror=True)
        incoming = self._details(mirror=True)
        skip = _should_optimize_sync(incoming, previous)
        assert skip is True

    def test_remote_pk_changed(self):
        """Syncing from a different remote forces a sync."""
        previous = self._details()
        incoming = self._details(remote_pk="00000000-0000-0000-0000-000000000002")
        skip = _should_optimize_sync(incoming, previous)
        assert skip is False
Loading