Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 69 additions & 29 deletions capycli/bom/findsources.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,50 @@ def add(self, project: str, version: str, tag: str) -> None:
def filter(self, project: str, version: str, data: Any) -> List[str]:
"""Remove all cached entries from @data."""
if isinstance(data, str):
data = [data]
data = (data,)
elif not isinstance(data, Iterable):
raise ValueError('Expecting an iterable of tags!')
key = self._validate_key((project, version))
return [item for item in data
if item not in self.data.get(key, [])
and len(item) > 0]
stored = self.data.get((project, version), set())
return set(data) - stored - {''}

def filter_and_cache(self, project: str, version: str, data: Any) -> List[str]:
"""Convenience method to to filtering and adding in one run."""
candidates = set(self.filter(project, version, data))
candidates = self.filter(project, version, data)
for tag in candidates:
self.add(project, version, tag)
return list(candidates)
candidates = list(reversed(sorted(candidates, key=len)))
return candidates

_re_prefix_and_sep = re.compile(r"^([^0-9]+)([0-9]+(.)?(.*))?$")
_re_version = re.compile(r"([0-9]+)([^0-9]([0-9]+)([^0-9]([0-9]+))?)?")

def gen_new_prefixes(self, project: str, version: str, data: Any) -> List[str]:
"""Generate new tag prefixes for a project and version."""
candidates = [] # output list
vmaj, _, vmin, _, vmic = self._re_version.search(version).groups()
# generate sensible prefixes here and add them to candidates
for d in data:
# extract the longest prefix w/o a numerical component
# remember what we believe is the version separator
matches = self._re_prefix_and_sep.search(d)
if matches is None:
LOG.debug(d)
continue
prefix, _, sep, _ = matches.groups()
# now lets render variants, w/ and w/o the prefix
for prefix in set((prefix, '')):
candidates.append(prefix)
# add variants with our desired version added to the prefix
# we use OUR numeric parts, but THEIR version separator
for part in (vmaj, vmin, vmic):
if part is not None:
candidates.append(candidates[-1] + str(part))
candidates.append(candidates[-1] + sep)
candidates.pop() # pop full version with trailing sep
# and a more or less random one off guess
candidates.append(prefix + version[0])
candidates.append(prefix + version[0] + sep)
return self.filter_and_cache(project, version, candidates)

def __init__(self) -> None:
self.verbose: bool = False
Expand Down Expand Up @@ -260,33 +290,43 @@ def get_matching_source_url(self, version: Any, github_ref: str,
# 'name' is a viable index, for instance an error message
tags = []

new_prefixes = self.tag_cache.filter_and_cache(
repo['full_name'], version, # cache key
[self.version_regex.split(tag['name'], 1)[0]
for tag in tags
if self.version_regex.search(tag['name']) is not None])
def get_label(tag):
return tag.get('ref', tag.get('name', ''))

def get_url(tag):
return tag.get('url', tag.get('zipball_url', ''))

new_prefixes = self.tag_cache.gen_new_prefixes(
repo['full_name'], version, # <-- cache key
[get_label(tag) for tag in tags # <-- filtered list of tags
if self.version_regex.search(get_label(tag)) is not None])

for prefix in new_prefixes:
url = git_refs_url_tpl.format(sha=f'/tags/{prefix}')
w_prefix = GitHubSupport.github_request(url, self.github_name,
self.github_token)
if isinstance(w_prefix, dict): # exact match
w_prefix = [w_prefix]

# ORDER BY tag-name-length DESC
# Note: it may happen that the GithHubSupport.github_request
# returns items without 'ref'
by_size = sorted([(len(tag.get('ref', '')), tag) for tag in w_prefix],
key=lambda x: x[0])
w_prefix = [itm[1] for itm in reversed(by_size)]

transformed_for_get_matching_tags = [
{'name': tag.get('ref', '').replace('refs/tags/', '', 1),
'zipball_url': tag['url'].replace(
result = GitHubSupport.github_request(
url, self.github_name, self.github_token)
if isinstance(result, list):
# ORDER BY tag-name-length DESC
result = list(
reversed(
sorted(
[tag for tag in result
if len(get_label(tag)) > 0],
key=lambda x: len(x['ref'])
)
)
)
elif isinstance(result, dict) and len(get_label(result)) > 0:
result = [result] # exact match
else: # GitHub used one of its endless possibilities to fail
continue
_transformed = [{
'name': get_label(tag).replace('refs/tags/', '', 1),
'zipball_url': get_url(tag).replace(
'/git/refs/tags/', '/zipball/refs/tags/', 1),
} for tag in w_prefix]
} for tag in result]
source_url = self.get_matching_tag(
transformed_for_get_matching_tags, version, tags_url)
_transformed, version, tags_url)
if len(source_url) > 0: # we found what we believe is
return source_url # the correct source_url
try:
Expand Down
Loading