fix(amqp sink): attempt one reconnect when channel has errored #22971
base: master
Thanks @aramperes
}

info!(
    message = "Recovering broken connection to the AMQP broker.",
Nit: We can add a URL tag here
I'm not sure that is a good idea since it could print the password:
INFO sink{component_kind="sink" component_id=sink_amqp component_type=amqp}:request{request_id=1}: vector::sinks::amqp::channel: Recovering broken connection to the AMQP broker. uri="amqp://user:password@127.0.0.1:5672/%2f?timeout=10" internal_log_rate_limit=true
INFO sink{component_kind="sink" component_id=sink_amqp component_type=amqp}:request{request_id=1}: vector::sinks::amqp::channel: Recovered connection to the AMQP broker. uri="amqp://user:password@127.0.0.1:5672/%2f?timeout=10" internal_log_rate_limit=true
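If a URI tag is still wanted, one option is to mask the password before logging. The following is a minimal, std-only sketch. redact_amqp_uri is a hypothetical helper, not part of Vector or lapin, and it assumes the credentials appear as unencoded user:password@ in the authority part of the URI. It keeps the user, host, port, vhost and query so the log line still identifies the broker.

```rust
/// Hypothetical helper (not part of Vector): masks the password in an AMQP URI
/// such as "amqp://user:password@127.0.0.1:5672/%2f?timeout=10" before logging.
fn redact_amqp_uri(uri: &str) -> String {
    // Split off the scheme ("amqp://" or "amqps://"), if present.
    let (scheme, rest) = match uri.find("://") {
        Some(idx) => uri.split_at(idx + 3),
        None => ("", uri),
    };
    // The authority (userinfo@host:port) ends at the first '/', '?' or '#'.
    let authority_end = rest
        .find(|c: char| c == '/' || c == '?' || c == '#')
        .unwrap_or(rest.len());
    let (authority, tail) = rest.split_at(authority_end);
    // Only an '@' inside the authority separates the userinfo from the host.
    let Some(at) = authority.rfind('@') else {
        return uri.to_string(); // no credentials present
    };
    let (userinfo, host) = (&authority[..at], &authority[at + 1..]);
    match userinfo.split_once(':') {
        Some((user, _password)) => format!("{scheme}{user}:****@{host}{tail}"),
        None => uri.to_string(), // username only, nothing secret to hide
    }
}

fn main() {
    let uri = "amqp://user:password@127.0.0.1:5672/%2f?timeout=10";
    // prints "amqp://user:****@127.0.0.1:5672/%2f?timeout=10"
    println!("{}", redact_amqp_uri(uri));
}
```

Masking rather than dropping the whole userinfo keeps the username visible, which is often useful when debugging authentication failures.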
let need_reconnect =
    { self.channel.read().await.status().state() == lapin::ChannelState::Error };

if need_reconnect {
    let mut channel = self.channel.write().await;
I might be wrong here, but there is a possibility for a race condition. Could you instead do:
let mut channel = self.channel.write().await;
if channel.status().state() == lapin::ChannelState::Error {
    // ...
}
Ok(channel.downgrade())
}
My concern was the potential performance impact of acquiring a write lock on every published message. The write lock future does not resolve until all other readers and writers have been released, so it would prevent two tasks from sending messages concurrently.
This is why I acquire the write lock on line 31 and then check again on line 34 whether the connection is still in an error state. If two tasks race and the write lock is acquired twice (one after the other), the second check prevents the second task from reconnecting again.
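The double-checked pattern described in this reply can be sketched without a broker. This is a minimal sketch under stated assumptions: std::sync::RwLock stands in for tokio::sync::RwLock so it runs without an async runtime, and MockChannel, ReconnectingChannel and ensure_healthy are hypothetical names that only model the channel's error flag, not lapin's API.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;

/// Stand-in for lapin::Channel (assumption): only tracks whether it has errored.
struct MockChannel {
    errored: bool,
}

/// Hypothetical wrapper in the spirit of AmqpChannel, using std's RwLock.
struct ReconnectingChannel {
    channel: RwLock<MockChannel>,
    reconnects: AtomicUsize,
}

impl ReconnectingChannel {
    fn new(errored: bool) -> Self {
        ReconnectingChannel {
            channel: RwLock::new(MockChannel { errored }),
            reconnects: AtomicUsize::new(0),
        }
    }

    /// Called before each publish; performs at most one reconnect across all threads.
    fn ensure_healthy(&self) {
        // Fast path: a shared read lock, so healthy publishers never wait on each other.
        // The read guard is a temporary dropped at the end of this statement; taking
        // the write lock while still holding it would deadlock.
        let need_reconnect = self.channel.read().unwrap().errored;
        if need_reconnect {
            let mut channel = self.channel.write().unwrap();
            // Re-check under the exclusive lock: a thread that queued behind the
            // writer that already reconnected must not reconnect again.
            if channel.errored {
                // A real implementation would re-create the channel here.
                channel.errored = false;
                self.reconnects.fetch_add(1, Ordering::SeqCst);
            }
        }
    }

    fn reconnect_count(&self) -> usize {
        self.reconnects.load(Ordering::SeqCst)
    }
}

fn main() {
    let shared = Arc::new(ReconnectingChannel::new(true));
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || shared.ensure_healthy())
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // However the threads interleave, exactly one of them performs the reconnect.
    println!("reconnects: {}", shared.reconnect_count());
}
```

The trade-off versus always taking the write lock is that the common, healthy case only ever contends on the read lock; the write lock (and the second check) is paid only when the channel is actually in an error state.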
Summary
Improves the resilience of the amqp sink by attempting to re-create a broken Channel when sending new events.
This is a simple implementation that could be extended later or in this PR, depending on the Vector team's preference and vision. The current implementation:
tower retry mechanism in some way;
amqp source, which would be more complex to re-create the listeners.
Lapin does not have a built-in reconnection mechanism; it is up to the client to implement one. See amqp-rs/lapin#70 and amqp-rs/lapin#389.
Implementation
A new AmqpChannel struct wraps a tokio::sync::RwLock<lapin::Channel>. The write side of this read-write lock is held during reconnection to prevent concurrent reconnection attempts.
The API could be further improved by making crate::amqp::AmqpConfig::connect() return an AmqpChannel directly, but that would also affect the amqp source.
Change Type
Is this a breaking change?
How did you test this PR?
amqp sink. Example:
Error state. If the RabbitMQ process hasn't booted up and the reconnection fails, the event is dropped. Otherwise, the channel is re-established and the event is sent to the new channel.
Example of the reconnection succeeding:
Example of the reconnection failing:
Does this PR include user-facing changes?
References
Closes #22313