diff --git a/.env_test b/.env_test
index a770063d228..478f635c7dd 100644
--- a/.env_test
+++ b/.env_test
@@ -45,6 +45,8 @@ HARVESTING_MONITOR_DELAY=60
SITEURL=http://localhost:8000/
ALLOWED_HOSTS="['django', 'localhost', '127.0.0.1']"
+# allow internal URLs to pass validation for tests
+SAFE_URL_CHECK_ENABLED=False
# Data Uploader
TIME_ENABLED=True
diff --git a/geonode/documents/api/serializers.py b/geonode/documents/api/serializers.py
index b0c63875060..b37481f3238 100644
--- a/geonode/documents/api/serializers.py
+++ b/geonode/documents/api/serializers.py
@@ -18,7 +18,6 @@
#########################################################################
import logging
-from dynamic_rest.fields.fields import DynamicComputedField
from rest_framework import serializers
from geonode.base.api.serializers import ResourceBaseSerializer
from geonode.documents.models import Document
@@ -26,20 +25,8 @@
logger = logging.getLogger(__name__)
-class GeonodeFilePathField(DynamicComputedField):
- def get_attribute(self, instance):
- return instance.files
-
-
-class DocumentFieldField(DynamicComputedField):
- def get_attribute(self, instance):
- return instance.files
-
-
class DocumentSerializer(ResourceBaseSerializer):
title = serializers.CharField(required=False)
- file_path = GeonodeFilePathField(required=False, write_only=True)
- doc_file = DocumentFieldField(required=False, write_only=True)
class Meta:
model = Document
@@ -55,9 +42,11 @@ class Meta:
"subtype",
"extension",
"mime_type",
- "file_path",
- "doc_file",
"doc_url",
)
)
)
+ # name and extension determine where the document file lives on disk
+ # and how it is served. POST/PUT are blocked at the http_method_names
+ # layer; making them read-only locks them down on PATCH too.
+ read_only_fields = ("name", "extension")
diff --git a/geonode/documents/api/tests.py b/geonode/documents/api/tests.py
index d8d68967eb3..afbe7acc261 100644
--- a/geonode/documents/api/tests.py
+++ b/geonode/documents/api/tests.py
@@ -25,10 +25,8 @@
from rest_framework.test import APITestCase
from guardian.shortcuts import assign_perm, get_anonymous_user
-from geonode import settings
from geonode.base.populate_test_data import create_models
-from geonode.base.enumerations import SOURCE_TYPE_REMOTE
from geonode.documents.models import Document
logger = logging.getLogger(__name__)
@@ -46,9 +44,6 @@ def setUpTestData(cls):
def setUp(self):
self.admin = get_user_model().objects.get(username="admin")
self.url = reverse("documents-list")
- self.invalid_file_path = f"{settings.PROJECT_ROOT}/tests/data/thesaurus.rdf"
- self.valid_file_path = f"{settings.PROJECT_ROOT}/base/fixtures/test_xml.xml"
- self.no_title_file_path = f"{settings.PROJECT_ROOT}/base/fixtures/test_sld.sld"
def test_documents(self):
"""
@@ -91,47 +86,22 @@ def test_extra_metadata_included_with_param(self):
response = self.client.get(url, format="json")
self.assertNotIn("metadata", response.data["document"])
- def test_creation_return_error_if_file_is_not_passed(self):
+ def test_post_not_allowed(self):
"""
- If file_path is not available, should raise error
+ Document creation via the DRF endpoint is disabled; /documents/upload
+ is the only supported entry point.
"""
self.client.force_login(self.admin)
payload = {"document": {"title": "New document", "metadata_only": True}}
- expected = {
- "success": False,
- "errors": ["A file, file path or URL must be speficied"],
- "code": "document_exception",
- }
- actual = self.client.post(self.url, data=payload, format="json")
- self.assertEqual(400, actual.status_code)
- self.assertDictEqual(expected, actual.json())
+ response = self.client.post(self.url, data=payload, format="json")
+ self.assertEqual(405, response.status_code)
- def test_creation_return_error_if_file_is_none(self):
- """
- If file_path is not available, should raise error
- """
- self.client.force_login(self.admin)
- payload = {"document": {"title": "New document", "metadata_only": True, "file_path": None, "doc_file": None}}
- expected = {
- "success": False,
- "errors": ["A file, file path or URL must be speficied"],
- "code": "document_exception",
- }
- actual = self.client.post(self.url, data=payload, format="json")
- self.assertEqual(400, actual.status_code)
- self.assertDictEqual(expected, actual.json())
-
- def test_creation_should_rase_exec_for_unsupported_files(self):
+ def test_put_not_allowed(self):
+ document = Document.objects.first()
+ url = urljoin(f"{self.url}/", f"{document.id}")
self.client.force_login(self.admin)
- payload = {"document": {"title": "New document", "metadata_only": True, "file_path": self.invalid_file_path}}
- expected = {
- "success": False,
- "errors": ["The file provided is not in the supported extensions list"],
- "code": "document_exception",
- }
- actual = self.client.post(self.url, data=payload, format="json")
- self.assertEqual(400, actual.status_code)
- self.assertDictEqual(expected, actual.json())
+ response = self.client.put(url, data={"title": "x"}, format="json")
+ self.assertEqual(405, response.status_code)
def test_document_listing_advertised(self):
document = Document.objects.first()
@@ -166,32 +136,6 @@ def test_document_listing_advertised(self):
Document.objects.update(advertised=True)
- def test_creation_should_create_the_doc(self):
- """
- If file_path is not available, should raise error
- """
- self.client.force_login(self.admin)
- payload = {
- "document": {"title": "New document for testing", "metadata_only": True, "file_path": self.valid_file_path}
- }
- actual = self.client.post(self.url, data=payload, format="json")
- self.assertEqual(201, actual.status_code)
- extension = actual.json().get("document", {}).get("extension", "")
- self.assertEqual("xml", extension)
- self.assertTrue(Document.objects.filter(title="New document for testing").exists())
-
- def test_uploading_doc_without_title(self):
- """
- A document should be uploaded without specifying a title
- """
- self.client.force_login(self.admin)
- payload = {"document": {"metadata_only": True, "file_path": self.no_title_file_path}}
- actual = self.client.post(self.url, data=payload, format="json")
- self.assertEqual(201, actual.status_code)
- extension = actual.json().get("document", {}).get("extension", "")
- self.assertEqual("sld", extension)
- self.assertTrue(Document.objects.filter(title="test_sld.sld").exists())
-
def test_patch_point_of_contact(self):
document = Document.objects.first()
url = urljoin(f"{reverse('documents-list')}/", f"{document.id}")
@@ -494,34 +438,6 @@ def test_patch_principal_investigator(self):
)
)
- def test_creation_should_create_the_doc_and_update_the_bbox(self):
- """
- If file_path is not available, should raise error
- """
- self.client.force_login(self.admin)
- payload = {
- "document": {
- "title": "New document for testing",
- "metadata_only": True,
- "file_path": self.valid_file_path,
- "extent": {"coords": [1123692.0, 5338214.0, 1339852.0, 5482615.0], "srid": "EPSG:3857"},
- },
- }
- actual = self.client.post(self.url, data=payload, format="json")
- self.assertEqual(201, actual.status_code)
- extension = actual.json().get("document", {}).get("extension", "")
- self.assertEqual("xml", extension)
- doc = Document.objects.filter(title="New document for testing").all()
- self.assertTrue(doc.exists())
- x = doc.first()
- x.refresh_from_db()
- self.assertEqual("EPSG:3857", x.srid)
- self.assertEqual(actual.json()["document"].get("extent")["srid"], "EPSG:4326")
- self.assertEqual(
- actual.json()["document"].get("extent")["coords"],
- [10.094296982428332, 43.1721654049465, 12.03609530058109, 44.11086592050112],
- )
-
def test_file_path_and_doc_path_are_not_returned(self):
"""
If file_path and doc_path should not be visible
@@ -533,62 +449,28 @@ def test_file_path_and_doc_path_are_not_returned(self):
self.assertFalse("file_path" in _doc_payload)
self.assertFalse("doc_path" in _doc_payload)
- def test_creation_from_url_should_create_the_doc(self):
+ def test_patch_cannot_mutate_file_location_fields(self):
"""
- If file_path is not available, should raise error
+ PATCH must not let a caller rewrite the document's on-disk location.
+ name, extension and files are locked down by DocumentSerializer.update().
"""
- self.client.force_login(self.admin)
- doc_url = "https://example.com/image"
- payload = {
- "document": {
- "title": "New document from URL for testing",
- "metadata_only": False,
- "doc_url": doc_url,
- "extension": "jpeg",
- }
- }
- actual = self.client.post(self.url, data=payload, format="json")
- self.assertEqual(201, actual.status_code)
- created_doc_url = actual.json().get("document", {}).get("doc_url", "")
- self.assertEqual(created_doc_url, doc_url)
+ document = Document.objects.first()
+ original_name = document.name
+ original_extension = document.extension
+ original_files = list(document.files or [])
- def test_remote_document_is_marked_remote(self):
- """Tests creating an external document set its sourcetype to REMOTE."""
+ url = urljoin(f"{self.url}/", f"{document.id}")
self.client.force_login(self.admin)
- doc_url = "https://example.com/image"
- payload = {
- "document": {
- "title": "A remote document is remote",
- "doc_url": doc_url,
- "extension": "jpeg",
- }
- }
- actual = self.client.post(self.url, data=payload, format="json")
- self.assertEqual(201, actual.status_code)
- created_sourcetype = actual.json().get("document", {}).get("sourcetype", "")
- self.assertEqual(created_sourcetype, SOURCE_TYPE_REMOTE)
- def test_either_path_or_url_doc(self):
- """
- If file_path is not available, should raise error
- """
- self.client.force_login(self.admin)
- doc_url = "https://example.com/image"
- payload = {
- "document": {
- "title": "New document from URL for testing",
- "metadata_only": False,
- "doc_url": doc_url,
- "file_path": self.valid_file_path,
- "extension": "jpeg",
- }
+ patch_data = {
+ "name": "pwned",
+ "extension": "exe",
+ "files": ["/etc/passwd"],
}
- actual = self.client.post(self.url, data=payload, format="json")
- expected = {
- "success": False,
- "errors": ["Either a file or a URL must be specified, not both"],
- "code": "document_exception",
- }
- actual = self.client.post(self.url, data=payload, format="json")
- self.assertEqual(400, actual.status_code)
- self.assertDictEqual(expected, actual.json())
+ response = self.client.patch(url, data=patch_data, format="json")
+ self.assertEqual(200, response.status_code)
+
+ document.refresh_from_db()
+ self.assertEqual(document.name, original_name)
+ self.assertEqual(document.extension, original_extension)
+ self.assertEqual(list(document.files or []), original_files)
diff --git a/geonode/documents/api/views.py b/geonode/documents/api/views.py
index 7e11cd5c5b5..bed2099d548 100644
--- a/geonode/documents/api/views.py
+++ b/geonode/documents/api/views.py
@@ -24,7 +24,6 @@
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticatedOrReadOnly
-from geonode import settings
from geonode.assets.utils import create_asset_and_link
from geonode.base.api.filters import DynamicSearchFilter, ExtentFilter
@@ -32,15 +31,8 @@
from geonode.base.api.pagination import GeoNodeApiPagination
from geonode.base.api.permissions import UserHasPerms
from geonode.base.api.views import base_linked_resources, ApiPresetsInitializer
-from geonode.base import enumerations
-from geonode.documents.api.exceptions import DocumentException
from geonode.documents.models import Document
-from geonode.base.api.serializers import ResourceBaseSerializer
-from geonode.resource.utils import resourcebase_post_save
-from geonode.storage.manager import StorageManager
-from geonode.resource.manager import resource_manager
-
from .serializers import DocumentSerializer
from .permissions import DocumentPermissionsFilter
@@ -52,14 +44,19 @@
class DocumentViewSet(ApiPresetsInitializer, DynamicModelViewSet, AdvertisedListMixin):
"""
- API endpoint that allows documents to be viewed or edited.
+ API endpoint that allows documents to be viewed or partially updated.
+
+ Document creation is intentionally not exposed here -- use the
+ /documents/upload/ endpoint (DocumentUploadView) so that uploads go
+ through the FileValidationUploadHandler magic-byte check. Full PUT
+ replacement is disabled; only PATCH metadata edits are allowed.
"""
- http_method_names = ["get", "patch", "put", "post"]
- permission_classes = [
- IsAuthenticatedOrReadOnly,
- UserHasPerms(perms_dict={"default": {"POST": ["base.add_resourcebase"]}}),
- ]
+ http_method_names = ["get", "patch"]
+ # UserHasPerms enforces resource-level EDIT_PERMISSIONS on PATCH (its
+ # default perms_map maps PATCH -> EDIT_PERMISSIONS). Without it any
+ # authenticated user could PATCH any document.
+ permission_classes = [IsAuthenticatedOrReadOnly, UserHasPerms]
filter_backends = [
DynamicFilterBackend,
DynamicSortingFilter,
@@ -71,84 +68,6 @@ class DocumentViewSet(ApiPresetsInitializer, DynamicModelViewSet, AdvertisedList
serializer_class = DocumentSerializer
pagination_class = GeoNodeApiPagination
- def perform_create(self, serializer):
- """
- Function to create document via API v2.
- file_path: path to the file
- doc_file: the open file
-
- The API expect this kind of JSON:
- {
- "document": {
- "title": "New document",
- "metadata_only": true,
- "file_path": "/home/mattia/example.json"
- }
- }
- File path rappresent the filepath where the file to upload is saved.
-
- or can be also a form-data:
- curl --location --request POST 'http://localhost:8000/api/v2/documents' \
- --form 'title="Super Title2"' \
- --form 'doc_file=@"/C:/Users/user/Pictures/BcMc-a6T9IM.jpg"' \
- --form 'metadata_only="False"'
- """
- manager = None
- serializer.is_valid(raise_exception=True)
- file = serializer.validated_data.pop("file_path", None) or serializer.validated_data.pop("doc_file", None)
- doc_url = serializer.validated_data.pop("doc_url", None)
- extension = serializer.validated_data.pop("extension", None)
-
- if not file and not doc_url:
- raise DocumentException(detail="A file, file path or URL must be speficied")
-
- if file and doc_url:
- raise DocumentException(detail="Either a file or a URL must be specified, not both")
-
- if not extension:
- filename = file if isinstance(file, str) else file.name
- extension = Path(filename).suffix.replace(".", "")
-
- if extension not in settings.ALLOWED_DOCUMENT_TYPES:
- raise DocumentException("The file provided is not in the supported extensions list")
-
- try:
- payload = {
- "owner": self.request.user,
- "extension": extension,
- "resource_type": "document",
- }
- if doc_url:
- payload["doc_url"] = doc_url
- payload["sourcetype"] = enumerations.SOURCE_TYPE_REMOTE
-
- resource = serializer.save(**payload)
-
- if file:
- manager = StorageManager(remote_files={"base_file": file})
- manager.clone_remote_files()
- create_asset_and_link(
- resource, self.request.user, [manager.get_retrieved_paths().get("base_file")], clone_files=True
- )
- manager.delete_retrieved_paths(force=True)
-
- resource.set_missing_info()
- resourcebase_post_save(resource.get_real_instance())
- resource.set_default_permissions(owner=self.request.user, created=True)
- resource.handle_moderated_uploads()
- resource_manager.set_thumbnail(resource.uuid, instance=resource, overwrite=False)
- return resource
- except Exception as e:
- logger.error(f"Error creating document {serializer.validated_data}", exc_info=e)
- if manager:
- manager.delete_retrieved_paths()
- raise e
-
- @extend_schema(
- methods=["get"],
- responses={200: ResourceBaseSerializer(many=True)},
- description="API endpoint allowing to retrieve linked resources",
- )
@action(detail=True, methods=["get"])
def linked_resources(self, request, pk=None, *args, **kwargs):
return base_linked_resources(self.get_object().get_real_instance(), request.user, request.GET)
diff --git a/geonode/documents/enumerations.py b/geonode/documents/enumerations.py
index 8b270b75558..ad1a54dccd9 100644
--- a/geonode/documents/enumerations.py
+++ b/geonode/documents/enumerations.py
@@ -22,13 +22,13 @@
DOCUMENT_TYPE_MAP = {
"txt": "text",
+ "csv": "text",
"log": "text",
"doc": "text",
"docx": "text",
"ods": "text",
"odt": "text",
"sld": "text",
- "qml": "text",
"xls": "text",
"xlsx": "text",
"xml": "text",
@@ -49,43 +49,24 @@
"ppt": "presentation",
"pptx": "presentation",
"pdf": "presentation",
- "tar": "archive",
- "tgz": "archive",
- "rar": "archive",
"gz": "archive",
- "7z": "archive",
"zip": "archive",
- "aif": "audio",
- "aifc": "audio",
- "aiff": "audio",
- "au": "audio",
"mp3": "audio",
- "mpga": "audio",
"wav": "audio",
- "afl": "video",
- "avi": "video",
- "avs": "video",
- "fli": "video",
- "mp2": "video",
"mp4": "video",
- "mpg": "video",
"ogg": "video",
- "webm": "video",
- "3gp": "video",
- "flv": "video",
- "vdo": "video",
}
DOCUMENT_MIMETYPE_MAP = {
"txt": "text/plain",
+ "csv": "text/plain",
"log": "text/plain",
"doc": "application/msword",
"docx": "application/msword",
"ods": "text/plain",
"odt": "text/plain",
"sld": "text/plain",
- "qml": "text/plain",
"xls": "application/excel",
"xlsx": "application/vnd.ms-excel",
"xml": "application/xml",
@@ -106,29 +87,44 @@
"ppt": "application/powerpoint",
"pptx": "application/x-mspowerpoint",
"pdf": "application/pdf",
- "tar": "application/x-tar",
- "tgz": "application/x-compressed",
- "rar": "application/x-rar-compressed",
"gz": "application/x-gzip",
- "7z": "application/zip",
"zip": "application/zip",
- "aif": "audio/aiff",
- "aifc": "audio/aiff",
- "aiff": "audio/aiff",
- "au": "audio/basic",
"mp3": "audio/mp3",
- "mpga": "audio/mpeg",
"wav": "audio/wav",
- "afl": "video/animaflex",
- "avi": "video/avi",
- "avs": "video/avs-video",
- "fli": "video/fli",
- "mp2": "video/mpeg",
"mp4": "video/mp4",
- "mpg": "video/mpeg",
"ogg": "video/ogg",
- "webm": "video/webm",
- "3gp": "video/3gp",
- "flv": "video/flv",
- "vdo": "video/vdo",
}
+
+
+DOCUMENT_MAGIC_MIMETYPE_MAP = {extension: {mime_type} for extension, mime_type in DOCUMENT_MIMETYPE_MAP.items()}
+DOCUMENT_MAGIC_MIMETYPE_MAP.update(
+ {
+ "csv": {"text/plain", "text/csv"},
+ "log": {"text/plain", "text/csv"},
+ "xml": {"application/xml", "text/xml"},
+ "sld": {"text/plain", "application/xml", "text/xml"},
+ "dxf": {"image/vnd.dwg", "image/vnd.dxf"},
+ "doc": {"application/msword", "application/x-ole-storage"},
+ "docx": {
+ "application/msword",
+ "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
+ },
+ "ods": {"text/plain", "application/vnd.oasis.opendocument.spreadsheet"},
+ "odt": {"text/plain", "application/vnd.oasis.opendocument.text"},
+ "xls": {"application/excel", "application/vnd.ms-excel", "application/x-ole-storage"},
+ "xlsx": {
+ "application/vnd.ms-excel",
+ "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
+ },
+ "odp": {"application/odp", "application/vnd.oasis.opendocument.presentation"},
+ "ppt": {"application/powerpoint", "application/vnd.ms-powerpoint", "application/x-ole-storage"},
+ "pptx": {
+ "application/x-mspowerpoint",
+ "application/vnd.openxmlformats-officedocument.presentationml.presentation",
+ },
+ "gz": {"application/gzip", "application/x-gzip"},
+ "mp3": {"audio/mp3", "audio/mpeg"},
+ "wav": {"audio/wav", "audio/x-wav"},
+ "ogg": {"application/ogg", "audio/ogg", "video/ogg"},
+ }
+)
diff --git a/geonode/documents/tests.py b/geonode/documents/tests.py
index b20332a9d20..90a19058715 100644
--- a/geonode/documents/tests.py
+++ b/geonode/documents/tests.py
@@ -373,6 +373,25 @@ def test_document_isuploaded(self):
)
self.assertEqual(response.status_code, 200)
+ def test_document_upload_rejects_file_with_mismatched_content(self):
+ self.client.login(username="admin", password="admin")
+ pe_like_content = (
+ b"MZ" + b"\x00" * 58 + b"\x80\x00\x00\x00" + b"\x00" * 64 + b"PE\x00\x00" + b"\x4c\x01\x01\x00"
+ )
+ f = SimpleUploadedFile("fake.pdf", pe_like_content, "application/pdf")
+
+ response = self.client.post(
+ f"{reverse('document_upload')}?no__redirect=true",
+ data={
+ "doc_file": f,
+ "title": "fake_pdf",
+ "permissions": '{"users":{"AnonymousUser": ["view_resourcebase"]}}',
+ },
+ )
+
+ self.assertEqual(response.status_code, 400)
+ self.assertFalse(Document.objects.filter(title="fake_pdf").exists())
+
# Permissions Tests
def test_set_document_permissions(self):
diff --git a/geonode/documents/validation.py b/geonode/documents/validation.py
new file mode 100644
index 00000000000..1dde8bfe7f2
--- /dev/null
+++ b/geonode/documents/validation.py
@@ -0,0 +1,41 @@
+#########################################################################
+#
+# Copyright (C) 2026 OSGeo
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#
+#########################################################################
+
+from django.conf import settings
+
+from geonode.documents.enumerations import DOCUMENT_MAGIC_MIMETYPE_MAP
+from geonode.upload.validation.base import ValidationConfigProvider
+
+
+class DocumentFileValidationConfigProvider(ValidationConfigProvider):
+ """
+ File validation config for the legacy /documents/upload/ HTML form view
+ (URL name "document_upload"). Sources its data from the static
+ DOCUMENT_MAGIC_MIMETYPE_MAP and the operator-tunable
+ ALLOWED_DOCUMENT_TYPES setting.
+ """
+
+ def url_names(self):
+ return ("document_upload",)
+
+ def build_config(self):
+ return {
+ "allowed_extensions": frozenset(settings.ALLOWED_DOCUMENT_TYPES),
+ "magic_mimetype_map": {ext: frozenset(mimes) for ext, mimes in DOCUMENT_MAGIC_MIMETYPE_MAP.items()},
+ }
diff --git a/geonode/documents/views.py b/geonode/documents/views.py
index 30cda95bc5e..f74ffc8bb96 100644
--- a/geonode/documents/views.py
+++ b/geonode/documents/views.py
@@ -94,6 +94,19 @@ class DocumentUploadView(CreateView):
def post(self, request, *args, **kwargs):
self.object = None
+ # Accessing request.FILES forces the multipart parser (and therefore
+ # FileValidationUploadHandler) to run. If validation rejected the
+ # upload, return a clean 400 with the reason instead of letting the
+ # form surface a generic "required field" error.
+ request.FILES # noqa: B018
+ validation_error = getattr(request, "upload_validation_error", None)
+ if validation_error:
+ if request.GET.get("no__redirect", False):
+ out = {"success": False, "message": validation_error}
+ return HttpResponse(json.dumps(out), content_type="application/json", status=400)
+ form = self.get_form()
+ form.add_error(None, validation_error)
+ return self.form_invalid(form)
try:
return super().post(request, *args, **kwargs)
except Exception as e:
@@ -114,7 +127,7 @@ def form_invalid(self, form):
if self.request.GET.get("no__redirect", False):
plaintext_errors = []
for field in form.errors.values():
- plaintext_errors.append(field.data[0].message)
+ plaintext_errors.append(str(field.data[0].message))
out = {"success": False}
out["message"] = ".".join(plaintext_errors)
status_code = 400
diff --git a/geonode/geoserver/views.py b/geonode/geoserver/views.py
index 0a601a98c35..ab000fcfe90 100644
--- a/geonode/geoserver/views.py
+++ b/geonode/geoserver/views.py
@@ -290,6 +290,8 @@ def geoserver_proxy(request, proxy_path, downstream_path, workspace=None, layern
logger.warn(f"Could not find any Dataset {os.path.basename(request.path)} on DB")
kwargs = {"affected_datasets": affected_datasets}
+ kwargs["safe_url"] = True # required to let the proxy accept the internal Geoserver URL as safe
+
raw_url = unquote(raw_url)
timeout = getattr(ogc_server_settings, "TIMEOUT") or 60
response = proxy(
diff --git a/geonode/proxy/views.py b/geonode/proxy/views.py
index 5dd57afe64d..1b319a89718 100644
--- a/geonode/proxy/views.py
+++ b/geonode/proxy/views.py
@@ -94,7 +94,7 @@ def proxy(
raw_url = url or request.GET["url"]
raw_url = urljoin(settings.SITEURL, raw_url) if raw_url.startswith("/") else raw_url
- if not is_safe_url(raw_url):
+ if not kwargs.get("safe_url", False) and not is_safe_url(raw_url):
return HttpResponse(
"Invalid URL provided.",
status=403,
diff --git a/geonode/settings.py b/geonode/settings.py
index b9371d1fe11..f922ef70c65 100644
--- a/geonode/settings.py
+++ b/geonode/settings.py
@@ -31,6 +31,7 @@
# General Django development settings
#
from django.conf.global_settings import DATETIME_INPUT_FORMATS
+from geonode.documents.enumerations import DOCUMENT_TYPE_MAP
from geonode import get_version
from kombu import Queue, Exchange
from kombu.serialization import register
@@ -625,70 +626,13 @@
except ValueError:
# fallback to regular list of values separated with misc chars
ALLOWED_DOCUMENT_TYPES = (
- [
- "txt",
- "csv",
- "log",
- "doc",
- "docx",
- "ods",
- "odt",
- "sld",
- "qml",
- "xls",
- "xlsx",
- "xml",
- "bm",
- "bmp",
- "dwg",
- "dxf",
- "fif",
- "gif",
- "jpg",
- "jpe",
- "jpeg",
- "png",
- "tif",
- "tiff",
- "pbm",
- "odp",
- "ppt",
- "pptx",
- "pdf",
- "tar",
- "tgz",
- "rar",
- "gz",
- "7z",
- "zip",
- "aif",
- "aifc",
- "aiff",
- "au",
- "mp3",
- "mpga",
- "wav",
- "afl",
- "avi",
- "avs",
- "fli",
- "mp2",
- "mp4",
- "mpg",
- "ogg",
- "webm",
- "3gp",
- "flv",
- "vdo",
- "glb",
- "pcd",
- "gltf",
- "ifc",
- ]
+ DOCUMENT_TYPE_MAP.keys()
if os.getenv("ALLOWED_DOCUMENT_TYPES") is None
else re.split(r" *[,|:;] *", os.getenv("ALLOWED_DOCUMENT_TYPES"))
)
+ALLOWED_DOCUMENT_TYPES = [t for t in ALLOWED_DOCUMENT_TYPES if t in DOCUMENT_TYPE_MAP.keys()]
+
MAX_DOCUMENT_SIZE = int(os.getenv("MAX_DOCUMENT_SIZE ", "2")) # MB
# DOCUMENT_TYPE_MAP and DOCUMENT_MIMETYPE_MAP update enumerations in
@@ -1065,6 +1009,7 @@
# The proxy to use when making cross origin requests.
PROXY_URL = os.environ.get("PROXY_URL", "/proxy/?url=")
+SAFE_URL_CHECK_ENABLED = ast.literal_eval(os.getenv("SAFE_URL_CHECK_ENABLED", "True"))
# Avoid permissions prefiltering
SKIP_PERMS_FILTER = ast.literal_eval(os.getenv("SKIP_PERMS_FILTER", "False"))
@@ -2100,6 +2045,7 @@ def get_geonode_catalogue_service():
]
FILE_UPLOAD_HANDLERS = [
+ "geonode.upload.uploadhandler.FileValidationUploadHandler",
"geonode.upload.uploadhandler.SizeRestrictedFileUploadHandler",
"django.core.files.uploadhandler.TemporaryFileUploadHandler",
"django.core.files.uploadhandler.MemoryFileUploadHandler",
@@ -2132,6 +2078,15 @@ def get_geonode_catalogue_service():
"base-resources-assets",
)
+# FileValidationUploadHandler config providers, run once at app-ready time
+# (see geonode.upload.handlers.apps.run_setup_hooks). Each provider declares
+# the URL names it covers and returns the merged validation config dict.
+# To add per-endpoint validation, register a ValidationConfigProvider here.
+FILE_VALIDATION_CONFIGURATION_PROVIDERS = [
+ "geonode.documents.validation.DocumentFileValidationConfigProvider",
+ "geonode.upload.validation.datasets.DatasetFileValidationConfigProvider",
+]
+
INSTALLED_APPS += (
"dynamic_models",
# "importer",
diff --git a/geonode/tests/test_utils.py b/geonode/tests/test_utils.py
index 1b3a42cb30b..487b21a3390 100644
--- a/geonode/tests/test_utils.py
+++ b/geonode/tests/test_utils.py
@@ -25,6 +25,7 @@
from django.contrib.gis.geos import GEOSGeometry, Polygon
from django.contrib.auth import get_user_model
from django.core.management import call_command
+from django.test import override_settings, TestCase as djangoTestCase
from geonode.maps.models import Dataset
from geonode.layers.models import Attribute
@@ -270,7 +271,8 @@ def test_region_wkt_multipolygon_if_across_idl(self):
self.assertEqual(poly.geom_type, "MultiPolygon", f"Expexted 'MultiPolygon' type but received {poly.geom_type}")
-class TestIsSafeURL(TestCase):
+@override_settings(SAFE_URL_CHECK_ENABLED=True)
+class TestIsSafeURL(djangoTestCase):
def test_allows_http_and_https_urls(self):
urls = [
diff --git a/geonode/upload/api/serializer.py b/geonode/upload/api/serializer.py
index cbabadc9182..897fd981ceb 100644
--- a/geonode/upload/api/serializer.py
+++ b/geonode/upload/api/serializer.py
@@ -16,15 +16,57 @@
# along with this program. If not, see .
#
#########################################################################
+import logging
from rest_framework import serializers
from dynamic_rest.serializers import DynamicModelSerializer
from geonode.base.api.serializers import BaseDynamicModelSerializer
from geonode.base.models import ResourceBase
from geonode.upload.models import UploadParallelismLimit, UploadSizeLimit
from geonode.resource.enumerator import ExecutionRequestAction as exa
+from geonode.upload.zip_validation import ZipValidationError, is_zip_extension, validate_safe_zip
-class ImporterSerializer(DynamicModelSerializer):
+logger = logging.getLogger(__name__)
+
+
+class BaseImporterSerializer(DynamicModelSerializer):
+ """
+ Base for every serializer wired into the importer endpoint.
+
+ Holds the zip-safety check that must run for any ``base_file`` capable of
+ carrying a zip-based archive (.zip / .kmz / .xlsx). DRF only invokes
+ ``validate_base_file`` when the subclass declares a ``base_file`` field,
+ so subclasses without one (e.g. RemoteResourceSerializer,
+ EmptyDatasetSerializer) inherit harmlessly.
+ """
+
+ def validate_base_file(self, f):
+ """
+ Inspect the central directory of zip-based uploads (.zip for 3D Tiles
+ and zipped shapefiles, .kmz, .xlsx) to reject path-traversal entries,
+ symlinks, oversized archives and zip-bomb compression ratios before
+ any handler extracts the file.
+ """
+ if not is_zip_extension(getattr(f, "name", None)):
+ return f
+ source = f.temporary_file_path() if hasattr(f, "temporary_file_path") else f
+ try:
+ validate_safe_zip(source)
+ except ZipValidationError:
+ logger.warning("ZIP validation failed for uploaded file.", exc_info=True)
+ raise serializers.ValidationError("Invalid or unsafe ZIP archive.")
+ finally:
+ # Rewind any in-memory file-like we read so downstream code sees
+ # the full stream.
+ if hasattr(f, "seek"):
+ try:
+ f.seek(0)
+ except (OSError, ValueError):
+ pass
+ return f
+
+
+class ImporterSerializer(BaseImporterSerializer):
class Meta:
ref_name = "ImporterSerializer"
model = ResourceBase
diff --git a/geonode/upload/api/tests.py b/geonode/upload/api/tests.py
index d64b60fec41..0636263c0da 100644
--- a/geonode/upload/api/tests.py
+++ b/geonode/upload/api/tests.py
@@ -16,9 +16,13 @@
# along with this program. If not, see .
#
#########################################################################
+import io
+import zipfile
+
from django.contrib.auth import get_user_model
from django.core.files.uploadedfile import SimpleUploadedFile
from geonode.layers.models import Dataset
+from django.test import override_settings
from django.urls import reverse
from unittest.mock import MagicMock, patch
@@ -31,6 +35,39 @@
from geonode.upload.tests.utils import ImporterBaseTestSupport
+def _build_gpkg_header():
+ """
+ Minimal 100-byte SQLite 3 / GeoPackage header, enough for libmagic to
+ identify the buffer as 'SQLite 3.x database (OGC GeoPackage file)'.
+ """
+ hdr = b"SQLite format 3\x00"
+ hdr += (0x1000).to_bytes(2, "big") # page size
+ hdr += b"\x01\x01" # write / read format versions
+ hdr += b"\x00" # reserved space
+ hdr += b"\x40\x20\x20" # payload fraction fields
+ hdr += (0).to_bytes(4, "big") # file change counter
+ hdr += (1).to_bytes(4, "big") # database size in pages
+ hdr += (0).to_bytes(4, "big") # first freelist trunk
+ hdr += (0).to_bytes(4, "big") # total freelist pages
+ hdr += (0).to_bytes(4, "big") # schema cookie
+ hdr += (4).to_bytes(4, "big") # schema format
+ hdr += (0).to_bytes(4, "big") # default page cache size
+ hdr += (0).to_bytes(4, "big") # largest root b-tree page
+ hdr += (1).to_bytes(4, "big") # text encoding UTF-8
+ hdr += (0).to_bytes(4, "big") # user version
+ hdr += (0).to_bytes(4, "big") # incremental vacuum
+ hdr += b"GPKG" # application id (GeoPackage)
+ hdr += b"\x00" * 20 # reserved
+ hdr += (1).to_bytes(4, "big") # version-valid-for
+ hdr += (3039000).to_bytes(4, "big") # sqlite version number
+ assert len(hdr) == 100
+ return hdr
+
+
+GPKG_VALID_HEADER = _build_gpkg_header()
+PE_LIKE_CONTENT = b"MZ" + b"\x00" * 58 + b"\x80\x00\x00\x00" + b"\x00" * 64 + b"PE\x00\x00" + b"\x4c\x01\x01\x00"
+
+
class TestImporterViewSet(ImporterBaseTestSupport):
@classmethod
def setUpClass(cls):
@@ -61,14 +98,103 @@ def test_raise_exception_if_file_is_not_a_handled(self):
self.client.force_login(get_user_model().objects.get(username="admin"))
payload = {"base_file": SimpleUploadedFile(name="file.invalid", content=b"abc"), "action": "upload"}
response = self.client.post(self.url, data=payload)
- self.assertEqual(500, response.status_code)
+ # FileValidationUploadHandler rejects the extension before the view runs.
+ self.assertEqual(400, response.status_code)
+
+ def test_importer_upload_rejects_disallowed_extension(self):
+ self.client.force_login(get_user_model().objects.get(username="admin"))
+ payload = {
+ "base_file": SimpleUploadedFile(name="malicious.exe", content=PE_LIKE_CONTENT),
+ "action": "upload",
+ }
+
+ response = self.client.post(self.url, data=payload)
+
+ self.assertEqual(400, response.status_code)
+ self.assertFalse(Dataset.objects.filter(name="malicious").exists())
+
+ def test_importer_upload_rejects_shapefile_with_mismatched_content(self):
+ self.client.force_login(get_user_model().objects.get(username="admin"))
+ payload = {
+ "base_file": SimpleUploadedFile(name="fake.shp", content=PE_LIKE_CONTENT),
+ "action": "upload",
+ }
+
+ response = self.client.post(self.url, data=payload)
+
+ self.assertEqual(400, response.status_code)
+ self.assertFalse(Dataset.objects.filter(name="fake").exists())
+
+ def test_importer_upload_rejects_gpkg_with_mismatched_content(self):
+ self.client.force_login(get_user_model().objects.get(username="admin"))
+ payload = {
+ "base_file": SimpleUploadedFile(
+ name="fake.gpkg",
+ content=b'{"type": "FeatureCollection", "content": "some-content"}',
+ ),
+ "action": "upload",
+ }
+
+ response = self.client.post(self.url, data=payload)
+
+ self.assertEqual(400, response.status_code)
+ self.assertFalse(Dataset.objects.filter(name="fake").exists())
+
+ def test_importer_upload_rejects_geojson_with_binary_content(self):
+ self.client.force_login(get_user_model().objects.get(username="admin"))
+ payload = {
+ "base_file": SimpleUploadedFile(name="fake.geojson", content=PE_LIKE_CONTENT),
+ "action": "upload",
+ }
+
+ response = self.client.post(self.url, data=payload)
+
+ self.assertEqual(400, response.status_code)
+ self.assertFalse(Dataset.objects.filter(name="fake").exists())
+
+ def test_importer_upload_rejects_zip_with_path_traversal(self):
+ self.client.force_login(get_user_model().objects.get(username="admin"))
+ buf = io.BytesIO()
+ with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
+ zf.writestr("../../etc/passwd", b"root:x:0:0")
+ payload = {
+ "base_file": SimpleUploadedFile(name="traversal.zip", content=buf.getvalue()),
+ "action": "upload",
+ }
+
+ response = self.client.post(self.url, data=payload)
+
+ self.assertEqual(400, response.status_code)
+ self.assertIn(b"Invalid or unsafe ZIP archive", response.content)
+ self.assertFalse(Dataset.objects.filter(name="traversal").exists())
+
+ def test_importer_upload_rejects_zip_bomb(self):
+ self.client.force_login(get_user_model().objects.get(username="admin"))
+ buf = io.BytesIO()
+ with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
+ info = zipfile.ZipInfo("bomb.bin")
+ info.compress_type = zipfile.ZIP_DEFLATED
+ zf.writestr(info, b"\x00" * (5 * 1024 * 1024))
+ payload = {
+ "base_file": SimpleUploadedFile(name="bomb.zip", content=buf.getvalue()),
+ "action": "upload",
+ }
+
+ response = self.client.post(self.url, data=payload)
+
+ self.assertEqual(400, response.status_code)
+ self.assertIn(b"Invalid or unsafe ZIP archive", response.content)
+ self.assertFalse(Dataset.objects.filter(name="bomb").exists())
def test_gpkg_raise_error_with_invalid_payload(self):
self.client.force_login(get_user_model().objects.get(username="admin"))
+ # Use a valid GeoPackage header so that FileValidationUploadHandler's
+ # libmagic description check passes and the serializer reaches the
+ # boolean validation that is being tested.
payload = {
"base_file": SimpleUploadedFile(
name="test.gpkg",
- content=b'{"type": "FeatureCollection", "content": "some-content"}',
+ content=GPKG_VALID_HEADER,
),
"store_spatial_files": "invalid",
"action": "upload",
@@ -92,7 +218,7 @@ def test_gpkg_task_is_called(self, patch_upload):
payload = {
"base_file": SimpleUploadedFile(
name="test.gpkg",
- content=b'{"type": "FeatureCollection", "content": "some-content"}',
+ content=GPKG_VALID_HEADER,
),
"store_spatial_files": True,
"action": "upload",
@@ -138,6 +264,7 @@ def test_zip_file_is_unzip_and_the_handler_is_found(self, patch_upload):
self.assertEqual(201, response.status_code)
+ @override_settings(SAFE_URL_CHECK_ENABLED=True)
def test_remote_upload_rejects_unsafe_url(self):
self.client.force_login(get_user_model().objects.get(username="admin"))
invalid_urls = [
@@ -159,6 +286,7 @@ def test_remote_upload_rejects_unsafe_url(self):
self.assertEqual(400, response.status_code)
@patch("geonode.utils.socket.getaddrinfo")
+ @override_settings(SAFE_URL_CHECK_ENABLED=True)
def test_remote_upload_rejects_dns_resolving_to_private_ip(self, mock_dns):
self.client.force_login(get_user_model().objects.get(username="admin"))
mock_dns.return_value = [(None, None, None, None, ("127.0.0.1", 0))]
diff --git a/geonode/upload/api/views.py b/geonode/upload/api/views.py
index 1ba3e8efd03..8bdeb9b55d3 100644
--- a/geonode/upload/api/views.py
+++ b/geonode/upload/api/views.py
@@ -137,6 +137,9 @@ def create(self, request, *args, **kwargs):
It clone on the local repo the file that the user want to upload
"""
_file = request.FILES.get("base_file") or request.data.get("base_file")
+ validation_error = getattr(request, "upload_validation_error", None)
+ if validation_error:
+ raise ValidationError(detail=validation_error, code="invalid_file_type")
execution_id = None
storage_manager = None
serializer = self.get_serializer_class()
diff --git a/geonode/upload/handlers/apps.py b/geonode/upload/handlers/apps.py
index 72e26b7c585..a912ed59d58 100644
--- a/geonode/upload/handlers/apps.py
+++ b/geonode/upload/handlers/apps.py
@@ -41,3 +41,10 @@ def run_setup_hooks(*args, **kwargs):
for item in _handlers:
item.register()
logger.info(f"The following handlers have been registered: {', '.join(available_handlers)}")
+
+ # Build the FileValidationUploadHandler registry. Providers run AFTER
+ # all importer handlers have registered, so DatasetFileValidationConfigProvider
+ # walks a fully-populated BaseHandler.REGISTRY.
+ from geonode.upload.validation import FileValidationConfigRegistry
+
+ FileValidationConfigRegistry.rebuild()
diff --git a/geonode/upload/handlers/base.py b/geonode/upload/handlers/base.py
index fb915df483b..933964ed0c0 100644
--- a/geonode/upload/handlers/base.py
+++ b/geonode/upload/handlers/base.py
@@ -52,6 +52,16 @@ class BaseHandler(ABC):
REGISTRY = []
+ # Merged FileValidationUploadHandler config across all registered handlers.
+ # Populated incrementally by ``register()`` from each handler's
+ # ``upload_validation_config`` property. Read by
+ # geonode.upload.validation.datasets.DatasetFileValidationConfigProvider.
+ UPLOAD_VALIDATION_CONFIG = {
+ "allowed_extensions": frozenset(),
+ "magic_mimetype_map": {},
+ "magic_description_map": {},
+ }
+
TASKS = {
exa.UPLOAD.value: (),
exa.COPY.value: (),
@@ -69,11 +79,50 @@ def __repr__(self):
@classmethod
def register(cls):
BaseHandler.REGISTRY.append(cls)
+ cls.merge_upload_validation_config()
@classmethod
def get_registry(cls):
return BaseHandler.REGISTRY
+ @classmethod
+ def merge_upload_validation_config(cls):
+ """
+ Fold this handler's ``upload_validation_config`` into
+ ``BaseHandler.UPLOAD_VALIDATION_CONFIG``. Handlers that do not
+ declare the property contribute nothing. Idempotent: re-registering
+ a handler unions the same set values without duplicating anything.
+
+ Resulting dict shape::
+
+ {
+ "allowed_extensions": frozenset[str],
+ "magic_mimetype_map": dict[str, frozenset[str]],
+ "magic_description_map": dict[str, frozenset[str]],
+ }
+ """
+ # cls() is needed because handlers declare upload_validation_config
+ # as @property (mirroring supported_file_extension_config); reading
+ # the property requires an instance.
+ new_cfg = getattr(cls(), "upload_validation_config", {})
+ if not new_cfg:
+ return
+ current = BaseHandler.UPLOAD_VALIDATION_CONFIG
+ allowed = set(current["allowed_extensions"])
+ mimes = {ext: set(s) for ext, s in current["magic_mimetype_map"].items()}
+ descs = {ext: set(s) for ext, s in current["magic_description_map"].items()}
+ for ext, rules in new_cfg.items():
+ allowed.add(ext)
+ for m in rules.get("mimes", ()):
+ mimes.setdefault(ext, set()).add(m)
+ for d in rules.get("description_contains", ()):
+ descs.setdefault(ext, set()).add(d)
+ BaseHandler.UPLOAD_VALIDATION_CONFIG = {
+ "allowed_extensions": frozenset(allowed),
+ "magic_mimetype_map": {ext: frozenset(s) for ext, s in mimes.items() if s},
+ "magic_description_map": {ext: frozenset(s) for ext, s in descs.items() if s},
+ }
+
@classmethod
def get_task_list(cls, action) -> tuple:
if action not in cls.TASKS:
diff --git a/geonode/upload/handlers/common/serializer.py b/geonode/upload/handlers/common/serializer.py
index 2f337c696ef..69ad03b1945 100644
--- a/geonode/upload/handlers/common/serializer.py
+++ b/geonode/upload/handlers/common/serializer.py
@@ -17,13 +17,13 @@
#
#########################################################################
from rest_framework import serializers
-from dynamic_rest.serializers import DynamicModelSerializer
from geonode.base.models import ResourceBase
from geonode.resource.enumerator import ExecutionRequestAction as exa
+from geonode.upload.api.serializer import BaseImporterSerializer
from geonode.utils import is_safe_url
-class RemoteResourceSerializer(DynamicModelSerializer):
+class RemoteResourceSerializer(BaseImporterSerializer):
class Meta:
ref_name = "RemoteResourceSerializer"
model = ResourceBase
diff --git a/geonode/upload/handlers/csv/handler.py b/geonode/upload/handlers/csv/handler.py
index dc0c4442864..2d82e41adf0 100644
--- a/geonode/upload/handlers/csv/handler.py
+++ b/geonode/upload/handlers/csv/handler.py
@@ -62,6 +62,12 @@ def supported_file_extension_config(self):
"type": "vector",
}
+ @property
+ def upload_validation_config(self):
+ return {
+ "csv": {"mimes": {"text/plain", "text/csv"}},
+ }
+
@staticmethod
def can_handle(_data) -> bool:
"""
diff --git a/geonode/upload/handlers/empty_dataset/serializer.py b/geonode/upload/handlers/empty_dataset/serializer.py
index 038b18c93b8..d7da42b41a3 100644
--- a/geonode/upload/handlers/empty_dataset/serializer.py
+++ b/geonode/upload/handlers/empty_dataset/serializer.py
@@ -17,13 +17,13 @@
#
#########################################################################
from rest_framework import serializers
-from dynamic_rest.serializers import DynamicModelSerializer
from geonode.resource.enumerator import ExecutionRequestAction as exa
from geonode.base.models import ResourceBase
+from geonode.upload.api.serializer import BaseImporterSerializer
-class EmptyDatasetSerializer(DynamicModelSerializer):
+class EmptyDatasetSerializer(BaseImporterSerializer):
class Meta:
ref_name = "EmptyDatasetSerializer"
model = ResourceBase
diff --git a/geonode/upload/handlers/geojson/handler.py b/geonode/upload/handlers/geojson/handler.py
index 146212d5cf3..de7d87ea7a8 100644
--- a/geonode/upload/handlers/geojson/handler.py
+++ b/geonode/upload/handlers/geojson/handler.py
@@ -55,6 +55,13 @@ def supported_file_extension_config(self):
"type": "vector",
}
+ @property
+ def upload_validation_config(self):
+ return {
+ "geojson": {"mimes": {"application/json", "text/plain", "text/json"}},
+ "json": {"mimes": {"application/json", "text/plain", "text/json"}},
+ }
+
@staticmethod
def can_handle(_data) -> bool:
"""
diff --git a/geonode/upload/handlers/geotiff/handler.py b/geonode/upload/handlers/geotiff/handler.py
index cd6e2ea57de..92bb73a50c2 100644
--- a/geonode/upload/handlers/geotiff/handler.py
+++ b/geonode/upload/handlers/geotiff/handler.py
@@ -99,6 +99,15 @@ def supported_file_extension_config(self):
"type": "raster",
}
+ @property
+ def upload_validation_config(self):
+ return {
+ "tiff": {"mimes": {"image/tiff"}},
+ "tif": {"mimes": {"image/tiff"}},
+ "geotiff": {"mimes": {"image/tiff"}},
+ "geotif": {"mimes": {"image/tiff"}},
+ }
+
@staticmethod
def can_handle(_data) -> bool:
"""
diff --git a/geonode/upload/handlers/gpkg/handler.py b/geonode/upload/handlers/gpkg/handler.py
index 554ec4d1687..40245755100 100644
--- a/geonode/upload/handlers/gpkg/handler.py
+++ b/geonode/upload/handlers/gpkg/handler.py
@@ -51,6 +51,15 @@ def supported_file_extension_config(self):
"type": "vector",
}
+ @property
+ def upload_validation_config(self):
+ # libmagic reports application/vnd.sqlite3 on modern builds but may
+ # fall back to application/octet-stream on older ones; validate via
+ # the description string for cross-version robustness.
+ return {
+ "gpkg": {"description_contains": {"SQLite", "GeoPackage"}},
+ }
+
@property
def can_handle_xml_file(self) -> bool:
"""
diff --git a/geonode/upload/handlers/kml/handler.py b/geonode/upload/handlers/kml/handler.py
index ecdc12b41ba..8f323238b54 100644
--- a/geonode/upload/handlers/kml/handler.py
+++ b/geonode/upload/handlers/kml/handler.py
@@ -72,6 +72,22 @@ def supported_file_extension_config(self):
"type": "vector",
}
+ @property
+ def upload_validation_config(self):
+ return {
+ "kml": {
+ "mimes": {
+ "application/xml",
+ "text/xml",
+ "application/vnd.google-earth.kml+xml",
+ "text/plain",
+ },
+ },
+ "kmz": {
+ "mimes": {"application/zip", "application/vnd.google-earth.kmz"},
+ },
+ }
+
@property
def can_handle_xml_file(self) -> bool:
"""
diff --git a/geonode/upload/handlers/shapefile/handler.py b/geonode/upload/handlers/shapefile/handler.py
index c8b983f296c..4a3a83d3001 100644
--- a/geonode/upload/handlers/shapefile/handler.py
+++ b/geonode/upload/handlers/shapefile/handler.py
@@ -55,6 +55,19 @@ def supported_file_extension_config(self):
"type": "vector",
}
+ @property
+ def upload_validation_config(self):
+ # libmagic reports application/octet-stream for shp/shx/dbf, so we
+ # match against the descriptive string libmagic emits instead.
+ return {
+ "shp": {"description_contains": {"ESRI Shapefile"}},
+ "shx": {"description_contains": {"ESRI Shapefile"}},
+ "dbf": {"description_contains": {"dBase", "FoxPro", "Xbase"}},
+ "prj": {"mimes": {"text/plain"}},
+ "cpg": {"mimes": {"text/plain"}},
+ "cst": {"mimes": {"text/plain"}},
+ }
+
@staticmethod
def can_do(action) -> bool:
"""
diff --git a/geonode/upload/handlers/shapefile/serializer.py b/geonode/upload/handlers/shapefile/serializer.py
index a71507690ff..dce6eaf4f3a 100644
--- a/geonode/upload/handlers/shapefile/serializer.py
+++ b/geonode/upload/handlers/shapefile/serializer.py
@@ -17,12 +17,12 @@
#
#########################################################################
from rest_framework import serializers
-from dynamic_rest.serializers import DynamicModelSerializer
from geonode.base.models import ResourceBase
from geonode.resource.enumerator import ExecutionRequestAction as exa
+from geonode.upload.api.serializer import BaseImporterSerializer
-class ShapeFileSerializer(DynamicModelSerializer):
+class ShapeFileSerializer(BaseImporterSerializer):
class Meta:
ref_name = "ShapeFileSerializer"
model = ResourceBase
diff --git a/geonode/upload/handlers/sld/handler.py b/geonode/upload/handlers/sld/handler.py
index 453fdc7778d..3782dafb00a 100644
--- a/geonode/upload/handlers/sld/handler.py
+++ b/geonode/upload/handlers/sld/handler.py
@@ -65,6 +65,12 @@ def supported_file_extension_config(self):
"type": "metadata",
}
+ @property
+ def upload_validation_config(self):
+ return {
+ "sld": {"mimes": {"text/plain", "application/xml", "text/xml"}},
+ }
+
@staticmethod
def can_handle(_data) -> bool:
"""
diff --git a/geonode/upload/handlers/tiles3d/handler.py b/geonode/upload/handlers/tiles3d/handler.py
index 3affb738f01..17c118ea57a 100755
--- a/geonode/upload/handlers/tiles3d/handler.py
+++ b/geonode/upload/handlers/tiles3d/handler.py
@@ -78,6 +78,12 @@ def supported_file_extension_config(self):
"type": "vector",
}
+ @property
+ def upload_validation_config(self):
+ return {
+ "zip": {"mimes": {"application/zip"}},
+ }
+
@staticmethod
def can_handle(_data) -> bool:
"""
diff --git a/geonode/upload/handlers/xml/handler.py b/geonode/upload/handlers/xml/handler.py
index 51940d7d2a4..b77d28b2d31 100644
--- a/geonode/upload/handlers/xml/handler.py
+++ b/geonode/upload/handlers/xml/handler.py
@@ -65,6 +65,12 @@ def supported_file_extension_config(self):
"type": "metadata",
}
+ @property
+ def upload_validation_config(self):
+ return {
+ "xml": {"mimes": {"application/xml", "text/xml"}},
+ }
+
@staticmethod
def can_handle(_data) -> bool:
"""
diff --git a/geonode/upload/handlers/xml/serializer.py b/geonode/upload/handlers/xml/serializer.py
index efcc49b5ffa..f9ef5870c2f 100644
--- a/geonode/upload/handlers/xml/serializer.py
+++ b/geonode/upload/handlers/xml/serializer.py
@@ -17,13 +17,13 @@
#
#########################################################################
from rest_framework import serializers
-from dynamic_rest.serializers import DynamicModelSerializer
from geonode.resource.enumerator import ExecutionRequestAction as exa
from geonode.base.models import ResourceBase
+from geonode.upload.api.serializer import BaseImporterSerializer
-class MetadataFileSerializer(DynamicModelSerializer):
+class MetadataFileSerializer(BaseImporterSerializer):
class Meta:
ref_name = "MetadataFileSerializer"
model = ResourceBase
diff --git a/geonode/upload/tests/unit/test_validation_registry.py b/geonode/upload/tests/unit/test_validation_registry.py
new file mode 100644
index 00000000000..99f3e407901
--- /dev/null
+++ b/geonode/upload/tests/unit/test_validation_registry.py
@@ -0,0 +1,181 @@
+#########################################################################
+#
+# Copyright (C) 2026 OSGeo
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#########################################################################
+
+"""unit tests for geonode.upload.validation"""
+
+from types import MappingProxyType
+
+from django.test import SimpleTestCase, override_settings
+
+from geonode.upload.handlers.base import BaseHandler
+from geonode.upload.validation import FileValidationConfigRegistry
+from geonode.upload.validation.base import ValidationConfigProvider
+
+
+EMPTY_CONFIG = {
+ "allowed_extensions": frozenset(),
+ "magic_mimetype_map": {},
+ "magic_description_map": {},
+}
+
+
+class _FakeHandler(BaseHandler):
+ upload_validation_config = {
+ "shp": {"description_contains": {"ESRI Shapefile"}},
+ "xml": {"mimes": {"application/xml"}},
+ }
+
+
+class _OverlappingHandler(BaseHandler):
+ upload_validation_config = {
+ "xml": {"mimes": {"text/xml"}, "description_contains": {"XML"}},
+ }
+
+
+class _EmptyHandler(BaseHandler):
+ """Handler that does not declare upload_validation_config."""
+
+
+class BaseHandlerValidationConfigTests(SimpleTestCase):
+ """Cover the merge / read classmethods on BaseHandler."""
+
+ def setUp(self):
+ self._saved = BaseHandler.UPLOAD_VALIDATION_CONFIG
+ BaseHandler.UPLOAD_VALIDATION_CONFIG = EMPTY_CONFIG
+
+ def tearDown(self):
+ BaseHandler.UPLOAD_VALIDATION_CONFIG = self._saved
+
+ def test_merge_unions_mimes_and_descriptions_across_handlers(self):
+ _FakeHandler.merge_upload_validation_config()
+ _OverlappingHandler.merge_upload_validation_config()
+ merged = BaseHandler.UPLOAD_VALIDATION_CONFIG
+
+ self.assertEqual(merged["allowed_extensions"], frozenset({"shp", "xml"}))
+ self.assertEqual(merged["magic_mimetype_map"]["xml"], frozenset({"application/xml", "text/xml"}))
+ self.assertEqual(merged["magic_description_map"]["shp"], frozenset({"ESRI Shapefile"}))
+ self.assertEqual(merged["magic_description_map"]["xml"], frozenset({"XML"}))
+
+ def test_handler_without_property_contributes_nothing(self):
+ _EmptyHandler.merge_upload_validation_config()
+ self.assertEqual(BaseHandler.UPLOAD_VALIDATION_CONFIG, EMPTY_CONFIG)
+
+ def test_extension_with_only_mimes_omits_description_entry(self):
+ _FakeHandler.merge_upload_validation_config()
+ merged = BaseHandler.UPLOAD_VALIDATION_CONFIG
+ self.assertNotIn("xml", merged["magic_description_map"])
+ self.assertIn("xml", merged["magic_mimetype_map"])
+
+
+class _DummyProvider(ValidationConfigProvider):
+ def url_names(self):
+ return ("dummy_url",)
+
+ def build_config(self):
+ return {"allowed_extensions": frozenset({"foo"}), "magic_mimetype_map": {"foo": frozenset({"text/plain"})}}
+
+
+class _SecondProvider(ValidationConfigProvider):
+ def url_names(self):
+ return ("second_url",)
+
+ def build_config(self):
+ return {"allowed_extensions": frozenset({"bar"}), "magic_mimetype_map": {"bar": frozenset({"text/plain"})}}
+
+
+class _OverridingProvider(ValidationConfigProvider):
+ """Claims the same URL name as _DummyProvider so we can verify warning behaviour."""
+
+ def url_names(self):
+ return ("dummy_url",)
+
+ def build_config(self):
+ return {"allowed_extensions": frozenset({"baz"}), "magic_mimetype_map": {"baz": frozenset({"text/plain"})}}
+
+
+class _EmptyProvider(ValidationConfigProvider):
+ def url_names(self):
+ return ("noop",)
+
+ def build_config(self):
+ return {}
+
+
+class RegistryTests(SimpleTestCase):
+ def setUp(self):
+ FileValidationConfigRegistry.clear()
+
+ def tearDown(self):
+ FileValidationConfigRegistry.clear()
+
+ def test_install_freezes_payload(self):
+ FileValidationConfigRegistry.install({"x": {"allowed_extensions": frozenset()}})
+ cfg = FileValidationConfigRegistry.get("x")
+ self.assertIsInstance(cfg, MappingProxyType)
+ with self.assertRaises(TypeError):
+ cfg["allowed_extensions"] = frozenset({"y"})
+
+ def test_get_returns_none_for_unknown(self):
+ self.assertIsNone(FileValidationConfigRegistry.get("missing"))
+ self.assertIsNone(FileValidationConfigRegistry.get(None))
+
+ def test_rebuild_runs_listed_providers_only(self):
+ FileValidationConfigRegistry.rebuild(
+ provider_paths=[
+ "geonode.upload.tests.unit.test_validation_registry._DummyProvider",
+ "geonode.upload.tests.unit.test_validation_registry._SecondProvider",
+ ]
+ )
+ self.assertIn("foo", FileValidationConfigRegistry.get("dummy_url")["allowed_extensions"])
+ self.assertIn("bar", FileValidationConfigRegistry.get("second_url")["allowed_extensions"])
+
+ def test_rebuild_skips_empty_provider(self):
+ FileValidationConfigRegistry.rebuild(
+ provider_paths=[
+ "geonode.upload.tests.unit.test_validation_registry._DummyProvider",
+ "geonode.upload.tests.unit.test_validation_registry._EmptyProvider",
+ ]
+ )
+ # _EmptyProvider's URL must not be installed.
+ self.assertIsNone(FileValidationConfigRegistry.get("noop"))
+ self.assertIsNotNone(FileValidationConfigRegistry.get("dummy_url"))
+
+ def test_later_provider_overrides_earlier_for_same_url(self):
+ FileValidationConfigRegistry.rebuild(
+ provider_paths=[
+ "geonode.upload.tests.unit.test_validation_registry._DummyProvider",
+ "geonode.upload.tests.unit.test_validation_registry._OverridingProvider",
+ ]
+ )
+ self.assertIn("baz", FileValidationConfigRegistry.get("dummy_url")["allowed_extensions"])
+ self.assertNotIn("foo", FileValidationConfigRegistry.get("dummy_url")["allowed_extensions"])
+
+ @override_settings(
+ FILE_VALIDATION_CONFIGURATION_PROVIDERS=[
+ "geonode.upload.tests.unit.test_validation_registry._DummyProvider",
+ ],
+ )
+ def test_rebuild_default_reads_from_settings(self):
+ FileValidationConfigRegistry.rebuild()
+ self.assertIsNotNone(FileValidationConfigRegistry.get("dummy_url"))
+
+ def test_clear_empties_registry(self):
+ FileValidationConfigRegistry.install({"x": {"allowed_extensions": frozenset()}})
+ FileValidationConfigRegistry.clear()
+ self.assertIsNone(FileValidationConfigRegistry.get("x"))
diff --git a/geonode/upload/tests/unit/test_zip_validation.py b/geonode/upload/tests/unit/test_zip_validation.py
new file mode 100644
index 00000000000..178fff3cefd
--- /dev/null
+++ b/geonode/upload/tests/unit/test_zip_validation.py
@@ -0,0 +1,152 @@
+#########################################################################
+#
+# Copyright (C) 2026 OSGeo
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#########################################################################
+
+"""unit tests for geonode.upload.zip_validation"""
+
+import io
+import stat
+import zipfile
+
+from django.test import SimpleTestCase
+
+from geonode.upload.zip_validation import (
+ MAX_ENTRIES,
+ ZipValidationError,
+ is_zip_extension,
+ validate_safe_zip,
+)
+
+
+def _make_zip(entries):
+ """entries: iterable of (arcname, bytes, [ZipInfo overrides dict])."""
+ buf = io.BytesIO()
+ with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
+ for entry in entries:
+ if len(entry) == 3:
+ arcname, data, overrides = entry
+ else:
+ arcname, data = entry
+ overrides = None
+ info = zipfile.ZipInfo(arcname)
+ info.compress_type = zipfile.ZIP_DEFLATED
+ if overrides:
+ for key, value in overrides.items():
+ setattr(info, key, value)
+ zf.writestr(info, data)
+ buf.seek(0)
+ return buf
+
+
+class ValidateSafeZipTests(SimpleTestCase):
+ def test_accepts_clean_archive(self):
+ buf = _make_zip([("tileset.json", b'{"asset": {"version": "1.0"}}'), ("content/0.b3dm", b"x" * 128)])
+ validate_safe_zip(buf) # should not raise
+
+ def test_rejects_absolute_path_entry(self):
+ buf = _make_zip([("/etc/passwd", b"root:x:0:0")])
+ with self.assertRaises(ZipValidationError) as ctx:
+ validate_safe_zip(buf)
+ self.assertIn("Absolute path", str(ctx.exception))
+
+ def test_rejects_windows_absolute_path(self):
+ buf = _make_zip([("C:/evil.txt", b"payload")])
+ with self.assertRaises(ZipValidationError) as ctx:
+ validate_safe_zip(buf)
+ self.assertIn("Absolute path", str(ctx.exception))
+
+ def test_rejects_path_traversal(self):
+ buf = _make_zip([("../../etc/passwd", b"payload")])
+ with self.assertRaises(ZipValidationError) as ctx:
+ validate_safe_zip(buf)
+ self.assertIn("traversal", str(ctx.exception))
+
+ def test_rejects_traversal_in_middle_segment(self):
+ buf = _make_zip([("a/../../b.txt", b"payload")])
+ with self.assertRaises(ZipValidationError) as ctx:
+ validate_safe_zip(buf)
+ self.assertIn("traversal", str(ctx.exception))
+
+ def test_rejects_nul_byte_in_name(self):
+ # Python's zipfile silently strips NUL bytes when writing, so we patch
+ # the ZipInfo's filename after the archive is built -- this mirrors a
+ # hand-crafted malicious archive.
+ buf = _make_zip([("foo.txt", b"payload")])
+ zf = zipfile.ZipFile(buf)
+ infos = zf.infolist()
+ infos[0].filename = "foo\x00.txt"
+ with self.assertRaises(ZipValidationError) as ctx:
+ # Re-invoke the internal name check directly since ZipFile writes
+ # have already happened.
+ from geonode.upload.zip_validation import _check_entry_name
+
+ _check_entry_name(infos[0].filename)
+ self.assertIn("NUL byte", str(ctx.exception))
+
+ def test_rejects_symlink_entry(self):
+ # external_attr top 16 bits carry the Unix mode; S_IFLNK == 0o120000.
+ symlink_mode = stat.S_IFLNK | 0o777
+ buf = _make_zip([("link", b"target", {"external_attr": symlink_mode << 16})])
+ with self.assertRaises(ZipValidationError) as ctx:
+ validate_safe_zip(buf)
+ self.assertIn("symlink", str(ctx.exception))
+
+ def test_rejects_too_many_entries(self):
+ entries = [(f"f{i}.txt", b"x") for i in range(MAX_ENTRIES + 1)]
+ buf = _make_zip(entries)
+ with self.assertRaises(ZipValidationError) as ctx:
+ validate_safe_zip(buf)
+ self.assertIn("entries", str(ctx.exception))
+
+ def test_rejects_zip_bomb_ratio(self):
+ # A single highly-compressible entry: 5 MiB of zeros typically crunches
+ # below 10 KiB, well past the 100x ratio cap.
+ payload = b"\x00" * (5 * 1024 * 1024)
+ buf = _make_zip([("bomb.bin", payload)])
+ with self.assertRaises(ZipValidationError) as ctx:
+ validate_safe_zip(buf)
+ self.assertIn("compression ratio", str(ctx.exception))
+
+ def test_rejects_cumulative_bomb_of_tiny_high_ratio_entries(self):
+ # Each entry is small enough (compress_size < MIN_COMPRESSED_FOR_RATIO_CHECK)
+ # that the per-entry ratio check is skipped, but the aggregate is bomb-shaped.
+ # 100 entries of 100 KiB zeros each: ~10 MiB uncompressed, ~tens of KiB total
+ # compressed -- well past the cumulative ratio cap.
+ entries = [(f"chunk{i}.bin", b"\x00" * (100 * 1024)) for i in range(100)]
+ buf = _make_zip(entries)
+ with self.assertRaises(ZipValidationError) as ctx:
+ validate_safe_zip(buf)
+ self.assertIn("cumulative compression ratio", str(ctx.exception))
+
+ def test_rejects_not_a_zip(self):
+ with self.assertRaises(ZipValidationError) as ctx:
+ validate_safe_zip(io.BytesIO(b"not a zip file"))
+ self.assertIn("valid zip", str(ctx.exception))
+
+
+class IsZipExtensionTests(SimpleTestCase):
+ def test_accepts_known_zip_extensions(self):
+ self.assertTrue(is_zip_extension("foo.zip"))
+ self.assertTrue(is_zip_extension("foo.KMZ"))
+ self.assertTrue(is_zip_extension("report.xlsx"))
+
+ def test_rejects_other_extensions(self):
+ self.assertFalse(is_zip_extension("foo.shp"))
+ self.assertFalse(is_zip_extension("foo.gpkg"))
+ self.assertFalse(is_zip_extension(""))
+ self.assertFalse(is_zip_extension(None))
diff --git a/geonode/upload/uploadhandler.py b/geonode/upload/uploadhandler.py
index cf8eab71812..70ac296797d 100644
--- a/geonode/upload/uploadhandler.py
+++ b/geonode/upload/uploadhandler.py
@@ -19,17 +19,132 @@
import base64
import binascii
import html
+from pathlib import Path
+import magic
from django.conf import settings
from django.core.exceptions import RequestDataTooBig, TooManyFieldsSent
from django.core.files.uploadedfile import SimpleUploadedFile
-from django.core.files.uploadhandler import FileUploadHandler
+from django.core.files.uploadhandler import FileUploadHandler, StopUpload
from django.http import QueryDict
from django.http.multipartparser import FIELD, FILE, ChunkIter, LazyStream, Parser, exhaust
from django.utils.datastructures import MultiValueDict
from django.utils.encoding import force_str
from django.urls import reverse
+
from geonode.upload.models import UploadSizeLimit
+from geonode.upload.validation import FileValidationConfigRegistry
+
+
+class FileValidationUploadHandler(FileUploadHandler):
+ """
+ Validate uploads early in the upload stream.
+
+ Per-URL config is supplied by ValidationConfigProvider classes listed in
+ settings.FILE_VALIDATION_CONFIGURATION_PROVIDERS. Their merged output is
+ installed into FileValidationConfigRegistry at app-ready time; this
+ handler just looks it up by URL name. Each config entry exposes at
+ least one of:
+
+ * magic_mimetype_map: extension -> set of MIME types accepted by libmagic
+ (magic.from_buffer(sample, mime=True)).
+ * magic_description_map: extension -> set of substrings that must appear in
+ the libmagic description (magic.from_buffer(sample)). Used for formats
+ whose MIME is too generic to be trustworthy (e.g. Shapefile / GeoPackage
+ detected as application/octet-stream).
+
+ When both maps list the same extension, both checks must pass.
+ """
+
+ sniff_bytes = 8192
+
+ def handle_raw_input(self, input_data, META, content_length, boundary, encoding=None):
+ self.validation_config = None
+ self.activated = False
+
+ if self.request.resolver_match:
+ self.validation_config = FileValidationConfigRegistry.get(self.request.resolver_match.url_name)
+
+ self.activated = self.validation_config is not None
+
+ def new_file(self, *args, **kwargs):
+ super().new_file(*args, **kwargs)
+ self.extension = None
+ self.detected_mime = None
+ self.detected_description = None
+ self._buffer = bytearray()
+ self._checked = False
+
+ if not self.activated:
+ return
+
+ self.allowed_extensions = self.validation_config["allowed_extensions"]
+ self.magic_mimetype_map = self.validation_config.get("magic_mimetype_map", {})
+ self.magic_description_map = self.validation_config.get("magic_description_map", {})
+
+ self.extension = Path(self.file_name).suffix.replace(".", "").lower()
+ if self.extension not in self.allowed_extensions:
+ self._reject(f"File extension '.{self.extension}' is not allowed for this endpoint.")
+
+ if self.extension not in self.magic_mimetype_map and self.extension not in self.magic_description_map:
+ self._reject(f"File extension '.{self.extension}' has no content-type validation configured.")
+
+ def receive_data_chunk(self, raw_data, start):
+ if not self.activated or self._checked:
+ return raw_data
+
+ needed = self.sniff_bytes - len(self._buffer)
+ if needed > 0:
+ self._buffer.extend(raw_data[:needed])
+
+ content_complete = self.content_length is not None and start + len(raw_data) >= self.content_length
+ if len(self._buffer) >= self.sniff_bytes or content_complete:
+ self._validate_magic_mime()
+
+ return raw_data
+
+ def file_complete(self, file_size):
+ # Validate small files that finish before sniff_bytes is reached.
+ if self.activated and not self._checked:
+ self._validate_magic_mime()
+
+ def _validate_magic_mime(self):
+ sample = bytes(self._buffer)
+
+ if self.extension in self.magic_mimetype_map:
+ self.detected_mime = self._detect_mime(sample)
+ if self.detected_mime not in self.magic_mimetype_map[self.extension]:
+ self._reject(
+ f"Detected content type '{self.detected_mime}' does not match extension '.{self.extension}'."
+ )
+
+ if self.extension in self.magic_description_map:
+ self.detected_description = self._detect_description(sample)
+ expected_substrings = self.magic_description_map[self.extension]
+ if not any(token in self.detected_description for token in expected_substrings):
+ self._reject(f"File content does not match expected signature for '.{self.extension}'.")
+
+ self._checked = True
+
+ def _reject(self, reason):
+ """
+ Record the validation failure on the request, then raise StopUpload
+ with connection_reset=False so Django drains the remaining request
+ body. That keeps the TCP connection alive and lets the downstream
+ view return a proper HTTP 400 response instead of an aborted
+ connection.
+ """
+ self.request.upload_validation_error = reason
+ # tell Django's MultiPartParser to stop parsing the multipart body
+ # and drain (exhaust) the remaining data, but not to reset the connection
+ # since we want to return a 400 response with the error message instead of dropping the connection.
+ raise StopUpload(connection_reset=False)
+
+ def _detect_mime(self, sample):
+ return magic.from_buffer(sample, mime=True)
+
+ def _detect_description(self, sample):
+ return magic.from_buffer(sample)
class SizeRestrictedFileUploadHandler(FileUploadHandler):
diff --git a/geonode/upload/validation/__init__.py b/geonode/upload/validation/__init__.py
new file mode 100644
index 00000000000..a25412ca2ed
--- /dev/null
+++ b/geonode/upload/validation/__init__.py
@@ -0,0 +1,27 @@
+#########################################################################
+#
+# Copyright (C) 2026 OSGeo
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#########################################################################
+
+from geonode.upload.validation.base import ValidationConfigProvider
+from geonode.upload.validation.registry import FileValidationConfigRegistry
+
+
+__all__ = [
+ "FileValidationConfigRegistry",
+ "ValidationConfigProvider",
+]
diff --git a/geonode/upload/validation/base.py b/geonode/upload/validation/base.py
new file mode 100644
index 00000000000..7417594b7e5
--- /dev/null
+++ b/geonode/upload/validation/base.py
@@ -0,0 +1,49 @@
+#########################################################################
+#
+# Copyright (C) 2026 OSGeo
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#########################################################################
+
+from abc import ABC, abstractmethod
+from typing import Iterable, Mapping
+
+
+class ValidationConfigProvider(ABC):
+ """
+ Source of FileValidationUploadHandler config for one or more URL names.
+
+ Implementations declare which URL names they cover (``url_names``) and
+ return the merged validation config in ``build_config``. The dict shape
+ matches what FileValidationUploadHandler expects:
+
+ {
+ "allowed_extensions": frozenset[str],
+ "magic_mimetype_map": dict[str, frozenset[str]],
+ "magic_description_map": dict[str, frozenset[str]], # optional
+ }
+
+ Either ``magic_mimetype_map`` or ``magic_description_map`` (or both) must
+ cover every extension listed in ``allowed_extensions``; otherwise the
+ upload handler will reject any upload of that type as "unconfigured".
+ """
+
+ @abstractmethod
+ def url_names(self) -> Iterable[str]:
+ """URL names (i.e. ``request.resolver_match.url_name`` values) this provider serves."""
+
+ @abstractmethod
+ def build_config(self) -> Mapping:
+ """Return the validation config dict for the URL names above."""
diff --git a/geonode/upload/validation/datasets.py b/geonode/upload/validation/datasets.py
new file mode 100644
index 00000000000..ecc6d769b41
--- /dev/null
+++ b/geonode/upload/validation/datasets.py
@@ -0,0 +1,41 @@
+#########################################################################
+#
+# Copyright (C) 2026 OSGeo
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#########################################################################
+
+from geonode.upload.handlers.base import BaseHandler
+from geonode.upload.validation.base import ValidationConfigProvider
+
+
+class DatasetFileValidationConfigProvider(ValidationConfigProvider):
+ """
+ File validation config for the dataset importer endpoint (URL name
+ "importer_upload" / permission base.add_resourcebase).
+
+ The merged config is built incrementally as handlers register: each
+ handler's ``upload_validation_config`` is folded into
+ ``BaseHandler.UPLOAD_VALIDATION_CONFIG`` from ``BaseHandler.register()``.
+ A handler that does not declare the property contributes nothing, which
+ means disabling a handler (e.g. via a feature flag) automatically keeps
+ its file types out of the allowed set.
+ """
+
+ def url_names(self):
+ return ("importer_upload",)
+
+ def build_config(self):
+ return BaseHandler.UPLOAD_VALIDATION_CONFIG
diff --git a/geonode/upload/validation/registry.py b/geonode/upload/validation/registry.py
new file mode 100644
index 00000000000..937466e780d
--- /dev/null
+++ b/geonode/upload/validation/registry.py
@@ -0,0 +1,110 @@
+#########################################################################
+#
+# Copyright (C) 2026 OSGeo
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#########################################################################
+
+"""
+Process-wide registry of upload validation configurations.
+
+Configs are produced by ValidationConfigProvider classes listed in
+settings.FILE_VALIDATION_CONFIGURATION_PROVIDERS. They are merged once at
+app-ready time (see geonode.upload.handlers.apps.run_setup_hooks) and frozen
+into an immutable mapping that FileValidationUploadHandler reads on every
+request.
+
+Tests that swap providers should call ``FileValidationConfigRegistry.rebuild()``
+explicitly; we deliberately do not subscribe to setting_changed signals to
+keep the runtime model simple.
+
+The class mirrors the BaseHandler pattern in geonode.upload.handlers.base:
+class-level state, classmethods for mutation/lookup, no instances.
+"""
+
+import logging
+from types import MappingProxyType
+from typing import Iterable, Mapping, Optional
+
+from django.utils.module_loading import import_string
+
+
+logger = logging.getLogger(__name__)
+
+
+class FileValidationConfigRegistry:
+ """
+ Singleton-style registry of FileValidationUploadHandler configs.
+
+ Mirrors ``geonode.upload.handlers.base.BaseHandler``: class-level state,
+ classmethods for installation and lookup, no instantiation.
+ """
+
+ REGISTRY: Mapping[str, Mapping] = MappingProxyType({})
+
+ @classmethod
+ def install(cls, configs: Mapping[str, Mapping]) -> None:
+ """
+ Replace the registry with a frozen copy of ``configs``. Called once
+ from HandlersConfig.ready() after all providers have run.
+ """
+ cls.REGISTRY = MappingProxyType({url_name: MappingProxyType(dict(cfg)) for url_name, cfg in configs.items()})
+ logger.info(
+ "FileValidation registry installed for URL names: %s",
+ ", ".join(sorted(cls.REGISTRY)) or "",
+ )
+
+ @classmethod
+ def get(cls, url_name: Optional[str]) -> Optional[Mapping]:
+ """Return the validation config for ``url_name`` or None."""
+ if not url_name:
+ return None
+ return cls.REGISTRY.get(url_name)
+
+ @classmethod
+ def rebuild(cls, provider_paths: Optional[Iterable[str]] = None) -> None:
+ """
+ Re-run providers and refresh the registry.
+
+ Pass ``provider_paths`` to use a specific list (handy for tests).
+ When omitted the function reads
+ ``settings.FILE_VALIDATION_CONFIGURATION_PROVIDERS``.
+ """
+ from django.conf import settings
+
+ if provider_paths is None:
+ provider_paths = getattr(settings, "FILE_VALIDATION_CONFIGURATION_PROVIDERS", ())
+
+ merged: dict = {}
+ for path in provider_paths:
+ provider_cls = import_string(path)
+ provider = provider_cls()
+ cfg = provider.build_config()
+ if not cfg:
+ continue
+ for url_name in provider.url_names():
+ if url_name in merged:
+ logger.warning(
+ "FileValidation provider %s overrides existing config for %r",
+ path,
+ url_name,
+ )
+ merged[url_name] = cfg
+ cls.install(merged)
+
+ @classmethod
+ def clear(cls) -> None:
+ """Reset the registry to empty. Test helper."""
+ cls.install({})
diff --git a/geonode/upload/zip_validation.py b/geonode/upload/zip_validation.py
new file mode 100644
index 00000000000..9ae61f4c9a9
--- /dev/null
+++ b/geonode/upload/zip_validation.py
@@ -0,0 +1,138 @@
+#########################################################################
+#
+# Copyright (C) 2026 OSGeo
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#########################################################################
+
+"""
+Safety checks for uploaded zip archives (zip, kmz, xlsx, ...).
+
+The upload handler's libmagic check rejects PE-prefixed self-extracting
+archives, but legitimate zip archives can still carry path-traversal entries,
+symlinks, or zip-bomb compression ratios. This module inspects the central
+directory without extracting anything, and raises ZipValidationError on any
+hostile structure.
+"""
+
+import os
+import posixpath
+import stat
+import zipfile
+
+
+MAX_ENTRIES = 10_000
+MAX_TOTAL_UNCOMPRESSED = 2 * 1024 * 1024 * 1024 # 2 GiB
+MAX_COMPRESSION_RATIO = 100
+MIN_COMPRESSED_FOR_RATIO_CHECK = 1024 # bytes; ratios are meaningless below this
+# Cumulative ratio across the whole archive. Per-entry ratio is skipped for
+# entries below MIN_COMPRESSED_FOR_RATIO_CHECK, so a bomb made of many tiny
+# high-ratio entries would slip past unless the totals are also checked.
+MAX_TOTAL_COMPRESSION_RATIO = 100
+MIN_TOTAL_COMPRESSED_FOR_RATIO_CHECK = 8 * 1024 # 8 KiB
+
+
+class ZipValidationError(Exception):
+ """Raised when an uploaded archive fails a safety check."""
+
+
+def validate_safe_zip(source):
+ """
+ Inspect an uploaded zip archive.
+
+ source: filesystem path (str / os.PathLike) or a file-like object opened
+ in binary mode and seekable.
+
+ Raises ZipValidationError on any unsafe entry. Returns None on success.
+ """
+ try:
+ with zipfile.ZipFile(source) as zf:
+ infos = zf.infolist()
+ except zipfile.BadZipFile as exc:
+ raise ZipValidationError(f"Not a valid zip archive: {exc}") from exc
+
+ if len(infos) > MAX_ENTRIES:
+ raise ZipValidationError(f"Archive has {len(infos)} entries (max {MAX_ENTRIES}).")
+
+ total_uncompressed = 0
+ total_compressed = 0
+ for info in infos:
+ _check_entry_name(info.filename)
+ _check_not_symlink(info)
+ total_uncompressed += info.file_size
+ total_compressed += info.compress_size
+ if total_uncompressed > MAX_TOTAL_UNCOMPRESSED:
+ raise ZipValidationError(f"Archive decompresses to more than {MAX_TOTAL_UNCOMPRESSED} bytes.")
+ _check_compression_ratio(info)
+
+ _check_cumulative_ratio(total_compressed, total_uncompressed)
+
+
+def _check_entry_name(name):
+ if not name:
+ raise ZipValidationError("Archive contains an entry with an empty name.")
+ if "\x00" in name:
+ raise ZipValidationError("Archive contains an entry name with a NUL byte.")
+ # Reject absolute paths (POSIX and Windows) and drive letters.
+ if name.startswith(("/", "\\")) or (len(name) >= 2 and name[1] == ":"):
+ raise ZipValidationError(f"Absolute path in archive entry: {name!r}.")
+ # Reject traversal in any segment once we normalise separators.
+ normalised = name.replace("\\", "/")
+ parts = normalised.split("/")
+ if any(part == ".." for part in parts):
+ raise ZipValidationError(f"Path traversal in archive entry: {name!r}.")
+ # Belt-and-braces: after posix normpath, the result must not escape.
+ resolved = posixpath.normpath(normalised)
+ if resolved.startswith("../") or resolved == "..":
+ raise ZipValidationError(f"Path traversal in archive entry: {name!r}.")
+
+
+def _check_not_symlink(info):
+ # Zip stores Unix mode in the top 16 bits of external_attr.
+ unix_mode = info.external_attr >> 16
+ if unix_mode and stat.S_ISLNK(unix_mode):
+ raise ZipValidationError(f"Archive contains a symlink entry: {info.filename!r}.")
+
+
+def _check_compression_ratio(info):
+ if info.compress_size < MIN_COMPRESSED_FOR_RATIO_CHECK:
+ return
+ if info.file_size == 0:
+ return
+ ratio = info.file_size / max(info.compress_size, 1)
+ if ratio > MAX_COMPRESSION_RATIO:
+ raise ZipValidationError(
+ f"Entry {info.filename!r} has a suspicious compression ratio " f"({ratio:.0f}x); possible zip bomb."
+ )
+
+
+def _check_cumulative_ratio(total_compressed, total_uncompressed):
+ if total_compressed < MIN_TOTAL_COMPRESSED_FOR_RATIO_CHECK:
+ return
+ if total_uncompressed == 0:
+ return
+ ratio = total_uncompressed / max(total_compressed, 1)
+ if ratio > MAX_TOTAL_COMPRESSION_RATIO:
+ raise ZipValidationError(
+ f"Archive has a suspicious cumulative compression ratio " f"({ratio:.0f}x); possible zip bomb."
+ )
+
+
+def is_zip_extension(filename):
+ """Return True if the filename suggests a zip-based upload we should inspect."""
+ if not filename:
+ return False
+ ext = os.path.splitext(filename)[1].lower().lstrip(".")
+ return ext in {"zip", "kmz", "xlsx"}
diff --git a/geonode/utils.py b/geonode/utils.py
index b4d2d6e4b50..cc9c494143f 100755
--- a/geonode/utils.py
+++ b/geonode/utils.py
@@ -1782,6 +1782,10 @@ def get_allowed_extensions():
def is_safe_url(url: str) -> bool:
+ # Required to let tests disable the safe url check via settings override
+ if not getattr(settings, "SAFE_URL_CHECK_ENABLED", True):
+ return True
+
def _is_ip_allowed(ip: str) -> bool:
obj = ipaddress.ip_address(ip.split("%")[0])
return not (obj.is_loopback or obj.is_private or obj.is_link_local or obj.is_multicast or obj.is_reserved)
diff --git a/requirements.txt b/requirements.txt
index ca9f8088a7f..99336084472 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -150,3 +150,4 @@ webdriver_manager==4.0.1
# Security and audit
cryptography==47.0.0
jwcrypto>=1.5.6
+python-magic==0.4.27
diff --git a/setup.cfg b/setup.cfg
index 73d0eb6b6b2..b72c95d5c48 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -176,6 +176,7 @@ install_requires =
# Security and audit
cryptography==47.0.0
jwcrypto>=1.5.6
+ python-magic==0.4.27
[options.packages.find]