Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,13 @@ public class Config extends ConfigBase {
"The maximum HTTP POST size of Jetty, in bytes, the default value is 100MB."})
public static int jetty_server_max_http_post_size = 100 * 1024 * 1024;

@ConfField(mutable = true, description = {
"Jetty 在应用未消费完请求体时,额外尝试读取剩余内容的最大次数。"
+ "-1 表示不限制,0 表示不额外读取,正数表示最大读取次数。",
"The maximum number of extra reads Jetty performs for unconsumed request content. "
+ "-1 means unlimited, 0 means disabled, and a positive value limits the read attempts."})
public static int jetty_server_max_unconsumed_request_content_reads = -1;

@ConfField(description = {"Jetty 的最大 HTTP header 大小,单位是字节,默认值是 1MB。",
"The maximum HTTP header size of Jetty, in bytes, the default value is 1MB."})
public static int jetty_server_max_http_header_size = 1048576;
Expand Down Expand Up @@ -3305,6 +3312,13 @@ public static int metaServiceRpcRetryTimes() {
+ "public-private/public/private/direct/random-be and empty string" })
public static String streamload_redirect_policy = "";

@ConfField(mutable = true, description = {
"Stream Load redirect 场景下,FE 在返回 307 后额外丢弃请求体的最大字节数。"
+ "0 表示关闭该兼容逻辑,正数表示最大丢弃字节数。",
"The maximum number of request body bytes FE drains after returning 307 for Stream Load redirects. "
+ "0 disables the compatibility logic, and a positive value sets the byte limit."})
public static long stream_load_redirect_bounded_drain_max_bytes = 0;

@ConfField(mutable = true, description = {
"存算分离模式下是否启用group commit的streamload BE转发功能。"
+ "解决LB随机转发导致group commit攒批失效的问题,通过BE二次转发确保同表请求到达同一BE节点。",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.common.Config;

import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.ServerConnector;
Expand All @@ -37,20 +38,36 @@ public void customize(ConfigurableJettyWebServerFactory factory) {
((JettyServletWebServerFactory) factory).setConfigurations(
Collections.singletonList(new HttpToHttpsJettyConfig())
);
}

factory.addServerCustomizers(
server -> {
factory.addServerCustomizers(
server -> {
if (Config.enable_https) {
HttpConfiguration httpConfiguration = new HttpConfiguration();
httpConfiguration.setSecurePort(Config.https_port);
httpConfiguration.setSecureScheme("https");
httpConfiguration.setMaxUnconsumedRequestContentReads(
Config.jetty_server_max_unconsumed_request_content_reads);

ServerConnector connector = new ServerConnector(server);
connector.addConnectionFactory(new HttpConnectionFactory(httpConfiguration));
connector.setPort(Config.http_port);

server.addConnector(connector);
}
);
}

for (Connector connector : server.getConnectors()) {
if (!(connector instanceof ServerConnector)) {
continue;
}
HttpConnectionFactory httpConnectionFactory =
((ServerConnector) connector).getConnectionFactory(HttpConnectionFactory.class);
if (httpConnectionFactory != null) {
httpConnectionFactory.getHttpConfiguration().setMaxUnconsumedRequestContentReads(
Config.jetty_server_max_unconsumed_request_content_reads);
}
}
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
Expand All @@ -38,6 +37,7 @@
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
import org.apache.doris.httpv2.rest.manager.HttpUtils;
import org.apache.doris.httpv2.util.StreamLoadRedirectDrainUtil;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.StreamLoadHandler;
import org.apache.doris.load.loadv2.IngestionLoadJob;
Expand Down Expand Up @@ -77,7 +77,6 @@

import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -210,8 +209,7 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse
LOG.info("redirect load action to destination={}, label: {}",
redirectAddr.toString(), label);

RedirectView redirectView = redirectTo(request, redirectAddr);
return redirectView;
return createRedirectResponse(request, response, redirectAddr, true, null, null, label);
} catch (Exception e) {
return new RestBaseResult(e.getMessage());
}
Expand Down Expand Up @@ -334,11 +332,10 @@ private Object executeWithoutPassword(HttpServletRequest request,
redirectAddr.toString(), isStreamLoad, dbName, tableName, label);
}

RedirectView redirectView = redirectTo(request, redirectAddr);
return redirectView;
return createRedirectResponse(request, response, redirectAddr, isStreamLoad, dbName, tableName, label);
} catch (StreamLoadForwardException e) {
// Special handling for stream load forwarding
return e.getRedirectView();
return createRedirectResponse(request, response, e.getRedirectView(), isStreamLoad, db, table, label);
} catch (Exception e) {
LOG.warn("load failed, stream: {}, db: {}, tbl: {}, label: {}, err: {}",
isStreamLoad, db, table, label, e.getMessage());
Expand Down Expand Up @@ -672,24 +669,7 @@ private Object executeWithClusterToken(HttpServletRequest request, String db,
+ "stream: {}, db: {}, tbl: {}, label: {}",
redirectAddr.toString(), isStreamLoad, dbName, tableName, label);

URI urlObj = null;
URI resultUriObj = null;
String urlStr = request.getRequestURI();
String userInfo = null;

try {
urlObj = new URI(urlStr);
resultUriObj = new URI("http", userInfo, redirectAddr.getHostname(),
redirectAddr.getPort(), urlObj.getPath(), "", null);
} catch (Exception e) {
throw new RuntimeException(e);
}
String redirectUrl = resultUriObj.toASCIIString();
if (!Strings.isNullOrEmpty(request.getQueryString())) {
redirectUrl += request.getQueryString();
}
LOG.info("Redirect url: {}", "http://" + redirectAddr.getHostname() + ":"
+ redirectAddr.getPort() + urlObj.getPath());
String redirectUrl = buildRedirectUrl(request, redirectAddr);
RedirectView redirectView = new RedirectView(redirectUrl);
redirectView.setContentType("text/html;charset=utf-8");
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
Expand All @@ -714,6 +694,47 @@ private String getAllHeaders(HttpServletRequest request) {
return headers.toString();
}

private Object createRedirectResponse(HttpServletRequest request, HttpServletResponse response,
TNetworkAddress redirectAddr, boolean isStreamLoad, String dbName, String tableName, String label)
throws IOException {
String redirectUrl = buildRedirectUrl(request, redirectAddr);
if (!shouldUseBoundedDrainForStreamLoad(isStreamLoad)) {
return redirectTo(request, redirectAddr);
}
writeTemporaryRedirect(response, redirectUrl);
drainStreamLoadRequestBodyAfterRedirect(request, redirectAddr.toString(), dbName, tableName, label);
return null;
}

private Object createRedirectResponse(HttpServletRequest request, HttpServletResponse response,
RedirectView redirectView, boolean isStreamLoad, String dbName, String tableName, String label)
throws IOException {
if (!shouldUseBoundedDrainForStreamLoad(isStreamLoad)) {
return redirectView;
}
writeTemporaryRedirect(response, redirectView.getUrl());
drainStreamLoadRequestBodyAfterRedirect(request, redirectView.getUrl(), dbName, tableName, label);
return null;
}

private boolean shouldUseBoundedDrainForStreamLoad(boolean isStreamLoad) {
return isStreamLoad && Config.stream_load_redirect_bounded_drain_max_bytes > 0;
}

private void drainStreamLoadRequestBodyAfterRedirect(HttpServletRequest request, String redirectTarget,
String dbName, String tableName, String label) {
long drainLimit = Config.stream_load_redirect_bounded_drain_max_bytes;
LOG.info("write stream load redirect and start bounded drain, target: {}, db: {}, tbl: {}, label: {},"
+ " max_drain_bytes: {}",
redirectTarget, dbName, tableName, label, drainLimit);
StreamLoadRedirectDrainUtil.DrainResult drainResult =
StreamLoadRedirectDrainUtil.drainRequestBodyAfterRedirect(request, drainLimit);
LOG.info("finish bounded drain after stream load redirect, target: {}, db: {}, tbl: {}, label: {},"
+ " drained_bytes: {}, elapsed_ms: {}, exit_reason: {}",
redirectTarget, dbName, tableName, label, drainResult.getDrainedBytes(),
drainResult.getElapsedMillis(), drainResult.getExitReason());
}

private Backend selectBackendForGroupCommit(String clusterName, HttpServletRequest req, long tableId)
throws LoadException {
ConnectContext ctx = new ConnectContext();
Expand Down Expand Up @@ -959,35 +980,13 @@ public Object updateIngestionLoad(HttpServletRequest request, HttpServletRespons
*/
private RedirectView redirectToStreamLoadForward(HttpServletRequest request, TNetworkAddress addr,
String forwardTarget) {
URI urlObj = null;
URI resultUriObj = null;
String urlStr = request.getRequestURI();
String userInfo = null;
String modifiedPath = null;

if (!Strings.isNullOrEmpty(request.getHeader("Authorization"))) {
ActionAuthorizationInfo authInfo = getAuthorizationInfo(request);
userInfo = ClusterNamespace.getNameFromFullName(authInfo.fullUserName)
+ ":" + authInfo.password;
}
try {
urlObj = new URI(urlStr);
// Replace _stream_load with _stream_load_forward in the path
modifiedPath = urlObj.getPath().replace("/_stream_load", "/_stream_load_forward");
resultUriObj = new URI("http", userInfo, addr.getHostname(),
addr.getPort(), modifiedPath, "", null);
} catch (Exception e) {
throw new RuntimeException(e);
}
String redirectUrl = resultUriObj.toASCIIString();

// Add forward_to parameter (note: toASCIIString() already includes '?' due to empty query)
String modifiedPath = request.getRequestURI().replace("/_stream_load", "/_stream_load_forward");
String queryString = request.getQueryString();
String redirectQuery = "forward_to=" + forwardTarget;
if (!Strings.isNullOrEmpty(queryString)) {
redirectUrl += queryString + "&forward_to=" + forwardTarget;
} else {
redirectUrl += "forward_to=" + forwardTarget;
redirectQuery = queryString + "&" + redirectQuery;
}
String redirectUrl = buildRedirectUrl(request, addr, modifiedPath, redirectQuery);

LOG.info("Redirect stream load forward url: {}, forward_to: {}",
"http://" + addr.getHostname() + ":" + addr.getPort() + modifiedPath, forwardTarget);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,29 +73,43 @@ public ActionAuthorizationInfo executeCheckPassword(HttpServletRequest request,
return authInfo;
}

public RedirectView redirectTo(HttpServletRequest request, TNetworkAddress addr) {
URI urlObj = null;
protected String buildRedirectUrl(HttpServletRequest request, TNetworkAddress addr) {
return buildRedirectUrl(request, addr, request.getRequestURI(), request.getQueryString());
}

protected String buildRedirectUrl(HttpServletRequest request, TNetworkAddress addr, String requestPath,
String queryString) {
URI resultUriObj = null;
String urlStr = request.getRequestURI();
String userInfo = null;
if (!Strings.isNullOrEmpty(request.getHeader("Authorization"))) {
ActionAuthorizationInfo authInfo = getAuthorizationInfo(request);
userInfo = ClusterNamespace.getNameFromFullName(authInfo.fullUserName)
+ ":" + authInfo.password;
}
try {
urlObj = new URI(urlStr);
resultUriObj = new URI("http", userInfo, addr.getHostname(),
addr.getPort(), urlObj.getPath(), "", null);
addr.getPort(), requestPath, null, null);
} catch (Exception e) {
throw new RuntimeException(e);
}
String redirectUrl = resultUriObj.toASCIIString();
if (!Strings.isNullOrEmpty(request.getQueryString())) {
redirectUrl += request.getQueryString();
if (!Strings.isNullOrEmpty(queryString)) {
redirectUrl += "?" + queryString;
}
LOG.info("Redirect url: {}", "http://" + addr.getHostname() + ":"
+ addr.getPort() + urlObj.getPath());
+ addr.getPort() + requestPath);
return redirectUrl;
}

protected void writeTemporaryRedirect(HttpServletResponse response, String redirectUrl) throws IOException {
response.setContentType("text/html;charset=utf-8");
response.setStatus(HttpStatus.TEMPORARY_REDIRECT.value());
response.setHeader("Location", redirectUrl);
response.flushBuffer();
}

public RedirectView redirectTo(HttpServletRequest request, TNetworkAddress addr) {
String redirectUrl = buildRedirectUrl(request, addr);
RedirectView redirectView = new RedirectView(redirectUrl);
redirectView.setContentType("text/html;charset=utf-8");
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);
Expand Down
Loading
Loading