-
Notifications
You must be signed in to change notification settings - Fork 220
[ISSUE#5545] Implement UpdateControllerConfig Request Handler #5682
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Implement RequestCode GetProducerConnectionList(204)
rather than ctx.channel()
|
🔊@Lori-Shu 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
WalkthroughAdds an async update API to ControllerConfig with internal locking and implements the UpdateControllerConfig request handler; switches many controller components, managers, examples, and tests from Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant ControllerRequestProcessor
participant ControllerManager
participant ControllerConfig
Client->>ControllerRequestProcessor: UPDATE_CONTROLLER_CONFIG (body bytes)
ControllerRequestProcessor->>ControllerRequestProcessor: parse_properties_from_string(body)
ControllerRequestProcessor->>ControllerManager: controller_config()
ControllerManager->>ControllerConfig: return ArcMut<ControllerConfig>
ControllerRequestProcessor->>ControllerConfig: update(properties).await
ControllerConfig->>ControllerConfig: acquire write lock and apply typed changes
ControllerConfig-->>ControllerRequestProcessor: Ok / Err
ControllerRequestProcessor-->>Client: Response (Success / NoPermission / Error)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rocketmq-controller/src/heartbeat/default_broker_heartbeat_manager.rs (1)
45-58: Documentation example uses outdatedArc::newinstead ofArcMut::new.The inline doc example still references
Arc::new(ControllerConfig::test_config()), but the constructor now requiresArcMut<ControllerConfig>. This will cause confusion for developers referencing the documentation.📝 Proposed fix for the doc example
/// # Example /// /// ```no_run,ignore -/// use std::sync::Arc; +/// use rocketmq_rust::ArcMut; /// use rocketmq_controller::config::ControllerConfig; /// use rocketmq_controller::heartbeat::default_broker_heartbeat_manager::DefaultBrokerHeartbeatManager; /// use rocketmq_controller::controller::broker_heartbeat_manager::BrokerHeartbeatManager; /// /// # async fn example() { -/// let config = Arc::new(ControllerConfig::test_config()); +/// let config = ArcMut::new(ControllerConfig::test_config()); /// let mut manager = DefaultBrokerHeartbeatManager::new(config); /// manager.initialize(); /// manager.start(); /// // ... use manager ... /// manager.shutdown(); /// # } /// ```
🤖 Fix all issues with AI agents
In @rocketmq-common/src/common/controller/controller_config.rs:
- Around line 646-649: The "controllerStorePath" matcher mistakenly assigns to
self.config_store_path (PathBuf) and uses from_str::<PathBuf>, but the intended
field is self.controller_store_path which is a String; change the assignment to
set self.controller_store_path and parse the JSON value as a String (e.g.
serde_json::from_str::<String>(value).map_err(...)), removing the PathBuf
conversion and leaving self.config_store_path untouched.
- Around line 609-769: The update() method currently applies incoming properties
without blacklist checks; call self.validate_blacklist_config_exist(&properties)
at the start of update() (before acquiring the write lock) and if it indicates a
blacklist violation, return an Err(RocketMQError::Internal(...)) to reject the
update; this will prevent updates to blacklisted fields such as
"configBlackList" and "configStorePath" while keeping the rest of the existing
match-handling logic (refer to the update() method and
validate_blacklist_config_exist function).
🧹 Nitpick comments (4)
rocketmq-common/src/common/controller/controller_config.rs (2)
609-610: Unused lock guard variable creates potential confusion.The
lockvariable is acquired but never used explicitly within the scope. While this still provides synchronization (the lock is held untillockdrops at the end of scope), naming it_lockmakes the intent clearer and suppresses unused variable warnings.Suggested fix
pub async fn update(&mut self, properties: HashMap<String, String>) -> RocketMQResult<()> { - let lock = self.rw_lock.write().await; + let _lock = self.rw_lock.write().await; for (key, value) in &properties {
224-252: Using.clone()onCopytypes is unnecessary.Fields like
scan_not_active_broker_interval(u64),controller_thread_pool_nums(usize),elect_master_max_retry_count(u32),metrics_prom_exporter_port(u16), and boolean fields areCopytypes. Calling.clone()on them works but is non-idiomatic—direct assignment is preferred.Example fix for a few fields
- scan_not_active_broker_interval: self.scan_not_active_broker_interval.clone(), - controller_thread_pool_nums: self.controller_thread_pool_nums.clone(), + scan_not_active_broker_interval: self.scan_not_active_broker_interval, + controller_thread_pool_nums: self.controller_thread_pool_nums,rocketmq-controller/src/metadata/replica.rs (1)
220-238: LGTM with a minor observation.The migration to
ArcMut<ControllerConfig>is consistent with the PR-wide changes. The constructor correctly stores the config reference.Note: The
configfield appears to be stored but not currently used in anyReplicasManagermethods. This is likely intentional for future use (e.g., checking configuration values during replica operations), but worth confirming.rocketmq-controller/src/processor/controller_request_processor.rs (1)
519-532: Remove unnecessaryasyncfrom synchronous helper function.
parse_properties_from_stringcontains no await points - all operations are synchronous. Theasyncmarker adds unnecessary overhead by generating a Future state machine.♻️ Suggested fix
- async fn parse_properties_from_string(body: &[u8]) -> RocketMQResult<HashMap<String, String>> { + fn parse_properties_from_string(body: &[u8]) -> RocketMQResult<HashMap<String, String>> { let content = String::from_utf8(body.to_vec()) .map_err(|e| RocketMQError::Internal(format!("parse property string err {}", e)))?; let mut properties = HashMap::new(); for line in content.lines() { if let Some((key, value)) = line.split_once('=') { properties.insert(key.trim().to_string(), value.trim().to_string()); } } Ok(properties) }And update the call site at line 491:
- Self::parse_properties_from_string(body).await? + Self::parse_properties_from_string(body)?
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (22)
rocketmq-common/src/common/controller/controller_config.rsrocketmq-controller/examples/single_node.rsrocketmq-controller/examples/three_node_cluster.rsrocketmq-controller/src/controller/controller_manager.rsrocketmq-controller/src/controller/open_raft_controller.rsrocketmq-controller/src/controller/raft_controller.rsrocketmq-controller/src/heartbeat/default_broker_heartbeat_manager.rsrocketmq-controller/src/manager/replicas_info_manager.rsrocketmq-controller/src/metadata/broker.rsrocketmq-controller/src/metadata/config.rsrocketmq-controller/src/metadata/mod.rsrocketmq-controller/src/metadata/replica.rsrocketmq-controller/src/metadata/topic.rsrocketmq-controller/src/openraft/node.rsrocketmq-controller/src/processor.rsrocketmq-controller/src/processor/controller_request_processor.rsrocketmq-controller/src/processor/metadata_processor.rsrocketmq-controller/tests/multi_node_cluster_test.rsrocketmq-controller/tests/openraft_integration_test.rsrocketmq-controller/tests/raft_controller_test.rsrocketmq-controller/tests/simple_cluster_test.rsrocketmq-controller/tests/snapshot_test.rs
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-05-10T06:20:00.401Z
Learnt from: 578223592
Repo: mxsm/rocketmq-rust PR: 3240
File: rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs:69-71
Timestamp: 2025-05-10T06:20:00.401Z
Learning: In RocketMQ Rust, while dashmap is concurrent-safe, returning a direct reference to a DashMap can break encapsulation by allowing external code to modify the map without triggering persistence methods like `persist()`. Consider returning a read-only view, iterator methods, or a snapshot instead.
Applied to files:
rocketmq-controller/tests/snapshot_test.rsrocketmq-controller/src/metadata/replica.rsrocketmq-controller/src/heartbeat/default_broker_heartbeat_manager.rsrocketmq-controller/src/metadata/topic.rsrocketmq-controller/src/metadata/broker.rsrocketmq-controller/src/metadata/config.rsrocketmq-common/src/common/controller/controller_config.rs
📚 Learning: 2026-01-09T14:29:51.054Z
Learnt from: WaterWhisperer
Repo: mxsm/rocketmq-rust PR: 5582
File: rocketmq-controller/src/controller/open_raft_controller.rs:182-247
Timestamp: 2026-01-09T14:29:51.054Z
Learning: In OpenRaftController and RaftRsController, when constructing ControllerRequest::ApplyBrokerId, set broker_addr to an empty string (String::new()) and rely on other operations (e.g., RegisterBroker) to populate the broker address. ApplyBrokerId should focus solely on broker ID allocation/reservation, mirroring the Java RocketMQ reference implementation.
Applied to files:
rocketmq-controller/src/controller/raft_controller.rsrocketmq-controller/src/controller/open_raft_controller.rsrocketmq-controller/src/controller/controller_manager.rsrocketmq-common/src/common/controller/controller_config.rs
📚 Learning: 2025-05-10T06:20:17.779Z
Learnt from: 578223592
Repo: mxsm/rocketmq-rust PR: 3240
File: rocketmq-namesrv/Cargo.toml:36-36
Timestamp: 2025-05-10T06:20:17.779Z
Learning: When using dashmap with types that derive Serialize/Deserialize, the "serde" feature needs to be enabled: dashmap = { version = "x.y.z", features = ["serde"] }
Applied to files:
rocketmq-controller/src/metadata/replica.rsrocketmq-controller/src/metadata/topic.rsrocketmq-controller/src/metadata/broker.rsrocketmq-controller/src/metadata/config.rs
📚 Learning: 2026-01-09T14:29:58.048Z
Learnt from: WaterWhisperer
Repo: mxsm/rocketmq-rust PR: 5582
File: rocketmq-controller/src/controller/open_raft_controller.rs:182-247
Timestamp: 2026-01-09T14:29:58.048Z
Learning: In the ApplyBrokerId operation across both OpenRaftController and RaftRsController implementations, broker_addr is intentionally set to String::new() (empty string) when constructing ControllerRequest::ApplyBrokerId. This matches the design of the Java RocketMQ reference implementation. The broker address is populated through other operations (e.g., RegisterBroker), as ApplyBrokerId focuses solely on broker ID allocation/reservation.
Applied to files:
rocketmq-controller/tests/multi_node_cluster_test.rsrocketmq-controller/tests/raft_controller_test.rs
🧬 Code graph analysis (13)
rocketmq-controller/src/processor/metadata_processor.rs (6)
rocketmq-controller/src/controller/controller_manager.rs (2)
config(575-577)new(139-211)rocketmq-controller/src/metadata/broker.rs (1)
new(82-88)rocketmq-controller/src/metadata/config.rs (1)
new(52-57)rocketmq-controller/src/metadata/mod.rs (1)
new(65-79)rocketmq-controller/src/metadata/replica.rs (2)
new(162-174)new(232-238)rocketmq-controller/src/metadata/topic.rs (1)
new(92-97)
rocketmq-controller/tests/openraft_integration_test.rs (6)
rocketmq-controller/src/metadata/broker.rs (1)
new(82-88)rocketmq-controller/src/metadata/config.rs (1)
new(52-57)rocketmq-controller/src/metadata/mod.rs (1)
new(65-79)rocketmq-controller/src/metadata/replica.rs (2)
new(162-174)new(232-238)rocketmq-controller/src/metadata/topic.rs (1)
new(92-97)rocketmq-controller/src/openraft/node.rs (1)
new(55-103)
rocketmq-controller/src/openraft/node.rs (11)
rocketmq-controller/src/controller/controller_manager.rs (2)
new(139-211)config(575-577)rocketmq-controller/src/controller/open_raft_controller.rs (2)
new(73-83)config(362-370)rocketmq-controller/src/heartbeat/default_broker_heartbeat_manager.rs (1)
new(84-92)rocketmq-controller/src/manager/replicas_info_manager.rs (1)
new(108-114)rocketmq-controller/src/metadata/broker.rs (1)
new(82-88)rocketmq-controller/src/metadata/config.rs (1)
new(52-57)rocketmq-controller/src/metadata/mod.rs (1)
new(65-79)rocketmq-controller/src/metadata/replica.rs (2)
new(162-174)new(232-238)rocketmq-controller/src/metadata/topic.rs (1)
new(92-97)rocketmq-controller/src/openraft/state_machine.rs (1)
new(98-106)rocketmq-controller/src/openraft/log_store.rs (1)
new(62-69)
rocketmq-controller/src/controller/raft_controller.rs (2)
rocketmq-controller/src/controller/controller_manager.rs (1)
config(575-577)rocketmq-controller/src/controller/open_raft_controller.rs (1)
config(362-370)
rocketmq-controller/src/metadata/mod.rs (7)
rocketmq-controller/src/controller/controller_manager.rs (2)
new(139-211)config(575-577)rocketmq-controller/src/metadata/broker.rs (1)
new(82-88)rocketmq-controller/src/metadata/config.rs (1)
new(52-57)rocketmq-controller/src/metadata/replica.rs (2)
new(162-174)new(232-238)rocketmq-controller/src/metadata/topic.rs (1)
new(92-97)rocketmq-controller/src/raft/mod.rs (1)
new(90-123)rocketmq-remoting/src/codec/remoting_command_codec.rs (2)
new(55-57)new(136-141)
rocketmq-controller/src/processor/controller_request_processor.rs (2)
rocketmq-error/src/unified.rs (1)
request_body_invalid(462-467)rocketmq-remoting/src/protocol/remoting_command.rs (2)
create_response_command_with_code_remark(187-192)create_response_command(194-198)
rocketmq-controller/src/metadata/topic.rs (6)
rocketmq-controller/src/controller/controller_manager.rs (2)
new(139-211)config(575-577)rocketmq-controller/src/heartbeat/default_broker_heartbeat_manager.rs (1)
new(84-92)rocketmq-controller/src/manager/replicas_info_manager.rs (1)
new(108-114)rocketmq-controller/src/metadata/broker.rs (1)
new(82-88)rocketmq-controller/src/metadata/config.rs (1)
new(52-57)rocketmq-controller/src/metadata/mod.rs (1)
new(65-79)
rocketmq-controller/src/manager/replicas_info_manager.rs (4)
rocketmq-controller/src/controller/controller_manager.rs (2)
new(139-211)config(575-577)rocketmq-controller/src/controller/open_raft_controller.rs (2)
new(73-83)config(362-370)rocketmq-controller/src/heartbeat/default_broker_heartbeat_manager.rs (1)
new(84-92)rocketmq-controller/src/metadata/broker.rs (1)
new(82-88)
rocketmq-controller/src/controller/open_raft_controller.rs (4)
rocketmq-controller/src/controller/controller_manager.rs (2)
new(139-211)config(575-577)rocketmq-controller/src/heartbeat/default_broker_heartbeat_manager.rs (1)
new(84-92)rocketmq-controller/src/processor/controller_request_processor.rs (1)
new(148-157)rocketmq-controller/src/processor/topic_processor.rs (3)
new(46-48)new(126-128)new(197-199)
rocketmq-controller/src/controller/controller_manager.rs (7)
rocketmq-controller/src/controller/open_raft_controller.rs (2)
config(362-370)new(73-83)rocketmq-common/src/common/controller/controller_config.rs (1)
new(314-316)rocketmq-controller/src/manager/replicas_info_manager.rs (1)
new(108-114)rocketmq-controller/src/processor.rs (2)
new(79-127)new(192-196)rocketmq-controller/src/processor/controller_request_processor.rs (1)
new(148-157)rocketmq-controller/src/controller.rs (1)
new(511-513)rocketmq-controller/src/controller/raft_controller.rs (1)
new_open_raft(51-53)
rocketmq-controller/src/processor.rs (4)
rocketmq-controller/src/controller/controller_manager.rs (4)
new(139-211)config(575-577)raft(548-550)metadata(557-559)rocketmq-controller/src/controller/open_raft_controller.rs (2)
new(73-83)config(362-370)rocketmq-controller/src/heartbeat/default_broker_heartbeat_manager.rs (1)
new(84-92)rocketmq-controller/src/metadata/broker.rs (1)
new(82-88)
rocketmq-controller/tests/raft_controller_test.rs (5)
rocketmq-controller/src/controller/controller_manager.rs (2)
config(575-577)new(139-211)rocketmq-controller/src/controller/open_raft_controller.rs (2)
config(362-370)new(73-83)rocketmq-controller/src/heartbeat/default_broker_heartbeat_manager.rs (1)
new(84-92)rocketmq-controller/src/manager/replicas_info_manager.rs (1)
new(108-114)rocketmq-controller/src/controller.rs (1)
new(511-513)
rocketmq-common/src/common/controller/controller_config.rs (10)
rocketmq-controller/src/controller/controller_manager.rs (1)
new(139-211)rocketmq-controller/src/controller/open_raft_controller.rs (1)
new(73-83)rocketmq-controller/src/heartbeat/default_broker_heartbeat_manager.rs (1)
new(84-92)rocketmq-controller/src/metadata/broker.rs (1)
new(82-88)rocketmq-controller/src/metadata/config.rs (1)
new(52-57)rocketmq-controller/src/metadata/mod.rs (1)
new(65-79)rocketmq-controller/src/metadata/replica.rs (2)
new(162-174)new(232-238)rocketmq-controller/src/metadata/topic.rs (1)
new(92-97)rocketmq-controller/src/openraft/node.rs (1)
new(55-103)rocketmq-controller/src/processor.rs (2)
new(79-127)new(192-196)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Check (fmt + clippy)
- GitHub Check: auto-approve
🔇 Additional comments (29)
rocketmq-controller/src/processor/metadata_processor.rs (1)
90-97: LGTM!The test correctly adopts
ArcMut::new(...)for config initialization, aligning with the codebase-wide migration to support mutable configuration updates.rocketmq-controller/tests/openraft_integration_test.rs (1)
23-33: LGTM!The integration tests correctly adopt
ArcMutfor configuration wrapping. The pattern is consistent across all test cases in this file.rocketmq-controller/examples/single_node.rs (1)
31-49: LGTM!The example correctly demonstrates the updated
ArcMutpattern for configuration. This serves as good documentation for users of the API.rocketmq-controller/tests/snapshot_test.rs (1)
24-34: LGTM!The snapshot test correctly adopts
ArcMutfor configuration wrapping, consistent with the codebase-wide migration.rocketmq-controller/tests/raft_controller_test.rs (1)
21-26: LGTM!The test correctly uses
ArcMutfor the OpenRaft controller configuration while maintaining the existing RaftRs tests that use runtime instead of config.rocketmq-controller/tests/simple_cluster_test.rs (1)
27-63: LGTM!The test correctly wraps the configuration with
ArcMut::new()before passing toRaftNodeManager::new(). The outerArc::new()wrapping the manager itself is appropriate for sharing the node across async tasks.rocketmq-controller/src/controller/raft_controller.rs (1)
51-53: LGTM!The signature change to accept
ArcMut<ControllerConfig>aligns with the broader migration enabling runtime configuration updates. The config is correctly forwarded toOpenRaftController::new.rocketmq-controller/src/metadata/mod.rs (1)
31-31: LGTM!The
ArcMutimport and signature change forMetadataStore::neware consistent with the repository-wide migration. CloningArcMutcorrectly shares the mutable config reference across all sub-managers (BrokerManager,TopicManager,ConfigManager,ReplicasManager), enabling coordinated runtime configuration updates.Also applies to: 65-71
rocketmq-controller/tests/multi_node_cluster_test.rs (1)
28-28: LGTM!Test correctly updated to use
ArcMut::new(config)to match the updatedRaftNodeManager::newsignature. The test logic remains sound.Also applies to: 58-58
rocketmq-controller/examples/three_node_cluster.rs (1)
39-39: LGTM!The example correctly demonstrates the updated API usage. The
ArcMutwrapper is applied after the builder chain completes, which is the appropriate pattern. This serves as good documentation for users adopting the new configuration approach.Also applies to: 83-92
rocketmq-controller/src/metadata/replica.rs (1)
521-524: LGTM!Test correctly updated to use
ArcMut::new()for config instantiation, matching the newReplicasManager::newsignature.rocketmq-controller/src/openraft/node.rs (1)
24-24: LGTM! Consistent migration toArcMut<ControllerConfig>.The signature change aligns with the broader codebase migration to enable runtime configuration updates. The config values are correctly extracted during initialization, and the
ArcMutwrapper is appropriately passed through without being stored (sinceRaftNodeManageronly needs the config during construction).Also applies to: 55-56
rocketmq-controller/src/processor.rs (1)
63-79: LGTM! Field and constructor correctly updated.The migration to
ArcMut<ControllerConfig>is consistent with the broader codebase changes. Theconfigfield is stored for potential future use (e.g., accessing configuration in processor methods), and the constructor signature aligns with the pattern established inControllerManager::new()whereconfig.clone()is passed.rocketmq-controller/src/metadata/config.rs (1)
18-18: LGTM! Consistent migration with proper encapsulation.The
ArcMut<ControllerConfig>change aligns with the codebase-wide migration. The#[allow(dead_code)]annotation correctly indicates the field is reserved for future use. TheDashMapis properly encapsulated behind accessor methods (set_config,get_config, etc.), which is good practice per previous learnings about avoiding direct DashMap exposure.Also applies to: 46-57
rocketmq-controller/src/controller/open_raft_controller.rs (2)
46-46: LGTM! Correct migration toArcMut<ControllerConfig>.The field and constructor are properly updated. The
config.clone()calls correctly propagate theArcMutwrapper toReplicasInfoManagerandRaftNodeManager. The config is appropriately accessed throughout the controller (e.g.,self.config.listen_addr,self.config.raft_peers), and the interior mutability pattern is correctly applied.Also applies to: 60-83
87-91: Startup correctly propagatesArcMutconfig.The
self.config.clone()properly creates a shared reference to the mutable config wrapper, allowingRaftNodeManagerto access the same configuration instance that can be updated at runtime.rocketmq-controller/src/heartbeat/default_broker_heartbeat_manager.rs (2)
60-92: LGTM! Field and constructor correctly updated.The
ArcMut<ControllerConfig>migration is consistent with the codebase-wide changes. TheDashMapforbroker_live_tableremains properly encapsulated behind accessor methods, aligning with best practices for maintaining persistence/modification control.
403-408: Test correctly updated to useArcMut.The test initialization aligns with the new constructor signature.
rocketmq-controller/src/metadata/topic.rs (2)
18-18: LGTM - Consistent ArcMut migration.The import addition, field type change, and constructor signature update are consistent with the PR-wide migration from
Arc<ControllerConfig>toArcMut<ControllerConfig>. The pattern matches other manager implementations in the codebase (e.g.,BrokerManager,ConfigManager).Also applies to: 87-87, 92-96
231-231: Test correctly updated for ArcMut usage.rocketmq-controller/src/metadata/broker.rs (2)
21-21: LGTM - Consistent ArcMut migration for BrokerManager.The type migration is consistent with other metadata managers. The
configfield enables future use of dynamic configuration values (e.g., configurableheartbeat_timeout).Also applies to: 74-74, 82-87
215-215: Test correctly updated.rocketmq-controller/src/processor/controller_request_processor.rs (2)
85-85: LGTM - Import addition for HashMap.
485-517: Implementation correctly handles UpdateControllerConfig request.The handler follows the PR objectives:
- Parses request body as key=value properties
- Validates against blacklist with
NoPermissionresponse for violations- Applies updates via
controller_config.update(properties).await- Returns appropriate error responses for empty/invalid body
rocketmq-controller/src/manager/replicas_info_manager.rs (2)
57-57: LGTM - ArcMut migration enables runtime config updates.The migration to
ArcMut<ControllerConfig>is particularly relevant here sinceReplicasInfoManagerreadsconfig.enable_elect_unclean_master(line 314) during election. TheArcMutwrapper allows this value to be updated at runtime via the newUpdateControllerConfighandler.Also applies to: 95-95, 108-113
1010-1010: Test correctly updated.rocketmq-controller/src/controller/controller_manager.rs (3)
87-87: LGTM - Core ArcMut migration in ControllerManager.The field type change and constructor wrapping establish
ControllerManageras the owner of the shared mutable configuration. TheArcMut::new(config)at line 140 is the single point of ownership establishment, with clones distributed to child components.Also applies to: 140-140
150-150: Correct layered ArcMut usage.The nested wrapping is intentional and correct:
- Inner
config.clone()shares theArcMut<ControllerConfig>withRaftController- Outer
ArcMut::new(...)wraps theRaftControllerfor shared mutable access byControllerManager
575-577: Accessor pattern supports both mutable and read-only access.The
config()method returnsArcMut<ControllerConfig>enabling mutable access (used byhandle_update_controller_config), whilecontroller_config()(line 596) returns&ControllerConfigfor read-only access. This dual pattern appropriately separates mutation and read concerns.
| pub async fn update(&mut self, properties: HashMap<String, String>) -> RocketMQResult<()> { | ||
| let lock = self.rw_lock.write().await; | ||
| for (key, value) in &properties { | ||
| match key.as_str() { | ||
| "rocketmqHome" => { | ||
| self.rocketmq_home = value.clone(); | ||
| } | ||
|
|
||
| "configStorePath" => { | ||
| self.config_store_path = | ||
| serde_json::from_str::<PathBuf>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "controllerType" => { | ||
| self.controller_type = value.clone(); | ||
| } | ||
|
|
||
| "scanNotActiveBrokerInterval" => { | ||
| self.scan_not_active_broker_interval = | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "controllerThreadPoolNums" => { | ||
| self.controller_thread_pool_nums = | ||
| serde_json::from_str::<usize>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "controllerRequestThreadPoolQueueCapacity" => { | ||
| self.controller_request_thread_pool_queue_capacity = | ||
| serde_json::from_str::<usize>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "mappedFileSize" => { | ||
| self.mapped_file_size = | ||
| serde_json::from_str::<usize>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "controllerStorePath" => { | ||
| self.config_store_path = | ||
| serde_json::from_str::<PathBuf>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "electMasterMaxRetryCount" => { | ||
| self.elect_master_max_retry_count = | ||
| serde_json::from_str::<u32>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "enableElectUncleanMaster" => { | ||
| self.enable_elect_unclean_master = | ||
| serde_json::from_str::<bool>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "isProcessReadEvent" => { | ||
| self.is_process_read_event = | ||
| serde_json::from_str::<bool>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "notifyBrokerRoleChanged" => { | ||
| self.notify_broker_role_changed = | ||
| serde_json::from_str::<bool>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "scanInactiveMasterInterval" => { | ||
| self.scan_inactive_master_interval = | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "metricsExporterType" => { | ||
| self.metrics_exporter_type = serde_json::from_str::<MetricsExporterType>(value) | ||
| .map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "metricsGrpcExporterTarget" => { | ||
| self.metrics_grpc_exporter_target = value.clone(); | ||
| } | ||
|
|
||
| "metricsGrpcExporterHeader" => { | ||
| self.metrics_grpc_exporter_header = value.clone(); | ||
| } | ||
|
|
||
| "metricGrpcExporterTimeOutInMills" => { | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "metricGrpcExporterIntervalInMills" => { | ||
| self.metric_grpc_exporter_interval_in_mills = | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "metricLoggingExporterIntervalInMills" => { | ||
| self.metric_logging_exporter_interval_in_mills = | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "metricsPromExporterPort" => { | ||
| self.metrics_prom_exporter_port = | ||
| serde_json::from_str::<u16>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "metricsPromExporterHost" => { | ||
| self.metrics_prom_exporter_host = value.clone(); | ||
| } | ||
|
|
||
| "metricsLabel" => { | ||
| self.metrics_label = value.clone(); | ||
| } | ||
|
|
||
| "metricsInDelta" => { | ||
| self.metrics_in_delta = | ||
| serde_json::from_str::<bool>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "configBlackList" => { | ||
| self.config_black_list = value.clone(); | ||
| } | ||
|
|
||
| "nodeId" => { | ||
| self.node_id = | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "listenAddr" => { | ||
| self.listen_addr = serde_json::from_str::<SocketAddr>(value) | ||
| .map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "raftPeers" => { | ||
| self.raft_peers = serde_json::from_str::<Vec<RaftPeer>>(value) | ||
| .map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "electionTimeoutMs" => { | ||
| self.election_timeout_ms = | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "heartbeatIntervalMs" => { | ||
| self.heartbeat_interval_ms = | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "storagePath" => { | ||
| self.storage_path = value.clone(); | ||
| } | ||
|
|
||
| "storageBackend" => { | ||
| self.storage_backend = serde_json::from_str::<StorageBackendType>(value) | ||
| .map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "enableElectUncleanMasterLocal" => { | ||
| self.enable_elect_unclean_master_local = | ||
| serde_json::from_str::<bool>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
| _ => { | ||
| return Err(RocketMQError::Internal(format!("found unknown property: {}", key))); | ||
| } | ||
| } | ||
| } | ||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Blacklist validation is missing in update() method.
Per the PR objectives, incoming properties should be validated against the blacklist before applying updates: "Validate incoming properties against a blacklist using self.validate_blacklist_config_exist(&properties); reject requests containing blacklisted keys." Currently, keys like configBlackList and configStorePath (lines 721-723, 617-620) can be updated despite being in the default blacklist.
Consider adding validation at the start of the method:
Proposed fix
pub async fn update(&mut self, properties: HashMap<String, String>) -> RocketMQResult<()> {
+ // Validate blacklist before acquiring lock
+ for key in properties.keys() {
+ if self.is_config_in_blacklist(key) {
+ return Err(RocketMQError::Internal(format!(
+ "Configuration key '{}' is blacklisted and cannot be updated at runtime",
+ key
+ )));
+ }
+ }
let _lock = self.rw_lock.write().await;
for (key, value) in &properties {🤖 Prompt for AI Agents
In @rocketmq-common/src/common/controller/controller_config.rs around lines 609
- 769, The update() method currently applies incoming properties without
blacklist checks; call self.validate_blacklist_config_exist(&properties) at the
start of update() (before acquiring the write lock) and if it indicates a
blacklist violation, return an Err(RocketMQError::Internal(...)) to reject the
update; this will prevent updates to blacklisted fields such as
"configBlackList" and "configStorePath" while keeping the rest of the existing
match-handling logic (refer to the update() method and
validate_blacklist_config_exist function).
| "scanNotActiveBrokerInterval" => { | ||
| self.scan_not_active_broker_interval = | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🌐 Web query:
serde_json::from_str boolean string parsing "true"
💡 Result:
Short answer: serde_json::from_str will not convert a JSON string "true" into a Rust bool — it expects a JSON true/false literal. To accept "true" (a string), add a custom deserializer or use a helper crate.
Options:
- Custom deserialize_with (standard serde) — parse the string to bool:
use serde::{Deserialize, Deserializer, de};
use std::str::FromStr;
#[derive(Deserialize)]
struct S {
#[serde(deserialize_with = "de_from_str")]
success: bool,
}
fn de_from_str<'de, D>(deserializer: D) -> Result<bool, D::Error>
where D: Deserializer<'de>
{
let s = String::deserialize(deserializer)?;
bool::from_str(&s).map_err(de::Error::custom)
}
Example & discussion. [1]
- Use helper crates that provide adapters:
- serde_aux::deserialize_bool_from_anything — accepts string/number/bool. [2]
- serde_this_or_that or serde_with provide adapters like as_bool / DeserializeFromStr / BoolFromInt for flexible parsing. [3][4]
If you want, tell me which approach (hand-rolled, serde_aux, serde_with) you prefer and I’ll give a complete example.
Sources: [1] Rust forum example (dtolnay) demonstrating deserialize_with. [2] serde_aux deserialize_bool_from_anything. [3] serde_this_or_that / serde_with docs for as_bool / DeserializeFromStr.
🏁 Script executed:
# Search for actual property value examples in the codebase
find . -name "*.rs" -type f | head -20 | xargs grep -l "scanNotActiveBrokerInterval\|enableElectUncleanMaster" 2>/dev/null | head -5Repository: mxsm/rocketmq-rust
Length of output: 44
🏁 Script executed:
# Look for property file formats or test data
find . -name "*.properties" -o -name "*.json" -o -name "*.toml" | head -10Repository: mxsm/rocketmq-rust
Length of output: 587
🏁 Script executed:
# Check the full context of the update method to see how properties are loaded and used
cat -n rocketmq-common/src/common/controller/controller_config.rs | sed -n '600,700p'Repository: mxsm/rocketmq-rust
Length of output: 5179
Use value.parse::<T>() instead of serde_json::from_str for parsing property values.
Property files deliver raw string values (e.g., "true", "5000") which serde_json::from_str cannot parse for booleans or PathBuf. The parse() method via Rust's FromStr trait correctly handles these formats.
This affects multiple lines in the update() method:
- All boolean fields (lines 658, 663, 668):
serde_json::from_str::<bool>(value)will fail - Numeric fields work only coincidentally since JSON allows unquoted numbers
PathBuffields (lines 619, 648) need proper parsing
Replace all serde_json::from_str::<T>(value) with value.parse::<T>().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In @rocketmq-common/src/common/controller/controller_config.rs:
- Around line 611-767: The update loop mutates self incrementally so a parse
error mid-iteration leaves the config partially applied; change update to first
parse/validate all incoming properties into a temporary structure (e.g., a
HashMap of parsed values or a cloned ControllerConfig), then if all parses
succeed, acquire the write lock (rw_lock) and apply/swap the validated values in
one atomic step; implement helper functions like validate_property(key, value)
that run the serde_json::from_str calls currently inline (or build a temp
ControllerConfig by running the same match logic), and apply_property(key,
parsed_value) (or replace self with the temp cloned config) so no
RocketMQError::Internal from a parsing failure can leave self in an inconsistent
state.
- Around line 617-619: The parsing of properties like config_store_path
(serde_json::from_str::<PathBuf>(value)) and other fields (e.g., SocketAddr,
numeric u64s, booleans around lines 730-733) expects JSON-quoted values but
to_properties_string emits plain text, breaking round-trip; replace
serde_json::from_str for PathBuf and SocketAddr with native parsing (e.g.,
PathBuf::from(value) and value.parse::<SocketAddr>()), use value.parse::<u64>()
for numeric fields and value.parse::<bool>() or explicit string match for
booleans, and update the parsing logic in the same parser function to handle
plain-text properties consistently with to_properties_string.
- Around line 609-610: The update method currently takes &mut self and also
acquires self.rw_lock.write(), which is redundant; either change the signature
to pub async fn update(&self, properties: HashMap<String, String>) and keep the
RwLock for interior-mutability of fields (so the lock provides synchronization),
or remove the RwLock entirely and rely on the outer ArcMut/Mutex wrapper and
keep the &mut self signature; also if you keep the lock but don’t otherwise use
the guard variable, rename it to _lock (or prefix with _) to suppress
unused-variable warnings.
- Around line 767-769: The update() method applies new configuration values but
never runs validate(), so invalid settings like controller_thread_pool_nums <= 0
can be accepted; modify the update() implementation in controller_config.rs to
call self.validate() (or ControllerConfig::validate(&self)) after applying
updates and before returning Ok(()), and propagate any validation errors (return
Err or use ?), ensuring invalid updates are rejected at runtime.
🧹 Nitpick comments (2)
rocketmq-common/src/common/controller/controller_config.rs (2)
224-253: Remove unnecessary.clone()calls onCopytypes.Types like
u64,usize,u32,bool, andu16implementCopy, so.clone()is redundant. Direct assignment is cleaner.♻️ Suggested simplification (partial example)
- scan_not_active_broker_interval: self.scan_not_active_broker_interval.clone(), - controller_thread_pool_nums: self.controller_thread_pool_nums.clone(), - controller_request_thread_pool_queue_capacity: self.controller_request_thread_pool_queue_capacity.clone(), - mapped_file_size: self.mapped_file_size.clone(), + scan_not_active_broker_interval: self.scan_not_active_broker_interval, + controller_thread_pool_nums: self.controller_thread_pool_nums, + controller_request_thread_pool_queue_capacity: self.controller_request_thread_pool_queue_capacity, + mapped_file_size: self.mapped_file_size,
772-908: Missing test coverage for the newupdate()method.The new async
update()method lacks test coverage. Consider adding tests for:
- Successful property updates
- Parse error handling (e.g., invalid numeric value)
- Unknown key rejection
- Round-trip consistency with
to_properties_string()💚 Example test skeleton
#[tokio::test] async fn test_update_success() { let mut config = ControllerConfig::default(); let mut props = HashMap::new(); props.insert("controllerThreadPoolNums".to_string(), "32".to_string()); assert!(config.update(props).await.is_ok()); assert_eq!(config.controller_thread_pool_nums, 32); } #[tokio::test] async fn test_update_invalid_value() { let mut config = ControllerConfig::default(); let mut props = HashMap::new(); props.insert("controllerThreadPoolNums".to_string(), "not_a_number".to_string()); assert!(config.update(props).await.is_err()); } #[tokio::test] async fn test_update_unknown_key() { let mut config = ControllerConfig::default(); let mut props = HashMap::new(); props.insert("unknownKey".to_string(), "value".to_string()); assert!(config.update(props).await.is_err()); }
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
rocketmq-common/src/common/controller/controller_config.rsrocketmq-controller/src/controller/open_raft_controller.rs
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2026-01-09T14:29:51.054Z
Learnt from: WaterWhisperer
Repo: mxsm/rocketmq-rust PR: 5582
File: rocketmq-controller/src/controller/open_raft_controller.rs:182-247
Timestamp: 2026-01-09T14:29:51.054Z
Learning: In OpenRaftController and RaftRsController, when constructing ControllerRequest::ApplyBrokerId, set broker_addr to an empty string (String::new()) and rely on other operations (e.g., RegisterBroker) to populate the broker address. ApplyBrokerId should focus solely on broker ID allocation/reservation, mirroring the Java RocketMQ reference implementation.
Applied to files:
rocketmq-controller/src/controller/open_raft_controller.rsrocketmq-common/src/common/controller/controller_config.rs
📚 Learning: 2024-12-01T10:26:22.233Z
Learnt from: PanGan21
Repo: mxsm/rocketmq-rust PR: 1485
File: rocketmq-common/src/common/namesrv/namesrv_config.rs:151-245
Timestamp: 2024-12-01T10:26:22.233Z
Learning: In `NamesrvConfig`, replacing `serde(alias = "...")` with `serde(rename = "...")` could cause breaking changes, so manual serialization is preferred to avoid disrupting existing functionality.
Applied to files:
rocketmq-common/src/common/controller/controller_config.rs
📚 Learning: 2025-05-10T06:20:00.401Z
Learnt from: 578223592
Repo: mxsm/rocketmq-rust PR: 3240
File: rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs:69-71
Timestamp: 2025-05-10T06:20:00.401Z
Learning: In RocketMQ Rust, while dashmap is concurrent-safe, returning a direct reference to a DashMap can break encapsulation by allowing external code to modify the map without triggering persistence methods like `persist()`. Consider returning a read-only view, iterator methods, or a snapshot instead.
Applied to files:
rocketmq-common/src/common/controller/controller_config.rs
🧬 Code graph analysis (1)
rocketmq-common/src/common/controller/controller_config.rs (10)
rocketmq-controller/src/controller/open_raft_controller.rs (1)
new(72-82)rocketmq-controller/src/heartbeat/default_broker_heartbeat_manager.rs (1)
new(84-92)rocketmq-controller/src/controller/controller_manager.rs (1)
new(139-211)rocketmq-controller/src/metadata/config.rs (1)
new(52-57)rocketmq-controller/src/metadata/broker.rs (1)
new(82-88)rocketmq-controller/src/metadata/topic.rs (1)
new(92-97)rocketmq-controller/src/processor.rs (2)
new(79-127)new(192-196)rocketmq-controller/src/manager/replicas_info_manager.rs (1)
new(108-114)rocketmq-controller/src/processor/metadata_processor.rs (1)
new(36-38)rocketmq-controller/src/processor/controller_request_processor.rs (1)
new(148-157)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Check (fmt + clippy)
- GitHub Check: auto-approve
🔇 Additional comments (4)
rocketmq-controller/src/controller/open_raft_controller.rs (1)
45-45: LGTM! Migration toArcMut<ControllerConfig>is consistent with the codebase.The changes correctly enable shared mutable access to
ControllerConfigacross controller components, aligning with the PR objective to support runtime configuration updates. The usage pattern matches other modules likecontroller_manager.rsandprocessor.rs.One consideration for the update implementation: methods like
get_controller_metadata()(lines 361-393) perform multiple reads fromconfig(e.g.,raft_peers,node_id). If config updates can occur between reads, ensure the update semantics provide sufficient consistency guarantees, or consider snapshotting the config at method entry if strict consistency is required.Also applies to: 60-60, 72-72, 89-89
rocketmq-common/src/common/controller/controller_config.rs (3)
215-217: LGTM for therw_lockfield addition.Using
#[serde(skip)]is appropriate sinceRwLockdoesn't implementSerialize/Deserialize, and reinitializing it inCloneandDefaultis the correct approach.
763-765: Strict unknown-key handling is appropriate for security.Rejecting unknown keys prevents typos from silently failing and is a reasonable security posture. However, the PR objectives mention "handle unknown keys gracefully" - verify this strict behavior aligns with the intended design. If logging/ignoring is preferred, adjust accordingly.
721-723: The concern raised in this review comment has already been addressed by the implementation. No action required.The validation mechanism prevents
configBlackListfrom being updated:
configBlackListis in the default blacklist:ControllerRequestProcessor.init_config_blacklist()explicitly inserts"configBlackList"at initialization (line 171), and also loads additional keys from the configuration (lines 177-181).Validation occurs before
update(): Inhandle_update_config()(lines 505-514),validate_blacklist_config_exist()is called beforeupdate()is invoked.Blacklisted keys are rejected: If any incoming property key exists in the blacklist (including
configBlackList), the request returns aNoPermissionerror without proceeding toupdate().The defense-in-depth concern mentioned in the review is already implemented through the request processor's validation layer, making the
update()method inherently safe when called through the normal request handling flow.
| pub async fn update(&mut self, properties: HashMap<String, String>) -> RocketMQResult<()> { | ||
| let lock = self.rw_lock.write().await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The RwLock provides no synchronization benefit with &mut self.
Taking &mut self already guarantees exclusive access through Rust's borrow checker. The internal RwLock is redundant here and adds overhead without enabling concurrent access.
If the intent is to allow concurrent updates (via ArcMut<ControllerConfig> as shown in the relevant snippets), consider:
- Changing the signature to
pub async fn update(&self, ...)and wrapping each field in the lock, or - Removing the
RwLockentirely sinceArcMut(likely a mutex wrapper) already provides synchronization.
Also, prefix the lock variable with _ to suppress the unused variable warning:
🔧 Suppress unused variable warning
- let lock = self.rw_lock.write().await;
+ let _lock = self.rw_lock.write().await;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| pub async fn update(&mut self, properties: HashMap<String, String>) -> RocketMQResult<()> { | |
| let lock = self.rw_lock.write().await; | |
| pub async fn update(&mut self, properties: HashMap<String, String>) -> RocketMQResult<()> { | |
| let _lock = self.rw_lock.write().await; |
🤖 Prompt for AI Agents
In @rocketmq-common/src/common/controller/controller_config.rs around lines 609
- 610, The update method currently takes &mut self and also acquires
self.rw_lock.write(), which is redundant; either change the signature to pub
async fn update(&self, properties: HashMap<String, String>) and keep the RwLock
for interior-mutability of fields (so the lock provides synchronization), or
remove the RwLock entirely and rely on the outer ArcMut/Mutex wrapper and keep
the &mut self signature; also if you keep the lock but don’t otherwise use the
guard variable, rename it to _lock (or prefix with _) to suppress
unused-variable warnings.
| for (key, value) in &properties { | ||
| match key.as_str() { | ||
| "rocketmqHome" => { | ||
| self.rocketmq_home = value.clone(); | ||
| } | ||
|
|
||
| "configStorePath" => { | ||
| self.config_store_path = | ||
| serde_json::from_str::<PathBuf>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "controllerType" => { | ||
| self.controller_type = value.clone(); | ||
| } | ||
|
|
||
| "scanNotActiveBrokerInterval" => { | ||
| self.scan_not_active_broker_interval = | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "controllerThreadPoolNums" => { | ||
| self.controller_thread_pool_nums = | ||
| serde_json::from_str::<usize>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "controllerRequestThreadPoolQueueCapacity" => { | ||
| self.controller_request_thread_pool_queue_capacity = | ||
| serde_json::from_str::<usize>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "mappedFileSize" => { | ||
| self.mapped_file_size = | ||
| serde_json::from_str::<usize>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "controllerStorePath" => { | ||
| self.controller_store_path = value.clone(); | ||
| } | ||
|
|
||
| "electMasterMaxRetryCount" => { | ||
| self.elect_master_max_retry_count = | ||
| serde_json::from_str::<u32>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "enableElectUncleanMaster" => { | ||
| self.enable_elect_unclean_master = | ||
| serde_json::from_str::<bool>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "isProcessReadEvent" => { | ||
| self.is_process_read_event = | ||
| serde_json::from_str::<bool>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "notifyBrokerRoleChanged" => { | ||
| self.notify_broker_role_changed = | ||
| serde_json::from_str::<bool>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "scanInactiveMasterInterval" => { | ||
| self.scan_inactive_master_interval = | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "metricsExporterType" => { | ||
| self.metrics_exporter_type = serde_json::from_str::<MetricsExporterType>(value) | ||
| .map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "metricsGrpcExporterTarget" => { | ||
| self.metrics_grpc_exporter_target = value.clone(); | ||
| } | ||
|
|
||
| "metricsGrpcExporterHeader" => { | ||
| self.metrics_grpc_exporter_header = value.clone(); | ||
| } | ||
|
|
||
| "metricGrpcExporterTimeOutInMills" => { | ||
| self.metric_grpc_exporter_time_out_in_mills = | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "metricGrpcExporterIntervalInMills" => { | ||
| self.metric_grpc_exporter_interval_in_mills = | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "metricLoggingExporterIntervalInMills" => { | ||
| self.metric_logging_exporter_interval_in_mills = | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "metricsPromExporterPort" => { | ||
| self.metrics_prom_exporter_port = | ||
| serde_json::from_str::<u16>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "metricsPromExporterHost" => { | ||
| self.metrics_prom_exporter_host = value.clone(); | ||
| } | ||
|
|
||
| "metricsLabel" => { | ||
| self.metrics_label = value.clone(); | ||
| } | ||
|
|
||
| "metricsInDelta" => { | ||
| self.metrics_in_delta = | ||
| serde_json::from_str::<bool>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "configBlackList" => { | ||
| self.config_black_list = value.clone(); | ||
| } | ||
|
|
||
| "nodeId" => { | ||
| self.node_id = | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "listenAddr" => { | ||
| self.listen_addr = serde_json::from_str::<SocketAddr>(value) | ||
| .map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "raftPeers" => { | ||
| self.raft_peers = serde_json::from_str::<Vec<RaftPeer>>(value) | ||
| .map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "electionTimeoutMs" => { | ||
| self.election_timeout_ms = | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "heartbeatIntervalMs" => { | ||
| self.heartbeat_interval_ms = | ||
| serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "storagePath" => { | ||
| self.storage_path = value.clone(); | ||
| } | ||
|
|
||
| "storageBackend" => { | ||
| self.storage_backend = serde_json::from_str::<StorageBackendType>(value) | ||
| .map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "enableElectUncleanMasterLocal" => { | ||
| self.enable_elect_unclean_master_local = | ||
| serde_json::from_str::<bool>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
| _ => { | ||
| return Err(RocketMQError::Internal(format!("found unknown property: {}", key))); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partial updates on parse failure leave config in an inconsistent state.
If a property fails to parse midway through iteration (e.g., the 3rd of 5 properties), the first 2 properties are already applied. This violates atomicity and could leave the configuration partially updated.
Consider validating/parsing all properties first, then applying them:
🔧 Suggested approach for atomic updates
pub async fn update(&mut self, properties: HashMap<String, String>) -> RocketMQResult<()> {
let _lock = self.rw_lock.write().await;
// Phase 1: Validate all properties first
for (key, value) in &properties {
self.validate_property(key, value)?;
}
// Phase 2: Apply all properties (now safe, all validated)
for (key, value) in &properties {
self.apply_property(key, value);
}
Ok(())
}🤖 Prompt for AI Agents
In @rocketmq-common/src/common/controller/controller_config.rs around lines 611
- 767, The update loop mutates self incrementally so a parse error mid-iteration
leaves the config partially applied; change update to first parse/validate all
incoming properties into a temporary structure (e.g., a HashMap of parsed values
or a cloned ControllerConfig), then if all parses succeed, acquire the write
lock (rw_lock) and apply/swap the validated values in one atomic step; implement
helper functions like validate_property(key, value) that run the
serde_json::from_str calls currently inline (or build a temp ControllerConfig by
running the same match logic), and apply_property(key, parsed_value) (or replace
self with the temp cloned config) so no RocketMQError::Internal from a parsing
failure can leave self in an inconsistent state.
| "configStorePath" => { | ||
| self.config_store_path = | ||
| serde_json::from_str::<PathBuf>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Parsing inconsistency with to_properties_string output format.
Using serde_json::from_str for PathBuf and SocketAddr expects JSON-quoted strings (e.g., "\"127.0.0.1:60109\""), but to_properties_string outputs plain text (e.g., listenAddr=127.0.0.1:60109). This breaks round-trip compatibility.
Use native parsing instead:
🔧 Suggested fix for consistent parsing
"configStorePath" => {
- self.config_store_path =
- serde_json::from_str::<PathBuf>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?;
+ self.config_store_path = PathBuf::from(value);
}
// ...
"listenAddr" => {
- self.listen_addr = serde_json::from_str::<SocketAddr>(value)
- .map_err(|e| RocketMQError::Internal(e.to_string()))?;
+ self.listen_addr = value.parse::<SocketAddr>()
+ .map_err(|e| RocketMQError::Internal(e.to_string()))?;
}Similarly for numeric types, use .parse::<u64>() instead of serde_json::from_str::<u64>(), and for booleans use value.parse::<bool>() or match on "true"/"false".
Also applies to: 730-733
🤖 Prompt for AI Agents
In @rocketmq-common/src/common/controller/controller_config.rs around lines 617
- 619, The parsing of properties like config_store_path
(serde_json::from_str::<PathBuf>(value)) and other fields (e.g., SocketAddr,
numeric u64s, booleans around lines 730-733) expects JSON-quoted values but
to_properties_string emits plain text, breaking round-trip; replace
serde_json::from_str for PathBuf and SocketAddr with native parsing (e.g.,
PathBuf::from(value) and value.parse::<SocketAddr>()), use value.parse::<u64>()
for numeric fields and value.parse::<bool>() or explicit string match for
booleans, and update the parsing logic in the same parser function to handle
plain-text properties consistently with to_properties_string.
| } | ||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing validation after applying updates.
The validate() method (lines 486-512) enforces constraints like controller_thread_pool_nums > 0, but update() doesn't invoke it. This allows setting invalid values at runtime.
🔧 Add validation before returning success
_ => {
return Err(RocketMQError::Internal(format!("found unknown property: {}", key)));
}
}
}
+
+// Validate the updated configuration
+self.validate().map_err(|e| RocketMQError::Internal(e))?;
+
Ok(())🤖 Prompt for AI Agents
In @rocketmq-common/src/common/controller/controller_config.rs around lines 767
- 769, The update() method applies new configuration values but never runs
validate(), so invalid settings like controller_thread_pool_nums <= 0 can be
accepted; modify the update() implementation in controller_config.rs to call
self.validate() (or ControllerConfig::validate(&self)) after applying updates
and before returning Ok(()), and propagate any validation errors (return Err or
use ?), ensuring invalid updates are rejected at runtime.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In @rocketmq-common/src/common/controller/controller_config.rs:
- Around line 609-620: The update method mutates fields (e.g., rocketmq_home,
config_store_path) directly while iterating properties, so a parse error can
leave the struct partially updated and lock is unused; fix by performing updates
on a local clone or temporary Config (clone self or create a partial struct),
apply all changes and validate/deserialise (e.g., parse config_store_path)
before acquiring the write lock, then acquire rw_lock.write().await and swap or
assign the fully-updated state atomically to self; also rename lock to _lock to
suppress the unused-variable warning if you keep its binding.
🧹 Nitpick comments (1)
rocketmq-common/src/common/controller/controller_config.rs (1)
215-217: Redundant lock design with&mut selfreceiver.The
rw_lock: RwLock<()>provides no synchronization benefit because:
- The
updatemethod takes&mut self, which already guarantees exclusive access via Rust's borrow checker- The lock guards a unit type
(), not the config fieldsIf interior mutability is intended (for use with
Arc<ControllerConfig>), the method should take&selfand the mutable fields should be behind the lock. WithArcMut<ControllerConfig>(as used elsewhere per the PR), this internal lock is redundant.Consider either:
- Remove the lock if
ArcMutalready provides exclusive access- Or redesign with
&selfand protect fields with the lock for true interior mutability
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
rocketmq-common/src/common/controller/controller_config.rs
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2024-12-01T10:26:22.233Z
Learnt from: PanGan21
Repo: mxsm/rocketmq-rust PR: 1485
File: rocketmq-common/src/common/namesrv/namesrv_config.rs:151-245
Timestamp: 2024-12-01T10:26:22.233Z
Learning: In `NamesrvConfig`, replacing `serde(alias = "...")` with `serde(rename = "...")` could cause breaking changes, so manual serialization is preferred to avoid disrupting existing functionality.
Applied to files:
rocketmq-common/src/common/controller/controller_config.rs
📚 Learning: 2025-05-10T06:20:00.401Z
Learnt from: 578223592
Repo: mxsm/rocketmq-rust PR: 3240
File: rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs:69-71
Timestamp: 2025-05-10T06:20:00.401Z
Learning: In RocketMQ Rust, while dashmap is concurrent-safe, returning a direct reference to a DashMap can break encapsulation by allowing external code to modify the map without triggering persistence methods like `persist()`. Consider returning a read-only view, iterator methods, or a snapshot instead.
Applied to files:
rocketmq-common/src/common/controller/controller_config.rs
📚 Learning: 2026-01-09T14:29:51.054Z
Learnt from: WaterWhisperer
Repo: mxsm/rocketmq-rust PR: 5582
File: rocketmq-controller/src/controller/open_raft_controller.rs:182-247
Timestamp: 2026-01-09T14:29:51.054Z
Learning: In OpenRaftController and RaftRsController, when constructing ControllerRequest::ApplyBrokerId, set broker_addr to an empty string (String::new()) and rely on other operations (e.g., RegisterBroker) to populate the broker address. ApplyBrokerId should focus solely on broker ID allocation/reservation, mirroring the Java RocketMQ reference implementation.
Applied to files:
rocketmq-common/src/common/controller/controller_config.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Check (fmt + clippy)
- GitHub Check: auto-approve
🔇 Additional comments (3)
rocketmq-common/src/common/controller/controller_config.rs (3)
15-25: LGTM!Imports are appropriate for the new update functionality. Using
tokio::sync::RwLockenables async-safe locking.
218-256: LGTM!Manual Clone implementation correctly clones all fields and creates a fresh
RwLockfor the cloned instance.
763-766: LGTM on strict validation, but consider atomicity.Rejecting unknown keys is appropriate. Combined with the earlier atomicity suggestion, validate all keys exist before applying any changes.
| pub async fn update(&mut self, properties: HashMap<String, String>) -> RocketMQResult<()> { | ||
| let lock = self.rw_lock.write().await; | ||
| for (key, value) in &properties { | ||
| match key.as_str() { | ||
| "rocketmqHome" => { | ||
| self.rocketmq_home = value.clone(); | ||
| } | ||
|
|
||
| "configStorePath" => { | ||
| self.config_store_path = | ||
| serde_json::from_str::<PathBuf>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partial update on failure breaks atomicity.
If parsing fails midway through the HashMap iteration, earlier fields are already mutated while later ones are not, leaving the configuration in an inconsistent state.
Also, lock is unused—prefix with underscore to suppress warning.
🔧 Suggested approach for atomic updates
pub async fn update(&mut self, properties: HashMap<String, String>) -> RocketMQResult<()> {
- let lock = self.rw_lock.write().await;
+ let _lock = self.rw_lock.write().await;
+
+ // Validate all properties first before applying any changes
+ for (key, value) in &properties {
+ self.validate_property(key, value)?;
+ }
+
+ // Apply changes only after all validations pass
for (key, value) in &properties {
match key.as_str() {Alternatively, clone self, apply updates to the clone, and swap on success.
🤖 Prompt for AI Agents
In @rocketmq-common/src/common/controller/controller_config.rs around lines 609
- 620, The update method mutates fields (e.g., rocketmq_home, config_store_path)
directly while iterating properties, so a parse error can leave the struct
partially updated and lock is unused; fix by performing updates on a local clone
or temporary Config (clone self or create a partial struct), apply all changes
and validate/deserialise (e.g., parse config_store_path) before acquiring the
write lock, then acquire rw_lock.write().await and swap or assign the
fully-updated state atomically to self; also rename lock to _lock to suppress
the unused-variable warning if you keep its binding.
| "configBlackList" => { | ||
| self.config_black_list = value.clone(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Security consideration: configBlackList is modifiable via update.
Per PR objectives, configBlackList itself should be blacklisted. While validation is expected at the request handler level, allowing this field to be updated here creates a defense-in-depth gap—if handler validation is bypassed, an attacker could clear the blacklist and then modify sensitive fields.
Consider either:
- Rejecting blacklisted keys within this method as a safeguard
- Or adding a doc comment noting callers must validate against the blacklist before calling
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In @rocketmq-common/src/common/controller/controller_config.rs:
- Around line 609-611: In update(&mut self, properties: HashMap<String, String>)
the write lock guard from self.rw_lock.write().await is acquired into an unused
variable (lock) while the method already has exclusive &mut self; remove the
redundant lock acquisition (delete the write().await call and the lock variable)
or, if you need to keep the lock for future compatibility, silence the unused
binding by renaming it to _lock (or prefix with an underscore) so the compiler
warning is suppressed; update any related code that assumed the guard to reflect
the chosen approach.
- Around line 763-769: After applying all property updates in the update/apply
function (the block ending with the match's `_ =>` arm), call self.validate()
before returning Ok(()) and propagate any validation error instead of silently
accepting invalid combined state; specifically invoke validate() on the
ControllerConfig instance (or the struct that defines validate()) and return
Err(RocketMQError::Internal(format!(...))) or propagate the returned
RocketMQError as appropriate so overall config consistency (e.g.,
controller_thread_pool_nums != 0) is enforced after all fields are applied.
- Around line 730-738: The current deserialization in ControllerConfig uses
serde_json::from_str for listen_addr and raft_peers which expects JSON (invalid
for plain "127.0.0.1:8080" or "id-addr;id-addr" property values); replace
serde_json::from_str::<SocketAddr>(value) with parsing via
value.parse::<SocketAddr>() (returning RocketMQError::Internal on parse error)
for the listen_addr field, and replace
serde_json::from_str::<Vec<RaftPeer>>(value) with a custom parser for raft_peers
that splits the property string (e.g., by ';'), parses each entry into a
RaftPeer by splitting "id-addr" into id and address and parsing the address with
SocketAddr::parse, and construct Vec<RaftPeer>; update the code paths in
controller_config.rs where self.listen_addr and self.raft_peers are set (refer
to the existing listen_addr and raft_peers assignments and
to_properties_string() format) and ensure errors map to RocketMQError::Internal
with clear messages.
- Around line 617-620: The code uses serde_json::from_str::<PathBuf>(value)
which expects a JSON-quoted string and will fail for plain property values;
replace that call with direct PathBuf construction (e.g. let s =
value.trim_matches('"'); self.config_store_path = PathBuf::from(s);) so plain
paths and optionally quoted values both work, and remove the map_err/serde_json
error handling (or only convert errors if you add explicit validation) while
keeping the assignment to self.config_store_path and references to
RocketMQError::Internal unchanged elsewhere.
🧹 Nitpick comments (2)
rocketmq-common/src/common/controller/controller_config.rs (2)
626-633: Usestr::parse()for primitive numeric types instead ofserde_json::from_str.While
serde_json::from_strworks for raw numbers, usingstr::parse::<T>()is more idiomatic and provides clearer error messages for plain string input from properties files.Example refactor for numeric parsing
"scanNotActiveBrokerInterval" => { - self.scan_not_active_broker_interval = - serde_json::from_str::<u64>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; + self.scan_not_active_broker_interval = value + .parse::<u64>() + .map_err(|e| RocketMQError::Internal(format!("invalid scanNotActiveBrokerInterval: {}", e)))?; }Apply similar changes to all numeric fields (
u64,usize,u32,u16).
655-668: Boolean parsing will fail for typical property values like "true" or "false".
serde_json::from_str::<bool>("true")works, but property files might have values likeTrue,TRUE, oryes. Consider case-insensitive parsing for robustness.More robust boolean parsing
fn parse_bool(value: &str) -> Result<bool, RocketMQError> { match value.to_lowercase().as_str() { "true" | "1" | "yes" => Ok(true), "false" | "0" | "no" => Ok(false), _ => Err(RocketMQError::Internal(format!("invalid boolean: {}", value))), } }
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
rocketmq-common/src/common/controller/controller_config.rs
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2024-12-01T10:26:22.233Z
Learnt from: PanGan21
Repo: mxsm/rocketmq-rust PR: 1485
File: rocketmq-common/src/common/namesrv/namesrv_config.rs:151-245
Timestamp: 2024-12-01T10:26:22.233Z
Learning: In `NamesrvConfig`, replacing `serde(alias = "...")` with `serde(rename = "...")` could cause breaking changes, so manual serialization is preferred to avoid disrupting existing functionality.
Applied to files:
rocketmq-common/src/common/controller/controller_config.rs
📚 Learning: 2025-05-10T06:20:00.401Z
Learnt from: 578223592
Repo: mxsm/rocketmq-rust PR: 3240
File: rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs:69-71
Timestamp: 2025-05-10T06:20:00.401Z
Learning: In RocketMQ Rust, while dashmap is concurrent-safe, returning a direct reference to a DashMap can break encapsulation by allowing external code to modify the map without triggering persistence methods like `persist()`. Consider returning a read-only view, iterator methods, or a snapshot instead.
Applied to files:
rocketmq-common/src/common/controller/controller_config.rs
📚 Learning: 2026-01-09T14:29:51.054Z
Learnt from: WaterWhisperer
Repo: mxsm/rocketmq-rust PR: 5582
File: rocketmq-controller/src/controller/open_raft_controller.rs:182-247
Timestamp: 2026-01-09T14:29:51.054Z
Learning: In OpenRaftController and RaftRsController, when constructing ControllerRequest::ApplyBrokerId, set broker_addr to an empty string (String::new()) and rely on other operations (e.g., RegisterBroker) to populate the broker address. ApplyBrokerId should focus solely on broker ID allocation/reservation, mirroring the Java RocketMQ reference implementation.
Applied to files:
rocketmq-common/src/common/controller/controller_config.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Check (fmt + clippy)
- GitHub Check: auto-approve
🔇 Additional comments (3)
rocketmq-common/src/common/controller/controller_config.rs (3)
15-25: LGTM!The imports are appropriate for the new async update functionality. Using
tokio::sync::RwLockis correct for async contexts.
218-256: LGTM!The manual
Cloneimplementation correctly handles all fields and properly initializes a freshRwLockfor cloned instances rather than attempting to clone the lock state.
215-217: TheRwLock<()>pattern is intentional and necessary here.Given that
ControllerConfigis wrapped inArcMut<ControllerConfig>, which usesSyncUnsafeCellfor interior mutability, multiple threads can obtain&mut selfreferences through the unsafe interface. TheRwLock<()>inside theupdate()method serves to serialize concurrent mutations and prevent data races that the unsafe interface would otherwise allow. This is a legitimate synchronization pattern for guarding against concurrent mutable access when usingArcMut.Each cloned instance correctly receives a fresh
RwLock, which makes sense as independent access points should have independent locks.No changes needed.
| pub async fn update(&mut self, properties: HashMap<String, String>) -> RocketMQResult<()> { | ||
| let lock = self.rw_lock.write().await; | ||
| for (key, value) in &properties { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused lock guard and unnecessary synchronization.
The lock guard is acquired but never used, which will trigger a compiler warning. More importantly, since update() takes &mut self, exclusive access is already guaranteed—making the lock redundant.
Proposed fix
If the lock must be retained for future compatibility, silence the warning:
pub async fn update(&mut self, properties: HashMap<String, String>) -> RocketMQResult<()> {
- let lock = self.rw_lock.write().await;
+ let _lock = self.rw_lock.write().await;Or remove the lock usage entirely if &mut self provides sufficient guarantees.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| pub async fn update(&mut self, properties: HashMap<String, String>) -> RocketMQResult<()> { | |
| let lock = self.rw_lock.write().await; | |
| for (key, value) in &properties { | |
| pub async fn update(&mut self, properties: HashMap<String, String>) -> RocketMQResult<()> { | |
| let _lock = self.rw_lock.write().await; | |
| for (key, value) in &properties { |
🤖 Prompt for AI Agents
In @rocketmq-common/src/common/controller/controller_config.rs around lines 609
- 611, In update(&mut self, properties: HashMap<String, String>) the write lock
guard from self.rw_lock.write().await is acquired into an unused variable (lock)
while the method already has exclusive &mut self; remove the redundant lock
acquisition (delete the write().await call and the lock variable) or, if you
need to keep the lock for future compatibility, silence the unused binding by
renaming it to _lock (or prefix with an underscore) so the compiler warning is
suppressed; update any related code that assumed the guard to reflect the chosen
approach.
| "configStorePath" => { | ||
| self.config_store_path = | ||
| serde_json::from_str::<PathBuf>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
serde_json::from_str will fail for plain path strings.
Properties file values are typically plain strings like /home/user/config, but serde_json::from_str::<PathBuf> expects a quoted JSON string like "\"/home/user/config\"". This will cause parse failures at runtime.
Proposed fix: parse directly from string
"configStorePath" => {
- self.config_store_path =
- serde_json::from_str::<PathBuf>(value).map_err(|e| RocketMQError::Internal(e.to_string()))?;
+ self.config_store_path = PathBuf::from(value);
}🤖 Prompt for AI Agents
In @rocketmq-common/src/common/controller/controller_config.rs around lines 617
- 620, The code uses serde_json::from_str::<PathBuf>(value) which expects a
JSON-quoted string and will fail for plain property values; replace that call
with direct PathBuf construction (e.g. let s = value.trim_matches('"');
self.config_store_path = PathBuf::from(s);) so plain paths and optionally quoted
values both work, and remove the map_err/serde_json error handling (or only
convert errors if you add explicit validation) while keeping the assignment to
self.config_store_path and references to RocketMQError::Internal unchanged
elsewhere.
| "listenAddr" => { | ||
| self.listen_addr = serde_json::from_str::<SocketAddr>(value) | ||
| .map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } | ||
|
|
||
| "raftPeers" => { | ||
| self.raft_peers = serde_json::from_str::<Vec<RaftPeer>>(value) | ||
| .map_err(|e| RocketMQError::Internal(e.to_string()))?; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SocketAddr and complex type parsing expects JSON format, not plain strings.
serde_json::from_str::<SocketAddr>("127.0.0.1:8080") will fail because unquoted addresses aren't valid JSON. Similarly, raftPeers expects a full JSON array. This is inconsistent with property file format expectations.
Proposed fix for SocketAddr
"listenAddr" => {
- self.listen_addr = serde_json::from_str::<SocketAddr>(value)
- .map_err(|e| RocketMQError::Internal(e.to_string()))?;
+ self.listen_addr = value
+ .parse::<SocketAddr>()
+ .map_err(|e| RocketMQError::Internal(format!("invalid listenAddr: {}", e)))?;
}For raftPeers, consider a custom parser matching the format used in to_properties_string() (id-addr;id-addr).
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| "listenAddr" => { | |
| self.listen_addr = serde_json::from_str::<SocketAddr>(value) | |
| .map_err(|e| RocketMQError::Internal(e.to_string()))?; | |
| } | |
| "raftPeers" => { | |
| self.raft_peers = serde_json::from_str::<Vec<RaftPeer>>(value) | |
| .map_err(|e| RocketMQError::Internal(e.to_string()))?; | |
| } | |
| "listenAddr" => { | |
| self.listen_addr = value | |
| .parse::<SocketAddr>() | |
| .map_err(|e| RocketMQError::Internal(format!("invalid listenAddr: {}", e)))?; | |
| } | |
| "raftPeers" => { | |
| self.raft_peers = serde_json::from_str::<Vec<RaftPeer>>(value) | |
| .map_err(|e| RocketMQError::Internal(e.to_string()))?; | |
| } |
🤖 Prompt for AI Agents
In @rocketmq-common/src/common/controller/controller_config.rs around lines 730
- 738, The current deserialization in ControllerConfig uses serde_json::from_str
for listen_addr and raft_peers which expects JSON (invalid for plain
"127.0.0.1:8080" or "id-addr;id-addr" property values); replace
serde_json::from_str::<SocketAddr>(value) with parsing via
value.parse::<SocketAddr>() (returning RocketMQError::Internal on parse error)
for the listen_addr field, and replace
serde_json::from_str::<Vec<RaftPeer>>(value) with a custom parser for raft_peers
that splits the property string (e.g., by ';'), parses each entry into a
RaftPeer by splitting "id-addr" into id and address and parsing the address with
SocketAddr::parse, and construct Vec<RaftPeer>; update the code paths in
controller_config.rs where self.listen_addr and self.raft_peers are set (refer
to the existing listen_addr and raft_peers assignments and
to_properties_string() format) and ensure errors map to RocketMQError::Internal
with clear messages.
| _ => { | ||
| return Err(RocketMQError::Internal(format!("found unknown property: {}", key))); | ||
| } | ||
| } | ||
| } | ||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Consider calling validate() after applying updates.
The PR objectives mention "validate types/values" before applying, but currently validation only occurs per-field during parsing. After all updates are applied, the overall configuration may become invalid (e.g., controller_thread_pool_nums set to 0). Calling validate() ensures consistency.
Proposed fix
_ => {
return Err(RocketMQError::Internal(format!("found unknown property: {}", key)));
}
}
}
+ self.validate()
+ .map_err(|e| RocketMQError::Internal(format!("config validation failed: {}", e)))?;
Ok(())
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| _ => { | |
| return Err(RocketMQError::Internal(format!("found unknown property: {}", key))); | |
| } | |
| } | |
| } | |
| Ok(()) | |
| } | |
| _ => { | |
| return Err(RocketMQError::Internal(format!("found unknown property: {}", key))); | |
| } | |
| } | |
| } | |
| self.validate() | |
| .map_err(|e| RocketMQError::Internal(format!("config validation failed: {}", e)))?; | |
| Ok(()) | |
| } |
🤖 Prompt for AI Agents
In @rocketmq-common/src/common/controller/controller_config.rs around lines 763
- 769, After applying all property updates in the update/apply function (the
block ending with the match's `_ =>` arm), call self.validate() before returning
Ok(()) and propagate any validation error instead of silently accepting invalid
combined state; specifically invoke validate() on the ControllerConfig instance
(or the struct that defines validate()) and return
Err(RocketMQError::Internal(format!(...))) or propagate the returned
RocketMQError as appropriate so overall config consistency (e.g.,
controller_thread_pool_nums != 0) is enforced after all fields are applied.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
rocketmq-controller/src/metrics/controller_metrics_manager.rs (1)
191-191: Theconfigfield is stored but currently unused.The field is assigned in
init_metricsbut never read afterward. The TODOs throughout the file (e.g., lines 222-226, 233, 296, 386) indicate planned future use for retrieving addresses, metrics labels, exporter endpoints, and storage paths. This is acceptable for now given the ongoing development.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
rocketmq-controller/examples/controller_metrics_example.rsrocketmq-controller/src/metrics/controller_metrics_manager.rsrocketmq-controller/tests/metrics_quick_reference_test.rs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-05-10T06:20:00.401Z
Learnt from: 578223592
Repo: mxsm/rocketmq-rust PR: 3240
File: rocketmq-namesrv/src/kvconfig/kvconfig_mananger.rs:69-71
Timestamp: 2025-05-10T06:20:00.401Z
Learning: In RocketMQ Rust, while dashmap is concurrent-safe, returning a direct reference to a DashMap can break encapsulation by allowing external code to modify the map without triggering persistence methods like `persist()`. Consider returning a read-only view, iterator methods, or a snapshot instead.
Applied to files:
rocketmq-controller/src/metrics/controller_metrics_manager.rs
🧬 Code graph analysis (2)
rocketmq-controller/tests/metrics_quick_reference_test.rs (4)
rocketmq-controller/src/controller/controller_manager.rs (2)
config(575-577)new(139-211)rocketmq-controller/src/metrics/controller_metrics_manager.rs (1)
new(215-259)rocketmq-controller/src/manager/replicas_info_manager.rs (1)
new(108-114)rocketmq-controller/src/processor/controller_request_processor.rs (1)
new(148-157)
rocketmq-controller/examples/controller_metrics_example.rs (4)
rocketmq-controller/src/controller/controller_manager.rs (2)
config(575-577)new(139-211)rocketmq-controller/src/metrics/controller_metrics_manager.rs (1)
new(215-259)rocketmq-common/src/common/controller/controller_config.rs (2)
new(314-316)new_node(472-474)rocketmq-controller/src/manager/replicas_info_manager.rs (1)
new(108-114)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Check (fmt + clippy)
- GitHub Check: auto-approve
🔇 Additional comments (4)
rocketmq-controller/tests/metrics_quick_reference_test.rs (1)
24-24: LGTM! Consistent migration fromArctoArcMut.The changes correctly update the test file to use
ArcMut<ControllerConfig>instead ofArc<ControllerConfig>, aligning with the broader refactoring across the controller module to enable runtime configuration updates. The test logic remains unchanged, and the thread safety test appropriately uses static methods without cross-thread mutation of the config.Also applies to: 28-28, 41-41, 53-53, 65-65, 76-76, 91-91, 101-101, 111-111, 123-123
rocketmq-controller/examples/controller_metrics_example.rs (1)
29-29: LGTM!The example correctly adopts the new
ArcMut<ControllerConfig>pattern, aligning with the API changes inControllerMetricsManager::get_instance. TheArcimport at line 22 is appropriately retained since it's still used for the&Arc<ControllerMetricsManager>parameter types in the helper functions.Also applies to: 41-41
rocketmq-controller/src/metrics/controller_metrics_manager.rs (2)
60-60: LGTM!The migration from
Arc<ControllerConfig>toArcMut<ControllerConfig>is correctly implemented, enabling the runtime configuration update capability required by the PR objectives.Also applies to: 191-191
205-212: Public API signature change requiresArcMut<ControllerConfig>.The
get_instancemethod now requiresArcMut<ControllerConfig>instead ofArc<ControllerConfig>. This breaking change is properly implemented across all call sites: the example, production code incontroller_manager.rs, and all test files wrap the config appropriately before passing it to the method.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #5682 +/- ##
==========================================
- Coverage 39.15% 39.15% -0.01%
==========================================
Files 820 820
Lines 113010 113164 +154
==========================================
+ Hits 44246 44305 +59
- Misses 68764 68859 +95 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Fixes #5545
Summary by CodeRabbit
New Features
Refactor
Tests
✏️ Tip: You can customize this high-level summary in your review settings.