HADOOP-18167. Add metrics to track delegation token secret manager operations by hchaverri · Pull Request #4092 · apache/hadoop · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

HADOOP-18167. Add metrics to track delegation token secret manager operations #4092

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,17 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.metrics2.util.Metrics2Util.NameValuePair;
import org.apache.hadoop.metrics2.util.Metrics2Util.TopN;
import org.apache.hadoop.security.AccessControlException;
Expand All @@ -47,6 +57,7 @@
import org.apache.hadoop.util.Time;

import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.functional.InvocationRaisingIOE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -96,6 +107,10 @@ private String formatTokenId(TokenIdent id) {
* Access to currentKey is protected by this object lock
*/
private DelegationKey currentKey;
/**
* Metrics to track token management operations.
*/
private DelegationTokenSecretManagerMetrics metrics;

private long keyUpdateInterval;
private long tokenMaxLifetime;
Expand Down Expand Up @@ -134,6 +149,7 @@ public AbstractDelegationTokenSecretManager(long delegationKeyUpdateInterval,
this.tokenRenewInterval = delegationTokenRenewInterval;
this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
this.storeTokenTrackingId = false;
this.metrics = DelegationTokenSecretManagerMetrics.create();
}

/** should be called before this object is used */
Expand Down Expand Up @@ -430,14 +446,14 @@ protected synchronized byte[] createPassword(TokenIdent identifier) {
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
try {
storeToken(identifier, tokenInfo);
metrics.trackStoreToken(() -> storeToken(identifier, tokenInfo));
} catch (IOException ioe) {
LOG.error("Could not store token " + formatTokenId(identifier) + "!!",
ioe);
}
return password;
}



/**
Expand Down Expand Up @@ -555,7 +571,7 @@ public synchronized long renewToken(Token<TokenIdent> token,
throw new InvalidToken("Renewal request for unknown token "
+ formatTokenId(id));
}
updateToken(id, info);
metrics.trackUpdateToken(() -> updateToken(id, info));
return renewTime;
}

Expand Down Expand Up @@ -591,8 +607,10 @@ public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
if (info == null) {
throw new InvalidToken("Token not found " + formatTokenId(id));
}
removeTokenForOwnerStats(id);
removeStoredToken(id);
metrics.trackRemoveToken(() -> {
removeTokenForOwnerStats(id);
removeStoredToken(id);
});
return id;
}

Expand Down Expand Up @@ -825,4 +843,97 @@ protected void syncTokenOwnerStats() {
addTokenForOwnerStats(id);
}
}

/**
 * Returns the metrics source that records the store, update and removal
 * operations performed by this secret manager.
 *
 * @return the metrics instance held by this secret manager
 */
protected DelegationTokenSecretManagerMetrics getMetrics() {
  return this.metrics;
}

/**
* DelegationTokenSecretManagerMetrics tracks token management operations
* and publishes them through the metrics interfaces.
*/
/**
 * DelegationTokenSecretManagerMetrics tracks token management operations
 * (storing, updating and removing tokens) and publishes them through two
 * channels: metrics2 rates/counters exposed through the {@link Metric}-annotated
 * fields, and an {@link IOStatisticsStore} that records the duration of each
 * operation.
 *
 * <p>Instances are obtained through {@link #create()}, which registers them with
 * {@link DefaultMetricsSystem}. The {@code @Metric} fields are never assigned in
 * this class and are presumably injected during that registration.
 * NOTE(review): every instance is registered under the same default source
 * name; confirm this is safe when several secret managers live in one process
 * while the metrics system is running.
 */
@Metrics(about="Delegation token secret manager metrics", context="token")
static class DelegationTokenSecretManagerMetrics implements DurationTrackerFactory {
  private static final Logger LOG = LoggerFactory.getLogger(
      DelegationTokenSecretManagerMetrics.class);

  /** IOStatistics duration key for token store operations. */
  static final String STORE_TOKEN_STAT = "storeToken";
  /** IOStatistics duration key for token update (renewal) operations. */
  static final String UPDATE_TOKEN_STAT = "updateToken";
  /** IOStatistics duration key for token removal (cancellation) operations. */
  static final String REMOVE_TOKEN_STAT = "removeToken";
  /** IOStatistics counter key for failed token operations. */
  static final String TOKEN_FAILURE_STAT = "tokenFailure";

  private final MetricsRegistry registry;
  private final IOStatisticsStore ioStatistics;

  // The metrics2 fields below are not assigned anywhere in this class;
  // presumably they are populated when the instance is registered in create().
  @Metric("Rate of storage of delegation tokens and latency (milliseconds)")
  private MutableRate storeToken;
  @Metric("Rate of update of delegation tokens and latency (milliseconds)")
  private MutableRate updateToken;
  @Metric("Rate of removal of delegation tokens and latency (milliseconds)")
  private MutableRate removeToken;
  @Metric("Counter of delegation tokens operation failures")
  private MutableCounterLong tokenFailure;

  /**
   * Creates a new metrics instance and registers it with the default
   * metrics system.
   *
   * @return the registered instance
   */
  static DelegationTokenSecretManagerMetrics create() {
    return DefaultMetricsSystem.instance().register(new DelegationTokenSecretManagerMetrics());
  }

  /**
   * Builds the IOStatistics store, with duration tracking for the three token
   * operations and a failure counter, and the metrics registry.
   */
  DelegationTokenSecretManagerMetrics() {
    ioStatistics = IOStatisticsBinding.iostatisticsStore()
        .withDurationTracking(STORE_TOKEN_STAT, UPDATE_TOKEN_STAT, REMOVE_TOKEN_STAT)
        .withCounters(TOKEN_FAILURE_STAT)
        .build();
    registry = new MetricsRegistry("DelegationTokenSecretManagerMetrics");
    LOG.debug("Initialized {}", registry);
  }

  /**
   * Runs a token store operation and records it.
   *
   * @param invocation the store operation
   * @throws IOException whatever the operation throws
   */
  public void trackStoreToken(InvocationRaisingIOE invocation) throws IOException {
    trackInvocation(invocation, STORE_TOKEN_STAT, storeToken);
  }

  /**
   * Runs a token update operation and records it.
   *
   * @param invocation the update operation
   * @throws IOException whatever the operation throws
   */
  public void trackUpdateToken(InvocationRaisingIOE invocation) throws IOException {
    trackInvocation(invocation, UPDATE_TOKEN_STAT, updateToken);
  }

  /**
   * Runs a token removal operation and records it.
   *
   * @param invocation the removal operation
   * @throws IOException whatever the operation throws
   */
  public void trackRemoveToken(InvocationRaisingIOE invocation) throws IOException {
    trackInvocation(invocation, REMOVE_TOKEN_STAT, removeToken);
  }

  /**
   * Runs {@code invocation} and records its duration.
   *
   * <p>The duration is tracked in the IOStatistics store under
   * {@code statistic}, and on success it is also added to {@code metric}.
   * If the invocation throws, the failure is counted in both the metrics2
   * {@code tokenFailure} counter and the IOStatistics
   * {@link #TOKEN_FAILURE_STAT} counter, and the exception is rethrown
   * unchanged.
   *
   * @param invocation the operation to run
   * @param statistic IOStatistics duration key to record under
   * @param metric metrics2 rate to update on success
   * @throws IOException whatever the invocation throws
   */
  public void trackInvocation(InvocationRaisingIOE invocation, String statistic,
      MutableRate metric) throws IOException {
    try {
      long start = Time.monotonicNow();
      IOStatisticsBinding.trackDurationOfInvocation(this, statistic, invocation);
      metric.add(Time.monotonicNow() - start);
    } catch (IOException | RuntimeException ex) {
      tokenFailure.incr();
      // Keep the IOStatistics view consistent with metrics2: this counter is
      // declared in the constructor and would otherwise always read zero.
      ioStatistics.incrementCounter(TOKEN_FAILURE_STAT);
      throw ex;
    }
  }

  /**
   * Delegates duration tracking to the backing IOStatistics store.
   */
  @Override
  public DurationTracker trackDuration(String key, long count) {
    return ioStatistics.trackDuration(key, count);
  }

  /** @return the metrics2 rate for token store operations */
  protected MutableRate getStoreToken() {
    return storeToken;
  }

  /** @return the metrics2 rate for token update operations */
  protected MutableRate getUpdateToken() {
    return updateToken;
  }

  /** @return the metrics2 rate for token removal operations */
  protected MutableRate getRemoveToken() {
    return removeToken;
  }

  /** @return the metrics2 counter of failed token operations */
  protected MutableCounterLong getTokenFailure() {
    return tokenFailure;
  }

  /** @return the IOStatistics store backing the duration statistics */
  protected IOStatisticsStore getIoStatistics() {
    return ioStatistics;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
import java.util.List;
import java.util.Map;

import java.util.concurrent.Callable;
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.MeanStatistic;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Assert;

import org.apache.hadoop.io.DataInputBuffer;
Expand Down Expand Up @@ -155,6 +161,55 @@ public DelegationKey getKey(TestDelegationTokenIdentifier id) {
return allKeys.get(id.getMasterKeyId());
}
}

/**
 * Test secret manager whose token store, update and removal hooks can be
 * switched to fail.
 *
 * <p>When {@link #setThrowError(boolean)} is enabled, each hook sleeps for
 * {@code errorSleepMillis} and then throws an {@link IOException} instead of
 * delegating to the parent implementation. The sleep gives failed operations a
 * measurable duration, so failure timing statistics can be asserted on.
 */
public static class TestFailureDelegationTokenSecretManager
    extends TestDelegationTokenSecretManager {
  private boolean throwError = false;
  private final long errorSleepMillis;

  /**
   * @param errorSleepMillis how long each failing hook sleeps before throwing
   */
  public TestFailureDelegationTokenSecretManager(long errorSleepMillis) {
    super(24*60*60*1000, 10*1000, 1*1000, 60*60*1000);
    this.errorSleepMillis = errorSleepMillis;
  }

  /**
   * @param throwError whether the store/update/remove hooks should fail
   */
  public void setThrowError(boolean throwError) {
    this.throwError = throwError;
  }

  /**
   * Sleeps for the configured delay and then always fails.
   *
   * @throws IOException always; a {@link java.io.InterruptedIOException} if the
   *     sleep is interrupted, in which case the interrupt flag is restored
   */
  private void sleepAndThrow() throws IOException {
    try {
      Thread.sleep(errorSleepMillis);
    } catch (InterruptedException e) {
      // Restore the flag and still fail. Returning silently would let the
      // caller fall through to the real (succeeding) operation.
      Thread.currentThread().interrupt();
      java.io.InterruptedIOException interrupted = new java.io.InterruptedIOException(
          "Interrupted while simulating a token store failure");
      interrupted.initCause(e);
      throw interrupted;
    }
    throw new IOException("Test exception");
  }

  @Override
  protected void storeNewToken(TestDelegationTokenIdentifier ident, long renewDate)
      throws IOException {
    if (throwError) {
      sleepAndThrow();
    }
    super.storeNewToken(ident, renewDate);
  }

  @Override
  protected void removeStoredToken(TestDelegationTokenIdentifier ident) throws IOException {
    if (throwError) {
      sleepAndThrow();
    }
    super.removeStoredToken(ident);
  }

  @Override
  protected void updateStoredToken(TestDelegationTokenIdentifier ident, long renewDate)
      throws IOException {
    if (throwError) {
      sleepAndThrow();
    }
    super.updateStoredToken(ident, renewDate);
  }
}

public static class TokenSelector extends
AbstractDelegationTokenSelector<TestDelegationTokenIdentifier>{
Expand Down Expand Up @@ -579,4 +634,85 @@ public void testEmptyToken() throws IOException {
assertEquals(token1, token2);
assertEquals(token1.encodeToUrlString(), token2.encodeToUrlString());
}

/**
 * Each successful store, renew and cancel operation adds exactly one sample to
 * the matching metrics2 rate and IOStatistics mean statistic.
 */
@Test
public void testDelegationTokenSecretManagerMetrics() throws Exception {
  final TestDelegationTokenSecretManager manager =
      new TestDelegationTokenSecretManager(24*60*60*1000,
          10*1000, 1*1000, 60*60*1000);
  try {
    manager.startThreads();

    final MutableRate storeRate = manager.getMetrics().getStoreToken();
    final MutableRate updateRate = manager.getMetrics().getUpdateToken();
    final MutableRate removeRate = manager.getMetrics().getRemoveToken();

    final Token<TestDelegationTokenIdentifier> issued = callAndValidateMetrics(
        manager, storeRate, "storeToken",
        () -> generateDelegationToken(manager, "SomeUser", "JobTracker"), 1);

    callAndValidateMetrics(manager, updateRate, "updateToken",
        () -> manager.renewToken(issued, "JobTracker"), 1);

    callAndValidateMetrics(manager, removeRate, "removeToken",
        () -> manager.cancelToken(issued, "JobTracker"), 1);
  } finally {
    manager.stopThreads();
  }
}

/**
 * Failed store, renew and cancel operations increment the shared failure
 * counter and record a failure duration for the matching statistic.
 */
@Test
public void testDelegationTokenSecretManagerMetricsFailures() throws Exception {
  final int errorSleepMillis = 200;
  final TestFailureDelegationTokenSecretManager manager =
      new TestFailureDelegationTokenSecretManager(errorSleepMillis);

  try {
    manager.startThreads();

    // Issue a token while storage still succeeds, so there is something to
    // renew and cancel once failures are switched on.
    final Token<TestDelegationTokenIdentifier> issued =
        generateDelegationToken(manager, "SomeUser", "JobTracker");
    manager.setThrowError(true);

    // The failure counter is shared by all operations, hence 1, 2, 3.
    // Token creation logs store failures instead of propagating them, so no
    // exception is expected for storeToken.
    callAndValidateFailureMetrics(manager, "storeToken", 1, 1, false,
        errorSleepMillis,
        () -> generateDelegationToken(manager, "SomeUser", "JobTracker"));

    callAndValidateFailureMetrics(manager, "updateToken", 1, 2, true,
        errorSleepMillis, () -> manager.renewToken(issued, "JobTracker"));

    callAndValidateFailureMetrics(manager, "removeToken", 1, 3, true,
        errorSleepMillis, () -> manager.cancelToken(issued, "JobTracker"));
  } finally {
    manager.stopThreads();
  }
}

/**
 * Runs {@code callable} and checks that the metrics2 rate and the IOStatistics
 * mean statistic for {@code statName} both go from
 * {@code expectedCount - 1} samples before the call to {@code expectedCount}
 * samples after it.
 *
 * @return whatever {@code callable} returned
 */
private <T> T callAndValidateMetrics(TestDelegationTokenSecretManager dtSecretManager,
    MutableRate metric, String statName, Callable<T> callable, int expectedCount)
    throws Exception {
  final int samplesBefore = expectedCount - 1;
  final MeanStatistic meanStat = IOStatisticAssertions.lookupMeanStatistic(
      dtSecretManager.getMetrics().getIoStatistics(), statName + ".mean");

  assertEquals(samplesBefore, metric.lastStat().numSamples());
  assertEquals(samplesBefore, meanStat.getSamples());

  final T result = callable.call();

  assertEquals(expectedCount, metric.lastStat().numSamples());
  assertEquals(expectedCount, meanStat.getSamples());
  return result;
}

/**
 * Runs {@code callable} and checks the failure metrics around it.
 *
 * <p>It checks that:
 * <ul>
 *   <li>the shared {@code tokenFailure} counter moves from
 *       {@code expectedMetricCount - 1} to {@code expectedMetricCount};</li>
 *   <li>the {@code <statName>.failures.mean} statistic moves from
 *       {@code expectedStatCount - 1} to {@code expectedStatCount} samples;</li>
 *   <li>the accumulated failure time is at least {@code errorSleepMillis}.</li>
 * </ul>
 *
 * @param expectError whether {@code callable} is expected to throw an
 *     {@link IOException}
 */
private <T> void callAndValidateFailureMetrics(TestDelegationTokenSecretManager dtSecretManager,
    String statName, int expectedStatCount, int expectedMetricCount, boolean expectError,
    int errorSleepMillis, Callable<T> callable) throws Exception {
  final MutableCounterLong failureCounter =
      dtSecretManager.getMetrics().getTokenFailure();
  final MeanStatistic failureDurations = IOStatisticAssertions.lookupMeanStatistic(
      dtSecretManager.getMetrics().getIoStatistics(), statName + ".failures.mean");

  assertEquals(expectedMetricCount - 1, failureCounter.value());
  assertEquals(expectedStatCount - 1, failureDurations.getSamples());

  if (!expectError) {
    callable.call();
  } else {
    LambdaTestUtils.intercept(IOException.class, callable);
  }

  assertEquals(expectedMetricCount, failureCounter.value());
  assertEquals(expectedStatCount, failureDurations.getSamples());
  // The failure-injecting manager sleeps before throwing, so the recorded
  // failure time should cover at least that delay.
  assertTrue(failureDurations.getSum() >= errorSleepMillis);
}
}