8000 [ISSUE #3295]add updateAndSubscripGroup func by 578223592 · Pull Request #3293 · mxsm/rocketmq-rust · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[ISSUE #3295]add updateAndSubscripGroup func #3293

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 25, 2025

Conversation

578223592
Copy link
Contributor
@578223592 578223592 commented May 20, 2025
  • 新增 SubscriptionGroupHandler 处理订阅组相关请求
  • 实现 update_and_create_subscription_group 方法处理更新和创建订阅组
  • 添加 SubscriptionGroupAttributes 和相关属性处理
  • 修改 SubscriptionGroupManager 支持更新订阅组配置

Which Issue(s) This PR Fixes(Closes)

Fixes #3295

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features
    • Added support for updating and creating subscription group configurations.
    • Introduced batch unlocking of message queues for improved subscription group management.
  • Enhancements
    • Subscription group updates now merge attributes and track versioning for better configuration management.
  • Documentation
    • Added comments and module documentation for new features and future improvements.

- 新增 SubscriptionGroupHandler 处理订阅组相关请求
- 实现 update_and_create_subscription_group 方法处理更新和创建订阅组
- 添加 SubscriptionGroupAttributes 和相关属性处理
- 修改 SubscriptionGroupManager 支持更新订阅组配置
Copy link
Contributor
coderabbitai bot commented May 20, 2025

"""

Walkthrough

A new feature for updating and creating subscription group configurations was added to the RocketMQ broker. This includes a new handler for subscription group operations, updates to the admin processor for request routing, enhancements to the subscription group manager for attribute merging and versioning, and supporting attribute structures.

Changes

File(s) / Path(s) Change Summary
rocketmq-broker/src/processor/admin_broker_processor.rs Added SubscriptionGroupHandler field, initialized in constructor, and routed new request code to handler.
rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs Introduced SubscriptionGroupHandler with async methods for updating/creating subscription groups and unlocking.
rocketmq-broker/src/subscription/manager/subscription_group_manager.rs Added attribute-aware config update, versioning, and helper methods to the manager.
rocketmq-common/src/common/attribute.rs Declared new public module subscription_group_attributes.
rocketmq-common/src/common/attribute/subscription_group_attributes.rs Added SubscriptionGroupAttributes struct with static attribute map accessor.
rocketmq-common/src/utils/file_utils.rs Added a comment about improving global lock usage.
rocketmq-error/src/lib.rs Simplified type alias for RocketMQResult<T>.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant AdminBrokerProcessor
    participant SubscriptionGroupHandler
    participant SubscriptionGroupManager

    Client->>AdminBrokerProcessor: Send UpdateAndCreateSubscriptionGroup request
    AdminBrokerProcessor->>SubscriptionGroupHandler: update_and_create_subscription_group(request)
    SubscriptionGroupHandler->>SubscriptionGroupManager: update_subscription_group_config(config)
    SubscriptionGroupManager-->>SubscriptionGroupHandler: Update config & version
    SubscriptionGroupHandler-->>AdminBrokerProcessor: Success response
    AdminBrokerProcessor-->>Client: Return response
Loading

Assessment against linked issues

Objective Addressed Explanation
Implement updateAndSubscripGroup functionality (#3295)

Poem

A rabbit hops with code so neat,
Subscription groups now can't be beat!
With handlers new and configs bright,
Attributes merged just right.
Versioned updates, all in flow—
RocketMQ, onward you go!
🐇✨
"""

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@rocketmq-rust-bot
Copy link
Collaborator

🔊@5782235 8000 92 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

Copy link
codecov bot commented May 20, 2025

Codecov Report

Attention: Patch coverage is 0.64103% with 155 lines in your changes missing coverage. Please review.

Project coverage is 26.40%. Comparing base (cfea1ca) to head (dc68b48).
Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
...subscription/manager/subscription_group_manager.rs 0.00% 74 Missing ⚠️
...min_broker_processor/subscription_group_handler.rs 0.00% 72 Missing ⚠️
...tmq-broker/src/processor/admin_broker_processor.rs 0.00% 6 Missing ⚠️
.../common/attribute/subscription_group_attributes.rs 0.00% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3293      +/-   ##
==========================================
- Coverage   26.43%   26.40%   -0.03%     
==========================================
  Files         526      528       +2     
  Lines       74229    74413     +184     
==========================================
+ Hits        19619    19648      +29     
- Misses      54610    54765     +155     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@mxsm
Copy link
Owner
mxsm commented May 20, 2025

@578223592 Create ISSUE to related this PR

@578223592
Copy link
Contributor Author

@578223592 Create ISSUE to related this PR

thanks for remind.I would create a issue when i am ready for this pr !

@rocketmq-rust-robot rocketmq-rust-robot added the feature🚀 Suggest an idea for this project. label May 21, 2025
caicai added 2 commits May 25, 2025 10:52
- 在 SubscriptionGroupManager 中添加 data_version 字段,用于管理数据版本
- 修改 update_subscription_group_config 方法,使其能够更新数据版本
- 优化 update_data_version 方法,根据消息存储的状态机版本生成新版本
- 调整代码结构,提高可维护性和可扩展性
- 删除了 ConfigManager trait 定义中的多余空行- 优化了代码结构,提高了代码的可读性和整洁性
@rocketmq-rust-bot
Copy link
Collaborator

🔊@578223592 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

@578223592 578223592 marked this pull request as ready for review May 25, 2025 03:01
…ashMap::new()作为参数传递给 get_or_init 方法

- 简化了代码结构,提高了可读性
@rocketmq-rust-bot
Copy link
Collaborator

🔊@578223592 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

Copy link
Contributor
@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Nitpick comments (4)
rocketmq-common/src/common/attribute/subscription_group_attributes.rs (1)

12-15: Consider documenting the empty HashMap initialization.

The all() method currently returns an empty HashMap. If this is intentional for the initial implementation, consider adding a comment to clarify the future intent.

 impl SubscriptionGroupAttributes {
+    /// Returns all available subscription group attributes.
+    /// Currently empty - attributes will be added in future implementations.
     pub fn all() -> &'static HashMap<CheetahString, Arc<dyn Attribute>> {
🧰 Tools
🪛 GitHub Actions: CI

[error] 14-14: Clippy lint 'redundant_closure': redundant closure used in 'ALL.get_or_init(|| HashMap::new())'. Suggest replacing closure with function itself: 'HashMap::new'.

rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs (1)

80-86: Reminder: Implement metrics tracking for monitoring.

The TODO comment indicates that metrics for tracking invocation status and execution time need to be implemented.

Would you like me to generate the metrics implementation code or create an issue to track this task?

rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (2)

99-110: Consider avoiding the clone for better performance.

The comment indicates awareness of the performance impact of cloning the entire config.

Consider refactoring to avoid the clone by either:

  1. Modifying put_subscription_group_config to accept a reference and clone internally only if needed
  2. Using Arc for the config if it's read more often than modified

Example approach:

// Option 1: Clone only the old value for logging
match self.subscription_group_wrapper
    .lock()
    .subscription_group_table
    .insert(config.group_name().into(), config.clone())
{
    Some(old) => {
        info!(
            "update subscription group config, old: {:?} new: {:?}",
            old, config,
        );
    }
    None => {
        info!("create new subscription group, {:?}", config)
    }
}
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 99-99: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L99
Added line #L99 was not covered by tests


[warning] 101-103: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L101-L103
Added lines #L101 - L103 were not covered by tests


[warning] 108-108: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L108
Added line #L108 was not covered by tests


316-317: Consider using DashMap for better concurrent performance.

The TODO correctly identifies that DashMap would provide better concurrent access patterns than Mutex<HashMap>.

DashMap would eliminate the need for locking the entire map during reads/writes, improving performance under high concurrency. However, consider these trade-offs:

  • DashMap has slightly higher memory overhead
  • Individual operations are atomic, but multi-step operations still need external synchronization
  • Migration should be done carefully to maintain consistency

Would you like me to create an issue to track this architectural improvement?

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d63171c and 9ba4be1.

📒 Files selected for processing (7)
  • rocketmq-broker/src/processor/admin_broker_processor.rs (4 hunks)
  • rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs (1 hunks)
  • rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (4 hunks)
  • rocketmq-common/src/common/attribute.rs (1 hunks)
  • rocketmq-common/src/common/attribute/subscription_group_attributes.rs (1 hunks)
  • rocketmq-common/src/utils/file_utils.rs (1 hunks)
  • rocketmq-error/src/lib.rs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
rocketmq-broker/src/processor/admin_broker_processor.rs (2)
rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs (1)
  • new (42-46)
rocketmq-broker/src/broker_runtime.rs (1)
  • new (128-244)
rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs (5)
rocketmq-remoting/src/protocol/remoting_command.rs (2)
  • code (592-594)
  • create_response_command (220-224)
rocketmq-broker/src/broker_runtime.rs (1)
  • message_store (1620-1622)
rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (3)
  • new (52-62)
  • request (114-119)
  • decode (190-212)
rocketmq-broker/src/processor/admin_broker_processor.rs (1)
  • new (54-72)
rocketmq-remoting/src/net/channel.rs (1)
  • remote_address (82-84)
🪛 GitHub Actions: CI
rocketmq-common/src/common/attribute/subscription_group_attributes.rs

[error] 14-14: Clippy lint 'redundant_closure': redundant closure used in 'ALL.get_or_init(|| HashMap::new())'. Suggest replacing closure with function itself: 'HashMap::new'.

🪛 GitHub Check: codecov/patch
rocketmq-broker/src/processor/admin_broker_processor.rs

[warning] 61-62: rocketmq-broker/src/processor/admin_broker_processor.rs#L61-L62
Added lines #L61 - L62 were not covered by tests


[warning] 69-69: rocketmq-broker/src/processor/admin_broker_processor.rs#L69
Added line #L69 was not covered by tests


[warning] 182-184: rocketmq-broker/src/processor/admin_broker_processor.rs#L182-L184
Added lines #L182 - L184 were not covered by tests

rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs

[warning] 42-46: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L42-L46
Added lines #L42 - L46 were not covered by tests


[warning] 48-55: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L48-L55
Added lines #L48 - L55 were not covered by tests


[warning] 57-57: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L57
Added line #L57 was not covered by tests


[warning] 59-61: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L59-L61
Added lines #L59 - L61 were not covered by tests


[warning] 63-70: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L63-L70
Added lines #L63 - L70 were not covered by tests


[warning] 72-75: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L72-L75
Added lines #L72 - L75 were not covered by tests


[warning] 78-78: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L78
Added line #L78 was not covered by tests


[warning] 87-88: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L87-L88
Added lines #L87 - L88 were not covered by tests


[warning] 90-119: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L90-L119
Added lines #L90 - L119 were not covered by tests


[warning] 121-125: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L121-L125
Added lines #L121 - L125 were not covered by tests


[warning] 127-129: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L127-L129
Added lines #L127 - L129 were not covered by tests


[warning] 134-135: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L134-L135
Added lines #L134 - L135 were not covered by tests

rocketmq-broker/src/subscription/manager/subscription_group_manager.rs

[warning] 60-60: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L60
Added line #L60 was not covered by tests


[warning] 67-73: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L67-L73
Added lines #L67 - L73 were not covered by tests


[warning] 75-94: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L75-L94
Added lines #L75 - L94 were not covered by tests


[warning] 97-97: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L97
Added line #L97 was not covered by tests


[warning] 99-99: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L99
Added line #L99 was not covered by tests


[warning] 101-103: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L101-L103
Added lines #L101 - L103 were not covered by tests


[warning] 108-108: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L108
Added line #L108 was not covered by tests


[warning] 112-119: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L112-L119
Added lines #L112 - L119 were not covered by tests


[warning] 121-126: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L121-L126
Added lines #L121 - L126 were not covered by tests


[warning] 128-129: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L128-L129
Added lines #L128 - L129 were not covered by tests


[warning] 131-131: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L131
Added line #L131 was not covered by tests


[warning] 133-142: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L133-L142
Added lines #L133 - L142 were not covered by tests


[warning] 144-144: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L144
Added line #L144 was not covered by tests


[warning] 146-149: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L146-L149
Added lines #L146 - L149 were not covered by tests


[warning] 151-153: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L151-L153
Added lines #L151 - L153 were not covered by tests

🔇 Additional comments (7)
rocketmq-common/src/common/attribute.rs (1)

26-26: LGTM! Module declaration follows established patterns.

The addition of the subscription_group_attributes module declaration is consistent with the existing module organization and properly exposes the new functionality.

rocketmq-common/src/utils/file_utils.rs (1)

54-54: Good documentation of potential improvement.

The TODO comment appropriately documents a known limitation and potential optimization opportunity for future work.

rocketmq-error/src/lib.rs (1)

35-35: LGTM! Proper simplification of type alias.

Removing the explicit std::result:: prefix is appropriate since Result is available in the prelude. This improves code readability.

rocketmq-broker/src/processor/admin_broker_processor.rs (2)

32-32: LGTM! Proper module integration.

The import and module declaration follow the established pattern for other handlers in the processor.

Also applies to: 39-39


181-185:

✅ Verification successful

Request code handling looks correct.

The new UpdateAndCreateSubscriptionGroup request code handling follows the same async pattern as other handlers. The method delegation to subscription_group_handler.update_and_create_subscription_group() is appropriate.

Verify that the method signature matches the expected pattern:


🏁 Script executed:

#!/bin/bash
# Description: Verify the handler method signature matches other handler methods

# Check the method signature in the handler
ast-grep --pattern 'pub async fn update_and_create_subscription_group(
  &mut self,
  channel: Channel,
  ctx: ConnectionHandlerContext,
  request_code: RequestCode,
  request: RemotingCommand,
) -> Option<RemotingCommand> {
  $$$
}'

Length of output: 235


🏁 Script executed:

#!/bin/bash
# Search for the handler method definition across the codebase
echo "Looking for exact async handler signature..."
rg -n "pub async fn update_and_create_subscription_group" -g "*.rs"

echo -e "\nShowing any occurrences of the method name with context..."
rg -n "update_and_create_subscription_group" -g "*.rs" -C3

Length of output: 2008


Handler signature and delegation confirmed
Verified that update_and_create_subscription_group is declared as

pub async fn update_and_create_subscription_group(
    &mut self,
    _channel: Channel,
    _ctx: ConnectionHandlerContext,
    _request_code: RequestCode,
    _request: RemotingCommand,
) -> Option<RemotingCommand> {}

– matching the async pattern and return type of other handlers (underscored parameters silence unused‐variable warnings). No further changes required.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 182-184: rocketmq-broker/src/processor/admin_broker_processor.rs#L182-L184
Added lines #L182 - L184 were not covered by tests

rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs (1)

42-46: LGTM!

The constructor implementation is straightforward and follows Rust conventions.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 42-46: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L42-L46
Added lines #L42 - L46 were not covered by tests

rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (1)

67-73: LGTM!

Good separation of concerns between the public API and internal implementation.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 67-73: rocketmq-broker/src/subscription/manager/subscription_group_manager.rs#L67-L73
Added lines #L67 - L73 were not covered by tests

@@ -43,6 +45,8 @@
consumer_request_handler: ConsumerRequestHandler<MS>,
offset_request_handler: OffsetRequestHandler<MS>,
batch_mq_handler: BatchMqHandler<MS>,
subscription_group_handler: SubscriptionGroupHandler<MS>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Handler integration follows established patterns.

The addition of the subscription_group_handler field and its initialization in the constructor are consistent with how other handlers are integrated in this processor.

However, ensure that test coverage is added for the new handler integration:

Also applies to: 61-62, 69-69


🏁 Script executed:

#!/bin/bash
# Description: Check for existing tests and verify the handler method exists

# Check if tests exist for the AdminBrokerProcessor
fd -e rs -x grep -l "AdminBrokerProcessor" {} \; | grep -i test

# Verify the handler method exists
ast-grep --pattern 'impl $_ SubscriptionGroupHandler$_ {
  $$$
  update_and_create_subscription_group($$$) {
    $$$
  }
  $$$
}'

Length of output: 198


I’m not seeing any definitions or tests for update_and_create_subscription_group yet. Let’s double-check across the whole repo:


🏁 Script executed:

#!/usr/bin/env bash
set -euo pipefail

# 1. Locate the handler method definition
rg -n "fn update_and_create_subscription_group" .

# 2. Locate any impl blocks of the SubscriptionGroupHandler trait
ast-grep --pattern $'impl<$$$> SubscriptionGroupHandler<$_> for $_ {\n  $$$\n}' 

# 3. Search for tests referencing this handler method
rg -n "update_and_create_subscription_group" -g "*test*.rs" .

Length of output: 355


Add unit tests for SubscriptionGroupHandler.update_and_create_subscription_group

The integration of subscription_group_handler follows established patterns and the handler method is correctly defined here:

  • rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs:48
    pub async fn update_and_create_subscription_group(...)

However, I didn’t find any existing tests that exercise this method. Please add unit tests to cover its behavior. For example:

  • Create a new test module in subscription_group_handler.rs or a dedicated file under tests/
  • Write async tests that initialize a SubscriptionGroupHandler, invoke update_and_create_subscription_group, and verify both update and creation branches

Fixes required before merging.

🤖 Prompt for AI Agents
In rocketmq-broker/src/processor/admin_broker_processor.rs around lines 48, and
also in subscription_group_handler.rs at line 48 where the
update_and_create_subscription_group method is defined, there are no existing
unit tests covering this handler method. To fix this, add a new test module
either in subscription_group_handler.rs or in a dedicated test file under
tests/. Write async unit tests that instantiate SubscriptionGroupHandler, call
update_and_create_subscription_group, and verify its behavior for both updating
and creating subscription groups to ensure proper coverage before merging.

Comment on lines +97 to +110
let mut request_body = UnlockBatchRequestBody::decode(request.get_body().unwrap()).unwrap();
if request_body.only_this_broker
|| !self
.broker_runtime_inner
.broker_config()
.lock_in_strict_mode
{
self.broker_runtime_inner
.rebalance_lock_manager()
.unlock_batch(
request_body.consumer_group.as_ref().unwrap(),
&request_body.mq_set,
request_body.client_id.as_ref().unwrap(),
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix error handling to prevent panics from unwrap() calls.

Multiple unwrap() calls could cause panics:

  • Line 97: Both decode and get_body could fail
  • Lines 107-109: Option fields might be None

Apply this fix to handle errors gracefully:

-        let mut request_body = UnlockBatchRequestBody::decode(request.get_body().unwrap()).unwrap();
+        let body = match request.get_body() {
+            Some(body) => body,
+            None => {
+                let mut response = RemotingCommand::create_response_command();
+                response.set_code_ref(ResponseCode::SystemError);
+                response.set_remark(Some("Request body is empty".to_string()));
+                return Some(response);
+            }
+        };
+        
+        let mut request_body = match UnlockBatchRequestBody::decode(body) {
+            Ok(body) => body,
+            Err(e) => {
+                let mut response = RemotingCommand::create_response_command();
+                response.set_code_ref(ResponseCode::SystemError);
+                response.set_remark(Some(format!("Failed to decode unlock batch request: {:?}", e)));
+                return Some(response);
+            }
+        };
         
         if request_body.only_this_broker
             || !self
                 .broker_runtime_inner
                 .broker_config()
                 .lock_in_strict_mode
         {
+            let consumer_group = match request_body.consumer_group.as_ref() {
+                Some(group) => group,
+                None => {
+                    let mut response = RemotingCommand::create_response_command();
+                    response.set_code_ref(ResponseCode::SystemError);
+                    response.set_remark(Some("Consumer group is missing".to_string()));
+                    return Some(response);
+                }
+            };
+            let client_id = match request_body.client_id.as_ref() {
+                Some(id) => id,
+                None => {
+                    let mut response = RemotingCommand::create_response_command();
+                    response.set_code_ref(ResponseCode::SystemError);
+                    response.set_remark(Some("Client ID is missing".to_string()));
+                    return Some(response);
+                }
+            };
             self.broker_runtime_inner
                 .rebalance_lock_manager()
                 .unlock_batch(
-                    request_body.consumer_group.as_ref().unwrap(),
+                    consumer_group,
                     &request_body.mq_set,
-                    request_body.client_id.as_ref().unwrap(),
+                    client_id,
                 );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let mut request_body = UnlockBatchRequestBody::decode(request.get_body().unwrap()).unwrap();
if request_body.only_this_broker
|| !self
.broker_runtime_inner
.broker_config()
.lock_in_strict_mode
{
self.broker_runtime_inner
.rebalance_lock_manager()
.unlock_batch(
request_body.consumer_group.as_ref().unwrap(),
&request_body.mq_set,
request_body.client_id.as_ref().unwrap(),
);
// Safely extract the raw request body
let body = match request.get_body() {
Some(body) => body,
None => {
let mut response = RemotingCommand::create_response_command();
response.set_code_ref(ResponseCode::SystemError);
response.set_remark(Some("Request body is empty".to_string()));
return Some(response);
}
};
// Safely decode the UnlockBatchRequestBody
let mut request_body = match UnlockBatchRequestBody::decode(body) {
Ok(body) => body,
Err(e) => {
let mut response = RemotingCommand::create_response_command();
response.set_code_ref(ResponseCode::SystemError);
response.set_remark(Some(format!(
"Failed to decode unlock batch request: {:?}",
e
)));
return Some(response);
}
};
if request_body.only_this_broker
|| !self
.broker_runtime_inner
.broker_config()
.lock_in_strict_mode
{
// Safely extract the consumer group
let consumer_group = match request_body.consumer_group.as_ref() {
Some(group) => group,
None => {
let mut response = RemotingCommand::create_response_command();
response.set_code_ref(ResponseCode::SystemError);
response.set_remark(Some("Consumer group is missing".to_string()));
return Some(response);
}
};
// Safely extract the client ID
let client_id = match request_body.client_id.as_ref() {
Some(id) => id,
None => {
let mut response = RemotingCommand::create_response_command();
response.set_code_ref(ResponseCode::SystemError);
response.set_remark(Some("Client ID is missing".to_string()));
return Some(response);
}
};
self.broker_runtime_inner
.rebalance_lock_manager()
.unlock_batch(
consumer_group,
&request_body.mq_set,
client_id,
);
}
🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs
around lines 97 to 110, the code uses unwrap() on potentially failing calls like
get_body(), decode(), and on Option fields consumer_group and client_id, which
can cause panics. Refactor to handle these errors gracefully by checking if
get_body() returns Some, handling decode errors with proper error propagation or
logging, and verifying that consumer_group and client_id are Some before using
them, returning an error or skipping the unlock_batch call if any are None.

Comment on lines +63 to +69
let mut config = SubscriptionGroupConfig::decode(request.get_body().unwrap());
if let Ok(config) = config.as_mut() {
self.broker_runtime_inner
.subscription_group_manager_mut()
.update_subscription_group_config(config)
}
response.set_code_ref(ResponseCode::Success);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix error handling to prevent panics and incorrect success responses.

The current implementation has two critical issues:

  1. Line 63 uses unwrap() which will panic if the request body is None
  2. The response returns success even when the config decode fails

Apply this fix to handle errors properly:

-        let mut config = SubscriptionGroupConfig::decode(request.get_body().unwrap());
-        if let Ok(config) = config.as_mut() {
-            self.broker_runtime_inner
-                .subscription_group_manager_mut()
-                .update_subscription_group_config(config)
-        }
-        response.set_code_ref(ResponseCode::Success);
+        let body = match request.get_body() {
+            Some(body) => body,
+            None => {
+                response.set_code_ref(ResponseCode::SystemError);
+                response.set_remark(Some("Request body is empty".to_string()));
+                return Some(response);
+            }
+        };
+        
+        let mut config = match SubscriptionGroupConfig::decode(body) {
+            Ok(config) => config,
+            Err(e) => {
+                response.set_code_ref(ResponseCode::SystemError);
+                response.set_remark(Some(format!("Failed to decode subscription group config: {:?}", e)));
+                return Some(response);
+            }
+        };
+        
+        self.broker_runtime_inner
+            .subscription_group_manager_mut()
+            .update_subscription_group_config(&mut config);
+        response.set_code_ref(ResponseCode::Success);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before c 9E88 ommitting. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let mut config = SubscriptionGroupConfig::decode(request.get_body().unwrap());
if let Ok(config) = config.as_mut() {
self.broker_runtime_inner
.subscription_group_manager_mut()
.update_subscription_group_config(config)
}
response.set_code_ref(ResponseCode::Success);
let body = match request.get_body() {
Some(body) => body,
None => {
response.set_code_ref(ResponseCode::SystemError);
response.set_remark(Some("Request body is empty".to_string()));
return Some(response);
}
};
let mut config = match SubscriptionGroupConfig::decode(body) {
Ok(config) => config,
Err(e) => {
response.set_code_ref(ResponseCode::SystemError);
response.set_remark(Some(format!(
"Failed to decode subscription group config: {:?}",
e
)));
return Some(response);
}
};
self.broker_runtime_inner
.subscription_group_manager_mut()
.update_subscription_group_config(&mut config);
response.set_code_ref(ResponseCode::Success);
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 63-70: rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs#L63-L70
Added lines #L63 - L70 were not covered by tests

🤖 Prompt for AI Agents
In
rocketmq-broker/src/processor/admin_broker_processor/subscription_group_handler.rs
lines 63 to 69, avoid using unwrap() on request.get_body() to prevent panics
when the body is None. Instead, check if the body exists before decoding. Also,
only set the response code to Success if decoding the config succeeds;
otherwise, set an appropriate error response code. This ensures proper error
handling and prevents incorrect success responses.

Comment on lines +91 to +96
) {
Ok(final_attributes) => final_attributes,
Err(_err) => {
todo!("deal runtime exception")
}
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Replace todo!() with proper error handling to prevent panics.

The todo!() macro at line 94 will cause a panic in production if an error occurs during attribute merging.

Replace the todo with proper error handling:

         ) {
             Ok(final_attributes) => final_attributes,
-            Err(_err) => {
-                todo!("deal runtime exception")
-            }
+            Err(err) => {
+                warn!("Failed to alter subscription group attributes for {}: {:?}", 
+                      config.group_name(), err);
+                // Return early without updating to maintain consistency
+                return;
+            }
         };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
) {
Ok(final_attributes) => final_attributes,
Err(_err) => {
todo!("deal runtime exception")
}
};
) {
Ok(final_attributes) => final_attributes,
Err(err) => {
warn!("Failed to alter subscription group attributes for {}: {:?}",
config.group_name(), err);
// Return early without updating to maintain consistency
F438 return;
}
};
🤖 Prompt for AI Agents
In rocketmq-broker/src/subscription/manager/subscription_group_manager.rs around
lines 91 to 96, replace the todo!() macro used for error handling with proper
error handling logic to avoid panics in production. Instead of panicking, handle
the error gracefully by logging the error or returning a Result with an
appropriate error variant, ensuring the function can recover or propagate the
error safely.

Copy link
Owner
@mxsm mxsm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@578223592 Please fix CI error thanks

@rocketmq-rust-bot rocketmq-rust-bot requested a review from mxsm May 25, 2025 09:52
@rocketmq-rust-bot
Copy link
Collaborator

🔊@578223592 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

- 使用 contains_key 替代 get 方法来检查订阅组是否存在,提高代码效率
- 在 SubscriptionGroupAttributes 中添加 clippy::redundant_closure 注解,忽略冗余闭包警告
@rocketmq-rust-bot
Copy link
Collaborator

🔊@578223592 🚀Thanks for your contribution🎉!

💡CodeRabbit(AI) will review your code first🔥!

Note

🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥.

Copy link
Contributor
@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

10000

Actionable comments posted: 1

♻️ Duplicate comments (1)
rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (1)

90-95: ⚠️ Potential issue

Critical: Replace todo!() with proper error handling to prevent panics.

The todo!() macro will cause a panic in production if an error occurs during attribute merging, which could bring down the broker.

Replace the todo with proper error handling:

         ) {
             Ok(final_attributes) => final_attributes,
-            Err(_err) => {
-                todo!("deal runtime exception")
-            }
+            Err(err) => {
+                warn!("Failed to alter subscription group attributes for {}: {:?}", 
+                      config.group_name(), err);
+                // Return early without updating to maintain consistency
+                return;
+            }
         };
🧹 Nitpick comments (1)
rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (1)

98-109: Optimize: Avoid unnecessary cloning in put operation.

The TODO comment correctly identifies that cloning the config could be avoided for better performance.

Consider refactoring to use references or implement a more efficient insertion strategy:

-        match self.put_subscription_group_config(config.clone()) {
-            //todo avoid clone
+        let group_name = config.group_name().clone();
+        match self.put_subscription_group_config_by_ref(config) {
             Some(old) => {
                 info!(
                     "update subscription group config, old: {:?} new: {:?}",
-                    old, config,
+                    old, config
                 );
             }
             None => {
-                info!("create new subscription group, {:?}", config)
+                info!("create new subscription group, {:?}", config);
             }
         }

You would need to implement a method that takes ownership or uses references appropriately.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0acabbc and dc68b48.

📒 Files selected for processing (2)
  • rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (4 hunks)
  • rocketmq-common/src/common/attribute/subscription_group_attributes.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • rocketmq-common/src/common/attribute/subscription_group_attributes.rs
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: build
  • GitHub Check: test
  • GitHub Check: build (macos-latest, nightly)
  • GitHub Check: build (windows-latest, nightly)
  • GitHub Check: build (windows-latest, stable)
  • GitHub Check: build (macos-latest, stable)
  • GitHub Check: build (ubuntu-latest, stable)
  • GitHub Check: build (ubuntu-latest, nightly)
  • GitHub Check: auto-approve
🔇 Additional comments (5)
rocketmq-broker/src/subscription/manager/subscription_group_manager.rs (5)

22-23: LGTM - Import statements are appropriate.

The new imports for AttributeUtil and SubscriptionGroupAttributes are correctly added to support the subscription group attribute management functionality.


45-45: LGTM - Data version tracking properly implemented.

The addition of the data_version field with the MessageStore constraint and proper initialization enables version tracking for subscription group configurations, which is essential for consistency management.

Also applies to: 48-51, 60-60


67-73: LGTM - Clean separation of concerns.

The public method properly delegates to the internal implementation and handles persistence, providing a clean interface for subscription group updates.


113-130: LGTM - Helper methods are well-structured.

The request and current methods provide clean abstractions for extracting subscription group attributes from different sources.


145-152: LGTM - Data version management is correctly implemented.

The version update logic properly integrates with the message store's state machine version for consistency tracking.

@@ -212,6 +312,7 @@
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct SubscriptionGroupWrapper {
//todo dashmap to concurrent safe
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Address concurrent safety concern.

The TODO comment highlights a potential concurrency issue with using HashMap instead of a thread-safe alternative.

Consider replacing the HashMap with DashMap for better concurrent access:

use dashmap::DashMap;

-    //todo dashmap to concurrent safe
-    subscription_group_table: HashMap<CheetahString, SubscriptionGroupConfig>,
+    subscription_group_table: DashMap<CheetahString, SubscriptionGroupConfig>,

This would eliminate the need for explicit locking in many operations and improve performance under concurrent access patterns.

Would you like me to help implement the migration to DashMap across the codebase?

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In rocketmq-broker/src/subscription/manager/subscription_group_manager.rs at
line 315, the current use of HashMap is not thread-safe and may cause
concurrency issues. Replace the HashMap with DashMap to ensure concurrent safety
and improve performance by avoiding explicit locking. Update all related code to
use DashMap's API for concurrent access and mutation accordingly.

Copy link
Collaborator
@rocketmq-rust-bot rocketmq-rust-bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@mxsm mxsm changed the title feat(broker): 添加更新和创建订阅组功能 [ISSUE #3295]add updateAndSubscripGroup func May 25, 2025
Copy link
Owner
@mxsm mxsm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@mxsm mxsm merged commit 7497aa6 into mxsm:main May 25, 2025
15 of 20 checks passed
@rocketmq-rust-bot rocketmq-rust-bot added approved PR has approved and removed ready to review waiting-review waiting review this PR labels May 25, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
AI review first Ai review pr first approved PR has approved auto merge feature🚀 Suggest an idea for this project.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature🚀] add updateAndSubscripGroup func
4 participants
0