From 4d6f2a7fb374f5b5df9663ed20febbb5f95f1018 Mon Sep 17 00:00:00 2001
From: Teodor Voicu
Date: Fri, 24 Apr 2026 18:36:15 +0300
Subject: [PATCH] Add grouped geographic coverage helper

---
 eea/geolocation/grouping.py            | 211 +++++++++++++++++++++++++
 eea/geolocation/tests/test_grouping.py | 174 +++++++++++++++++++++
 2 files changed, 385 insertions(+)
 create mode 100644 eea/geolocation/grouping.py
 create mode 100644 eea/geolocation/tests/test_grouping.py

diff --git a/eea/geolocation/grouping.py b/eea/geolocation/grouping.py
new file mode 100644
index 0000000..0df945d
--- /dev/null
+++ b/eea/geolocation/grouping.py
@@ -0,0 +1,211 @@
+"""Helpers for derived geographic coverage grouping."""
+
+from collective.taxonomy.interfaces import ITaxonomy
+from plone.i18n.normalizer.interfaces import IIDNormalizer
+from zope.component import getUtility
+from zope.component import queryUtility
+
+GEOTAGS_TAXONOMY = "eea.geolocation.geotags.taxonomy"
+COUNTRIES_MAPPING_TAXONOMY = "eea.geolocation.countries_mapping.taxonomy"
+
+
+def taxonomy_utility_name(name):
+    """Return the generated collective.taxonomy utility name.
+
+    The name is normalized with the registered ``IIDNormalizer``, dashes are
+    removed and the result is prefixed with ``collective.taxonomy.``.
+    """
+    # getUtility only runs when queryUtility finds nothing, so a missing
+    # normalizer fails in the lookup rather than on ``None.normalize``.
+    normalizer = queryUtility(IIDNormalizer) or getUtility(IIDNormalizer)
+    normalized_name = normalizer.normalize(name).replace("-", "")
+    return "collective.taxonomy." + normalized_name
+
+
+def get_taxonomy_vocabulary(name, context=None, language="en"):
+    """Return taxonomy vocabulary for a taxonomy name.
+
+    Returns ``None`` when the utility name cannot be computed or when no
+    ``ITaxonomy`` utility is registered under it. If calling the taxonomy with
+    ``context`` fails, the vocabulary is built for ``language`` instead.
+    """
+    try:
+        utility_name = taxonomy_utility_name(name)
+    except Exception:
+        # Best effort: callers treat a missing vocabulary as "no data".
+        return None
+
+    taxonomy = queryUtility(ITaxonomy, name=utility_name)
+    if taxonomy is None:
+        return None
+
+    try:
+        return taxonomy(context)
+    except Exception:
+        # Fall back to an explicit language when the context is unusable.
+        return taxonomy.makeVocabulary(language)
+
+
+def get_geotags(context=None, vocabulary=None):
+    """Return geotags in the same structure exposed by ``@geodata``.
+
+    The result maps a group key (the group value lowercased, spaces replaced
+    by underscores) to a dict holding the group ``title`` plus one
+    ``geotag -> country`` item per geotag entry. Entries are read in
+    vocabulary order: a group, then each country followed by its geotags.
+
+    Pass ``vocabulary`` to skip the taxonomy lookup. Without a vocabulary the
+    result is ``{}``.
+    """
+    if vocabulary is None:
+        vocabulary = get_taxonomy_vocabulary(GEOTAGS_TAXONOMY, context=context)
+    if vocabulary is None:
+        return {}
+
+    geotags = {}
+    identifier = "placeholderidentifier"
+    data = {}
+    country = ""
+
+    for value, _key in vocabulary.iterEntries():
+        # Drop every character that cannot be represented in latin-1.
+        value = value.encode("latin-1", "ignore").decode("latin-1")
+        if not value:
+            # An empty value would become an empty ``str.split`` separator.
+            continue
+
+        if identifier not in value:
+            # A value that does not contain the current group starts a new one.
+            identifier = value
+            data = {"title": identifier}
+            identifier_key = "_".join(value.split(" ")).lower()
+            geotags[identifier_key] = data
+            # Do not attribute geotags of this group to the previous country.
+            country = ""
+            continue
+
+        if "geo" not in value:
+            country = value.split(identifier)[-1]
+            continue
+
+        if not country:
+            # No country seen yet; ``value.split("")`` would raise ValueError.
+            continue
+
+        geotag = value.split(country)[-1]
+        data[geotag] = country
+
+    return geotags
+
+
+def get_country_mappings(context=None):
+    """Return country label mappings from taxonomy.
+
+    A value that does not contain the current identifier becomes the new
+    identifier. Every other value is mapped, by the text that follows the
+    identifier (or the identifier itself when nothing follows), to that
+    identifier. Returns ``{}`` when the taxonomy vocabulary is unavailable.
+    """
+    vocabulary = get_taxonomy_vocabulary(COUNTRIES_MAPPING_TAXONOMY, context=context)
+    if vocabulary is None:
+        return {}
+
+    country_mappings = {}
+    identifier = "placeholderidentifier"
+
+    for value, _key in vocabulary.iterEntries():
+        # Drop every character that cannot be represented in latin-1.
+        value = value.encode("latin-1", "ignore").decode("latin-1")
+        if not value:
+            # An empty value would become an empty ``str.split`` separator.
+            continue
+
+        if identifier not in value:
+            identifier = value
+            continue
+
+        country = value.split(identifier)[-1] or identifier
+        country_mappings[country] = identifier
+
+    return country_mappings
+
+
+def grouped_geolocation(
+    geo_coverage,
+    context=None,
+    geotags=None,
+    country_mappings=None,
+):
+    """Return largest matching geotag group plus ungrouped countries.
+
+    The saved value remains the flat ``geo_coverage["geolocation"]`` list. This
+    helper derives an additive public-display structure from taxonomy-backed
+    geotags.
+
+    A group matches only when every one of its geotags is selected. Of the
+    matching groups only the one with the most countries is returned (ties
+    are broken by label); selected items outside it are listed as ungrouped.
+    Items that are not dicts or have no ``value`` are ignored.
+
+    ``geotags`` and ``country_mappings`` default to the taxonomy-backed data
+    and may be passed explicitly, e.g. in tests.
+    """
+    if not isinstance(geo_coverage, dict):
+        return {"groups": [], "ungrouped": []}
+
+    selected = geo_coverage.get("geolocation") or []
+    selected = [
+        item for item in selected if isinstance(item, dict) and item.get("value")
+    ]
+    selected_values = {item["value"] for item in selected}
+    selected_labels = {item["value"]: item.get("label") for item in selected}
+
+    if not selected_values:
+        return {"groups": [], "ungrouped": []}
+
+    geotags = geotags if geotags is not None else get_geotags(context=context)
+    country_mappings = (
+        country_mappings
+        if country_mappings is not None
+        else get_country_mappings(context=context)
+    )
+
+    matched_groups = []
+    for group_value, group_data in geotags.items():
+        countries = []
+
+        for country_value, country_name in group_data.items():
+            if country_value == "title":
+                continue
+
+            countries.append(
+                {
+                    "value": country_value,
+                    # Prefer the saved label, then the mapped label.
+                    "label": selected_labels.get(country_value)
+                    or country_mappings.get(country_name)
+                    or country_name,
+                }
+            )
+
+        if countries and all(
+            country["value"] in selected_values for country in countries
+        ):
+            matched_groups.append(
+                {
+                    "value": group_value,
+                    "label": group_data.get("title") or group_value,
+                    "countries": countries,
+                }
+            )
+
+    # Largest group first; the label keeps the order stable between equals.
+    matched_groups.sort(key=lambda group: (-len(group["countries"]), group["label"]))
+    groups = matched_groups[:1]
+
+    covered_values = {
+        country["value"] for group in groups for country in group["countries"]
+    }
+    ungrouped = [item for item in selected if item["value"] not in covered_values]
+
+    return {"groups": groups, "ungrouped": ungrouped}
diff --git a/eea/geolocation/tests/test_grouping.py b/eea/geolocation/tests/test_grouping.py
new file mode 100644
index 0000000..cd10307
--- /dev/null
+++ b/eea/geolocation/tests/test_grouping.py
@@ -0,0 +1,174 @@
+"""Tests for grouped geographic coverage helpers."""
+
+from eea.geolocation.grouping import grouped_geolocation
+from eea.geolocation.grouping import get_geotags
+
+import unittest
+
+
+class GroupedGeolocationTest(unittest.TestCase):
+    """Test grouped geolocation derivation."""
+
+    def test_get_geotags_uses_existing_taxonomy_entry_order(self):
+        class Vocabulary:
+            def iterEntries(self):
+                return iter(
+                    [
+                        ("eea32", "eea32"),
+                        ("eea32Cyprus", "eea32Cyprus"),
+                        ("eea32Cyprusgeo-146669", "geo-146669"),
+                        ("eea32Portugal", "eea32Portugal"),
+                        ("eea32Portugalgeo-2264397", "geo-2264397"),
+                    ]
+                )
+
+        self.assertEqual(
+            get_geotags(vocabulary=Vocabulary()),
+            {
+                "eea32": {
+                    "title": "eea32",
+                    "geo-146669": "Cyprus",
+                    "geo-2264397": "Portugal",
+                }
+            },
+        )
+
+    def test_get_geotags_skips_geotag_without_country(self):
+        class Vocabulary:
+            def iterEntries(self):
+                return iter(
+                    [
+                        ("eea32", "eea32"),
+                        ("eea32geo-1", "geo-1"),
+                        ("eea32Cyprus", "eea32Cyprus"),
+                        ("eea32Cyprusgeo-146669", "geo-146669"),
+                    ]
+                )
+
+        self.assertEqual(
+            get_geotags(vocabulary=Vocabulary()),
+            {"eea32": {"title": "eea32", "geo-146669": "Cyprus"}},
+        )
+
+    def test_returns_largest_group_and_leftovers(self):
+        geo_coverage = {
+            "geolocation": [
+                {"value": "geo-a", "label": "Austria"},
+                {"value": "geo-b", "label": "Belgium"},
+                {"value": "geo-c", "label": "Croatia"},
+                {"value": "geo-extra", "label": "Kyrgyzstan"},
+            ]
+        }
+        geotags = {
+            "large": {
+                "title": "Large",
+                "geo-a": "Austria",
+                "geo-b": "Belgium",
+                "geo-c": "Croatia",
+            },
+            "small": {
+                "title": "Small",
+                "geo-a": "Austria",
+                "geo-b": "Belgium",
+            },
+        }
+
+        self.assertEqual(
+            grouped_geolocation(
+                geo_coverage,
+                geotags=geotags,
+                country_mappings={},
+            ),
+            {
+                "groups": [
+                    {
+                        "value": "large",
+                        "label": "Large",
+                        "countries": [
+                            {"value": "geo-a", "label": "Austria"},
+                            {"value": "geo-b", "label": "Belgium"},
+                            {"value": "geo-c", "label": "Croatia"},
+                        ],
+                    }
+                ],
+                "ungrouped": [{"value": "geo-extra", "label": "Kyrgyzstan"}],
+            },
+        )
+
+    def test_returns_no_group_for_partial_match(self):
+        geo_coverage = {
+            "geolocation": [
+                {"value": "geo-a", "label": "Austria"},
+                {"value": "geo-c", "label": "Croatia"},
+            ]
+        }
+        geotags = {
+            "large": {
+                "title": "Large",
+                "geo-a": "Austria",
+                "geo-b": "Belgium",
+                "geo-c": "Croatia",
+            },
+        }
+
+        self.assertEqual(
+            grouped_geolocation(
+                geo_coverage,
+                geotags=geotags,
+                country_mappings={},
+            ),
+            {
+                "groups": [],
+                "ungrouped": [
+                    {"value": "geo-a", "label": "Austria"},
+                    {"value": "geo-c", "label": "Croatia"},
+                ],
+            },
+        )
+
+    def test_uses_country_mappings_for_group_country_labels(self):
+        geo_coverage = {"geolocation": [{"value": "geo-tr"}]}
+        geotags = {
+            "mapped": {
+                "title": "Mapped",
+                "geo-tr": "Turkey",
+            },
+        }
+
+        self.assertEqual(
+            grouped_geolocation(
+                geo_coverage,
+                geotags=geotags,
+                country_mappings={"Turkey": "Turkiye"},
+            ),
+            {
+                "groups": [
+                    {
+                        "value": "mapped",
+                        "label": "Mapped",
+                        "countries": [{"value": "geo-tr", "label": "Turkiye"}],
+                    }
+                ],
+                "ungrouped": [],
+            },
+        )
+
+    def test_ignores_malformed_selected_items(self):
+        geo_coverage = {
+            "geolocation": [
+                None,
+                "geo-a",
+                {"label": "No value"},
+                {"value": "geo-a", "label": "Austria"},
+            ]
+        }
+
+        self.assertEqual(
+            grouped_geolocation(geo_coverage, geotags={}, country_mappings={}),
+            {"groups": [], "ungrouped": [{"value": "geo-a", "label": "Austria"}]},
+        )
+
+
+def test_suite():
+    """Test suite."""
+    return unittest.defaultTestLoader.loadTestsFromName(__name__)