[ISSUE #3315]🚀Implement GetNamesrvConfigCommand for name server tool💫 by mxsm · Pull Request #3333 · mxsm/rocketmq-rust · GitHub
Merged: 1 commit, May 25, 2025
1 change: 1 addition & 0 deletions Cargo.lock

11 changes: 10 additions & 1 deletion rocketmq-client/src/admin/default_mq_admin_ext_impl.rs
@@ -657,7 +657,16 @@
name_servers: Vec<CheetahString>,
) -> rocketmq_error::RocketMQResult<HashMap<CheetahString, HashMap<CheetahString, CheetahString>>>
{
- todo!()
+ Ok(self
+ .client_instance
+ .as_ref()
+ .unwrap()
+ .mq_client_api_impl
+ .as_ref()
+ .unwrap()
+ .get_name_server_config(Some(name_servers), self.timeout_millis)
+ .await?
+ .unwrap_or_default())

}

async fn resume_check_half_message(
71 changes: 71 additions & 0 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
@@ -18,6 +18,7 @@
use std::collections::HashSet;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use cheetah_string::CheetahString;
@@ -36,11 +37,14 @@
use rocketmq_common::common::namesrv::top_addressing::TopAddressing;
use rocketmq_common::common::sys_flag::pull_sys_flag::PullSysFlag;
use rocketmq_common::common::topic::TopicValidator;
use rocketmq_common::utils::serde_json_utils::SerdeJsonUtils;
use rocketmq_common::MessageDecoder;
use rocketmq_error::client_broker_err;
use rocketmq_error::mq_client_err;
use rocketmq_error::ClientErr;
use rocketmq_error::MQBrokerErr;
use rocketmq_error::RocketMQResult;
use rocketmq_error::RocketmqError;
use rocketmq_error::RocketmqError::MQClientBrokerError;
use rocketmq_remoting::base::connection_net_event::ConnectionNetEvent;
use rocketmq_remoting::clients::rocketmq_default_impl::RocketmqDefaultClient;
@@ -1746,6 +1750,73 @@
}
Ok(())
}

pub async fn get_name_server_config(
&self,
name_servers: Option<Vec<CheetahString>>,
timeout_millis: Duration,
) -> rocketmq_error::RocketMQResult<
Option<HashMap<CheetahString, HashMap<CheetahString, CheetahString>>>,
> {

// Determine which name servers to invoke
let invoke_name_servers = match name_servers {
Some(servers) if !servers.is_empty() => servers,
_ => self.remoting_client.get_name_server_address_list().to_vec(),

};

if invoke_name_servers.is_empty() {
return Ok(None);
}

// Create request command
let request = RemotingCommand::create_remoting_command(RequestCode::GetNamesrvConfig);
let mut config_map = HashMap::with_capacity(4);

// Iterate through each name server
for name_server in invoke_name_servers {

// Make synchronous call with timeout
let response = self
.remoting_client
.invoke_async(
Some(&name_server),
request.clone(),
timeout_millis.as_millis() as u64,
)
.await?;

// Check response code
match ResponseCode::from(response.code()) {

ResponseCode::Success => {
// Parse response body as properties
match response.get_body() {
Some(body) => {
let body_str = String::from_utf8_lossy(body.as_ref()).to_string();

// if body_str contains =, return from Java version
let properties = if body_str.contains('=') {
mix_all::string_to_properties(&body_str).unwrap_or_default()

} else {
SerdeJsonUtils::from_json_str::<
HashMap<CheetahString, CheetahString>,
>(&body_str).expect("failed to parse JSON")

};
Comment on lines +1796 to +1799
⚠️ Potential issue

Replace expect() with proper error handling to prevent potential panic.

The expect() call on line 1798 can cause the application to panic if JSON parsing fails. This should be handled gracefully.

Apply this diff to improve error handling:

-                            } else {
-                                SerdeJsonUtils::from_json_str::<
-                                    HashMap<CheetahString, CheetahString>,
-                                >(&body_str).expect("failed to parse JSON")
-                            };
+                            } else {
+                                match SerdeJsonUtils::from_json_str::<
+                                    HashMap<CheetahString, CheetahString>,
+                                >(&body_str) {
+                                    Ok(json_props) => json_props,
+                                    Err(_) => {
+                                        return Err(RocketmqError::MQClientErr(ClientErr::new(
+                                            format!("Failed to parse JSON response from server {}", name_server)
+                                        )));
+                                    }
+                                }
+                            };
🤖 Prompt for AI Agents
In rocketmq-client/src/implementation/mq_client_api_impl.rs around lines 1796 to
1799, the code uses expect() on the result of JSON parsing, which can cause a
panic if parsing fails. Replace the expect() call with proper error handling by
matching on the Result returned from from_json_str, and handle the Err case
gracefully, such as returning an error or logging it, to prevent the application
from panicking.
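The review point above (return an error rather than panic when a response body cannot be parsed) can be illustrated without the project's own types. What follows is a minimal, std-only sketch, not the merged code. `ParseError` and `parse_config_body` are hypothetical names, the `key=value` line format is only a stand-in for the real properties/JSON handling, and the sketch rejects invalid UTF-8 whereas the PR decodes with `String::from_utf8_lossy`.

```rust
use std::collections::HashMap;
use std::fmt;

/// Hypothetical error type standing in for the crate's client error.
#[derive(Debug, PartialEq)]
pub struct ParseError {
    pub server: String,
    pub reason: String,
}

impl fmt::Display for ParseError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "failed to parse response from {}: {}", self.server, self.reason)
    }
}

impl std::error::Error for ParseError {}

/// Parses a body of `key=value` lines. Every failure is reported through
/// `Err`, carrying the server address, instead of panicking via `expect()`.
pub fn parse_config_body(
    server: &str,
    body: &[u8],
) -> Result<HashMap<String, String>, ParseError> {
    let text = std::str::from_utf8(body).map_err(|e| ParseError {
        server: server.to_string(),
        reason: format!("body is not valid UTF-8: {e}"),
    })?;

    let mut props = HashMap::new();
    for (idx, line) in text.lines().enumerate() {
        let line = line.trim();
        // Skip blank lines and `#` comments.
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        let (key, value) = line.split_once('=').ok_or_else(|| ParseError {
            server: server.to_string(),
            reason: format!("line {} has no '=' separator", idx + 1),
        })?;
        props.insert(key.trim().to_string(), value.trim().to_string());
    }
    Ok(props)
}
```

A caller would propagate the failure with `?`, so one malformed response surfaces as an ordinary error value that names the offending server.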


config_map.insert(name_server.clone(), properties);

}
None => {
return Err(RocketmqError::MQClientErr(ClientErr::new(
"Body is empty".to_string(),
)))

}
}
}
code => {
return Err(RocketmqError::MQClientErr(ClientErr::new_with_code(
response.code(),
response.remark().map_or("".to_string(), |s| s.to_string()),

)));
}
}
}
Ok(Some(config_map))
}

}
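The control flow of `get_name_server_config` above (use the requested servers if any, otherwise fall back to the configured list, return `None` when there is nothing to query, and stop at the first failing server) can be sketched in isolation. This is a simplified, synchronous illustration under stated assumptions: `select_servers`, `collect_configs`, `ConfigMap` and the `fetch` closure are hypothetical, plain `String` replaces `CheetahString`, and the real method is async and talks to the remoting client.

```rust
use std::collections::HashMap;

/// Per-server configuration: server address -> (key -> value).
pub type ConfigMap = HashMap<String, HashMap<String, String>>;

/// Prefer the explicitly requested servers; fall back to the defaults
/// when the request is absent or empty.
pub fn select_servers(requested: Option<Vec<String>>, defaults: &[String]) -> Vec<String> {
    match requested {
        Some(servers) if !servers.is_empty() => servers,
        _ => defaults.to_vec(),
    }
}

/// Queries each selected server through `fetch` (a stand-in for the
/// remote call) and gathers the results. Returns `Ok(None)` when no
/// server is available and the first `Err` if any query fails.
pub fn collect_configs<F, E>(
    requested: Option<Vec<String>>,
    defaults: &[String],
    mut fetch: F,
) -> Result<Option<ConfigMap>, E>
where
    F: FnMut(&str) -> Result<HashMap<String, String>, E>,
{
    let servers = select_servers(requested, defaults);
    if servers.is_empty() {
        return Ok(None);
    }

    let mut configs = HashMap::with_capacity(servers.len());
    for server in servers {
        let props = fetch(&server)?;
        configs.insert(server, props);
    }
    Ok(Some(configs))
}
```

Because the loop uses `?`, a single unreachable server fails the whole call, which matches the early `return Err(...)` branches in the diff. Collecting partial results instead would be a different design.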

fn build_queue_offset_sorted_map(
3 changes: 2 additions & 1 deletion rocketmq-tools/Cargo.toml
@@ -20,11 +20,12 @@ rocketmq-error = { workspace = true }

serde.workspace = true
serde_json.workspace = true
- tokio = {workspace = true}
+ tokio = { workspace = true }

cheetah-string = { workspace = true }
clap = { version = "4.5.38", features = ["derive"] }
tabled = { version = "0.19.0", features = ["derive"] }
futures = "0.3.31"
💡 Verification agent

🧩 Analysis chain

Verify the necessity of the futures dependency.

The futures dependency is added but doesn't appear to be explicitly used in the command implementation. Please confirm this dependency is required for the async operations.


🏁 Script executed:

#!/bin/bash
# Description: Check if futures crate is actually used in the tools module
# Expected: If futures is not used, it should be removed to avoid unnecessary dependencies

echo "Searching for futures usage in rocketmq-tools..."
rg -A 3 -B 3 "futures::" rocketmq-tools/
echo "---"
echo "Searching for use futures statements..."
rg -A 1 "use futures" rocketmq-tools/
echo "---"
echo "Checking if futures is used in any way..."
rg "futures" rocketmq-tools/ --type rust

Length of output: 440


Remove unused futures dependency

I searched the entire rocketmq-tools directory (no matches for futures::, use futures, or plain futures) and didn’t find any explicit usage of the futures crate. Unless you’re planning future work that relies on it, please remove this entry to keep dependencies lean. If it’s needed, add a code comment or demonstrate its use in this PR.

• File: rocketmq-tools/Cargo.toml (line 28)

- futures = "0.3.31"
🤖 Prompt for AI Agents
In rocketmq-tools/Cargo.toml at line 28, the futures dependency is declared but
not used anywhere in the rocketmq-tools codebase. Remove the futures = "0.3.31"
line from Cargo.toml to eliminate this unnecessary dependency and keep the
project dependencies clean.


[[bin]]
name = "rocketmq-admin-cli-rust"
rocketmq-tools/src/commands/namesrv_commands/get_namesrv_config_command.rs
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;
use std::sync::Arc;

use cheetah_string::CheetahString;
@@ -22,6 +23,12 @@
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_error::RocketMQResult;
use rocketmq_remoting::runtime::RPCHook;
use tabled::settings::object::Rows;
use tabled::settings::Alignment;
use tabled::settings::Modify;
use tabled::settings::Style;
use tabled::Table;
use tabled::Tabled;

use crate::admin::default_mq_admin_ext::DefaultMQAdminExt;
use crate::commands::CommandExecute;
@@ -68,12 +75,57 @@
let server_list = self.parse_server_list();
if let Some(server_list) = server_list {
admin.start().await?;
- let _ = admin.get_name_server_config(server_list).await;
+ let configs = admin.get_name_server_config(server_list).await?;
+ display_configs_with_table(&configs);
+ admin.shutdown().await;
+ return Ok(());

} else {
eprintln!("Please set the namesrvAddr parameter");
return Ok(());
}

- unimplemented!("GetNamesrvConfigCommand is not implemented yet");
+ Ok(())
}

}

/// Configuration entry for table display
#[derive(Debug, Clone, Tabled)]
struct ConfigEntry {
#[tabled(rename = "Configuration Key")]
key: CheetahString,
#[tabled(rename = "Value")]
value: CheetahString,
}

/// Display configurations using tabled for formatted output
fn display_configs_with_table(
configs: &HashMap<CheetahString, HashMap<CheetahString, CheetahString>>,
) {
for (server_addr, properties) in configs {
println!("============Name server: {server_addr}============",);

// Convert properties to ConfigEntry vector
let mut config_entries: Vec<ConfigEntry> = properties
.iter()
.map(|(key, value)| ConfigEntry {
key: key.clone(),
value: value.clone(),
})
.collect();

// Sort by key for consistent output
config_entries.sort_by(|a, b| a.key.cmp(&b.key));

// Create and display table
if !config_entries.is_empty() {
let table = Table::new(&config_entries)
.with(Style::modern())
.with(Modify::new(Rows::new(1..)).with(Alignment::left()))
.to_string();

println!("{table}");
} else {
println!("No configuration found for this server.");
}
println!(); // Add blank line between servers

}
}
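Codecov reports the display path above as untested, partly because `display_configs_with_table` prints directly to stdout. One possible way to make that logic checkable is to build the text first and print it afterwards. The sketch below is std-only and hypothetical: `render_configs` is not part of the PR, it emits plain padded columns rather than a `tabled` table, it uses `String` in place of `CheetahString`, and it additionally sorts the servers, which the merged code does not do.

```rust
use std::collections::HashMap;

/// Renders per-server configuration as text, sorted by server address
/// and then by key so the output is deterministic.
pub fn render_configs(configs: &HashMap<String, HashMap<String, String>>) -> String {
    let mut out = String::new();

    // Sorting the servers is an addition for stable output; the PR
    // iterates the map in its natural (unordered) iteration order.
    let mut servers: Vec<&String> = configs.keys().collect();
    servers.sort();

    for server in servers {
        out.push_str(&format!("============Name server: {server}============\n"));

        let props = &configs[server];
        if props.is_empty() {
            out.push_str("No configuration found for this server.\n\n");
            continue;
        }

        // Sort by key for consistent output, as the PR does.
        let mut entries: Vec<(&String, &String)> = props.iter().collect();
        entries.sort_by(|a, b| a.0.cmp(b.0));

        // Pad every key to the width of the longest one.
        let width = entries
            .iter()
            .map(|(key, _)| key.chars().count())
            .max()
            .unwrap_or(0);
        for (key, value) in entries {
            out.push_str(&format!("{key:<width$} | {value}\n"));
        }
        out.push('\n'); // blank line between servers
    }
    out
}
```

The command would then call `print!("{}", render_configs(&configs))`, and a unit test can compare the returned string without capturing stdout.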