diff --git a/docs/use-cases/http/advanced-use-cases.mdx b/docs/use-cases/http/advanced-use-cases.mdx index cc7333931..74f52da62 100644 --- a/docs/use-cases/http/advanced-use-cases.mdx +++ b/docs/use-cases/http/advanced-use-cases.mdx @@ -79,6 +79,28 @@ functions: url: true ``` +#### Invoke Method RESPONSE_STREAM + +In order to support Lambda Function URLs invoke method as `RESPONSE_STREAM` you need to change to settings: + +* BREF_STREAMED_MODE = 1 in the lambda environment +* invokeMode: RESPONSE_STREAM in the function url settings + +Like the following sample config: + +```yml filename="serverless.yml" +functions: + hello: + handler: MyApp\Handlers\MyLambdaUrlHandler + # ... + environment: + BREF_STREAMED_MODE: 1 + url: + invokeMode: RESPONSE_STREAM +``` + +Be aware that you must implement an `HttpHandler` handler, or use something like Laravel Octane handler if you are using Laravel. + ### API Gateway v1 REST API The syntax is slightly different from API Gateway v2 HTTP APIs as we must use a different `events` configuration. Here is an example that sends all requests to a single Lambda function: diff --git a/src/Bref.php b/src/Bref.php index d5d6c5fe7..0885856d3 100644 --- a/src/Bref.php +++ b/src/Bref.php @@ -19,6 +19,16 @@ class Bref ]; private static EventDispatcher $eventDispatcher; + public static function isRunningInStreamingMode(): bool + { + return (bool) getenv('BREF_STREAMED_MODE'); + } + + public static function doesStreamingSupportsFibers(): bool + { + return PHP_VERSION_ID >= 80100 && ! (bool) getenv('BREF_STREAM_NO_FIBER') && class_exists('Fiber'); + } + /** * Configure the container that provides Lambda handlers. * diff --git a/src/Event/Http/HttpHandler.php b/src/Event/Http/HttpHandler.php index ebd7092d5..bd902f407 100644 --- a/src/Event/Http/HttpHandler.php +++ b/src/Event/Http/HttpHandler.php @@ -10,7 +10,7 @@ abstract class HttpHandler implements Handler abstract public function handleRequest(HttpRequestEvent $event, Context $context): HttpResponse; /** {@inheritDoc} */ - public function handle($event, Context $context): array + public function handle($event, Context $context): array|\Generator { // See https://bref.sh/docs/runtimes/http.html#cold-starts if (isset($event['warmer']) && $event['warmer'] === true) { diff --git a/src/Event/Http/HttpResponse.php b/src/Event/Http/HttpResponse.php index 80a679bc2..97bf0eeaf 100644 --- a/src/Event/Http/HttpResponse.php +++ b/src/Event/Http/HttpResponse.php @@ -2,6 +2,8 @@ namespace Bref\Event\Http; +use Bref\Bref; + /** * Formats the response expected by AWS Lambda and the API Gateway integration. */ @@ -9,20 +11,21 @@ final class HttpResponse { private int $statusCode; private array $headers; - private string $body; + private string|\Generator $body; /** * @param array $headers */ - public function __construct(string $body, array $headers = [], int $statusCode = 200) + public function __construct(string|\Generator $body, array $headers = [], int $statusCode = 200) { $this->body = $body; $this->headers = $headers; $this->statusCode = $statusCode; } - public function toApiGatewayFormat(bool $multiHeaders = false): array + public function toApiGatewayFormat(bool $multiHeaders = false): array|\Generator { + $isStreamedMode = Bref::isRunningInStreamingMode(); $base64Encoding = (bool) getenv('BREF_BINARY_RESPONSES'); $headers = []; @@ -47,19 +50,40 @@ public function toApiGatewayFormat(bool $multiHeaders = false): array // This is the format required by the AWS_PROXY lambda integration // See https://stackoverflow.com/questions/43708017/aws-lambda-api-gateway-error-malformed-lambda-proxy-response - return [ - 'isBase64Encoded' => $base64Encoding, - 'statusCode' => $this->statusCode, - $headersKey => $headers, - 'body' => $base64Encoding ? base64_encode($this->body) : $this->body, - ]; + + if ($isStreamedMode) { + return $this->yieldBody([ + 'statusCode' => $this->statusCode, + $headersKey => $headers, + ]); + } else { + if ($this->body instanceof \Generator) { + $dataChunk = ''; + + while ($this->body->valid()) { + $dataChunk .= $this->body->current(); + + $this->body->next(); + } + } else { + $dataChunk = $this->body; + } + + return [ + 'isBase64Encoded' => $base64Encoding, + 'statusCode' => $this->statusCode, + $headersKey => $headers, + 'body' => $base64Encoding ? base64_encode($dataChunk) : $dataChunk, + ]; + } } /** * See https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html#http-api-develop-integrations-lambda.response */ - public function toApiGatewayFormatV2(): array + public function toApiGatewayFormatV2(): array|\Generator { + $isStreamedMode = Bref::isRunningInStreamingMode(); $base64Encoding = (bool) getenv('BREF_BINARY_RESPONSES'); $headers = []; @@ -80,13 +104,33 @@ public function toApiGatewayFormatV2(): array // serialized to `[]` (we want `{}`) so we force it to an empty object. $headers = empty($headers) ? new \stdClass : $headers; - return [ - 'cookies' => $cookies, - 'isBase64Encoded' => $base64Encoding, - 'statusCode' => $this->statusCode, - 'headers' => $headers, - 'body' => $base64Encoding ? base64_encode($this->body) : $this->body, - ]; + if ($isStreamedMode) { + return $this->yieldBody([ + 'cookies' => $cookies, + 'statusCode' => $this->statusCode, + 'headers' => $headers, + ]); + } else { + if ($this->body instanceof \Generator) { + $dataChunk = ''; + + while ($this->body->valid()) { + $dataChunk .= $this->body->current(); + + $this->body->next(); + } + } else { + $dataChunk = $this->body; + } + + return [ + 'cookies' => $cookies, + 'isBase64Encoded' => $base64Encoding, + 'statusCode' => $this->statusCode, + 'headers' => $headers, + 'body' => $base64Encoding ? base64_encode($dataChunk) : $dataChunk, + ]; + } } /** @@ -98,4 +142,29 @@ private function capitalizeHeaderName(string $name): string $name = ucwords($name); return str_replace(' ', '-', $name); } + + /** + * Yields headers and response body in Lambda Streaming Format. + * + * @param array $headersFormat + */ + private function yieldBody(array $headersFormat): \Generator + { + /* + * Lambda Streaming response requires you to send the headers in API Gateway format + * followed by a set of 8 NULL bits, and only then you can start sending the actual + * response body. + */ + yield json_encode($headersFormat); + + yield "\0\0\0\0\0\0\0\0"; + + if ($this->body instanceof \Generator) { + foreach ($this->body as $dataChunk) { + yield $dataChunk; + } + } else { + yield $this->body; + } + } } diff --git a/src/FpmRuntime/FpmHandler.php b/src/FpmRuntime/FpmHandler.php index caa1ef145..b19551303 100644 --- a/src/FpmRuntime/FpmHandler.php +++ b/src/FpmRuntime/FpmHandler.php @@ -2,6 +2,7 @@ namespace Bref\FpmRuntime; +use Bref\Bref; use Bref\Context\Context; use Bref\Event\Http\HttpHandler; use Bref\Event\Http\HttpRequestEvent; @@ -14,6 +15,7 @@ use hollodotme\FastCGI\Exceptions\TimedoutException; use hollodotme\FastCGI\Interfaces\ProvidesRequestData; use hollodotme\FastCGI\Interfaces\ProvidesResponseData; +use hollodotme\FastCGI\Responses\Response as FastCGIResponse; use hollodotme\FastCGI\SocketConnections\UnixDomainSocket; use RuntimeException; use Throwable; @@ -114,26 +116,134 @@ public function __destruct() $this->stop(); } - /** - * Proxy the API Gateway event to PHP-FPM and return its response. - * - * @throws FastCgiCommunicationFailed - * @throws Timeout - * @throws Exception - */ - public function handleRequest(HttpRequestEvent $event, Context $context): HttpResponse + public function handleStreamedRequest(HttpRequestEvent $event, Context $context): HttpResponse { - $request = $this->eventToFastCgiRequest($event, $context); + $responseFiber = new \Fiber( + function () use (&$event, &$context): void { + $this->sendRequestToFastCgi( + $event, + $context, + function (string $stdOut = '', string $stdErr = ''): void { + if ($stdOut !== '') { + \Fiber::suspend(['stdout', $stdOut]); + } elseif ($stdErr !== '') { + \Fiber::suspend(['stderr', $stdErr]); + } + }, + false + ); + } + ); + + $outputAccumulator = ''; + $stdErrAccumulator = ''; + $finishHeaders = false; + $startTime = microtime(true); + $headerLines = ''; + $responseHeaders = []; + + do { + if (($responseFiber->isStarted() || $responseFiber->isSuspended()) && ! $responseFiber->isTerminated()) { + [$chunkType, $fiberChunk] = $responseFiber->resume() ?: ['', '']; + } elseif (! $responseFiber->isTerminated()) { + [$chunkType, $fiberChunk] = $responseFiber->start() ?: ['', '']; + } else { + [$chunkType, $fiberChunk] = ['finish', PHP_INT_MAX]; + } + + if ($fiberChunk !== PHP_INT_MAX) { + if ($chunkType === 'stderr') { + $stdErrAccumulator .= $fiberChunk; + } else { + $outputAccumulator .= $fiberChunk; + } + } + + $lines = explode(PHP_EOL, $outputAccumulator); + + $hasHeaderFound = false; + foreach ($lines as $i => $line) { + if (! $hasHeaderFound) { + if (trim($line) !== '') { + $hasHeaderFound = true; + } + } + + if ($line === "\r" && $hasHeaderFound) { + $finishHeaders = true; + $headerLines = implode(PHP_EOL, array_slice($lines, 0, $i)) . "\r\n\r\n"; + $outputAccumulator = implode(PHP_EOL, array_slice($lines, $i + 1)); + break; + } + } + + if ($finishHeaders) { + $responseHeaders = $this->getResponseHeaders( + ( + new FastCGIResponse( + $headerLines, + '', + microtime(true) - $startTime + ) + ) + ); + } else { + $responseHeaders = []; + } + } while (! $finishHeaders); + + if (isset($responseHeaders['status'])) { + $status = (int) (is_array($responseHeaders['status']) ? $responseHeaders['status'][0] : $responseHeaders['status']); + unset($responseHeaders['status']); + } + + $this->ensureStillRunning(); + + return new HttpResponse( + (function () use (&$responseFiber, &$outputAccumulator): \Generator { + if ($outputAccumulator !== '') { // We can never yield an empty string, otherwise it thinks its the end of the execution + yield $outputAccumulator; + } + + while (! $responseFiber->isTerminated()) { + [$chunkType, $fiberChunk] = $responseFiber->resume() ?: ['', '']; + + if ($chunkType === 'stdout') { + if ($fiberChunk !== '') { // We can never yield an empty string, otherwise it thinks its the end of the execution + yield $fiberChunk; + } + } + } + })(), + $responseHeaders, + $status ?? 200 + ); + } + + protected function sendRequestToFastCgi( + HttpRequestEvent $event, + Context $context, + ?callable $passThroughCallback = null, + bool $readResponse = true + ): ?ProvidesResponseData { + $request = $this->eventToFastCgiRequest( + $event, + $context, + $passThroughCallback + ); - // The script will timeout 1 second before the remaining time - // to allow some time for Bref/PHP-FPM to recover and cleanup $margin = 1000; $timeoutDelayInMs = max(1000, $context->getRemainingTimeInMillis() - $margin); try { $socketId = $this->client->sendAsyncRequest($this->connection, $request); + if ($readResponse) { + return $this->client->readResponse($socketId, $timeoutDelayInMs); + } else { + $this->client->waitForResponse($socketId, $timeoutDelayInMs); - $response = $this->client->readResponse($socketId, $timeoutDelayInMs); + return null; + } } catch (TimedoutException) { $invocationId = $context->getAwsRequestId(); echo "$invocationId The PHP script timed out. Bref will now restart PHP-FPM to start from a clean slate and flush the PHP logs.\nTimeouts can happen for example when trying to connect to a remote API or database, if this happens continuously check for those.\nIf you are using a RDS database, read this: https://bref.sh/docs/environment/database.html#accessing-the-internet\n"; @@ -172,6 +282,22 @@ public function handleRequest(HttpRequestEvent $event, Context $context): HttpRe throw new FastCgiCommunicationFailed; } + } + + /** + * Proxy the API Gateway event to PHP-FPM and return its response. + * + * @throws FastCgiCommunicationFailed + * @throws Timeout + * @throws Exception + */ + public function handleRequest(HttpRequestEvent $event, Context $context): HttpResponse + { + if (Bref::isRunningInStreamingMode() && Bref::doesStreamingSupportsFibers()) { + return $this->handleStreamedRequest($event, $context); + } + + $response = $this->sendRequestToFastCgi($event, $context, null, true); $responseHeaders = $this->getResponseHeaders($response); @@ -228,7 +354,7 @@ private function isReady(): bool return file_exists(self::SOCKET); } - private function eventToFastCgiRequest(HttpRequestEvent $event, Context $context): ProvidesRequestData + private function eventToFastCgiRequest(HttpRequestEvent $event, Context $context, ?callable $passThroughCallback = null): ProvidesRequestData { $request = new FastCgiRequest($event->getMethod(), $this->handler, $event->getBody()); $request->setRequestUri($event->getUri()); @@ -243,6 +369,10 @@ private function eventToFastCgiRequest(HttpRequestEvent $event, Context $context $request->setCustomVar('LAMBDA_INVOCATION_CONTEXT', json_encode($context, JSON_THROW_ON_ERROR)); $request->setCustomVar('LAMBDA_REQUEST_CONTEXT', json_encode($event->getRequestContext(), JSON_THROW_ON_ERROR)); + if ($passThroughCallback) { + $request->addPassThroughCallbacks($passThroughCallback); + } + $contentType = $event->getContentType(); if ($contentType) { $request->setContentType($contentType); diff --git a/src/Listener/BrefEventSubscriber.php b/src/Listener/BrefEventSubscriber.php index ec2b315d0..817e52074 100644 --- a/src/Listener/BrefEventSubscriber.php +++ b/src/Listener/BrefEventSubscriber.php @@ -30,6 +30,20 @@ public function afterStartup(): void { } + /** + * Register a hook to be executed before the stream fiber loops, usually used to setup extra context. + */ + public function beforeStreamFiberLoops(): void + { + } + + /** + * Register a hook to be executed when the stream fiber is about to finish, usually used to draw resources. + */ + public function afterStreamFiberLoops(): void + { + } + /** * Register a hook to be executed before any Lambda invocation. */ diff --git a/src/Listener/EventDispatcher.php b/src/Listener/EventDispatcher.php index 001e709f1..e042693c4 100644 --- a/src/Listener/EventDispatcher.php +++ b/src/Listener/EventDispatcher.php @@ -51,6 +51,30 @@ public function afterStartup(): void } } + /** + * Trigger the `beforeStreamFiberLoops` event. + * + * @internal This method is called by Bref and should not be called by user code. + */ + public function beforeStreamFiberLoops(): void + { + foreach ($this->subscribers as $listener) { + $listener->beforeStreamFiberLoops(); + } + } + + /** + * Trigger the `afterStreamFiberLoops` event. + * + * @internal This method is called by Bref and should not be called by user code. + */ + public function afterStreamFiberLoops(): void + { + foreach ($this->subscribers as $listener) { + $listener->afterStreamFiberLoops(); + } + } + /** * Trigger the `beforeInvoke` event. * diff --git a/src/Runtime/LambdaRuntime.php b/src/Runtime/LambdaRuntime.php index 17fa52e65..68bc3de56 100755 --- a/src/Runtime/LambdaRuntime.php +++ b/src/Runtime/LambdaRuntime.php @@ -8,6 +8,7 @@ use Bref\Event\Handler; use CurlHandle; use Exception; +use Generator; use JsonException; use Psr\Http\Server\RequestHandlerInterface; use RuntimeException; @@ -39,6 +40,8 @@ final class LambdaRuntime private $curlHandleNext; /** @var resource|CurlHandle|null */ private $curlHandleResult; + /** @var resource|CurlHandle|null */ + private $curlStreamedHandleResult; private string $apiUrl; private Invoker $invoker; private string $layer; @@ -97,12 +100,18 @@ public function processNextEvent(Handler | RequestHandlerInterface | callable $h $this->sendResponse($context->getAwsRequestId(), $result); } catch (Throwable $e) { - $this->signalFailure($context->getAwsRequestId(), $e); - try { - Bref::events()->afterInvoke($handler, $event, $context, null, $e); - } catch (Throwable $e) { - $this->logError($e, $context->getAwsRequestId()); + if (isset($result) && $result instanceof \Generator) { + $this->logError($e, $context->getAwsRequestId()); // We just log the error as we can't mark the lambda as failed when the streaming has started. + } else { + $this->signalFailure($context->getAwsRequestId(), $e); + } + } finally { + try { + Bref::events()->afterInvoke($handler, $event, $context, null, $e); + } catch (Throwable $e) { + $this->logError($e, $context->getAwsRequestId()); + } } return false; @@ -201,7 +210,12 @@ private function waitNextInvocation(): array private function sendResponse(string $invocationId, mixed $responseData): void { $url = "http://$this->apiUrl/2018-06-01/runtime/invocation/$invocationId/response"; - $this->postJson($url, $responseData); + + if ($responseData instanceof Generator) { + $this->postStreamed($url, $responseData); + } else { + $this->postJson($url, $responseData); + } } /** @@ -281,6 +295,121 @@ public function failInitialization( exit(1); } + /** + * @param string[] $headers + * @throws Exception + * @throws ResponseTooBig + */ + private function postStreamed(string $url, Generator $data, array $headers = []): void + { + if ($this->curlStreamedHandleResult === null) { + $this->curlStreamedHandleResult = curl_init(); + curl_setopt($this->curlStreamedHandleResult, CURLOPT_UPLOAD, true); + curl_setopt($this->curlStreamedHandleResult, CURLOPT_CUSTOMREQUEST, 'POST'); + curl_setopt($this->curlStreamedHandleResult, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); + curl_setopt($this->curlStreamedHandleResult, CURLOPT_RETURNTRANSFER, true); + curl_setopt($this->curlStreamedHandleResult, CURLOPT_INFILESIZE, -1); + } + + curl_setopt($this->curlStreamedHandleResult, CURLOPT_URL, $url); + curl_setopt($this->curlStreamedHandleResult, CURLOPT_HTTPHEADER, [ + 'Lambda-Runtime-Function-Response-Mode: streaming', + 'Content-Type: application/vnd.awslambda.http-integration-response', + 'Transfer-Encoding: chunked', + ...$headers, + ]); + + if (! Bref::doesStreamingSupportsFibers()) { + $buffer = ''; + curl_setopt( + $this->curlStreamedHandleResult, + CURLOPT_READFUNCTION, + function ($ch, $fd, $length) use (&$data, &$buffer) { + if (strlen($buffer) < $length && $data->valid()) { + $buffer .= (string) $data->current(); + + /* + As this method needs to return an string, we need to wait for the next generator item to yield. + This can lead to the initial part of the buffer taking longer to load if the next chunk takes longer. + */ + $data->next(); + } + + $chunk = substr($buffer, 0, $length); + $buffer = substr($buffer, strlen($chunk)); + + return $chunk; + } + ); + } else { + $buffer = ''; + /* + * We use Fibers so we can suspend the yields and read data as needed. + * That way we don't block the response as more data comes. + */ + $fiber = new \Fiber( + function () use (&$data): void { + Bref::events()->beforeStreamFiberLoops(); + + foreach ($data as $dataChunk) { + \Fiber::suspend((string) $dataChunk); + } + + Bref::events()->afterStreamFiberLoops(); + + \Fiber::suspend(PHP_INT_MIN); + } + ); + + curl_setopt( + $this->curlStreamedHandleResult, + CURLOPT_READFUNCTION, + function ($ch, $fd, $length) use (&$fiber, &$buffer) { + if ($buffer === '') { + if ($fiber->isStarted() || $fiber->isSuspended()) { + $fiberChunk = $fiber->resume(); + } elseif (! $fiber->isTerminated()) { + $fiberChunk = $fiber->start(); + } else { + $fiberChunk = PHP_INT_MIN; + } + + if ($fiberChunk !== PHP_INT_MIN) { + $buffer .= $fiberChunk; + } + } + + $chunk = substr($buffer, 0, $length); + $buffer = substr($buffer, strlen($chunk)); + + return $chunk; + } + ); + } + + $body = curl_exec($this->curlStreamedHandleResult); + + $statusCode = curl_getinfo($this->curlStreamedHandleResult, CURLINFO_HTTP_CODE); + if ($statusCode >= 400) { + // Re-open the connection in case of failure to start from a clean state + $this->closeCurlStreamedHandleResult(); + + if ($statusCode === 413) { + throw new ResponseTooBig; + } + + try { + $error = json_decode($body, true, 512, JSON_THROW_ON_ERROR); + $errorMessage = "{$error['errorType']}: {$error['errorMessage']}"; + } catch (JsonException) { + // In case we didn't get any JSON + $errorMessage = 'unknown error'; + } + + throw new Exception("Error $statusCode while calling the Lambda runtime API: $errorMessage"); + } + } + /** * @param string[] $headers * @throws Exception @@ -350,6 +479,14 @@ private function closeCurlHandleResult(): void } } + private function closeCurlStreamedHandleResult(): void + { + if ($this->curlStreamedHandleResult !== null) { + curl_close($this->curlStreamedHandleResult); + $this->curlStreamedHandleResult = null; + } + } + /** * Ping a Bref server with a statsd request. * diff --git a/tests/Event/Http/HttpResponseTest.php b/tests/Event/Http/HttpResponseTest.php index dab15792e..6d6ae9230 100644 --- a/tests/Event/Http/HttpResponseTest.php +++ b/tests/Event/Http/HttpResponseTest.php @@ -177,4 +177,73 @@ public function test response with multiple cookies() 'body' => '', ], $response->toApiGatewayFormatV2()); } + + public function test generator returns generator api gateway v1() + { + putenv('BREF_STREAMED_MODE=1'); + + $generatorFunction = function (): \Generator { + yield 'hello'; + yield 'world'; + }; + + $response = new HttpResponse($generatorFunction(), [ + 'x-foo-bar' => 'baz', + ]); + + $generator = $response->toApiGatewayFormat(); + + self::assertInstanceOf(\Generator::class, $generator); + + self::assertEquals([ + 'statusCode' => 200, + 'headers' => ['X-Foo-Bar' => 'baz'], + ], json_decode($generator->current(), true)); + + $generator->next(); + self::assertEquals("\0\0\0\0\0\0\0\0", $generator->current()); + + $generator->next(); + self::assertEquals('hello', $generator->current()); + + $generator->next(); + self::assertEquals('world', $generator->current()); + + putenv('BREF_STREAMED_MODE=0'); + } + + public function test generator returns generator api gateway v2() + { + putenv('BREF_STREAMED_MODE=1'); + + $generatorFunction = function (): \Generator { + yield 'hello'; + yield 'world'; + }; + + $response = new HttpResponse($generatorFunction(), [ + 'x-foo-bar' => 'baz', + ]); + + $generator = $response->toApiGatewayFormatV2(); + + self::assertInstanceOf(\Generator::class, $generator); + + self::assertEquals([ + 'cookies' => [], + 'statusCode' => 200, + 'headers' => ['X-Foo-Bar' => 'baz'], + ], json_decode($generator->current(), true)); + + $generator->next(); + self::assertEquals("\0\0\0\0\0\0\0\0", $generator->current()); + + $generator->next(); + self::assertEquals('hello', $generator->current()); + + $generator->next(); + self::assertEquals('world', $generator->current()); + + putenv('BREF_STREAMED_MODE=0'); + } } diff --git a/tests/Runtime/LambdaRuntimeTest.php b/tests/Runtime/LambdaRuntimeTest.php index aecc7d89a..a5fcb00b2 100644 --- a/tests/Runtime/LambdaRuntimeTest.php +++ b/tests/Runtime/LambdaRuntimeTest.php @@ -8,6 +8,7 @@ use Bref\Event\EventBridge\EventBridgeHandler; use Bref\Event\Handler; use Bref\Event\Http\HttpRequestEvent; +use Bref\Event\Http\HttpResponse; use Bref\Event\S3\S3Event; use Bref\Event\S3\S3Handler; use Bref\Event\Sns\SnsEvent; @@ -248,7 +249,7 @@ public function test function results that cannot be encoded are reporte public function test generic event handler() { - $handler = new class() implements Handler { + $handler = new class () implements Handler { public function handle(mixed $event, Context $context): mixed { return $event; @@ -264,7 +265,7 @@ public function handle(mixed $event, Context $context): mixed public function test SQS event handler() { - $handler = new class() extends SqsHandler { + $handler = new class () extends SqsHandler { public SqsEvent $event; public function handleSqs(SqsEvent $event, Context $context): void { @@ -282,7 +283,7 @@ public function handleSqs(SqsEvent $event, Context $context): void public function test SNS event handler() { - $handler = new class() extends SnsHandler { + $handler = new class () extends SnsHandler { public SnsEvent $event; public function handleSns(SnsEvent $event, Context $context): void { @@ -300,7 +301,7 @@ public function handleSns(SnsEvent $event, Context $context): void public function test S3 event handler() { - $handler = new class() extends S3Handler { + $handler = new class () extends S3Handler { public S3Event $event; public function handleS3(S3Event $event, Context $context): void { @@ -318,7 +319,7 @@ public function handleS3(S3Event $event, Context $context): void public function test PSR15 event handler() { - $handler = new class() implements RequestHandlerInterface { + $handler = new class () implements RequestHandlerInterface { public ServerRequestInterface $request; public function handle(ServerRequestInterface $request): ResponseInterface { @@ -350,7 +351,7 @@ public function handle(ServerRequestInterface $request): ResponseInterface public function test EventBridge event handler() { - $handler = new class() extends EventBridgeHandler { + $handler = new class () extends EventBridgeHandler { public EventBridgeEvent $event; public function handleEventBridge(EventBridgeEvent $event, Context $context): void { @@ -368,7 +369,7 @@ public function handleEventBridge(EventBridgeEvent $event, Context $context): vo public function test exceptions in beforeInvoke result in an invocation error() { - Bref::events()->subscribe(new class extends BrefEventSubscriber { + Bref::events()->subscribe(new class () extends BrefEventSubscriber { public function beforeInvoke(mixed ...$params): void { throw new Exception('This is an exception in beforeInvoke'); @@ -390,7 +391,7 @@ public function beforeInvoke(mixed ...$params): void */ public function test a failure in afterInvoke after a success does not signal a failure() { - Bref::events()->subscribe(new class extends BrefEventSubscriber { + Bref::events()->subscribe(new class () extends BrefEventSubscriber { public function afterInvoke(mixed ...$params): void { throw new Exception('This is an exception in afterInvoke'); @@ -414,7 +415,7 @@ public function afterInvoke(mixed ...$params): void public function test a failure in afterInvoke after a failure does not crash the runtime() { - Bref::events()->subscribe(new class extends BrefEventSubscriber { + Bref::events()->subscribe(new class () extends BrefEventSubscriber { public function afterInvoke(mixed ...$params): void { throw new Exception('This is an exception in afterInvoke'); @@ -431,4 +432,57 @@ public function afterInvoke(mixed ...$params): void $this->assertStringContainsString('Invoke Error {"errorType":"Exception","errorMessage":"Invocation error","stack":', $this->getActualOutput()); $this->assertStringContainsString('Invoke Error {"errorType":"Exception","errorMessage":"This is an exception in afterInvoke","stack":', $this->getActualOutput()); } + + public function test streamed response from generator http event() + { + putenv('BREF_STREAMED_MODE=1'); + + Server::flush(); + + Server::enqueue([ + new Response( // lambda event + 200, + [ + 'lambda-runtime-aws-request-id' => '1', + ], + '{ "Hello": "world!"}' + ), + new Response(202), + ]); + + $this->runtime->processNextEvent(function ($event) { + $generatorFunction = function (): \Generator { + yield 'hello'; + yield 'world'; + }; + + return ( + new HttpResponse( + $generatorFunction(), + [ + 'Content-Type' => 'text/html; charset=utf-8', + ] + ) + )->toApiGatewayFormatV2(); + }); + + $requests = Server::received(); + + $this->assertCount(2, $requests); + + [$eventRequest, $eventStreamResponse] = $requests; + + $this->assertSame('GET', $eventRequest->getMethod()); + $this->assertSame('http://localhost:8126/2018-06-01/runtime/invocation/next', $eventRequest->getUri()->__toString()); + + $this->assertSame('POST', $eventStreamResponse->getMethod()); + + $this->assertSame('streaming', $eventStreamResponse->getHeaderLine('lambda-runtime-function-response-mode')); + $this->assertSame('chunked', $eventStreamResponse->getHeaderLine('transfer-encoding')); + $this->assertSame('application/vnd.awslambda.http-integration-response', $eventStreamResponse->getHeaderLine('content-type')); + + $this->assertSame('http://localhost:8126/2018-06-01/runtime/invocation/1/response', $eventStreamResponse->getUri()->__toString()); + + putenv('BREF_STREAMED_MODE=0'); + } }