Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
__pycache__

CLAUDE.md
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Added
### Changed

- Improved test function to cover additional CloudEvent cases

### Deprecated
### Removed
### Fixed

- CloudEvent handler gracefully fails on non-cloudevent requests

### Security

## [0.5.1] - 2025-03-10
Expand Down
57 changes: 44 additions & 13 deletions cmd/fcloudevent/README.md
Original file line number Diff line number Diff line change
@@ -1,30 +1,61 @@
# Function CloudEvents Test Command
# CloudEvent Function Example

fcloudevent is a command which illustrates how the func-python library middleware
wraps a function and exposes it as a service. Useful for development.
This directory contains an example CloudEvent function that demonstrates how to use the `func_python.cloudevent` middleware to handle CloudEvents.

This is an example usage of the Functions CloudEvents middleware.

Run the function
```

## Start the Function

Run the instanced handler (default):
```bash
poetry run python cmd/fcloudevent/main.py
```

Send a CloudEvent against it:
Run the static handler:
```bash
poetry run python cmd/fcloudevent/main.py --static
```

Change the listen address (default is [::]:8080):
```bash
LISTEN_ADDRESS=127.0.0.1:8081 poetry run python cmd/fcloudevent/main.py
```
curl -v -X POST http://127.0.0.1:8080/ \

## Invoke the Function

You can send a CloudEvent to the function using curl with structured encoding:

```bash
curl -X POST http://127.0.0.1:8080 \
-H "Ce-Specversion: 1.0" \
-H "Ce-Type: com.example.test" \
-H "Ce-Source: /test/source" \
-H "Ce-Id: test-123" \
-H "Content-Type: application/json" \
-H "ce-specversion: 1.0" \
-H "ce-type: com.example.event.submit" \
-H "ce-source: /applications/user-service" \
-H "ce-id: $(uuidgen)" \
-H "ce-time: $(date -u +"%Y-%m-%dT%H:%M:%SZ")" \
-d '{"message": "Hello CloudEvents!"}'
```

Or with a full CloudEvent in structured format:

```bash
curl -X POST http://127.0.0.1:8080 \
-H "Content-Type: application/cloudevents+json" \
-d '{
"message": "Hello CloudEvents",
"username": "testuser",
"action": "submit",
"specversion": "1.0",
"type": "com.example.test",
"source": "/test/source",
"id": "test-456",
"datacontenttype": "application/json",
"data": {
"message": "Hello from structured CloudEvent!",
"value": 42
}
}'
```


To see the actual middleware which is used when building a Python Function,
see the [Functions Python Scaffolding](https://github.com/knative/func/tree/main/templates/python/cloudevents)
125 changes: 88 additions & 37 deletions cmd/fcloudevent/main.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,125 @@
import argparse
import logging
import json
from func_python.cloudevent import serve
from cloudevents.http import CloudEvent

# Set the default logging level to INFO
logging.basicConfig(level=logging.INFO)

# Parse command line arguments
parser = argparse.ArgumentParser(description='Serve a CloudEvents Function')
# Allow this test to be either instanced (default) or --static
# to test the two different primary method signatures supported in the
# final Function.
parser = argparse.ArgumentParser(description='Serve a Test CloudEvent Function')
parser.add_argument('--static', action='store_true',
help='Serve the static handler (default is instanced)')
help='Serve the example static handler (default is to '
'instantiate and serve the example class)')
args = parser.parse_args()


# Static handler implementation
# Example static handler.
# Enable with --static
# Must be named exactly "handle"
async def handle(scope, receive, send):
"""Static handler for CloudEvents"""
logging.info("CloudEvent static handler called")
""" handle is an example of a static handler which can be sent to the
middleware as a function. It will be wrapped in a default Function
instance before being served as an ASGI application.
"""
logging.info("Static CloudEvent handler invoked")

# Process the CloudEvent from the scope
event = scope.get("event")
# Access the CloudEvent from the scope
event = scope["event"]
if not event:
error_event = CloudEvent(
{"type": "dev.functions.error", "source": "/cloudevent/error"},
{"message": "No CloudEvent found in request"}
{"type": "dev.functions.error", "source": "/fcloudevent/error"},
{"message": "No CloudEvent found in scope"}
)
await send(error_event, 400)
await send(error_event, 500)
return

# Log the received event
logging.info(f"Received event type: {event['type']}")
logging.info(f"Received event source: {event['source']}")
logging.info(f"Received event data: {json.dumps(event.data)}")
logging.info(f"Received CloudEvent: type={event['type']}, source={event['source']}")

# Handle event data - it might be bytes or dict
event_data = event.data
if isinstance(event_data, bytes):
import json
event_data = json.loads(event_data)
logging.info(f"CloudEvent data: {event_data}")

# Create and send response CloudEvent
response_event = CloudEvent(
{"type": "dev.functions.response", "source": "/cloudevent/processor"},
{"message": "Processed static", "original_data": event.data}
{
"type": "com.example.response.static",
"source": "/fcloudevent/static",
},
{
"message": "OK: static CloudEvent handler",
"received_event_type": event['type'],
"received_event_source": event['source'],
"received_data": event_data
}
)

await send(response_event)


# Instanced handler implementation
# Example instanced handler
# This is the default expected by this test.
# The class can be named anything, but there must be a constructor named "new"
# which returns an object with an async method "handle" conforming to the ASGI
# callable's method signature.
class MyCloudEventFunction:
def __init__(self):
self.event_count = 0
logging.info("CloudEvent function instance created")

async def handle(self, scope, receive, send):
"""Handle CloudEvents in an instanced function"""
event = scope.get("event")
logging.info("Instanced CloudEvent handler invoked")

# Access the CloudEvent from the scope
event = scope["event"]
if not event:
# This shouldn't happen with our fix, but let's handle it gracefully
error_event = CloudEvent(
{"type": "dev.functions.error", "source": "/cloudevent/error"},
{"message": "No CloudEvent found in request"}
{"type": "dev.functions.error", "source": "/fcloudevent/error"},
{"message": "No CloudEvent found in scope"}
)
await send(error_event, 400)
await send(error_event, 500)
return

self.event_count += 1

# Log the received event
logging.info(f"Received event type: {event['type']}")
logging.info(f"Received event source: {event['source']}")
logging.info(f"Received event data: {json.dumps(event.data)}")
logging.info(f"Received CloudEvent #{self.event_count}: type={event['type']}, source={event['source']}")

# Handle event data - it might be bytes or dict
event_data = event.data
if isinstance(event_data, bytes):
import json
event_data = json.loads(event_data)
logging.info(f"CloudEvent data: {event_data}")
logging.info(f"Total events processed: {self.event_count}")

# Create and send response CloudEvent
response_event = CloudEvent(
{"type": "dev.functions.response", "source": "/cloudevent/processor"},
{
"message": "Processed instanced",
"original_data": event.data,
"count": self.event_count
"type": "com.example.response.instanced",
"source": "/fcloudevent/instanced",
},
{
"message": "OK: instanced CloudEvent handler",
"event_count": self.event_count,
"received_event_type": event['type'],
"received_event_source": event['source'],
"received_data": event_data
}
)
await send(response_event)

# Demonstrate different sending methods
if self.event_count % 2 == 0:
# Use structured encoding (default)
logging.info("Sending structured CloudEvent response")
await send.structured(response_event)
else:
# Use binary encoding
logging.info("Sending binary CloudEvent response")
await send.binary(response_event)

def alive(self):
logging.info("Liveness checked")
Expand All @@ -93,16 +134,26 @@ def stop(self):


# Function instance constructor
# expected to be named exactly "new"
# Must return a object which exposes a method "handle" which conforms to the
# ASGI specification's method signature.
def new():
"""Create a new function instance"""
""" new is the factory function (or constructor) which will create
a new function instance when invoked. This must be named "new", and the
structure returned must include a method named "handle" which implements
the ASGI specification's method signature. The name of the class itself
can be changed.
"""
return MyCloudEventFunction()


# Run the example
# Run the example.
# Start either the static or instanced handler depending on flag --static
if __name__ == "__main__":
if args.static:
logging.info("Starting static CloudEvent handler")
serve(handle)
else:
logging.info("Starting instanced CloudEvent handler")
serve(new)

2 changes: 1 addition & 1 deletion cmd/fhttp/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async def handle(scope, receive, send):
middleware as a function. It will be wrapped in a default Function
instance before being served as an ASGI application.
"""
logging.info("OK: static!!")
logging.info("Static HTTP handler invoked")

await send({
'type': 'http.response.start',
Expand Down
58 changes: 50 additions & 8 deletions src/func_python/cloudevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from cloudevents.http import from_http, CloudEvent
from cloudevents.conversion import to_structured, to_binary
from cloudevents.exceptions import MissingRequiredFields, InvalidRequiredFields

DEFAULT_LOG_LEVEL = logging.INFO
DEFAULT_LISTEN_ADDRESS = "[::]:8080"
Expand Down Expand Up @@ -143,14 +144,39 @@ async def __call__(self, scope, receive, send):
# interstitial encode/decode, and thus avoid the approx. 200
# lines of shared server boilerplate.
#
# Decode the event and make it available in the scope
scope["event"] = await decode_event(scope, receive)
# Wrap the sender in a CloudEventSender
send = CloudEventSender(send)
# Delegate processing to user's Function
await self.f.handle(scope, receive, send)
try:
# Decode the event and make it available in the scope
scope["event"] = await decode_event(scope, receive)
# Wrap the sender in a CloudEventSender
send = CloudEventSender(send)
# Delegate processing to user's Function
await self.f.handle(scope, receive, send)
except (MissingRequiredFields, InvalidRequiredFields) as e:
# Log the non-CloudEvent request for debugging
logging.warning(f"Received non-CloudEvent request: {scope['method']} {scope['path']}")
headers_dict = {k.decode('utf-8'): v.decode('utf-8') for k, v in scope.get('headers', [])}
logging.debug(f"Request headers: {headers_dict}")

# Return 400 Bad Request for non-CloudEvent requests
await send({
'type': 'http.response.start',
'status': 400,
'headers': [[b'content-type', b'text/plain']]
})
await send({
'type': 'http.response.body',
'body': b'Bad Request: This endpoint expects CloudEvent requests. '
})
return
except Exception as e:
await send_exception_cloudevent(send, 500, f"Error: {e}")
# For other unexpected errors, try to send a CloudEvent error response
# But check if send is already a CloudEventSender
if hasattr(send, 'structured'):
await send_exception_cloudevent(send, 500, f"Error: {e}")
else:
# Fallback to plain HTTP error
logging.error(f"Unexpected error: {e}")
await send_exception(send, 500, f"Internal Server Error: {e}".encode())

async def handle_liveness(self, scope, receive, send):
alive = True
Expand Down Expand Up @@ -234,7 +260,23 @@ async def send_exception_cloudevent(send, status, message):
}
data = {"message": message}

await send.structured(CloudEvent(attributes, data), status)
# Check if send is a CloudEventSender with structured method
if hasattr(send, 'structured'):
await send.structured(CloudEvent(attributes, data), status)
else:
# Fallback to plain HTTP error response if send is not a CloudEventSender
logging.warning("send_exception_cloudevent called with non-CloudEventSender, falling back to HTTP response")
await send({
'type': 'http.response.start',
'status': status,
'headers': [[b'content-type', b'application/json']]
})
import json
error_body = json.dumps({"error": {"message": message, "type": "dev.functions.error"}})
await send({
'type': 'http.response.body',
'body': error_body.encode()
})


class CloudEventSender:
Expand Down
Loading
Loading