diff --git a/app/src/fileflash/core/deps.py b/app/src/fileflash/core/deps.py
index 5f415bc..6b486ac 100644
--- a/app/src/fileflash/core/deps.py
+++ b/app/src/fileflash/core/deps.py
@@ -143,8 +143,9 @@ def get_admin_storage_service(
def get_admin_files_service(
db: AsyncSession = Depends(get_db),
event_publisher: InProcessAuthEventPublisher = Depends(get_event_publisher),
+ storage: MinioObjectStorageClient = Depends(get_object_storage),
) -> AdminFilesService:
- return AdminFilesService(db=db, publisher=event_publisher)
+ return AdminFilesService(db=db, publisher=event_publisher, storage=storage)
def get_admin_moderation_service(
diff --git a/app/src/fileflash/core/security.py b/app/src/fileflash/core/security.py
index f2b3c45..c932376 100644
--- a/app/src/fileflash/core/security.py
+++ b/app/src/fileflash/core/security.py
@@ -99,3 +99,32 @@ def decode_file_preview_token(token: str, settings: Settings) -> dict[str, Any]:
if token_type != "file_preview" or scope != "file.preview":
raise jwt.InvalidTokenError("Invalid token type")
return payload
+
+
+def create_admin_file_preview_token(
+ *,
+ admin_user_id: int,
+ file_id: int,
+ settings: Settings,
+ expires_at: datetime,
+) -> str:
+ now = datetime.now(UTC)
+ payload: dict[str, Any] = {
+ "sub": str(admin_user_id),
+ "typ": "admin_file_preview",
+ "scope": "admin.file.preview",
+ "fileId": str(file_id),
+ "iat": int(now.timestamp()),
+ "exp": int(expires_at.timestamp()),
+ "jti": str(uuid.uuid4()),
+ }
+ return jwt.encode(payload, settings.jwt_secret_key, algorithm=settings.jwt_algorithm)
+
+
+def decode_admin_file_preview_token(token: str, settings: Settings) -> dict[str, Any]:
+ payload = jwt.decode(token, settings.jwt_secret_key, algorithms=[settings.jwt_algorithm])
+ token_type = payload.get("typ")
+ scope = payload.get("scope")
+ if token_type != "admin_file_preview" or scope != "admin.file.preview":
+ raise jwt.InvalidTokenError("Invalid token type")
+ return payload
diff --git a/app/src/fileflash/exceptions/__init__.py b/app/src/fileflash/exceptions/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/app/src/fileflash/routers/admin_files.py b/app/src/fileflash/routers/admin_files.py
index 55069c7..024a680 100644
--- a/app/src/fileflash/routers/admin_files.py
+++ b/app/src/fileflash/routers/admin_files.py
@@ -1,11 +1,19 @@
from __future__ import annotations
-from fastapi import APIRouter, Depends
+from datetime import UTC, datetime, timedelta
+from urllib.parse import urlencode
-from ..core.deps import get_admin_files_service, require_admin
-from ..core.errors import api_success
+from fastapi import APIRouter, Depends, Header, Query, Request
+from fastapi.responses import StreamingResponse
+from jwt import InvalidTokenError
+
+from ..core.deps import get_admin_files_service, get_settings_dep, require_admin
+from ..core.errors import ApiError, api_success
+from ..core.security import create_admin_file_preview_token, decode_admin_file_preview_token
+from ..core.settings import Settings
from ..models.tables_identity import User
from ..schemas.admin.files import ListAdminFilesQuery
+from ..schemas.file import FilePreviewUrlResponse
from ..services.admin.files import AdminFilesService
router = APIRouter(prefix="/admin/files", tags=["admin"])
@@ -21,6 +29,82 @@ async def list_admin_files(
return api_success(data=data.model_dump(by_alias=True), message="Files fetched")
+@router.get("/{file_id}")
+async def get_admin_file_detail(
+ file_id: int,
+ _: User = Depends(require_admin),
+ service: AdminFilesService = Depends(get_admin_files_service),
+):
+ result = await service.get_file_detail(file_id=file_id)
+ return api_success(data=result.model_dump(by_alias=True), message="File audit detail fetched")
+
+
+@router.get("/{file_id}/preview")
+async def preview_admin_file(
+ file_id: int,
+ range_header: str | None = Header(default=None, alias="Range"),
+ _: User = Depends(require_admin),
+ service: AdminFilesService = Depends(get_admin_files_service),
+):
+ result = await service.get_preview_stream(file_id=file_id, range_header=range_header)
+ return StreamingResponse(
+ result.stream,
+ media_type=result.content_type,
+ headers=result.headers,
+ status_code=result.status_code,
+ )
+
+
+@router.post("/{file_id}/preview-url")
+async def create_admin_file_preview_url(
+ file_id: int,
+ request: Request,
+ admin: User = Depends(require_admin),
+ service: AdminFilesService = Depends(get_admin_files_service),
+ settings: Settings = Depends(get_settings_dep),
+):
+ await service.get_file_detail(file_id=file_id)
+ expires_at = datetime.now(UTC) + timedelta(seconds=settings.file_preview_url_ttl_seconds)
+ token = create_admin_file_preview_token(
+ admin_user_id=int(admin.user_id),
+ file_id=file_id,
+ settings=settings,
+ expires_at=expires_at,
+ )
+ stream_url = str(request.url_for("admin_preview_file_stream", file_id=str(file_id)))
+ result = FilePreviewUrlResponse(
+ url=f"{stream_url}?{urlencode({'token': token})}",
+ expires_at=expires_at,
+ )
+ return api_success(data=result.model_dump(by_alias=True), message="Preview URL created")
+
+
+@router.get("/{file_id}/preview-stream", name="admin_preview_file_stream")
+async def preview_admin_file_stream(
+ file_id: int,
+ token: str = Query(..., min_length=1),
+ range_header: str | None = Header(default=None, alias="Range"),
+ service: AdminFilesService = Depends(get_admin_files_service),
+ settings: Settings = Depends(get_settings_dep),
+):
+ try:
+ payload = decode_admin_file_preview_token(token, settings)
+ token_file_id = str(payload["fileId"])
+ except (InvalidTokenError, KeyError, ValueError):
+ raise ApiError(status_code=401, code=401, message="Invalid or expired preview token") from None
+
+ if token_file_id != str(file_id):
+ raise ApiError(status_code=403, code=403, message="Preview token does not match file")
+
+ result = await service.get_preview_stream(file_id=file_id, range_header=range_header)
+ return StreamingResponse(
+ result.stream,
+ media_type=result.content_type,
+ headers=result.headers,
+ status_code=result.status_code,
+ )
+
+
@router.post("/{file_id}/rescan")
async def rescan_admin_file(
file_id: int,
diff --git a/app/src/fileflash/schemas/__init__.py b/app/src/fileflash/schemas/__init__.py
index 39e1ac4..8fdbb94 100644
--- a/app/src/fileflash/schemas/__init__.py
+++ b/app/src/fileflash/schemas/__init__.py
@@ -19,8 +19,15 @@
UpdateAgentSkillRequest,
)
from .common import ApiResponse, CamelModel, PageQuery, PaginatedData, PaginationMeta
-from .file import (
+from .admin.files import (
+ AdminFileAuditDetail,
AdminFileAuditItem,
+ AdminFileAuditOwner,
+ AdminFileLatestScan,
+ ListAdminFilesQuery,
+ RescanResponse,
+)
+from .file import (
BatchDownloadRequest,
BatchMoveItemResult,
BatchFilesRequest,
@@ -150,7 +157,10 @@
"ActivityItem",
"AddGroupMemberRequest",
"AddGroupMemberResponse",
+ "AdminFileAuditDetail",
"AdminFileAuditItem",
+ "AdminFileAuditOwner",
+ "AdminFileLatestScan",
"ApiResponse",
"BatchFilesRequest",
"BatchFilesResponse",
@@ -200,6 +210,7 @@
"LoginRequest",
"LogItem",
"LogsList",
+ "ListAdminFilesQuery",
"MarkAllAsReadResponse",
"MarkAsReadResponse",
"MergeChunksRequest",
@@ -226,6 +237,7 @@
"RenameFileRequest",
"RenameFolderRequest",
"RescanAdminFileResponse",
+ "RescanResponse",
"ResolveViolationResponse",
"ResetPasswordRequest",
"RestoreRecycleItemRequest",
diff --git a/app/src/fileflash/schemas/admin/files.py b/app/src/fileflash/schemas/admin/files.py
index c54f06c..d96dd74 100644
--- a/app/src/fileflash/schemas/admin/files.py
+++ b/app/src/fileflash/schemas/admin/files.py
@@ -1,15 +1,24 @@
from __future__ import annotations
from datetime import datetime
-from typing import Literal
+from typing import Any, Literal
from ..common import CamelModel, PageQuery
VirusStatus = Literal["clean", "pending", "flagged"]
+class AdminFileLatestScan(CamelModel):
+ scan_type: str
+ scan_result: str
+ virus_status: VirusStatus
+ scanned_at: datetime
+ details: dict[str, Any] | None = None
+
+
class AdminFileAuditItem(CamelModel):
id: str
+ object_id: str
name: str
size: int
mime_type: str
@@ -17,10 +26,30 @@ class AdminFileAuditItem(CamelModel):
virus_status: VirusStatus
is_shared: bool
owner_name: str
+ upload_count: int
+ owner_count: int
+ scanned_at: datetime | None = None
updated_at: datetime
created_at: datetime
+class AdminFileAuditOwner(CamelModel):
+ user_id: str
+ username: str
+ email: str
+ file_count: int
+ first_uploaded_at: datetime
+ last_uploaded_at: datetime
+
+
+class AdminFileAuditDetail(AdminFileAuditItem):
+ object_hash: str | None = None
+ hash_algorithm: str
+ storage_status: str
+ latest_scan: AdminFileLatestScan | None = None
+ owners: list[AdminFileAuditOwner]
+
+
class ListAdminFilesQuery(PageQuery):
search: str | None = None
virus_status: VirusStatus | None = None
@@ -36,4 +65,12 @@ class RescanResponse(CamelModel):
scanned_at: datetime
-__all__ = ["AdminFileAuditItem", "ListAdminFilesQuery", "RescanResponse", "VirusStatus"]
+__all__ = [
+ "AdminFileAuditDetail",
+ "AdminFileAuditItem",
+ "AdminFileAuditOwner",
+ "AdminFileLatestScan",
+ "ListAdminFilesQuery",
+ "RescanResponse",
+ "VirusStatus",
+]
diff --git a/app/src/fileflash/services/admin/files.py b/app/src/fileflash/services/admin/files.py
index a06eee5..a818da5 100644
--- a/app/src/fileflash/services/admin/files.py
+++ b/app/src/fileflash/services/admin/files.py
@@ -1,17 +1,31 @@
from __future__ import annotations
+from collections.abc import AsyncIterator
+from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any, Protocol
-from sqlalchemy import and_, func, select
+from sqlalchemy import and_, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from ...core.errors import ApiError
-from ...models.enums import FileStatus, ScanResult
+from ...core.http_headers import build_content_disposition
+from ...core.mime import DEFAULT_MIME_TYPE, resolve_file_mime_type
+from ...models.enums import FileStatus, ScanResult, ShareStatus, UploadStatus
+from ...models.tables_access_share import Share
from ...models.tables_audit_security import ObjectScanResult
from ...models.tables_identity import User
-from ...models.tables_storage import File, StorageObject
-from ...schemas.admin.files import AdminFileAuditItem, ListAdminFilesQuery, RescanResponse, VirusStatus
+from ...models.tables_storage import File, FileMediaMetadata, StorageObject
+from ...s3.minio_client import MinioObjectStorageClient
+from ...schemas.admin.files import (
+ AdminFileAuditDetail,
+ AdminFileAuditItem,
+ AdminFileAuditOwner,
+ AdminFileLatestScan,
+ ListAdminFilesQuery,
+ RescanResponse,
+ VirusStatus,
+)
from ...schemas.common import PaginatedData, PaginationMeta
@@ -19,6 +33,21 @@ class EventPublisherProtocol(Protocol):
async def publish(self, event_name: str, payload: dict[str, Any]) -> None: ...
+@dataclass(slots=True)
+class AdminFileStreamResult:
+ stream: AsyncIterator[bytes]
+ filename: str
+ content_type: str
+ status_code: int
+ headers: dict[str, str]
+
+
+@dataclass(slots=True)
+class ResolvedAdminStreamObject:
+ storage_object: StorageObject
+ content_type_override: str | None = None
+
+
_VIRUS_STATUS_MAP: dict[ScanResult, VirusStatus] = {
ScanResult.CLEAN: "clean",
ScanResult.PENDING: "pending",
@@ -27,22 +56,39 @@ async def publish(self, event_name: str, payload: dict[str, Any]) -> None: ...
ScanResult.FAILED: "pending",
}
+TRANSCODE_READY_STATUS = "ready"
+
class AdminFilesService:
- def __init__(self, db: AsyncSession, publisher: EventPublisherProtocol) -> None:
+ def __init__(
+ self,
+ db: AsyncSession,
+ publisher: EventPublisherProtocol,
+ storage: MinioObjectStorageClient | None = None,
+ ) -> None:
self.db = db
self.publisher = publisher
+ self.storage = storage
async def list_files(self, *, query: ListAdminFilesQuery) -> PaginatedData[AdminFileAuditItem]:
- latest_scan = (
- select(ObjectScanResult.object_id, func.max(ObjectScanResult.scanned_at).label("scanned_at"))
- .group_by(ObjectScanResult.object_id)
- .subquery()
- )
+ latest_scan = self._latest_scan_subquery()
+ object_stats = self._object_stats_subquery()
+ share_stats = self._share_stats_subquery()
+
statement = (
- select(File, StorageObject, User, ObjectScanResult)
+ select(
+ File,
+ StorageObject,
+ User,
+ ObjectScanResult,
+ object_stats.c.upload_count,
+ object_stats.c.owner_count,
+ share_stats.c.share_count,
+ )
.join(StorageObject, File.storage_object_id == StorageObject.object_id)
.join(User, File.owner_id == User.user_id)
+ .join(object_stats, object_stats.c.object_id == StorageObject.object_id, isouter=True)
+ .join(share_stats, share_stats.c.object_id == StorageObject.object_id, isouter=True)
.join(latest_scan, latest_scan.c.object_id == StorageObject.object_id, isouter=True)
.join(
ObjectScanResult,
@@ -67,6 +113,8 @@ async def list_files(self, *, query: ListAdminFilesQuery) -> PaginatedData[Admin
wanted = [raw for raw, mapped in _VIRUS_STATUS_MAP.items() if mapped == query.virus_status]
statement = statement.where(ObjectScanResult.result.in_(wanted))
+ total = int(await self.db.scalar(select(func.count()).select_from(statement.subquery())) or 0)
+
sort_column = {
"name": File.file_name,
"size": File.file_size,
@@ -75,11 +123,29 @@ async def list_files(self, *, query: ListAdminFilesQuery) -> PaginatedData[Admin
}[query.sort]
statement = statement.order_by(sort_column.desc() if query.order == "desc" else sort_column.asc())
- total = int(await self.db.scalar(select(func.count()).select_from(statement.subquery())) or 0)
total_pages = max(1, -(-total // query.per_page))
offset = (query.page - 1) * query.per_page
rows = (await self.db.execute(statement.offset(offset).limit(query.per_page))).all()
- items = [self._to_item(file_row, object_row, owner_row, scan_row) for file_row, object_row, owner_row, scan_row in rows]
+ items = [
+ self._to_item(
+ file_row,
+ object_row,
+ owner_row,
+ scan_row,
+ upload_count=upload_count,
+ owner_count=owner_count,
+ share_count=share_count,
+ )
+ for (
+ file_row,
+ object_row,
+ owner_row,
+ scan_row,
+ upload_count,
+ owner_count,
+ share_count,
+ ) in rows
+ ]
return PaginatedData(
items=items,
pagination=PaginationMeta(
@@ -92,6 +158,101 @@ async def list_files(self, *, query: ListAdminFilesQuery) -> PaginatedData[Admin
),
)
+ async def get_file_detail(self, *, file_id: int) -> AdminFileAuditDetail:
+ file_row, object_row, owner_row, scan_row = await self._get_active_file_context(file_id=file_id)
+ owners = await self._load_object_owners(object_id=int(object_row.object_id))
+ upload_count = sum(owner.file_count for owner in owners)
+ owner_count = len(owners)
+ is_shared = await self._object_is_shared(object_id=int(object_row.object_id))
+
+ item = self._to_item(
+ file_row,
+ object_row,
+ owner_row,
+ scan_row,
+ upload_count=upload_count,
+ owner_count=owner_count,
+ share_count=1 if is_shared else 0,
+ )
+ return AdminFileAuditDetail(
+ **item.model_dump(),
+ object_hash=object_row.object_hash,
+ hash_algorithm=object_row.hash_algorithm,
+ storage_status=self._enum_value(object_row.upload_status),
+ latest_scan=self._to_latest_scan(scan_row),
+ owners=owners,
+ )
+
+ async def get_preview_stream(
+ self,
+ *,
+ file_id: int,
+ range_header: str | None,
+ ) -> AdminFileStreamResult:
+ if self.storage is None:
+ raise ApiError(status_code=503, code=503, message="Object storage is unavailable")
+
+ file_row, object_row, _owner_row, _scan_row = await self._get_active_file_context(file_id=file_id)
+ resolved_object = await self._resolve_stream_storage_object(
+ file_row=file_row,
+ source_object=object_row,
+ prefer_optimized=True,
+ )
+ storage_object = resolved_object.storage_object if resolved_object is not None else None
+ if storage_object is None or not self._is_upload_status_active(storage_object.upload_status):
+ raise ApiError(status_code=404, code=404, message="File content not found")
+
+ object_size = int(storage_object.object_size or file_row.file_size or 0)
+ if object_size <= 0:
+ raise ApiError(status_code=404, code=404, message="File content not found")
+
+ content_type = resolve_file_mime_type(
+ mime_type=(
+ resolved_object.content_type_override
+ if resolved_object is not None and resolved_object.content_type_override
+ else file_row.mime_type or storage_object.content_type
+ ),
+ file_ext=file_row.file_ext,
+ file_name=file_row.file_name,
+ default=DEFAULT_MIME_TYPE,
+ )
+ headers = {
+ "Accept-Ranges": "bytes",
+ "Content-Disposition": build_content_disposition(file_row.file_name, disposition="inline"),
+ }
+
+ byte_range = self._parse_range_header(range_header=range_header, file_size=object_size)
+ if byte_range is None:
+ headers["Content-Length"] = str(object_size)
+ stream = self.storage.iter_object(
+ bucket_name=storage_object.bucket_name,
+ object_key=storage_object.object_key,
+ )
+ return AdminFileStreamResult(
+ stream=stream,
+ filename=file_row.file_name,
+ content_type=content_type,
+ status_code=200,
+ headers=headers,
+ )
+
+ start, end = byte_range
+ headers["Content-Length"] = str(end - start + 1)
+ headers["Content-Range"] = f"bytes {start}-{end}/{object_size}"
+ stream = self.storage.iter_object_range(
+ bucket_name=storage_object.bucket_name,
+ object_key=storage_object.object_key,
+ start=start,
+ end=end,
+ )
+ return AdminFileStreamResult(
+ stream=stream,
+ filename=file_row.file_name,
+ content_type=content_type,
+ status_code=206,
+ headers=headers,
+ )
+
async def request_rescan(self, *, file_id: int, requested_by: int) -> RescanResponse:
file_row = await self.db.get(File, file_id)
if file_row is None or file_row.deleted_at is not None or file_row.status != FileStatus.ACTIVE:
@@ -120,25 +281,283 @@ async def request_rescan(self, *, file_id: int, requested_by: int) -> RescanResp
)
return RescanResponse(file_id=str(file_id), virus_status="pending", scanned_at=now)
+ async def _get_active_file_context(
+ self,
+ *,
+ file_id: int,
+ ) -> tuple[File, StorageObject, User, ObjectScanResult | None]:
+ latest_scan = self._latest_scan_subquery()
+ row = (
+ await self.db.execute(
+ select(File, StorageObject, User, ObjectScanResult)
+ .join(StorageObject, File.storage_object_id == StorageObject.object_id)
+ .join(User, File.owner_id == User.user_id)
+ .join(latest_scan, latest_scan.c.object_id == StorageObject.object_id, isouter=True)
+ .join(
+ ObjectScanResult,
+ and_(
+ ObjectScanResult.object_id == latest_scan.c.object_id,
+ ObjectScanResult.scanned_at == latest_scan.c.scanned_at,
+ ),
+ isouter=True,
+ )
+ .where(File.file_id == file_id)
+ .where(File.status == FileStatus.ACTIVE)
+ .where(File.deleted_at.is_(None))
+ )
+ ).first()
+ if row is None:
+ raise ApiError(status_code=404, code=404, message="File not found")
+ file_row, object_row, owner_row, scan_row = row
+ return file_row, object_row, owner_row, scan_row
+
+ async def _load_object_owners(self, *, object_id: int) -> list[AdminFileAuditOwner]:
+ first_uploaded = func.min(File.created_at)
+ last_uploaded = func.max(File.created_at)
+ rows = (
+ await self.db.execute(
+ select(
+ User.user_id,
+ User.username,
+ User.email,
+ func.count(File.file_id),
+ first_uploaded,
+ last_uploaded,
+ )
+ .join(User, User.user_id == File.owner_id)
+ .where(File.storage_object_id == object_id)
+ .where(File.status == FileStatus.ACTIVE)
+ .where(File.deleted_at.is_(None))
+ .group_by(User.user_id, User.username, User.email)
+ .order_by(last_uploaded.desc())
+ )
+ ).all()
+ return [
+ AdminFileAuditOwner(
+ user_id=str(user_id),
+ username=username,
+ email=email,
+ file_count=int(file_count or 0),
+ first_uploaded_at=first_uploaded_at,
+ last_uploaded_at=last_uploaded_at,
+ )
+ for user_id, username, email, file_count, first_uploaded_at, last_uploaded_at in rows
+ ]
+
+ async def _object_is_shared(self, *, object_id: int) -> bool:
+ now = datetime.now(UTC)
+ share_id = await self.db.scalar(
+ select(Share.share_id)
+ .join(File, File.file_id == Share.file_id)
+ .where(File.storage_object_id == object_id)
+ .where(File.status == FileStatus.ACTIVE)
+ .where(File.deleted_at.is_(None))
+ .where(Share.status == ShareStatus.ACTIVE)
+ .where(or_(Share.expire_time.is_(None), Share.expire_time > now))
+ .limit(1)
+ )
+ return share_id is not None
+
+ async def _resolve_stream_storage_object(
+ self,
+ *,
+ file_row: File,
+ source_object: StorageObject,
+ prefer_optimized: bool,
+ ) -> ResolvedAdminStreamObject | None:
+ if not prefer_optimized:
+ return ResolvedAdminStreamObject(storage_object=source_object)
+
+ metadata_row = await self.db.scalar(
+ select(FileMediaMetadata)
+ .where(FileMediaMetadata.source_object_id == int(file_row.storage_object_id))
+ .limit(1)
+ )
+ if not isinstance(metadata_row, FileMediaMetadata):
+ return ResolvedAdminStreamObject(storage_object=source_object)
+ transcode = (metadata_row.extra_metadata or {}).get("transcode")
+ if not isinstance(transcode, dict):
+ return ResolvedAdminStreamObject(storage_object=source_object)
+ if str(transcode.get("status") or "").strip().lower() != TRANSCODE_READY_STATUS:
+ return ResolvedAdminStreamObject(storage_object=source_object)
+
+ bucket_name = str(transcode.get("optimizedBucketName") or "").strip()
+ object_key = str(transcode.get("optimizedObjectKey") or "").strip()
+ if not bucket_name or not object_key:
+ return ResolvedAdminStreamObject(storage_object=source_object)
+
+ optimized_mime_type = str(transcode.get("optimizedMimeType") or "").strip() or None
+ optimized_object = await self.db.scalar(
+ select(StorageObject)
+ .where(
+ and_(
+ StorageObject.bucket_name == bucket_name,
+ StorageObject.object_key == object_key,
+ StorageObject.upload_status == UploadStatus.ACTIVE,
+ )
+ )
+ .limit(1)
+ )
+ if isinstance(optimized_object, StorageObject):
+ return ResolvedAdminStreamObject(
+ storage_object=optimized_object,
+ content_type_override=optimized_mime_type or optimized_object.content_type,
+ )
+
+ if self.storage is None:
+ return ResolvedAdminStreamObject(storage_object=source_object)
+ exists = await self.storage.object_exists(bucket_name=bucket_name, object_key=object_key)
+ if not exists:
+ return ResolvedAdminStreamObject(storage_object=source_object)
+
+ stat = await self.storage.stat_object(bucket_name=bucket_name, object_key=object_key)
+ created = StorageObject(
+ bucket_name=bucket_name,
+ object_key=object_key,
+ object_size=int(stat.size),
+ etag=stat.etag,
+ version_id=stat.version_id,
+ content_type=stat.content_type,
+ upload_status=UploadStatus.ACTIVE,
+ )
+ self.db.add(created)
+ await self.db.flush()
+ return ResolvedAdminStreamObject(
+ storage_object=created,
+ content_type_override=optimized_mime_type or created.content_type,
+ )
+
+ @staticmethod
+ def _latest_scan_subquery():
+ return (
+ select(ObjectScanResult.object_id, func.max(ObjectScanResult.scanned_at).label("scanned_at"))
+ .group_by(ObjectScanResult.object_id)
+ .subquery()
+ )
+
+ @staticmethod
+ def _object_stats_subquery():
+ return (
+ select(
+ File.storage_object_id.label("object_id"),
+ func.count(File.file_id).label("upload_count"),
+ func.count(func.distinct(File.owner_id)).label("owner_count"),
+ )
+ .where(File.status == FileStatus.ACTIVE)
+ .where(File.deleted_at.is_(None))
+ .group_by(File.storage_object_id)
+ .subquery()
+ )
+
+ @staticmethod
+ def _share_stats_subquery():
+ now = datetime.now(UTC)
+ return (
+ select(
+ File.storage_object_id.label("object_id"),
+ func.count(Share.share_id).label("share_count"),
+ )
+ .join(Share, Share.file_id == File.file_id)
+ .where(File.status == FileStatus.ACTIVE)
+ .where(File.deleted_at.is_(None))
+ .where(Share.status == ShareStatus.ACTIVE)
+ .where(or_(Share.expire_time.is_(None), Share.expire_time > now))
+ .group_by(File.storage_object_id)
+ .subquery()
+ )
+
@staticmethod
def _to_item(
file_row: File,
object_row: StorageObject,
owner_row: User,
scan_row: ObjectScanResult | None,
+ *,
+ upload_count: int | None,
+ owner_count: int | None,
+ share_count: int | None,
) -> AdminFileAuditItem:
return AdminFileAuditItem(
id=str(file_row.file_id),
+ object_id=str(object_row.object_id),
name=file_row.file_name,
size=int(file_row.file_size),
- mime_type=file_row.mime_type or object_row.content_type or "application/octet-stream",
+ mime_type=file_row.mime_type or object_row.content_type or DEFAULT_MIME_TYPE,
hash=(object_row.object_hash or "")[:16],
virus_status=_VIRUS_STATUS_MAP.get(scan_row.result, "pending") if scan_row else "pending",
- is_shared=False,
+ is_shared=bool(share_count or 0),
owner_name=owner_row.username,
+ upload_count=int(upload_count or 1),
+ owner_count=int(owner_count or 1),
+ scanned_at=scan_row.scanned_at if scan_row else None,
updated_at=file_row.updated_at,
created_at=file_row.created_at,
)
+ @staticmethod
+ def _to_latest_scan(scan_row: ObjectScanResult | None) -> AdminFileLatestScan | None:
+ if scan_row is None:
+ return None
+ result = AdminFilesService._enum_value(scan_row.result)
+ return AdminFileLatestScan(
+ scan_type=scan_row.scan_type,
+ scan_result=result,
+ virus_status=_VIRUS_STATUS_MAP.get(scan_row.result, "pending"),
+ scanned_at=scan_row.scanned_at,
+ details=scan_row.details,
+ )
+
+ @staticmethod
+ def _enum_value(value: object) -> str:
+ return str(getattr(value, "value", value))
+
+ @staticmethod
+ def _is_upload_status_active(value: object) -> bool:
+ return value == UploadStatus.ACTIVE or AdminFilesService._enum_value(value) == UploadStatus.ACTIVE.value
+
+ @staticmethod
+ def _parse_range_header(range_header: str | None, file_size: int) -> tuple[int, int] | None:
+ if not range_header:
+ return None
+
+ value = range_header.strip()
+ if not value.lower().startswith("bytes="):
+ raise ApiError(status_code=416, code=416, message="Invalid Range header")
+
+ spec = value[6:].strip()
+ if "," in spec:
+ raise ApiError(status_code=416, code=416, message="Multiple ranges are not supported")
+
+ if spec.startswith("-"):
+ suffix_part = spec[1:].strip()
+ if not suffix_part.isdigit():
+ raise ApiError(status_code=416, code=416, message="Invalid Range header")
+ suffix = int(suffix_part)
+ if suffix <= 0:
+ raise ApiError(status_code=416, code=416, message="Invalid Range header")
+ start = max(file_size - suffix, 0)
+ end = file_size - 1
+ return start, end
+
+ if "-" not in spec:
+ raise ApiError(status_code=416, code=416, message="Invalid Range header")
+
+ start_part, end_part = spec.split("-", 1)
+ if not start_part.strip().isdigit():
+ raise ApiError(status_code=416, code=416, message="Invalid Range header")
+ start = int(start_part.strip())
+ end = file_size - 1
+ if end_part.strip():
+ if not end_part.strip().isdigit():
+ raise ApiError(status_code=416, code=416, message="Invalid Range header")
+ end = int(end_part.strip())
+
+ if start < 0 or start >= file_size or end < start:
+ raise ApiError(status_code=416, code=416, message="Requested range is not satisfiable")
+
+ if end >= file_size:
+ end = file_size - 1
+ return start, end
+
-__all__ = ["AdminFilesService"]
+__all__ = ["AdminFilesService", "AdminFileStreamResult"]
diff --git a/app/tests/test_admin_files_routes.py b/app/tests/test_admin_files_routes.py
index be04461..3b91678 100644
--- a/app/tests/test_admin_files_routes.py
+++ b/app/tests/test_admin_files_routes.py
@@ -1,15 +1,19 @@
from __future__ import annotations
-from datetime import UTC, datetime
+from collections.abc import AsyncIterator
+from datetime import UTC, datetime, timedelta
from types import SimpleNamespace
from fastapi import FastAPI
from fastapi.testclient import TestClient
-from fileflash.core.deps import get_admin_files_service, require_admin
+from fileflash.core.deps import get_admin_files_service, get_settings_dep, require_admin
from fileflash.core.errors import ApiError, api_error_handler
+from fileflash.core.security import create_admin_file_preview_token
+from fileflash.core.settings import Settings
from fileflash.routers.admin_files import router as admin_router
from fileflash.schemas.admin.files import RescanResponse
+from fileflash.services.admin.files import AdminFileStreamResult
class StubService:
@@ -28,6 +32,76 @@ async def list_files(self, *, query): # noqa: ANN001
}
)
+ async def get_file_detail(self, *, file_id): # noqa: ANN001
+ return SimpleNamespace(
+ model_dump=lambda **_: {
+ "id": str(file_id),
+ "objectId": "42",
+ "name": "demo.txt",
+ "size": 12,
+ "mimeType": "text/plain",
+ "hash": "abc123",
+ "virusStatus": "clean",
+ "isShared": False,
+ "ownerName": "owner",
+ "uploadCount": 2,
+ "ownerCount": 1,
+ "scannedAt": "2026-01-01T00:00:00Z",
+ "updatedAt": "2026-01-01T00:00:00Z",
+ "createdAt": "2026-01-01T00:00:00Z",
+ "objectHash": "abc123full",
+ "hashAlgorithm": "sha256",
+ "storageStatus": "active",
+ "latestScan": {
+ "scanType": "virus",
+ "scanResult": "clean",
+ "virusStatus": "clean",
+ "scannedAt": "2026-01-01T00:00:00Z",
+ "details": {},
+ },
+ "owners": [
+ {
+ "userId": "1",
+ "username": "owner",
+ "email": "owner@example.com",
+ "fileCount": 2,
+ "firstUploadedAt": "2026-01-01T00:00:00Z",
+ "lastUploadedAt": "2026-01-01T00:00:00Z",
+ }
+ ],
+ }
+ )
+
+ async def get_preview_stream(self, *, file_id, range_header): # noqa: ANN001
+ _ = file_id
+
+ async def _stream(content: bytes) -> AsyncIterator[bytes]:
+ yield content
+
+ headers = {
+ "Accept-Ranges": "bytes",
+ "Content-Disposition": 'inline; filename="demo.txt"',
+ }
+ if range_header:
+ headers["Content-Length"] = "4"
+ headers["Content-Range"] = "bytes 0-3/12"
+ return AdminFileStreamResult(
+ stream=_stream(b"prev"),
+ filename="demo.txt",
+ content_type="text/plain",
+ status_code=206,
+ headers=headers,
+ )
+
+ headers["Content-Length"] = "12"
+ return AdminFileStreamResult(
+ stream=_stream(b"preview-bytes"),
+ filename="demo.txt",
+ content_type="text/plain",
+ status_code=200,
+ headers=headers,
+ )
+
async def request_rescan(self, *, file_id, requested_by): # noqa: ANN001
_ = requested_by
return RescanResponse(
@@ -37,12 +111,24 @@ async def request_rescan(self, *, file_id, requested_by): # noqa: ANN001
)
-def _client() -> TestClient:
+def _client(*, admin: bool = True, settings: Settings | None = None) -> TestClient:
app = FastAPI()
app.add_exception_handler(ApiError, api_error_handler)
app.include_router(admin_router, prefix="/api/v1")
app.dependency_overrides[get_admin_files_service] = lambda: StubService()
- app.dependency_overrides[require_admin] = lambda: SimpleNamespace(user_id=99)
+
+ def _admin_override():
+ if not admin:
+ raise ApiError(status_code=403, code=403, message="Admin access required")
+ return SimpleNamespace(user_id=99)
+
+ resolved_settings = settings or Settings(
+ JWT_SECRET_KEY="unit-test-secret-key-1234567890abcd",
+ FF_DB_URI="postgresql://u:p@localhost:5432/db",
+ )
+
+ app.dependency_overrides[require_admin] = _admin_override
+ app.dependency_overrides[get_settings_dep] = lambda: resolved_settings
return TestClient(app)
@@ -57,3 +143,95 @@ def test_rescan_returns_pending() -> None:
resp = c.post("/api/v1/admin/files/7/rescan")
assert resp.status_code == 200
assert resp.json()["data"]["virusStatus"] == "pending"
+
+
+def test_get_file_detail_returns_owner_and_upload_counts() -> None:
+ with _client() as c:
+ resp = c.get("/api/v1/admin/files/7")
+ assert resp.status_code == 200
+ data = resp.json()["data"]
+ assert data["uploadCount"] == 2
+ assert data["ownerCount"] == 1
+ assert data["owners"][0]["email"] == "owner@example.com"
+
+
+def test_preview_route_returns_inline_stream() -> None:
+ with _client() as c:
+ resp = c.get("/api/v1/admin/files/7/preview", headers={"Range": "bytes=0-3"})
+ assert resp.status_code == 206
+ assert resp.headers["content-range"] == "bytes 0-3/12"
+ assert resp.content == b"prev"
+
+
+def test_create_preview_url_returns_admin_stream_url() -> None:
+ with _client() as c:
+ resp = c.post("/api/v1/admin/files/7/preview-url")
+ assert resp.status_code == 200
+ data = resp.json()["data"]
+ assert data["url"].startswith("http://testserver/api/v1/admin/files/7/preview-stream?token=")
+ assert data["expiresAt"]
+
+
+def test_admin_preview_requires_admin() -> None:
+ with _client(admin=False) as c:
+ resp = c.get("/api/v1/admin/files/7/preview")
+ assert resp.status_code == 403
+ assert resp.json()["message"] == "Admin access required"
+
+
+def test_preview_stream_supports_range_with_valid_token() -> None:
+ settings = Settings(
+ JWT_SECRET_KEY="unit-test-secret-key-1234567890abcd",
+ FF_DB_URI="postgresql://u:p@localhost:5432/db",
+ )
+ token = create_admin_file_preview_token(
+ admin_user_id=99,
+ file_id=7,
+ settings=settings,
+ expires_at=datetime.now(UTC) + timedelta(minutes=10),
+ )
+
+ with _client(settings=settings) as c:
+ resp = c.get(f"/api/v1/admin/files/7/preview-stream?token={token}", headers={"Range": "bytes=0-3"})
+
+ assert resp.status_code == 206
+ assert resp.headers["content-range"] == "bytes 0-3/12"
+ assert resp.content == b"prev"
+
+
+def test_preview_stream_rejects_mismatched_token_file_id() -> None:
+ settings = Settings(
+ JWT_SECRET_KEY="unit-test-secret-key-1234567890abcd",
+ FF_DB_URI="postgresql://u:p@localhost:5432/db",
+ )
+ token = create_admin_file_preview_token(
+ admin_user_id=99,
+ file_id=8,
+ settings=settings,
+ expires_at=datetime.now(UTC) + timedelta(minutes=10),
+ )
+
+ with _client(settings=settings) as c:
+ resp = c.get(f"/api/v1/admin/files/7/preview-stream?token={token}")
+
+ assert resp.status_code == 403
+ assert resp.json()["message"] == "Preview token does not match file"
+
+
+def test_preview_stream_rejects_expired_token() -> None:
+ settings = Settings(
+ JWT_SECRET_KEY="unit-test-secret-key-1234567890abcd",
+ FF_DB_URI="postgresql://u:p@localhost:5432/db",
+ )
+ token = create_admin_file_preview_token(
+ admin_user_id=99,
+ file_id=7,
+ settings=settings,
+ expires_at=datetime.now(UTC) - timedelta(minutes=10),
+ )
+
+ with _client(settings=settings) as c:
+ resp = c.get(f"/api/v1/admin/files/7/preview-stream?token={token}")
+
+ assert resp.status_code == 401
+ assert resp.json()["message"] == "Invalid or expired preview token"
diff --git a/app/tests/test_admin_files_service.py b/app/tests/test_admin_files_service.py
index b5bd4eb..8889275 100644
--- a/app/tests/test_admin_files_service.py
+++ b/app/tests/test_admin_files_service.py
@@ -1,12 +1,13 @@
from __future__ import annotations
+from datetime import UTC, datetime
from unittest.mock import AsyncMock, Mock
import pytest
from fileflash.core.errors import ApiError
-from fileflash.models.enums import FileStatus
-from fileflash.schemas.admin.files import ListAdminFilesQuery
+from fileflash.models.enums import FileStatus, ScanResult
+from fileflash.schemas.admin.files import AdminFileAuditOwner, ListAdminFilesQuery
from fileflash.services.admin.files import AdminFilesService
@@ -70,3 +71,67 @@ async def test_rescan_inserts_scan_record_and_publishes_event() -> None:
assert result.virus_status == "pending"
assert publisher.calls and publisher.calls[0][0] == "files.rescan_requested"
session.commit.assert_awaited()
+
+
+@pytest.mark.asyncio
+async def test_detail_aggregates_active_object_owner_counts(monkeypatch: pytest.MonkeyPatch) -> None:
+ session = DummySession()
+ service = AdminFilesService(db=session, publisher=DummyPublisher()) # type: ignore[arg-type]
+ now = datetime.now(UTC)
+ file_row = Mock(
+ file_id=10,
+ file_name="report.pdf",
+ file_size=128,
+ mime_type="application/pdf",
+ file_ext="pdf",
+ updated_at=now,
+ created_at=now,
+ )
+ object_row = Mock(
+ object_id=20,
+ object_hash="a" * 64,
+ hash_algorithm="sha256",
+ content_type="application/pdf",
+ upload_status="active",
+ )
+ owner_row = Mock(username="alice")
+ scan_row = Mock(
+ result=ScanResult.CLEAN,
+ scan_type="virus",
+ scanned_at=now,
+ details={"engine": "unit"},
+ )
+ owners = [
+ AdminFileAuditOwner(
+ user_id="1",
+ username="alice",
+ email="alice@example.com",
+ file_count=2,
+ first_uploaded_at=now,
+ last_uploaded_at=now,
+ ),
+ AdminFileAuditOwner(
+ user_id="2",
+ username="bob",
+ email="bob@example.com",
+ file_count=1,
+ first_uploaded_at=now,
+ last_uploaded_at=now,
+ ),
+ ]
+
+ monkeypatch.setattr(
+ service,
+ "_get_active_file_context",
+ AsyncMock(return_value=(file_row, object_row, owner_row, scan_row)),
+ )
+ monkeypatch.setattr(service, "_load_object_owners", AsyncMock(return_value=owners))
+ monkeypatch.setattr(service, "_object_is_shared", AsyncMock(return_value=True))
+
+ detail = await service.get_file_detail(file_id=10)
+
+ assert detail.object_id == "20"
+ assert detail.upload_count == 3
+ assert detail.owner_count == 2
+ assert detail.is_shared is True
+ assert detail.latest_scan and detail.latest_scan.virus_status == "clean"
diff --git a/app/tests/test_security.py b/app/tests/test_security.py
index d1b3e45..6fb4a73 100644
--- a/app/tests/test_security.py
+++ b/app/tests/test_security.py
@@ -3,9 +3,11 @@
from datetime import UTC, datetime, timedelta
from fileflash.core.security import (
+ create_admin_file_preview_token,
create_access_token,
create_file_preview_token,
create_refresh_token,
+ decode_admin_file_preview_token,
decode_access_token,
decode_file_preview_token,
get_password_hash,
@@ -53,6 +55,24 @@ def test_file_preview_token_round_trip():
assert payload["typ"] == "file_preview"
+def test_admin_file_preview_token_round_trip():
+ settings = Settings(
+ JWT_SECRET_KEY="unit-test-secret-key-1234567890abcd",
+ FF_DB_URI="postgresql://u:p@localhost:5432/db",
+ )
+ token = create_admin_file_preview_token(
+ admin_user_id=7,
+ file_id=99,
+ settings=settings,
+ expires_at=datetime.now(UTC) + timedelta(minutes=10),
+ )
+ payload = decode_admin_file_preview_token(token=token, settings=settings)
+ assert payload["sub"] == "7"
+ assert payload["fileId"] == "99"
+ assert payload["scope"] == "admin.file.preview"
+ assert payload["typ"] == "admin_file_preview"
+
+
def test_refresh_token_hash_is_deterministic():
settings = Settings(
JWT_SECRET_KEY="unit-test-secret-key-1234567890abcd",
diff --git a/web/src/api/file.ts b/web/src/api/file.ts
index 38c5d32..e6c798d 100644
--- a/web/src/api/file.ts
+++ b/web/src/api/file.ts
@@ -27,6 +27,7 @@ import type {
MergeChunksResponse,
CancelUploadResponse,
AdminFileAuditItem,
+ AdminFileAuditDetail,
GetAdminFilesRequest,
ArchiveExtractRequest,
BackgroundJob,
@@ -34,6 +35,63 @@ import type {
JobResultArchivePreview,
} from '../types/file';
+type MockPreviewPayload = {
+ __mockPreview: true;
+ mimeType: string;
+ content: string;
+ encoding: 'text' | 'base64';
+};
+
+function isBlobLike(value: unknown): value is Blob {
+ return value instanceof Blob || Object.prototype.toString.call(value) === '[object Blob]';
+}
+
+function base64ToBytes(content: string) {
+ const decoded = atob(content);
+ const bytes = new Uint8Array(decoded.length);
+ for (let index = 0; index < decoded.length; index += 1) {
+ bytes[index] = decoded.charCodeAt(index);
+ }
+ return bytes;
+}
+
+function isMockPreviewPayload(value: unknown): value is MockPreviewPayload {
+ if (!value || typeof value !== 'object') {
+ return false;
+ }
+ const payload = value as Partial