diff --git a/datasources/requests/taiga_api_call.py b/datasources/requests/taiga_api_call.py index e5a433d..937d439 100644 --- a/datasources/requests/taiga_api_call.py +++ b/datasources/requests/taiga_api_call.py @@ -2,27 +2,18 @@ import requests from datetime import datetime, timedelta, timezone from utils.taiga_token.taiga_auth import get_taiga_token -from config.credentials_loader import ( - CredentialsConfigError, - ProjectCredentialsNotFoundError, - resolve, -) - -from config.settings import TAIGA_API_URL +from config.credentials_loader import resolve +from config.settings import TAIGA_API_URL, TAIGA_TOKEN _CACHE = {} # key = (project_id, milestone_id) -> (timestamp, stats) _DETAILS_CACHE = {} # key = (project_id, milestone_id) -> (timestamp, details) _USERSTORY_CACHE = {} # key = (project_id, userstory_id) -> (timestamp, details) +_US_CUSTOM_ATTR_NAMES_CACHE = {} # key = project_id -> (timestamp, {attr_id(str): attr_name}) +_TASK_CUSTOM_ATTR_NAMES_CACHE = {} # key = project_id -> (timestamp, {attr_id(str): attr_name}) TTL = timedelta(minutes=1) # Cache time-to-live, set to 5 minutes. Means that if the same request is made within 5 minutes, it will return the cached result instead of making a new API call. -logger = logging.getLogger(__name__) - -_CACHE = {} # key = (project_id, milestone_id) -> (timestamp, stats) -MILESTONE_TIMEOUT = (3, 8) -TAIGA_LOOKUP_ERRORS = ( - requests.exceptions.RequestException, - CredentialsConfigError, - ProjectCredentialsNotFoundError, -) +log = logging.getLogger(__name__) +logger = log +TAIGA_LOOKUP_ERRORS = (requests.RequestException,) def _empty_stats(): @@ -36,18 +27,122 @@ def _empty_stats(): } - def _build_taiga_headers(prj: str): - """Return the Taiga headers needed for public and private deployments.""" - if "api.taiga.io" in TAIGA_API_URL: + """Return Taiga headers for public, private and SSO deployments.""" + if TAIGA_TOKEN: + return {"Authorization": f"Bearer {TAIGA_TOKEN}"} + + try: user = resolve(prj, "taiga_user") psw = resolve(prj, "taiga_password") - if user and psw: + except KeyError: + log.warning("No Taiga credentials configured for project %s; using anonymous requests.", prj) + return {} + + if user and psw: + try: token = get_taiga_token(user, psw) return {"Authorization": f"Bearer {token}"} + except requests.RequestException as exc: + log.warning("Failed to fetch Taiga token for project %s: %s", prj, exc) + return {} + + log.warning("Incomplete Taiga credentials for project %s; using anonymous requests.", prj) return {} +def _userstory_custom_attribute_names(project_id: str, prj: str): + """Return map of custom attribute id -> name for userstories in a project.""" + if not project_id: + return {} + + key = str(project_id) + now = datetime.now(timezone.utc) + if key in _US_CUSTOM_ATTR_NAMES_CACHE and now - _US_CUSTOM_ATTR_NAMES_CACHE[key][0] < TTL: + return _US_CUSTOM_ATTR_NAMES_CACHE[key][1] + + headers = _build_taiga_headers(prj) + url = f"{TAIGA_API_URL}/userstory-custom-attributes" + try: + r = requests.get(url, params={"project": project_id}, headers=headers, timeout=(1, 5)) + r.raise_for_status() + mapping = {str(item.get("id")): item.get("name") for item in (r.json() or []) if item.get("id") and item.get("name")} + except requests.RequestException as exc: + log.warning("Failed to fetch userstory custom attribute definitions for project %s: %s", project_id, exc) + mapping = {} + + _US_CUSTOM_ATTR_NAMES_CACHE[key] = (now, mapping) + return mapping + + +def _userstory_custom_values(project_id: str, userstory_id: str, prj: str): + """Fetch custom attribute values for a userstory and map IDs to attribute names.""" + if not project_id or not userstory_id: + return {} + + headers = _build_taiga_headers(prj) + url = f"{TAIGA_API_URL}/userstories/custom-attributes-values/{userstory_id}" + try: + r = requests.get(url, params={"project": project_id}, headers=headers, timeout=(1, 5)) + r.raise_for_status() + raw_values = (r.json() or {}).get("attributes_values") or {} + except requests.RequestException as exc: + log.warning("Failed to fetch custom values for userstory %s in project %s: %s", userstory_id, project_id, exc) + return {} + + names = _userstory_custom_attribute_names(project_id, prj) + mapped = {} + for attr_id, value in raw_values.items(): + mapped[names.get(str(attr_id), str(attr_id))] = value + return mapped + + +def _task_custom_attribute_names(project_id: str, prj: str): + """Return map of custom attribute id -> name for tasks in a project.""" + if not project_id: + return {} + + key = str(project_id) + now = datetime.now(timezone.utc) + if key in _TASK_CUSTOM_ATTR_NAMES_CACHE and now - _TASK_CUSTOM_ATTR_NAMES_CACHE[key][0] < TTL: + return _TASK_CUSTOM_ATTR_NAMES_CACHE[key][1] + + headers = _build_taiga_headers(prj) + url = f"{TAIGA_API_URL}/task-custom-attributes" + try: + r = requests.get(url, params={"project": project_id}, headers=headers, timeout=(1, 5)) + r.raise_for_status() + mapping = {str(item.get("id")): item.get("name") for item in (r.json() or []) if item.get("id") and item.get("name")} + except requests.RequestException as exc: + log.warning("Failed to fetch task custom attribute definitions for project %s: %s", project_id, exc) + mapping = {} + + _TASK_CUSTOM_ATTR_NAMES_CACHE[key] = (now, mapping) + return mapping + + +def _task_custom_values(project_id: str, task_id: str, prj: str): + """Fetch custom attribute values for a task and map IDs to attribute names.""" + if not project_id or not task_id: + return {} + + headers = _build_taiga_headers(prj) + url = f"{TAIGA_API_URL}/tasks/custom-attributes-values/{task_id}" + try: + r = requests.get(url, params={"project": project_id}, headers=headers, timeout=(1, 5)) + r.raise_for_status() + raw_values = (r.json() or {}).get("attributes_values") or {} + except requests.RequestException as exc: + log.warning("Failed to fetch custom values for task %s in project %s: %s", task_id, project_id, exc) + return {} + + names = _task_custom_attribute_names(project_id, prj) + mapped = {} + for attr_id, value in raw_values.items(): + mapped[names.get(str(attr_id), str(attr_id))] = value + return mapped + + def milestone_details(project_id: str, milestone_id: str, prj: str): """ Fetches the milestone metadata from Taiga. @@ -94,6 +189,7 @@ def userstory_details(project_id: str, userstory_id: str, prj: str): """ Fetches the userstory metadata from Taiga. Used as a fallback when task payloads do not include the nested userstory state. + Also returns custom_attributes and description for recovery backfill. """ if not project_id or not userstory_id: return {} @@ -120,12 +216,39 @@ def userstory_details(project_id: str, userstory_id: str, prj: str): return {} js = r.json() + custom_values = js.get("custom_attributes_values") or _userstory_custom_values(project_id, userstory_id, prj) details = { "userstory_is_closed": (js.get("status_extra_info") or {}).get("is_closed"), + "custom_attributes_values": custom_values or {}, + "description": js.get("description") or "", } _USERSTORY_CACHE[key] = (now, details) return details +def task_details(project_id: str, task_id: str, prj: str): + """ + Fetches the task metadata from Taiga. + Returns custom_attributes with fallback to dedicated endpoint if empty. + """ + if not project_id or not task_id: + return {} + + headers = _build_taiga_headers(prj) + url = f"{TAIGA_API_URL}/tasks/{task_id}" + try: + r = requests.get(url, params={"project": project_id}, headers=headers, timeout=(1, 5)) + r.raise_for_status() + js = r.json() + except requests.RequestException as exc: + log.warning("Failed to fetch task %s in project %s: %s", task_id, project_id, exc) + return {} + + custom_values = js.get("custom_attributes_values") or _task_custom_values(project_id, task_id, prj) + details = { + "custom_attributes_values": custom_values or {}, + } + return details + def milestone_stats(project_id: str, milestone_id: str, prj: str): """ Fetches the statistics of a milestone in a Taiga project. diff --git a/routes/taiga_routes.py b/routes/taiga_routes.py index e8e5ca5..56de6c9 100644 --- a/routes/taiga_routes.py +++ b/routes/taiga_routes.py @@ -72,6 +72,15 @@ def taiga_webhook(): collection_name, delete_key, id, result.deleted_count ) + # When a user story is deleted, also remove all its associated tasks + if event_type == "userstory": + tasks_coll = get_collection(f"taiga_{prj}.tasks") + task_result = tasks_coll.delete_many({"userstory_id": id}) + logger.info( + "Cascade-deleted %s task(s) linked to userstory_id=%s in taiga_%s.tasks", + task_result.deleted_count, id, prj, + ) + author_login = raw_payload.get("by", {}).get("username", "unknown") logger.info( "Notifying LD_EVAL about deleted event: %s for team with external_id: %s with quality_model: %s", @@ -81,6 +90,9 @@ def taiga_webhook(): ) try: notify_eval_push(event_type, prj, author_login, quality_model) + # Force a task re-eval when tasks were cascade-deleted with the user story + if event_type == "userstory": + notify_eval_push("task", prj, author_login, quality_model) except Exception as e: logger.error("Error notifying LD_EVAL: %s", e) return jsonify({"error": "Failed to notify LD_EVAL"}), 500 diff --git a/tests/test_taiga_routes.py b/tests/test_taiga_routes.py index d0b67d2..15486fd 100644 --- a/tests/test_taiga_routes.py +++ b/tests/test_taiga_routes.py @@ -105,6 +105,43 @@ def test_delete_action(self, mock_verify, mock_coll, mock_notify, client): mock_collection.delete_one.assert_called_once_with({"task_id": 99}) mock_notify.assert_called_once_with("task", "P", "u", None) + @patch("routes.taiga_routes.notify_eval_push") + @patch("routes.taiga_routes.get_collection") + @patch("routes.taiga_routes.verify_taiga_signature", return_value=True) + def test_delete_userstory_cascades_tasks( + self, mock_verify, mock_coll, mock_notify, client + ): + payload = { + "type": "userstory", + "action": "delete", + "data": {"id": 55, "project": {"name": "P"}}, + "by": {"username": "u"}, + } + mock_us_collection = MagicMock() + mock_tasks_collection = MagicMock() + + def side_effect(name): + if name == "taiga_P.userstories": + return mock_us_collection + if name == "taiga_P.tasks": + return mock_tasks_collection + return MagicMock() + + mock_coll.side_effect = side_effect + + resp = client.post( + "/webhook/taiga?prj=P", + data=json.dumps(payload), + content_type="application/json", + headers={"X-TAIGA-WEBHOOK-SIGNATURE": "x"}, + ) + assert resp.status_code == 200 + mock_us_collection.delete_one.assert_called_once_with({"userstory_id": 55}) + mock_tasks_collection.delete_many.assert_called_once_with({"userstory_id": 55}) + assert mock_notify.call_count == 2 + mock_notify.assert_any_call("userstory", "P", "u", None) + mock_notify.assert_any_call("task", "P", "u", None) + @patch("routes.taiga_routes.get_collection") @patch("routes.taiga_routes.verify_taiga_signature", return_value=True) def test_delete_no_id_returns_400(self, mock_verify, mock_coll, client): diff --git a/utils/pattern_detector.py b/utils/pattern_detector.py index d2b180d..9b01ab5 100644 --- a/utils/pattern_detector.py +++ b/utils/pattern_detector.py @@ -9,14 +9,14 @@ class PatternDetector: # Patrones regex compilados para optimización PATTERNS = [ # English - r"\bas\s+[\w\s]+\s+i\s+want\s+[\w\s,.:;!?-]+\s+so\s+that\s+[\w\s,.:;!?-]+", - r"\bas\s+[\w\s]+\s+i\s+want\s+[\w\s,.:;!?-]+\s+to\s+[\w\s,.:;!?-]+", + r"\bas\s+[\w\s'àáäâäèéëêìíïîòóöôùúüûñçÀÁÄÂÈÉËÊÌÍÏÎÒÓÖÔÙÚÜÛÑÇ’()\/·,.:;!?-]+\s+i\s+want\s+[\w\s'àáäâäèéëêìíïîòóöôùúüûñçÀÁÄÂÈÉËÊÌÍÏÎÒÓÖÔÙÚÜÛÑÇ’()\/·,.:;!?-]+\s+so\s+that\s+[\w\s'àáäâäèéëêìíïîòóöôùúüûñçÀÁÄÂÈÉËÊÌÍÏÎÒÓÖÔÙÚÜÛÑÇ’()\/·,.:;!?-]+", + r"\bas\s+[\w\s'àáäâäèéëêìíïîòóöôùúüûñçÀÁÄÂÈÉËÊÌÍÏÎÒÓÖÔÙÚÜÛÑÇ’()\/·,.:;!?-]+\s+i\s+want\s+[\w\s'àáäâäèéëêìíïîòóöôùúüûñçÀÁÄÂÈÉËÊÌÍÏÎÒÓÖÔÙÚÜÛÑÇ’()\/·,.:;!?-]+\s+to\s+[\w\s'àáäâäèéëêìíïîòóöôùúüûñçÀÁÄÂÈÉËÊÌÍÏÎÒÓÖÔÙÚÜÛÑÇ’()\/·,.:;!?-]+", # Spanish - COMO...QUIERO... - r"\bcomo\s+[\w\s]+\s+quiero\s+[\w\s,.:;!?-]+\s+(?:de\s+manera\s+que|de\s+forma\s+que|para|por|porqu[eé]|porque)\s+[\w\s,.:;!?-]+", + r"\bcomo\s+[\w\s'àáäâäèéëêìíïîòóöôùúüûñçÀÁÄÂÈÉËÊÌÍÏÎÒÓÖÔÙÚÜÛÑÇ’()\/·,.:;!?-]+\s+quiero\s+[\w\s'àáäâäèéëêìíïîòóöôùúüûñçÀÁÄÂÈÉËÊÌÍÏÎÒÓÖÔÙÚÜÛÑÇ’()\/·,.:;!?-]+\s+(?:de\s+manera\s+que|de\s+forma\s+que|para|por|porqu[eé]|porque)\s+[\w\s'àáäâäèéëêìíïîòóöôùúüûñçÀÁÄÂÈÉËÊÌÍÏÎÒÓÖÔÙÚÜÛÑÇ’()\/·,.:;!?-]+", # Catalan - COM...VULL... - r"\bcom\s+[\w\s]+\s+vull\s+[\w\s,.:;!?-]+\s+(?:de\s+manera\s+que|de\s+forma\s+que|per|perqu[eè]|perqué)\s+[\w\s,.:;!?-]+", + r"\bcom\s+(?:a\s+)?[\w\s'àáäâäèéëêìíïîòóöôùúüûñçÀÁÄÂÈÉËÊÌÍÏÎÒÓÖÔÙÚÜÛÑÇ’()\/·,.:;!?-]+\s+vull\s+[\w\s'àáäâäèéëêìíïîòóöôùúüûñçÀÁÄÂÈÉËÊÌÍÏÎÒÓÖÔÙÚÜÛÑÇ’()\/·,.:;!?-]+\s+(?:de\s+manera\s+que|de\s+forma\s+que|per\s+a\s+poder|per\s+poder|per\s+tal\s+de|per\s+tal\s+d[’']|per|perqu[eè]|perqué)\s*[\w\s'àáäâäèéëêìíïîòóöôùúüûñçÀÁÄÂÈÉËÊÌÍÏÎÒÓÖÔÙÚÜÛÑÇ’()\/·,.:;!?-]+", ] # Compilar patrones una sola vez diff --git a/utils/recovery/github_recovery.py b/utils/recovery/github_recovery.py index 1f2d8d4..bb861de 100644 --- a/utils/recovery/github_recovery.py +++ b/utils/recovery/github_recovery.py @@ -36,15 +36,17 @@ def get_organization_repos(org: str, headers: Dict[str, str]) -> List[str]: return [repo["name"] for repo in gh_paginated(url, headers)] -def gh_paginated(url: str, headers: Dict[str, str]) -> Iterable[Dict]: +def gh_paginated(url: str, headers: Dict[str, str], params: Optional[Dict[str, str]] = None) -> Iterable[Dict]: """ Gets paginated results from a GitHub API endpoint. With this each call to the API returns a suitable JSON """ while url: - r = requests.get(url, headers=headers, timeout=30) + r = requests.get(url, headers=headers, params=params, timeout=30) r.raise_for_status() yield from r.json() url = r.links.get("next", {}).get("url") + # Link-based pagination already includes query params in next URL. + params = None def upsert(coll, docs: list[dict], key: str) -> int: @@ -81,53 +83,56 @@ def collect_github( f"Bearer {GITHUB_TOKEN}" # Authentication with GitHub API using a token ) - counters = { - "commits": 0, - "issues": 0, - "pull_requests": 0, - } # Counter to display the number of documents inserted of each event type - author_login = "backfill" # The author login is always "backfill" for backfilling - - for ev in events: # Start iterating over the events to collect - - if ev == "commits": # First commits - event_name = "push" # The event name for commits is always "push" - - log_url = f"https://api.github.com/repos/{repo_full}/commits?per_page=100" - if since: - log_url += ( - f"&since={since}" # If a SINCE date is proviaded, add it to the URL - ) - if until: - log_url += ( - f"&until={until}" # If a UNTIL date is proviaded, add it to the URL - ) - - payloads = [] # List to store the payloads of the commits - for c in gh_paginated( - log_url, headers - ): # Iterate over the paginated results of the commits and store them in the payloads list under the schema - payloads.append( - { + counters = {"commits": 0, "issues": 0, "pull_requests": 0} #Counter to display the number of documents inserted of each event type + author_login = "backfill" # The author login is always "backfill" for backfilling + + for ev in events: # Start iterating over the events to collect + + if ev == "commits": #First commits + event_name= "push" # The event name for commits is always "push" + + # Fetch all branches first, then collect commits branch-by-branch. + branches_url = f"https://api.github.com/repos/{repo_full}/branches?per_page=100" + branches = [b.get("name") for b in gh_paginated(branches_url, headers) if isinstance(b, dict) and b.get("name")] + if not branches: + branches = [None] + + payloads = [] #List to store the payloads of the commits + seen_shas = set() # Avoid processing the same commit multiple times across branches + for branch in branches: + log_url = f"https://api.github.com/repos/{repo_full}/commits?per_page=100" + query_params = {} + if branch: + query_params["sha"] = branch + if since: + query_params["since"] = since + if until: + query_params["until"] = until + + # Iterate over paginated commits for each branch. + for c in gh_paginated(log_url, headers, params=query_params): + sha = c.get("sha") + if not sha or sha in seen_shas: + continue + seen_shas.add(sha) + + payloads.append({ "X-GitHub-Event": "push", "repository": {"full_name": repo_full}, "organization": {"login": org}, "sender": c["author"] or {}, - "commits": [ - { - "id": c["sha"], - "url": c["url"], - "message": c["commit"]["message"], - "timestamp": c["commit"]["author"]["date"], - "author": { - "username": (c["author"] or {}).get("login", ""), - "name": c["commit"]["author"]["name"], - "email": c["commit"]["author"]["email"], - }, - } - ], - } - ) + "commits": [{ + "id": sha, + "url": c["url"], + "message": c["commit"]["message"], + "timestamp": c["commit"]["author"]["date"], + "author": { + "username": (c["author"] or {}).get("login", ""), + "name": c["commit"]["author"]["name"], + "email": c["commit"]["author"]["email"], + }, + }], + }) coll = get_collection(f"github_{prj}.commits") # Collection name to store for ( diff --git a/utils/recovery/taiga_recovery.py b/utils/recovery/taiga_recovery.py index 16e1244..d61e0e7 100644 --- a/utils/recovery/taiga_recovery.py +++ b/utils/recovery/taiga_recovery.py @@ -13,10 +13,21 @@ from config.settings import TAIGA_API_URL from utils.pattern_detector import PatternDetector -from datasources.requests.taiga_api_call import milestone_details, milestone_stats, userstory_details +from datasources.requests.taiga_api_call import milestone_details, milestone_stats, userstory_details, task_details setup_logging() logger = logging.getLogger(__name__) +ACCEPTANCE_CRITERIA_KEY = "Acceptance Criteria" + + +def collection_name_for_event(prj: str, event: str) -> str: + if event == "userstory": + return f"taiga_{prj}.userstories" + if event == "task": + return f"taiga_{prj}.tasks" + if event == "epic": + return f"taiga_{prj}.epics" + return f"taiga_{prj}.{event}" def first_non_empty(*values, default=""): @@ -152,19 +163,26 @@ def task_from_api(j: dict, prj: str) -> dict: milestone_stats_data = milestone_stats(project_id, milestone_id, prj) userstory_id = j.get("user_story") us = j.get("user_story_extra_info") or {} - userstory_is_closed = us.get("is_closed") - userstory_info = ( - userstory_details(project_id, userstory_id, prj) - if userstory_id and userstory_is_closed in (None, "") - else {} + userstory_is_closed = first_non_empty( + us.get("is_closed"), + (us.get("status_extra_info") or {}).get("is_closed"), + default=None, ) + userstory_info = {} + if userstory_id and userstory_is_closed is None: + userstory_info = userstory_details(project_id, userstory_id, prj) + task_id = j.get("id") + task_custom_attrs = j.get("custom_attributes_values") or {} + task_info = {} + if task_id and not task_custom_attrs: + task_info = task_details(project_id, task_id, prj) doc = { "task_id": j["id"], "action_type": "import", "assigned_by": "backfill", "assigned_to": (j.get("assigned_to_extra_info") or {}).get("username"), "created_date": j["created_date"], - "custom_attributes": j.get("custom_attributes_values") or {}, + "custom_attributes": task_custom_attrs or task_info.get("custom_attributes_values") or {}, "estimated_finish": first_non_empty(m.get("estimated_finish"), milestone_info.get("estimated_finish")), "estimated_start": first_non_empty(m.get("estimated_start"), milestone_info.get("estimated_start")), "event_type": "task", @@ -246,8 +264,20 @@ def userstory_from_api(j: dict, prj: str) -> dict: milestone_id = j.get("milestone") milestone_info = milestone_details(project_id, milestone_id, prj) milestone_stats_data = milestone_stats(project_id, milestone_id, prj) - desc = j.get("description") or "" + us_info = userstory_details(project_id, j.get("id"), prj) + + # Use custom_attributes from API response, fallback to userstory detail + custom_attrs = j.get("custom_attributes_values") or us_info.get("custom_attributes_values") or {} + + # Normalize Acceptance Criteria: if list, join as string + if isinstance(custom_attrs.get(ACCEPTANCE_CRITERIA_KEY), list): + custom_attrs = dict(custom_attrs) # Make a copy + ac_list = custom_attrs.get(ACCEPTANCE_CRITERIA_KEY, []) + custom_attrs[ACCEPTANCE_CRITERIA_KEY] = " | ".join(str(x) for x in ac_list) if ac_list else "" + + desc = j.get("description") or us_info.get("description") or "" pattern = PatternDetector.detect_pattern(desc) + raw_points = j.get("points") # puede ser list | "" | None if isinstance(raw_points, list): total = sum((p.get("value") or 0) for p in raw_points) @@ -257,9 +287,9 @@ def userstory_from_api(j: dict, prj: str) -> dict: doc = { "userstory_id": j["id"], "action_type": "import", - "assigned_by": "backfill", - "created_date": j["created_date"], - "custom_attributes": j.get("custom_attributes_values") or {}, + "assigned_by": "backfill", + "created_date": j["created_date"], + "custom_attributes": custom_attrs, "estimated_finish": first_non_empty(m.get("estimated_finish"), milestone_info.get("estimated_finish")), "estimated_start": first_non_empty(m.get("estimated_start"), milestone_info.get("estimated_start")), "event_type": "userstory", @@ -271,23 +301,72 @@ def userstory_from_api(j: dict, prj: str) -> dict: "milestone_name": first_non_empty(m.get("name"), milestone_info.get("milestone_name")), "modified_date": j["modified_date"], "pattern": pattern, - "priority": (j.get("custom_attributes_values") or {}).get("Priority"), - "prj": prj, - "status": (j.get("status_extra_info") or {}).get("name"), - "subject": j["subject"], - "team_name": j["project_extra_info"]["name"], - "total_points": total, + "priority": custom_attrs.get("Priority"), + "prj": prj, + "status": (j.get("status_extra_info") or {}).get("name"), + "subject": j["subject"], + "team_name": j["project_extra_info"]["name"], + "total_points": total, } doc.update(milestone_stats_data) return doc ENTITY_ENDPOINT = { - "task": ("tasks", task_from_api, "task_id"), - "issue": ("issues", issue_from_api, "issue_id"), - "epic": ("epics", epic_from_api, "epic_id"), - "userstory": ("userstories", userstory_from_api, "userstory_id"), -} + "task": ("tasks", task_from_api, "task_id"), + "issue": ("issues", issue_from_api, "issue_id"), + "epic": ("epics", epic_from_api, "epic_id"), + "userstory": ("userstories", userstory_from_api, "userstory_id"), + } + + +def sync_deleted_entities( + event: str, + prj: str, + project_id: int, + start: Optional[datetime] = None, + end: Optional[datetime] = None, + raw_api: Optional[List[Dict]] = None, +) -> int: + ''' + Removes from MongoDB documents that no longer exist in the Taiga API. + This prevents "orphaned" tasks/issues/epics/userstories from inflating metrics after they've been deleted. + + Returns the count of deleted documents from MongoDB. + ''' + if event not in ENTITY_ENDPOINT: + return 0 + + _, _, key = ENTITY_ENDPOINT[event] + + # Reuse pre-fetched entities when available to avoid duplicate API calls. + if raw_api is None: + raw_api = fetch_entities(event, project_id, start, end) + api_ids = set(r["id"] for r in raw_api) + + # Get the MongoDB collection + collection_name = f"taiga_{prj}.userstories" if event == "userstory" else f"taiga_{prj}.tasks" if event == "task" else f"taiga_{prj}.epics" if event == "epic" else f"taiga_{prj}.{event}" + coll = get_collection(collection_name) + + # Find all document IDs in MongoDB + mongo_docs = coll.find({}, {key: 1}) + mongo_ids = set(doc[key] for doc in mongo_docs if key in doc) + + # Identify IDs that are in MongoDB but not in the current API response + deleted_ids = mongo_ids - api_ids + + if not deleted_ids: + return 0 + + # Delete them from MongoDB + delete_filter = {key: {"$in": list(deleted_ids)}} + result = coll.delete_many(delete_filter) + + logger.info(f"Removed {result.deleted_count} {event}(s) from MongoDB (no longer in Taiga API)") + return result.deleted_count + + + def main(argv: list[str] | None = None): @@ -345,8 +424,10 @@ def main(argv: list[str] | None = None): ns.slug ) # Get the project ID using the project name and the token info - total = 0 - for event in events: # Iterate over the events to backfill + + total = 0 + total_deleted = 0 + for event in events: # Iterate over the events to backfill endpoint, converter, key = ENTITY_ENDPOINT[event] raw = fetch_entities(event, pid, start, end) # Get the raw data from the Taiga API for the event docs = [converter(r, ns.prj) for r in raw] # Convert the raw data to the MongoDB schema using the converter function @@ -355,23 +436,28 @@ def main(argv: list[str] | None = None): coll = get_collection(collection_name) # Get the MongoDB collection for the event n = upsert(coll, docs, key) # Upsert the documents total += n - logger.info(" • %s → %d documents", event, n) - - # COMMUNICATION WITH LD_EVAL USING API - logger.info( - f"Notifying LD_EVAL about event: {event} for team with external_id: {ns.prj} with quality_model: {ns.quality_model}" - ) + print(f" • {event:<12} → {n:>4} documents") # Print total number of documments + + # Sync deletions: remove entities that no longer exist in the Taiga API + deleted_count = sync_deleted_entities(event, ns.prj, pid, start, end, raw_api=raw) + total_deleted += deleted_count + + #COMMUNICATION WITH LD_EVAL USING API + logger.info(f"Notifying LD_EVAL about event: {event} for team with external_id: {ns.prj} with quality_model: {ns.quality_model}") try: notify_eval_push(event, ns.prj, "backfill", ns.quality_model) except Exception as e: logger.error(f"Error notifying LD_EVAL: {e}") + + + + span = "all time" if not (start or end) else \ + f"from {ns.from_date or '…'} to {ns.to_date or '…'}" + print(f"{total} documents inserted ({span})") + if total_deleted > 0: + print(f"{total_deleted} documents deleted (orphaned, no longer in Taiga API)") + - span = ( - "all time" - if not (start or end) - else f"from {ns.from_date or '…'} to {ns.to_date or '…'}" - ) - logger.info("%d documents inserted (%s)", total, span) if __name__ == "__main__":