HDFS-16396. Reconfig slow peer parameters for datanode by tomscut · Pull Request #3827 · apache/hadoop · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

HDFS-16396. Reconfig slow peer parameters for datanode #3827

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
8000
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
volumeFailureSummary.getFailedStorageLocations().length : 0;
final boolean outliersReportDue = scheduler.isOutliersReportDue(now);
final SlowPeerReports slowPeers =
outliersReportDue && dn.getPeerMetrics() != null ?
outliersReportDue && dnConf.peerStatsEnabled && dn.getPeerMetrics() != null ?
SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
SlowPeerReports.EMPTY_REPORT;
final SlowDiskReports slowDisks =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ duration, datanodeSlowLogThresholdMs, getVolumeBaseUri(),
*/
private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
final DataNodePeerMetrics peerMetrics = datanode.getPeerMetrics();
if (peerMetrics != null && isPenultimateNode) {
if (datanode.getDnConf().peerStatsEnabled && peerMetrics != null && isPenultimateNode) {
peerMetrics.addSendPacketDownstream(mirrorNameForMetrics, elapsedMs);
}
}
Expand Down Expand Up @@ -1107,7 +1107,7 @@ private void initPerfMonitoring(DatanodeInfo[] downstreams) {
if (downstreams != null && downstreams.length > 0) {
downstreamDNs = downstreams;
isPenultimateNode = (downstreams.length == 1);
if (isPenultimateNode && datanode.getPeerMetrics() != null) {
if (isPenultimateNode && datanode.getDnConf().peerStatsEnabled) {
mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ?
downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr());
LOG.debug("Will collect peer metrics for downstream node {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public class DNConf {
private final long lifelineIntervalMs;
volatile long blockReportInterval;
volatile long blockReportSplitThreshold;
final boolean peerStatsEnabled;
volatile boolean peerStatsEnabled;
final boolean diskStatsEnabled;
final long outliersReportIntervalMs;
final long ibrInterval;
Expand Down Expand Up @@ -507,4 +507,8 @@ void setInitBRDelayMs(String delayMs) {
dn.getConf().set(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, delayMs);
initBlockReportDelay();
}

/**
 * Replaces the flag that gates peer-statistics collection on this DataNode.
 * The backing field is volatile, so the new value is visible to other
 * threads that read it afterwards.
 *
 * @param enablePeerStats {@code true} to turn peer stats on, {@code false}
 *     to turn them off.
 */
void setPeerStatsEnabled(boolean enablePeerStats) {
  this.peerStatsEnabled = enablePeerStats;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,19 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
Expand Down Expand Up @@ -315,7 +323,11 @@ public class DataNode extends ReconfigurableBase
DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
DFS_CACHEREPORT_INTERVAL_MSEC_KEY));
DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
DFS_DATANODE_PEER_STATS_ENABLED_KEY,
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY));

public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");

Expand Down Expand Up @@ -357,7 +369,7 @@ public static InetSocketAddress createSocketAddr(String target) {

DataNodeMetrics metrics;
@Nullable
private DataNodePeerMetrics peerMetrics;
private volatile DataNodePeerMetrics peerMetrics;
private DataNodeDiskMetrics diskMetrics;
private InetSocketAddress streamingAddr;

Expand Down Expand Up @@ -634,6 +646,11 @@ public String reconfigurePropertyImpl(String property, String newVal)
return reconfDataXceiverParameters(property, newVal);
case DFS_CACHEREPORT_INTERVAL_MSEC_KEY:
return reconfCacheReportParameters(property, newVal);
case DFS_DATANODE_PEER_STATS_ENABLED_KEY:
case DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY:
case DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY:
case DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY:
return reconfSlowPeerParameters(property, newVal);
default:
break;
}
Expand Down Expand Up @@ -713,6 +730,53 @@ private String reconfBlockReportParameters(String property, String newVal)
}
}

/**
 * Reconfigures one of the DataNode slow-peer properties at runtime.
 *
 * <p>Handles {@code DFS_DATANODE_PEER_STATS_ENABLED_KEY},
 * {@code DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY},
 * {@code DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY} and
 * {@code DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY}.
 * A {@code null} value restores the property's default.
 *
 * @param property the property being reconfigured.
 * @param newVal the new value, or {@code null} to restore the default.
 * @return the effective value that was applied, as a string.
 * @throws ReconfigurationException if {@code newVal} cannot be parsed or is
 *     rejected, or if the DNConf / peer metrics the property applies to are
 *     not available.
 */
private String reconfSlowPeerParameters(String property, String newVal)
    throws ReconfigurationException {
  String result = null;
  try {
    LOG.info("Reconfiguring {} to {}", property, newVal);
    if (property.equals(DFS_DATANODE_PEER_STATS_ENABLED_KEY)) {
      if (dnConf == null) {
        throw new IllegalStateException("DNConf has not been initialized.");
      }
      if (newVal != null && !newVal.equalsIgnoreCase("true")
          && !newVal.equalsIgnoreCase("false")) {
        throw new IllegalArgumentException("Not a valid Boolean value for " + property +
            " in reconfSlowPeerParameters");
      }
      boolean enable = (newVal == null ? DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT :
          Boolean.parseBoolean(newVal));
      result = Boolean.toString(enable);
      dnConf.setPeerStatsEnabled(enable);
      if (enable) {
        // Create if it doesn't exist, overwrite if it does.
        peerMetrics = DataNodePeerMetrics.create(getDisplayName(), getConf());
      }
    } else if (property.equals(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY)) {
      DataNodePeerMetrics metrics = requirePeerMetrics();
      long minNodes = (newVal == null ? DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT :
          Long.parseLong(newVal));
      result = Long.toString(minNodes);
      metrics.setMinOutlierDetectionNodes(minNodes);
    } else if (property.equals(DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY)) {
      DataNodePeerMetrics metrics = requirePeerMetrics();
      long threshold = (newVal == null ? DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT :
          Long.parseLong(newVal));
      result = Long.toString(threshold);
      metrics.setLowThresholdMs(threshold);
    } else if (property.equals(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY)) {
      DataNodePeerMetrics metrics = requirePeerMetrics();
      long minSamples = (newVal == null ?
          DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT :
          Long.parseLong(newVal));
      result = Long.toString(minSamples);
      metrics.setMinOutlierDetectionSamples(minSamples);
    }
    LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
    return result;
  } catch (IllegalArgumentException | IllegalStateException e) {
    // IllegalArgumentException covers NumberFormatException and the range
    // checks done by the DataNodePeerMetrics setters; IllegalStateException
    // covers a missing DNConf / peer metrics. Both must be reported as a
    // ReconfigurationException instead of escaping as unchecked exceptions
    // (previously a missing instance surfaced as an uncaught NPE).
    throw new ReconfigurationException(property, newVal, getConf().get(property), e);
  }
}

/**
 * Returns the current peer metrics instance. The volatile field is read
 * exactly once so that the null check and the subsequent update act on the
 * same object even if peer stats are re-enabled concurrently.
 *
 * @return the non-null peer metrics instance.
 * @throws IllegalStateException if peer metrics have not been created.
 */
private DataNodePeerMetrics requirePeerMetrics() {
  DataNodePeerMetrics metrics = peerMetrics;
  if (metrics == null) {
    throw new IllegalStateException("DataNode peer stats may be disabled.");
  }
  return metrics;
}

6D4E /**
* Get a list of the keys of the re-configurable properties in configuration.
*/
Expand Down Expand Up @@ -3872,7 +3936,7 @@ void setBlockScanner(BlockScanner blockScanner) {

@Override // DataNodeMXBean
public String getSendPacketDownstreamAvgInfo() {
return peerMetrics != null ?
return dnConf.peerStatsEnabled && peerMetrics != null ?
peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson() : null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ public void run() {
* the thread dies away.
*/
private void collectThreadLocalStates() {
if (datanode.getPeerMetrics() != null) {
if (datanode.getDnConf().peerStatsEnabled && datanode.getPeerMetrics() != null) {
datanode.getPeerMetrics().collectThreadLocalStates();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,23 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.metrics2.MetricsJsonBuilder;
import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY;

/**
* This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for
Expand All @@ -57,27 +62,27 @@ public class DataNodePeerMetrics {
* for outlier detection. If the number of samples is below this then
* outlier detection is skipped.
*/
private final long minOutlierDetectionSamples;
private volatile long minOutlierDetectionSamples;
/**
* Threshold in milliseconds below which a DataNode is definitely not slow.
*/
private final long lowThresholdMs;
private volatile long lowThresholdMs;
/**
* Minimum number of nodes to run outlier detection.
*/
private final long minOutlierDetectionNodes;
private volatile long minOutlierDetectionNodes;

public DataNodePeerMetrics(final String name, Configuration conf) {
this.name = name;
minOutlierDetectionSamples = conf.getLong(
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY,
DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT);
lowThresholdMs =
conf.getLong(DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT);
conf.getLong(DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT);
minOutlierDetectionNodes =
conf.getLong(DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT);
conf.getLong(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT);
this.slowNodeDetector =
new OutlierDetector(minOutlierDetectionNodes, lowThresholdMs);
Comment on lines 86 to 87
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.slowNodeDetector has to update minOutlierDetectionNodes and lowThresholdMs after reconfiguring them, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @tasanuma for your comment; it makes sense to me. I updated the code, so please have a look when you are free.

sendPacketDownstreamRollingAverages = new MutableRollingAverages("Time");
Expand All @@ -87,7 +92,7 @@ public String name() {
return name;
}

long getMinOutlierDetectionSamples() {
public long getMinOutlierDetectionSamples() {
return minOutlierDetectionSamples;
}

Expand Down Expand Up @@ -150,4 +155,38 @@ public Map<String, Double> getOutliers() {
public MutableRollingAverages getSendPacketDownstreamRollingAverages() {
return sendPacketDownstreamRollingAverages;
}

/**
 * Changes the minimum number of peer nodes needed before outlier detection
 * runs, keeping the slow node detector in sync with the new value.
 *
 * @param minNodes the new minimum; must be greater than zero.
 * @throws IllegalArgumentException if {@code minNodes} is not positive.
 */
public void setMinOutlierDetectionNodes(long minNodes) {
  if (minNodes <= 0) {
    throw new IllegalArgumentException(
        DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY + " should be larger than 0");
  }
  this.minOutlierDetectionNodes = minNodes;
  slowNodeDetector.setMinNumResources(minNodes);
}

/**
 * @return the minimum number of peer nodes currently required for outlier
 *     detection.
 */
public long getMinOutlierDetectionNodes() {
  return this.minOutlierDetectionNodes;
}

/**
 * Changes the latency threshold (in milliseconds) below which a peer is
 * never considered slow, and forwards it to the slow node detector.
 *
 * @param thresholdMs the new threshold; must be greater than zero.
 * @throws IllegalArgumentException if {@code thresholdMs} is not positive.
 */
public void setLowThresholdMs(long thresholdMs) {
  if (thresholdMs <= 0) {
    throw new IllegalArgumentException(
        DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY + " should be larger than 0");
  }
  this.lowThresholdMs = thresholdMs;
  slowNodeDetector.setLowThresholdMs(thresholdMs);
}

/**
 * @return the current low threshold, in milliseconds, below which a peer is
 *     not treated as slow.
 */
public long getLowThresholdMs() {
  return this.lowThresholdMs;
}

/**
 * Changes the minimum number of samples per peer required before outlier
 * detection is attempted.
 *
 * @param minSamples the new minimum; must be greater than zero.
 * @throws IllegalArgumentException if {@code minSamples} is not positive.
 */
public void setMinOutlierDetectionSamples(long minSamples) {
  if (minSamples <= 0) {
    throw new IllegalArgumentException(
        DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY +
        " should be larger than 0");
  }
  this.minOutlierDetectionSamples = minSamples;
}

/**
 * Exposes the detector used to flag slow peers, so tests can inspect the
 * parameters propagated to it.
 *
 * @return the slow node detector owned by this instance.
 */
@VisibleForTesting
public OutlierDetector getSlowNodeDetector() {
  return slowNodeDetector;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class OutlierDetector {
/**
* Minimum number of resources to run outlier detection.
*/
private final long minNumResources;
private volatile long minNumResources;

/**
* The multiplier is from Leys, C. et al.
Expand All @@ -70,7 +70,7 @@ public class OutlierDetector {
/**
* Threshold in milliseconds below which a node/ disk is definitely not slow.
*/
private final long lowThresholdMs;
private volatile long lowThresholdMs;

/**
* Deviation multiplier. A sample is considered to be an outlier if it
Expand Down Expand Up @@ -180,4 +180,20 @@ public static Double computeMedian(List<Double> sortedValues) {
}
return median;
}

/**
 * Replaces the minimum number of resources required to run outlier
 * detection. No validation is done here; callers are expected to check the
 * value.
 *
 * @param minNodes the new minimum number of resources.
 */
public void setMinNumResources(long minNodes) {
  this.minNumResources = minNodes;
}

/**
 * @return the minimum number of resources needed before outlier detection
 *     runs (backed by {@code minNumResources}).
 */
public long getMinOutlierDetectionNodes() {
  return this.minNumResources;
}

/**
 * Replaces the threshold, in milliseconds, below which a node or disk is
 * never considered slow. No validation is done here.
 *
 * @param thresholdMs the new threshold in milliseconds.
 */
public void setLowThresholdMs(long thresholdMs) {
  this.lowThresholdMs = thresholdMs;
}

/**
 * @return the threshold, in milliseconds, below which a node or disk is not
 *     treated as slow.
 */
public long getLowThresholdMs() {
  return this.lowThresholdMs;
}
}
Loading
0