Poll archive client for ReplicationSession and ReplayMerge by marc-adaptive · Pull Request #1814 · aeron-io/aeron · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
<div hidden="hidden" data-view-component="true" class="js-stale-session-flash stale-session-flash flash flash-warn flash-full"> Dismiss alert

Poll archive client for ReplicationSession and ReplayMerge #1814

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci-low-cadence.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
fail-fast: false
matrix:
java: [ '17', '21', '24' ]
os: ['ubuntu-24.04', 'windows-latest', 'macos-latest']
os: ['ubuntu-24.04', 'windows-latest', 'macos-15']
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
fail-fast: false
matrix:
java: [ '17', '21', '24' ]
os: ['ubuntu-24.04', 'windows-latest', 'macos-latest']
os: ['ubuntu-24.04', 'windows-latest', 'macos-15']
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand Down Expand Up @@ -382,7 +382,7 @@ jobs:

cpp-xcode-build:
name: C++ Xcode (macOS)
runs-on: macos-latest
runs-on: macos-15
timeout-minutes: 60
env:
CC: clang
Expand Down
24 changes: 23 additions & 1 deletion aeron-archive/src/main/c/client/aeron_archive_replay_merge.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "util/aeron_error.h"
#include "uri/aeron_uri_string_builder.h"
#include "aeron_archive_client.h"
#include "command/aeron_control_protocol.h"

#define REPLAY_MERGE_LIVE_ADD_MAX_WINDOW (32 * 1024 * 1024)
#define REPLAY_MERGE_REPLAY_REMOVE_THRESHOLD (0)
Expand Down Expand Up @@ -344,9 +345,22 @@ int aeron_archive_replay_merge_do_work(int *work_count_p, aeron_archive_replay_m
now_ms > (replay_merge->time_of_last_progress_ms + replay_merge->merge_progress_timeout_ms))
{
AERON_SET_ERR(ETIMEDOUT, "replay_merge no progress: state=%i", replay_merge->state);
aeron_archive_replay_merge_set_state(replay_merge, FAILED);
return -1;
}

if (check_progress && replay_merge->active_correlation_id == AERON_NULL_VALUE)
{
bool ignored;

if (aeron_archive_replay_merge_poll_for_response(&ignored, replay_merge) < 0)
{
AERON_APPEND_ERR("%s", "");
aeron_archive_replay_merge_set_state(replay_merge, FAILED);
return -1;
}
}

return 0;
}

Expand Down Expand Up @@ -763,7 +777,9 @@ static int aeron_archive_replay_merge_poll_for_response(bool *found_response_p,
{
aeron_archive_control_response_poller_t *poller = aeron_archive_control_response_poller(replay_merge->aeron_archive);

if (aeron_archive_control_response_poller_poll(poller) < 0)
const int poll_count = aeron_archive_control_response_poller_poll(poller);

if (poll_count < 0)
{
AERON_APPEND_ERR("%s", "");
return -1;
Expand All @@ -787,6 +803,12 @@ static int aeron_archive_replay_merge_poll_for_response(bool *found_response_p,
else
{
*found_response_p = false;

if (poll_count == 0 && !aeron_subscription_is_connected(poller->subscription))
{
AERON_SET_ERR(-AERON_ERROR_CODE_GENERIC_ERROR, "%s", "archive is not connected");
return -1;
}
}

return 0;
Expand Down
12 changes: 11 additions & 1 deletion aeron-archive/src/main/cpp/client/ReplayMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,19 @@ void ReplayMerge::checkProgress(long long nowMs)
{
throw TimeoutException("ReplayMerge no progress: state=" + std::to_string(m_state), SOURCEINFO);
}

if (m_activeCorrelationId == aeron::NULL_VALUE)
{
pollForResponse(*m_archive, aeron::NULL_VALUE);
}
}

bool ReplayMerge::pollForResponse(AeronArchive &archive, std::int64_t correlationId)
{
ControlResponsePoller &poller = archive.controlResponsePoller();

if (poller.poll() > 0 && poller.isPollComplete())
const int poll_count = poller.poll();
if (poller.isPollComplete())
{
if (poller.controlSessionId() == archive.controlSessionId())
{
Expand All @@ -346,6 +352,10 @@ bool ReplayMerge::pollForResponse(AeronArchive &archive, std::int64_t correlatio
return poller.correlationId() == correlationId;
}
}
else if (poll_count == 0 && !poller.subscription()->isConnected())
{
throw ArchiveException("archive is not connected", SOURCEINFO);
}

return false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class ReplicationSession implements Session, RecordingDescriptorConsumer
{
private static final int REPLAY_REMOVE_THRESHOLD = 0;
private static final int RETRY_ATTEMPTS = 3;
private static final int SOURCE_ARCHIVE_POLL_INTERVAL_MS = 100;
private final int replicationSessionId;

@SuppressWarnings("JavadocVariable")
Expand Down Expand Up @@ -95,6 +96,7 @@ enum State
private long responsePublicationRegistrationId = NULL_VALUE;
private ExclusivePublication responsePublication = null;
private ArchiveProxy responseArchiveProxy = null;
private long timeOfLastScheduledSourceArchivePollMs;

ReplicationSession(
final long srcRecordingId,
Expand Down Expand Up @@ -211,42 +213,52 @@ public int doWork()

case REPLICATE_DESCRIPTOR:
workCount += replicateDescriptor();
pollSourceArchiveEvents();
break;

case SRC_RECORDING_POSITION:
workCount += srcRecordingPosition();
pollSourceArchiveEvents();
break;

case EXTEND:
workCount += extend();
pollSourceArchiveEvents();
break;

case REPLAY_TOKEN:
workCount += replayToken();
pollSourceArchiveEvents();
break;

case GET_ARCHIVE_PROXY:
workCount += getArchiveProxy();
pollSourceArchiveEvents();
break;

case REPLAY:
workCount += replay();
pollSourceArchiveEvents();
break;

case AWAIT_IMAGE:
workCount += awaitImage();
pollSourceArchiveEvents();
break;

case REPLICATE:
workCount += replicate();
pollSourceArchiveEvents();
break;

case CATCHUP:
workCount += catchup();
pollSourceArchiveEvents();
break;

case ATTEMPT_LIVE_JOIN:
workCount += attemptLiveJoin();
pollSourceArchiveEvents();
break;

case DONE:
Expand Down Expand Up @@ -955,6 +967,23 @@ private void state(final State newState, final String reason)
timeOfLastActionMs = epochClock.time();
}

/**
 * Periodically check the source archive for an asynchronous error response.
 * <p>
 * The check only runs while no request is outstanding (the active correlation id is
 * {@link Aeron#NULL_VALUE}) and at most once per {@code SOURCE_ARCHIVE_POLL_INTERVAL_MS}.
 *
 * @throws ArchiveException carrying the error message when the source archive reports one.
 */
private void pollSourceArchiveEvents()
{
    // The clock is always read first, exactly as before, regardless of the guards below.
    final long nowMs = epochClock.time();

    final boolean isRequestOutstanding = Aeron.NULL_VALUE != activeCorrelationId;
    final long earliestNextPollMs = timeOfLastScheduledSourceArchivePollMs + SOURCE_ARCHIVE_POLL_INTERVAL_MS;

    if (isRequestOutstanding || nowMs <= earliestNextPollMs)
    {
        return;
    }

    // Record the attempt before polling so an exception does not cause an immediate re-poll.
    timeOfLastScheduledSourceArchivePollMs = nowMs;

    final String sourceArchiveError = srcArchive.pollForErrorResponse();
    if (null == sourceArchiveError)
    {
        return;
    }

    throw new ArchiveException(sourceArchiveError);
}

@SuppressWarnings("unused")
private void logStateChange(
final State oldState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public final class ReplayMerge implements AutoCloseable
private static final long MERGE_PROGRESS_TIMEOUT_DEFAULT_MS = TimeUnit.SECONDS.toMillis(5);
private static final long INITIAL_GET_MAX_RECORDED_POSITION_BACKOFF_MS = 8;
private static final long GET_MAX_RECORDED_POSITION_BACKOFF_MAX_MS = 500;
private static final long ARCHIVE_POLL_INTERVAL_MS = 100;

@SuppressWarnings("JavadocVariable")
enum State
Expand All @@ -76,6 +77,7 @@ enum State
private long timeOfLastProgressMs;
private long timeOfNextGetMaxRecordedPositionMs;
private long getMaxRecordedPositionBackoffMs = INITIAL_GET_MAX_RECORDED_POSITION_BACKOFF_MS;
private long timeOfLastScheduledArchivePollMs;
private boolean isLiveAdded = false;
private boolean isReplayActive = false;
private State state;
Expand Down Expand Up @@ -242,26 +244,31 @@ public int doWork()
case RESOLVE_REPLAY_PORT:
workCount += resolveReplayPort(nowMs);
checkProgress(nowMs);
pollArchiveEvents(nowMs);
break;

case GET_RECORDING_POSITION:
workCount += getRecordingPosition(nowMs);
checkProgress(nowMs);
pollArchiveEvents(nowMs);
break;

case REPLAY:
workCount += replay(nowMs);
checkProgress(nowMs);
pollArchiveEvents(nowMs);
break;

case CATCHUP:
workCount += catchup(nowMs);
checkProgress(nowMs);
pollArchiveEvents(nowMs);
break;

case ATTEMPT_LIVE_JOIN:
workCount += attemptLiveJoin(nowMs);
checkProgress(nowMs);
pollArchiveEvents(nowMs);
break;

case MERGED:
Expand Down Expand Up @@ -406,6 +413,7 @@ else if (pollForResponse(archive, activeCorrelationId))
isReplayActive = true;
replaySessionId = polledRelevantId(archive);
timeOfLastProgressMs = nowMs;
activeCorrelationId = Aeron.NULL_VALUE;

// reset getRecordingPosition backoff when moving to CATCHUP state
getMaxRecordedPositionBackoffMs = INITIAL_GET_MAX_RECORDED_POSITION_BACKOFF_MS;
Expand Down Expand Up @@ -575,10 +583,22 @@ private void checkProgress(final long nowMs)
}
}

/**
 * Periodically poll the archive control response stream while no request is outstanding.
 * <p>
 * The poll is skipped when a request is in flight (the active correlation id is not
 * {@link Aeron#NULL_VALUE}) or when less than {@code ARCHIVE_POLL_INTERVAL_MS} has elapsed
 * since the last scheduled poll.
 *
 * @param nowMs current time in milliseconds as supplied by the caller.
 */
private void pollArchiveEvents(final long nowMs)
{
    final boolean isRequestOutstanding = Aeron.NULL_VALUE != activeCorrelationId;
    final long earliestNextPollMs = timeOfLastScheduledArchivePollMs + ARCHIVE_POLL_INTERVAL_MS;

    if (isRequestOutstanding || nowMs <= earliestNextPollMs)
    {
        return;
    }

    // The timestamp is only advanced after the poll returns normally.
    pollForResponse(archive, Aeron.NULL_VALUE);
    timeOfLastScheduledArchivePollMs = nowMs;
}

private static boolean pollForResponse(final AeronArchive archive, final long correlationId)
{
final ControlResponsePoller poller = archive.controlResponsePoller();
if (poller.poll() > 0 && poller.isPollComplete())

final int pollCount = poller.poll();
if (poller.isPollComplete())
{
if (poller.controlSessionId() == archive.controlSessionId())
{
Expand All @@ -593,6 +613,10 @@ private static boolean pollForResponse(final AeronArchive archive, final long co
return poller.correlationId() == correlationId;
}
}
else if (pollCount == 0 && !poller.subscription().isConnected())
{
throw new ArchiveException("archive is not connected");
}

return false;
}
Expand Down
38 changes: 27 additions & 11 deletions aeron-driver/src/main/c/aeron_driver_conductor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1607,14 +1607,6 @@ void aeron_network_publication_entry_delete(
entry->publication = NULL;

AERON_DRIVER_MANAGED_RESOURCE_DECREF(&(endpoint->conductor_fields.managed_resource));

if (AERON_SEND_CHANNEL_ENDPOINT_STATUS_CLOSING == endpoint->conductor_fields.status)
{
aeron_str_to_ptr_hash_map_remove(
&conductor->send_channel_endpoint_by_channel_map,
endpoint->conductor_fields.udp_channel->canonical_form,
endpoint->conductor_fields.udp_channel->canonical_length);
}
}

static bool aeron_network_publication_free_voidp(void *publication)
Expand Down Expand Up @@ -1674,6 +1666,11 @@ bool aeron_send_channel_endpoint_entry_has_reached_end_of_life(
/**
 * Delete a send channel endpoint entry: drop its by-channel mapping and free the endpoint.
 *
 * The by-channel map is keyed on the canonical channel form, not on the endpoint itself.
 * aeron_driver_conductor_on_release_resource already removes that key when it processes a
 * CLOSING endpoint, so by the time this function runs the key may be absent or (possibly)
 * owned by a different endpoint registered for the same channel. The mapping is therefore
 * only removed when it still refers to the endpoint being deleted.
 *
 * @param conductor that owns the by-channel map and the counters manager.
 * @param entry     whose endpoint is to be unmapped and deleted.
 */
void aeron_send_channel_endpoint_entry_delete(
    aeron_driver_conductor_t *conductor, aeron_send_channel_endpoint_entry_t *entry)
{
    aeron_send_channel_endpoint_t *endpoint = entry->endpoint;

    /* Never unmap by key alone: that could evict an endpoint other than this one. */
    if (endpoint == aeron_str_to_ptr_hash_map_get(
        &conductor->send_channel_endpoint_by_channel_map,
        endpoint->conductor_fields.udp_channel->canonical_form,
        endpoint->conductor_fields.udp_channel->canonical_length))
    {
        aeron_str_to_ptr_hash_map_remove(
            &conductor->send_channel_endpoint_by_channel_map,
            endpoint->conductor_fields.udp_channel->canonical_form,
            endpoint->conductor_fields.udp_channel->canonical_length);
    }

    aeron_send_channel_endpoint_delete(&conductor->counters_manager, endpoint);
}

Expand Down Expand Up @@ -4146,9 +4143,13 @@ int aeron_driver_conductor_on_add_network_publication_complete(
}
udp_channel = NULL;

if (AERON_SEND_CHANNEL_ENDPOINT_STATUS_CLOSING == endpoint->conductor_fields.status)
if (AERON_SEND_CHANNEL_ENDPOINT_STATUS_ACTIVE != endpoint->conductor_fields.status)
{
AERON_SET_ERR(EINVAL, "%s", "send_channel_endpoint found in CLOSING state");
AERON_SET_ERR(
-AERON_ERROR_CODE_RESOURCE_TEMPORARILY_UNAVAILABLE,
"%s",
"send_channel_endpoint found in CLOSING state, please retry");

goto error_cleanup;
}

Expand Down Expand Up @@ -6343,6 +6344,7 @@ void aeron_driver_conductor_on_response_connected(void *clientd, void *item)

void aeron_driver_conductor_on_release_resource(void *clientd, void *item)
{
aeron_driver_conductor_t *conductor = clientd;
aeron_command_release_resource_t *cmd = item;
void *managed_resource = cmd->base.item;
aeron_driver_conductor_resource_type_t resource_type = cmd->resource_type;
Expand All @@ -6358,8 +6360,22 @@ void aeron_driver_conductor_on_release_resource(void *clientd, void *item)
break;

case AERON_DRIVER_CONDUCTOR_RESOURCE_TYPE_SEND_CHANNEL_ENDPOINT:
aeron_send_channel_endpoint_sender_release(managed_resource);
{
aeron_send_channel_endpoint_t *endpoint = (aeron_send_channel_endpoint_t *)managed_resource;

if (endpoint->conductor_fields.status == AERON_SEND_CHANNEL_ENDPOINT_STATUS_CLOSING)
{
aeron_str_to_ptr_hash_map_remove(
&conductor->send_channel_endpoint_by_channel_map,
endpoint->conductor_fields.udp_channel->canonical_form,
endpoint->conductor_fields.udp_channel->canonical_length);

aeron_send_channel_endpoint_close(endpoint);
aeron_send_channel_endpoint_sender_release(managed_resource);
}

break;
}

case AERON_DRIVER_CONDUCTOR_RESOURCE_TYPE_PUBLICATION_IMAGE:
aeron_publication_image_receiver_release(managed_resource);
Expand Down
13 changes: 12 additions & 1 deletion aeron-driver/src/main/c/media/aeron_send_channel_endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,10 @@ int aeron_send_channel_endpoint_delete(

aeron_int64_to_ptr_hash_map_delete(&endpoint->publication_dispatch_map);
aeron_udp_channel_delete(endpoint->conductor_fields.udp_channel);
endpoint->transport_bindings->close_func(&endpoint->transport);
if (endpoint->conductor_fields.status != AERON_SEND_CHANNEL_ENDPOINT_STATUS_CLOSED)
{
endpoint->transport_bindings->close_func(&endpoint->transport);
}

if (NULL != endpoint->port_manager)
{
Expand All @@ -270,6 +273,14 @@ int aeron_send_channel_endpoint_delete(
return 0;
}

/*
 * Close the endpoint's transport via its transport bindings and mark the endpoint as
 * AERON_SEND_CHANNEL_ENDPOINT_STATUS_CLOSED.
 *
 * aeron_send_channel_endpoint_delete checks for the CLOSED status and skips close_func in
 * that case, so the status written here is what prevents the transport being closed twice.
 *
 * Always returns 0; the result of close_func is not inspected.
 */
int aeron_send_channel_endpoint_close(aeron_send_channel_endpoint_t *endpoint)
{
/* Close first, then record the state so the status only reads CLOSED once close_func has run. */
endpoint->transport_bindings->close_func(&endpoint->transport);
endpoint->conductor_fields.status = AERON_SEND_CHANNEL_ENDPOINT_STATUS_CLOSED;

return 0;
}

void aeron_send_channel_endpoint_handle_managed_resource_event(aeron_driver_managed_resource_event_t event, void *clientd)
{
aeron_send_channel_endpoint_t *endpoint = (aeron_send_channel_endpoint_t *)clientd;
Expand Down
Loading
0