diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 53ce63f..00c227f 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -16,7 +16,7 @@ jobs: with: linux_6_0_enabled: false linux_6_1_enabled: false - linux_6_2_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" + linux_6_2_enabled: false linux_6_3_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" linux_nightly_next_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" linux_nightly_main_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable" @@ -31,3 +31,5 @@ jobs: with: linux_6_0_enabled: false linux_6_1_enabled: false + linux_6_2_enabled: false + linux_6_3_enabled: true diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 5c82e85..39e7f61 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -12,7 +12,7 @@ jobs: name: Soundness uses: swiftlang/github-workflows/.github/workflows/soundness.yml@0.0.7 with: - api_breakage_check_container_image: "swift:6.2-noble" + api_breakage_check_container_image: "swift:6.3-noble" format_check_container_image: "swiftlang/swift:nightly-main-noble" # Needed due to https://github.com/swiftlang/swift-format/issues/1081 license_header_check_project_name: "Swift HTTP Server" @@ -23,9 +23,7 @@ jobs: linux_5_10_enabled: false linux_6_0_enabled: false linux_6_1_enabled: false - # linux_6_1_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error" - linux_6_2_enabled: true - linux_6_2_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error" + linux_6_2_enabled: false linux_6_3_enabled: true linux_6_3_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error" linux_nightly_next_arguments_override: "--explicit-target-dependency-import-check error" @@ -38,7 +36,7 @@ jobs: linux_5_10_enabled: false linux_6_0_enabled: false linux_6_1_enabled: false - linux_6_2_enabled: true + linux_6_2_enabled: false linux_6_3_enabled: true static-sdk: @@ -52,4 +50,5 @@ jobs: linux_5_10_enabled: false linux_6_0_enabled: false linux_6_1_enabled: false - linux_6_2_enabled: true + linux_6_2_enabled: false + linux_6_3_enabled: true diff --git a/Package.swift b/Package.swift index 859d9fa..249d23d 100644 --- a/Package.swift +++ b/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version:6.2 +// swift-tools-version:6.3 //===----------------------------------------------------------------------===// // // This source file is part of the Swift HTTP Server open source project diff --git a/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift b/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift index b70c4dd..e718026 100644 --- a/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift +++ b/Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift @@ -38,7 +38,7 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable /// The type of errors that can occur during reading operations. public typealias ReadFailure = any Error - /// The HTTP trailer fields captured at the end of the request. + /// The shared reader state that holds the iterator and captures trailers. fileprivate var state: ReaderState struct RequestBodyStateMachine { @@ -60,19 +60,16 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable } private var state: State + private var readerState: ReaderState - /// The iterator that provides HTTP request parts from the underlying channel. - private var iterator: NIOAsyncChannelInboundStream.AsyncIterator - - init(iterator: NIOAsyncChannelInboundStream.AsyncIterator) { + init(readerState: ReaderState) { self.state = .readingBody(.noExcess) - self.iterator = iterator + self.readerState = readerState } enum ReadResult { case readBody(ByteBuffer) - case readEnd(HTTPFields?) - case streamFinished + case requestFinished } mutating func read(limit: Int?) async throws -> ReadResult { @@ -88,21 +85,47 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable case .noExcess: // There is no excess from previous reads. We obtain the next element from the stream. - let requestPart = try await self.iterator.next(isolation: #isolation) + // Take the iterator from ReaderState, read one part, and put it back. + // This ensures the iterator is always recoverable even if the reader + // is dropped without consuming .end. + guard var iterator = self.readerState.takeIterator() else { + throw RequestBodyReadError.requestEndedBeforeReceivingEnd + } + + let requestPart: HTTPRequestPart? + do { + requestPart = try await iterator.next(isolation: #isolation) + } catch { + // Put the iterator back before propagating the error. + nonisolated(unsafe) let iter = iterator + self.readerState.putIterator(iter) + throw error + } switch requestPart { case .head: + nonisolated(unsafe) let iter = iterator + self.readerState.putIterator(iter) fatalError("Unexpectedly received a request head.") case .none: - throw RequestBodyReadError.streamEndedBeforeReceivingRequestEnd + // Stream ended without .end — don't put iterator back. + throw RequestBodyReadError.requestEndedBeforeReceivingEnd case .body(let element): + nonisolated(unsafe) let iter = iterator + self.readerState.putIterator(iter) bodyElement = element case .end(let trailers): self.state = .finished - return .readEnd(trailers) + nonisolated(unsafe) let iter = iterator + self.readerState.putIterator(iter) + self.readerState.wrapped.withLock { state in + state.finishedReading = true + state.trailers = trailers + } + return .requestFinished } } @@ -119,21 +142,16 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable return .readBody(bodyElement) case .finished: - return .streamFinished + return .requestFinished } } } var requestBodyStateMachine: RequestBodyStateMachine - /// Initializes a new request body reader with the given NIO async channel iterator. - /// - /// - Parameter iterator: The NIO async channel inbound stream iterator to use for reading request parts. - fileprivate init( - iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator, - readerState: ReaderState - ) { - self.requestBodyStateMachine = .init(iterator: iterator) + /// Initializes a new request body reader. + fileprivate init(readerState: ReaderState) { + self.requestBodyStateMachine = .init(readerState: readerState) self.state = readerState } @@ -159,14 +177,7 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable case .readBody(let readElement): return try await body(Array(buffer: readElement).span) - case .readEnd(let trailers): - self.state.wrapped.withLock { state in - state.trailers = trailers - state.finishedReading = true - } - return try await body(.init()) - - case .streamFinished: + case .requestFinished: return try await body(.init()) } } catch { @@ -176,15 +187,36 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable } final class ReaderState: Sendable { - struct Wrapped { + struct Wrapped: ~Copyable { var trailers: HTTPFields? = nil var finishedReading: Bool = false + + /// The iterator that provides HTTP request parts from the underlying channel. + /// Stored here between read cycles for HTTP/1.1 keep-alive recovery. + var iterator: NIOAsyncChannelInboundStream.AsyncIterator? } let wrapped: Mutex - init() { - self.wrapped = .init(.init()) + init(iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator) { + self.wrapped = .init(.init(iterator: iterator)) + } + + func takeIterator() -> sending NIOAsyncChannelInboundStream.AsyncIterator? { + self.wrapped.withLock { state in + let iterator = state.iterator + state.iterator = nil + return iterator + } + } + + func putIterator( + _ iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator + ) { + var disconnected = Disconnected(value: Optional(iterator)) + self.wrapped.withLock { state in + state.iterator = disconnected.swap(newValue: nil) + } } } @@ -197,18 +229,12 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable /// The type of errors that can occur during reading operations. public typealias Failure = any Error - private var iterator: Disconnected.AsyncIterator?> - internal var state: ReaderState - /// Initializes a new HTTP request body and trailers reader with the given NIO async channel iterator. + /// Initializes a new HTTP request body and trailers reader. /// - /// - Parameter iterator: The NIO async channel inbound stream iterator to use for reading request parts. - init( - iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator, - readerState: ReaderState - ) { - self.iterator = Disconnected(value: iterator) + /// - Parameter readerState: The shared reader state that holds the iterator and captures trailers. + init(readerState: ReaderState) { self.state = readerState } @@ -240,14 +266,10 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable public consuming func consumeAndConclude( body: nonisolated(nonsending) (consuming sending RequestBodyAsyncReader) async throws(Failure) -> Return ) async throws(Failure) -> (Return, HTTPFields?) { - if let iterator = self.iterator.take() { - let partsReader = RequestBodyAsyncReader(iterator: iterator, readerState: self.state) - let result = try await body(partsReader) - let trailers = self.state.wrapped.withLock { $0.trailers } - return (result, trailers) - } else { - fatalError("consumeAndConclude called more than once") - } + let partsReader = RequestBodyAsyncReader(readerState: self.state) + let result = try await body(partsReader) + let trailers = self.state.wrapped.withLock { $0.trailers } + return (result, trailers) } } diff --git a/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift b/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift index d8c2f01..58a0cc7 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import HTTPAPIs +import Logging import NIOCore import NIOExtras import NIOHTTP1 @@ -45,7 +46,7 @@ extension NIOHTTPServer { do { for try await http1Channel in inbound { group.addTask { - await self.handleRequestChannel(channel: http1Channel, handler: handler) + await self.handleHTTP1RequestChannel(channel: http1Channel, handler: handler) } } @@ -105,4 +106,41 @@ extension NIOHTTPServer { ) } } + + /// Handles an HTTP/1.1 connection channel, which may carry multiple serial requests on the + /// same connection (keep-alive). + func handleHTTP1RequestChannel( + channel: NIOAsyncChannel, + handler: some HTTPServerRequestHandler + ) async { + do { + try await channel.executeThenClose { inbound, outbound in + var iterator = inbound.makeAsyncIterator() + + requestLoop: while !Task.isCancelled { + guard let httpRequest = try await self.nextRequestHead(from: &iterator) else { + break requestLoop + } + + guard + let recoveredIterator = try await self.invokeHandler( + request: httpRequest, + iterator: iterator, + outbound: outbound, + handler: handler + ) + else { + // Handler did not fully consume the request; cannot continue on this + // connection. + break requestLoop + } + + iterator = recoveredIterator + } + } + } catch { + self.logger.debug("Error thrown while handling HTTP/1.1 connection", metadata: ["error": "\(error)"]) + try? await channel.channel.close() + } + } } diff --git a/Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift b/Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift index 3b708f0..9ff2316 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift @@ -110,7 +110,7 @@ extension NIOHTTPServer { let chainFuture = requestChannel.channel.nioSSL_peerValidatedCertificateChain() await Self.$connectionContext.withValue(ConnectionContext(chainFuture)) { - await self.handleRequestChannel( + await self.handleHTTP1RequestChannel( channel: requestChannel, handler: handler ) @@ -137,7 +137,7 @@ extension NIOHTTPServer { try await Self.$connectionContext.withValue(ConnectionContext(chainFuture)) { for try await streamChannel in multiplexer.inbound { streamGroup.addTask { - await self.handleRequestChannel( + await self.handleHTTP2StreamChannel( channel: streamChannel, handler: handler ) @@ -297,6 +297,45 @@ extension NIOHTTPServer { } } } + + /// Handles an HTTP/2 stream channel, which carries exactly one request per stream. + func handleHTTP2StreamChannel( + channel: NIOAsyncChannel, + handler: some HTTPServerRequestHandler + ) async { + do { + try await channel + .executeThenClose { inbound, outbound in + var iterator = inbound.makeAsyncIterator() + + guard let httpRequest = try await self.nextRequestHead(from: &iterator) else { + outbound.finish() + return + } + + _ = try await self.invokeHandler( + request: httpRequest, + iterator: iterator, + outbound: outbound, + handler: handler + ) + + // TODO: handle other state scenarios. + // For example, if we didn't finish reading but we wrote back a response, we + // should send a RST_STREAM with NO_ERROR set. If we finished reading but we + // didn't write back a response, then RST_STREAM is also likely appropriate but + // unclear about the error. + + // Finish the outbound and wait on the close future to make sure all pending + // writes are actually written. + outbound.finish() + try await channel.channel.closeFuture.get() + } + } catch { + self.logger.debug("Error thrown while handling HTTP/2 stream: \(error)") + try? await channel.channel.close() + } + } } @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) diff --git a/Sources/NIOHTTPServer/NIOHTTPServer.swift b/Sources/NIOHTTPServer/NIOHTTPServer.swift index 96755c6..14e613d 100644 --- a/Sources/NIOHTTPServer/NIOHTTPServer.swift +++ b/Sources/NIOHTTPServer/NIOHTTPServer.swift @@ -84,6 +84,10 @@ public struct NIOHTTPServer: HTTPServer { public typealias RequestConcludingReader = HTTPRequestConcludingAsyncReader public typealias ResponseConcludingWriter = HTTPResponseConcludingAsyncWriter + /// Maximum number of bytes to drain from an unconsumed request body before + /// giving up and closing the connection. + private var maxDrainBytes: Int { 256 * 1024 } + let logger: Logger let configuration: NIOHTTPServerConfiguration @@ -213,103 +217,112 @@ public struct NIOHTTPServer: HTTPServer { } } - /// Handles a single HTTP request. - /// - /// - Note: Errors do not propagate to the caller. When an error occurs, it is logged and the channel is closed. - /// - /// - Parameters: - /// - channel: The async channel to read the request from and write the response to. - /// - handler: The request handler. - func handleRequestChannel( - channel: NIOAsyncChannel, + /// Reads the next request head from the iterator. Returns `nil` if the connection is done or + /// an unexpected part is received. + func nextRequestHead( + from iterator: inout NIOAsyncChannelInboundStream.AsyncIterator + ) async throws -> HTTPRequest? { + switch try await iterator.next(isolation: #isolation) { + case .head(let request): + return request + case .body: + self.logger.debug("Unexpectedly received body on connection. Closing now.") + return nil + case .end: + self.logger.debug("Unexpectedly received end on connection. Closing now.") + return nil + case .none: + self.logger.trace("No more request parts on connection") + return nil + } + } + + /// Shared core: invokes the request handler with the appropriate reader/writer state. + /// Returns the recovered iterator if the request was fully consumed (for HTTP/1.1 reuse), + /// or `nil` if the request could not be fully consumed. + func invokeHandler( + request: HTTPRequest, + iterator: consuming sending NIOAsyncChannelInboundStream.AsyncIterator, + outbound: NIOAsyncChannelOutboundWriter, handler: some HTTPServerRequestHandler - ) async { + ) async throws -> NIOAsyncChannelInboundStream.AsyncIterator? { + let readerState = HTTPRequestConcludingAsyncReader.ReaderState(iterator: iterator) + let writerState = HTTPResponseConcludingAsyncWriter.WriterState() + do { - try await channel.executeThenClose { inbound, outbound in - var iterator = inbound.makeAsyncIterator() - - let nextPart: HTTPRequestPart? - do { - nextPart = try await iterator.next() - } catch { - self.logger.error( - "Error thrown while advancing the request iterator", - metadata: ["error": "\(error)"] + try await handler.handle( + request: request, + requestContext: HTTPRequestContext(), + requestBodyAndTrailers: HTTPRequestConcludingAsyncReader( + readerState: readerState + ), + responseSender: HTTPResponseSender { response in + try await outbound.write(.head(response)) + return HTTPResponseConcludingAsyncWriter( + writer: outbound, + writerState: writerState ) - throw error + } sendInformational: { response in + try await outbound.write(.head(response)) } + ) + } catch { + logger.error("Error thrown while handling request: \(error)") + if !readerState.wrapped.withLock({ $0.finishedReading }) { + logger.error("Did not finish reading but error thrown.") + } + if !writerState.wrapped.withLock({ $0.finishedWriting }) { + logger.error("Did not write response but error thrown.") + } + throw error + } - let httpRequest: HTTPRequest - switch nextPart { - case .head(let request): - httpRequest = request - case .body: - self.logger.debug("Unexpectedly received body on connection. Closing now") - outbound.finish() - return - case .end: - self.logger.debug("Unexpectedly received end on connection. Closing now") - outbound.finish() - return - case .none: - self.logger.trace("No more requests parts on connection") - return - } + // If the handler didn't properly conclude the response, the HTTP codec + // is in an inconsistent state and the connection cannot be reused. + if !writerState.wrapped.withLock({ $0.finishedWriting }) { + self.logger.debug("Handler did not conclude the response. Closing connection.") + return nil + } - let readerState = HTTPRequestConcludingAsyncReader.ReaderState() - let writerState = HTTPResponseConcludingAsyncWriter.WriterState() - - do { - try await handler.handle( - request: httpRequest, - requestContext: HTTPRequestContext(), - requestBodyAndTrailers: HTTPRequestConcludingAsyncReader( - iterator: iterator, - readerState: readerState - ), - responseSender: HTTPResponseSender { response in - try await outbound.write(.head(response)) - return HTTPResponseConcludingAsyncWriter( - writer: outbound, - writerState: writerState - ) - } sendInformational: { response in - try await outbound.write(.head(response)) - } - ) - } catch { - if !readerState.wrapped.withLock({ $0.finishedReading }) { - self.logger.error("Did not finish reading but error thrown.") - // TODO: if h2 reset stream; if h1 try draining request? - } + // Recover the iterator for potential connection reuse. + guard var recoveredIterator = readerState.takeIterator() else { + // The handler started reading the request body but didn't finish. + // The iterator was consumed by the reader and not returned. + return nil + } - if !writerState.wrapped.withLock({ $0.finishedWriting }) { - self.logger.error("Did not write response but error thrown.") - // TODO: we need to do something, possibly just close the connection or - // reset the stream with the appropriate error. + // If the handler didn't fully consume the request body, drain the remaining + // parts so the iterator is positioned at the start of the next request. + // To prevent an attacker from keeping the connection in an infinite draining + // state, we only drain up to `Self.bytesDrained`. If more remains, close the connection. + if !readerState.wrapped.withLock({ $0.finishedReading }) { + var bytesDrained = 0 + do { + drainLoop: while true { + switch try await recoveredIterator.next(isolation: #isolation) { + case .head: + self.logger.debug( + "Unexpectedly received request head while draining unconsumed request body." + ) + return nil + case .body(let buffer): + bytesDrained += buffer.readableBytes + if bytesDrained > self.maxDrainBytes { + return nil + } + continue drainLoop + case .end: + break drainLoop + case .none: + return nil } - - throw error } - - // TODO: handle other state scenarios. - // For example, if we're using h2 and we didn't finish reading but we wrote back - // a response, we should send a RST_STREAM with NO_ERROR set. - // If we finished reading but we didn't write back a response, then RST_STREAM - // is also likely appropriate but unclear about the error. - // For h1, we should close the connection. - - // Finish the outbound and wait on the close future to make sure all pending - // writes are actually written. - outbound.finish() - try await channel.channel.closeFuture.get() + } catch { + return nil } - } catch { - // TODO: We need to send a response head here potentially - self.logger.error("Error thrown while handling connection", metadata: ["error": "\(error)"]) - - try? await channel.channel.close() } + + return recoveredIterator } /// Fail the listening address promise if the server is shutting down before it began listening. diff --git a/Sources/NIOHTTPServer/RequestBodyReadError.swift b/Sources/NIOHTTPServer/RequestBodyReadError.swift index ec23503..ebd72ed 100644 --- a/Sources/NIOHTTPServer/RequestBodyReadError.swift +++ b/Sources/NIOHTTPServer/RequestBodyReadError.swift @@ -13,12 +13,12 @@ //===----------------------------------------------------------------------===// enum RequestBodyReadError: Error, CustomStringConvertible { - case streamEndedBeforeReceivingRequestEnd + case requestEndedBeforeReceivingEnd var description: String { switch self { - case .streamEndedBeforeReceivingRequestEnd: - "The request stream unexpectedly ended before receiving a request end part." + case .requestEndedBeforeReceivingEnd: + "The request unexpectedly ended before receiving a request end part." } } } diff --git a/Tests/NIOHTTPServerTests/HTTPRequestConcludingAsyncReaderTests.swift b/Tests/NIOHTTPServerTests/HTTPRequestConcludingAsyncReaderTests.swift index 7095595..325abb9 100644 --- a/Tests/NIOHTTPServerTests/HTTPRequestConcludingAsyncReaderTests.swift +++ b/Tests/NIOHTTPServerTests/HTTPRequestConcludingAsyncReaderTests.swift @@ -36,8 +36,7 @@ struct HTTPRequestConcludingAsyncReaderTests { source.finish() let requestReader = HTTPRequestConcludingAsyncReader( - iterator: stream.makeAsyncIterator(), - readerState: .init() + readerState: .init(iterator: stream.makeAsyncIterator()) ) _ = try await requestReader.consumeAndConclude { bodyReader in @@ -58,8 +57,7 @@ struct HTTPRequestConcludingAsyncReaderTests { source.finish() let requestReader = HTTPRequestConcludingAsyncReader( - iterator: stream.makeAsyncIterator(), - readerState: .init() + readerState: .init(iterator: stream.makeAsyncIterator()) ) _ = try await requestReader.consumeAndConclude { bodyReader in @@ -90,7 +88,7 @@ struct HTTPRequestConcludingAsyncReaderTests { source.finish() // Then start reading the request - let requestReader = HTTPRequestConcludingAsyncReader(iterator: stream.makeAsyncIterator(), readerState: .init()) + let requestReader = HTTPRequestConcludingAsyncReader(readerState: .init(iterator: stream.makeAsyncIterator())) let (requestBody, finalElement) = try await requestReader.consumeAndConclude { bodyReader in var bodyReader = bodyReader @@ -140,8 +138,7 @@ struct HTTPRequestConcludingAsyncReaderTests { group.addTask { let requestReader = HTTPRequestConcludingAsyncReader( - iterator: stream.makeAsyncIterator(), - readerState: .init() + readerState: .init(iterator: stream.makeAsyncIterator()) ) let (_, finalElement) = try await requestReader.consumeAndConclude { bodyReader in // Read all body chunks @@ -174,8 +171,7 @@ struct HTTPRequestConcludingAsyncReaderTests { source.finish() let requestReader = HTTPRequestConcludingAsyncReader( - iterator: stream.makeAsyncIterator(), - readerState: .init() + readerState: .init(iterator: stream.makeAsyncIterator()) ) _ = await requestReader.consumeAndConclude { bodyReader in @@ -204,8 +200,7 @@ struct HTTPRequestConcludingAsyncReaderTests { source.finish() let requestReader = HTTPRequestConcludingAsyncReader( - iterator: stream.makeAsyncIterator(), - readerState: .init() + readerState: .init(iterator: stream.makeAsyncIterator()) ) _ = try await requestReader.consumeAndConclude { requestBodyReader in @@ -237,7 +232,7 @@ struct HTTPRequestConcludingAsyncReaderTests { let streamIterator = stream.makeAsyncIterator() - let requestReader = HTTPRequestConcludingAsyncReader(iterator: streamIterator, readerState: .init()) + let requestReader = HTTPRequestConcludingAsyncReader(readerState: .init(iterator: streamIterator)) _ = try await requestReader.consumeAndConclude { requestBodyReader in var requestBodyReader = requestBodyReader @@ -290,7 +285,7 @@ struct HTTPRequestConcludingAsyncReaderTests { let streamIterator = stream.makeAsyncIterator() - let requestReader = HTTPRequestConcludingAsyncReader(iterator: streamIterator, readerState: .init()) + let requestReader = HTTPRequestConcludingAsyncReader(readerState: .init(iterator: streamIterator)) _ = try await requestReader.consumeAndConclude { requestBodyReader in var requestBodyReader = requestBodyReader diff --git a/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift b/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift index aff9b36..95b8261 100644 --- a/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift +++ b/Tests/NIOHTTPServerTests/NIOHTTPServer+ServiceLifecycleTests.swift @@ -184,6 +184,13 @@ struct NIOHTTPServiceLifecycleTests { // Wait for the server to shut down. try await group.waitForAll() + // Wait for the client channel to be fully closed. The server has closed + // its side of the connection, but the client's event loop may not have + // processed the TCP FIN/RST yet. closeFuture completes only once the + // channel is fully inactive, which is a stronger guarantee than just + // draining inbound (which may return while the channel is half-closed). + try await client.channel.closeFuture.get() + // We shouldn't be able to complete our request; the server should have shut down. await #expect(throws: ChannelError.ioOnClosedChannel) { try await outbound.write(Self.reqBody) @@ -234,7 +241,7 @@ struct NIOHTTPServiceLifecycleTests { // intentional because we want to keep the connection alive until the grace timer (500ms) fires. try await bodyReader.read(maximumCount: Self.bodyData.readableBytes) { _ in } } - #expect(throws: RequestBodyReadError.streamEndedBeforeReceivingRequestEnd) { try error.unwrap() } + #expect(throws: RequestBodyReadError.requestEndedBeforeReceivingEnd) { try error.unwrap() } } } } diff --git a/Tests/NIOHTTPServerTests/NIOHTTPServerTests.swift b/Tests/NIOHTTPServerTests/NIOHTTPServerTests.swift index 2eef788..226f6e4 100644 --- a/Tests/NIOHTTPServerTests/NIOHTTPServerTests.swift +++ b/Tests/NIOHTTPServerTests/NIOHTTPServerTests.swift @@ -110,7 +110,8 @@ struct NIOHTTPServerTests { inbound, expectedHead: [Self.responseHead(status: .ok, for: .http1_1)], expectedBody: [Self.bodyData], - expectedTrailers: Self.trailer + expectedTrailers: Self.trailer, + expectStreamEnd: false ) } } @@ -190,7 +191,8 @@ struct NIOHTTPServerTests { inbound, expectedHead: [Self.responseHead(status: .ok, for: httpVersion)], expectedBody: [Self.bodyData], - expectedTrailers: Self.trailer + expectedTrailers: Self.trailer, + expectStreamEnd: httpVersion == .http2 ) responseReceived() @@ -240,7 +242,8 @@ struct NIOHTTPServerTests { Self.responseHead(status: .ok, for: httpVersion), ], expectedBody: [Self.bodyData], - expectedTrailers: Self.trailer + expectedTrailers: Self.trailer, + expectStreamEnd: httpVersion == .http2 ) responseReceived() } @@ -372,6 +375,60 @@ struct NIOHTTPServerTests { ) } + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + @Test("Multiple serial HTTP/1.1 requests on the same connection") + func testMultipleSerialHTTP1Requests() async throws { + let server = NIOHTTPServer( + logger: self.serverLogger, + configuration: try .init( + bindTarget: .hostAndPort(host: "127.0.0.1", port: 0), + supportedHTTPVersions: [.http1_1], + transportSecurity: .plaintext + ) + ) + + let requestCount = 3 + + try await confirmation(expectedCount: requestCount) { responseReceived in + try await Self.withServer( + server: server, + serverHandler: HTTPServerClosureRequestHandler { request, requestContext, reader, responseWriter in + // Echo the request body back as the response body. + try await Self.echoResponse(readUpTo: 1024, reader: reader, sender: responseWriter) + }, + body: { serverAddress in + let client = try await ClientBootstrap(group: .singletonMultiThreadedEventLoopGroup) + .connectToTestHTTP1Server(at: serverAddress) + + try await client.executeThenClose { inbound, outbound in + var responseIterator = inbound.makeAsyncIterator() + + for i in 1...requestCount { + // Send request + try await outbound.write( + .head(.init(method: .post, scheme: "http", authority: "", path: "/\(i)")) + ) + try await outbound.write(Self.reqBody) + try await outbound.write(.end(nil)) + + // Read response + let headPart = try await responseIterator.next() + #expect(headPart == .head(Self.responseHead(status: .ok, for: .http1_1))) + + let bodyPart = try await responseIterator.next() + #expect(bodyPart == .body(Self.bodyData)) + + let endPart = try await responseIterator.next() + #expect(endPart == .end(nil)) + + responseReceived() + } + } + } + ) + } + } + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) @Test("Multiple concurrent connections", arguments: [HTTPVersion.http1_1, HTTPVersion.http2]) func testMultipleConcurrentConnections(httpVersion: HTTPVersion) async throws { @@ -427,7 +484,8 @@ struct NIOHTTPServerTests { try await Self.validateResponse( inbound, expectedHead: [Self.responseHead(status: .ok, for: httpVersion)], - expectedBody: [Self.bodyData] + expectedBody: [Self.bodyData], + expectStreamEnd: httpVersion == .http2 ) responseReceived() @@ -543,7 +601,6 @@ struct NIOHTTPServerTests { try await firstClientChannel.executeThenClose { inbound, outbound in // Only send a request head; finish the stream immediately afterwards. try await outbound.write(.head(.init(method: .post, scheme: "http", authority: "", path: "/"))) - outbound.finish() } try await firstRequestErrorCaught.futureResult.get() @@ -559,7 +616,8 @@ struct NIOHTTPServerTests { try await Self.validateResponse( inbound, expectedHead: [Self.responseHead(status: .ok, for: .http1_1)], - expectedBody: [Self.bodyData] + expectedBody: [Self.bodyData], + expectStreamEnd: false ) responseReceived() @@ -615,6 +673,7 @@ extension NIOHTTPServerTests { expectedHead: [HTTPResponse], expectedBody: [ByteBuffer], expectedTrailers: HTTPFields? = nil, + expectStreamEnd: Bool = true, sourceLocation: SourceLocation = #_sourceLocation ) async throws { var responseIterator = responseStream.makeAsyncIterator() @@ -632,11 +691,13 @@ extension NIOHTTPServerTests { let endResponsePart = try await responseIterator.next() #expect(endResponsePart == .end(expectedTrailers), sourceLocation: sourceLocation) - #expect( - try await responseIterator.next() == nil, - "Received another response part when the response stream should have finished.", - sourceLocation: sourceLocation - ) + if expectStreamEnd { + #expect( + try await responseIterator.next() == nil, + "Received another response part when the response stream should have finished.", + sourceLocation: sourceLocation + ) + } } /// Unwraps a negotiated channel, asserting it matches the expected `httpVersion`. For HTTP/2, opens and returns a diff --git a/Tests/NIOHTTPServerTests/Utilities/TestingChannelClientServer/TestingChannelServer+HTTP1.swift b/Tests/NIOHTTPServerTests/Utilities/TestingChannelClientServer/TestingChannelServer+HTTP1.swift index 9af92f8..4153738 100644 --- a/Tests/NIOHTTPServerTests/Utilities/TestingChannelClientServer/TestingChannelServer+HTTP1.swift +++ b/Tests/NIOHTTPServerTests/Utilities/TestingChannelClientServer/TestingChannelServer+HTTP1.swift @@ -98,7 +98,7 @@ struct TestingChannelHTTP1Server { try await body(clientAsyncChannel) - try await serverTestConnectionChannel.close() + try? await serverTestConnectionChannel.close() } } }