-
Notifications
You must be signed in to change notification settings - Fork 152
[ISSUE #3295]add updateAndSubscripGroup func #3293
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #3295]add updateAndSubscripGroup func #3293
Conversation
- 新增 SubscriptionGroupHandler 处理订阅组相关请求 - 实现 update_and_create_subscription_group 方法处理更新和创建订阅组 - 添加 SubscriptionGroupAttributes 和相关属性处理 - 修改 SubscriptionGroupManager 支持更新订阅组配置
""" WalkthroughA new feature for updating and creating subscription group configurations was added to the RocketMQ broker. This includes a new handler for subscription group operations, updates to the admin processor for request routing, enhancements to the subscription group manager for attribute merging and versioning, and supporting attribute structures. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant AdminBrokerProcessor
participant SubscriptionGroupHandler
participant SubscriptionGroupManager
Client->>AdminBrokerProcessor: Send UpdateAndCreateSubscriptionGroup request
AdminBrokerProcessor->>SubscriptionGroupHandler: update_and_create_subscription_group(request)
SubscriptionGroupHandler->>SubscriptionGroupManager: update_subscription_group_config(config)
SubscriptionGroupManager-->>SubscriptionGroupHandler: Update config & version
SubscriptionGroupHandler-->>AdminBrokerProcessor: Success response
AdminBrokerProcessor-->>Client: Return response
Assessment against linked issues
Poem
✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@5782235 8000 92 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3293 +/- ##
==========================================
- Coverage 26.43% 26.40% -0.03%
==========================================
Files 526 528 +2
Lines 74229 74413 +184
==========================================
+ Hits 19619 19648 +29
- Misses 54610 54765 +155 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
@578223592 Create ISSUE to related this PR |
thanks for remind.I would create a issue when i am ready for this pr ! |
- 在 SubscriptionGroupManager 中添加 data_version 字段,用于管理数据版本 - 修改 update_subscription_group_config 方法,使其能够更新数据版本 - 优化 update_data_version 方法,根据消息存储的状态机版本生成新版本 - 调整代码结构,提高可维护性和可扩展性
- 删除了 ConfigManager trait 定义中的多余空行- 优化了代码结构,提高了代码的可读性和整洁性
🔊@578223592 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
…ashMap::new()作为参数传递给 get_or_init 方法 - 简化了代码结构,提高了可读性
🔊@578223592 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
🧹 Nitpick comments (4)
rocketmq-common/src/common/attribute/subscription_group_attributes.rs (1)
12-15
: Consider documenting the empty HashMap initialization.The
all()
method currently returns an empty HashMap. If this is intentional for the initial implementation, consider adding a comment to clarify the future intent.impl SubscriptionGroupAttributes { + /// Returns all available subscription group attributes. + /// Currently empty - attributes will be added in future implementations. pub fn all() -> &'static HashMap<CheetahString, Arc<dyn Attribute>> {🧰 Tools
🪛 GitHub Actions: CI
[error] 14-14: Clippy lint 'redundant_closure': redundant closure used in 'ALL.get_or_init(|| HashMap::new())'. Suggest replacing closure with function itself: 'HashMap::new'.
rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs (1)
80-86
: Reminder: Implement metrics tracking for monitoring.The TODO comment indicates that metrics for tracking invocation status and execution time need to be implemented.
Would you like me to generate the metrics implementation code or create an issue to track this task?
rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (2)
99-110
: Consider avoiding the clone for better performance.The comment indicates awareness of the performance impact of cloning the entire config.
Consider refactoring to avoid the clone by either:
- Modifying
put_subscription_group_config
to accept a reference and clone internally only if needed- Using
Arc
for the config if it's read more often than modifiedExample approach:
// Option 1: Clone only the old value for logging match self.subscription_group_wrapper .lock() .subscription_group_table .insert(config.group_name().into(), config.clone()) { Some(old) => { info!( "update subscription group config, old: {:?} new: {:?}", old, config, ); } None => { info!("create new subscription group, {:?}", config) } }🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 99-99: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L99
Added line #L99 was not covered by tests
[warning] 101-103: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L101-L103
Added lines #L101 - L103 were not covered by tests
[warning] 108-108: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L108
Added line #L108 was not covered by tests
316-317
: Consider using DashMap for better concurrent performance.The TODO correctly identifies that
DashMap
would provide better concurrent access patterns thanMutex<HashMap>
.
DashMap
would eliminate the need for locking the entire map during reads/writes, improving performance under high concurrency. However, consider these trade-offs:
- DashMap has slightly higher memory overhead
- Individual operations are atomic, but multi-step operations still need external synchronization
- Migration should be done carefully to maintain consistency
Would you like me to create an issue to track this architectural improvement?
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
rocketmq-broker/src/processor/admin_broker_processor.rs
(4 hunks)rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs
(1 hunks)rocketmq-broker/src/subscription/manager/subscription_group_manager.rs
(4 hunks)rocketmq-common/src/common/attribute.rs
(1 hunks)rocketmq-common/src/common/attribute/subscription_group_attributes.rs
(1 hunks)rocketmq-common/src/utils/file_utils.rs
(1 hunks)rocketmq-error/src/lib.rs
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
rocketmq-broker/src/processor/admin_broker_processor.rs (2)
rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs (1)
new
(42-46)rocketmq-broker/src/broker_runtime.rs (1)
new
(128-244)
rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs (5)
rocketmq-remoting/src/protocol/remoting_command.rs (2)
code
(592-594)create_response_command
(220-224)rocketmq-broker/src/broker_runtime.rs (1)
message_store
(1620-1622)rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (3)
new
(52-62)request
(114-119)decode
(190-212)rocketmq-broker/src/processor/admin_broker_processor.rs (1)
new
(54-72)rocketmq-remoting/src/net/channel.rs (1)
remote_address
(82-84)
🪛 GitHub Actions: CI
rocketmq-common/src/common/attribute/subscription_group_attributes.rs
[error] 14-14: Clippy lint 'redundant_closure': redundant closure used in 'ALL.get_or_init(|| HashMap::new())'. Suggest replacing closure with function itself: 'HashMap::new'.
🪛 GitHub Check: codecov/patch
rocketmq-broker/src/processor/admin_broker_processor.rs
[warning] 61-62: rocketmq-broker/src/processor/admin_broker_processor.rs#L61-L62
Added lines #L61 - L62 were not covered by tests
[warning] 69-69: rocketmq-broker/src/processor/admin_broker_processor.rs#L69
Added line #L69 was not covered by tests
[warning] 182-184: rocketmq-broker/src/processor/admin_broker_processor.rs#L182-L184
Added lines #L182 - L184 were not covered by tests
rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs
[warning] 42-46: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L42-L46
Added lines #L42 - L46 were not covered by tests
[warning] 48-55: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L48-L55
Added lines #L48 - L55 were not covered by tests
[warning] 57-57: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L57
Added line #L57 was not covered by tests
[warning] 59-61: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L59-L61
Added lines #L59 - L61 were not covered by tests
[warning] 63-70: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L63-L70
Added lines #L63 - L70 were not covered by tests
[warning] 72-75: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L72-L75
Added lines #L72 - L75 were not covered by tests
[warning] 78-78: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L78
Added line #L78 was not covered by tests
[warning] 87-88: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L87-L88
Added lines #L87 - L88 were not covered by tests
[warning] 90-119: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L90-L119
Added lines #L90 - L119 were not covered by tests
[warning] 121-125: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L121-L125
Added lines #L121 - L125 were not covered by tests
[warning] 127-129: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L127-L129
Added lines #L127 - L129 were not covered by tests
[warning] 134-135: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L134-L135
Added lines #L134 - L135 were not covered by tests
rocketmq-broker/src/subscription/manager/subscription_group_manager.rs
[warning] 60-60: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L60
Added line #L60 was not covered by tests
[warning] 67-73: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L67-L73
Added lines #L67 - L73 were not covered by tests
[warning] 75-94: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L75-L94
Added lines #L75 - L94 were not covered by tests
[warning] 97-97: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L97
Added line #L97 was not covered by tests
[warning] 99-99: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L99
Added line #L99 was not covered by tests
[warning] 101-103: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L101-L103
Added lines #L101 - L103 were not covered by tests
[warning] 108-108: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L108
Added line #L108 was not covered by tests
[warning] 112-119: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L112-L119
Added lines #L112 - L119 were not covered by tests
[warning] 121-126: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L121-L126
Added lines #L121 - L126 were not covered by tests
[warning] 128-129: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L128-L129
Added lines #L128 - L129 were not covered by tests
[warning] 131-131: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L131
Added line #L131 was not covered by tests
[warning] 133-142: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L133-L142
Added lines #L133 - L142 were not covered by tests
[warning] 144-144: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L144
Added line #L144 was not covered by tests
[warning] 146-149: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L146-L149
Added lines #L146 - L149 were not covered by tests
[warning] 151-153: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L151-L153
Added lines #L151 - L153 were not covered by tests
🔇 Additional comments (7)
rocketmq-common/src/common/attribute.rs (1)
26-26
: LGTM! Module declaration follows established patterns.The addition of the
subscription_group_attributes
module declaration is consistent with the existing module organization and properly exposes the new functionality.rocketmq-common/src/utils/file_utils.rs (1)
54-54
: Good documentation of potential improvement.The TODO comment appropriately documents a known limitation and potential optimization opportunity for future work.
rocketmq-error/src/lib.rs (1)
35-35
: LGTM! Proper simplification of type alias.Removing the explicit
std::result::
prefix is appropriate sinceResult
is available in the prelude. This improves code readability.rocketmq-broker/src/processor/admin_broker_processor.rs (2)
32-32
: LGTM! Proper module integration.The import and module declaration follow the established pattern for other handlers in the processor.
Also applies to: 39-39
181-185
:✅ Verification successful
Request code handling looks correct.
The new
UpdateAndCreateSubscriptionGroup
request code handling follows the same async pattern as other handlers. The method delegation tosubscription_group_handler.update_and_create_subscription_group()
is appropriate.Verify that the method signature matches the expected pattern:
🏁 Script executed:
#!/bin/bash # Description: Verify the handler method signature matches other handler methods # Check the method signature in the handler ast-grep --pattern 'pub async fn update_and_create_subscription_group( &mut self, channel: Channel, ctx: ConnectionHandlerContext, request_code: RequestCode, request: RemotingCommand, ) -> Option<RemotingCommand> { $$$ }'Length of output: 235
🏁 Script executed:
#!/bin/bash # Search for the handler method definition across the codebase echo "Looking for exact async handler signature..." rg -n "pub async fn update_and_create_subscription_group" -g "*.rs" echo -e "\nShowing any occurrences of the method name with context..." rg -n "update_and_create_subscription_group" -g "*.rs" -C3Length of output: 2008
Handler signature and delegation confirmed
Verified thatupdate_and_create_subscription_group
is declared aspub async fn update_and_create_subscription_group( &mut self, _channel: Channel, _ctx: ConnectionHandlerContext, _request_code: RequestCode, _request: RemotingCommand, ) -> Option<RemotingCommand> { … }– matching the async pattern and return type of other handlers (underscored parameters silence unused‐variable warnings). No further changes required.
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 182-184: rocketmq-broker/src/processor/admin_broker_processor.rs#L182-L184
Added lines #L182 - L184 were not covered by testsrocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs (1)
42-46
: LGTM!The constructor implementation is straightforward and follows Rust conventions.
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 42-46: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L42-L46
Added lines #L42 - L46 were not covered by testsrocketmq-broker/src/subscription/manager/subscription_group_manager.rs (1)
67-73
: LGTM!Good separation of concerns between the public API and internal implementation.
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 67-73: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L67-L73
Added lines #L67 - L73 were not covered by tests
@@ -43,6 +45,8 @@ | |||
consumer_request_handler: ConsumerRequestHandler<MS>, | |||
offset_request_handler: OffsetRequestHandler<MS>, | |||
batch_mq_handler: BatchMqHandler<MS>, | |||
subscription_group_handler: SubscriptionGroupHandler<MS>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Handler integration follows established patterns.
The addition of the subscription_group_handler
field and its initialization in the constructor are consistent with how other handlers are integrated in this processor.
However, ensure that test coverage is added for the new handler integration:
Also applies to: 61-62, 69-69
🏁 Script executed:
#!/bin/bash
# Description: Check for existing tests and verify the handler method exists
# Check if tests exist for the AdminBrokerProcessor
fd -e rs -x grep -l "AdminBrokerProcessor" {} \; | grep -i test
# Verify the handler method exists
ast-grep --pattern 'impl $_ SubscriptionGroupHandler$_ {
$$$
update_and_create_subscription_group($$$) {
$$$
}
$$$
}'
Length of output: 198
I’m not seeing any definitions or tests for update_and_create_subscription_group
yet. Let’s double-check across the whole repo:
🏁 Script executed:
#!/usr/bin/env bash
set -euo pipefail
# 1. Locate the handler method definition
rg -n "fn update_and_create_subscription_group" .
# 2. Locate any impl blocks of the SubscriptionGroupHandler trait
ast-grep --pattern $'impl<$$$> SubscriptionGroupHandler<$_> for $_ {\n $$$\n}'
# 3. Search for tests referencing this handler method
rg -n "update_and_create_subscription_group" -g "*test*.rs" .
Length of output: 355
Add unit tests for SubscriptionGroupHandler.update_and_create_subscription_group
The integration of subscription_group_handler
follows established patterns and the handler method is correctly defined here:
rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs:48
pub async fn update_and_create_subscription_group(...)
However, I didn’t find any existing tests that exercise this method. Please add unit tests to cover its behavior. For example:
- Create a new test module in
subscription_group_handler.rs
or a dedicated file undertests/
- Write async tests that initialize a
SubscriptionGroupHandler
, invokeupdate_and_create_subscription_group
, and verify both update and creation branches
Fixes required before merging.
🤖 Prompt for AI Agents
In rocketmq-broker/src/processor/admin_broker_processor.rs around lines 48, and
also in subscription_group_handler.rs at line 48 where the
update_and_create_subscription_group method is defined, there are no existing
unit tests covering this handler method. To fix this, add a new test module
either in subscription_group_handler.rs or in a dedicated test file under
tests/. Write async unit tests that instantiate SubscriptionGroupHandler, call
update_and_create_subscription_group, and verify its behavior for both updating
and creating subscription groups to ensure proper coverage before merging.
let mut request_body = UnlockBatchRequestBody::decode(request.get_body().unwrap()).unwrap(); | ||
if request_body.only_this_broker | ||
|| !self | ||
.broker_runtime_inner | ||
.broker_config() | ||
.lock_in_strict_mode | ||
{ | ||
self.broker_runtime_inner | ||
.rebalance_lock_manager() | ||
.unlock_batch( | ||
request_body.consumer_group.as_ref().unwrap(), | ||
&request_body.mq_set, | ||
request_body.client_id.as_ref().unwrap(), | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix error handling to prevent panics from unwrap() calls.
Multiple unwrap()
calls could cause panics:
- Line 97: Both decode and get_body could fail
- Lines 107-109: Option fields might be None
Apply this fix to handle errors gracefully:
- let mut request_body = UnlockBatchRequestBody::decode(request.get_body().unwrap()).unwrap();
+ let body = match request.get_body() {
+ Some(body) => body,
+ None => {
+ let mut response = RemotingCommand::create_response_command();
+ response.set_code_ref(ResponseCode::SystemError);
+ response.set_remark(Some("Request body is empty".to_string()));
+ return Some(response);
+ }
+ };
+
+ let mut request_body = match UnlockBatchRequestBody::decode(body) {
+ Ok(body) => body,
+ Err(e) => {
+ let mut response = RemotingCommand::create_response_command();
+ response.set_code_ref(ResponseCode::SystemError);
+ response.set_remark(Some(format!("Failed to decode unlock batch request: {:?}", e)));
+ return Some(response);
+ }
+ };
if request_body.only_this_broker
|| !self
.broker_runtime_inner
.broker_config()
.lock_in_strict_mode
{
+ let consumer_group = match request_body.consumer_group.as_ref() {
+ Some(group) => group,
+ None => {
+ let mut response = RemotingCommand::create_response_command();
+ response.set_code_ref(ResponseCode::SystemError);
+ response.set_remark(Some("Consumer group is missing".to_string()));
+ return Some(response);
+ }
+ };
+ let client_id = match request_body.client_id.as_ref() {
+ Some(id) => id,
+ None => {
+ let mut response = RemotingCommand::create_response_command();
+ response.set_code_ref(ResponseCode::SystemError);
+ response.set_remark(Some("Client ID is missing".to_string()));
+ return Some(response);
+ }
+ };
self.broker_runtime_inner
.rebalance_lock_manager()
.unlock_batch(
- request_body.consumer_group.as_ref().unwrap(),
+ consumer_group,
&request_body.mq_set,
- request_body.client_id.as_ref().unwrap(),
+ client_id,
);
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let mut request_body = UnlockBatchRequestBody::decode(request.get_body().unwrap()).unwrap(); | |
if request_body.only_this_broker | |
|| !self | |
.broker_runtime_inner | |
.broker_config() | |
.lock_in_strict_mode | |
{ | |
self.broker_runtime_inner | |
.rebalance_lock_manager() | |
.unlock_batch( | |
request_body.consumer_group.as_ref().unwrap(), | |
&request_body.mq_set, | |
request_body.client_id.as_ref().unwrap(), | |
); | |
// Safely extract the raw request body | |
let body = match request.get_body() { | |
Some(body) => body, | |
None => { | |
let mut response = RemotingCommand::create_response_command(); | |
response.set_code_ref(ResponseCode::SystemError); | |
response.set_remark(Some("Request body is empty".to_string())); | |
return Some(response); | |
} | |
}; | |
// Safely decode the UnlockBatchRequestBody | |
let mut request_body = match UnlockBatchRequestBody::decode(body) { | |
Ok(body) => body, | |
Err(e) => { | |
let mut response = RemotingCommand::create_response_command(); | |
response.set_code_ref(ResponseCode::SystemError); | |
response.set_remark(Some(format!( | |
"Failed to decode unlock batch request: {:?}", | |
e | |
))); | |
return Some(response); | |
} | |
}; | |
if request_body.only_this_broker | |
|| !self | |
.broker_runtime_inner | |
.broker_config() | |
.lock_in_strict_mode | |
{ | |
// Safely extract the consumer group | |
let consumer_group = match request_body.consumer_group.as_ref() { | |
Some(group) => group, | |
None => { | |
let mut response = RemotingCommand::create_response_command(); | |
response.set_code_ref(ResponseCode::SystemError); | |
response.set_remark(Some("Consumer group is missing".to_string())); | |
return Some(response); | |
} | |
}; | |
// Safely extract the client ID | |
let client_id = match request_body.client_id.as_ref() { | |
Some(id) => id, | |
None => { | |
let mut response = RemotingCommand |
|
response.set_code_ref(ResponseCode::SystemError); | |
response.set_remark(Some("Client ID is missing".to_string())); | |
return Some(response); | |
} | |
}; | |
self.broker_runtime_inner | |
.rebalance_lock_manager() | |
.unlock_batch( | |
consumer_group, | |
&request_body.mq_set, | |
client_id, | |
); | |
} |
🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs
around lines 97 to 110, the code uses unwrap() on potentially failing calls like
get_body(), decode(), and on Option fields consumer_group and client_id, which
can cause panics. Refactor to handle these errors gracefully by checking if
get_body() returns Some, handling decode errors with proper error propagation or
logging, and verifying that consumer_group and client_id are Some before using
them, returning an error or skipping the unlock_batch call if any are None.
let mut config = SubscriptionGroupConfig::decode(request.get_body().unwrap()); | ||
if let Ok(config) = config.as_mut() { | ||
self.broker_runtime_inner | ||
.subscription_group_manager_mut() | ||
.update_subscription_group_config(config) | ||
} | ||
response.set_code_ref(ResponseCode::Success); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix error handling to prevent panics and incorrect success responses.
The current implementation has two critical issues:
- Line 63 uses
unwrap()
which will panic if the request body isNone
- The response returns success even when the config decode fails
Apply this fix to handle errors properly:
- let mut config = SubscriptionGroupConfig::decode(request.get_body().unwrap());
- if let Ok(config) = config.as_mut() {
- self.broker_runtime_inner
- .subscription_group_manager_mut()
- .update_subscription_group_config(config)
- }
- response.set_code_ref(ResponseCode::Success);
+ let body = match request.get_body() {
+ Some(body) => body,
+ None => {
+ response.set_code_ref(ResponseCode::SystemError);
+ response.set_remark(Some("Request body is empty".to_string()));
+ return Some(response);
+ }
+ };
+
+ let mut config = match SubscriptionGroupConfig::decode(body) {
+ Ok(config) => config,
+ Err(e) => {
+ response.set_code_ref(ResponseCode::SystemError);
+ response.set_remark(Some(format!("Failed to decode subscription group config: {:?}", e)));
+ return Some(response);
+ }
+ };
+
+ self.broker_runtime_inner
+ .subscription_group_manager_mut()
+ .update_subscription_group_config(&mut config);
+ response.set_code_ref(ResponseCode::Success);
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before c 9E88 ommitting. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let mut config = SubscriptionGroupConfig::decode(request.get_body().unwrap()); | |
if let Ok(config) = config.as_mut() { | |
self.broker_runtime_inner | |
.subscription_group_manager_mut() | |
.update_subscription_group_config(config) | |
} | |
response.set_code_ref(ResponseCode::Success); | |
let body = match request.get_body() { | |
Some(body) => body, | |
None => { | |
response.set_code_ref(ResponseCode::SystemError); | |
response.set_remark(Some("Request body is empty".to_string())); | |
return Some(response); | |
} | |
}; | |
let mut config = match SubscriptionGroupConfig::decode(body) { | |
Ok(config) => config, | |
Err(e) => { | |
response.set_code_ref(ResponseCode::SystemError); | |
response.set_remark(Some(format!( | |
"Failed to decode subscription group config: {:?}", | |
e | |
))); | |
return Some(response); | |
} | |
}; | |
self.broker_runtime_inner | |
.subscription_group_manager_mut() | |
.update_subscription_group_config(&mut config); | |
response.set_code_ref(ResponseCode::Success); |
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 63-70: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L63-L70
Added lines #L63 - L70 were not covered by tests
🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs
lines 63 to 69, avoid using unwrap() on request.get_body() to prevent panics
when the body is None. Instead, check if the body exists before decoding. Also,
only set the response code to Success if decoding the config succeeds;
otherwise, set an appropriate error response code. This ensures proper error
handling and prevents incorrect success responses.
) { | ||
Ok(final_attributes) => final_attributes, | ||
Err(_err) => { | ||
todo!("deal runtime exception") | ||
} | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace todo!() with proper error handling to prevent panics.
The todo!()
macro at line 94 will cause a panic in production if an error occurs during attribute merging.
Replace the todo with proper error handling:
) {
Ok(final_attributes) => final_attributes,
- Err(_err) => {
- todo!("deal runtime exception")
- }
+ Err(err) => {
+ warn!("Failed to alter subscription group attributes for {}: {:?}",
+ config.group_name(), err);
+ // Return early without updating to maintain consistency
+ return;
+ }
};
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
) { | |
Ok(final_attributes) => final_attributes, | |
Err(_err) => { | |
todo!("deal runtime exception") | |
} | |
}; | |
) { | |
Ok(final_attributes) => final_attributes, | |
Err(err) => { | |
warn!("Failed to alter subscription group attributes for {}: {:?}", | |
config.group_name(), err); | |
// Return early without updating to maintain consistency | |
F438 return; | |
} | |
}; |
🤖 Prompt for AI Agents
In rocketmq-broker/src/subscription/manager/subscription_group_manager.rs around
lines 91 to 96, replace the todo!() macro used for error handling with proper
error handling logic to avoid panics in production. Instead of panicking, handle
the error gracefully by logging the error or returning a Result with an
appropriate error variant, ensuring the function can recover or propagate the
error safely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@578223592 Please fix CI error thanks
🔊@578223592 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
- 使用 contains_key 替代 get 方法来检查订阅组是否存在,提高代码效率 - 在 SubscriptionGroupAttributes 中添加 clippy::redundant_closure 注解,忽略冗余闭包警告
🔊@578223592 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (1)
rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (1)
90-95
:⚠️ Potential issueCritical: Replace todo!() with proper error handling to prevent panics.
The
todo!()
macro will cause a panic in production if an error occurs during attribute merging, which could bring down the broker.Replace the todo with proper error handling:
) { Ok(final_attributes) => final_attributes, - Err(_err) => { - todo!("deal runtime exception") - } + Err(err) => { + warn!("Failed to alter subscription group attributes for {}: {:?}", + config.group_name(), err); + // Return early without updating to maintain consistency + return; + } };
🧹 Nitpick comments (1)
rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (1)
98-109
: Optimize: Avoid unnecessary cloning in put operation.The TODO comment correctly identifies that cloning the config could be avoided for better performance.
Consider refactoring to use references or implement a more efficient insertion strategy:
- match self.put_subscription_group_config(config.clone()) { - //todo avoid clone + let group_name = config.group_name().clone(); + match self.put_subscription_group_config_by_ref(config) { Some(old) => { info!( "update subscription group config, old: {:?} new: {:?}", - old, config, + old, config ); } None => { - info!("create new subscription group, {:?}", config) + info!("create new subscription group, {:?}", config); } }You would need to implement a method that takes ownership or uses references appropriately.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
rocketmq-broker/src/subscription/manager/subscription_group_manager.rs
(4 hunks)rocketmq-common/src/common/attribute/subscription_group_attributes.rs
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- rocketmq-common/src/common/attribute/subscription_group_attributes.rs
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: build
- GitHub Check: test
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: auto-approve
🔇 Additional comments (5)
rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (5)
22-23
: LGTM - Import statements are appropriate.The new imports for
AttributeUtil
andSubscriptionGroupAttributes
are correctly added to support the subscription group attribute management functionality.
45-45
: LGTM - Data version tracking properly implemented.The addition of the
data_version
field with theMessageStore
constraint and proper initialization enables version tracking for subscription group configurations, which is essential for consistency management.Also applies to: 48-51, 60-60
67-73
: LGTM - Clean separation of concerns.The public method properly delegates to the internal implementation and handles persistence, providing a clean interface for subscription group updates.
113-130
: LGTM - Helper methods are well-structured.The
request
andcurrent
methods provide clean abstractions for extracting subscription group attributes from different sources.
145-152
: LGTM - Data version management is correctly implemented.The version update logic properly integrates with the message store's state machine version for consistency tracking.
@@ -212,6 +312,7 @@ | |||
#[derive(Debug, Clone, Default, Serialize, Deserialize)] | |||
#[serde(rename_all = "camelCase")] | |||
pub(crate) struct SubscriptionGroupWrapper { | |||
//todo dashmap to concurrent safe |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Address concurrent safety concern.
The TODO comment highlights a potential concurrency issue with using HashMap
instead of a thread-safe alternative.
Consider replacing the HashMap
with DashMap
for better concurrent access:
use dashmap::DashMap;
- //todo dashmap to concurrent safe
- subscription_group_table: HashMap<CheetahString, SubscriptionGroupConfig>,
+ subscription_group_table: DashMap<CheetahString, SubscriptionGroupConfig>,
This would eliminate the need for explicit locking in many operations and improve performance under concurrent access patterns.
Would you like me to help implement the migration to DashMap across the codebase?
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In rocketmq-broker/src/subscription/manager/subscription_group_manager.rs at
line 315, the current use of HashMap is not thread-safe and may cause
concurrency issues. Replace the HashMap with DashMap to ensure concurrent safety
and improve performance by avoiding explicit locking. Update all related code to
use DashMap's API for concurrent access and mutation accordingly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Which Issue(s) This PR Fixes(Closes)
Fixes #3295
Brief Description
How Did You Test This Change?
Summary by CodeRabbit