Skip to content

Commit 0ca4cfd

Browse files
authored
fix: sync streams (#1560)
1 parent 082a7b4 commit 0ca4cfd

3 files changed

Lines changed: 42 additions & 2 deletions

File tree

src/handlers/http/cluster/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,14 @@ pub async fn sync_streams_with_ingestors(
412412
base_path_without_preceding_slash(),
413413
stream_name
414414
);
415-
let headers = reqwest_headers_clone.clone();
415+
let mut headers = reqwest_headers_clone.clone();
416+
417+
if !headers.contains_key("intra-cluster-userid") {
418+
headers.insert(
419+
reqwest::header::HeaderName::from_static("intra-cluster-userid"),
420+
reqwest::header::HeaderValue::from_str(&PARSEABLE.options.username).unwrap(),
421+
);
422+
}
416423
let body = body_clone.clone();
417424
async move {
418425
let res = INTRA_CLUSTER_CLIENT

src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ use tracing::{error, warn};
3131

3232
pub static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());
3333

34+
use crate::handlers::http::middleware::{CLUSTER_SECRET, CLUSTER_SECRET_HEADER};
35+
use crate::parseable::DEFAULT_TENANT;
36+
use crate::utils::get_user_from_request;
3437
use crate::{
3538
handlers::{
3639
UPDATE_STREAM_KEY,
@@ -126,7 +129,7 @@ pub async fn put_stream(
126129
let stream_name = stream_name.into_inner();
127130
let tenant_id = get_tenant_id_from_request(&req);
128131
let _guard = CREATE_STREAM_LOCK.lock().await;
129-
let headers = PARSEABLE
132+
let mut headers = PARSEABLE
130133
.create_update_stream(req.headers(), &body, &stream_name, &tenant_id)
131134
.await?;
132135

@@ -136,6 +139,24 @@ pub async fn put_stream(
136139
false
137140
};
138141

142+
if let Some((_, hash)) = CLUSTER_SECRET.get() {
143+
let userid = get_user_from_request(&req).unwrap();
144+
headers.insert(
145+
actix_web::http::header::HeaderName::from_static(CLUSTER_SECRET_HEADER),
146+
actix_web::http::header::HeaderValue::from_str(hash).unwrap(),
147+
);
148+
headers.insert(
149+
actix_web::http::header::HeaderName::from_static("intra-cluster-tenant"),
150+
actix_web::http::header::HeaderValue::from_str(
151+
tenant_id.as_deref().unwrap_or(DEFAULT_TENANT),
152+
)
153+
.unwrap(),
154+
);
155+
headers.insert(
156+
actix_web::http::header::HeaderName::from_static("intra-cluster-userid"),
157+
actix_web::http::header::HeaderValue::from_str(&userid).unwrap(),
158+
);
159+
}
139160
sync_streams_with_ingestors(headers, body, &stream_name, &tenant_id).await?;
140161

141162
if is_update {

src/parseable/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ use crate::{
5757
cluster::{PMETA_STREAM_NAME, sync_streams_with_ingestors},
5858
ingest::PostError,
5959
logstream::error::{CreateStreamError, StreamError},
60+
middleware::{CLUSTER_SECRET, CLUSTER_SECRET_HEADER},
6061
modal::{
6162
ingest_server::INGESTOR_META,
6263
utils::{logstream_utils::PutStreamHeaders, rbac_utils::get_metadata},
@@ -494,6 +495,17 @@ impl Parseable {
494495
);
495496
header_map.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
496497

498+
if let Some((_, hash)) = CLUSTER_SECRET.get() {
499+
header_map.insert(
500+
HeaderName::from_static(CLUSTER_SECRET_HEADER),
501+
HeaderValue::from_str(hash).unwrap(),
502+
);
503+
header_map.insert(
504+
HeaderName::from_static("intra-cluster-tenant"),
505+
HeaderValue::from_str(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT)).unwrap(),
506+
);
507+
}
508+
497509
// Sync only the streams that were created successfully
498510
if matches!(internal_stream_result, Ok(false))
499511
&& let Err(e) = sync_streams_with_ingestors(

0 commit comments

Comments
 (0)