Closed
Description
AeronArchive does not survive disconnects longer than 5s.
More importantly, there is no way to know whether the Archive's ControlSession is dead or not.
The only symptom is that all AeronArchive methods throw TimeoutException, which is ambiguous: it may indicate either that the connection is still considered connected by Aeron but is actually broken, or that the Archive's ControlSession is dead.
One could just recreate AeronArchive on each TimeoutException, but that can cause a memory leak: currently, when a CONNECT message is sent, a new ControlSession is unconditionally created (line), while the previous one is still considered alive because its Publication is in the connected state (line).
The following test reproduces the described case:
package io.aeron.archive;
import io.aeron.Aeron;
import io.aeron.CommonContext;
import io.aeron.archive.client.AeronArchive;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.ext.DebugReceiveChannelEndpoint;
import io.aeron.driver.ext.DebugSendChannelEndpoint;
import io.aeron.driver.ext.LossGenerator;
import io.aeron.exceptions.TimeoutException;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Assert;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
/**
 * Reproduces an ambiguous {@link TimeoutException} from {@link AeronArchive} after a
 * simulated network outage.
 *
 * <p>The media driver is launched with debug send/receive endpoints that share a
 * {@link ControlledLossGenerator}, so the test can switch frame dropping on and off.
 * After dropping frames for {@link #DISCONNECT_DURATION_MS} and then restoring traffic,
 * the test waits for the client's control publication to report connected again and
 * expects the next archive request to time out.
 */
public class AeronArchiveTest {
    /** How long frames are dropped before traffic is restored, in milliseconds. */
    private static final long DISCONNECT_DURATION_MS = 6000;

    /**
     * Upper bound on waiting for the control publication to report connected again.
     * Without it the wait loop would spin forever if the publication never reconnects.
     */
    private static final long RECONNECT_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(10);

    /**
     * Drops frames for {@link #DISCONNECT_DURATION_MS}, restores traffic, then verifies that
     * {@code listRecordings} fails with {@link TimeoutException} even though the control
     * publication reports connected.
     *
     * @throws Exception if the sleep is interrupted or closing a resource fails.
     */
    @Test
    public void testDisconnect() throws Exception {
        final ControlledLossGenerator lossGenerator = new ControlledLossGenerator();
        final String aeronDirectoryName = CommonContext.generateRandomDirName();

        // Short driver timeouts so that connection state changes are detected quickly.
        // The same loss generator is handed to both loss-generator arguments of each
        // debug endpoint, so flipping one flag affects sending and receiving alike.
        try (ArchivingMediaDriver mediaDriver = ArchivingMediaDriver.launch(
                new MediaDriver.Context()
                    .timerIntervalNs(TimeUnit.MILLISECONDS.toNanos(100))
                    .publicationConnectionTimeoutNs(TimeUnit.MILLISECONDS.toNanos(500))
                    .imageLivenessTimeoutNs(TimeUnit.MILLISECONDS.toNanos(500))
                    .aeronDirectoryName(aeronDirectoryName)
                    .receiveChannelEndpointSupplier((udpChannel, dispatcher, statusIndicator, context) ->
                        new DebugReceiveChannelEndpoint(udpChannel, dispatcher, statusIndicator, context,
                            lossGenerator, lossGenerator))
                    .sendChannelEndpointSupplier((udpChannel, statusIndicator, context) ->
                        new DebugSendChannelEndpoint(udpChannel, statusIndicator, context,
                            lossGenerator, lossGenerator)),
                new Archive.Context()
                    .aeronContext(new Aeron.Context()
                        .aeronDirectoryName(aeronDirectoryName)));
            Aeron archiveAeron = Aeron.connect(new Aeron.Context()
                .aeronDirectoryName(aeronDirectoryName));
            AeronArchive archive = AeronArchive.connect(new AeronArchive.Context()
                .aeron(archiveAeron))) {

            // Simulate the outage: every frame is dropped while the flag is set.
            lossGenerator.shouldDrop = true;
            Thread.sleep(DISCONNECT_DURATION_MS);

            // now ControlSession should be dead
            lossGenerator.shouldDrop = false;

            // Wait until the client-side publication reports connected again, but give up
            // after RECONNECT_TIMEOUT_NS instead of hanging the whole test run.
            final long deadlineNs = System.nanoTime() + RECONNECT_TIMEOUT_NS;
            while (!archive.archiveProxy().publication().isConnected()) {
                if (System.nanoTime() - deadlineNs > 0) {
                    Assert.fail("control publication did not reconnect within "
                        + TimeUnit.NANOSECONDS.toMillis(RECONNECT_TIMEOUT_NS) + " ms");
                }
                Thread.yield();
            }

            try {
                archive.listRecordings(0, 10, new FailControlResponseListener());
                Assert.fail("expected listRecordings to throw TimeoutException");
            } catch (TimeoutException ignored) {
                // this exception is ambiguous
                // there is no way to know, is it time to CONNECT again
            }
        }
    }

    /**
     * Loss generator with an on/off switch: while {@code shouldDrop} is {@code true}
     * every frame offered to it is reported as dropped.
     */
    private static class ControlledLossGenerator implements LossGenerator {
        // volatile: written by the test thread; presumably read on media driver
        // threads via the debug endpoints (confirm against the driver threading mode).
        private volatile boolean shouldDrop = false;

        /**
         * Ignores the frame details and returns the current value of the switch.
         *
         * @param address unused.
         * @param buffer  unused.
         * @param length  unused.
         * @return the current value of {@code shouldDrop}.
         */
        @Override
        public boolean shouldDropFrame(InetSocketAddress address, UnsafeBuffer buffer, int length) {
            return shouldDrop;
        }
    }
}
Metadata
Metadata
Assignees
Labels
No labels