From e384323a9a7690111a2cc0783ea850249e2a39ae Mon Sep 17 00:00:00 2001 From: Luke Kingland Date: Tue, 26 Aug 2025 11:00:48 +0900 Subject: [PATCH 1/3] fix: handle non-cloudevent requests gracefully --- .gitignore | 2 + src/func_python/cloudevent.py | 58 ++++++++++++++++++---- tests/test_cloudevent.py | 90 +++++++++++++++++++++++++++++++++++ 3 files changed, 142 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index bee8a64b..f056ce57 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ __pycache__ + +CLAUDE.md diff --git a/src/func_python/cloudevent.py b/src/func_python/cloudevent.py index 66c18f92..ce9fb21f 100644 --- a/src/func_python/cloudevent.py +++ b/src/func_python/cloudevent.py @@ -7,6 +7,7 @@ from cloudevents.http import from_http, CloudEvent from cloudevents.conversion import to_structured, to_binary +from cloudevents.exceptions import MissingRequiredFields, InvalidRequiredFields DEFAULT_LOG_LEVEL = logging.INFO DEFAULT_LISTEN_ADDRESS = "[::]:8080" @@ -143,14 +144,39 @@ async def __call__(self, scope, receive, send): # interstitial encode/decode, and thus avoid the approx. 200 # lines of shared server boilerplate. # - # Decode the event and make it available in the scope - scope["event"] = await decode_event(scope, receive) - # Wrap the sender in a CloudEventSender - send = CloudEventSender(send) - # Delegate processing to user's Function - await self.f.handle(scope, receive, send) + try: + # Decode the event and make it available in the scope + scope["event"] = await decode_event(scope, receive) + # Wrap the sender in a CloudEventSender + send = CloudEventSender(send) + # Delegate processing to user's Function + await self.f.handle(scope, receive, send) + except (MissingRequiredFields, InvalidRequiredFields) as e: + # Log the non-CloudEvent request for debugging + logging.warning(f"Received non-CloudEvent request: {scope['method']} {scope['path']}") + headers_dict = {k.decode('utf-8'): v.decode('utf-8') for k, v in scope.get('headers', [])} + logging.debug(f"Request headers: {headers_dict}") + + # Return 400 Bad Request for non-CloudEvent requests + await send({ + 'type': 'http.response.start', + 'status': 400, + 'headers': [[b'content-type', b'text/plain']] + }) + await send({ + 'type': 'http.response.body', + 'body': b'Bad Request: This endpoint expects CloudEvent requests. ' + }) + return except Exception as e: - await send_exception_cloudevent(send, 500, f"Error: {e}") + # For other unexpected errors, try to send a CloudEvent error response + # But check if send is already a CloudEventSender + if hasattr(send, 'structured'): + await send_exception_cloudevent(send, 500, f"Error: {e}") + else: + # Fallback to plain HTTP error + logging.error(f"Unexpected error: {e}") + await send_exception(send, 500, f"Internal Server Error: {e}".encode()) async def handle_liveness(self, scope, receive, send): alive = True @@ -234,7 +260,23 @@ async def send_exception_cloudevent(send, status, message): } data = {"message": message} - await send.structured(CloudEvent(attributes, data), status) + # Check if send is a CloudEventSender with structured method + if hasattr(send, 'structured'): + await send.structured(CloudEvent(attributes, data), status) + else: + # Fallback to plain HTTP error response if send is not a CloudEventSender + logging.warning("send_exception_cloudevent called with non-CloudEventSender, falling back to HTTP response") + await send({ + 'type': 'http.response.start', + 'status': status, + 'headers': [[b'content-type', b'application/json']] + }) + import json + error_body = json.dumps({"error": {"message": message, "type": "dev.functions.error"}}) + await send({ + 'type': 'http.response.body', + 'body': error_body.encode() + }) class CloudEventSender: diff --git a/tests/test_cloudevent.py b/tests/test_cloudevent.py index 2549841c..192ea628 100644 --- a/tests/test_cloudevent.py +++ b/tests/test_cloudevent.py @@ -209,6 +209,96 @@ def send_signal(): signal_thread.join(timeout=5) +def test_non_cloudevent_get_request(): + """ + Tests that non-CloudEvent GET requests return 400 Bad Request. + """ + # Simple CloudEvent handler + async def handle(scope, receive, send): + # This should not be reached for non-CloudEvent requests + await send(CloudEvent({"type": "test", "source": "test"}, {"message": "Should not reach here"})) + + test_complete = threading.Event() + test_results = {"success": False, "error": None} + + def test(): + try: + wait_for_function() # Wait for server to start + + # Send a regular GET request without CloudEvent headers + response = httpx.get(f"http://{LISTEN_ADDRESS}/test") + + # Should return 400 Bad Request + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + assert b"CloudEvent" in response.content, "Response should mention CloudEvent" + + test_results["success"] = True + except Exception as e: + test_results["error"] = str(e) + finally: + test_complete.set() + os.kill(os.getpid(), signal.SIGINT) + + test_thread = threading.Thread(target=test) + test_thread.daemon = True + test_thread.start() + + serve(handle) + + if not test_complete.wait(10): + pytest.fail("Test timed out") + + if not test_results["success"]: + pytest.fail(test_results["error"] or "Test failed") + + +def test_non_cloudevent_post_request(): + """ + Tests that non-CloudEvent POST requests with JSON body return 400 Bad Request. + """ + # Simple CloudEvent handler + async def handle(scope, receive, send): + # This should not be reached for non-CloudEvent requests + await send(CloudEvent({"type": "test", "source": "test"}, {"message": "Should not reach here"})) + + test_complete = threading.Event() + test_results = {"success": False, "error": None} + + def test(): + try: + wait_for_function() # Wait for server to start + + # Send a regular POST request with JSON but without CloudEvent headers + response = httpx.post( + f"http://{LISTEN_ADDRESS}/test", + json={"data": "test"}, + headers={"content-type": "application/json"} + ) + + # Should return 400 Bad Request + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + assert b"CloudEvent" in response.content, "Response should mention CloudEvent" + + test_results["success"] = True + except Exception as e: + test_results["error"] = str(e) + finally: + test_complete.set() + os.kill(os.getpid(), signal.SIGINT) + + test_thread = threading.Thread(target=test) + test_thread.daemon = True + test_thread.start() + + serve(handle) + + if not test_complete.wait(10): + pytest.fail("Test timed out") + + if not test_results["success"]: + pytest.fail(test_results["error"] or "Test failed") + + class FunctionNotAvailableError(Exception): """ Raised when a Function is not available """ From a2c73042d900b1b2e35d0c44fb5b1663839f8c35 Mon Sep 17 00:00:00 2001 From: Luke Kingland Date: Tue, 26 Aug 2025 14:39:49 +0900 Subject: [PATCH 2/3] add cloudevent test cmd --- cmd/fcloudevent/README.md | 57 +++++++++++++---- cmd/fcloudevent/main.py | 125 +++++++++++++++++++++++++++----------- cmd/fhttp/main.py | 2 +- 3 files changed, 133 insertions(+), 51 deletions(-) diff --git a/cmd/fcloudevent/README.md b/cmd/fcloudevent/README.md index e303b07f..3384fc87 100644 --- a/cmd/fcloudevent/README.md +++ b/cmd/fcloudevent/README.md @@ -1,30 +1,61 @@ -# Function CloudEvents Test Command +# CloudEvent Function Example -fcloudevent is a command which illustrates how the func-python library middleware -wraps a function and exposes it as a service. Useful for development. +This directory contains an example CloudEvent function that demonstrates how to use the `func_python.cloudevent` middleware to handle CloudEvents. This is an example usage of the Functions CloudEvents middleware. Run the function ``` + +## Start the Function + +Run the instanced handler (default): +```bash poetry run python cmd/fcloudevent/main.py ``` -Send a CloudEvent against it: +Run the static handler: +```bash +poetry run python cmd/fcloudevent/main.py --static +``` + +Change the listen address (default is [::]:8080): +```bash +LISTEN_ADDRESS=127.0.0.1:8081 poetry run python cmd/fcloudevent/main.py ``` -curl -v -X POST http://127.0.0.1:8080/ \ + +## Invoke the Function + +You can send a CloudEvent to the function using curl with structured encoding: + +```bash +curl -X POST http://127.0.0.1:8080 \ + -H "Ce-Specversion: 1.0" \ + -H "Ce-Type: com.example.test" \ + -H "Ce-Source: /test/source" \ + -H "Ce-Id: test-123" \ -H "Content-Type: application/json" \ - -H "ce-specversion: 1.0" \ - -H "ce-type: com.example.event.submit" \ - -H "ce-source: /applications/user-service" \ - -H "ce-id: $(uuidgen)" \ - -H "ce-time: $(date -u +"%Y-%m-%dT%H:%M:%SZ")" \ + -d '{"message": "Hello CloudEvents!"}' +``` + +Or with a full CloudEvent in structured format: + +```bash +curl -X POST http://127.0.0.1:8080 \ + -H "Content-Type: application/cloudevents+json" \ -d '{ - "message": "Hello CloudEvents", - "username": "testuser", - "action": "submit", + "specversion": "1.0", + "type": "com.example.test", + "source": "/test/source", + "id": "test-456", + "datacontenttype": "application/json", + "data": { + "message": "Hello from structured CloudEvent!", + "value": 42 + } }' ``` + To see the actual middleware which is used when building a Python Function, see the [Functions Python Scaffolding](https://github.com/knative/func/tree/main/templates/python/cloudevents) diff --git a/cmd/fcloudevent/main.py b/cmd/fcloudevent/main.py index c71181bc..2a81bff6 100644 --- a/cmd/fcloudevent/main.py +++ b/cmd/fcloudevent/main.py @@ -1,84 +1,125 @@ import argparse import logging -import json from func_python.cloudevent import serve from cloudevents.http import CloudEvent # Set the default logging level to INFO logging.basicConfig(level=logging.INFO) -# Parse command line arguments -parser = argparse.ArgumentParser(description='Serve a CloudEvents Function') +# Allow this test to be either instanced (default) or --static +# to test the two different primary method signatures supported in the +# final Function. +parser = argparse.ArgumentParser(description='Serve a Test CloudEvent Function') parser.add_argument('--static', action='store_true', - help='Serve the static handler (default is instanced)') + help='Serve the example static handler (default is to ' + 'instantiate and serve the example class)') args = parser.parse_args() -# Static handler implementation +# Example static handler. # Enable with --static # Must be named exactly "handle" async def handle(scope, receive, send): - """Static handler for CloudEvents""" - logging.info("CloudEvent static handler called") + """ handle is an example of a static handler which can be sent to the + middleware as a function. It will be wrapped in a default Function + instance before being served as an ASGI application. + """ + logging.info("Static CloudEvent handler invoked") - # Process the CloudEvent from the scope - event = scope.get("event") + # Access the CloudEvent from the scope + event = scope["event"] if not event: error_event = CloudEvent( - {"type": "dev.functions.error", "source": "/cloudevent/error"}, - {"message": "No CloudEvent found in request"} + {"type": "dev.functions.error", "source": "/fcloudevent/error"}, + {"message": "No CloudEvent found in scope"} ) - await send(error_event, 400) + await send(error_event, 500) return - # Log the received event - logging.info(f"Received event type: {event['type']}") - logging.info(f"Received event source: {event['source']}") - logging.info(f"Received event data: {json.dumps(event.data)}") + logging.info(f"Received CloudEvent: type={event['type']}, source={event['source']}") + + # Handle event data - it might be bytes or dict + event_data = event.data + if isinstance(event_data, bytes): + import json + event_data = json.loads(event_data) + logging.info(f"CloudEvent data: {event_data}") - # Create and send response CloudEvent response_event = CloudEvent( - {"type": "dev.functions.response", "source": "/cloudevent/processor"}, - {"message": "Processed static", "original_data": event.data} + { + "type": "com.example.response.static", + "source": "/fcloudevent/static", + }, + { + "message": "OK: static CloudEvent handler", + "received_event_type": event['type'], + "received_event_source": event['source'], + "received_data": event_data + } ) + await send(response_event) -# Instanced handler implementation +# Example instanced handler +# This is the default expected by this test. +# The class can be named anything, but there must be a constructor named "new" +# which returns an object with an async method "handle" conforming to the ASGI +# callable's method signature. class MyCloudEventFunction: def __init__(self): self.event_count = 0 logging.info("CloudEvent function instance created") async def handle(self, scope, receive, send): - """Handle CloudEvents in an instanced function""" - event = scope.get("event") + logging.info("Instanced CloudEvent handler invoked") + + # Access the CloudEvent from the scope + event = scope["event"] if not event: + # This shouldn't happen with our fix, but let's handle it gracefully error_event = CloudEvent( - {"type": "dev.functions.error", "source": "/cloudevent/error"}, - {"message": "No CloudEvent found in request"} + {"type": "dev.functions.error", "source": "/fcloudevent/error"}, + {"message": "No CloudEvent found in scope"} ) - await send(error_event, 400) + await send(error_event, 500) return self.event_count += 1 - # Log the received event - logging.info(f"Received event type: {event['type']}") - logging.info(f"Received event source: {event['source']}") - logging.info(f"Received event data: {json.dumps(event.data)}") + logging.info(f"Received CloudEvent #{self.event_count}: type={event['type']}, source={event['source']}") + + # Handle event data - it might be bytes or dict + event_data = event.data + if isinstance(event_data, bytes): + import json + event_data = json.loads(event_data) + logging.info(f"CloudEvent data: {event_data}") logging.info(f"Total events processed: {self.event_count}") - # Create and send response CloudEvent response_event = CloudEvent( - {"type": "dev.functions.response", "source": "/cloudevent/processor"}, { - "message": "Processed instanced", - "original_data": event.data, - "count": self.event_count + "type": "com.example.response.instanced", + "source": "/fcloudevent/instanced", + }, + { + "message": "OK: instanced CloudEvent handler", + "event_count": self.event_count, + "received_event_type": event['type'], + "received_event_source": event['source'], + "received_data": event_data } ) - await send(response_event) + + # Demonstrate different sending methods + if self.event_count % 2 == 0: + # Use structured encoding (default) + logging.info("Sending structured CloudEvent response") + await send.structured(response_event) + else: + # Use binary encoding + logging.info("Sending binary CloudEvent response") + await send.binary(response_event) def alive(self): logging.info("Liveness checked") @@ -93,12 +134,21 @@ def stop(self): # Function instance constructor +# expected to be named exactly "new" +# Must return a object which exposes a method "handle" which conforms to the +# ASGI specification's method signature. def new(): - """Create a new function instance""" + """ new is the factory function (or constructor) which will create + a new function instance when invoked. This must be named "new", and the + structure returned must include a method named "handle" which implements + the ASGI specification's method signature. The name of the class itself + can be changed. + """ return MyCloudEventFunction() -# Run the example +# Run the example. +# Start either the static or instanced handler depending on flag --static if __name__ == "__main__": if args.static: logging.info("Starting static CloudEvent handler") @@ -106,3 +156,4 @@ def new(): else: logging.info("Starting instanced CloudEvent handler") serve(new) + diff --git a/cmd/fhttp/main.py b/cmd/fhttp/main.py index deb101b3..608ff869 100644 --- a/cmd/fhttp/main.py +++ b/cmd/fhttp/main.py @@ -23,7 +23,7 @@ async def handle(scope, receive, send): middleware as a function. It will be wrapped in a default Function instance before being served as an ASGI application. """ - logging.info("OK: static!!") + logging.info("Static HTTP handler invoked") await send({ 'type': 'http.response.start', From e9daf5165db29b58cbb11f1c19d828ffd81cb750 Mon Sep 17 00:00:00 2001 From: Luke Kingland Date: Wed, 27 Aug 2025 19:41:03 +0900 Subject: [PATCH 3/3] update changelog --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c8cd0ad..b52e10c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,9 +10,15 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added ### Changed + +- Improved test function to cover additional CloudEvent cases + ### Deprecated ### Removed ### Fixed + +- CloudEvent handler gracefully fails on non-cloudevent requests + ### Security ## [0.5.1] - 2025-03-10