feat: configure subscribers_per_topic for momento pubsub workflow by krispraws · Pull Request #363 · iopsystems/rpc-perf · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat: configure subscribers_per_topic for momento pubsub workflow #363

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions src/clients/pubsub/momento.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,21 @@ pub fn launch_subscribers(
if let Component::Topics(topics) = component {
let poolsize = topics.subscriber_poolsize();
let concurrency = topics.subscriber_concurrency();

if concurrency > 100 {
eprintln!("Momento sdk does not support concurrency values greater than 100.");
std::process::exit(1);
}
let num_topics = topics.topics().len();
let subscribers_per_topic = topics
.momento_subscribers_per_topic()
.unwrap_or(poolsize * concurrency);
if num_topics * subscribers_per_topic > poolsize * concurrency {
eprintln!("Not enough Momento clients to support the workload - adjust momento_subscribers_per_topic or increase subscriber_poolsize/subscriber_concurrency.");
std::process::exit(1);
}
let mut clients = Vec::<Arc<TopicClient>>::with_capacity(poolsize);
for _ in 0..poolsize {
let client = {
let _guard = runtime.enter();

// initialize the Momento topic client
if std::env::var("MOMENTO_API_KEY").is_err() {
eprintln!("environment variable `MOMENTO_API_KEY` is not set");
Expand All @@ -53,7 +63,7 @@ pub fn launch_subscribers(
std::process::exit(1);
}
};

let _guard = runtime.enter();
match TopicClient::builder()
.configuration(LowLatency::v1())
.credential_provider(credential_provider)
Expand All @@ -66,15 +76,20 @@ pub fn launch_subscribers(
}
}
};

for _ in 0..concurrency {
for topic in topics.topics() {
runtime.spawn(subscriber_task(
client.clone(),
cache_name.clone(),
topic.to_string(),
));
}
clients.push(client);
}
let mut client_index = 0;
for topic in topics.topics() {
for _ in 0..subscribers_per_topic {
// Round-robin over the clients to pick one
let client = &clients[client_index];
client_index = (client_index + 1) % clients.len();
let _guard = runtime.enter();
runtime.spawn(subscriber_task(
client.clone(),
cache_name.clone(),
topic.to_string(),
));
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/config/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ pub struct Topics {
topic_distribution: Distribution,
#[serde(default)]
kafka_single_subscriber_group: bool,
#[serde(default)]
momento_subscribers_per_topic: Option<usize>,
}

impl Topics {
Expand Down Expand Up @@ -222,6 +224,10 @@ impl Topics {
/// Returns the value of the `kafka_single_subscriber_group` flag stored on
/// this topics configuration.
pub fn kafka_single_subscriber_group(&self) -> bool {
self.kafka_single_subscriber_group
}

/// Returns the optional `momento_subscribers_per_topic` setting stored on
/// this topics configuration; `None` means no value was provided.
pub fn momento_subscribers_per_topic(&self) -> Option<usize> {
self.momento_subscribers_per_topic
}
}

#[derive(Clone, Deserialize)]
Expand Down
6 changes: 6 additions & 0 deletions src/workload/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ pub struct Topics {
subscriber_poolsize: usize,
subscriber_concurrency: usize,
kafka_single_subscriber_group: bool,
momento_subscribers_per_topic: Option<usize>,
}

impl Topics {
Expand Down Expand Up @@ -643,6 +644,7 @@ impl Topics {
subscriber_poolsize,
subscriber_concurrency,
kafka_single_subscriber_group: topics.kafka_single_subscriber_group(),
momento_subscribers_per_topic: topics.momento_subscribers_per_topic(),
}
}

Expand All @@ -669,6 +671,10 @@ impl Topics {
/// Returns the `kafka_single_subscriber_group` flag held by this workload
/// `Topics` component.
pub fn kafka_single_subscriber_group(&self) -> bool {
self.kafka_single_subscriber_group
}

/// Returns the optional `momento_subscribers_per_topic` value held by this
/// workload `Topics` component; `None` means no explicit value was set.
pub fn momento_subscribers_per_topic(&self) -> Option<usize> {
self.momento_subscribers_per_topic
}
}

#[derive(Clone)]
Expand Down
0