8000 HDFS-16553 Fix checkstyle for the length of BlockManager construction method over limit. by smarthanwang · Pull Request #4211 · apache/hadoop · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

HDFS-16553 Fix checkstyle for the length of BlockManager construction method over limit. #4211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 29, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -485,72 +485,54 @@ public int getPendingSPSPaths() {
public BlockManager(final Namesystem namesystem, boolean haEnabled,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
datanodeManager = new DatanodeManager(this, namesystem, conf);
heartbeatManager = datanodeManager.getHeartbeatManager();
this.datanodeManager = new DatanodeManager(this, namesystem, conf);
this.heartbeatManager = datanodeManager.getHeartbeatManager();
this.blockIdManager = new BlockIdManager(this);
blocksPerPostpondedRescan = (int)Math.min(Integer.MAX_VALUE,
this.blocksPerPostpondedRescan = (int)Math.min(Integer.MAX_VALUE,
datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan());
rescannedMisreplicatedBlocks =
this.rescannedMisreplicatedBlocks =
new ArrayList<Block>(blocksPerPostpondedRescan);
startupDelayBlockDeletionInMs = conf.getLong(
this.startupDelayBlockDeletionInMs = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT) * 1000L;
deleteBlockLockTimeMs = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT)
* 1000L;
this.deleteBlockLockTimeMs = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_LOCK_THRESHOLD_MS,
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_LOCK_THRESHOLD_MS_DEFAULT);
deleteBlockUnlockIntervalTimeMs = conf.getLong(
this.deleteBlockUnlockIntervalTimeMs = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_UNLOCK_INTERVAL_MS,
DFSConfigKeys.DFS_NAMENODE_BLOCK_DELETION_UNLOCK_INTERVAL_MS_DEFAULT);
invalidateBlocks = new InvalidateBlocks(
this.invalidateBlocks = new InvalidateBlocks(
datanodeManager.getBlockInvalidateLimit(),
startupDelayBlockDeletionInMs,
blockIdManager);
markedDeleteQueue = new ConcurrentLinkedQueue<>();
this.markedDeleteQueue = new ConcurrentLinkedQueue<>();
// Compute the map capacity by allocating 2% of total memory
blocksMap = new BlocksMap(
this.blocksMap = new BlocksMap(
LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
placementPolicies = new BlockPlacementPolicies(
conf, datanodeManager.getFSClusterStats(),
datanodeManager.getNetworkTopology(),
datanodeManager.getHost2DatanodeMap());
storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite(conf);
pendingReconstruction = new PendingReconstructionBlocks(conf.getInt(
this.placementPolicies = new BlockPlacementPolicies(
conf, datanodeManager.getFSClusterStats(),
datanodeManager.getNetworkTopology(),
datanodeManager.getHost2DatanodeMap());
this.storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite(conf);
this.pendingReconstruction = new PendingReconstructionBlocks(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
* 1000L);

createSPSManager(conf);

blockTokenSecretManager = createBlockTokenSecretManager(conf);

providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);
this.blockTokenSecretManager = createBlockTokenSecretManager(conf);
this.providedStorageMap = new ProvidedStorageMap(namesystem, this, conf);

this.maxCorruptFilesReturned = conf.getInt(
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);

final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY,
DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
final int minR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
if (minR <= 0)
throw new IOException("Unexpected configuration parameters: "
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
+ " = " + minR + " <= 0");
if (maxR > Short.MAX_VALUE)
throw new IOException("Unexpected configuration parameters: "
+ DFSConfigKeys.DFS_REPLICATION_MAX_KEY
+ " = " + maxR + " > " + Short.MAX_VALUE);
if (minR > maxR)
throw new IOException("Unexpected configuration parameters: "
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
+ " = " + minR + " > "
+ DFSConfigKeys.DFS_REPLICATION_MAX_KEY
+ " = " + maxR);
this.minReplication = (short)minR;
this.maxReplication = (short)maxR;
this.minReplication = (short) initMinReplication(conf);
this.maxReplication = (short) initMaxReplication(conf);

this.maxReplicationStreams =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
Expand Down Expand Up @@ -582,6 +564,66 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT,
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT);

this.minReplicationToBeInMaintenance =
(short) initMinReplicationToBeInMaintenance(conf);
this.replQueueResetToHeadThreshold =
initReplQueueResetToHeadThreshold(conf);

long heartbeatIntervalSecs = conf.getTimeDuration(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
long blockRecoveryTimeout = getBlockRecoveryTimeout(heartbeatIntervalSecs);
this.pendingRecoveryBlocks = new PendingRecoveryBlocks(blockRecoveryTimeout);

this.blockReportLeaseManager = new BlockReportLeaseManager(conf);

this.bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled,
conf);

int queueSize = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_KEY,
DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT);
this.blockReportThread = new BlockReportProcessingThread(queueSize);

this.deleteCorruptReplicaImmediately =
conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT);

printInitialConfigs();
}

private int initMinReplication(Configuration conf) throws IOException {
final int minR = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
if (minR <= 0) {
throw new IOException("Unexpected configuration parameters: "
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
+ " = " + minR + " <= 0");
}
return minR;
}

private int initMaxReplication(Configuration conf) throws IOException {
final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY,
DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
if (maxR > Short.MAX_VALUE) {
throw new IOException("Unexpected configuration parameters: "
+ DFSConfigKeys.DFS_REPLICATION_MAX_KEY
+ " = " + maxR + " > " + Short.MAX_VALUE);
}
if (minReplication > maxR) {
throw new IOException("Unexpected configuration parameters: "
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
+ " = " + minReplication + " > "
+ DFSConfigKeys.DFS_REPLICATION_MAX_KEY
+ " = " + maxR);
}
return maxR;
}

private int initMinReplicationToBeInMaintenance(Configuration conf)
throws IOException {
final int minMaintenanceR = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT);
Expand All @@ -598,39 +640,25 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
+ DFSConfigKeys.DFS_REPLICATION_KEY
+ " = " + defaultReplication);
}
this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
return minMaintenanceR;
}

replQueueResetToHeadThreshold = conf.getInt(
private int initReplQueueResetToHeadThreshold(Configuration conf) {
int threshold = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS,
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT);
if (replQueueResetToHeadThreshold < 0) {
if (threshold < 0) {
LOG.warn("{} is set to {} and it must be >= 0. Resetting to default {}",
DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS,
replQueueResetToHeadThreshold, DFSConfigKeys.
threshold, DFSConfigKeys.
DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT);
replQueueResetToHeadThreshold = DFSConfigKeys.
threshold = DFSConfigKeys.
DFS_NAMENODE_REDUNDANCY_QUEUE_RESTART_ITERATIONS_DEFAULT;
}
return threshold;
}

long heartbeatIntervalSecs = conf.getTimeDuration(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
long blockRecoveryTimeout = getBlockRecoveryTimeout(heartbeatIntervalSecs);
pendingRecoveryBlocks = new PendingRecoveryBlocks(blockRecoveryTimeout);

this.blockReportLeaseManager = new BlockReportLeaseManager(conf);

bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);

int queueSize = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_KEY,
DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT);
blockReportThread = new BlockReportProcessingThread(queueSize);

this.deleteCorruptReplicaImmediately =
conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT);

private void printInitialConfigs() {
LOG.info("defaultReplication = {}", defaultReplication);
LOG.info("maxReplication = {}", maxReplication);
LOG.info("minReplication = {}", minReplication);
Expand Down
0