Skip to content
Open
2 changes: 1 addition & 1 deletion questions/services/forecasts.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def after_forecast_actions(question: Question, user: User):
)

# Run async tasks
from ..tasks import run_build_question_forecasts
from questions.tasks import run_build_question_forecasts

run_build_question_forecasts.send(question.id)

Expand Down
5 changes: 5 additions & 0 deletions questions/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
views.bulk_withdraw_forecasts_api_view,
name="create-withdraw",
),
path(
"questions/bulk-forecast-comment/",
views.bulk_forecast_and_comment_api_view,
name="bulk-forecast-comment",
),
path(
"questions/<int:pk>/", views.question_detail_api_view, name="question-details"
),
Expand Down
174 changes: 165 additions & 9 deletions questions/views.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,40 @@
import numpy as np
from django.db import transaction
from django.http import Http404
from django.utils import timezone
from rest_framework import status
import numpy as np
from rest_framework import serializers, status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.exceptions import ValidationError
from rest_framework.exceptions import PermissionDenied, ValidationError
from rest_framework.generics import get_object_or_404
from rest_framework.permissions import AllowAny, IsAdminUser
from rest_framework.permissions import AllowAny, IsAuthenticated, IsAdminUser
from rest_framework.response import Response
from rest_framework.serializers import DateTimeField


from comments.serializers.common import CommentWriteSerializer
from comments.services.common import create_comment
from posts.models import Post
from posts.services.common import get_post_permission_for_user
from posts.utils import get_post_slug
from projects.permissions import ObjectPermission
from users.models import User
from utils.requests import is_internal_request
from utils.the_math.aggregations import get_aggregations_at_time
from .constants import QuestionStatus
from .models import Forecast, Question
from .serializers.common import (
from questions.constants import QuestionStatus
from questions.models import Forecast, Question
from questions.serializers.common import (
validate_question_resolution,
QuestionsCommunityPredictionsSerializer,
OldForecastWriteSerializer,
ForecastWriteSerializer,
ForecastWithdrawSerializer,
serialize_question,
)
from .services.forecasts import create_forecast_bulk, withdraw_forecast_bulk
from .services.lifecycle import resolve_question, unresolve_question
from questions.services.forecasts import (
create_forecast_bulk,
withdraw_forecast_bulk,
)
from questions.services.lifecycle import resolve_question, unresolve_question


@api_view(["GET"])
Expand Down Expand Up @@ -274,6 +282,154 @@ def legacy_question_api_view(request, pk: int):
)


class BulkForecastAndCommentSerializer(serializers.Serializer):
user_id = serializers.IntegerField(required=False, allow_null=True)
username = serializers.CharField(required=False, allow_null=True)
is_staff_override = serializers.BooleanField(required=False, default=False)
forecasts = ForecastWriteSerializer(many=True, required=False, default=list)
comments = CommentWriteSerializer(many=True, required=False, default=list)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A small nit: I don’t like that we have forecasts and comments objects acting independently from their target post, so you could create a forecast for post A and a comment on unrelated post B at the same time.

I’m wondering if there’s a more sophisticated way to shape the schema so it explicitly says “this forecast comes together with this question,” instead of looking almost the same as just making two separate requests.

On the other hand, we still need to keep these things isolated and avoid mixing responsibilities. We currently have a similar case with Key Factors: comments.views.common.comment_create_api_view creates both a comment and a key factor if present.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be acceptable for this to be a bulk endpoint for forecasts and comments without restricting it such that you always need to pair comments with forecasts in the submission.

I'd be willing to change it if you insist, but I think it's simpler to allow for independent sets. I guess the risk here is that the forecast needs to attach to a Question while the comment attaches to a Post. I'll make sure that we validate all forecast and comment attachment points before saving any of them, but I still think it's good this way.


def validate(self, attrs):
if not attrs.get("user_id") and not attrs.get("username"):
raise serializers.ValidationError(
"Either user_id or username must be provided."
)
return attrs
Comment on lines +292 to +297
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I’m confused. What if the user wants to forecast on behalf of themselves? Should they also pass their own user ID?

If so, maybe we can simplify it to this flow:

  • No user params → assume current authenticated user
  • User param → check whether:
    • it’s the current user
    • the bot belongs to the current user
    • the user is an admin and can impersonate another user

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the adage of "explicit over implicit" is worth it here. What's the downside to making sure you are definitely trying to forecast for yourself vs the bot? It just forces the user to be explicit and I think that's a value add.



@api_view(["POST"])
@permission_classes([IsAuthenticated])
def bulk_forecast_and_comment_api_view(request):
"""
Submits forecasts and comments in a single atomic transaction.

Superusers may submit on behalf of any user by providing user_id or username
and flag `is_staff_override`.
Non-superusers may submit as themselves or as one of their bots (identified
by user_id or username).
"""
serializer = BulkForecastAndCommentSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
data = serializer.validated_data

user_id = data.get("user_id")
username = data.get("username")
forecasts_data = data["forecasts"]
comments_data = data.get("comments", [])
is_staff_override = data.get("is_staff_override", False)

request_user = request.user
if is_staff_override and not request_user.is_superuser:
raise PermissionDenied("Only superusers can use the is_staff_override flag.")

if is_staff_override:
if user_id:
user = get_object_or_404(User, id=user_id)
else:
user = get_object_or_404(User, username=username)
else:
user = (
User.objects.filter(id=user_id).first()
if user_id
else User.objects.filter(username=username).first()
)
is_self = user is not None and user.id == request_user.id
is_own_bot = (
user is not None
and user.is_bot
and user.bot_owner_id is not None
and user.bot_owner_id == request_user.id
)
if not is_self and not is_own_bot:
raise PermissionDenied(
"Non-superusers can only submit forecasts and comments as themselves "
"or their bots."
)
Comment on lines +325 to +347
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Confirm task.send invocations on the forecast hot path are not already wrapped with transaction.on_commit.
rg -nP -C3 "run_build_question_forecasts\.send|run_on_post_forecast\.send" --type=py
rg -nP -C2 "transaction\.on_commit" --type=py

Repository: Metaculus/metaculus

Length of output: 1597


🏁 Script executed:

# Find the create_forecast_bulk function and its surrounding context
sed -n '160,190p' questions/services/forecasts.py

# Check the context around line 385 in views.py
sed -n '380,395p' questions/views.py

# Look for the after_forecast_actions function mentioned in the comment
rg -n "after_forecast_actions" --type=py -B2 -A10

Repository: Metaculus/metaculus

Length of output: 4794


🏁 Script executed:

# Get the full after_forecast_actions function
sed -n '137,160p' questions/services/forecasts.py

# Check line 385 in views.py and surrounding context
sed -n '375,395p' questions/views.py

# Check where create_forecast_bulk is called
rg -n "create_forecast_bulk" --type=py -B3 -A3

Repository: Metaculus/metaculus

Length of output: 6866


Defer task enqueueing until transaction commit to avoid orphaned async jobs.

create_forecast_bulk enqueues run_build_question_forecasts.send(...) (via after_forecast_actions at line 160) and run_on_post_forecast.send_with_options(...) (at line 183) inside transaction.atomic() on lines 404-405 in views.py. Dramatiq messages are published immediately, not bound to the transaction. If any subsequent create_comment call raises (line 407+), the forecasts and comments get rolled back but the workers will still pick up those messages and try to rebuild aggregates / fire post-forecast actions for objects that no longer exist, leading to noisy errors and potentially stale aggregate state.

Wrap each task send in transaction.on_commit(lambda: task.send(...)) in after_forecast_actions and line 183 of create_forecast_bulk. Doing this in the service layer also benefits the existing non-bulk callers (lines 155, 241).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@questions/views.py` around lines 305 - 327, The view enqueues Dramatiq
messages inside a transaction (see create_forecast_bulk and
after_forecast_actions) which can be rolled back, producing orphaned async jobs;
change all places that call run_build_question_forecasts.send(...) and
run_on_post_forecast.send_with_options(...) so they are invoked inside
transaction.on_commit(...) lambdas instead of directly, including the callsite
in after_forecast_actions and the send at line 183 of create_forecast_bulk;
ensure any other callers from non-bulk paths (e.g., the service-layer uses
around lines 155 and 241) use the same transaction.on_commit wrapping so
messages are only published after the DB transaction successfully commits.


now = timezone.now()
errors = []

# Validate forecasts and resolve question IDs to Question objects
questions_map = {
q.pk: q
for q in Question.objects.filter(
pk__in=[f["question"] for f in forecasts_data]
).select_related("post")
}

for forecast in forecasts_data:
question_id = forecast["question"]
question = questions_map.get(question_id)
if not question:
errors.append(f"Question {question_id} does not exist.")
continue
forecast["question"] = question

post: Post = question.post
permission = get_post_permission_for_user(post, user=user)
if not ObjectPermission.can_forecast(permission):
errors.append(f"Question {question.id}: forecasting not permitted.")
continue

if (
post.curation_status != Post.CurationStatus.APPROVED
or not question.open_time
or not question.scheduled_close_time
):
errors.append(f"Question {question.id} is not open for forecasting yet.")
elif (question.scheduled_close_time < now) or (
question.actual_close_time and question.actual_close_time < now
):
errors.append(f"Question {question.id} is already closed to forecasting.")

# Validate comments
for i, comment in enumerate(comments_data):
on_post = comment["on_post"]
if not comment.get("is_private"):
errors.append(
f"Comment {i}: only private comments are allowed in bulk submissions."
)
continue
if comment.get("key_factors"):
errors.append(
f"Comment {i}: key_factors are not supported in bulk submissions."
)
continue
parent = comment.get("parent")
permission = get_post_permission_for_user(
parent.on_post if parent else on_post, user=user
)
if not ObjectPermission.can_comment(permission):
errors.append(
f"Comment {i}: commenting not permitted on post {on_post.id}."
)

if errors:
raise ValidationError(errors)

with transaction.atomic():
create_forecast_bulk(user=user, forecasts=forecasts_data)

for comment_data in comments_data:
on_post = comment_data["on_post"]
included_forecast_flag = comment_data.pop("included_forecast", False)
comment_data.pop("key_factors", None)

included_forecast = (
on_post.question.user_forecasts.filter(author_id=user.id)
.order_by("-start_time")
.first()
if included_forecast_flag and on_post.question_id
else None
)

create_comment(
**comment_data, included_forecast=included_forecast, user=user
)

return Response({}, status=status.HTTP_201_CREATED)


@api_view(["GET", "POST"])
@permission_classes([IsAdminUser])
def questions_community_predictions(request) -> Response:
Expand Down
Loading
Loading