From 0a3a783f41bf19965af4614cb7e4ab4099b80928 Mon Sep 17 00:00:00 2001 From: Vin Souza Date: Mon, 21 Jul 2025 11:14:44 +0100 Subject: [PATCH 01/14] chore: add response stream support for lambda function urls --- docs/use-cases/http/advanced-use-cases.mdx | 22 +++ src/Event/Http/HttpHandler.php | 4 +- src/Event/Http/HttpResponse.php | 56 +++++--- src/Event/Http/StreamedHttpResponse.php | 149 +++++++++++++++++++++ src/Runtime/LambdaRuntime.php | 131 +++++++++++++++++- 5 files changed, 344 insertions(+), 18 deletions(-) create mode 100644 src/Event/Http/StreamedHttpResponse.php diff --git a/docs/use-cases/http/advanced-use-cases.mdx b/docs/use-cases/http/advanced-use-cases.mdx index cc7333931..74f52da62 100644 --- a/docs/use-cases/http/advanced-use-cases.mdx +++ b/docs/use-cases/http/advanced-use-cases.mdx @@ -79,6 +79,28 @@ functions: url: true ``` +#### Invoke Method RESPONSE_STREAM + +In order to support Lambda Function URLs invoke method as `RESPONSE_STREAM` you need to change to settings: + +* BREF_STREAMED_MODE = 1 in the lambda environment +* invokeMode: RESPONSE_STREAM in the function url settings + +Like the following sample config: + +```yml filename="serverless.yml" +functions: + hello: + handler: MyApp\Handlers\MyLambdaUrlHandler + # ... + environment: + BREF_STREAMED_MODE: 1 + url: + invokeMode: RESPONSE_STREAM +``` + +Be aware that you must implement an `HttpHandler` handler, or use something like Laravel Octane handler if you are using Laravel. + ### API Gateway v1 REST API The syntax is slightly different from API Gateway v2 HTTP APIs as we must use a different `events` configuration. Here is an example that sends all requests to a single Lambda function: diff --git a/src/Event/Http/HttpHandler.php b/src/Event/Http/HttpHandler.php index ebd7092d5..c417c1cfb 100644 --- a/src/Event/Http/HttpHandler.php +++ b/src/Event/Http/HttpHandler.php @@ -7,10 +7,10 @@ abstract class HttpHandler implements Handler { - abstract public function handleRequest(HttpRequestEvent $event, Context $context): HttpResponse; + abstract public function handleRequest(HttpRequestEvent $event, Context $context): HttpResponse|StreamedHttpResponse; /** {@inheritDoc} */ - public function handle($event, Context $context): array + public function handle($event, Context $context): array|\Generator { // See https://bref.sh/docs/runtimes/http.html#cold-starts if (isset($event['warmer']) && $event['warmer'] === true) { diff --git a/src/Event/Http/HttpResponse.php b/src/Event/Http/HttpResponse.php index 80a679bc2..941f30835 100644 --- a/src/Event/Http/HttpResponse.php +++ b/src/Event/Http/HttpResponse.php @@ -21,8 +21,9 @@ public function __construct(string $body, array $headers = [], int $statusCode = $this->statusCode = $statusCode; } - public function toApiGatewayFormat(bool $multiHeaders = false): array + public function toApiGatewayFormat(bool $multiHeaders = false): array|\Generator { + $isStreamedMode = (bool) getenv('BREF_STREAMED_MODE'); $base64Encoding = (bool) getenv('BREF_BINARY_RESPONSES'); $headers = []; @@ -47,19 +48,32 @@ public function toApiGatewayFormat(bool $multiHeaders = false): array // This is the format required by the AWS_PROXY lambda integration // See https://stackoverflow.com/questions/43708017/aws-lambda-api-gateway-error-malformed-lambda-proxy-response - return [ - 'isBase64Encoded' => $base64Encoding, - 'statusCode' => $this->statusCode, - $headersKey => $headers, - 'body' => $base64Encoding ? base64_encode($this->body) : $this->body, - ]; + + if ($isStreamedMode) { + yield json_encode([ + 'statusCode' => $this->statusCode, + $headersKey => $headers, + ]); + + yield "\0\0\0\0\0\0\0\0"; + + yield $this->body; + } else { + return [ + 'isBase64Encoded' => $base64Encoding, + 'statusCode' => $this->statusCode, + $headersKey => $headers, + 'body' => $base64Encoding ? base64_encode($this->body) : $this->body, + ]; + } } /** * See https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html#http-api-develop-integrations-lambda.response */ - public function toApiGatewayFormatV2(): array + public function toApiGatewayFormatV2(): array|\Generator { + $isStreamedMode = (bool) getenv('BREF_STREAMED_MODE'); $base64Encoding = (bool) getenv('BREF_BINARY_RESPONSES'); $headers = []; @@ -80,13 +94,25 @@ public function toApiGatewayFormatV2(): array // serialized to `[]` (we want `{}`) so we force it to an empty object. $headers = empty($headers) ? new \stdClass : $headers; - return [ - 'cookies' => $cookies, - 'isBase64Encoded' => $base64Encoding, - 'statusCode' => $this->statusCode, - 'headers' => $headers, - 'body' => $base64Encoding ? base64_encode($this->body) : $this->body, - ]; + if ($isStreamedMode) { + yield json_encode([ + 'cookies' => $cookies, + 'statusCode' => $this->statusCode, + 'headers' => $headers, + ]); + + yield "\0\0\0\0\0\0\0\0"; + + yield $this->body; + } else { + return [ + 'cookies' => $cookies, + 'isBase64Encoded' => $base64Encoding, + 'statusCode' => $this->statusCode, + 'headers' => $headers, + 'body' => $base64Encoding ? base64_encode($this->body) : $this->body, + ]; + } } /** diff --git a/src/Event/Http/StreamedHttpResponse.php b/src/Event/Http/StreamedHttpResponse.php new file mode 100644 index 000000000..e04dafe96 --- /dev/null +++ b/src/Event/Http/StreamedHttpResponse.php @@ -0,0 +1,149 @@ + $headers + */ + public function __construct(Generator $body, array $headers = [], int $statusCode = 200) + { + $this->body = $body; + $this->headers = $headers; + $this->statusCode = $statusCode; + } + + public function toApiGatewayFormat(bool $multiHeaders = false): array|\Generator + { + $isStreamedMode = (bool) getenv('BREF_STREAMED_MODE'); + $base64Encoding = (bool) getenv('BREF_BINARY_RESPONSES'); + + $headers = []; + foreach ($this->headers as $name => $values) { + $name = $this->capitalizeHeaderName($name); + + if ($multiHeaders) { + // Make sure the values are always arrays + $headers[$name] = is_array($values) ? $values : [$values]; + } else { + // Make sure the values are never arrays + $headers[$name] = is_array($values) ? end($values) : $values; + } + } + + // The headers must be a JSON object. If the PHP array is empty it is + // serialized to `[]` (we want `{}`) so we force it to an empty object. + $headers = empty($headers) ? new \stdClass : $headers; + + // Support for multi-value headers (only in version 1.0 of the http payload) + $headersKey = $multiHeaders ? 'multiValueHeaders' : 'headers'; + + // This is the format required by the AWS_PROXY lambda integration + // See https://stackoverflow.com/questions/43708017/aws-lambda-api-gateway-error-malformed-lambda-proxy-response + + if ($isStreamedMode) { + yield json_encode([ + 'statusCode' => $this->statusCode, + $headersKey => $headers, + ]); + + yield "\0\0\0\0\0\0\0\0"; + + foreach ($this->body as $dataChunk) { + yield $dataChunk; + } + } else { + $dataChunk = ''; + + while ($this->body->valid()) { + $dataChunk .= $this->body->current(); + + $this->body->next(); + } + + return [ + 'isBase64Encoded' => $base64Encoding, + 'statusCode' => $this->statusCode, + $headersKey => $headers, + 'body' => $base64Encoding ? base64_encode($dataChunk) : $dataChunk, + ]; + } + } + + /** + * See https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html#http-api-develop-integrations-lambda.response + */ + public function toApiGatewayFormatV2(): array|\Generator + { + $isStreamedMode = (bool) getenv('BREF_STREAMED_MODE'); + $base64Encoding = (bool) getenv('BREF_BINARY_RESPONSES'); + + $headers = []; + $cookies = []; + foreach ($this->headers as $name => $values) { + $name = $this->capitalizeHeaderName($name); + + if ($name === 'Set-Cookie') { + $cookies = is_array($values) ? $values : [$values]; + } else { + // Make sure the values are never arrays + // because API Gateway v2 does not support multi-value headers + $headers[$name] = is_array($values) ? implode(', ', $values) : $values; + } + } + + // The headers must be a JSON object. If the PHP array is empty it is + // serialized to `[]` (we want `{}`) so we force it to an empty object. + $headers = empty($headers) ? new \stdClass : $headers; + + if ($isStreamedMode) { + yield json_encode([ + 'cookies' => $cookies, + 'statusCode' => $this->statusCode, + 'headers' => $headers, + ]); + + yield "\0\0\0\0\0\0\0\0"; + + foreach ($this->body as $dataChunk) { + yield $dataChunk; + } + } else { + $dataChunk = ''; + + while ($this->body->valid()) { + $dataChunk .= $this->body->current(); + + $this->body->next(); + } + + return [ + 'cookies' => $cookies, + 'isBase64Encoded' => $base64Encoding, + 'statusCode' => $this->statusCode, + 'headers' => $headers, + 'body' => $base64Encoding ? base64_encode($dataChunk) : $dataChunk, + ]; + } + } + + /** + * See https://github.com/zendframework/zend-diactoros/blob/754a2ceb7ab753aafe6e3a70a1fb0370bde8995c/src/Response/SapiEmitterTrait.php#L96 + */ + private function capitalizeHeaderName(string $name): string + { + $name = str_replace('-', ' ', $name); + $name = ucwords($name); + return str_replace(' ', '-', $name); + } +} diff --git a/src/Runtime/LambdaRuntime.php b/src/Runtime/LambdaRuntime.php index 17fa52e65..c51393fbc 100755 --- a/src/Runtime/LambdaRuntime.php +++ b/src/Runtime/LambdaRuntime.php @@ -8,6 +8,7 @@ use Bref\Event\Handler; use CurlHandle; use Exception; +use Generator; use JsonException; use Psr\Http\Server\RequestHandlerInterface; use RuntimeException; @@ -39,6 +40,8 @@ final class LambdaRuntime private $curlHandleNext; /** @var resource|CurlHandle|null */ private $curlHandleResult; + /** @var resource|CurlHandle|null */ + private $curlStreamedHandleResult; private string $apiUrl; private Invoker $invoker; private string $layer; @@ -97,6 +100,8 @@ public function processNextEvent(Handler | RequestHandlerInterface | callable $h $this->sendResponse($context->getAwsRequestId(), $result); } catch (Throwable $e) { + $this->logError($e, $context->getAwsRequestId()); + $this->signalFailure($context->getAwsRequestId(), $e); try { @@ -201,7 +206,12 @@ private function waitNextInvocation(): array private function sendResponse(string $invocationId, mixed $responseData): void { $url = "http://$this->apiUrl/2018-06-01/runtime/invocation/$invocationId/response"; - $this->postJson($url, $responseData); + + if ($responseData instanceof Generator) { + $this->postStreamed($url, $responseData); + } else { + $this->postJson($url, $responseData); + } } /** @@ -281,6 +291,117 @@ public function failInitialization( exit(1); } + /** + * @param string[] $headers + * @throws Exception + * @throws ResponseTooBig + */ + private function postStreamed(string $url, Generator $data, array $headers = []): void + { + if ($this->curlStreamedHandleResult === null) { + $this->curlStreamedHandleResult = curl_init(); + curl_setopt($this->curlStreamedHandleResult, CURLOPT_UPLOAD, true); + curl_setopt($this->curlStreamedHandleResult, CURLOPT_CUSTOMREQUEST, 'POST'); + curl_setopt($this->curlStreamedHandleResult, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); + curl_setopt($this->curlStreamedHandleResult, CURLOPT_RETURNTRANSFER, true); + curl_setopt($this->curlStreamedHandleResult, CURLOPT_INFILESIZE, -1); + } + + curl_setopt($this->curlStreamedHandleResult, CURLOPT_URL, $url); + curl_setopt($this->curlStreamedHandleResult, CURLOPT_HTTPHEADER, [ + 'Lambda-Runtime-Function-Response-Mode: streaming', + 'Content-Type: application/vnd.awslambda.http-integration-response', + 'Transfer-Encoding: chunked', + ...$headers, + ]); + + if (PHP_VERSION_ID < 80100) { + $buffer = ''; + curl_setopt( + $this->curlStreamedHandleResult, + CURLOPT_READFUNCTION, + function ($ch, $fd, $length) use (&$data, &$buffer) { + if (strlen($buffer) < $length && $data->valid()) { + $buffer .= (string) $data->current(); + + /* + As this method needs to return an string, we need to wait for the next generator item to yield. + This can lead to the initial part of the buffer taking longer to load if the next chunk takes longer. + */ + $data->next(); + } + + $chunk = substr($buffer, 0, $length); + $buffer = substr($buffer, strlen($chunk)); + + return $chunk; + } + ); + } else { + $buffer = ''; + /* + * We use Fibers so we can suspend the yields and read data as needed. + * That way we don't block the response as more data comes. + */ + $fiber = new \Fiber( + function () use (&$data): void { + foreach ($data as $dataChunk) { + \Fiber::suspend((string) $dataChunk); + } + + \Fiber::suspend(PHP_INT_MIN); + } + ); + + curl_setopt( + $this->curlStreamedHandleResult, + CURLOPT_READFUNCTION, + function ($ch, $fd, $length) use (&$fiber, &$buffer) { + if ($buffer === '') { + if ($fiber->isStarted() || $fiber->isSuspended()) { + $fiberChunk = $fiber->resume(); + } elseif (! $fiber->isTerminated()) { + $fiberChunk = $fiber->start(); + } else { + $fiberChunk = PHP_INT_MIN; + } + + if ($fiberChunk !== PHP_INT_MIN) { + $buffer .= $fiberChunk; + } + } + + $chunk = substr($buffer, 0, $length); + $buffer = substr($buffer, strlen($chunk)); + + return $chunk; + } + ); + } + + $body = curl_exec($this->curlStreamedHandleResult); + + $statusCode = curl_getinfo($this->curlStreamedHandleResult, CURLINFO_HTTP_CODE); + if ($statusCode >= 400) { + // Re-open the connection in case of failure to start from a clean state + $this->closeCurlStreamedHandleResult(); + + if ($statusCode === 413) { + throw new ResponseTooBig; + } + + try { + $error = json_decode($body, true, 512, JSON_THROW_ON_ERROR); + $errorMessage = "{$error['errorType']}: {$error['errorMessage']}"; + } catch (JsonException) { + // In case we didn't get any JSON + $errorMessage = 'unknown error'; + } + + throw new Exception("Error $statusCode while calling the Lambda runtime API: $errorMessage"); + } + } + /** * @param string[] $headers * @throws Exception @@ -350,6 +471,14 @@ private function closeCurlHandleResult(): void } } + private function closeCurlStreamedHandleResult(): void + { + if ($this->curlStreamedHandleResult !== null) { + curl_close($this->curlStreamedHandleResult); + $this->curlStreamedHandleResult = null; + } + } + /** * Ping a Bref server with a statsd request. * From 067adf16fce23bfcb2abae6dfc10d8a1f8171b57 Mon Sep 17 00:00:00 2001 From: Vin Souza Date: Mon, 21 Jul 2025 11:23:40 +0100 Subject: [PATCH 02/14] chore: merge StreamedHttpResponse to HttpResponse --- src/Event/Http/HttpHandler.php | 2 +- src/Event/Http/HttpResponse.php | 48 +++++++- src/Event/Http/StreamedHttpResponse.php | 149 ------------------------ 3 files changed, 43 insertions(+), 156 deletions(-) delete mode 100644 src/Event/Http/StreamedHttpResponse.php diff --git a/src/Event/Http/HttpHandler.php b/src/Event/Http/HttpHandler.php index c417c1cfb..bd902f407 100644 --- a/src/Event/Http/HttpHandler.php +++ b/src/Event/Http/HttpHandler.php @@ -7,7 +7,7 @@ abstract class HttpHandler implements Handler { - abstract public function handleRequest(HttpRequestEvent $event, Context $context): HttpResponse|StreamedHttpResponse; + abstract public function handleRequest(HttpRequestEvent $event, Context $context): HttpResponse; /** {@inheritDoc} */ public function handle($event, Context $context): array|\Generator diff --git a/src/Event/Http/HttpResponse.php b/src/Event/Http/HttpResponse.php index 941f30835..ba1f92a00 100644 --- a/src/Event/Http/HttpResponse.php +++ b/src/Event/Http/HttpResponse.php @@ -9,12 +9,12 @@ final class HttpResponse { private int $statusCode; private array $headers; - private string $body; + private string|\Generator $body; /** * @param array $headers */ - public function __construct(string $body, array $headers = [], int $statusCode = 200) + public function __construct(string|\Generator $body, array $headers = [], int $statusCode = 200) { $this->body = $body; $this->headers = $headers; @@ -57,13 +57,31 @@ public function toApiGatewayFormat(bool $multiHeaders = false): array|\Generator yield "\0\0\0\0\0\0\0\0"; - yield $this->body; + if ( $this->body instanceof \Generator ) { + foreach ($this->body as $dataChunk) { + yield $dataChunk; + } + } else { + yield $this->body; + } } else { + if ( $this->body instanceof \Generator ) { + $dataChunk = ''; + + while ($this->body->valid()) { + $dataChunk .= $this->body->current(); + + $this->body->next(); + } + } else { + $dataChunk = $this->body; + } + return [ 'isBase64Encoded' => $base64Encoding, 'statusCode' => $this->statusCode, $headersKey => $headers, - 'body' => $base64Encoding ? base64_encode($this->body) : $this->body, + 'body' => $base64Encoding ? base64_encode($dataChunk) : $dataChunk, ]; } } @@ -103,14 +121,32 @@ public function toApiGatewayFormatV2(): array|\Generator yield "\0\0\0\0\0\0\0\0"; - yield $this->body; + if ( $this->body instanceof \Generator ) { + foreach ($this->body as $dataChunk) { + yield $dataChunk; + } + } else { + yield $this->body; + } } else { + if ( $this->body instanceof \Generator ) { + $dataChunk = ''; + + while ($this->body->valid()) { + $dataChunk .= $this->body->current(); + + $this->body->next(); + } + } else { + $dataChunk = $this->body; + } + return [ 'cookies' => $cookies, 'isBase64Encoded' => $base64Encoding, 'statusCode' => $this->statusCode, 'headers' => $headers, - 'body' => $base64Encoding ? base64_encode($this->body) : $this->body, + 'body' => $base64Encoding ? base64_encode($dataChunk) : $dataChunk, ]; } } diff --git a/src/Event/Http/StreamedHttpResponse.php b/src/Event/Http/StreamedHttpResponse.php deleted file mode 100644 index e04dafe96..000000000 --- a/src/Event/Http/StreamedHttpResponse.php +++ /dev/null @@ -1,149 +0,0 @@ - $headers - */ - public function __construct(Generator $body, array $headers = [], int $statusCode = 200) - { - $this->body = $body; - $this->headers = $headers; - $this->statusCode = $statusCode; - } - - public function toApiGatewayFormat(bool $multiHeaders = false): array|\Generator - { - $isStreamedMode = (bool) getenv('BREF_STREAMED_MODE'); - $base64Encoding = (bool) getenv('BREF_BINARY_RESPONSES'); - - $headers = []; - foreach ($this->headers as $name => $values) { - $name = $this->capitalizeHeaderName($name); - - if ($multiHeaders) { - // Make sure the values are always arrays - $headers[$name] = is_array($values) ? $values : [$values]; - } else { - // Make sure the values are never arrays - $headers[$name] = is_array($values) ? end($values) : $values; - } - } - - // The headers must be a JSON object. If the PHP array is empty it is - // serialized to `[]` (we want `{}`) so we force it to an empty object. - $headers = empty($headers) ? new \stdClass : $headers; - - // Support for multi-value headers (only in version 1.0 of the http payload) - $headersKey = $multiHeaders ? 'multiValueHeaders' : 'headers'; - - // This is the format required by the AWS_PROXY lambda integration - // See https://stackoverflow.com/questions/43708017/aws-lambda-api-gateway-error-malformed-lambda-proxy-response - - if ($isStreamedMode) { - yield json_encode([ - 'statusCode' => $this->statusCode, - $headersKey => $headers, - ]); - - yield "\0\0\0\0\0\0\0\0"; - - foreach ($this->body as $dataChunk) { - yield $dataChunk; - } - } else { - $dataChunk = ''; - - while ($this->body->valid()) { - $dataChunk .= $this->body->current(); - - $this->body->next(); - } - - return [ - 'isBase64Encoded' => $base64Encoding, - 'statusCode' => $this->statusCode, - $headersKey => $headers, - 'body' => $base64Encoding ? base64_encode($dataChunk) : $dataChunk, - ]; - } - } - - /** - * See https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html#http-api-develop-integrations-lambda.response - */ - public function toApiGatewayFormatV2(): array|\Generator - { - $isStreamedMode = (bool) getenv('BREF_STREAMED_MODE'); - $base64Encoding = (bool) getenv('BREF_BINARY_RESPONSES'); - - $headers = []; - $cookies = []; - foreach ($this->headers as $name => $values) { - $name = $this->capitalizeHeaderName($name); - - if ($name === 'Set-Cookie') { - $cookies = is_array($values) ? $values : [$values]; - } else { - // Make sure the values are never arrays - // because API Gateway v2 does not support multi-value headers - $headers[$name] = is_array($values) ? implode(', ', $values) : $values; - } - } - - // The headers must be a JSON object. If the PHP array is empty it is - // serialized to `[]` (we want `{}`) so we force it to an empty object. - $headers = empty($headers) ? new \stdClass : $headers; - - if ($isStreamedMode) { - yield json_encode([ - 'cookies' => $cookies, - 'statusCode' => $this->statusCode, - 'headers' => $headers, - ]); - - yield "\0\0\0\0\0\0\0\0"; - - foreach ($this->body as $dataChunk) { - yield $dataChunk; - } - } else { - $dataChunk = ''; - - while ($this->body->valid()) { - $dataChunk .= $this->body->current(); - - $this->body->next(); - } - - return [ - 'cookies' => $cookies, - 'isBase64Encoded' => $base64Encoding, - 'statusCode' => $this->statusCode, - 'headers' => $headers, - 'body' => $base64Encoding ? base64_encode($dataChunk) : $dataChunk, - ]; - } - } - - /** - * See https://github.com/zendframework/zend-diactoros/blob/754a2ceb7ab753aafe6e3a70a1fb0370bde8995c/src/Response/SapiEmitterTrait.php#L96 - */ - private function capitalizeHeaderName(string $name): string - { - $name = str_replace('-', ' ', $name); - $name = ucwords($name); - return str_replace(' ', '-', $name); - } -} From cfa55fcbce7c488f3f562b94d0ef13ff1b50c4da Mon Sep 17 00:00:00 2001 From: Vin Souza Date: Mon, 21 Jul 2025 11:25:59 +0100 Subject: [PATCH 03/14] style: fix style --- src/Event/Http/HttpResponse.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Event/Http/HttpResponse.php b/src/Event/Http/HttpResponse.php index ba1f92a00..61a9730c5 100644 --- a/src/Event/Http/HttpResponse.php +++ b/src/Event/Http/HttpResponse.php @@ -57,7 +57,7 @@ public function toApiGatewayFormat(bool $multiHeaders = false): array|\Generator yield "\0\0\0\0\0\0\0\0"; - if ( $this->body instanceof \Generator ) { + if ($this->body instanceof \Generator) { foreach ($this->body as $dataChunk) { yield $dataChunk; } @@ -65,7 +65,7 @@ public function toApiGatewayFormat(bool $multiHeaders = false): array|\Generator yield $this->body; } } else { - if ( $this->body instanceof \Generator ) { + if ($this->body instanceof \Generator) { $dataChunk = ''; while ($this->body->valid()) { @@ -121,7 +121,7 @@ public function toApiGatewayFormatV2(): array|\Generator yield "\0\0\0\0\0\0\0\0"; - if ( $this->body instanceof \Generator ) { + if ($this->body instanceof \Generator) { foreach ($this->body as $dataChunk) { yield $dataChunk; } @@ -129,7 +129,7 @@ public function toApiGatewayFormatV2(): array|\Generator yield $this->body; } } else { - if ( $this->body instanceof \Generator ) { + if ($this->body instanceof \Generator) { $dataChunk = ''; while ($this->body->valid()) { From 85aa4fb362fa5ca04cd43658a7171fbe54a13024 Mon Sep 17 00:00:00 2001 From: Vin Souza Date: Mon, 21 Jul 2025 15:00:30 +0100 Subject: [PATCH 04/14] tests: fix yield generator or array, add tests to lambda runtime, and to http event --- src/Event/Http/HttpHandler.php | 1 + src/Event/Http/HttpResponse.php | 39 +++++++-------- src/Runtime/LambdaRuntime.php | 2 - tests/Event/Http/HttpResponseTest.php | 69 +++++++++++++++++++++++++ tests/Runtime/LambdaRuntimeTest.php | 72 +++++++++++++++++++++++---- 5 files changed, 150 insertions(+), 33 deletions(-) diff --git a/src/Event/Http/HttpHandler.php b/src/Event/Http/HttpHandler.php index bd902f407..d08223ad6 100644 --- a/src/Event/Http/HttpHandler.php +++ b/src/Event/Http/HttpHandler.php @@ -24,6 +24,7 @@ public function handle($event, Context $context): array|\Generator $response = $this->handleRequest($httpEvent, $context); + var_dump($response->toApiGatewayFormat()); if ($httpEvent->isFormatV2()) { return $response->toApiGatewayFormatV2(); } diff --git a/src/Event/Http/HttpResponse.php b/src/Event/Http/HttpResponse.php index 61a9730c5..16270bd2d 100644 --- a/src/Event/Http/HttpResponse.php +++ b/src/Event/Http/HttpResponse.php @@ -50,20 +50,10 @@ public function toApiGatewayFormat(bool $multiHeaders = false): array|\Generator // See https://stackoverflow.com/questions/43708017/aws-lambda-api-gateway-error-malformed-lambda-proxy-response if ($isStreamedMode) { - yield json_encode([ + return $this->yieldBody([ 'statusCode' => $this->statusCode, $headersKey => $headers, ]); - - yield "\0\0\0\0\0\0\0\0"; - - if ($this->body instanceof \Generator) { - foreach ($this->body as $dataChunk) { - yield $dataChunk; - } - } else { - yield $this->body; - } } else { if ($this->body instanceof \Generator) { $dataChunk = ''; @@ -113,21 +103,11 @@ public function toApiGatewayFormatV2(): array|\Generator $headers = empty($headers) ? new \stdClass : $headers; if ($isStreamedMode) { - yield json_encode([ + return $this->yieldBody([ 'cookies' => $cookies, 'statusCode' => $this->statusCode, 'headers' => $headers, ]); - - yield "\0\0\0\0\0\0\0\0"; - - if ($this->body instanceof \Generator) { - foreach ($this->body as $dataChunk) { - yield $dataChunk; - } - } else { - yield $this->body; - } } else { if ($this->body instanceof \Generator) { $dataChunk = ''; @@ -160,4 +140,19 @@ private function capitalizeHeaderName(string $name): string $name = ucwords($name); return str_replace(' ', '-', $name); } + + private function yieldBody($headersFormat): \Generator + { + yield json_encode($headersFormat); + + yield "\0\0\0\0\0\0\0\0"; + + if ($this->body instanceof \Generator) { + foreach ($this->body as $dataChunk) { + yield $dataChunk; + } + } else { + yield $this->body; + } + } } diff --git a/src/Runtime/LambdaRuntime.php b/src/Runtime/LambdaRuntime.php index c51393fbc..7e9f77373 100755 --- a/src/Runtime/LambdaRuntime.php +++ b/src/Runtime/LambdaRuntime.php @@ -100,8 +100,6 @@ public function processNextEvent(Handler | RequestHandlerInterface | callable $h $this->sendResponse($context->getAwsRequestId(), $result); } catch (Throwable $e) { - $this->logError($e, $context->getAwsRequestId()); - $this->signalFailure($context->getAwsRequestId(), $e); try { diff --git a/tests/Event/Http/HttpResponseTest.php b/tests/Event/Http/HttpResponseTest.php index dab15792e..6d6ae9230 100644 --- a/tests/Event/Http/HttpResponseTest.php +++ b/tests/Event/Http/HttpResponseTest.php @@ -177,4 +177,73 @@ public function test response with multiple cookies() 'body' => '', ], $response->toApiGatewayFormatV2()); } + + public function test generator returns generator api gateway v1() + { + putenv('BREF_STREAMED_MODE=1'); + + $generatorFunction = function (): \Generator { + yield 'hello'; + yield 'world'; + }; + + $response = new HttpResponse($generatorFunction(), [ + 'x-foo-bar' => 'baz', + ]); + + $generator = $response->toApiGatewayFormat(); + + self::assertInstanceOf(\Generator::class, $generator); + + self::assertEquals([ + 'statusCode' => 200, + 'headers' => ['X-Foo-Bar' => 'baz'], + ], json_decode($generator->current(), true)); + + $generator->next(); + self::assertEquals("\0\0\0\0\0\0\0\0", $generator->current()); + + $generator->next(); + self::assertEquals('hello', $generator->current()); + + $generator->next(); + self::assertEquals('world', $generator->current()); + + putenv('BREF_STREAMED_MODE=0'); + } + + public function test generator returns generator api gateway v2() + { + putenv('BREF_STREAMED_MODE=1'); + + $generatorFunction = function (): \Generator { + yield 'hello'; + yield 'world'; + }; + + $response = new HttpResponse($generatorFunction(), [ + 'x-foo-bar' => 'baz', + ]); + + $generator = $response->toApiGatewayFormatV2(); + + self::assertInstanceOf(\Generator::class, $generator); + + self::assertEquals([ + 'cookies' => [], + 'statusCode' => 200, + 'headers' => ['X-Foo-Bar' => 'baz'], + ], json_decode($generator->current(), true)); + + $generator->next(); + self::assertEquals("\0\0\0\0\0\0\0\0", $generator->current()); + + $generator->next(); + self::assertEquals('hello', $generator->current()); + + $generator->next(); + self::assertEquals('world', $generator->current()); + + putenv('BREF_STREAMED_MODE=0'); + } } diff --git a/tests/Runtime/LambdaRuntimeTest.php b/tests/Runtime/LambdaRuntimeTest.php index aecc7d89a..a5fcb00b2 100644 --- a/tests/Runtime/LambdaRuntimeTest.php +++ b/tests/Runtime/LambdaRuntimeTest.php @@ -8,6 +8,7 @@ use Bref\Event\EventBridge\EventBridgeHandler; use Bref\Event\Handler; use Bref\Event\Http\HttpRequestEvent; +use Bref\Event\Http\HttpResponse; use Bref\Event\S3\S3Event; use Bref\Event\S3\S3Handler; use Bref\Event\Sns\SnsEvent; @@ -248,7 +249,7 @@ public function test function results that cannot be encoded are reporte public function test generic event handler() { - $handler = new class() implements Handler { + $handler = new class () implements Handler { public function handle(mixed $event, Context $context): mixed { return $event; @@ -264,7 +265,7 @@ public function handle(mixed $event, Context $context): mixed public function test SQS event handler() { - $handler = new class() extends SqsHandler { + $handler = new class () extends SqsHandler { public SqsEvent $event; public function handleSqs(SqsEvent $event, Context $context): void { @@ -282,7 +283,7 @@ public function handleSqs(SqsEvent $event, Context $context): void public function test SNS event handler() { - $handler = new class() extends SnsHandler { + $handler = new class () extends SnsHandler { public SnsEvent $event; public function handleSns(SnsEvent $event, Context $context): void { @@ -300,7 +301,7 @@ public function handleSns(SnsEvent $event, Context $context): void public function test S3 event handler() { - $handler = new class() extends S3Handler { + $handler = new class () extends S3Handler { public S3Event $event; public function handleS3(S3Event $event, Context $context): void { @@ -318,7 +319,7 @@ public function handleS3(S3Event $event, Context $context): void public function test PSR15 event handler() { - $handler = new class() implements RequestHandlerInterface { + $handler = new class () implements RequestHandlerInterface { public ServerRequestInterface $request; public function handle(ServerRequestInterface $request): ResponseInterface { @@ -350,7 +351,7 @@ public function handle(ServerRequestInterface $request): ResponseInterface public function test EventBridge event handler() { - $handler = new class() extends EventBridgeHandler { + $handler = new class () extends EventBridgeHandler { public EventBridgeEvent $event; public function handleEventBridge(EventBridgeEvent $event, Context $context): void { @@ -368,7 +369,7 @@ public function handleEventBridge(EventBridgeEvent $event, Context $context): vo public function test exceptions in beforeInvoke result in an invocation error() { - Bref::events()->subscribe(new class extends BrefEventSubscriber { + Bref::events()->subscribe(new class () extends BrefEventSubscriber { public function beforeInvoke(mixed ...$params): void { throw new Exception('This is an exception in beforeInvoke'); @@ -390,7 +391,7 @@ public function beforeInvoke(mixed ...$params): void */ public function test a failure in afterInvoke after a success does not signal a failure() { - Bref::events()->subscribe(new class extends BrefEventSubscriber { + Bref::events()->subscribe(new class () extends BrefEventSubscriber { public function afterInvoke(mixed ...$params): void { throw new Exception('This is an exception in afterInvoke'); @@ -414,7 +415,7 @@ public function afterInvoke(mixed ...$params): void public function test a failure in afterInvoke after a failure does not crash the runtime() { - Bref::events()->subscribe(new class extends BrefEventSubscriber { + Bref::events()->subscribe(new class () extends BrefEventSubscriber { public function afterInvoke(mixed ...$params): void { throw new Exception('This is an exception in afterInvoke'); @@ -431,4 +432,57 @@ public function afterInvoke(mixed ...$params): void $this->assertStringContainsString('Invoke Error {"errorType":"Exception","errorMessage":"Invocation error","stack":', $this->getActualOutput()); $this->assertStringContainsString('Invoke Error {"errorType":"Exception","errorMessage":"This is an exception in afterInvoke","stack":', $this->getActualOutput()); } + + public function test streamed response from generator http event() + { + putenv('BREF_STREAMED_MODE=1'); + + Server::flush(); + + Server::enqueue([ + new Response( // lambda event + 200, + [ + 'lambda-runtime-aws-request-id' => '1', + ], + '{ "Hello": "world!"}' + ), + new Response(202), + ]); + + $this->runtime->processNextEvent(function ($event) { + $generatorFunction = function (): \Generator { + yield 'hello'; + yield 'world'; + }; + + return ( + new HttpResponse( + $generatorFunction(), + [ + 'Content-Type' => 'text/html; charset=utf-8', + ] + ) + )->toApiGatewayFormatV2(); + }); + + $requests = Server::received(); + + $this->assertCount(2, $requests); + + [$eventRequest, $eventStreamResponse] = $requests; + + $this->assertSame('GET', $eventRequest->getMethod()); + $this->assertSame('http://localhost:8126/2018-06-01/runtime/invocation/next', $eventRequest->getUri()->__toString()); + + $this->assertSame('POST', $eventStreamResponse->getMethod()); + + $this->assertSame('streaming', $eventStreamResponse->getHeaderLine('lambda-runtime-function-response-mode')); + $this->assertSame('chunked', $eventStreamResponse->getHeaderLine('transfer-encoding')); + $this->assertSame('application/vnd.awslambda.http-integration-response', $eventStreamResponse->getHeaderLine('content-type')); + + $this->assertSame('http://localhost:8126/2018-06-01/runtime/invocation/1/response', $eventStreamResponse->getUri()->__toString()); + + putenv('BREF_STREAMED_MODE=0'); + } } From 264728db36155d99544c463b32f90001092de04e Mon Sep 17 00:00:00 2001 From: Vin Souza Date: Mon, 21 Jul 2025 15:01:17 +0100 Subject: [PATCH 05/14] style: remove var_dump --- src/Event/Http/HttpHandler.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Event/Http/HttpHandler.php b/src/Event/Http/HttpHandler.php index d08223ad6..bd902f407 100644 --- a/src/Event/Http/HttpHandler.php +++ b/src/Event/Http/HttpHandler.php @@ -24,7 +24,6 @@ public function handle($event, Context $context): array|\Generator $response = $this->handleRequest($httpEvent, $context); - var_dump($response->toApiGatewayFormat()); if ($httpEvent->isFormatV2()) { return $response->toApiGatewayFormatV2(); } From 247ff59ef03fbb5988e422b69385cebf818001db Mon Sep 17 00:00:00 2001 From: Vin Souza Date: Mon, 13 Oct 2025 23:29:06 +0100 Subject: [PATCH 06/14] chore: add flag to disable fibers in streamed responses --- src/Runtime/LambdaRuntime.php | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/Runtime/LambdaRuntime.php b/src/Runtime/LambdaRuntime.php index 7e9f77373..4f1488fe2 100755 --- a/src/Runtime/LambdaRuntime.php +++ b/src/Runtime/LambdaRuntime.php @@ -1,4 +1,6 @@ -apiUrl = $apiUrl; - $this->invoker = new Invoker; + $this->invoker = new Invoker(); $this->layer = $layer; } @@ -145,7 +147,7 @@ private function waitNextInvocation(): array } // Retrieve invocation ID - $contextBuilder = new ContextBuilder; + $contextBuilder = new ContextBuilder(); curl_setopt($this->curlHandleNext, CURLOPT_HEADERFUNCTION, function ($ch, $header) use ($contextBuilder) { if (! preg_match('/:\s*/', $header)) { return strlen($header); @@ -313,7 +315,7 @@ private function postStreamed(string $url, Generator $data, array $headers = []) ...$headers, ]); - if (PHP_VERSION_ID < 80100) { + if (PHP_VERSION_ID < 80100 || ((bool) getenv('BREF_STREAM_NO_FIBER'))) { $buffer = ''; curl_setopt( $this->curlStreamedHandleResult, @@ -385,7 +387,7 @@ function ($ch, $fd, $length) use (&$fiber, &$buffer) { $this->closeCurlStreamedHandleResult(); if ($statusCode === 413) { - throw new ResponseTooBig; + throw new ResponseTooBig(); } try { @@ -438,7 +440,7 @@ private function postJson(string $url, mixed $data, array $headers = []): void $this->closeCurlHandleResult(); if ($statusCode === 413) { - throw new ResponseTooBig; + throw new ResponseTooBig(); } try { From 69bf061fcfb2eaf190b3c14ceb9ec2a39abd9e46 Mon Sep 17 00:00:00 2001 From: Vin Souza Date: Tue, 14 Oct 2025 12:22:52 +0100 Subject: [PATCH 07/14] chore: add setupStreamFiberContext event/hook --- src/Bref.php | 16 ++++++++++++++++ src/Listener/BrefEventSubscriber.php | 11 ++++++++++- src/Listener/EventDispatcher.php | 16 +++++++++++++++- src/Runtime/LambdaRuntime.php | 6 ++++++ 4 files changed, 47 insertions(+), 2 deletions(-) diff --git a/src/Bref.php b/src/Bref.php index d5d6c5fe7..93085f01d 100644 --- a/src/Bref.php +++ b/src/Bref.php @@ -16,6 +16,7 @@ class Bref private static array $hooks = [ 'beforeStartup' => [], 'beforeInvoke' => [], + 'setupStreamFiberContext' => [], ]; private static EventDispatcher $eventDispatcher; @@ -65,6 +66,20 @@ public static function beforeInvoke(Closure $hook): void self::$hooks['beforeInvoke'][] = $hook; } + /** + * Register a hook to be executed when the stream fiber starts. + * + * Warning: hooks are low-level extension points to be used by framework + * integrations. For user code, it is not recommended to use them. Use your + * framework's extension points instead. + * + * @deprecated Use Bref::events()->subscribe() instead. + */ + public static function setupStreamFiberContext(Closure $hook): void + { + self::$hooks['setupStreamFiberContext'][] = $hook; + } + /** * @param 'beforeStartup'|'beforeInvoke' $hookName * @@ -106,6 +121,7 @@ public static function reset(): void self::$hooks = [ 'beforeStartup' => [], 'beforeInvoke' => [], + 'setupStreamFiberContext' => [], ]; self::$eventDispatcher = new EventDispatcher; } diff --git a/src/Listener/BrefEventSubscriber.php b/src/Listener/BrefEventSubscriber.php index ec2b315d0..cd5b21b63 100644 --- a/src/Listener/BrefEventSubscriber.php +++ b/src/Listener/BrefEventSubscriber.php @@ -1,4 +1,6 @@ -subscribers as $listener) { + $listener->setupStreamFiberContext(); + } + } + /** * Trigger the `beforeInvoke` event. * diff --git a/src/Runtime/LambdaRuntime.php b/src/Runtime/LambdaRuntime.php index 4f1488fe2..0aebf1b30 100755 --- a/src/Runtime/LambdaRuntime.php +++ b/src/Runtime/LambdaRuntime.php @@ -321,6 +321,9 @@ private function postStreamed(string $url, Generator $data, array $headers = []) $this->curlStreamedHandleResult, CURLOPT_READFUNCTION, function ($ch, $fd, $length) use (&$data, &$buffer) { + Bref::triggerHooks('setupStreamFiberContext'); + Bref::events()->setupStreamFiberContext(); + if (strlen($buffer) < $length && $data->valid()) { $buffer .= (string) $data->current(); @@ -345,6 +348,9 @@ function ($ch, $fd, $length) use (&$data, &$buffer) { */ $fiber = new \Fiber( function () use (&$data): void { + Bref::triggerHooks('setupStreamFiberContext'); + Bref::events()->setupStreamFiberContext(); + foreach ($data as $dataChunk) { \Fiber::suspend((string) $dataChunk); } From c3af981dc272098d478a58d770237827357f4416 Mon Sep 17 00:00:00 2001 From: Vin Souza Date: Wed, 15 Oct 2025 13:40:20 +0100 Subject: [PATCH 08/14] chore: stream fiber support, initial FPM streaming --- src/Bref.php | 38 +++++++++++++--------------- src/Event/Http/HttpResponse.php | 14 ++++++---- src/FpmRuntime/FpmHandler.php | 1 + src/Listener/BrefEventSubscriber.php | 11 ++++++-- src/Listener/EventDispatcher.php | 18 ++++++++++--- src/Runtime/LambdaRuntime.php | 10 +++----- 6 files changed, 56 insertions(+), 36 deletions(-) diff --git a/src/Bref.php b/src/Bref.php index 93085f01d..f3b3d5d79 100644 --- a/src/Bref.php +++ b/src/Bref.php @@ -1,4 +1,6 @@ - [], 'beforeInvoke' => [], - 'setupStreamFiberContext' => [], ]; private static EventDispatcher $eventDispatcher; + public static function isRunningInStreamingMode(): bool + { + return (bool) getenv('BREF_STREAMED_MODE'); + } + + public static function doesStreamingSupportsFibers(): bool + { + return PHP_VERSION_ID >= 80100 && !((bool) getenv('BREF_STREAM_NO_FIBER')); + } + /** * Configure the container that provides Lambda handlers. * @@ -33,7 +44,7 @@ public static function setContainer(Closure $containerProvider): void public static function events(): EventDispatcher { if (! isset(self::$eventDispatcher)) { - self::$eventDispatcher = new EventDispatcher; + self::$eventDispatcher = new EventDispatcher(); } return self::$eventDispatcher; } @@ -66,20 +77,6 @@ public static function beforeInvoke(Closure $hook): void self::$hooks['beforeInvoke'][] = $hook; } - /** - * Register a hook to be executed when the stream fiber starts. - * - * Warning: hooks are low-level extension points to be used by framework - * integrations. For user code, it is not recommended to use them. Use your - * framework's extension points instead. - * - * @deprecated Use Bref::events()->subscribe() instead. - */ - public static function setupStreamFiberContext(Closure $hook): void - { - self::$hooks['setupStreamFiberContext'][] = $hook; - } - /** * @param 'beforeStartup'|'beforeInvoke' $hookName * @@ -104,7 +101,7 @@ public static function getContainer(): ContainerInterface throw new RuntimeException('The closure provided to Bref\Bref::setContainer() did not return an instance of ' . ContainerInterface::class); } } else { - self::$container = new FileHandlerLocator; + self::$container = new FileHandlerLocator(); } } @@ -121,8 +118,9 @@ public static function reset(): void self::$hooks = [ 'beforeStartup' => [], 'beforeInvoke' => [], - 'setupStreamFiberContext' => [], + 'beforeStreamFiberLoops' => [], + 'afterStreamFiberLoops' => [], ]; - self::$eventDispatcher = new EventDispatcher; + self::$eventDispatcher = new EventDispatcher(); } } diff --git a/src/Event/Http/HttpResponse.php b/src/Event/Http/HttpResponse.php index 16270bd2d..654adc6d3 100644 --- a/src/Event/Http/HttpResponse.php +++ b/src/Event/Http/HttpResponse.php @@ -1,7 +1,11 @@ -yieldBody([ diff --git a/src/FpmRuntime/FpmHandler.php b/src/FpmRuntime/FpmHandler.php index caa1ef145..c8db038fd 100644 --- a/src/FpmRuntime/FpmHandler.php +++ b/src/FpmRuntime/FpmHandler.php @@ -134,6 +134,7 @@ public function handleRequest(HttpRequestEvent $event, Context $context): HttpRe $socketId = $this->client->sendAsyncRequest($this->connection, $request); $response = $this->client->readResponse($socketId, $timeoutDelayInMs); + // TODO: Here when it's streamed mode we return an http response with body as a generator, and then we use the callback to yield the body } catch (TimedoutException) { $invocationId = $context->getAwsRequestId(); echo "$invocationId The PHP script timed out. Bref will now restart PHP-FPM to start from a clean slate and flush the PHP logs.\nTimeouts can happen for example when trying to connect to a remote API or database, if this happens continuously check for those.\nIf you are using a RDS database, read this: https://bref.sh/docs/environment/database.html#accessing-the-internet\n"; diff --git a/src/Listener/BrefEventSubscriber.php b/src/Listener/BrefEventSubscriber.php index cd5b21b63..f0c051660 100644 --- a/src/Listener/BrefEventSubscriber.php +++ b/src/Listener/BrefEventSubscriber.php @@ -33,9 +33,16 @@ public function afterStartup(): void } /** - * Register a hook to be executed when the stream fiber needs to have context setup. + * Register a hook to be executed before the stream fiber loops, usually used to setup extra context. */ - public function setupStreamFiberContext(): void + public function beforeStreamFiberLoops(): void + { + } + + /** + * Register a hook to be executed when the stream fiber is about to finish, usually used to draw resources. + */ + public function afterStreamFiberLoops(): void { } diff --git a/src/Listener/EventDispatcher.php b/src/Listener/EventDispatcher.php index 8dee1142a..ee0171243 100644 --- a/src/Listener/EventDispatcher.php +++ b/src/Listener/EventDispatcher.php @@ -54,14 +54,26 @@ public function afterStartup(): void } /** - * Trigger the `setupStreamFiberContext` event. + * Trigger the `beforeStreamFiberLoops` event. * * @internal This method is called by Bref and should not be called by user code. */ - public function setupStreamFiberContext(): void + public function beforeStreamFiberLoops(): void { foreach ($this->subscribers as $listener) { - $listener->setupStreamFiberContext(); + $listener->beforeStreamFiberLoops(); + } + } + + /** + * Trigger the `afterStreamFiberLoops` event. + * + * @internal This method is called by Bref and should not be called by user code. + */ + public function afterStreamFiberLoops(): void + { + foreach ($this->subscribers as $listener) { + $listener->afterStreamFiberLoops(); } } diff --git a/src/Runtime/LambdaRuntime.php b/src/Runtime/LambdaRuntime.php index 0aebf1b30..0e99a6f47 100755 --- a/src/Runtime/LambdaRuntime.php +++ b/src/Runtime/LambdaRuntime.php @@ -315,15 +315,12 @@ private function postStreamed(string $url, Generator $data, array $headers = []) ...$headers, ]); - if (PHP_VERSION_ID < 80100 || ((bool) getenv('BREF_STREAM_NO_FIBER'))) { + if (! Bref::doesStreamingSupportsFibers()) { $buffer = ''; curl_setopt( $this->curlStreamedHandleResult, CURLOPT_READFUNCTION, function ($ch, $fd, $length) use (&$data, &$buffer) { - Bref::triggerHooks('setupStreamFiberContext'); - Bref::events()->setupStreamFiberContext(); - if (strlen($buffer) < $length && $data->valid()) { $buffer .= (string) $data->current(); @@ -348,13 +345,14 @@ function ($ch, $fd, $length) use (&$data, &$buffer) { */ $fiber = new \Fiber( function () use (&$data): void { - Bref::triggerHooks('setupStreamFiberContext'); - Bref::events()->setupStreamFiberContext(); + Bref::events()->beforeStreamFiberLoops(); foreach ($data as $dataChunk) { \Fiber::suspend((string) $dataChunk); } + Bref::events()->afterStreamFiberLoops(); + \Fiber::suspend(PHP_INT_MIN); } ); From e8c79eda5c6cd14426735fd18fd968a9417f29bc Mon Sep 17 00:00:00 2001 From: Vin Souza Date: Wed, 15 Oct 2025 15:08:33 +0100 Subject: [PATCH 09/14] chore: fpm handler initial streaming support --- src/Bref.php | 2 +- src/FpmRuntime/FpmHandler.php | 104 ++++++++++++++++++++++++++++++++-- 2 files changed, 101 insertions(+), 5 deletions(-) diff --git a/src/Bref.php b/src/Bref.php index f3b3d5d79..7254e9b3c 100644 --- a/src/Bref.php +++ b/src/Bref.php @@ -28,7 +28,7 @@ public static function isRunningInStreamingMode(): bool public static function doesStreamingSupportsFibers(): bool { - return PHP_VERSION_ID >= 80100 && !((bool) getenv('BREF_STREAM_NO_FIBER')); + return PHP_VERSION_ID >= 80100 && !((bool) getenv('BREF_STREAM_NO_FIBER')) && class_exists('Fiber'); } /** diff --git a/src/FpmRuntime/FpmHandler.php b/src/FpmRuntime/FpmHandler.php index c8db038fd..d1c35326d 100644 --- a/src/FpmRuntime/FpmHandler.php +++ b/src/FpmRuntime/FpmHandler.php @@ -1,7 +1,10 @@ -fpm = $resource; - $this->client = new Client; + $this->client = new Client(); $this->connection = new UnixDomainSocket(self::SOCKET, 1000, 900000); $this->waitUntilReady(); @@ -114,6 +118,90 @@ public function __destruct() $this->stop(); } + public function handleStreamedRequest(HttpRequestEvent $event, Context $context): HttpResponse + { + $responseFiber = new \Fiber( + function () use (&$event, &$context) { + $request = $this->eventToFastCgiRequest( + $event, + $context, + function (string $stdOut = '', string $stdErr = '') { + if ($stdOut !== '') { + \Fiber::suspend(['stdout', $stdOut]); + } elseif ($stdErr !== '') { + \Fiber::suspend(['stderr', $stdErr]); + } + } + ); + + $margin = 1000; + $timeoutDelayInMs = max(1000, $context->getRemainingTimeInMillis() - $margin); + + try { + $socketId = $this->client->sendAsyncRequest($this->connection, $request); + + $this->client->readResponse($socketId, $timeoutDelayInMs); + + \Fiber::suspend(['finish', PHP_INT_MAX]); + } catch (TimedoutException) { + } + } + ); + + [, $outputAccumulator] = $responseFiber->start(); + $stdErrAccumulator = ''; + $finishHeaders = false; + $startTime = microtime(true); + $headerLines = ''; + $responseHeaders = []; + + do { + if ($responseFiber->isStarted() || $responseFiber->isSuspended()) { + [$chunkType, $fiberChunk] = $responseFiber->resume(); + } else { + [$chunkType, $fiberChunk] = ['finish', PHP_INT_MIN]; + } + + if ($fiberChunk !== PHP_INT_MIN) { + if ($chunkType === 'stderr') { + $stdErrAccumulator .= $fiberChunk; + } else { + $outputAccumulator .= $fiberChunk; + } + } + + $lines = explode(PHP_EOL, $outputAccumulator); + $firstCRLFOffset = 0; + + foreach ($lines as $i => $line) { + if ($line === "\r" && ! $firstCRLFOffset) { + $firstCRLFOffset = $i; + } elseif ($line === "\r" && $i - 1 === $firstCRLFOffset) { + $finishHeaders = true; + $headerLines = implode(PHP_EOL, array_slice($lines, 0, $i)) . "\r\n\r\n"; + $outputAccumulator = implode(PHP_EOL, array_slice($lines, $i)); + } + } + + $responseHeaders = $this->getResponseHeaders( + ( + new FastCGIResponse( + $headerLines, + '', + microtime(true) - $startTime + ) + ) + ); + } while(!$finishHeaders); + + if (isset($responseHeaders['status'])) { + $status = (int) (is_array($responseHeaders['status']) ? $responseHeaders['status'][0] : $responseHeaders['status']); + unset($responseHeaders['status']); + } + + return new HttpResponse('ola', $responseHeaders, $status ?? 200); + } + /** * Proxy the API Gateway event to PHP-FPM and return its response. * @@ -123,6 +211,10 @@ public function __destruct() */ public function handleRequest(HttpRequestEvent $event, Context $context): HttpResponse { + if (Bref::doesStreamingSupportsFibers()) { + return $this->handleStreamedRequest($event, $context); + } + $request = $this->eventToFastCgiRequest($event, $context); // The script will timeout 1 second before the remaining time @@ -171,7 +263,7 @@ public function handleRequest(HttpRequestEvent $event, Context $context): HttpRe $this->stop(); $this->start(); - throw new FastCgiCommunicationFailed; + throw new FastCgiCommunicationFailed(); } $responseHeaders = $this->getResponseHeaders($response); @@ -229,7 +321,7 @@ private function isReady(): bool return file_exists(self::SOCKET); } - private function eventToFastCgiRequest(HttpRequestEvent $event, Context $context): ProvidesRequestData + private function eventToFastCgiRequest(HttpRequestEvent $event, Context $context, ?callable $passThroughCallback = null): ProvidesRequestData { $request = new FastCgiRequest($event->getMethod(), $this->handler, $event->getBody()); $request->setRequestUri($event->getUri()); @@ -244,6 +336,10 @@ private function eventToFastCgiRequest(HttpRequestEvent $event, Context $context $request->setCustomVar('LAMBDA_INVOCATION_CONTEXT', json_encode($context, JSON_THROW_ON_ERROR)); $request->setCustomVar('LAMBDA_REQUEST_CONTEXT', json_encode($event->getRequestContext(), JSON_THROW_ON_ERROR)); + if ($passThroughCallback) { + $request->addPassThroughCallbacks($passThroughCallback); + } + $contentType = $event->getContentType(); if ($contentType) { $request->setContentType($contentType); From 8856caf52aa46c74fc8fca57bea2e16cb7a1f6ad Mon Sep 17 00:00:00 2001 From: Vin Souza Date: Wed, 15 Oct 2025 22:00:59 +0100 Subject: [PATCH 10/14] chore: fpm runtime streamed responses --- src/FpmRuntime/FpmHandler.php | 139 +++++++++++++++++++++------------- 1 file changed, 87 insertions(+), 52 deletions(-) diff --git a/src/FpmRuntime/FpmHandler.php b/src/FpmRuntime/FpmHandler.php index d1c35326d..1e7e47577 100644 --- a/src/FpmRuntime/FpmHandler.php +++ b/src/FpmRuntime/FpmHandler.php @@ -122,7 +122,7 @@ public function handleStreamedRequest(HttpRequestEvent $event, Context $context) { $responseFiber = new \Fiber( function () use (&$event, &$context) { - $request = $this->eventToFastCgiRequest( + $this->sendRequestToFastCgi( $event, $context, function (string $stdOut = '', string $stdErr = '') { @@ -131,24 +131,13 @@ function (string $stdOut = '', string $stdErr = '') { } elseif ($stdErr !== '') { \Fiber::suspend(['stderr', $stdErr]); } - } + }, + false ); - - $margin = 1000; - $timeoutDelayInMs = max(1000, $context->getRemainingTimeInMillis() - $margin); - - try { - $socketId = $this->client->sendAsyncRequest($this->connection, $request); - - $this->client->readResponse($socketId, $timeoutDelayInMs); - - \Fiber::suspend(['finish', PHP_INT_MAX]); - } catch (TimedoutException) { - } } ); - [, $outputAccumulator] = $responseFiber->start(); + $outputAccumulator = ''; $stdErrAccumulator = ''; $finishHeaders = false; $startTime = microtime(true); @@ -156,13 +145,15 @@ function (string $stdOut = '', string $stdErr = '') { $responseHeaders = []; do { - if ($responseFiber->isStarted() || $responseFiber->isSuspended()) { - [$chunkType, $fiberChunk] = $responseFiber->resume(); + if (($responseFiber->isStarted() || $responseFiber->isSuspended()) && ! $responseFiber->isTerminated()) { + [$chunkType, $fiberChunk] = $responseFiber->resume() ?: ['', '']; + } elseif (! $responseFiber->isTerminated()) { + [$chunkType, $fiberChunk] = $responseFiber->start() ?: ['', '']; } else { - [$chunkType, $fiberChunk] = ['finish', PHP_INT_MIN]; + [$chunkType, $fiberChunk] = ['finish', PHP_INT_MAX]; } - if ($fiberChunk !== PHP_INT_MIN) { + if ($fiberChunk !== PHP_INT_MAX) { if ($chunkType === 'stderr') { $stdErrAccumulator .= $fiberChunk; } else { @@ -171,27 +162,36 @@ function (string $stdOut = '', string $stdErr = '') { } $lines = explode(PHP_EOL, $outputAccumulator); - $firstCRLFOffset = 0; + $hasHeaderFound = false; foreach ($lines as $i => $line) { - if ($line === "\r" && ! $firstCRLFOffset) { - $firstCRLFOffset = $i; - } elseif ($line === "\r" && $i - 1 === $firstCRLFOffset) { + if (! $hasHeaderFound) { + if (trim($line) !== "") { + $hasHeaderFound = true; + } + } + + if ($line === "\r" && $hasHeaderFound) { $finishHeaders = true; $headerLines = implode(PHP_EOL, array_slice($lines, 0, $i)) . "\r\n\r\n"; - $outputAccumulator = implode(PHP_EOL, array_slice($lines, $i)); + $outputAccumulator = implode(PHP_EOL, array_slice($lines, $i + 1)); + break; } } - $responseHeaders = $this->getResponseHeaders( - ( - new FastCGIResponse( - $headerLines, - '', - microtime(true) - $startTime - ) - ) - ); + if ($finishHeaders) { + $responseHeaders = $this->getResponseHeaders( + ( + new FastCGIResponse( + $headerLines, + '', + microtime(true) - $startTime + ) + ) + ); + } else { + $responseHeaders = []; + } } while(!$finishHeaders); if (isset($responseHeaders['status'])) { @@ -199,34 +199,53 @@ function (string $stdOut = '', string $stdErr = '') { unset($responseHeaders['status']); } - return new HttpResponse('ola', $responseHeaders, $status ?? 200); - } + $this->ensureStillRunning(); - /** - * Proxy the API Gateway event to PHP-FPM and return its response. - * - * @throws FastCgiCommunicationFailed - * @throws Timeout - * @throws Exception - */ - public function handleRequest(HttpRequestEvent $event, Context $context): HttpResponse - { - if (Bref::doesStreamingSupportsFibers()) { - return $this->handleStreamedRequest($event, $context); - } + return new HttpResponse( + (function () use (&$responseFiber, &$outputAccumulator): \Generator { + if ($outputAccumulator !== '') { // We can never yield an empty string, otherwise it thinks its the end of the execution + yield $outputAccumulator; + } + + while (! $responseFiber->isTerminated()) { + [$chunkType, $fiberChunk] = $responseFiber->resume() ?: ['', '']; - $request = $this->eventToFastCgiRequest($event, $context); + if ($chunkType === 'stdout') { + if ($fiberChunk !== '') { // We can never yield an empty string, otherwise it thinks its the end of the execution + yield $fiberChunk; + } + } + } + })(), + $responseHeaders, + $status ?? 200 + ); + } + + protected function sendRequestToFastCgi( + HttpRequestEvent $event, + Context $context, + ?callable $passThroughCallback = null, + bool $readResponse = true + ): ?ProvidesResponseData { + $request = $this->eventToFastCgiRequest( + $event, + $context, + $passThroughCallback + ); - // The script will timeout 1 second before the remaining time - // to allow some time for Bref/PHP-FPM to recover and cleanup $margin = 1000; $timeoutDelayInMs = max(1000, $context->getRemainingTimeInMillis() - $margin); try { $socketId = $this->client->sendAsyncRequest($this->connection, $request); + if ($readResponse) { + return $this->client->readResponse($socketId, $timeoutDelayInMs); + } else { + $this->client->waitForResponse($socketId, $timeoutDelayInMs); - $response = $this->client->readResponse($socketId, $timeoutDelayInMs); - // TODO: Here when it's streamed mode we return an http response with body as a generator, and then we use the callback to yield the body + return null; + } } catch (TimedoutException) { $invocationId = $context->getAwsRequestId(); echo "$invocationId The PHP script timed out. Bref will now restart PHP-FPM to start from a clean slate and flush the PHP logs.\nTimeouts can happen for example when trying to connect to a remote API or database, if this happens continuously check for those.\nIf you are using a RDS database, read this: https://bref.sh/docs/environment/database.html#accessing-the-internet\n"; @@ -265,6 +284,22 @@ public function handleRequest(HttpRequestEvent $event, Context $context): HttpRe throw new FastCgiCommunicationFailed(); } + } + + /** + * Proxy the API Gateway event to PHP-FPM and return its response. + * + * @throws FastCgiCommunicationFailed + * @throws Timeout + * @throws Exception + */ + public function handleRequest(HttpRequestEvent $event, Context $context): HttpResponse + { + if (Bref::isRunningInStreamingMode() && Bref::doesStreamingSupportsFibers()) { + return $this->handleStreamedRequest($event, $context); + } + + $response = $this->sendRequestToFastCgi($event, $context, null, true); $responseHeaders = $this->getResponseHeaders($response); From 76d43ed45c4138ffb7dd429a20977e546431283e Mon Sep 17 00:00:00 2001 From: Vin Souza Date: Thu, 16 Oct 2025 11:34:55 +0100 Subject: [PATCH 11/14] style: fix php style --- src/Bref.php | 12 +++++------- src/Event/Http/HttpResponse.php | 10 ++++------ src/FpmRuntime/FpmHandler.php | 16 +++++++--------- src/Listener/BrefEventSubscriber.php | 4 +--- src/Listener/EventDispatcher.php | 4 +--- src/Runtime/LambdaRuntime.php | 12 +++++------- 6 files changed, 23 insertions(+), 35 deletions(-) diff --git a/src/Bref.php b/src/Bref.php index 7254e9b3c..b7218a892 100644 --- a/src/Bref.php +++ b/src/Bref.php @@ -1,6 +1,4 @@ -= 80100 && !((bool) getenv('BREF_STREAM_NO_FIBER')) && class_exists('Fiber'); + return PHP_VERSION_ID >= 80100 && ! (bool) getenv('BREF_STREAM_NO_FIBER') && class_exists('Fiber'); } /** @@ -44,7 +42,7 @@ public static function setContainer(Closure $containerProvider): void public static function events(): EventDispatcher { if (! isset(self::$eventDispatcher)) { - self::$eventDispatcher = new EventDispatcher(); + self::$eventDispatcher = new EventDispatcher; } return self::$eventDispatcher; } @@ -101,7 +99,7 @@ public static function getContainer(): ContainerInterface throw new RuntimeException('The closure provided to Bref\Bref::setContainer() did not return an instance of ' . ContainerInterface::class); } } else { - self::$container = new FileHandlerLocator(); + self::$container = new FileHandlerLocator; } } @@ -121,6 +119,6 @@ public static function reset(): void 'beforeStreamFiberLoops' => [], 'afterStreamFiberLoops' => [], ]; - self::$eventDispatcher = new EventDispatcher(); + self::$eventDispatcher = new EventDispatcher; } } diff --git a/src/Event/Http/HttpResponse.php b/src/Event/Http/HttpResponse.php index 654adc6d3..81366c9d9 100644 --- a/src/Event/Http/HttpResponse.php +++ b/src/Event/Http/HttpResponse.php @@ -1,6 +1,4 @@ -yieldBody([ @@ -145,7 +143,7 @@ private function capitalizeHeaderName(string $name): string return str_replace(' ', '-', $name); } - private function yieldBody($headersFormat): \Generator + private function yieldBody(array $headersFormat): \Generator { yield json_encode($headersFormat); diff --git a/src/FpmRuntime/FpmHandler.php b/src/FpmRuntime/FpmHandler.php index 1e7e47577..b19551303 100644 --- a/src/FpmRuntime/FpmHandler.php +++ b/src/FpmRuntime/FpmHandler.php @@ -1,6 +1,4 @@ -fpm = $resource; - $this->client = new Client(); + $this->client = new Client; $this->connection = new UnixDomainSocket(self::SOCKET, 1000, 900000); $this->waitUntilReady(); @@ -121,11 +119,11 @@ public function __destruct() public function handleStreamedRequest(HttpRequestEvent $event, Context $context): HttpResponse { $responseFiber = new \Fiber( - function () use (&$event, &$context) { + function () use (&$event, &$context): void { $this->sendRequestToFastCgi( $event, $context, - function (string $stdOut = '', string $stdErr = '') { + function (string $stdOut = '', string $stdErr = ''): void { if ($stdOut !== '') { \Fiber::suspend(['stdout', $stdOut]); } elseif ($stdErr !== '') { @@ -166,7 +164,7 @@ function (string $stdOut = '', string $stdErr = '') { $hasHeaderFound = false; foreach ($lines as $i => $line) { if (! $hasHeaderFound) { - if (trim($line) !== "") { + if (trim($line) !== '') { $hasHeaderFound = true; } } @@ -192,7 +190,7 @@ function (string $stdOut = '', string $stdErr = '') { } else { $responseHeaders = []; } - } while(!$finishHeaders); + } while (! $finishHeaders); if (isset($responseHeaders['status'])) { $status = (int) (is_array($responseHeaders['status']) ? $responseHeaders['status'][0] : $responseHeaders['status']); @@ -282,7 +280,7 @@ protected function sendRequestToFastCgi( $this->stop(); $this->start(); - throw new FastCgiCommunicationFailed(); + throw new FastCgiCommunicationFailed; } } diff --git a/src/Listener/BrefEventSubscriber.php b/src/Listener/BrefEventSubscriber.php index f0c051660..817e52074 100644 --- a/src/Listener/BrefEventSubscriber.php +++ b/src/Listener/BrefEventSubscriber.php @@ -1,6 +1,4 @@ -apiUrl = $apiUrl; - $this->invoker = new Invoker(); + $this->invoker = new Invoker; $this->layer = $layer; } @@ -147,7 +145,7 @@ private function waitNextInvocation(): array } // Retrieve invocation ID - $contextBuilder = new ContextBuilder(); + $contextBuilder = new ContextBuilder; curl_setopt($this->curlHandleNext, CURLOPT_HEADERFUNCTION, function ($ch, $header) use ($contextBuilder) { if (! preg_match('/:\s*/', $header)) { return strlen($header); @@ -391,7 +389,7 @@ function ($ch, $fd, $length) use (&$fiber, &$buffer) { $this->closeCurlStreamedHandleResult(); if ($statusCode === 413) { - throw new ResponseTooBig(); + throw new ResponseTooBig; } try { @@ -444,7 +442,7 @@ private function postJson(string $url, mixed $data, array $headers = []): void $this->closeCurlHandleResult(); if ($statusCode === 413) { - throw new ResponseTooBig(); + throw new ResponseTooBig; } try { From 5375fdc35aec447103285e2f9e86829634aca063 Mon Sep 17 00:00:00 2001 From: Vin Souza Date: Thu, 16 Oct 2025 11:36:07 +0100 Subject: [PATCH 12/14] chore: remove deprecated hooks --- src/Bref.php | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Bref.php b/src/Bref.php index b7218a892..0885856d3 100644 --- a/src/Bref.php +++ b/src/Bref.php @@ -116,8 +116,6 @@ public static function reset(): void self::$hooks = [ 'beforeStartup' => [], 'beforeInvoke' => [], - 'beforeStreamFiberLoops' => [], - 'afterStreamFiberLoops' => [], ]; self::$eventDispatcher = new EventDispatcher; } From 50966451f32c223deb65d2366ebf59209a29195f Mon Sep 17 00:00:00 2001 From: Vin Souza Date: Thu, 16 Oct 2025 11:41:09 +0100 Subject: [PATCH 13/14] docs: add an explanation on how the Lambda Streaming format works --- src/Event/Http/HttpResponse.php | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/Event/Http/HttpResponse.php b/src/Event/Http/HttpResponse.php index 81366c9d9..97bf0eeaf 100644 --- a/src/Event/Http/HttpResponse.php +++ b/src/Event/Http/HttpResponse.php @@ -143,8 +143,18 @@ private function capitalizeHeaderName(string $name): string return str_replace(' ', '-', $name); } + /** + * Yields headers and response body in Lambda Streaming Format. + * + * @param array $headersFormat + */ private function yieldBody(array $headersFormat): \Generator { + /* + * Lambda Streaming response requires you to send the headers in API Gateway format + * followed by a set of 8 NULL bits, and only then you can start sending the actual + * response body. + */ yield json_encode($headersFormat); yield "\0\0\0\0\0\0\0\0"; From 7be61c9f1667d023300fff08af6d0dc91067e22f Mon Sep 17 00:00:00 2001 From: Vin Souza Date: Thu, 16 Oct 2025 16:11:38 +0100 Subject: [PATCH 14/14] chore: prevent the lambda from signaling failure in case its a streamed body --- src/Runtime/LambdaRuntime.php | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/Runtime/LambdaRuntime.php b/src/Runtime/LambdaRuntime.php index 6c164080a..68bc3de56 100755 --- a/src/Runtime/LambdaRuntime.php +++ b/src/Runtime/LambdaRuntime.php @@ -100,12 +100,18 @@ public function processNextEvent(Handler | RequestHandlerInterface | callable $h $this->sendResponse($context->getAwsRequestId(), $result); } catch (Throwable $e) { - $this->signalFailure($context->getAwsRequestId(), $e); - try { - Bref::events()->afterInvoke($handler, $event, $context, null, $e); - } catch (Throwable $e) { - $this->logError($e, $context->getAwsRequestId()); + if (isset($result) && $result instanceof \Generator) { + $this->logError($e, $context->getAwsRequestId()); // We just log the error as we can't mark the lambda as failed when the streaming has started. + } else { + $this->signalFailure($context->getAwsRequestId(), $e); + } + } finally { + try { + Bref::events()->afterInvoke($handler, $event, $context, null, $e); + } catch (Throwable $e) { + $this->logError($e, $context->getAwsRequestId()); + } } return false;