-
-
Notifications
You must be signed in to change notification settings - Fork 88
[azure-service-bus] Add possibility to receive messages from Azur… #4876
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHu 8000 b? Sign in to your account
[azure-service-bus] Add possibility to receive messages from Azur… #4876
Conversation
bf22998
to
3ebce8c
Compare
...ure-service-bus/src/main/java/org/vividus/azure/servicebus/model/ServiceBusClientEntity.java
Outdated
Show resolved
Hide resolved
@@ -42,4 +60,80 @@ public void sendMessageToServiceBus(ChannelType type, String name, String namesp | |||
{ | |||
serviceBusService.send(type, name, namespaceName, payload); | |||
} | |||
|
|||
@When("I send message to service bus by client `$clientKey` with payload:`$payload`") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@When("I send message to service bus by client `$clientKey` with payload:`$payload`") | |
@When("I send message to `$serviceBusKey` service bus with payload:`$payload`") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and I guess we need to align step sending messages as well
serviceBusService.send(clientKey, payload); | ||
} | ||
|
8000
||
@When("I $action consuming messages from service bus by client `$clientKey`") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to have better autocompletion it's recommended to have 2 separate steps: start, stop
serviceBusService.send(clientKey, payload); | ||
} | ||
|
||
@When("I $action consuming messages from service bus by client `$clientKey`") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
service bus by client `$clientKey`
-> `$serviceBusKey` service bus
(everywhere)
public void send(String clientKey, String message) | ||
{ | ||
ServiceBusClientEntity serviceBusClientEntity = clientConfigs.get(clientKey, | ||
"No info provided for Service Bus client `%s`", clientKey); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"No info provided for Service Bus client `%s`", clientKey); | |
"No Service Bus client with key `%s` is configured", clientKey); |
BlockingQueue<ServiceBusReceivedMessage> messages = new LinkedBlockingDeque<>(); | ||
testContext.get(ServiceBusReceivedMessage.class, HashMap::new).put(clientKey, messages); | ||
ServiceBusClientEntity serviceBusClientEntity = clientConfigs.get(clientKey, | ||
"No info provided for Service Bus client `%s`", clientKey); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make sure to avoid duplications
ServiceBusProcessorClient client = processorBuilder.buildProcessorClient(); | ||
|
||
clients.put(clientKey, client); | ||
LOGGER.info("START the client processor '{}' to consume messages from the {} '{}' in namespace {}", clientKey, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOGGER.info("START the client processor '{}' to consume messages from the {} '{}' in namespace {}", clientKey, | |
LOGGER.info("'{}' Azure Service Bus Consumer of messages from the {} '{}' in namespace {} is started", clientKey, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in similar way for stop message
} | ||
else | ||
{ | ||
throw new IllegalArgumentException("There are no running Service Bus clients associated with the key: " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw new IllegalArgumentException("There are no running Service Bus clients associated with the key: " | |
throw new IllegalArgumentException("There are no running Azure Service Bus consumers associated with the key: " |
5852d6d
to
494b55c
Compare
494b55c
to
b814c82
Compare
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #4876 +/- ##
============================================
+ Coverage 97.38% 97.40% +0.01%
- Complexity 6598 6624 +26
============================================
Files 921 922 +1
Lines 19071 19157 +86
Branches 1269 1271 +2
============================================
+ Hits 18573 18659 +86
Misses 390 390
Partials 108 108 ☔ View full report in Codecov by Sentry. |
=== Azure Client Configuration | ||
|
||
Azure clients are configured with a set of properties with the following format: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pleas avoid using client
, it's hardly understandable by users
|
||
[source,properties] | ||
---- | ||
service-bus.{service-bus-key}.{property-name}=property value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
service-bus.{service-bus-key}.{property-name}=property value | |
azure.service-bus.{service-bus-key}.{property-name}=property value |
service-bus.{service-bus-key}.{property-name}=property value | ||
---- | ||
|
||
Where: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where: | |
where: |
When I start consuming messages from `myProjectTopic` service bus | ||
---- | ||
|
||
=== Stop consume messages |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
=== Stop consume messages | |
=== Stop consuming messages |
|
||
[source,gherkin] | ||
---- | ||
When I $queueOperation consumed `$serviceBusKey` service bus messages to $scopes variable `$variableName` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's for queue only
.Peek messages from the consumer and save to variable | ||
[source,gherkin] | ||
---- | ||
When I PEEK consumed `myProjectQueue` service bus messages to STORY variable `queueMessages` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add a set of complete examples to get understanding how to use these steps (in the same way we have for Kafka)
---- | ||
|
||
* `$queueOperation` - `DRAIN` - saves the messages consumed since the last drain or from the consumption start and moves the consumer cursor to the position after the last consumed message; | ||
`PEEK` - saves the messages consumed since the last drain or from the consumption start and doesn't change the consumer cursor position. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it makes sense to represent operations as sublist of queueOperation?
} | ||
catch (ServiceBusException e) | ||
{ | ||
throw new ServiceBusSendMessageException("Unable to send message to Azure Service Bus", e); | ||
} | ||
} | ||
|
||
public void startConsuming(String clientKey) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to handle cases when end user invokes startConsuming several times with the same key?
|
||
public void stopAll() | ||
{ | ||
clients.keySet().stream().toList().forEach(this::stopConsuming); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
forEach can be invoked on stream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I use remove method over the clients map. Usage of forEach will lead to ConcurrentModificationException
b814c82
to
3ba8e34
Compare
where: | ||
|
||
. `service-bus-key` - The key associated with the Azure Service Bus configuration, which will be used as a step parameter. | ||
. `property-name` - The name of the topic property. One of: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
. `property-name` - The name of the topic property. One of: | |
. `property-name` - The name of Service Bus property. One of: |
|
||
. `service-bus-key` - The key associated with the Azure Service Bus configuration, which will be used as a step parameter. | ||
. `property-name` - The name of the topic property. One of: | ||
.. `channel-type` - The type of service bus messaging components: either |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.. `channel-type` - The type of service bus messaging components: either | |
.. `channel-type` - The type of Service Bus messaging channel: either |
. `property-name` - The name of the topic property. One of: | ||
.. `channel-type` - The type of service bus messaging components: either | ||
https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview#concepts[QUEUE or TOPIC]. | ||
.. `namespace` - The name of the namespace the service bus belongs to. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.. `namespace` - The name of the namespace the service bus belongs to. | |
.. `namespace` - The name of the namespace Service Bus belongs to. |
https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview#concepts[QUEUE or TOPIC]. | ||
.. `namespace` - The name of the namespace the service bus belongs to. | ||
.. `name` - The queue or topic name. | ||
.. `subscription-name` - The name of the topic subscription. Only for TOPIC channel-type. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.. `subscription-name` - The name of the topic subscription. Only for TOPIC channel-type. | |
.. `subscription-name` - The name of the topic subscription. Only for `TOPIC` channel type. |
|
||
=== Start consuming messages | ||
|
||
Starts the Azure Service Bus consumer with the provided configuration to listen the specified |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Starts the Azure Service Bus consumer with the provided configuration to listen the specified | |
Starts Azure Service Bus consumer with the provided configuration to listen the specified |
* <li><b>name</b> - the queue or topic name</li> | ||
* <li><b>subscription-name</b> - the name of the topic subscription. Only for TOPIC channel-type</li> | ||
* </ul> | ||
* @param serviceBusKey Identifying the Service Bus connection details. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make sure to keep docs and javadocs aligned
Matcher<Integer> countMatcher = comparisonRule.getComparisonRule(expectedCount); | ||
Integer result = new DurationBasedWaiter(timeout, Duration.ofSeconds(1)).wait( | ||
() -> serviceBusService.getMessagesForClient(serviceBusKey).size(), countMatcher::matches); | ||
softAssert.assertThat("Total count of messages for Service Bus client with key: " + serviceBusKey, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please avoid using client
naming
|
||
package org.vividus.azure.servicebus.model; | ||
|
||
public class ServiceBusClientEntity |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public class ServiceBusClientEntity | |
public class ServiceBusConnectionParameters |
else | ||
{ | ||
processorBuilder.topicName(name); | ||
processorBuilder.subscriptionName(serviceBusClientEntity.getSubscriptionName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to throw an error if subscription name is configured for queue
|
||
private ServiceBusClientEntity getServiceBusClientConfig(String clientKey) | ||
{ | ||
return clientConfigs.get(clientKey, "No Service Bus client with key `%s` is configured", clientKey); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return clientConfigs.get(clientKey, "No Service Bus client with key `%s` is configured", clientKey); | |
return clientConfigs.get(clientKey, "No Service Bus connection with key `%s` is configured", clientKey); |
3029a80
to
2fd5919
Compare
2fd5919
to
1b631ba
Compare
1b631ba
to
a0b34c1
Compare
…e Service Bus