Skip to content

Commit 0677cb9

Browse files
committed
updates for sync
1 parent d194fae commit 0677cb9

7 files changed

Lines changed: 183 additions & 29 deletions

File tree

src/handlers/http/cluster/mod.rs

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,7 @@ pub async fn sync_users_with_roles_with_ingestors(
523523
let userid = userid.to_owned();
524524
let headers = req.headers().clone();
525525
let op = operation.to_string();
526+
let caller_userid = get_user_from_request(req).unwrap();
526527
for_each_live_node(tenant_id, move |ingestor| {
527528
let url = format!(
528529
"{}{}/user/{}/role/sync/{}",
@@ -533,7 +534,8 @@ pub async fn sync_users_with_roles_with_ingestors(
533534
);
534535

535536
let role_data = role_data.clone();
536-
let headermap = create_intracluster_auth_headermap(&headers, &ingestor.token, &userid);
537+
let headermap =
538+
create_intracluster_auth_headermap(&headers, &ingestor.token, &caller_userid);
537539
async move {
538540
let res = INTRA_CLUSTER_CLIENT
539541
.patch(url)
@@ -572,6 +574,7 @@ pub async fn sync_user_deletion_with_ingestors(
572574
tenant_id: &Option<String>,
573575
) -> Result<(), RBACError> {
574576
let userid = userid.to_owned();
577+
let caller_userid = get_user_from_request(req).unwrap();
575578
let headers = req.headers().clone();
576579
for_each_live_node(tenant_id, move |ingestor| {
577580
let url = format!(
@@ -580,7 +583,8 @@ pub async fn sync_user_deletion_with_ingestors(
580583
base_path_without_preceding_slash(),
581584
userid
582585
);
583-
let headermap = create_intracluster_auth_headermap(&headers, &ingestor.token, &userid);
586+
let headermap =
587+
create_intracluster_auth_headermap(&headers, &ingestor.token, &caller_userid);
584588
async move {
585589
let res = INTRA_CLUSTER_CLIENT
586590
.delete(url)
@@ -629,6 +633,7 @@ pub async fn sync_user_creation(
629633
RBACError::SerdeError(err)
630634
})?;
631635

636+
let caller_userid = get_user_from_request(req)?;
632637
let userid = userid.to_string();
633638
let headers = req.headers().clone();
634639
for_each_live_node(tenant_id, move |node| {
@@ -638,7 +643,7 @@ pub async fn sync_user_creation(
638643
base_path_without_preceding_slash(),
639644
userid
640645
);
641-
let headermap = create_intracluster_auth_headermap(&headers, &node.token, &userid);
646+
let headermap = create_intracluster_auth_headermap(&headers, &node.token, &caller_userid);
642647
let user_data = user_data.clone();
643648

644649
async move {
@@ -678,6 +683,7 @@ pub async fn sync_password_reset_with_ingestors(
678683
) -> Result<(), RBACError> {
679684
let userid = username.to_owned();
680685
let tenant_id = get_tenant_id_from_request(&req);
686+
let caller_userid = get_user_from_request(&req).unwrap();
681687
let headers = req.headers().clone();
682688
for_each_live_node(&tenant_id, move |ingestor| {
683689
let url = format!(
@@ -686,7 +692,8 @@ pub async fn sync_password_reset_with_ingestors(
686692
base_path_without_preceding_slash(),
687693
userid
688694
);
689-
let headermap = create_intracluster_auth_headermap(&headers, &ingestor.token, &userid);
695+
let headermap =
696+
create_intracluster_auth_headermap(&headers, &ingestor.token, &caller_userid);
690697
async move {
691698
let res = INTRA_CLUSTER_CLIENT
692699
.post(url)
@@ -767,6 +774,52 @@ pub async fn sync_role_update(
767774
.await
768775
}
769776

777+
// forward the put role request to all ingestors and queriers to keep them in sync
778+
pub async fn sync_role_delete(
779+
req: &HttpRequest,
780+
name: String,
781+
tenant_id: &Option<String>,
782+
) -> Result<(), RoleError> {
783+
let userid = get_user_from_request(req).unwrap();
784+
let headers = req.headers().clone();
785+
for_each_live_node(tenant_id, move |node| {
786+
let url = format!(
787+
"{}{}/role/{}/sync",
788+
node.domain_name,
789+
base_path_without_preceding_slash(),
790+
name
791+
);
792+
793+
let headermap = create_intracluster_auth_headermap(&headers, &node.token, &userid);
794+
async move {
795+
let res = INTRA_CLUSTER_CLIENT
796+
.delete(url)
797+
.headers(headermap)
798+
.header(header::CONTENT_TYPE, "application/json")
799+
.send()
800+
.await
801+
.map_err(|err| {
802+
error!(
803+
"Fatal: failed to forward request to node: {}\n Error: {:?}",
804+
node.domain_name, err
805+
);
806+
RoleError::Network(err)
807+
})?;
808+
809+
if !res.status().is_success() {
810+
error!(
811+
"failed to forward request to node: {}\nResponse Returned: {:?}",
812+
node.domain_name,
813+
res.text().await
814+
);
815+
}
816+
817+
Ok(())
818+
}
819+
})
820+
.await
821+
}
822+
770823
pub fn fetch_daily_stats(
771824
date: &str,
772825
stream_meta_list: &[ObjectStoreFormat],

src/handlers/http/modal/ingest/ingestor_rbac.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@ use crate::{
3535
utils::get_tenant_id_from_request,
3636
};
3737

38-
// Handler for POST /api/v1/user/{username}
38+
// Handler for POST /api/v1/user/{userid}
3939
// Creates a new user by username if it does not exists
4040
pub async fn post_user(
4141
req: HttpRequest,
42-
username: web::Path<String>,
42+
userid: web::Path<String>,
4343
body: Option<web::Json<serde_json::Value>>,
4444
) -> Result<HttpResponse, RBACError> {
45-
let username = username.into_inner();
45+
let username = userid.into_inner();
4646

4747
if let Some(body) = body {
4848
let user: ParseableUser = serde_json::from_value(body.into_inner())?;
@@ -198,13 +198,13 @@ pub async fn remove_roles_from_user(
198198
Ok(HttpResponse::Ok().status(StatusCode::OK).finish())
199199
}
200200

201-
// Handler for POST /api/v1/user/{username}/generate-new-password
201+
// Handler for POST /api/v1/user/{userid}/generate-new-password
202202
// Resets password for the user to a newly generated one and returns it
203203
pub async fn post_gen_password(
204204
req: HttpRequest,
205-
username: web::Path<String>,
205+
userid: web::Path<String>,
206206
) -> Result<HttpResponse, RBACError> {
207-
let username = username.into_inner();
207+
let username = userid.into_inner();
208208
let tenant_id = get_tenant_id_from_request(&req);
209209
let mut new_hash = String::default();
210210
let mut metadata = get_metadata(&tenant_id).await?;

src/handlers/http/modal/ingest/ingestor_role.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,33 @@ pub async fn put(
9393

9494
Ok(HttpResponse::Ok().finish())
9595
}
96+
97+
// Handler for PUT /api/v1/role/{name}
98+
// Creates a new role or update existing one
99+
pub async fn delete(
100+
req: HttpRequest,
101+
name: web::Path<String>,
102+
) -> Result<impl Responder, RoleError> {
103+
let name = name.into_inner();
104+
let req_tenant_id = get_tenant_id_from_request(&req);
105+
106+
// if req_tenant.ne(DEFAULT_TENANT) && (req_tenant_id.eq(&sync_req.tenant_id)) {
107+
// return Err(RoleError::Anyhow(anyhow::Error::msg(
108+
// "non super-admin user trying to create role for another tenant",
109+
// )));
110+
// }
111+
let mut metadata = get_metadata(&req_tenant_id).await?;
112+
metadata.roles.remove(&name);
113+
114+
let _ = storage::put_staging_metadata(&metadata, &req_tenant_id);
115+
let tenant_id = req_tenant_id
116+
.as_deref()
117+
.unwrap_or(DEFAULT_TENANT)
118+
.to_owned();
119+
mut_roles()
120+
.entry(tenant_id.clone())
121+
.or_default()
122+
.remove(&name);
123+
124+
Ok(HttpResponse::Ok().finish())
125+
}

src/handlers/http/modal/ingest_server.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ impl IngestServer {
185185
pub fn get_user_webscope() -> Scope {
186186
web::scope("/user")
187187
.service(
188-
web::resource("/{username}/sync")
189-
// POST /user/{username}/sync => Sync creation of a new user
188+
web::resource("/{userid}/sync")
189+
// POST /user/{userid}/sync => Sync creation of a new user
190190
.route(
191191
web::post()
192192
.to(ingestor_rbac::post_user)
@@ -225,8 +225,8 @@ impl IngestServer {
225225
),
226226
)
227227
.service(
228-
web::resource("/{username}/generate-new-password/sync")
229-
// POST /user/{username}/generate-new-password => reset password for this user
228+
web::resource("/{userid}/generate-new-password/sync")
229+
// POST /user/{userid}/generate-new-password => reset password for this user
230230
.route(
231231
web::post()
232232
.to(ingestor_rbac::post_gen_password)

src/handlers/http/modal/query/querier_rbac.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,14 @@ use crate::{
3939
validator,
4040
};
4141

42-
// Handler for POST /api/v1/user/{username}
42+
// Handler for POST /api/v1/user/{userid}
4343
// Creates a new user by username if it does not exists
4444
pub async fn post_user(
4545
req: HttpRequest,
46-
username: web::Path<String>,
46+
userid: web::Path<String>,
4747
body: Option<web::Json<serde_json::Value>>,
4848
) -> Result<impl Responder, RBACError> {
49-
let username = username.into_inner();
49+
let username = userid.into_inner();
5050
let tenant_id = get_tenant_id_from_request(&req);
5151
validator::user_role_name(&username)?;
5252
let mut metadata = get_metadata(&tenant_id).await?;
@@ -300,13 +300,13 @@ pub async fn remove_roles_from_user(
300300
Ok(HttpResponse::Ok().json(format!("Roles updated successfully for {username}")))
301301
}
302302

303-
// Handler for POST /api/v1/user/{username}/generate-new-password
303+
// Handler for POST /api/v1/user/{userid}/generate-new-password
304304
// Resets password for the user to a newly generated one and returns it
305305
pub async fn post_gen_password(
306306
req: HttpRequest,
307-
username: web::Path<String>,
307+
userid: web::Path<String>,
308308
) -> Result<impl Responder, RBACError> {
309-
let username = username.into_inner();
309+
let username = userid.into_inner();
310310
let mut new_password = String::default();
311311
let mut new_hash = String::default();
312312
let tenant_id = get_tenant_id_from_request(&req);

src/handlers/http/modal/query/querier_role.rs

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@ use actix_web::{
2525

2626
use crate::{
2727
handlers::http::{
28-
cluster::sync_role_update,
28+
cluster::{sync_role_delete, sync_role_update},
2929
modal::utils::rbac_utils::{get_metadata, put_metadata},
3030
role::RoleError,
3131
},
3232
parseable::DEFAULT_TENANT,
3333
rbac::{
34-
map::{mut_roles, mut_sessions, read_user_groups, users},
34+
map::{mut_roles, mut_sessions, read_user_groups, roles, users},
3535
role::model::{Role, RoleType},
3636
},
3737
utils::get_tenant_id_from_request,
@@ -108,3 +108,70 @@ pub async fn put(
108108

109109
Ok(HttpResponse::Ok().finish())
110110
}
111+
112+
// Handler for DELETE /api/v1/role/{name}
113+
// Delete existing role
114+
pub async fn delete(
115+
req: HttpRequest,
116+
name: web::Path<String>,
117+
) -> Result<impl Responder, RoleError> {
118+
let name = name.into_inner();
119+
let tenant_id = get_tenant_id_from_request(&req);
120+
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
121+
if let Some(tenant_roles) = roles().get(tenant)
122+
&& let Some(role) = tenant_roles.get(&name)
123+
&& role.role_type().eq(&RoleType::Internal)
124+
{
125+
return Err(RoleError::ProtectedRole);
126+
}
127+
128+
// check if the role is being used by any user or group
129+
let mut metadata = get_metadata(&tenant_id).await?;
130+
if metadata.users.iter().any(|user| user.roles.contains(&name)) {
131+
return Err(RoleError::RoleInUse);
132+
}
133+
if metadata
134+
.user_groups
135+
.iter()
136+
.any(|user_group| user_group.roles.contains(&name))
137+
{
138+
return Err(RoleError::RoleInUse);
139+
}
140+
metadata.roles.remove(&name);
141+
put_metadata(&metadata, &tenant_id).await?;
142+
143+
mut_roles()
144+
.entry(tenant.to_owned())
145+
.or_default()
146+
.remove(&name);
147+
// mut_roles().remove(&name);
148+
149+
// refresh the sessions of all users using this role
150+
// for this, iterate over all user_groups and users and create a hashset of users
151+
let mut session_refresh_users: HashSet<String> = HashSet::new();
152+
if let Some(groups) = read_user_groups().get(tenant) {
153+
for user_group in groups.values() {
154+
if user_group.roles.contains(&name) {
155+
session_refresh_users
156+
.extend(user_group.users.iter().map(|u| u.userid().to_string()));
157+
}
158+
}
159+
}
160+
161+
// iterate over all users to see if they have this role
162+
if let Some(users) = users().get(tenant) {
163+
for user in users.values() {
164+
if user.roles.contains(&name) {
165+
session_refresh_users.insert(user.userid().to_string());
166+
}
167+
}
168+
}
169+
170+
for userid in session_refresh_users {
171+
mut_sessions().remove_user(&userid, tenant);
172+
}
173+
174+
sync_role_delete(&req, name.clone(), &tenant_id).await?;
175+
176+
Ok(HttpResponse::Ok().finish())
177+
}

src/handlers/http/modal/query_server.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,11 @@ impl QueryServer {
181181
// PUT, GET, DELETE Roles
182182
resource("/{name}")
183183
.route(web::put().to(querier_role::put).authorize(Action::PutRole))
184-
.route(web::delete().to(role::delete).authorize(Action::DeleteRole))
184+
.route(
185+
web::delete()
186+
.to(querier_role::delete)
187+
.authorize(Action::DeleteRole),
188+
)
185189
.route(web::get().to(role::get).authorize(Action::GetRole)),
186190
)
187191
}
@@ -195,8 +199,8 @@ impl QueryServer {
195199
.route(web::get().to(rbac::list_users).authorize(Action::ListUser)),
196200
)
197201
.service(
198-
web::resource("/{username}")
199-
// POST /user/{username} => Create a new user
202+
web::resource("/{userid}")
203+
// POST /user/{userid} => Create a new user
200204
.route(
201205
web::post()
202206
.to(querier_rbac::post_user)
@@ -222,7 +226,7 @@ impl QueryServer {
222226
// PATCH /user/{userid}/role/add => Add roles to a user
223227
.route(
224228
web::patch()
225-
.to(rbac::add_roles_to_user)
229+
.to(querier_rbac::add_roles_to_user)
226230
.authorize(Action::PutUserRoles)
227231
.wrap(DisAllowRootUser),
228232
),
@@ -232,14 +236,14 @@ impl QueryServer {
232236
// PATCH /user/{userid}/role/remove => Remove roles from a user
233237
.route(
234238
web::patch()
235-
.to(rbac::remove_roles_from_user)
239+
.to(querier_rbac::remove_roles_from_user)
236240
.authorize(Action::PutUserRoles)
237241
.wrap(DisAllowRootUser),
238242
),
239243
)
240244
.service(
241-
web::resource("/{username}/generate-new-password")
242-
// POST /user/{username}/generate-new-password => reset password for this user
245+
web::resource("/{userid}/generate-new-password")
246+
// POST /user/{userid}/generate-new-password => reset password for this user
243247
.route(
244248
web::post()
245249
.to(querier_rbac::post_gen_password)

0 commit comments

Comments
 (0)