8000 AeronStat by JPWatson · Pull Request #684 · aeron-io/aeron · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

AeronStat #684

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,24 @@ public final int code()
public static State get(final AtomicCounter counter)
{
final long code = counter.get();
return get((int)code);
}

if (code < 0 || code > (STATES.length - 1))
/**
* Get the {@link State} corresponding to a particular value.
*
* @param value of the State.
* @return the {@link State} corresponding to the provided value.
* @throws ClusterException if the value does not correspond to a valid State.
*/
public static State get(final int value)
{
if (value < 0 || value > (STATES.length - 1))
{
throw new ClusterException("invalid state counter code: " + code);
throw new ClusterException("invalid state counter code: " + value);
}

return STATES[(int)code];
return STATES[value];
}
}

Expand Down
185 changes: 101 additions & 84 deletions aeron-samples/src/main/java/io/aeron/samples/AeronStat.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@
import io.aeron.CncFileDescriptor;
import io.aeron.status.ChannelEndpointStatus;
import org.agrona.DirectBuffer;
import org.agrona.SemanticVersion;
import org.agrona.SystemUtil;
import org.agrona.collections.MutableInteger;
import org.agrona.concurrent.SigInt;
import org.agrona.concurrent.status.CountersReader;

import java.io.PrintStream;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -62,6 +59,11 @@ public class AeronStat
*/
private static final String DELAY = "delay";

/**
* Whether to watch for updates or run once.
*/
private static final String WATCH = "watch";

/**
* Types of the counters.
* <ul>
Expand Down Expand Up @@ -92,42 +94,10 @@ public class AeronStat
*/
private static final String COUNTER_CHANNEL = "channel";

private final CountersReader counters;
private final Pattern typeFilter;
private final Pattern identityFilter;
private final Pattern sessionFilter;
private final Pattern streamFilter;
private final Pattern channelFilter;

public AeronStat(
final CountersReader counters,
final Pattern typeFilter,
final Pattern identityFilter,
final Pattern sessionFilter,
final Pattern streamFilter,
final Pattern channelFilter)
{
this.counters = counters;
this.typeFilter = typeFilter;
this.identityFilter = identityFilter;
this.sessionFilter = sessionFilter;
this.streamFilter = streamFilter;
this.channelFilter = channelFilter;
}

public AeronStat(final CountersReader counters)
{
this.counters = counters;
this.typeFilter = null;
this.identityFilter = null;
this.sessionFilter = null;
this.streamFilter = null;
this.channelFilter = null;
}

public static void main(final String[] args) throws Exception
{
long delayMs = 1000L;
boolean watch = true;
Pattern typeFilter = null;
Pattern identityFilter = null;
Pattern sessionFilter = null;
Expand All @@ -152,6 +122,10 @@ public static void main(final String[] args) throws Exception

switch (argName)
{
case WATCH:
watch = Boolean.parseBoolean(argValue);
break;

case DELAY:
delayMs = Long.parseLong(argValue) * 1000L;
break;
Expand Down Expand Up @@ -183,47 +157,66 @@ public static void main(final String[] args) throws Exception
}
}

final MutableInteger cncFileVersion = new MutableInteger();
final AeronStat aeronStat = new AeronStat(
SamplesUtil.mapCounters(cncFileVersion),
final CncFileReader cncFileReader = CncFileReader.map();

final CounterFilter counterFilter = new CounterFilter(
typeFilter,
identityFilter,
sessionFilter,
streamFilter,
channelFilter);

if (watch)
{
workLoop(delayMs, () -> printOutput(cncFileReader, counterFilter));
}
else
{
printOutput(cncFileReader, counterFilter);
}
}

private static void workLoop(final long delayMs, final Runnable printOutput) throws Exception
{
final AtomicBoolean running = new AtomicBoolean(true);
SigInt.register(() -> running.set(false));

final String header =
" - Aeron Stat (CnC v" + SemanticVersion.toString(cncFileVersion.get()) + "), pid " + SystemUtil.getPid();
final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

while (running.get())
do
{
clearScreen();

System.out.print(dateFormat.format(new Date()));
System.out.println(header);
System.out.println("======================================================================");

aeronStat.print(System.out);
System.out.println("--");
printOutput.run();

Thread.sleep(delayMs);
}
while (running.get());
}

public void print(final PrintStream out)
private static void printOutput(final CncFileReader cncFileReader, final CounterFilter counterFilter)
{
final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

System.out.print(dateFormat.format(new Date()));
System.out.println(
" - Aeron Stat (CnC v" + cncFileReader.semanticVersion() + ")" +
", pid " + SystemUtil.getPid() +
", heartbeat " + cncFileReader.driverHeartbeatAge() + "ms");
System.out.println("======================================================================");

final CountersReader counters = cncFileReader.countersReader();

counters.forEach(
(counterId, typeId, keyBuffer, label) ->
{
if (filter(typeId, keyBuffer))
if (counterFilter.filter(typeId, keyBuffer))
{
final long value = counters.getCounterValue(counterId);
out.format("%3d: %,20d - %s%n", counterId, value, label);
System.out.format("%3d: %,20d - %s%n", counterId, value, label);
}
});
}
);

System.out.println("--");
}

private static void checkForHelp(final String[] args)
Expand All @@ -235,6 +228,7 @@ private static void checkForHelp(final String[] args)
System.out.format(
"Usage: [-Daeron.dir=<directory containing CnC file>] AeronStat%n" +
ED4F "\t[delay=<seconds between updates>]%n" +
"\t[watch=<true|false>]%n" +
"filter by optional regex patterns:%n" +
"\t[type=<pattern>]%n" +
"\t[identity=<pattern>]%n" +
Expand All @@ -247,48 +241,71 @@ private static void checkForHelp(final String[] args)
}
}

private boolean filter(final int typeId, final DirectBuffer keyBuffer)
private static void clearScreen() throws Exception
{
if (!match(typeFilter, () -> Integer.toString(typeId)))
{
return false;
}

if (SYSTEM_COUNTER_TYPE_ID == typeId && !match(identityFilter, () -> Integer.toString(keyBuffer.getInt(0))))
{
return false;
}
else if ((typeId >= PUBLISHER_LIMIT_TYPE_ID && typeId <= RECEIVER_POS_TYPE_ID) ||
typeId == SENDER_LIMIT_TYPE_ID || typeId == PER_IMAGE_TYPE_ID || typeId == PUBLISHER_POS_TYPE_ID)
if (SystemUtil.osName().contains("win"))
{
return
match(identityFilter, () -> Long.toString(keyBuffer.getLong(REGISTRATION_ID_OFFSET))) &&
match(sessionFilter, () -> Integer.toString(keyBuffer.getInt(SESSION_ID_OFFSET))) &&
match(streamFilter, () -> Integer.toString(keyBuffer.getInt(STREAM_ID_OFFSET))) &&
match(channelFilter, () -> keyBuffer.getStringAscii(CHANNEL_OFFSET));
new ProcessBuilder("cmd", "/c", "cls").inheritIO().start().waitFor();
}
else if (typeId >= SEND_CHANNEL_STATUS_TYPE_ID && typeId <= RECEIVE_CHANNEL_STATUS_TYPE_ID)
else
{
return match(channelFilter, () -> keyBuffer.getStringAscii(ChannelEndpointStatus.CHANNEL_OFFSET));
System.out.print(ANSI_CLS + ANSI_HOME);
}

return true;
}

private static boolean match(final Pattern pattern, final Supplier<String> supplier)
static class CounterFilter
{
return null == pattern || pattern.matcher(supplier.get()).find();
}
private final Pattern typeFilter;
private final Pattern identityFilter;
private final Pattern sessionFilter;
private final Pattern streamFilter;
private final Pattern channelFilter;

CounterFilter(
final Pattern typeFilter,
final Pattern identityFilter,
final Pattern sessionFilter,
final Pattern streamFilter,
final Pattern channelFilter)
{
this.typeFilter = typeFilter;
this.identityFilter = identityFilter;
this.sessionFilter = sessionFilter;
this.streamFilter = streamFilter;
this.channelFilter = channelFilter;
}

private static void clearScreen() throws Exception
{
if (SystemUtil.osName().contains("win"))
private static boolean match(final Pattern pattern, final Supplier<String> supplier)
{
new ProcessBuilder("cmd", "/c", "cls").inheritIO().start().waitFor();
return null == pattern || pattern.matcher(supplier.get()).find();
}
else

boolean filter(final int typeId, final DirectBuffer keyBuffer)
{
System.out.print(ANSI_CLS + ANSI_HOME);
if (!match(typeFilter, () -> Integer.toString(typeId)))
{
return false;
}

if (SYSTEM_COUNTER_TYPE_ID == typeId && !match(identityFilter, () -> Integer.toString(keyBuffer.getInt(0))))
{
return false;
}
else if ((typeId >= PUBLISHER_LIMIT_TYPE_ID && typeId <= RECEIVER_POS_TYPE_ID) ||
typeId == SENDER_LIMIT_TYPE_ID || typeId == PER_IMAGE_TYPE_ID || typeId == PUBLISHER_POS_TYPE_ID)
{
return
match(identityFilter, () -> Long.toString(keyBuffer.getLong(REGISTRATION_ID_OFFSET))) &&
match(sessionFilter, () -> Integer.toString(keyBuffer.getInt(SESSION_ID_OFFSET))) &&
match(streamFilter, () -> Integer.toString(keyBuffer.getInt(STREAM_ID_OFFSET))) &&
match(channelFilter, () -> keyBuffer.getStringAscii(CHANNEL_OFFSET));
}
else if (typeId >= SEND_CHANNEL_STATUS_TYPE_ID && typeId <= RECEIVE_CHANNEL_STATUS_TYPE_ID)
{
return match(channelFilter, () -> keyBuffer.getStringAscii(ChannelEndpointStatus.CHANNEL_OFFSET));
}

return true;
}
}
}
Loading
0