fix(amqp sink): attempt one reconnect when channel has errored by aramperes · Pull Request #22971 · vectordotdev/vector

fix(amqp sink): attempt one reconnect when channel has errored #22971

Open · aramperes wants to merge 4 commits into master


aramperes (Contributor):

Summary

Improves the resilience of the amqp sink by attempting to re-create a broken Channel when sending new events.

This is a simple implementation that could be extended later or in this PR, depending on the Vector team's preference & vision. The current implementation:

  • only attempts a reconnect when sending an event fails, and will not attempt multiple times or with exponential back-off. This could be better integrated with the tower retry mechanism in some way;
  • does not make the re-connection behavior configurable;
  • does not implement re-connection for the amqp source, where re-creating the listeners would be more complex.
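
For reference, the multi-attempt variant with exponential back-off mentioned in the first bullet could look roughly like the sketch below. This is not part of the PR: `backoff_delay` and `connect_with_backoff` are hypothetical names, the code is synchronous and uses only the standard library, and a real integration would more likely go through the tower retry layer.

```rust
use std::time::Duration;

/// Hypothetical helper: delay before retry number `attempt` (0-based),
/// doubling from `base` and capped at `max`.
fn backoff_delay(base: Duration, max: Duration, attempt: u32) -> Duration {
    // 2^attempt, saturating so that large attempt numbers cannot overflow.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}

/// Hypothetical retry loop: call `connect` until it succeeds or
/// `max_attempts` calls have been made (at least one call is always made).
fn connect_with_backoff<T, E>(
    mut connect: impl FnMut() -> Result<T, E>,
    base: Duration,
    max: Duration,
    max_attempts: u32,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match connect() {
            Ok(conn) => return Ok(conn),
            // Out of attempts: surface the last error to the caller.
            Err(err) if attempt + 1 >= max_attempts => return Err(err),
            Err(_) => {
                std::thread::sleep(backoff_delay(base, max, attempt));
                attempt += 1;
            }
        }
    }
}
```

With `base` = 100 ms and `max` = 1 s, the delays before successive retries would be 100 ms, 200 ms, 400 ms, 800 ms, and then 1 s for every later attempt.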

Lapin does not have a built-in reconnection mechanism and it is up to the client to implement this. See amqp-rs/lapin#70, amqp-rs/lapin#389

Implementation

A new AmqpChannel struct wraps a tokio::sync::RwLock<lapin::Channel>. This read-write lock is used to lock during the re-connection to prevent concurrent attempts.

The API could be further improved by making crate::amqp::AmqpConfig::connect() return an AmqpChannel directly, but that would also affect the amqp source.

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

How did you test this PR?

  1. Run RabbitMQ in a container or remote server.
  2. Run Vector with an amqp sink. Example:
sources:
  in:
    type: http_server
    address: 0.0.0.0:8000
    codec: json

sinks:
  sink_amqp:
    type: amqp
    inputs:
      - in
    connection_string: amqp://user:password@127.0.0.1:5672/%2f?timeout=10
    routing_key: example-key
    exchange: example-exchange
    encoding:
      codec: json
  3. Restart the RabbitMQ container/process. The ioloop in Lapin will show as broken.
  4. Send a new event through Vector. Vector will try to reconnect the channel since it is in the Error state. If the RabbitMQ process hasn't booted up yet and the re-connection fails, the event is dropped. Otherwise, the channel is re-established and the event is sent to the new channel.

Example of the re-connection succeeding:

2025-05-01T02:40:39.687291Z  INFO vector::app: Log level is enabled. level="info"
2025-05-01T02:40:39.701858Z  INFO vector::app: Loading configs. paths=["vector.yaml"]
2025-05-01T02:40:39.736801Z  INFO vector::topology::running: Running healthchecks.
2025-05-01T02:40:39.737012Z  INFO vector: Vector has started. debug="false" version="0.47.0" arch="x86_64" revision=""
2025-05-01T02:40:39.737014Z  INFO source{component_kind="source" component_id=in component_type=http_server}: vector::sources::util::http::prelude: Building HTTP server. address=0.0.0.0:8000
2025-05-01T02:40:39.736933Z  INFO vector::topology::builder: Healthcheck passed.
2025-05-01T02:40:43.868982Z ERROR lapin::io_loop: Socket was readable but we read 0. This usually means that the connection is half closed this mark it as broken
2025-05-01T02:40:43.869422Z ERROR lapin::io_loop: error doing IO error=IOError(Kind(ConnectionAborted))
2025-05-01T02:40:43.869447Z ERROR lapin::channels: Connection error error=IO error: connection aborted
2025-05-01T02:40:53.114888Z  INFO sink{component_kind="sink" component_id=sink_amqp component_type=amqp}:request{request_id=2}: vector::sinks::amqp::channel: Recovering broken connection to the AMQP broker. internal_log_rate_limit=true
2025-05-01T02:40:53.120366Z  INFO sink{component_kind="sink" component_id=sink_amqp component_type=amqp}:request{request_id=2}: vector::sinks::amqp::channel: Recovered connection to the AMQP broker. internal_log_rate_limit=true
^C2025-05-01T02:40:58.323475Z  INFO vector::signal: Signal received. signal="SIGINT"
2025-05-01T02:40:58.323566Z  INFO vector: Vector has stopped.
2025-05-01T02:40:58.325213Z  INFO vector::topology::running: Shutting down... Waiting on running components. remaining_components="sink_amqp" time_remaining="59 seconds left"

Example of re-connection failing:

2025-05-01T02:45:58.663389Z  INFO vector::app: Log level is enabled. level="info"
2025-05-01T02:45:58.676135Z  INFO vector::app: Loading configs. paths=["vector.yaml"]
2025-05-01T02:45:58.705614Z  INFO vector::topology::running: Running healthchecks.
2025-05-01T02:45:58.705760Z  INFO vector: Vector has started. debug="false" version="0.47.0" arch="x86_64" revision=""
2025-05-01T02:45:58.705784Z  INFO vector::app: API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`.
2025-05-01T02:45:58.705778Z  INFO source{component_kind="source" component_id=in component_type=http_server}: vector::sources::util::http::prelude: Building HTTP server. address=0.0.0.0:8000
2025-05-01T02:45:58.705744Z  INFO vector::topology::builder: Healthcheck passed.
2025-05-01T02:46:01.964157Z ERROR lapin::channel: Connection closed channel=0 method=Close { reply_code: 320, reply_text: ShortString("CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"), class_id: 0, method_id: 0 } error=AMQPError { kind: Hard(CONNECTIONFORCED), message: ShortString("CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'") }
2025-05-01T02:46:01.964395Z ERROR lapin::channels: Connection error error=protocol error: AMQP hard error: CONNECTION-FORCED: CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'
2025-05-01T02:46:04.668137Z  INFO sink{component_kind="sink" component_id=sink_amqp component_type=amqp}:request{request_id=1}: vector::sinks::amqp::channel: Recovering broken connection to the AMQP broker. internal_log_rate_limit=true
2025-05-01T02:46:04.668767Z ERROR sink{component_kind="sink" component_id=sink_amqp component_type=amqp}:request{request_id=1}: vector_common::internal_event::service: Service call failed. No retries or retries exhausted. error=Some(ConnectFailed { error: IOError(Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }) }) request_id=1 error_type="request_failed" stage="sending" internal_log_rate_limit=true
2025-05-01T02:46:04.668831Z ERROR sink{component_kind="sink" component_id=sink_amqp component_type=amqp}:request{request_id=1}: vector_common::internal_event::component_events_dropped: Events dropped intentional=false count=1 reason="Service call failed. No retries or retries exhausted." internal_log_rate_limit=true
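
The two outcomes in the logs above can be modeled with a small single-threaded sketch. Everything here is a stand-in rather than the PR's code: `Channel` and `ChannelState` only mimic the lapin types, `Sink`, `SendOutcome` and the `connect` closure are hypothetical, and locking is left out entirely.

```rust
/// Stand-in for `lapin::ChannelState`; only the two states needed here.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ChannelState {
    Connected,
    Error,
}

/// Stand-in for a channel: records what was published on it.
#[derive(Debug)]
struct Channel {
    state: ChannelState,
    published: Vec<String>,
}

/// Hypothetical result type describing what happened to one event.
#[derive(Debug, PartialEq)]
enum SendOutcome {
    Sent,
    SentAfterReconnect,
    Dropped(String),
}

/// Hypothetical sink model; `connect` stands in for dialing the broker.
struct Sink<F> {
    channel: Channel,
    connect: F,
}

impl<F: FnMut() -> Result<Channel, String>> Sink<F> {
    fn send(&mut self, event: &str) -> SendOutcome {
        let mut reconnected = false;
        if self.channel.state == ChannelState::Error {
            // Exactly one reconnect attempt; on failure the event is dropped.
            match (self.connect)() {
                Ok(fresh) => {
                    self.channel = fresh;
                    reconnected = true;
                }
                Err(reason) => return SendOutcome::Dropped(reason),
            }
        }
        self.channel.published.push(event.to_string());
        if reconnected {
            SendOutcome::SentAfterReconnect
        } else {
            SendOutcome::Sent
        }
    }
}
```

An event sent while the broker is still down maps to `Dropped`, the first event after the broker is back maps to `SentAfterReconnect`, and later events take the plain `Sent` path.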

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the "no-changelog" label to this PR.

Notes

  • Please read our Vector contributor resources.
  • Do not hesitate to use @vectordotdev/vector to reach out to us regarding this PR.
  • The CI checks run only after we manually approve them.
    • We recommend adding a pre-push hook, please see this template.
    • Alternatively, we recommend running the following locally before pushing to the remote branch:
      • cargo fmt --all
      • cargo clippy --workspace --all-targets -- -D warnings
      • cargo nextest run --workspace (alternatively, you can run cargo test --all)
      • ./scripts/check_changelog_fragments.sh
  • After a review is requested, please avoid force pushes to help us review incrementally.
    • Feel free to push as many commits as you want. They will be squashed into one before merging.
    • For example, you can run git merge origin master and git push.
  • If this PR introduces changes to Vector dependencies (modifies Cargo.lock), please
    run cargo vdev build licenses to regenerate the license inventory and commit the changes (if any). More details here.

References

Closes #22313

@aramperes aramperes requested a review from a team as a code owner May 1, 2025 02:54
@github-actions github-actions bot added the domain: sinks Anything related to the Vector's sinks label May 1, 2025
pront (Member) left a comment:

Thanks @aramperes

}

info!(
message = "Recovering broken connection to the AMQP broker.",

pront (Member):

Nit: We can add a URL tag here

aramperes (Contributor Author):

I'm not sure that is a good idea since it could print the password:

INFO sink{component_kind="sink" component_id=sink_amqp component_type=amqp}:request{request_id=1}: vector::sinks::amqp::channel: Recovering broken connection to the AMQP broker. uri="amqp://user:password@127.0.0.1:5672/%2f?timeout=10" internal_log_rate_limit=true
INFO sink{component_kind="sink" component_id=sink_amqp component_type=amqp}:request{request_id=1}: vector::sinks::amqp::channel: Recovered connection to the AMQP broker. uri="amqp://user:password@127.0.0.1:5672/%2f?timeout=10" internal_log_rate_limit=true
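
If a URI tag were added anyway, one option would be to mask the password before logging. The sketch below is only an illustration and not something the PR does: `redact_password` is a hypothetical helper, it uses plain string handling rather than a URL parser, and it assumes reserved characters in the credentials are percent-encoded.

```rust
/// Hypothetical helper (not part of the PR): mask the password in a
/// `scheme://user:password@host/...` URI before logging it.
fn redact_password(uri: &str) -> String {
    // The authority starts right after "://"; without it, leave the input alone.
    let Some(scheme_end) = uri.find("://").map(|i| i + 3) else {
        return uri.to_string();
    };
    // The authority ends at the first '/', '?' or '#', or at the end of the string.
    let authority_end = uri[scheme_end..]
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .map_or(uri.len(), |i| scheme_end + i);
    let authority = &uri[scheme_end..authority_end];
    // Userinfo is everything before the last '@' in the authority.
    let Some(at) = authority.rfind('@') else {
        return uri.to_string();
    };
    // The password is whatever follows the first ':' in the userinfo.
    let Some(colon) = authority[..at].find(':') else {
        return uri.to_string();
    };
    format!(
        "{}{}:****{}",
        &uri[..scheme_end],
        &authority[..colon],
        &uri[scheme_end + at..]
    )
}
```

Applied to the connection string from the example config, this yields `amqp://user:****@127.0.0.1:5672/%2f?timeout=10`; URIs without a password are returned unchanged.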

Comment on lines +27 to +31
let need_reconnect =
{ self.channel.read().await.status().state() == lapin::ChannelState::Error };

if need_reconnect {
let mut channel = self.channel.write().await;

pront (Member):

I might be wrong here, but there is a possibility for a race condition. Could you instead do:

let mut channel = self.channel.write().await;
if channel.status().state() == lapin::ChannelState::Error {
    // ...
}
Ok(channel.downgrade())
}

aramperes (Contributor Author):

My concern was with the potential performance impact of acquiring a write lock on every message published. The write lock future hangs until all other readers and writers have been released, so it would prevent sending messages in two tasks concurrently.

This is why I acquire the write lock on line 31, then check again on line 34 whether the connection is still in an error state. If there is a race and the write lock is acquired twice (one task after the other), the second check prevents the second task from trying to connect again.
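
The check, lock, re-check sequence described above can be sketched in isolation as follows. This is a simplified stand-in and not the PR's code: it uses `std::sync::RwLock` and threads instead of `tokio::sync::RwLock` and tasks, `Channel` and `ChannelState` only mimic the lapin types, and the `reconnects` counter is a hypothetical addition that makes the behavior observable.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;

/// Stand-in for `lapin::ChannelState`; only the two states needed here.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ChannelState {
    Connected,
    Error,
}

/// Stand-in for `lapin::Channel`.
struct Channel {
    state: ChannelState,
}

struct AmqpChannel {
    channel: RwLock<Channel>,
    /// Hypothetical counter of how many times a reconnect actually ran.
    reconnects: AtomicUsize,
}

impl AmqpChannel {
    fn ensure_connected(&self) {
        // Fast path: a shared read lock, so healthy publishers do not block each other.
        let need_reconnect = { self.channel.read().unwrap().state == ChannelState::Error };

        if need_reconnect {
            // Slow path: exclusive lock, taken only when the channel looked broken.
            let mut channel = self.channel.write().unwrap();
            // Re-check: another caller may have reconnected while we waited for the lock.
            if channel.state == ChannelState::Error {
                self.reconnects.fetch_add(1, Ordering::SeqCst);
                *channel = Channel { state: ChannelState::Connected };
            }
        }
    }
}
```

If several threads call `ensure_connected` on the same errored channel, each one that saw the error waits for the write lock in turn, but only the first finds the state still `Error`, so the reconnect runs once.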

@pront pront added the sink: amqp Anything `amqp` sink related label May 9, 2025
aramperes (Contributor Author):

@pront Any concerns with potentially integrating the deadpool crate into Vector? This could provide a cleaner interface for recycling AMQP channels while also supporting a pool of connections & channels instead of a single one.

Labels
domain: sinks Anything related to the Vector's sinks sink: amqp Anything `amqp` sink related
Development

Successfully merging this pull request may close these issues.

SINK amqp doesnt reconnect
2 participants