Skip to content

Commit e2cff15

Browse files
committed
Add sync optimization support to pulp_file
Whe the manifest hasn't changed relative to the previous sync, we skip the sync, so long as the sync modes are compatible and such. This requires storing a bit of hidden metadata on the repository object.
1 parent 7a4fb51 commit e2cff15

8 files changed

Lines changed: 414 additions & 12 deletions

File tree

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Add backend support for sync-optimization for the file plugin
2+
3+
We can't actually use it yet due to zero-downtime upgrade restrictions
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from django.db import migrations, models
2+
3+
4+
class Migration(migrations.Migration):
5+
6+
dependencies = [
7+
("file", "0019_add_filegitremote"),
8+
]
9+
10+
operations = [
11+
migrations.AddField(
12+
model_name="filerepository",
13+
name="last_sync_details",
14+
field=models.JSONField(default=dict),
15+
),
16+
]

pulp_file/app/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ class FileRepository(Repository, AutoAddObjPermsMixin):
9191

9292
manifest = models.TextField(default="PULP_MANIFEST", null=True)
9393
autopublish = models.BooleanField(default=False)
94+
last_sync_details = models.JSONField(default=dict)
9495

9596
class Meta:
9697
default_related_name = "%(app_label)s_%(model_name)s"

pulp_file/app/serializers.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
PublicationSerializer,
1717
RemoteSerializer,
1818
RepositorySerializer,
19+
RepositorySyncURLSerializer,
1920
SingleArtifactContentUploadSerializer,
2021
)
2122
from pulpcore.plugin.util import get_domain_pk
@@ -139,11 +140,28 @@ class FileRepositorySerializer(RepositorySerializer):
139140
allow_null=True,
140141
)
141142

143+
last_sync_details = serializers.JSONField(
144+
help_text=_("Details about the last sync of this repository."),
145+
read_only=True,
146+
)
147+
142148
class Meta:
143-
fields = RepositorySerializer.Meta.fields + ("autopublish", "manifest")
149+
fields = RepositorySerializer.Meta.fields + ("autopublish", "manifest", "last_sync_details")
144150
model = FileRepository
145151

146152

153+
class FileRepositorySyncURLSerializer(RepositorySyncURLSerializer):
154+
"""
155+
Serializer for File Repository Sync URL.
156+
"""
157+
158+
optimize = serializers.BooleanField(
159+
help_text=_("Whether or not to optimize sync."),
160+
required=False,
161+
default=True,
162+
)
163+
164+
147165
class FileRemoteSerializer(RemoteSerializer):
148166
"""
149167
Serializer for File Remotes.

pulp_file/app/tasks/synchronizing.py

Lines changed: 93 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import re
44
import shutil
55
import tempfile
6+
from hashlib import sha256
67

78
from gettext import gettext as _
89
from urllib.parse import quote, urlparse, urlunparse
@@ -36,7 +37,51 @@
3637
metadata_files = []
3738

3839

39-
def synchronize(remote_pk, repository_pk, mirror, url=None):
40+
def _get_sha256(file_path):
41+
"""Compute the SHA256 hex digest of a file."""
42+
with open(file_path, "rb") as f:
43+
return sha256(f.read()).hexdigest()
44+
45+
46+
def _should_optimize_sync(sync_details, last_sync_details):
47+
"""
48+
Check whether the sync can be skipped by comparing with the previous sync.
49+
50+
Args:
51+
sync_details (dict): Details about the current sync configuration.
52+
last_sync_details (dict): Details about the previous sync configuration.
53+
54+
Returns:
55+
bool: True if sync can be skipped; False otherwise.
56+
57+
"""
58+
if not last_sync_details:
59+
return False
60+
61+
# If switching to immediate download, we may need to download content
62+
if (
63+
last_sync_details.get("download_policy") != "immediate"
64+
and sync_details["download_policy"] == "immediate"
65+
):
66+
return False
67+
68+
# If switching to mirror mode, we need to create a publication
69+
if not last_sync_details.get("mirror") and sync_details["mirror"]:
70+
return False
71+
72+
if last_sync_details.get("url") != sync_details["url"]:
73+
return False
74+
75+
if last_sync_details.get("most_recent_version") != sync_details["most_recent_version"]:
76+
return False
77+
78+
if last_sync_details.get("manifest_checksum") != sync_details["manifest_checksum"]:
79+
return False
80+
81+
return True
82+
83+
84+
def synchronize(remote_pk, repository_pk, mirror, optimize=False, url=None, **kwargs):
4085
"""
4186
Sync content from the remote repository.
4287
@@ -46,6 +91,7 @@ def synchronize(remote_pk, repository_pk, mirror, url=None):
4691
remote_pk (str): The remote PK.
4792
repository_pk (str): The repository PK.
4893
mirror (bool): True for mirror mode, False for additive.
94+
optimize (bool): Whether to skip sync if nothing has changed.
4995
url (str): The url to synchronize. If omitted, the url of the remote is used.
5096
5197
Raises:
@@ -67,10 +113,42 @@ def synchronize(remote_pk, repository_pk, mirror, url=None):
67113
for stage in old_pipeline_stages(new_version)
68114
if not isinstance(stage, (RemoteArtifactSaver))
69115
]
116+
rv = dv.create()
70117
else:
71-
first_stage = FileFirstStage(remote, url)
118+
sync_url = url or remote.url
119+
version = repository.latest_version()
120+
121+
# Download the manifest to compute its checksum for optimization
122+
downloader = remote.get_downloader(url=sync_url)
123+
manifest_result = downloader.fetch()
124+
125+
sync_details = {
126+
"url": remote.url,
127+
"download_policy": remote.policy,
128+
"mirror": mirror,
129+
"most_recent_version": version.number,
130+
"manifest_checksum": _get_sha256(manifest_result.path),
131+
}
132+
133+
if optimize and _should_optimize_sync(sync_details, repository.last_sync_details):
134+
with ProgressReport(
135+
message="Skipping Sync (no change from previous sync)",
136+
code="sync.was_skipped",
137+
) as pb:
138+
pb.total = 1
139+
pb.done = 1
140+
return
141+
142+
first_stage = FileFirstStage(remote, url, manifest_path=manifest_result.path)
72143
dv = DeclarativeVersion(first_stage, repository, mirror=mirror, acs=True)
73-
rv = dv.create()
144+
rv = dv.create()
145+
146+
# Update last_sync_details after sync
147+
if rv:
148+
sync_details["most_recent_version"] = rv.number
149+
repository.last_sync_details = sync_details
150+
repository.save()
151+
74152
if rv and mirror:
75153
# TODO: this is awful, we really should rewrite the DeclarativeVersion API to
76154
# accomodate this use case
@@ -98,18 +176,21 @@ class FileFirstStage(Stage):
98176
The first stage of a pulp_file sync pipeline.
99177
"""
100178

101-
def __init__(self, remote, url):
179+
def __init__(self, remote, url, manifest_path=None):
102180
"""
103181
The first stage of a pulp_file sync pipeline.
104182
105183
Args:
106184
remote (FileRemote): The remote data to be used when syncing
107185
url (str): The base url of custom remote
186+
manifest_path (str): Path to an already-downloaded manifest file. If provided,
187+
the manifest will not be downloaded again.
108188
109189
"""
110190
super().__init__()
111191
self.remote = remote
112192
self.url = url if url else remote.url
193+
self.manifest_path = manifest_path
113194

114195
async def run(self):
115196
"""
@@ -123,15 +204,19 @@ async def run(self):
123204
) as pb:
124205
parsed_url = urlparse(self.url)
125206
root_dir = os.path.dirname(parsed_url.path)
126-
downloader = self.remote.get_downloader(url=self.url)
127-
result = await downloader.run()
207+
if self.manifest_path:
208+
result_path = self.manifest_path
209+
else:
210+
downloader = self.remote.get_downloader(url=self.url)
211+
result = await downloader.run()
212+
result_path = result.path
128213
await pb.aincrement()
129-
metadata_files.append((result.path, self.url.split("/")[-1]))
214+
metadata_files.append((result_path, self.url.split("/")[-1]))
130215

131216
async with ProgressReport(
132217
message="Parsing Metadata Lines", code="sync.parsing.metadata"
133218
) as pb:
134-
manifest = Manifest(result.path)
219+
manifest = Manifest(result_path)
135220
entries = list(manifest.read())
136221

137222
pb.total = len(entries)

pulp_file/app/viewsets.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
)
1717
from pulpcore.plugin.serializers import (
1818
AsyncOperationResponseSerializer,
19-
RepositorySyncURLSerializer,
2019
TaskGroupOperationResponseSerializer,
2120
)
2221
from pulpcore.plugin.tasking import dispatch
@@ -52,6 +51,7 @@
5251
FileGitRemoteSerializer,
5352
FileRemoteSerializer,
5453
FileRepositorySerializer,
54+
FileRepositorySyncURLSerializer,
5555
FilePublicationSerializer,
5656
)
5757

@@ -252,14 +252,14 @@ class FileRepositoryViewSet(RepositoryViewSet, ModifyRepositoryActionMixin, Role
252252
summary="Sync from a remote",
253253
responses={202: AsyncOperationResponseSerializer},
254254
)
255-
@action(detail=True, methods=["post"], serializer_class=RepositorySyncURLSerializer)
255+
@action(detail=True, methods=["post"], serializer_class=FileRepositorySyncURLSerializer)
256256
def sync(self, request, pk):
257257
"""
258258
Synchronizes a repository.
259259
260260
The ``repository`` field has to be provided.
261261
"""
262-
serializer = RepositorySyncURLSerializer(
262+
serializer = FileRepositorySyncURLSerializer(
263263
data=request.data, context={"request": request, "repository_pk": pk}
264264
)
265265
serializer.is_valid(raise_exception=True)
@@ -268,6 +268,7 @@ def sync(self, request, pk):
268268
remote = serializer.validated_data.get("remote", repository.remote)
269269

270270
mirror = serializer.validated_data.get("mirror", False)
271+
optimize = serializer.validated_data.get("optimize", True) # noqa
271272
result = dispatch(
272273
tasks.synchronize,
273274
shared_resources=[remote],
@@ -276,6 +277,7 @@ def sync(self, request, pk):
276277
"remote_pk": str(remote.pk),
277278
"repository_pk": str(repository.pk),
278279
"mirror": mirror,
280+
# "optimize": optimize, # TODO: uncomment in 3.109
279281
},
280282
)
281283
return OperationPostponedResponse(result, request)

0 commit comments

Comments
 (0)