Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024-2025 the original author or authors.
* Copyright 2024-2026 the original author or authors.
*/

package io.modelcontextprotocol.client.transport;
Expand All @@ -23,6 +23,7 @@
import io.modelcontextprotocol.client.McpAsyncClient;
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
import io.modelcontextprotocol.client.transport.customizer.McpHttpClientAuthorizationErrorHandler;
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.json.McpJsonDefaults;
Expand Down Expand Up @@ -72,6 +73,7 @@
* </p>
*
* @author Christian Tzolov
* @author Daniel Garnier-Moiroux
* @see <a href=
* "https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">Streamable
* HTTP transport specification</a>
Expand Down Expand Up @@ -115,6 +117,8 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {

private final boolean openConnectionOnStartup;

private final McpHttpClientAuthorizationErrorHandler authorizationErrorHandler;

private final boolean resumableStreams;

private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer;
Expand All @@ -132,14 +136,15 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient,
HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams,
boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
List<String> supportedProtocolVersions) {
McpHttpClientAuthorizationErrorHandler authorizationErrorHandler, List<String> supportedProtocolVersions) {
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.requestBuilder = requestBuilder;
this.baseUri = URI.create(baseUri);
this.endpoint = endpoint;
this.resumableStreams = resumableStreams;
this.openConnectionOnStartup = openConnectionOnStartup;
this.authorizationErrorHandler = authorizationErrorHandler;
this.activeSession.set(createTransportSession());
this.httpRequestCustomizer = httpRequestCustomizer;
this.supportedProtocolVersions = Collections.unmodifiableList(supportedProtocolVersions);
Expand Down Expand Up @@ -239,7 +244,6 @@ public Mono<Void> closeGracefully() {
}

private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {

return Mono.deferContextual(ctx -> {

if (stream != null) {
Expand Down Expand Up @@ -275,121 +279,128 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
})
.flatMapMany(
requestBuilder -> Flux.<ResponseEvent>create(
sseSink -> this.httpClient
.sendAsync(requestBuilder.build(),
responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo,
sseSink))
.whenComplete((response, throwable) -> {
if (throwable != null) {
sseSink.error(throwable);
}
else {
logger.debug("SSE connection established successfully");
}
}))
.map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
.flatMap(responseEvent -> {
int statusCode = responseEvent.responseInfo().statusCode();

if (statusCode >= 200 && statusCode < 300) {

if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
String data = responseEvent.sseEvent().data();
// Per 2025-11-25 spec (SEP-1699), servers may
// send SSE events
// with empty data to prime the client for
// reconnection.
// Skip these events as they contain no JSON-RPC
// message.
if (data == null || data.isBlank()) {
logger.debug("Skipping SSE event with empty data (stream primer)");
return Flux.empty();
}
try {
// We don't support batching ATM and probably
// won't since the next version considers
// removing it.
McpSchema.JSONRPCMessage message = McpSchema
.deserializeJsonRpcMessage(this.jsonMapper, data);

Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> idWithMessages = Tuples
.of(Optional.ofNullable(responseEvent.sseEvent().id()),
List.of(message));

McpTransportStream<Disposable> sessionStream = stream != null ? stream
: new DefaultMcpTransportStream<>(this.resumableStreams,
this::reconnect);
logger.debug("Connected stream {}", sessionStream.streamId());

return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));

}
catch (IOException ioException) {
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
"Error parsing JSON-RPC message: " + responseEvent, ioException));
}
}
else {
logger.debug("Received SSE event with type: {}", responseEvent.sseEvent());
return Flux.empty();
}
}
else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed
logger
.debug("The server does not support SSE streams, using request-response mode.");
.flatMapMany(requestBuilder -> Flux.<ResponseEvent>create(sseSink -> this.httpClient
.sendAsync(requestBuilder.build(), this.toSendMessageBodySubscriber(sseSink))
.whenComplete((response, throwable) -> {
if (throwable != null) {
sseSink.error(throwable);
}
else {
logger.debug("SSE connection established successfully");
}
})).flatMap(responseEvent -> {
int statusCode = responseEvent.responseInfo().statusCode();
if (statusCode == 401 || statusCode == 403) {
logger.debug("Authorization error in sendMessage with code {}", statusCode);
return Mono.deferContextual(innerCtx -> {
var transportContext = innerCtx.getOrDefault(McpTransportContext.KEY,
McpTransportContext.EMPTY);
return Mono.from(this.authorizationErrorHandler.onAuthorizationError(
responseEvent.responseInfo(), transportContext, Mono.defer(() -> {
logger.debug("Authorization error handled, retrying original request");
return this.reconnect(stream).then();
}),
Mono.error(new McpHttpClientTransportException(
"Authorization error connecting to SSE stream",
responseEvent.responseInfo()))))
.then(Mono.empty());
});
}

if (!(responseEvent instanceof ResponseSubscribers.SseResponseEvent sseResponseEvent)) {
return Flux.<McpSchema.JSONRPCMessage>error(new McpHttpClientTransportException(
"Unrecognized server error when connecting to SSE stream",
responseEvent.responseInfo()));
}
else if (statusCode >= 200 && statusCode < 300) {
if (MESSAGE_EVENT_TYPE.equals(sseResponseEvent.sseEvent().event())) {
String data = sseResponseEvent.sseEvent().data();
// Per 2025-11-25 spec (SEP-1699), servers may
// send SSE events
// with empty data to prime the client for
// reconnection.
// Skip these events as they contain no JSON-RPC
// message.
if (data == null || data.isBlank()) {
logger.debug("Skipping SSE event with empty data (stream primer)");
return Flux.empty();
}
else if (statusCode == NOT_FOUND) {

if (transportSession != null && transportSession.sessionId().isPresent()) {
// only if the request was sent with a session id
// and the response is 404, we consider it a
// session not found error.
logger.debug("Session not found for session ID: {}",
transportSession.sessionId().get());
String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
"Session not found for session ID: " + sessionIdRepresentation);
return Flux.<McpSchema.JSONRPCMessage>error(exception);
}
return Flux.<McpSchema.JSONRPCMessage>error(
new McpTransportException("Server Not Found. Status code:" + statusCode
+ ", response-event:" + responseEvent));
}
else if (statusCode == BAD_REQUEST) {
if (transportSession != null && transportSession.sessionId().isPresent()) {
// only if the request was sent with a session id
// and thre response is 404, we consider it a
// session not found error.
String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
"Session not found for session ID: " + sessionIdRepresentation);
return Flux.<McpSchema.JSONRPCMessage>error(exception);
}
return Flux.<McpSchema.JSONRPCMessage>error(
new McpTransportException("Bad Request. Status code:" + statusCode
+ ", response-event:" + responseEvent));
try {
// We don't support batching ATM and probably
// won't since the next version considers
// removing it.
McpSchema.JSONRPCMessage message = McpSchema
.deserializeJsonRpcMessage(this.jsonMapper, data);

}
Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> idWithMessages = Tuples
.of(Optional.ofNullable(sseResponseEvent.sseEvent().id()), List.of(message));

McpTransportStream<Disposable> sessionStream = stream != null ? stream
: new DefaultMcpTransportStream<>(this.resumableStreams, this::reconnect);
logger.debug("Connected stream {}", sessionStream.streamId());

return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
"Received unrecognized SSE event type: " + responseEvent.sseEvent().event()));
}).<McpSchema
.JSONRPCMessage>flatMap(
jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
.onErrorMap(CompletionException.class, t -> t.getCause())
.onErrorComplete(t -> {
this.handleException(t);
return true;
})
.doFinally(s -> {
Disposable ref = disposableRef.getAndSet(null);
if (ref != null) {
transportSession.removeConnection(ref);
return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));

}
catch (IOException ioException) {
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
"Error parsing JSON-RPC message: " + responseEvent, ioException));
}
}))
}
else {
logger.debug("Received SSE event with type: {}", sseResponseEvent.sseEvent());
return Flux.empty();
}
}
else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed
logger.debug("The server does not support SSE streams, using request-response mode.");
return Flux.empty();
}
else if (statusCode == NOT_FOUND) {

if (transportSession != null && transportSession.sessionId().isPresent()) {
// only if the request was sent with a session id
// and the response is 404, we consider it a
// session not found error.
logger.debug("Session not found for session ID: {}",
transportSession.sessionId().get());
String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
"Session not found for session ID: " + sessionIdRepresentation);
return Flux.<McpSchema.JSONRPCMessage>error(exception);
}
return Flux.<McpSchema.JSONRPCMessage>error(
new McpTransportException("Server Not Found. Status code:" + statusCode
+ ", response-event:" + responseEvent));
}
else if (statusCode == BAD_REQUEST) {
if (transportSession != null && transportSession.sessionId().isPresent()) {
// only if the request was sent with a session id
// and thre response is 404, we consider it a
// session not found error.
String sessionIdRepresentation = sessionIdOrPlaceholder(transportSession);
McpTransportSessionNotFoundException exception = new McpTransportSessionNotFoundException(
"Session not found for session ID: " + sessionIdRepresentation);
return Flux.<McpSchema.JSONRPCMessage>error(exception);
}
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
"Bad Request. Status code:" + statusCode + ", response-event:" + responseEvent));
}
return Flux.<McpSchema.JSONRPCMessage>error(new McpTransportException(
"Received unrecognized SSE event type: " + sseResponseEvent.sseEvent().event()));
})
.flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
.onErrorMap(CompletionException.class, t -> t.getCause())
.onErrorComplete(t -> {
this.handleException(t);
return true;
})
.doFinally(s -> {
Disposable ref = disposableRef.getAndSet(null);
if (ref != null) {
transportSession.removeConnection(ref);
}
}))
.contextWrite(ctx)
.subscribe();

Expand Down Expand Up @@ -478,6 +489,22 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
})).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe();

})).flatMap(responseEvent -> {
int statusCode = responseEvent.responseInfo().statusCode();
if (statusCode == 401 || statusCode == 403) {
logger.debug("Authorization error in sendMessage with code {}", statusCode);
return Mono.deferContextual(ctx -> {
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
return Mono.from(this.authorizationErrorHandler
.onAuthorizationError(responseEvent.responseInfo(), transportContext, Mono.defer(() -> {
logger.debug("Authorization error handled, retrying original request");
return this.sendMessage(sentMessage);
}), Mono.error(new McpHttpClientTransportException(
"Authorization error when sending message", responseEvent.responseInfo()))))
.doOnSuccess(s -> deliveredSink.success())
.then(Mono.empty());
});
}

if (transportSession.markInitialized(
responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null))) {
// Once we have a session, we try to open an async stream for
Expand All @@ -488,8 +515,6 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {

String sessionRepresentation = sessionIdOrPlaceholder(transportSession);

int statusCode = responseEvent.responseInfo().statusCode();

if (statusCode >= 200 && statusCode < 300) {

String contentType = responseEvent.responseInfo()
Expand Down Expand Up @@ -664,6 +689,8 @@ public static class Builder {
private List<String> supportedProtocolVersions = List.of(ProtocolVersions.MCP_2024_11_05,
ProtocolVersions.MCP_2025_03_26, ProtocolVersions.MCP_2025_06_18, ProtocolVersions.MCP_2025_11_25);

private McpHttpClientAuthorizationErrorHandler authorizationErrorHandler = McpHttpClientAuthorizationErrorHandler.NOOP;

/**
* Creates a new builder with the specified base URI.
* @param baseUri the base URI of the MCP server
Expand Down Expand Up @@ -801,6 +828,17 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as
return this;
}

/**
* Sets the handler to be used when the server responds with HTTP 401 or HTTP 403
* when sending a message.
* @param authorizationErrorHandler the handler
* @return this builder
*/
public Builder authorizationErrorHandler(McpHttpClientAuthorizationErrorHandler authorizationErrorHandler) {
this.authorizationErrorHandler = authorizationErrorHandler;
return this;
}

/**
* Sets the connection timeout for the HTTP client.
* @param connectTimeout the connection timeout duration
Expand Down Expand Up @@ -845,7 +883,7 @@ public HttpClientStreamableHttpTransport build() {
HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
return new HttpClientStreamableHttpTransport(jsonMapper == null ? McpJsonDefaults.getMapper() : jsonMapper,
httpClient, requestBuilder, baseUri, endpoint, resumableStreams, openConnectionOnStartup,
httpRequestCustomizer, supportedProtocolVersions);
httpRequestCustomizer, authorizationErrorHandler, supportedProtocolVersions);
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2026-2026 the original author or authors.
*/

package io.modelcontextprotocol.client.transport;

import java.net.http.HttpResponse;

import io.modelcontextprotocol.spec.McpTransportException;

/**
* Authorization-related exception for {@link java.net.http.HttpClient}-based client
* transport. Thrown when the server responds with HTTP 401 or HTTP 403. Wraps the
* response info for further inspection of the headers and the status code.
*
* @see <a href=
* "https://modelcontextprotocol.io/specification/2025-11-25/basic/authorization">MCP
* Specification: Authorization</a>
* @author Daniel Garnier-Moiroux
*/
public class McpHttpClientTransportException extends McpTransportException {

private final HttpResponse.ResponseInfo responseInfo;

public McpHttpClientTransportException(String message, HttpResponse.ResponseInfo responseInfo) {
super(message);
this.responseInfo = responseInfo;
}

public HttpResponse.ResponseInfo getResponseInfo() {
return responseInfo;
}

}
Loading
Loading