Skip to content

implement handle_get_next_broker_id for broker ID allocation#5683

Merged
mxsm merged 1 commit intomxsm:mainfrom
aditya-rajpurohit:feature-5540
Jan 19, 2026
Merged

implement handle_get_next_broker_id for broker ID allocation#5683
mxsm merged 1 commit intomxsm:mainfrom
aditya-rajpurohit:feature-5540

Conversation

@aditya-rajpurohit
Copy link
Copy Markdown
Contributor

@aditya-rajpurohit aditya-rajpurohit commented Jan 12, 2026

Which Issue(s) This PR Fixes(Closes)

Fixes #5540

Brief Description

This PR implements the handle_get_next_broker_id method in ControllerRequestProcessor to handle broker ID allocation requests. This method:

  • Decodes the GetNextBrokerIdRequestHeader from the incoming request.
  • Validates the cluster_name and broker_name fields.
  • Forwards the request to the controller to allocate the next available broker ID using Raft consensus.
  • Returns a RemotingCommand response containing the allocated broker ID or an error if allocation fails.

This ensures that new brokers joining the cluster are assigned unique IDs and supports cluster scaling and broker replacement.

How Did You Test This Change?

  • Built the project with cargo build
  • Ran unit tests using cargo test

Summary by CodeRabbit

  • New Features
    • Controller can now provide the next broker ID on request; incoming requests are validated for non-empty cluster and broker names and allocation outcomes are logged.
    • Improved request handling for broker ID allocation so the controller’s allocation response is forwarded transparently to callers.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Jan 12, 2026

Walkthrough

Implemented the controller's ControllerGetNextBrokerId request handler: decodes GetNextBrokerIdRequestHeader, validates cluster_name and broker_name, delegates allocation to the controller, logs results, and returns the controller's response. The handler parameter name was changed to request.

Changes

Cohort / File(s) Summary
GetNextBrokerId Handler Implementation
rocketmq-controller/src/processor/controller_request_processor.rs
Added import for GetNextBrokerIdRequestHeader and implemented handle_get_next_broker_id(request: &mut RemotingCommand). Decodes request header, validates cluster_name and broker_name (returns ControllerInvalidRequest if empty), calls controller.get_next_broker_id(...) and returns that response; updated function signature parameter name from _request to request.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant RequestProcessor
    participant ControllerManager
    participant Controller

    Client->>RequestProcessor: RemotingCommand (ControllerGetNextBrokerId)
    RequestProcessor->>RequestProcessor: decode GetNextBrokerIdRequestHeader
    alt invalid header
        RequestProcessor-->>Client: ControllerInvalidRequest
    else valid
        RequestProcessor->>ControllerManager: controller()
        ControllerManager->>Controller: get_next_broker_id(header)
        Controller-->>ControllerManager: RemotingCommand (response)
        ControllerManager-->>RequestProcessor: response
        RequestProcessor-->>Client: RemotingCommand (response)
    end
Loading

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~12 minutes

Poem

🐰 nibbles a carrot and giggles
A cluster asks, "Who will be me?"
I hop and fetch the next ID,
One hop, one number, set it free —
Hooray, new broker, join the tree!

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main change: implementing the handle_get_next_broker_id method for broker ID allocation.
Linked Issues check ✅ Passed The implementation meets core objectives: decodes GetNextBrokerIdRequestHeader, validates cluster_name and broker_name, forwards to controller.get_next_broker_id via Raft consensus, and returns the response.
Out of Scope Changes check ✅ Passed All changes are directly related to implementing handle_get_next_broker_id: the import, function signature update, and handler implementation align with issue #5540.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@rocketmq-rust-bot
Copy link
Copy Markdown
Collaborator

🔊@aditya-rajpurohit 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
rocketmq-controller/src/processor/controller_request_processor.rs (1)

595-606: Success logging doesn't verify actual allocation result.

The current logging assumes any Some(response) is a success, but the response could contain an error code. Compare with handle_apply_broker_id (lines 710-725) which checks response.code() == ResponseCode::Success before logging success.

♻️ Suggested improvement for accurate logging
         // Log result
         if let Some(res) = &response {
-            info!(
-                "Allocated broker_id response created for cluster={}, broker={}",
-                request_header.cluster_name, request_header.broker_name
-            );
+            if res.code() == ResponseCode::Success as i32 {
+                info!(
+                    "GetNextBrokerId succeeded for cluster={}, broker={}",
+                    request_header.cluster_name, request_header.broker_name
+                );
+            } else {
+                warn!(
+                    "GetNextBrokerId failed: cluster={}, broker={}, code={}, remark={:?}",
+                    request_header.cluster_name,
+                    request_header.broker_name,
+                    res.code(),
+                    res.remark()
+                );
+            }
         } else {
             warn!(
                 "Failed to allocate broker_id for cluster={}, broker={}",
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7760c56 and e569323.

📒 Files selected for processing (1)
  • rocketmq-controller/src/processor/controller_request_processor.rs
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: WaterWhisperer
Repo: mxsm/rocketmq-rust PR: 5582
File: rocketmq-controller/src/controller/open_raft_controller.rs:182-247
Timestamp: 2026-01-09T14:29:58.048Z
Learning: In the ApplyBrokerId operation across both OpenRaftController and RaftRsController implementations, broker_addr is intentionally set to String::new() (empty string) when constructing ControllerRequest::ApplyBrokerId. This matches the design of the Java RocketMQ reference implementation. The broker address is populated through other operations (e.g., RegisterBroker), as ApplyBrokerId focuses solely on broker ID allocation/reservation.
📚 Learning: 2026-01-09T14:29:58.048Z
Learnt from: WaterWhisperer
Repo: mxsm/rocketmq-rust PR: 5582
File: rocketmq-controller/src/controller/open_raft_controller.rs:182-247
Timestamp: 2026-01-09T14:29:58.048Z
Learning: In the ApplyBrokerId operation across both OpenRaftController and RaftRsController implementations, broker_addr is intentionally set to String::new() (empty string) when constructing ControllerRequest::ApplyBrokerId. This matches the design of the Java RocketMQ reference implementation. The broker address is populated through other operations (e.g., RegisterBroker), as ApplyBrokerId focuses solely on broker ID allocation/reservation.

Applied to files:

  • rocketmq-controller/src/processor/controller_request_processor.rs
📚 Learning: 2025-12-28T07:21:24.117Z
Learnt from: WaterWhisperer
Repo: mxsm/rocketmq-rust PR: 5099
File: rocketmq-remoting/src/protocol/header/get_producer_connection_list_request_header.rs:29-32
Timestamp: 2025-12-28T07:21:24.117Z
Learning: In the rocketmq-remoting/src/protocol/header/ module, header structs consistently use public fields rather than private fields with getters/setters. This is an established pattern across the codebase.

Applied to files:

  • rocketmq-controller/src/processor/controller_request_processor.rs
🧬 Code graph analysis (1)
rocketmq-controller/src/processor/controller_request_processor.rs (4)
rocketmq-controller/src/controller/controller_manager.rs (1)
  • controller (620-622)
rocketmq-error/src/unified.rs (1)
  • request_header_error (471-473)
rocketmq-controller/src/heartbeat/broker_live_info.rs (1)
  • broker_name (72-74)
rocketmq-remoting/src/protocol/remoting_command.rs (1)
  • create_response_command_with_code_remark (187-192)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Check (fmt + clippy)
  • GitHub Check: auto-approve
🔇 Additional comments (3)
rocketmq-controller/src/processor/controller_request_processor.rs (3)

99-99: LGTM!

The import follows the established pattern for controller request headers in this file.


570-586: LGTM!

The validation logic for cluster_name and broker_name is correct and consistent with the pattern used in handle_apply_broker_id. The appropriate ControllerInvalidRequest response code is returned with clear error messages.


557-563: LGTM!

The header decoding follows the established pattern with proper error handling and logging. The request_header_error helper is used consistently with other handlers.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
rocketmq-controller/src/processor/controller_request_processor.rs (1)

596-606: Response logging doesn't distinguish success from error responses.

The current logging assumes Some(response) means success, but the response could contain an error code (e.g., ControllerBrokerIdInvalid). Compare with handle_apply_broker_id (lines 710-736) which properly inspects response.code().

♻️ Suggested improvement for accurate logging
         // Log result
-        if let Some(_res) = &response {
-            info!(
-                "Allocated broker_id response created for cluster={}, broker={}",
-                request_header.cluster_name, request_header.broker_name
-            );
-        } else {
-            warn!(
-                "Failed to allocate broker_id for cluster={}, broker={}",
-                request_header.cluster_name, request_header.broker_name
-            );
-        }
+        match &response {
+            Some(res) if res.code() == ResponseCode::Success as i32 => {
+                info!(
+                    "GetNextBrokerId succeeded: cluster={}, broker={}",
+                    request_header.cluster_name, request_header.broker_name
+                );
+            }
+            Some(res) => {
+                warn!(
+                    "GetNextBrokerId failed: cluster={}, broker={}, code={}, remark={:?}",
+                    request_header.cluster_name,
+                    request_header.broker_name,
+                    res.code(),
+                    res.remark()
+                );
+            }
+            None => {
+                warn!(
+                    "GetNextBrokerId returned no response for cluster={}, broker={}",
+                    request_header.cluster_name, request_header.broker_name
+                );
+            }
+        }
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e569323 and 0088f0f.

📒 Files selected for processing (1)
  • rocketmq-controller/src/processor/controller_request_processor.rs
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: WaterWhisperer
Repo: mxsm/rocketmq-rust PR: 5582
File: rocketmq-controller/src/controller/open_raft_controller.rs:182-247
Timestamp: 2026-01-09T14:29:58.048Z
Learning: In the ApplyBrokerId operation across both OpenRaftController and RaftRsController implementations, broker_addr is intentionally set to String::new() (empty string) when constructing ControllerRequest::ApplyBrokerId. This matches the design of the Java RocketMQ reference implementation. The broker address is populated through other operations (e.g., RegisterBroker), as ApplyBrokerId focuses solely on broker ID allocation/reservation.
📚 Learning: 2026-01-09T14:29:58.048Z
Learnt from: WaterWhisperer
Repo: mxsm/rocketmq-rust PR: 5582
File: rocketmq-controller/src/controller/open_raft_controller.rs:182-247
Timestamp: 2026-01-09T14:29:58.048Z
Learning: In the ApplyBrokerId operation across both OpenRaftController and RaftRsController implementations, broker_addr is intentionally set to String::new() (empty string) when constructing ControllerRequest::ApplyBrokerId. This matches the design of the Java RocketMQ reference implementation. The broker address is populated through other operations (e.g., RegisterBroker), as ApplyBrokerId focuses solely on broker ID allocation/reservation.

Applied to files:

  • rocketmq-controller/src/processor/controller_request_processor.rs
📚 Learning: 2025-12-28T07:21:24.117Z
Learnt from: WaterWhisperer
Repo: mxsm/rocketmq-rust PR: 5099
File: rocketmq-remoting/src/protocol/header/get_producer_connection_list_request_header.rs:29-32
Timestamp: 2025-12-28T07:21:24.117Z
Learning: In the rocketmq-remoting/src/protocol/header/ module, header structs consistently use public fields rather than private fields with getters/setters. This is an established pattern across the codebase.

Applied to files:

  • rocketmq-controller/src/processor/controller_request_processor.rs
🧬 Code graph analysis (1)
rocketmq-controller/src/processor/controller_request_processor.rs (4)
rocketmq-controller/src/controller/controller_manager.rs (1)
  • controller (620-622)
rocketmq-error/src/unified.rs (1)
  • request_header_error (471-473)
rocketmq-controller/src/heartbeat/broker_live_info.rs (1)
  • broker_name (72-74)
rocketmq-remoting/src/protocol/remoting_command.rs (1)
  • create_response_command_with_code_remark (187-192)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Check (fmt + clippy)
  • GitHub Check: auto-approve
🔇 Additional comments (2)
rocketmq-controller/src/processor/controller_request_processor.rs (2)

99-99: LGTM!

Import follows the existing pattern for controller request headers and is correctly placed.


557-593: Implementation correctly follows established patterns.

The handler properly:

  • Decodes the request header with appropriate error handling
  • Validates both cluster_name and broker_name before forwarding
  • Delegates allocation to the controller via Raft consensus

Minor nit: The .to_string() calls on lines 575 and 584 are unnecessary since create_response_command_with_code_remark accepts impl Into<CheetahString>, but this is cosmetic.

@codecov
Copy link
Copy Markdown

codecov Bot commented Jan 12, 2026

Codecov Report

❌ Patch coverage is 0% with 31 lines in your changes missing coverage. Please review.
✅ Project coverage is 39.15%. Comparing base (7760c56) to head (0088f0f).
⚠️ Report is 32 commits behind head on main.

Files with missing lines Patch % Lines
...ller/src/processor/controller_request_processor.rs 0.00% 31 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main    #5683   +/-   ##
=======================================
  Coverage   39.15%   39.15%           
=======================================
  Files         820      820           
  Lines      113010   113040   +30     
=======================================
+ Hits        44246    44265   +19     
- Misses      68764    68775   +11     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@mxsm mxsm merged commit 800088a into mxsm:main Jan 19, 2026
8 of 13 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added approved PR has approved and removed ready to review waiting-review waiting review this PR labels Jan 19, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

AI review first Ai review pr first approved PR has approved auto merge Difficulty level/Hard Hard ISSUE feature🚀 Suggest an idea for this project.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature🚀] Implement ControllerGetNextBrokerId Request Handler

4 participants