-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathclean_dockerhub.py
More file actions
313 lines (260 loc) · 9.73 KB
/
clean_dockerhub.py
File metadata and controls
313 lines (260 loc) · 9.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
import asyncio
import re
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from http import HTTPStatus
from inspect import cleandoc
import aiohttp
from rich.console import Console
from tqdm import tqdm
DOCKERHUB_API = "https://hub.docker.com/v2"
LIMIT_SIMULTANEOUS_CONNECTIONS_TO_SAME_ENDPOINT = 10
"""
The value of this constant will be used for parameter `limit_per_host` during instantiation of class
https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.TCPConnector
"""
PAGE_SIZE = 100
"""
Page size when fetching existing tags from Dockhub Rest API.
According to https://docs.docker.com/reference/api/hub/latest/#tag/repositories/paths/~1v2~1namespaces~1%7Bnamespace%7D~1repositories~1%7Brepository%7D~1tags/get
the maximum value is 100.
"""
@dataclass(frozen=True)
class Context:
    """
    Immutable configuration for one cleanup run: target repository,
    Docker Hub credentials, and the limits controlling the run.
    """

    # Repository identifier as used by the Docker Hub API (presumably
    # "namespace/name" — verify against callers).
    docker_repository: str
    docker_username: str
    docker_password: str
    # Only tags older than this many days are candidates for deletion.
    min_age_in_days: int
    # Stop collecting once this many old tags were fetched; a value <= 0
    # disables the limit (see the check in `fetch_old_tags`).
    max_number_tags: int

    @property
    def repository_url(self) -> str:
        """Base URL of this repository on the Docker Hub v2 API."""
        return "/".join((DOCKERHUB_API, "repositories", self.docker_repository))
class TooManyRequestsError(Exception):
    """Signals that Docker Hub answered with HTTP 429 (rate limit exceeded)."""
@dataclass
class Tag:
    """
    Represents a specific tag of a docker image on Docker Hub.
    """

    # Tag name as listed by the Docker Hub tags endpoint.
    name: str
    # Timestamp of the tag's last update (populated from the API's
    # "last_updated" field in `fetch_old_tags`).
    date: datetime
    # Mutable flag: `delete_tag` sets this to True on a successful
    # deletion (HTTP 204).
    deleted: bool
async def get_jwt_token(
    session: aiohttp.ClientSession, username: str, password: str
) -> str:
    """
    Log in to Docker Hub and retrieve a JWT token.

    POSTs the credentials to the `/users/login/` endpoint and returns the
    "token" field of the JSON response; a non-2xx answer raises via
    `raise_for_status`.
    """
    credentials = {"username": username, "password": password}
    async with session.post(f"{DOCKERHUB_API}/users/login/", json=credentials) as response:
        response.raise_for_status()
        body = await response.json()
    return body["token"]
def parse_iso_datetime(dt_str: str) -> datetime:
    """
    Parse an ISO8601 datetime string as returned by Docker Hub into a
    *naive* datetime (the timezone designator is dropped, the wall-clock
    values are kept as-is).

    Handles two quirks the stdlib (pre-3.11 `datetime.fromisoformat`)
    cannot digest directly:
    - a trailing 'Z' UTC designator, or a numeric offset such as '+00:00';
    - an arbitrary number of fractional-second digits, whereas
      `fromisoformat` expects exactly 3 or 6.

    The result is naive on purpose: callers compare it against the naive
    `datetime.now()` used to build the age threshold.
    """
    # Drop a trailing 'Z' or an explicit numeric offset like '+00:00'.
    normalized = re.sub(r"(?:Z|[+-]\d{2}:?\d{2})$", "", dt_str)
    # Drop fractional seconds entirely — sub-second precision is irrelevant
    # for an age-in-days comparison and the digit count from the API varies.
    normalized = re.sub(r"\.\d+$", "", normalized)
    return datetime.fromisoformat(normalized)
async def fetch_old_tags(
    session: aiohttp.ClientSession,
    context: Context,
    threshold: datetime,
    token: str,
) -> list[Tag]:
    """
    Fetch all tags of the repository that are older than `threshold`,
    following the API's pagination.

    Stops early when:
    - Docker Hub answers HTTP 429 (the tags collected so far are returned),
    - a page comes back empty or there is no `next` page,
    - at least `context.max_number_tags` old tags were collected
      (a value <= 0 disables this limit).

    Returns a list of `Tag` objects, all with `deleted=False`.
    """
    tags: list[Tag] = []
    page = 1
    console = Console()
    status_msg = f"Fetching pages for {context.docker_repository}...{{n_pages}} pages."
    with console.status(status_msg.format(n_pages=page)) as status:
        while True:
            url = f"{context.repository_url}/tags?page={page}&page_size={PAGE_SIZE}"
            headers = {"Authorization": f"JWT {token}"}
            async with session.get(url, headers=headers) as resp:
                if resp.status == HTTPStatus.TOO_MANY_REQUESTS.value:
                    console.log("[bold yellow]Ran into rate limit.")
                    # If observing a rate limit, then simply process only the tags fetched until now.
                    break
                resp.raise_for_status()
                data = await resp.json()
            results = data.get("results", [])
            if not results:
                break
            # Parse each timestamp exactly once and store the parsed datetime
            # (not the raw ISO string) so Tag.date matches its annotation.
            for entry in results:
                last_updated = parse_iso_datetime(entry["last_updated"])
                if last_updated < threshold:
                    tags.append(Tag(name=entry["name"], date=last_updated, deleted=False))
            if not data.get("next"):
                break
            page += 1
            status.update(status_msg.format(n_pages=page))
            # A non-positive max_number_tags means "no limit". The last fetched
            # page may push the count slightly beyond the limit.
            if 0 < context.max_number_tags <= len(tags):
                break
    console.log(f"[bold green]Total tags fetched: {len(tags)}")
    return tags
async def delete_tag(
    context: Context,
    session: aiohttp.ClientSession,
    tag: Tag,
    token: str,
    semaphore: asyncio.Semaphore,
) -> int:
    """
    Delete a single tag from the repository.

    On HTTP 204 the tag is flagged as deleted in place; on HTTP 429 a
    TooManyRequestsError is raised. In every other case the returned status
    code is left to the caller to interpret. The semaphore bounds how many
    deletions run concurrently.
    """
    headers = {"Authorization": f"JWT {token}"}
    url = f"{context.repository_url}/tags/{tag.name}/"
    async with semaphore:
        async with session.delete(url, headers=headers) as resp:
            status_code = resp.status
    if status_code == HTTPStatus.NO_CONTENT.value:
        tag.deleted = True
    elif status_code == HTTPStatus.TOO_MANY_REQUESTS.value:
        # Rate limit hit
        raise TooManyRequestsError(f"{context.docker_repository}:{tag}")
    return status_code
async def get_old_tags(
    context: Context,
    token: str,
) -> list[Tag]:
    """
    Compute the age cutoff from `context.min_age_in_days`, open an
    aiohttp.ClientSession, and delegate the paginated download to
    `fetch_old_tags`.
    """
    cutoff = datetime.now() - timedelta(days=context.min_age_in_days)
    connector = aiohttp.TCPConnector(
        limit_per_host=LIMIT_SIMULTANEOUS_CONNECTIONS_TO_SAME_ENDPOINT
    )
    async with aiohttp.ClientSession(connector=connector) as session:
        return await fetch_old_tags(session, context, cutoff, token)
async def delete_tags(
    context: Context,
    token: str,
    tags_to_delete: list[Tag],
) -> bool:
    """
    Delete the specified tags from a repository on Docker Hub using multiple
    HTTP requests in parallel (see PARALLEL_TASKS below).

    Creates an aiohttp.ClientSession and one async delete task per tag.
    Successfully deleted tags are flagged in place via `Tag.deleted` by
    `delete_tag`.

    Returns:
        needs_retry: True if any delete task raised TooManyRequestsError
        (remaining tasks are cancelled and a later retry is required),
        False otherwise.
    """
    # More parallel tasks will run deletion faster, but have higher risk of getting a 429,
    # see https://docs.docker.com/docker-hub/usage/#abuse-rate-limit
    # Tests have shown that max deletion rate without running into rate limits is ~650 Tags/min.
    PARALLEL_TASKS = 2
    sem = asyncio.Semaphore(PARALLEL_TASKS)
    needs_retry = False
    # Initialized before the try block so the `finally` clause and the final
    # report are well-defined even if task creation itself fails.
    tasks: list[asyncio.Task] = []
    status_counter_map: dict[int, int] = {}
    conn = aiohttp.TCPConnector(
        limit_per_host=LIMIT_SIMULTANEOUS_CONNECTIONS_TO_SAME_ENDPOINT
    )
    async with aiohttp.ClientSession(connector=conn) as session:
        try:
            tasks = [
                asyncio.create_task(delete_tag(context, session, tag, token, sem))
                for tag in tags_to_delete
            ]
            for delete_coro in tqdm(
                asyncio.as_completed(tasks),
                total=len(tasks),
                desc="Deleting old tags",
            ):
                try:
                    status = await delete_coro
                    status_counter_map[status] = status_counter_map.get(status, 0) + 1
                except TooManyRequestsError:
                    # upon 429, cancel all pending tasks
                    needs_retry = True
                    for t in tasks:
                        if not t.done():
                            t.cancel()
                    break
        finally:
            # ensure all tasks are concluded to suppress warnings
            await asyncio.gather(*tasks, return_exceptions=True)
    print(
        cleandoc(f"""
            Deletion run completed.
            Status counters:
            {status_counter_map}
            Retry: {needs_retry}"""),
    )
    return needs_retry
async def _fetch_and_delete_old_tags(context: Context) -> None:
    """
    Cleans all tags in a Docker repository which are older than `min_age_in_days` days.

    1. Gets the auth token from Docker Hub.
    2. Fetches old tags from Docker Hub.
    3. Deletes the tags in a loop until all tags are deleted, or only errors != 429 were raised.
    """
    # 1) Authenticate
    async with aiohttp.ClientSession() as session:
        token = await get_jwt_token(
            session, context.docker_username, context.docker_password
        )
    print("Authenticated successfully.")
    # 2) Fetch old tags
    old_tags = await get_old_tags(context, token)
    if not old_tags:
        print(
            f"Did not find any tag in repo {context.docker_repository} older than {context.min_age_in_days} days."
        )
        return
    # 3) Delete old tags. Repeat until we don't run into a rate limit
    while True:
        tags_to_delete = [tag for tag in old_tags if not tag.deleted]
        retry = await delete_tags(context, token, tags_to_delete)
        if not retry:
            break
        console = Console()
        log_msg = "Waiting 1 minute for rate limit cooldown...({seconds}s)"
        with console.status(
            log_msg.format(seconds=0), spinner="bouncingBall"
        ) as status:
            for elapsed in range(1, 61):
                # asyncio.sleep, not time.sleep: a blocking sleep would stall
                # the whole event loop for the entire cooldown minute.
                await asyncio.sleep(1)
                # `elapsed` starts at 1 so the display matches the seconds
                # actually slept (the old counter lagged one second behind).
                status.update(log_msg.format(seconds=elapsed), spinner="bouncingBall")
def fetch_and_delete_old_tags(
    docker_repository: str,
    docker_username: str,
    docker_password: str,
    min_age_in_days: int,
    max_number_tags: int,
):
    """
    Synchronous entry point: bundles the arguments into a `Context` and runs
    the async cleanup via `asyncio.run`.
    """
    context = Context(
        docker_repository,
        docker_username,
        docker_password,
        min_age_in_days,
        max_number_tags,
    )
    asyncio.run(_fetch_and_delete_old_tags(context))