2323import io .modelcontextprotocol .client .McpAsyncClient ;
2424import io .modelcontextprotocol .client .transport .ResponseSubscribers .ResponseEvent ;
2525import io .modelcontextprotocol .client .transport .customizer .McpAsyncHttpClientRequestCustomizer ;
26+ import io .modelcontextprotocol .client .transport .customizer .McpHttpClientAuthorizationErrorHandler ;
2627import io .modelcontextprotocol .client .transport .customizer .McpSyncHttpClientRequestCustomizer ;
2728import io .modelcontextprotocol .common .McpTransportContext ;
2829import io .modelcontextprotocol .json .McpJsonDefaults ;
@@ -115,6 +116,8 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
115116
116117 private final boolean openConnectionOnStartup ;
117118
119+ private final McpHttpClientAuthorizationErrorHandler authorizationErrorHandler ;
120+
118121 private final boolean resumableStreams ;
119122
120123 private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer ;
@@ -132,14 +135,15 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
132135 private HttpClientStreamableHttpTransport (McpJsonMapper jsonMapper , HttpClient httpClient ,
133136 HttpRequest .Builder requestBuilder , String baseUri , String endpoint , boolean resumableStreams ,
134137 boolean openConnectionOnStartup , McpAsyncHttpClientRequestCustomizer httpRequestCustomizer ,
135- List <String > supportedProtocolVersions ) {
138+ McpHttpClientAuthorizationErrorHandler authorizationErrorHandler , List <String > supportedProtocolVersions ) {
136139 this .jsonMapper = jsonMapper ;
137140 this .httpClient = httpClient ;
138141 this .requestBuilder = requestBuilder ;
139142 this .baseUri = URI .create (baseUri );
140143 this .endpoint = endpoint ;
141144 this .resumableStreams = resumableStreams ;
142145 this .openConnectionOnStartup = openConnectionOnStartup ;
146+ this .authorizationErrorHandler = authorizationErrorHandler ;
143147 this .activeSession .set (createTransportSession ());
144148 this .httpRequestCustomizer = httpRequestCustomizer ;
145149 this .supportedProtocolVersions = Collections .unmodifiableList (supportedProtocolVersions );
@@ -478,6 +482,17 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
478482 })).onErrorMap (CompletionException .class , t -> t .getCause ()).onErrorComplete ().subscribe ();
479483
480484 })).flatMap (responseEvent -> {
485+ int statusCode = responseEvent .responseInfo ().statusCode ();
486+ if (statusCode == 401 || statusCode == 403 ) {
487+ return Mono .deferContextual (ctx -> {
488+ var transportContext = ctx .getOrDefault (McpTransportContext .KEY , McpTransportContext .EMPTY );
489+ return Mono .from (this .authorizationErrorHandler .handle (responseEvent .responseInfo (),
490+ transportContext , Mono .defer (() -> this .sendMessage (sentMessage ))));
491+ })
492+ .then (Mono .error (new McpHttpClientTransportException ("Authorization error when sending message" ,
493+ responseEvent .responseInfo ())));
494+ }
495+
481496 if (transportSession .markInitialized (
482497 responseEvent .responseInfo ().headers ().firstValue ("mcp-session-id" ).orElseGet (() -> null ))) {
483498 // Once we have a session, we try to open an async stream for
@@ -488,8 +503,6 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
488503
489504 String sessionRepresentation = sessionIdOrPlaceholder (transportSession );
490505
491- int statusCode = responseEvent .responseInfo ().statusCode ();
492-
493506 if (statusCode >= 200 && statusCode < 300 ) {
494507
495508 String contentType = responseEvent .responseInfo ()
@@ -664,6 +677,8 @@ public static class Builder {
664677 private List <String > supportedProtocolVersions = List .of (ProtocolVersions .MCP_2024_11_05 ,
665678 ProtocolVersions .MCP_2025_03_26 , ProtocolVersions .MCP_2025_06_18 , ProtocolVersions .MCP_2025_11_25 );
666679
680+ private McpHttpClientAuthorizationErrorHandler authorizationErrorHandler = McpHttpClientAuthorizationErrorHandler .NOOP ;
681+
667682 /**
668683 * Creates a new builder with the specified base URI.
669684 * @param baseUri the base URI of the MCP server
@@ -801,6 +816,16 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as
801816 return this ;
802817 }
803818
819+ /**
820+ * Sets the handler
821+ * @param authorizationErrorHandler
822+ * @return
823+ */
824+ public Builder authorizationErrorHandler (McpHttpClientAuthorizationErrorHandler authorizationErrorHandler ) {
825+ this .authorizationErrorHandler = authorizationErrorHandler ;
826+ return this ;
827+ }
828+
804829 /**
805830 * Sets the connection timeout for the HTTP client.
806831 * @param connectTimeout the connection timeout duration
@@ -845,7 +870,7 @@ public HttpClientStreamableHttpTransport build() {
845870 HttpClient httpClient = this .clientBuilder .connectTimeout (this .connectTimeout ).build ();
846871 return new HttpClientStreamableHttpTransport (jsonMapper == null ? McpJsonDefaults .getMapper () : jsonMapper ,
847872 httpClient , requestBuilder , baseUri , endpoint , resumableStreams , openConnectionOnStartup ,
848- httpRequestCustomizer , supportedProtocolVersions );
873+ httpRequestCustomizer , authorizationErrorHandler , supportedProtocolVersions );
849874 }
850875
851876 }
0 commit comments