8000 [Java] Add Cluster and Archive events to logging agent. by epickrram · Pull Request #646 · aeron-io/aeron · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[Java] Add Cluster and Archive events to logging agent. #646

New issu 8000 e

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Mar 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2ac956f
[Java] Add test to validate that client message delivery will continu…
epickrram Jan 17, 2019
7e4fa56
Merge remote-tracking branch 'upstream/master'
epickrram Jan 17, 2019
9d7c34b
Merge remote-tracking branch 'upstream/master'
epickrram Jan 21, 2019
dc64b78
Merge remote-tracking branch 'upstream/master'
epickrram Jan 23, 2019
a7c534d
Merge remote-tracking branch 'upstream/master'
epickrram Jan 23, 2019
796a4f2
Merge remote-tracking branch 'upstream/master'
epickrram Jan 24, 2019
6c0825f
Merge remote-tracking branch 'upstream/master'
epickrram Feb 18, 2019
3a8e7c8
Merge remote-tracking branch 'upstream/master'
epickrram Mar 14, 2019
8170e80
Revert "[Java] Add test to validate that client message delivery will…
epickrram Mar 14, 2019
7f7bdc8
[Java] Add EventCodeType enum, add type code to EventLogger messages
epickrram Mar 14, 2019
d32ffc0
[Java] Add Cluster Event wiring
epickrram Mar 14, 2019
d667e33
[Java] Allow aeron-agent to depend on aeron-cluster
epickrram Mar 14, 2019
290989b
[Java] Wire up election state change event
epickrram Mar 14, 2019
a5d92d4
[Java] Update event log string
epickrram Mar 14, 2019
646a688
[Java] Add configuration for Cluster event codes
epickrram Mar 14, 2019
5ec3a95
[Java] Fix incorrect event code type id
epickrram Mar 14, 2019
cfde34f
[Java] Rename EventCode -> DriverEventCode, extract interface for com…
epickrram Mar 14, 2019
7ff05ca
[Java] Fix advice wiring and visibility
epickrram Mar 14, 2019
03a6cce
[Java] Add javadoc
epickrram Mar 15, 2019
c63c08a
[Java] Add logging of new leadership term
epickrram Mar 18, 2019
b29ecd3
[Java] Add more cluster state change events
epickrram Mar 18, 2019
e355afd
[Java] Wire up Archive connect event
epickrram Mar 19, 2019
7cc167d
[Java] Wire up all Archive request events
epickrram Mar 19, 2019
7d6cbc0
Merge remote-tracking branch 'upstream/master'
epickrram Mar 19, 2019
e075ae7
[Java] Add copyright, javadoc
epickrram Mar 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions aeron-agent/src/main/java/io/aeron/agent/ArchiveEventCode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2014-2019 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.aeron.agent;

import org.agrona.MutableDirectBuffer;

public enum ArchiveEventCode implements EventCode
{
CMD_IN_CONNECT(1, ArchiveEventDissector::controlRequest),
CMD_IN_CLOSE_SESSION(2, ArchiveEventDissector::controlRequest),
CMD_IN_START_RECORDING(3, ArchiveEventDissector::controlRequest),
CMD_IN_STOP_RECORDING(4, ArchiveEventDissector::controlRequest),
CMD_IN_REPLAY(5, ArchiveEventDissector::controlRequest),
CMD_IN_STOP_REPLAY(6, ArchiveEventDissector::controlRequest),
CMD_IN_LIST_RECORDINGS(7, ArchiveEventDissector::controlRequest),
CMD_IN_LIST_RECORDINGS_FOR_URI(8, ArchiveEventDissector::controlRequest),
CMD_IN_LIST_RECORDING(9, ArchiveEventDissector::controlRequest),
CMD_IN_EXTEND_RECORDING(10, ArchiveEventDissector::controlRequest),
CMD_IN_RECORDING_POSITION(11, ArchiveEventDissector::controlRequest),
CMD_IN_TRUNCATE_RECORDING(12, ArchiveEventDissector::controlRequest),
CMD_IN_STOP_RECORDING_SUBSCRIPTION(13, ArchiveEventDissector::controlRequest),
CMD_IN_STOP_POSITION(14, ArchiveEventDissector::controlRequest),
CMD_IN_FIND_LAST_MATCHING_RECORD(15, ArchiveEventDissector::controlRequest),
CMD_IN_LIST_RECORDING_SUBSCRIPTIONS(16, ArchiveEventDissector::controlRequest);

static final int EVENT_CODE_TYPE = EventCodeType.ARCHIVE.getTypeCode();
private static final int MAX_ID = 63;
private static final ArchiveEventCode[] EVENT_CODE_BY_ID = new ArchiveEventCode[MAX_ID];

private final long tagBit;
private final int id;
private final DissectFunction<ArchiveEventCode> dissector;

static
{
for (final ArchiveEventCode code : ArchiveEventCode.values())
{
final int id = code.id();
if (null != EVENT_CODE_BY_ID[id])
{
throw new IllegalArgumentException("id already in use: " + id);
}

EVENT_CODE_BY_ID[id] = code;
}
}

ArchiveEventCode(final int id, final DissectFunction<ArchiveEventCode> dissector)
{
this.id = id;
this.tagBit = 1L << id;
this.dissector = dissector;
}

static ArchiveEventCode get(final int eventCodeId)
{
return EVENT_CODE_BY_ID[eventCodeId];
}

public int id()
{
return id;
}

public long tagBit()
{
return tagBit;
}

public void decode(final MutableDirectBuffer buffer, final int offset, final StringBuilder builder)
{
dissector.dissect(this, buffer, offset, builder);
}

public static boolean isEnabled(final ArchiveEventCode code, final long mask)
{
return ((mask & code.tagBit()) == code.tagBit());
}

}
280 changes: 280 additions & 0 deletions aeron-agent/src/main/java/io/aeron/agent/ArchiveEventDissector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
/*
* Copyright 2014-2019 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.aeron.agent;

import io.aeron.archive.codecs.*;
import org.agrona.MutableDirectBuffer;

final class ArchiveEventDissector
{
private static final MessageHeaderDecoder HEADER_DECODER = new MessageHeaderDecoder();
private static final ConnectRequestDecoder CONNECT_REQUEST_DECODER = new ConnectRequestDecoder();
private static final CloseSessionRequestDecoder CLOSE_SESSION_REQUEST_DECODER = new CloseSessionRequestDecoder();
private static final StartRecordingRequestDecoder START_RECORDING_REQUEST_DECODER =
new StartRecordingRequestDecoder();
private static final StopRecordingRequestDecoder STOP_RECORDING_REQUEST_DECODER = new StopRecordingRequestDecoder();
private static final ReplayRequestDecoder REPLAY_REQUEST_DECODER = new ReplayRequestDecoder();
private static final StopReplayRequestDecoder STOP_REPLAY_REQUEST_DECODER = new StopReplayRequestDecoder();
private static final ListRecordingsRequestDecoder LIST_RECORDINGS_REQUEST_DECODER =
new ListRecordingsRequestDecoder();
private static final ListRecordingsForUriRequestDecoder LIST_RECORDINGS_FOR_URI_REQUEST_DECODER =
new ListRecordingsForUriRequestDecoder();
private static final ListRecordingRequestDecoder LIST_RECORDING_REQUEST_DECODER = new ListRecordingRequestDecoder();
private static final ExtendRecordingRequestDecoder EXTEND_RECORDING_REQUEST_DECODER =
new ExtendRecordingRequestDecoder();
private static final RecordingPositionRequestDecoder RECORDING_POSITION_REQUEST_DECODER =
new RecordingPositionRequestDecoder();
private static final TruncateRecordingRequestDecoder TRUNCATE_RECORDING_REQUEST_DECODER =
new TruncateRecordingRequestDecoder();
private static final StopRecordingSubscriptionRequestDecoder STOP_RECORDING_SUBSCRIPTION_REQUEST_DECODER =
new StopRecordingSubscriptionRequestDecoder();
private static final StopPositionRequestDecoder STOP_POSITION_REQUEST_DECODER = new StopPositionRequestDecoder();
private static final FindLastMatchingRecordingRequestDecoder FIND_LAST_MATCHING_RECORDING_REQUEST_DECODER =
new FindLastMatchingRecordingRequestDecoder();
private static final ListRecordingSubscriptionsRequestDecoder LIST_RECORDING_SUBSCRIPTIONS_REQUEST_DECODER =
new ListRecordingSubscriptionsRequestDecoder();

static void controlRequest(
final ArchiveEventCode event, final MutableDirectBuffer buffer,
final int offset, final StringBuilder builder)
{
HEADER_DECODER.wrap(buffer, offset);
switch (event)
{
case CMD_IN_CONNECT:
CONNECT_REQUEST_DECODER.wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH,
HEADER_DECODER.blockLength(), HEADER_DECODER.version());
appendConnect(builder);
break;
case CMD_IN_CLOSE_SESSION:
CLOSE_SESSION_REQUEST_DECODER.wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH,
HEADER_DECODER.blockLength(), HEADER_DECODER.version());
appendCloseSession(builder);
break;
case CMD_IN_START_RECORDING:
START_RECORDING_REQUEST_DECODER.wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH,
HEADER_DECODER.blockLength(), HEADER_DECODER.version());
appendStartRecording(builder);
break;
case CMD_IN_STOP_RECORDING:
STOP_RECORDING_REQUEST_DECODER.wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH,
HEADER_DECODER.blockLength(), HEADER_DECODER.version());
appendStopRecording(builder);
break;
case CMD_IN_REPLAY:
REPLAY_REQUEST_DECODER.wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH,
HEADER_DECODER.blockLength(), HEADER_DECODER.version());
appendReplay(builder);
break;
case CMD_IN_STOP_REPLAY:
STOP_REPLAY_REQUEST_DECODER.wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH,
HEADER_DECODER.blockLength(), HEADER_DECODER.version());
appendStopReplay(builder);
break;
case CMD_IN_LIST_RECORDINGS:
LIST_RECORDINGS_REQUEST_DECODER.wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH,
HEADER_DECODER.blockLength(), HEADER_DECODER.version());
appendListRecordings(builder);
break;
case CMD_IN_LIST_RECORDINGS_FOR_URI:
LIST_RECORDINGS_FOR_URI_REQUEST_DECODER.wrap(
buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH,
HEADER_DECODER.blockLength(), HEADER_DECODER.version());
appendListRecordingsForUri(builder);
break;
case CMD_IN_LIST_RECORDING:
LIST_RECORDING_REQUEST_DECODER.wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH,
HEADER_DECODER.blockLength(), HEADER_DECODER.version());
appendListRecording(builder);
break;
case CMD_IN_EXTEND_RECORDING:
EXTEND_RECORDING_REQUEST_DECODER.wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH,
HEADER_DECODER.blockLength(), HEADER_DECODER.version());
appendExtendRecording(builder);
break;
case CMD_IN_RECORDING_POSITION:
RECORDING_POSITION_REQUEST_DECODER.wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH,
HEADER_DECODER.blockLength(), HEADER_DECODER.version());
appendRecordingPosition(builder);
break;
case CMD_IN_TRUNCATE_RECORDING:
TRUNCATE_RECORDING_REQUEST_DECODER.wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH,
HEADER_DECODER.blockLength(), HEADER_DECODER.version());
appendTruncateRecording(builder);
break;
case CMD_IN_STOP_RECORDING_SUBSCRIPTION:
STOP_RECORDING_SUBSCRIPTION_REQUEST_DECODER.wrap(
buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH,
HEADER_DECODER.blockLength(), HEADER_DECODER.version());
appendStopRecordingSubscription(builder);
break;
case CMD_IN_STOP_POSITION:
STOP_POSITION_REQUEST_DECODER.wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH,
HEADER_DECODER.blockLength(), HEADER_DECODER.version());
appendStopPosition(builder);
break;
case CMD_IN_FIND_LAST_MATCHING_RECORD:
FIND_LAST_MATCHING_RECORDING_REQUEST_DECODER.wrap(
buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH,
HEADER_DECODER.blockLength(), HEADER_DECODER.version());
appendFindLastMatchingRecord(builder);
break;
case CMD_IN_LIST_RECORDING_SUBSCRIPTIONS:
LIST_RECORDING_SUBSCRIPTIONS_REQUEST_DECODER.wrap(
buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH,
HEADER_DECODER.blockLength(), HEADER_DECODER.version());
appendListRecordingSubscriptions(builder);
break;
default:
builder.append("ARCHIVE:COMMAND UNKNOWN");
}
}

private static void appendListRecordingSubscriptions(final StringBuilder builder)
{
builder.append("ARCHIVE:LIST_RECORDING_SUBSCRIPTIONS ")
.append(LIST_RECORDING_SUBSCRIPTIONS_REQUEST_DECODER.channel()).append(' ')
.append(LIST_RECORDING_SUBSCRIPTIONS_REQUEST_DECODER.streamId()).append(' ')
.append(LIST_RECORDING_SUBSCRIPTIONS_REQUEST_DECODER.applyStreamId()).append(' ')
.append(LIST_RECORDING_SUBSCRIPTIONS_REQUEST_DECODER.pseudoIndex()).append(' ')
.append(LIST_RECORDING_SUBSCRIPTIONS_REQUEST_DECODER.controlSessionId()).append(" [")
.append(LIST_RECORDING_SUBSCRIPTIONS_REQUEST_DECODER.correlationId()).append(']');
}

private static void appendFindLastMatchingRecord(final StringBuilder builder)
{
builder.append("ARCHIVE:FIND_LAST_MATCHING_RECORDING ")
.append(FIND_LAST_MATCHING_RECORDING_REQUEST_DECODER.streamId()).append(' ')
.append(FIND_LAST_MATCHING_RECORDING_REQUEST_DECODER.sessionId()).append(' ')
.append(FIND_LAST_MATCHING_RECORDING_REQUEST_DECODER.minRecordingId()).append(' ')
.append(FIND_LAST_MATCHING_RECORDING_REQUEST_DECODER.controlSessionId()).append(" [")
.append(FIND_LAST_MATCHING_RECORDING_REQUEST_DECODER.correlationId()).append(']');
}

private static void appendStopPosition(final StringBuilder builder)
{
builder.append("ARCHIVE:STOP_POSITION ")
.append(STOP_POSITION_REQUEST_DECODER.recordingId()).append(' ')
.append(STOP_POSITION_REQUEST_DECODER.controlSessionId()).append(" [")
.append(STOP_POSITION_REQUEST_DECODER.correlationId()).append(']');
}

private static void appendStopRecordingSubscription(final StringBuilder builder)
{
builder.append("ARCHIVE:STOP_RECORDING_SUBSCRIPTION ")
.append(STOP_RECORDING_SUBSCRIPTION_REQUEST_DECODER.subscriptionId()).append(' ')
.append(STOP_RECORDING_SUBSCRIPTION_REQUEST_DECODER.controlSessionId()).append(" [")
.append(STOP_RECORDING_SUBSCRIPTION_REQUEST_DECODER.correlationId()).append(']');
}

private static void appendTruncateRecording(final StringBuilder builder)
{
builder.append("ARCHIVE:TRUNCATE_RECORDING ")
.append(TRUNCATE_RECORDING_REQUEST_DECODER.recordingId()).append(' ')
.append(TRUNCATE_RECORDING_REQUEST_DECODER.position()).append(' ')
.append(TRUNCATE_RECORDING_REQUEST_DECODER.controlSessionId()).append(" [")
.append(TRUNCATE_RECORDING_REQUEST_DECODER.correlationId()).append(']');
}

private static void appendRecordingPosition(final StringBuilder builder)
{
builder.append("ARCHIVE:RECORDING_POSITION ")
.append(RECORDING_POSITION_REQUEST_DECODER.recordingId()).append(' ')
.append(RECORDING_POSITION_REQUEST_DECODER.controlSessionId()).append(" [")
.append(RECORDING_POSITION_REQUEST_DECODER.correlationId()).append(']');
}

private static void appendExtendRecording(final StringBuilder builder)
{
builder.append("ARCHIVE:EXTEND_RECORDING ")
.append(EXTEND_RECORDING_REQUEST_DECODER.channel()).append(' ')
.append(EXTEND_RECORDING_REQUEST_DECODER.streamId()).append(' ')
.append(EXTEND_RECORDING_REQUEST_DECODER.recordingId()).append(' ')
.append(EXTEND_RECORDING_REQUEST_DECODER.sourceLocation()).append(' ')
.append(EXTEND_RECORDING_REQUEST_DECODER.controlSessionId()).append(" [")
.append(EXTEND_RECORDING_REQUEST_DECODER.correlationId()).append(']');
}

private static void appendListRecording(final StringBuilder builder)
{
builder.append("ARCHIVE:LIST_RECORDING ")
.append(LIST_RECORDING_REQUEST_DECODER.recordingId()).append(' ')
.append(LIST_RECORDING_REQUEST_DECODER.controlSessionId()).append(" [")
.append(LIST_RECORDING_REQUEST_DECODER.correlationId()).append(']');
}

private static void appendListRecordingsForUri(final StringBuilder builder)
{
builder.append("ARCHIVE:LIST_RECORDINGS_FOR_URI ")
.append(LIST_RECORDINGS_FOR_URI_REQUEST_DECODER.channel()).append(' ')
.append(LIST_RECORDINGS_FOR_URI_REQUEST_DECODER.fromRecordingId()).append(' ')
.append(LIST_RECORDINGS_FOR_URI_REQUEST_DECODER.controlSessionId()).append(" [")
.append(LIST_RECORDINGS_FOR_URI_REQUEST_DECODER.correlationId()).append(']');
}

private static void appendListRecordings(final StringBuilder builder)
{
builder.append("ARCHIVE:LIST_RECORDINGS ")
.append(LIST_RECORDINGS_REQUEST_DECODER.fromRecordingId()).append(' ')
.append(LIST_RECORDINGS_REQUEST_DECODER.controlSessionId()).append(" [")
.append(LIST_RECORDINGS_REQUEST_DECODER.correlationId()).append(']');
}

private static void appendStopReplay(final StringBuilder builder)
{
builder.append("ARCHIVE:STOP_REPLAY ").append(STOP_REPLAY_REQUEST_DECODER.replaySessionId()).append(' ')
.append(STOP_REPLAY_REQUEST_DECODER.controlSessionId()).append(" [")
.append(STOP_REPLAY_REQUEST_DECODER.correlationId()).append(']');
}

private static void appendReplay(final StringBuilder builder)
{
builder.append("ARCHIVE:REPLAY ").append(REPLAY_REQUEST_DECODER.replayChannel()).append(' ')
.append(REPLAY_REQUEST_DECODER.replayStreamId()).append(' ')
.append(REPLAY_REQUEST_DECODER.recordingId()).append(' ')
.append(REPLAY_REQUEST_DECODER.controlSessionId()).append(" [")
.append(REPLAY_REQUEST_DECODER.correlationId()).append(']');
}

private static void appendStopRecording(final StringBuilder builder)
{
builder.append("ARCHIVE:STOP_RECORDING ").append(STOP_RECORDING_REQUEST_DECODER.channel()).append(' ')
.append(STOP_RECORDING_REQUEST_DECODER.streamId()).append(' ')
.append(STOP_RECORDING_REQUEST_DECODER.controlSessionId()).append(" [")
.append(STOP_RECORDING_REQUEST_DECODER.correlationId()).append(']');
}

private static void appendStartRecording(final StringBuilder builder)
{
builder.append("ARCHIVE:START_RECORDING ").append(START_RECORDING_REQUEST_DECODER.channel()).append(' ')
.append(START_RECORDING_REQUEST_DECODER.streamId()).append(' ')
.append(START_RECORDING_REQUEST_DECODER.controlSessionId()).append(" [")
.append(START_RECORDING_REQUEST_DECODER.correlationId()).append(']');
}

private static void appendCloseSession(final StringBuilder builder)
{
builder.append("ARCHIVE:CLOSE_SESSION ").append(CLOSE_SESSION_REQUEST_DECODER.controlSessionId());
}

private static void appendConnect(final StringBuilder builder)
{
builder.append("ARCHIVE:CONNECT ").append(CONNECT_REQUEST_DECODER.responseChannel()).append(' ')
.append(CONNECT_REQUEST_DECODER.responseStreamId()).append(" [")
.append(CONNECT_REQUEST_DECODER.correlationId()).append("][")
.append(CONNECT_REQUEST_DECODER.version()).append(']');
}
}
Loading
0