From 3e2e2c3f18b81c7ba1936c805a357c515cbac9ac Mon Sep 17 00:00:00 2001 From: LiuGuH <444506464@qq.com> Date: Wed, 5 Jun 2024 16:25:55 +0800 Subject: [PATCH 1/4] HDFS-16757. Add a new method copyBlockCrossNamespace to DataNode --- .../org/apache/hadoop/hdfs/DFSClient.java | 21 ++ .../apache/hadoop/hdfs/DFSOutputStream.java | 24 ++ .../hadoop/hdfs/DFSStripedOutputStream.java | 3 + .../hadoop/hdfs/DistributedFileSystem.java | 7 +- .../datatransfer/DataTransferProtocol.java | 14 ++ .../hadoop/hdfs/protocol/datatransfer/Op.java | 1 + .../hdfs/protocol/datatransfer/Sender.java | 15 ++ .../src/main/proto/datatransfer.proto | 7 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 + .../hdfs/protocol/datatransfer/Receiver.java | 21 ++ .../hadoop/hdfs/server/datanode/DNConf.java | 15 ++ .../hadoop/hdfs/server/datanode/DataNode.java | 225 +++++++++++++----- .../datanode/DataNodeFaultInjector.java | 7 +- .../hdfs/server/datanode/DataXceiver.java | 29 +++ .../datanode/fsdataset/FsDatasetSpi.java | 12 + .../fsdataset/impl/BlockPoolSlice.java | 13 +- .../fsdataset/impl/FsDatasetImpl.java | 23 ++ .../datanode/metrics/DataNodeMetrics.java | 18 ++ .../src/main/resources/hdfs-default.xml | 8 + .../hadoop/hdfs/TestDataTransferProtocol.java | 149 +++++++++++- .../server/datanode/SimulatedFSDataset.java | 5 + .../extdataset/ExternalDatasetImpl.java | 7 + 22 files changed, 566 insertions(+), 63 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index cbe7516b0ede0..f9ed1b3bf79b2 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -28,6 +28,7 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY; +import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -1956,6 +1957,26 @@ protected IOStreamPair connectToDN(DatanodeInfo dn, int timeout, socketFactory, getConf().isConnectToDnViaHostname(), this, blockToken); } + protected void copyBlockCrossNamespace(ExtendedBlock sourceBlk, + Token sourceBlockToken, DatanodeInfo sourceDatanode, + ExtendedBlock targetBlk, Token targetBlockToken, + DatanodeInfo targetDatanode) throws IOException { + IOStreamPair pair = + DFSUtilClient.connectToDN(sourceDatanode, getConf().getSocketTimeout(), conf, saslClient, + socketFactory, getConf().isConnectToDnViaHostname(), this, sourceBlockToken); + + new Sender((DataOutputStream) pair.out).copyBlockCrossNamespace(sourceBlk, sourceBlockToken, + targetBlk, targetBlockToken, targetDatanode); + + pair.out.flush(); + + DataInputStream reply = new DataInputStream(pair.in); + BlockOpResponseProto proto = BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(reply)); + DataTransferProtoUtil.checkBlockOpStatus(proto, + "copyBlockCrossNamespace " + sourceBlk + " to " + targetBlk + " from " + sourceDatanode + + " to " + targetDatanode); + } + /** * Infer the checksum type for a replica by sending an OP_READ_BLOCK * for the first byte of that replica. 
This is used for compatibility diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index a1bfb7f5d594e..f6bcfd67976ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -131,6 +131,7 @@ public class DFSOutputStream extends FSOutputSummer private FileEncryptionInfo fileEncryptionInfo; private int writePacketSize; private boolean leaseRecovered = false; + private ExtendedBlock userAssignmentLastBlock; /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/ protected DFSPacket createPacket(int packetSize, int chunksPerPkt, @@ -949,6 +950,9 @@ protected void recoverLease(boolean recoverLeaseOnCloseException) { void completeFile() throws IOException { // get last block before destroying the streamer ExtendedBlock lastBlock = getStreamer().getBlock(); + if (lastBlock == null) { + lastBlock = getUserAssignmentLastBlock(); + } try (TraceScope ignored = dfsClient.getTracer() .newScope("DFSOutputStream#completeFile")) { completeFile(lastBlock); @@ -1095,6 +1099,14 @@ ExtendedBlock getBlock() { return getStreamer().getBlock(); } + public ExtendedBlock getUserAssignmentLastBlock() { + return userAssignmentLastBlock; + } + + public void setUserAssignmentLastBlock(ExtendedBlock userAssignmentLastBlock) { + this.userAssignmentLastBlock = userAssignmentLastBlock; + } + @VisibleForTesting public long getFileId() { return fileId; @@ -1199,4 +1211,16 @@ private static long calculateDelayForNextRetry(long previousDelay, long maxDelay) { return Math.min(previousDelay * 2, maxDelay); } + + public DFSClient getDfsClient() { + return dfsClient; + } + + protected void copyBlockCrossNamespace(ExtendedBlock sourceBlk, + Token sourceBlockToken, DatanodeInfo sourceDatanode, + ExtendedBlock targetBlk, Token targetBlockToken, + DatanodeInfo targetDatanode) throws IOException { + dfsClient.copyBlockCrossNamespace(sourceBlk, sourceBlockToken, sourceDatanode, targetBlk, + targetBlockToken, targetDatanode); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 8320cc9a40866..543c00d45e40d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -1271,6 +1271,9 @@ protected synchronized void closeImpl() throws IOException { try (TraceScope ignored = dfsClient.getTracer().newScope("completeFile")) { + if (currentBlockGroup == null) { + currentBlockGroup = getUserAssignmentLastBlock(); + } completeFile(currentBlockGroup); } logCorruptBlocks(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 17c39f6c55b75..6182d852c35e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -600,9 +600,10 @@ 
public FSDataOutputStream next(final FileSystem fs, final Path p) * inherited policy. * */ - private HdfsDataOutputStream create(final Path f, - final FsPermission permission, final EnumSet flag, - final int bufferSize, final short replication, final long blockSize, + public HdfsDataOutputStream create( + final Path f, final FsPermission permission, + final EnumSet flag, final int bufferSize, + final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt, final InetSocketAddress[] favoredNodes, final String ecPolicyName, final String storagePolicy) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java index 384f1dc3507af..8756c465ae806 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -238,4 +238,18 @@ void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, Token blockToken, long requestedNumBytes, BlockChecksumOptions blockChecksumOptions) throws IOException; + + /** + * Copy a block cross Namespace. + * It is used for fastcopy. + * + * @param sourceBlk the block being copied. + * @param sourceBlockToken security token for accessing sourceBlk. + * @param targetBlk the block to be writted. + * @param targetBlockToken security token for accessing targetBlk. + * @param targetDatanode the target datnode which sourceBlk will copy to as targetBlk. + */ + void copyBlockCrossNamespace(ExtendedBlock sourceBlk, + Token sourceBlockToken, ExtendedBlock targetBlk, + Token targetBlockToken, DatanodeInfo targetDatanode) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java index 94250e5e7f622..1e9366dcb46f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java @@ -39,6 +39,7 @@ public enum Op { RELEASE_SHORT_CIRCUIT_FDS((byte)88), REQUEST_SHORT_CIRCUIT_SHM((byte)89), BLOCK_GROUP_CHECKSUM((byte)90), + COPY_BLOCK_CROSSNAMESPACE((byte)91), CUSTOM((byte)127); /** The code for this operation. 
*/ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index 86c6513745ea2..7b3329331c2c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockCrossNamespaceProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; @@ -308,4 +309,18 @@ public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, send(out, Op.BLOCK_GROUP_CHECKSUM, proto); } + + @Override + public void copyBlockCrossNamespace(ExtendedBlock sourceBlk, + Token sourceBlockToken, ExtendedBlock targetBlk, + Token targetBlockToken, DatanodeInfo targetDatanode) + throws IOException { + OpCopyBlockCrossNamespaceProto proto = OpCopyBlockCrossNamespaceProto.newBuilder() + .setHeader(DataTransferProtoUtil.buildBaseHeader(sourceBlk, sourceBlockToken)) + .setTargetBlock(PBHelperClient.convert(targetBlk)) + .setTargetToken(PBHelperClient.convert(targetBlockToken)) + .setTargetDatanode(PBHelperClient.convert(targetDatanode)).build(); + + send(out, Op.COPY_BLOCK_CROSSNAMESPACE, proto); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto index 5356cd6961699..b9cc22ca3bb5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto @@ -331,3 +331,10 @@ message OpBlockChecksumResponseProto { message OpCustomProto { required string customId = 1; } + +message OpCopyBlockCrossNamespaceProto { + required BaseHeaderProto header = 1; + required ExtendedBlockProto targetBlock = 2; + required hadoop.common.TokenProto targetToken = 3; + required DatanodeInfoProto targetDatanode = 4; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index b9f8e07f67a5f..1119a25d3feb9 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -136,6 +136,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.datanode.ec.reconstruct.write.bandwidthPerSec"; public static final long DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_DEFAULT = 0; // A value of zero indicates no limit + public static final String DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY = + "dfs.datanode.copy.block.cross.namespace.socket-timeout.ms"; + public static final int DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_DEFAULT = + 5 * 60 * 1000; + 
@Deprecated public static final String DFS_DATANODE_READAHEAD_BYTES_KEY = HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index 8bcfb199ff5a9..6e149dd4c3cb4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockCrossNamespaceProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; @@ -133,6 +134,9 @@ protected final void processOp(Op op) throws IOException { case REQUEST_SHORT_CIRCUIT_SHM: opRequestShortCircuitShm(in); break; + case COPY_BLOCK_CROSSNAMESPACE: + opCopyBlockCrossNamespace(in); + break; default: throw new IOException("Unknown op " + op + " in data stream"); } @@ -339,4 +343,21 @@ private void opStripedBlockChecksum(DataInputStream dis) throws IOException { } } } + + private void opCopyBlockCrossNamespace(DataInputStream dis) throws IOException { + OpCopyBlockCrossNamespaceProto proto = + OpCopyBlockCrossNamespaceProto.parseFrom(vintPrefixed(dis)); + TraceScope traceScope = continueTraceSpan(proto.getHeader(), proto.getClass().getSimpleName()); + try { + copyBlockCrossNamespace(PBHelperClient.convert(proto.getHeader().getBlock()), + PBHelperClient.convert(proto.getHeader().getToken()), + PBHelperClient.convert(proto.getTargetBlock()), + PBHelperClient.convert(proto.getTargetToken()), + PBHelperClient.convert(proto.getTargetDatanode())); + } finally { + if (traceScope != null) { + traceScope.close(); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 21b92db3073a1..736314bdc7fe0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -27,6 +27,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST; import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT; @@ -89,6 +91,7 @@ public class DNConf { final int socketWriteTimeout; final int socketKeepaliveTimeout; final int ecChecksumSocketTimeout; + private final int copyBlockCrossNamespaceSocketTimeout; private final int transferSocketSendBufferSize; private final int transferSocketRecvBufferSize; private final boolean tcpNoDelay; @@ -153,6 +156,9 @@ public DNConf(final Configurable dn) { ecChecksumSocketTimeout = getConf().getInt( DFS_CHECKSUM_EC_SOCKET_TIMEOUT_KEY, DFS_CHECKSUM_EC_SOCKET_TIMEOUT_DEFAULT); + copyBlockCrossNamespaceSocketTimeout = getConf().getInt( + DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY, + DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_DEFAULT); this.transferSocketSendBufferSize = getConf().getInt( DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY, DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT); @@ -390,6 +396,15 @@ public int getEcChecksumSocketTimeout() { return ecChecksumSocketTimeout; } + /** + * Returns socket timeout for copyBlockCrossNamespace. + * + * @return int socket timeout + */ + public int getCopyBlockCrossNamespaceSocketTimeout() { + return copyBlockCrossNamespaceSocketTimeout; + } + /** * Returns the SaslPropertiesResolver configured for use with * DataTransferProtocol, or null if not configured. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 87e8eee681d1d..4161c3531e929 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -470,6 +470,7 @@ public static InetSocketAddress createSocketAddr(String target) { private DataSetLockManager dataSetLockManager; private final ExecutorService xferService; + private final ExecutorService copyBlockCrossNamespaceExecutor; @Nullable private final StorageLocationChecker storageLocationChecker; @@ -518,6 +519,8 @@ private static Tracer createTracer(Configuration conf) { volumeChecker = new DatasetVolumeChecker(conf, new Timer()); this.xferService = HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory()); + this.copyBlockCrossNamespaceExecutor = + HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory()); double congestionRationTmp = conf.getDouble(DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO, DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO_DEFAULT); this.congestionRatio = congestionRationTmp > 0 ? 
@@ -583,7 +586,8 @@ private static Tracer createTracer(Configuration conf) { "File descriptor passing was not configured."; LOG.debug(this.fileDescriptorPassingDisabledReason); } - + this.copyBlockCrossNamespaceExecutor = + HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory()); this.socketFactory = NetUtils.getDefaultSocketFactory(conf); try { @@ -2593,9 +2597,12 @@ public void shutdown() { // wait reconfiguration thread, if any, to exit shutdownReconfigurationTask(); - LOG.info("Waiting up to 30 seconds for transfer threads to complete"); + LOG.info("Waiting up to 15 seconds for transfer threads to complete"); HadoopExecutors.shutdown(this.xferService, LOG, 15L, TimeUnit.SECONDS); + LOG.info("Waiting up to 15 seconds for copyBlockCrossNamespaceExecutor threads to complete"); + HadoopExecutors.shutdown(this.copyBlockCrossNamespaceExecutor, LOG, 15L, TimeUnit.SECONDS); + // wait for all data receiver threads to exit if (this.threadGroup != null) { int sleepMs = 2; @@ -3003,46 +3010,58 @@ private class DataTransfer implements Runnable { final DatanodeInfo[] targets; final StorageType[] targetStorageTypes; final private String[] targetStorageIds; - final ExtendedBlock b; + final private ExtendedBlock source; + private ExtendedBlock target; final BlockConstructionStage stage; final private DatanodeRegistration bpReg; final String clientname; final CachingStrategy cachingStrategy; - /** Throttle to block replication when data transfers or writes. */ + /** + * Throttle to block replication when data transfers or writes. + */ private DataTransferThrottler throttler; + private boolean copyBlockCrossNamespace; /** - * Connect to the first item in the target list. Pass along the + * Connect to the first item in the target list. Pass along the * entire target list, the block, and the data. */ DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes, - String[] targetStorageIds, ExtendedBlock b, - BlockConstructionStage stage, final String clientname) { - DataTransferProtocol.LOG.debug("{}: {} (numBytes={}), stage={}, " + - "clientname={}, targets={}, target storage types={}, " + - "target storage IDs={}", getClass().getSimpleName(), b, - b.getNumBytes(), stage, clientname, Arrays.asList(targets), - targetStorageTypes == null ? "[]" : - Arrays.asList(targetStorageTypes), + String[] targetStorageIds, ExtendedBlock source, BlockConstructionStage stage, + final String clientname) { + DataTransferProtocol.LOG.debug("{}: {} (numBytes={}), stage={}, " + + "clientname={}, targets={}, target storage types={}, " + "target storage IDs={}", + getClass().getSimpleName(), source, source.getNumBytes(), stage, clientname, + Arrays.asList(targets), + targetStorageTypes == null ? "[]" : Arrays.asList(targetStorageTypes), targetStorageIds == null ? 
"[]" : Arrays.asList(targetStorageIds)); this.targets = targets; this.targetStorageTypes = targetStorageTypes; this.targetStorageIds = targetStorageIds; - this.b = b; + this.source = source; + this.target = source; + this.copyBlockCrossNamespace = false; this.stage = stage; - BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId()); + BPOfferService bpos = blockPoolManager.get(source.getBlockPoolId()); bpReg = bpos.bpRegistration; this.clientname = clientname; - this.cachingStrategy = - new CachingStrategy(true, getDnConf().readaheadLength); + this.cachingStrategy = new CachingStrategy(true, getDnConf().readaheadLength); if (isTransfer(stage, clientname)) { this.throttler = xserver.getTransferThrottler(); - } else if(isWrite(stage)) { + } else if (isWrite(stage)) { this.throttler = xserver.getWriteThrottler(); } } + DataTransfer(DatanodeInfo[] targets, StorageType[] targetStorageTypes, + String[] targetStorageIds, ExtendedBlock source, ExtendedBlock target, + BlockConstructionStage stage, final String clientname) { + this(targets, targetStorageTypes, targetStorageIds, source, stage, clientname); + this.target = target; + this.copyBlockCrossNamespace = true; + } + /** * Do the deed, write the bytes */ @@ -3054,8 +3073,9 @@ public void run() { DataInputStream in = null; BlockSender blockSender = null; final boolean isClient = clientname.length() > 0; - + try { + DataNodeFaultInjector.get().transferThrowException(); final String dnAddr = targets[0].getXferAddr(connectToDnViaHostname); InetSocketAddress curTarget = NetUtils.createSocketAddr(dnAddr); LOG.debug("Connecting to datanode {}", dnAddr); @@ -3067,69 +3087,70 @@ public void run() { // // Header info // - Token accessToken = getBlockAccessToken(b, - EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), - targetStorageTypes, targetStorageIds); + Token accessToken = + getBlockAccessToken(target, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), + targetStorageTypes, targetStorageIds); - long writeTimeout = dnConf.socketWriteTimeout + - HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); + long writeTimeout = + dnConf.socketWriteTimeout + HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length + - 1); OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(sock); - DataEncryptionKeyFactory keyFactory = - getDataEncryptionKeyFactoryForBlock(b); - IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, - unbufIn, keyFactory, accessToken, bpReg); + DataEncryptionKeyFactory keyFactory = getDataEncryptionKeyFactoryForBlock(source); + IOStreamPair saslStreams = + saslClient.socketSend(sock, unbufOut, unbufIn, keyFactory, accessToken, bpReg); unbufOut = saslStreams.out; unbufIn = saslStreams.in; - - out = new DataOutputStream(new BufferedOutputStream(unbufOut, - DFSUtilClient.getSmallBufferSize(getConf()))); + + out = new DataOutputStream( + new BufferedOutputStream(unbufOut, DFSUtilClient.getSmallBufferSize(getConf()))); in = new DataInputStream(unbufIn); - blockSender = new BlockSender(b, 0, b.getNumBytes(), - false, false, true, DataNode.this, null, cachingStrategy); - DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg) - .build(); - - String storageId = targetStorageIds.length > 0 ? 
- targetStorageIds[0] : null; - new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken, - clientname, targets, targetStorageTypes, srcNode, - stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy, - false, false, null, storageId, - targetStorageIds); + blockSender = + new BlockSender(source, 0, source.getNumBytes(), false, false, true, DataNode.this, + null, cachingStrategy); + DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg).build(); + + String storageId = targetStorageIds.length > 0 ? targetStorageIds[0] : null; + new Sender(out).writeBlock(target, targetStorageTypes[0], accessToken, clientname, targets, + targetStorageTypes, srcNode, stage, 0, 0, 0, 0, blockSender.getChecksum(), + cachingStrategy, false, false, null, storageId, targetStorageIds); // send data & checksum blockSender.sendBlock(out, unbufOut, throttler); // no response necessary - LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}", - getClass().getSimpleName(), DataNode.this.getDisplayName(), - b, b.getNumBytes(), curTarget); + LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}", getClass().getSimpleName(), + DataNode.this.getDisplayName(), source, source.getNumBytes(), curTarget); // read ack if (isClient) { - DNTransferAckProto closeAck = DNTransferAckProto.parseFrom( - PBHelperClient.vintPrefixed(in)); + DNTransferAckProto closeAck = + DNTransferAckProto.parseFrom(PBHelperClient.vintPrefixed(in)); LOG.debug("{}: close-ack={}", getClass().getSimpleName(), closeAck); if (closeAck.getStatus() != Status.SUCCESS) { if (closeAck.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException( - "Got access token error for connect ack, targets=" - + Arrays.asList(targets)); + "Got access token error for connect ack, targets=" + Arrays.asList(targets)); } else { - throw new IOException("Bad connect ack, targets=" - + Arrays.asList(targets) + " status=" + closeAck.getStatus()); + throw new IOException( + "Bad connect ack, targets=" + Arrays.asList(targets) + " status=" + + closeAck.getStatus()); } } } else { metrics.incrBlocksReplicated(); } } catch (IOException ie) { - handleBadBlock(b, ie, false); - LOG.warn("{}:Failed to transfer {} to {} got", - bpReg, b, targets[0], ie); + if (copyBlockCrossNamespace) { + throw new RuntimeException(ie); + } + handleBadBlock(source, ie, false); + LOG.warn("{}:Failed to transfer {} to {} got", bpReg, source, targets[0], ie); } catch (Throwable t) { - LOG.error("Failed to transfer block {}", b, t); + LOG.error("Failed to transfer block {}", source, t); + if (copyBlockCrossNamespace) { + throw new RuntimeException(t); + } } finally { decrementXmitsInProgress(); IOUtils.closeStream(blockSender); @@ -3141,7 +3162,7 @@ public void run() { @Override public String toString() { - return "DataTransfer " + b + " to " + Arrays.asList(targets); + return "DataTransfer " + source + " to " + Arrays.asList(targets); } } @@ -4388,4 +4409,94 @@ boolean isSlownode() { public BlockPoolManager getBlockPoolManager() { return blockPoolManager; } + + public void copyBlockCrossNamespace(ExtendedBlock sourceBlk, ExtendedBlock targetBlk, + DatanodeInfo targetDn) throws IOException { + if (!data.isValidBlock(sourceBlk)) { + // block does not exist or is under-construction + String errStr = + "copyBlock:(" + this.getInfoPort() + ") Can't send invalid block " + sourceBlk + " " + + data.getReplicaString(sourceBlk.getBlockPoolId(), sourceBlk.getBlockId()); + LOG.info(errStr); + throw new IOException(errStr); + } + long onDiskLength = data.getLength(sourceBlk); + 
if (sourceBlk.getNumBytes() > onDiskLength) { + // Shorter on-disk len indicates corruption so report NN the corrupt block + String msg = "copyBlock: Can't replicate block " + sourceBlk + " because on-disk length " + + onDiskLength + " is shorter than provided length " + sourceBlk.getNumBytes(); + LOG.info(msg); + throw new IOException(msg); + } + LOG.info(getDatanodeInfo() + " copyBlock: Starting thread to transfer: " + "block:" + + sourceBlk + " from " + this.getDatanodeUuid() + " to " + targetDn.getDatanodeUuid() + + "(" + targetDn + ")"); + Future result; + if (this.getDatanodeUuid().equals(targetDn.getDatanodeUuid())) { + result = copyBlockCrossNamespaceExecutor.submit(new LocalBlockCopy(sourceBlk, targetBlk)); + } else { + result = copyBlockCrossNamespaceExecutor.submit( + new DataCopy(targetDn, sourceBlk, targetBlk).getDataTransfer()); + } + try { + result.get(getDnConf().getCopyBlockCrossNamespaceSocketTimeout(), TimeUnit.MILLISECONDS); + } catch (Exception e) { + LOG.error(e.getMessage()); + throw new IOException(e); + } + } + + private class DataCopy { + private final DataTransfer dataTransfer; + + DataCopy(DatanodeInfo targetDn, ExtendedBlock sourceBlk, ExtendedBlock targetBlk) { + FsVolumeImpl volume = (FsVolumeImpl) data.getVolume(sourceBlk); + StorageType storageType = volume.getStorageType(); + String storageId = volume.getStorageID(); + + DatanodeInfo[] targets = new DatanodeInfo[] {targetDn}; + StorageType[] targetStorageTypes = new StorageType[] {storageType}; + String[] targetStorageIds = new String[] {storageId}; + dataTransfer = + new DataTransfer(targets, targetStorageTypes, targetStorageIds, sourceBlk, targetBlk, + PIPELINE_SETUP_CREATE, ""); + } + + public DataTransfer getDataTransfer() { + return dataTransfer; + } + } + + class LocalBlockCopy implements Callable { + private ExtendedBlock sourceBlk = null; + private ExtendedBlock targetBlk = null; + + LocalBlockCopy(ExtendedBlock sourceBlk, ExtendedBlock targetBlk) { + this.sourceBlk = sourceBlk; + this.targetBlk = targetBlk; + } + + public Boolean call() throws IOException { + try { + targetBlk.setNumBytes(sourceBlk.getNumBytes()); + data.hardLinkOneBlock(sourceBlk, targetBlk); + FsVolumeSpi v = (FsVolumeSpi) (getFSDataset().getVolume(targetBlk)); + closeBlock(targetBlk, null, v.getStorageID(), v.isTransientStorage()); + + BlockLocalPathInfo srcBlpi = data.getBlockLocalPathInfo(sourceBlk); + BlockLocalPathInfo dstBlpi = data.getBlockLocalPathInfo(targetBlk); + LOG.info( + getClass().getSimpleName() + ": Hardlinked " + sourceBlk + "( " + srcBlpi.getBlockPath() + + " " + srcBlpi.getMetaPath() + " ) " + "to " + targetBlk + "( " + + dstBlpi.getBlockPath() + " " + dstBlpi.getMetaPath() + " ) "); + + metrics.incrBlocksReplicatedViaHardlink(); + } catch (IOException e) { + LOG.warn("Local block copy for src : " + sourceBlk.getBlockName() + ", dst : " + + targetBlk.getBlockName() + " failed", e); + throw e; + } + return true; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java index 9e046cc3600df..9f54e0b44f698 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java @@ -173,9 +173,14 @@ public void delayDiffRecord() {} */ public void 
delayGetMetaDataInputStream() {} - /** + /** * Used in {@link DirectoryScanner#reconcile()} to wait until a storage is removed, * leaving a stale copy of {@link DirectoryScanner#diffs}. */ public void waitUntilStorageRemoved() {} + + /** + * Userd for datanode transfer thow exception case. + */ + public void transferThrowException() throws IOException {} } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index d948c1caefd1f..b2aced4d69e88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -1088,6 +1088,35 @@ public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo, datanode.metrics.addBlockChecksumOp(elapsed()); } + @Override + public void copyBlockCrossNamespace(ExtendedBlock sourceBlk, + Token sourceBlockToken, ExtendedBlock targetBlk, + Token targetBlockToken, + DatanodeInfo targetDatanode) + throws IOException { + updateCurrentThreadName("Copying block " + sourceBlk + " to " + targetBlk); + + DataOutputStream reply = getBufferedOutputStream(); + checkAccess(reply, true, sourceBlk, sourceBlockToken, Op.COPY_BLOCK_CROSSNAMESPACE, + BlockTokenIdentifier.AccessMode.READ); + checkAccess(reply, true, targetBlk, targetBlockToken, Op.COPY_BLOCK_CROSSNAMESPACE, + BlockTokenIdentifier.AccessMode.WRITE); + + try { + datanode.copyBlockCrossNamespace(sourceBlk, targetBlk, targetDatanode); + sendResponse(SUCCESS, null); + } catch (IOException ioe) { + LOG.warn("copyBlockCrossNamespace from {} to {} to {} received exception,", sourceBlk, + targetBlk, targetDatanode, ioe); + incrDatanodeNetworkErrors(); + throw ioe; + } finally { + IOUtils.closeStream(reply); + } + + datanode.metrics.addCopyBlockCrossNamespaceOp(elapsed()); + } + @Override public void copyBlock(final ExtendedBlock block, final Token blockToken) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 06be54b37d96a..81a7567d7826a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -708,4 +708,16 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block, * @param time the last time in milliseconds when the directory scanner successfully ran. 
*/ default void setLastDirScannerFinishTime(long time) {} + + /** + * Copies over a block from a block file + * + * @param srcBlock + * the source block which needs to be copied + * @param dstBlock + * the destination block to which the srcBlock needs to be copied to + * @throws IOException + */ + void hardLinkOneBlock(ExtendedBlock srcBlock, ExtendedBlock dstBlock) + throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 8c643e9e16ace..d679c0a7cb788 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; @@ -139,7 +140,7 @@ public int compare(File f1, File f2) { private volatile GetSpaceUsed dfsUsage; /** - * Create a blook pool slice + * Create a blook pool slice. * @param bpid Block pool Id * @param volume {@link FsVolumeImpl} to which this BlockPool belongs to * @param bpDir directory corresponding to the BlockPool @@ -1152,4 +1153,14 @@ void setDeleteDuplicateReplicasForTests( this.deleteDuplicateReplicas = deleteDuplicateReplicasForTests; } + public File hardLinkOneBlock(File src, File srcMeta, Block dstBlock) throws IOException { + File dstMeta = new File(tmpDir, + DatanodeUtil.getMetaName(dstBlock.getBlockName(), dstBlock.getGenerationStamp())); + HardLink.createHardLink(srcMeta, dstMeta); + + File dstBlockFile = new File(tmpDir, dstBlock.getBlockName()); + HardLink.createHardLink(src, dstBlockFile); + + return dstBlockFile; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index eeec1bb728825..655faee4c472b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.LocalReplica; +import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.protocol.Block; @@ -3863,5 +3864,27 @@ public void setLastDirScannerFinishTime(long time) { public long getPendingAsyncDeletions() { return asyncDiskService.countPendingDeletions(); } + + @Override + public void hardLinkOneBlock(ExtendedBlock srcBlock, ExtendedBlock dstBlock) throws IOException { + BlockLocalPathInfo blpi = getBlockLocalPathInfo(srcBlock); + FsVolumeImpl v = getVolume(srcBlock); + + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, dstBlock.getBlockPoolId(), + 
v.getStorageID())) { + File src = new File(blpi.getBlockPath()); + File srcMeta = new File(blpi.getMetaPath()); + BlockPoolSlice dstBPS = v.getBlockPoolSlice(dstBlock.getBlockPoolId()); + File dstBlockFile = dstBPS.hardLinkOneBlock(src, srcMeta, dstBlock.getLocalBlock()); + + ReplicaInfo replicaInfo = + new LocalReplicaInPipeline(dstBlock.getBlockId(), dstBlock.getGenerationStamp(), v, + dstBlockFile.getParentFile(), dstBlock.getLocalBlock().getNumBytes()); + dstBlockFile = dstBPS.addFinalizedBlock(dstBlock.getLocalBlock(), replicaInfo); + replicaInfo = new FinalizedReplica(dstBlock.getLocalBlock(), getVolume(srcBlock), + dstBlockFile.getParentFile()); + volumeMap.add(dstBlock.getBlockPoolId(), replicaInfo); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index 832a8029f7771..b391fb5b1284c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -66,6 +66,7 @@ public class DataNodeMetrics { @Metric MutableCounterLong blocksWritten; @Metric MutableCounterLong blocksRead; @Metric MutableCounterLong blocksReplicated; + @Metric private MutableCounterLong blocksReplicatedViaHardlink; @Metric MutableCounterLong blocksRemoved; @Metric MutableCounterLong blocksVerified; @Metric MutableCounterLong blockVerificationFailures; @@ -127,6 +128,7 @@ public class DataNodeMetrics { @Metric MutableRate writeBlockOp; @Metric MutableRate blockChecksumOp; @Metric MutableRate copyBlockOp; + @Metric private MutableRate copyBlockCrossNamespaceOp; @Metric MutableRate replaceBlockOp; @Metric MutableRate heartbeats; @Metric MutableRate heartbeatsTotal; @@ -351,6 +353,18 @@ public void incrBlocksReplicated() { blocksReplicated.incr(); } + public long getBlocksReplicated() { + return blocksReplicated.value(); + } + + public void incrBlocksReplicatedViaHardlink() { + blocksReplicatedViaHardlink.incr(); + } + + public long getBlocksReplicatedViaHardlink() { + return blocksReplicatedViaHardlink.value(); + } + public void incrBlocksWritten() { blocksWritten.incr(); } @@ -400,6 +414,10 @@ public void addCopyBlockOp(long latency) { copyBlockOp.add(latency); } + public void addCopyBlockCrossNamespaceOp(long latency) { + copyBlockCrossNamespaceOp.add(latency); + } + public void addBlockChecksumOp(long latency) { blockChecksumOp.add(latency); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 94c3ea0cc9b0c..76853f5e414e2 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4994,6 +4994,14 @@ + + dfs.datanode.copy.block.cross.namespace.socket-timeout.ms + 300000 + + Default timeout value in milliseconds for datanode copying block cross namespace. 
+ + + dfs.ha.fencing.methods diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java index 98556c4fd15ff..403634942de60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -31,9 +33,22 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.EnumSet; import java.util.Random; - +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.slf4j.Logger; @@ -650,4 +665,136 @@ public void testReleaseVolumeRefIfExceptionThrown() cluster.shutdown(); } } + + @Test(timeout = 90000) + public void testCopyBlockCrossNamespace() + throws IOException, InterruptedException, TimeoutException { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024 * 1024); + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) + .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2)).build(); + try { + cluster.waitActive(); + ArrayList dataNodes = cluster.getDataNodes(); + + // Create one file with one block with one replica in Namespace0. 
+ Path ns0Path = new Path("/testCopyBlockCrossNamespace_0.txt"); + DistributedFileSystem ns0FS = cluster.getFileSystem(0); + DFSTestUtil.createFile(ns0FS, ns0Path, 1024, (short) 2, 0); + DFSTestUtil.waitReplication(ns0FS, ns0Path, (short) 2); + HdfsFileStatus ns0FileStatus = (HdfsFileStatus) ns0FS.getFileStatus(ns0Path); + LocatedBlocks locatedBlocks = + ns0FS.getClient().getLocatedBlocks(ns0Path.toUri().getPath(), 0, Long.MAX_VALUE); + assertEquals(1, locatedBlocks.getLocatedBlocks().size()); + assertTrue(locatedBlocks.isLastBlockComplete()); + LocatedBlock locatedBlockNS0 = locatedBlocks.get(0); + + DatanodeInfoWithStorage[] datanodeInfoWithStoragesNS0 = locatedBlockNS0.getLocations(); + assertEquals(2, datanodeInfoWithStoragesNS0.length); + + String ns0BlockLocation1 = datanodeInfoWithStoragesNS0[0].getHostName() + ":" + + datanodeInfoWithStoragesNS0[0].getXferPort(); + String ns0BlockLocation2 = datanodeInfoWithStoragesNS0[1].getHostName() + ":" + + datanodeInfoWithStoragesNS0[1].getXferPort(); + + String[] favoredNodes = new String[2]; + + favoredNodes[0] = ns0BlockLocation1; + for (DataNode dn : dataNodes) { + String dnInfo = dn.getDatanodeHostname() + ":" + dn.getXferPort(); + if (!dnInfo.equals(ns0BlockLocation1) && !dnInfo.equals(ns0BlockLocation2)) { + favoredNodes[1] = dnInfo; + } + } + + // Create one similar file with two replicas in Namespace1. + Path ns1Path = new Path("/testCopyBlockCrossNamespace_1.txt"); + DistributedFileSystem ns1FS = cluster.getFileSystem(1); + FSDataOutputStream stream = + ns1FS.create(ns1Path, ns0FileStatus.getPermission(), EnumSet.of(CreateFlag.CREATE), + ns1FS.getClient().getConf().getIoBufferSize(), ns0FileStatus.getReplication(), + ns0FileStatus.getBlockSize(), null, null, null, null, null); + DFSOutputStream outputStream = (DFSOutputStream) stream.getWrappedStream(); + + LocatedBlock locatedBlockNS1 = + DFSOutputStream.addBlock(null, outputStream.getDfsClient(), ns1Path.getName(), null, + outputStream.getFileId(), favoredNodes, null); + assertEquals(2, locatedBlockNS1.getLocations().length); + + // Align the datanode. + DatanodeInfoWithStorage[] datanodeInfoWithStoragesNS1 = locatedBlockNS1.getLocations(); + DatanodeInfoWithStorage sameDN = datanodeInfoWithStoragesNS0[0].getXferPort() + == datanodeInfoWithStoragesNS1[0].getXferPort() ? + datanodeInfoWithStoragesNS1[0] : + datanodeInfoWithStoragesNS1[1]; + DatanodeInfoWithStorage differentDN = datanodeInfoWithStoragesNS0[0].getXferPort() + == datanodeInfoWithStoragesNS1[0].getXferPort() ? + datanodeInfoWithStoragesNS1[1] : + datanodeInfoWithStoragesNS1[0]; + + // HardLink locatedBlockNS0 to locatedBlockNS1 on same datanode. + outputStream.copyBlockCrossNamespace(locatedBlockNS0.getBlock(), + locatedBlockNS0.getBlockToken(), datanodeInfoWithStoragesNS0[0], + locatedBlockNS1.getBlock(), locatedBlockNS1.getBlockToken(), sameDN); + + // Test when transfer throw exception client can know it. 
+ DataNodeFaultInjector.set(new DataNodeFaultInjector() { + public void transferThrowException() throws IOException { + throw new IOException("Transfer failed for fastcopy."); + } + }); + boolean transferError = false; + try { + outputStream.copyBlockCrossNamespace(locatedBlockNS0.getBlock(), + locatedBlockNS0.getBlockToken(), datanodeInfoWithStoragesNS0[1], + locatedBlockNS1.getBlock(), locatedBlockNS1.getBlockToken(), differentDN); + } catch (IOException e) { + transferError = true; + } + assertTrue(transferError); + + DataNodeFaultInjector.set(new DataNodeFaultInjector()); + // Transfer locatedBlockNS0 to locatedBlockNS1 on different datanode. + outputStream.copyBlockCrossNamespace(locatedBlockNS0.getBlock(), + locatedBlockNS0.getBlockToken(), datanodeInfoWithStoragesNS0[1], + locatedBlockNS1.getBlock(), locatedBlockNS1.getBlockToken(), differentDN); + + // Check Lease Holder. + RemoteIterator iterator = ns1FS.listOpenFiles(); + OpenFileEntry fileEntry = iterator.next(); + assertEquals(ns1Path.toUri().toString(), fileEntry.getFilePath()); + assertEquals(outputStream.getDfsClient().getClientName(), fileEntry.getClientName()); + + outputStream.setUserAssignmentLastBlock(locatedBlockNS1.getBlock()); + stream.close(); + + // Check Lease release. + iterator = ns1FS.listOpenFiles(); + assertFalse(iterator.hasNext()); + + long heartbeatInterval = + conf.getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, DFS_HEARTBEAT_INTERVAL_DEFAULT, + TimeUnit.SECONDS, TimeUnit.MILLISECONDS); + Thread.sleep(heartbeatInterval * 2); + + // Do verification that the file in namespace1 should contain one block with two replicas. + LocatedBlocks locatedBlocksNS1 = ns1FS.getClient().getNamenode() + .getBlockLocations(ns1Path.toUri().getPath(), 0, Long.MAX_VALUE); + assertEquals(1, locatedBlocksNS1.getLocatedBlocks().size()); + assertEquals(2, locatedBlocksNS1.getLocatedBlocks().get(0).getLocations().length); + assertTrue(locatedBlocksNS1.isLastBlockComplete()); + + for (DataNode dataNode : dataNodes) { + if (dataNode.getXferPort() == datanodeInfoWithStoragesNS0[0].getXferPort()) { + assertEquals(1L, dataNode.getMetrics().getBlocksReplicatedViaHardlink()); + } else if (dataNode.getXferPort() == datanodeInfoWithStoragesNS0[1].getXferPort()) { + assertEquals(1L, dataNode.getMetrics().getBlocksReplicated()); + } + } + + } finally { + cluster.shutdown(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 1ddc4e9602a7d..b3924efcb77f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1653,5 +1653,10 @@ public List getVolumeList() { public void setLastDirScannerFinishTime(long time) { throw new UnsupportedOperationException(); } + + @Override + public void hardLinkOneBlock(ExtendedBlock srcBlock, ExtendedBlock dstBlock) { + throw new UnsupportedOperationException(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 24069fccdfa35..a0011c6ac241a 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -493,4 +493,11 @@ public long getLastDirScannerFinishTime() { public long getPendingAsyncDeletions() { return 0; } + + @Override + public void hardLinkOneBlock(ExtendedBlock srcBlock, ExtendedBlock dstBlock) + throws IOException { + throw new UnsupportedOperationException(); + } + } From 3af0f216b86896c4b101b79ab3fd65d297875b4f Mon Sep 17 00:00:00 2001 From: LiuGuH <444506464@qq.com> Date: Wed, 12 Jun 2024 18:03:42 +0800 Subject: [PATCH 2/4] HDFS-17581. Add FastCopy tool and support dfs -fastcp command --- .../java/org/apache/hadoop/fs/FileSystem.java | 4 + .../fs/shell/CommandWithDestination.java | 26 + .../apache/hadoop/fs/shell/CopyCommands.java | 78 ++- .../hadoop/fs/TestFilterFileSystem.java | 1 + .../apache/hadoop/fs/TestHarFileSystem.java | 2 + .../hadoop/hdfs/DistributedFileSystem.java | 6 + .../java/org/apache/hadoop/hdfs/FastCopy.java | 558 ++++++++++++++++++ .../hdfs/client/HdfsClientConfigKeys.java | 7 + .../src/main/resources/hdfs-default.xml | 24 + .../apache/hadoop/TestFsShellFastCopy.java | 183 ++++++ .../TestDefaultBlockPlacementPolicy.java | 11 + 11 files changed, 899 insertions(+), 1 deletion(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FastCopy.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestFsShellFastCopy.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 38ec611451750..89ed9b97f2677 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -2544,6 +2544,10 @@ public void moveFromLocalFile(Path src, Path dst) copyFromLocalFile(true, src, dst); } + public void fastCopy(Path src, Path dst, boolean overwrite) throws IOException { + throw new UnsupportedOperationException("FileSystem does not support fastcopy method."); + } + /** * The src file is on the local disk. Add it to the filesystem at * the given dst name. 
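A minimal usage sketch of the new FileSystem#fastCopy API added above (not part of this patch; the namespace URIs and paths are illustrative assumptions). On DistributedFileSystem it delegates to the new FastCopy tool; other file systems inherit the default implementation, which throws UnsupportedOperationException.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FastCopyUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // fastCopy is invoked on the source FileSystem, the same way the new
    // CommandWithDestination#fastCopyToTarget drives it for the -fastcp command below.
    FileSystem srcFs = FileSystem.get(URI.create("hdfs://ns0"), conf);
    // Copy into another namespace of the same federated cluster, overwriting any
    // existing destination. Block data moves through the copyBlockCrossNamespace
    // path from patch 1: a hardlink when source and target replicas share a
    // datanode, a regular transfer otherwise.
    srcFs.fastCopy(new Path("hdfs://ns0/src/file"), new Path("hdfs://ns1/dst/file"), true);
  }
}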
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java index 69a418c1925eb..102dc2f9b584d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java @@ -365,6 +365,14 @@ protected void copyFileToTarget(PathData src, PathData target) IOUtils.closeStream(in); } } + + protected void copyFileToTargetWithFastCp(PathData src,PathData target) throws IOException { + final boolean preserveRawXattrs = + checkPathsForReservedRaw(src.path, target.path); + src.fs.setVerifyChecksum(verifyChecksum); + fastCopyToTarget(src, target); + preserveAttributes(src, target, preserveRawXattrs); + } /** * Check the source and target paths to ensure that they are either both in @@ -433,6 +441,24 @@ protected void copyStreamToTarget(InputStream in, PathData target) } } + protected void fastCopyToTarget(PathData src, PathData target) + throws IOException { + if (target.exists && (target.stat.isDirectory() || !overwrite)) { + throw new PathExistsException(target.toString()); + } + TargetFileSystem targetFs = new TargetFileSystem(target.fs); + try { + PathData tempTarget = direct ? target : target.suffix("._COPYING_"); + targetFs.setWriteChecksum(writeChecksum); + src.fs.fastCopy(src.path, tempTarget.path, overwrite); + if (!direct) { + targetFs.rename(tempTarget, target); + } + } finally { + targetFs.close(); // last ditch effort to ensure temp file is removed + } + } + /** * Preserve the attributes of the source to the target. * The method calls {@link #shouldPreserve(FileAttribute)} to check what diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java index 70c7fe381244d..ed5c91b44c1af 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java @@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathIsDirectoryException; +import org.apache.hadoop.fs.PathOperationException; import org.apache.hadoop.io.IOUtils; /** Various commands for copy files */ @@ -44,6 +45,7 @@ class CopyCommands { public static void registerCommands(CommandFactory factory) { factory.addClass(Merge.class, "-getmerge"); factory.addClass(Cp.class, "-cp"); + factory.addClass(FastCp.class, "-fastcp"); factory.addClass(CopyFromLocal.class, "-copyFromLocal"); factory.addClass(CopyToLocal.class, "-copyToLocal"); factory.addClass(Get.class, "-get"); @@ -209,7 +211,81 @@ private void popPreserveOption(List args) { } } } - + + static class FastCp extends CopyCommandWithMultiThread { + public static final String NAME = "fastcp"; + public static final String USAGE = + "[-f] [-p | -p[topax]] [-d] [-t ]" + + " [-q ] ... "; + public static final String DESCRIPTION = + "FastCopy files that match the file pattern to a destination." + + " When copying multiple files, the destination must be a " + + "directory.\nFlags :\n" + + " -p[topax] : Preserve file attributes [topx] (timestamps, " + + "ownership, permission, ACL, XAttr). 
If -p is specified with " + + "no arg, then preserves timestamps, ownership, permission. " + + "If -pa is specified, then preserves permission also because " + + "ACL is a super-set of permission. Determination of whether raw " + + "namespace extended attributes are preserved is independent of " + + "the -p flag.\n" + + " -f : Overwrite the destination if it already exists.\n" + + " -d : Skip creation of temporary file(._COPYING_).\n" + + " -t : Number of threads to be used, " + + "default is 1.\n" + + " -q : Thread pool queue size to be " + + "used, default is 1024.\n"; + + @Override + protected void processOptions(LinkedList args) throws IOException { + popPreserveOption(args); + CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "f", "d"); + cf.addOptionWithValue("t"); + cf.addOptionWithValue("q"); + cf.parse(args); + setDirectWrite(cf.getOpt("d")); + setOverwrite(cf.getOpt("f")); + setThreadCount(cf.getOptValue("t")); + setThreadPoolQueueSize(cf.getOptValue("q")); + // should have a -r option + setRecursive(true); + getRemoteDestination(args); + } + + private void popPreserveOption(List args) { + for (Iterator iter = args.iterator(); iter.hasNext(); ) { + String cur = iter.next(); + if (cur.equals("--")) { + // stop parsing arguments when you see -- + break; + } else if (cur.startsWith("-p")) { + iter.remove(); + if (cur.length() == 2) { + setPreserve(true); + } else { + String attributes = cur.substring(2); + for (int index = 0; index < attributes.length(); index++) { + preserve(FileAttribute.getAttribute(attributes.charAt(index))); + } + } + return; + } + } + } + + @Override + protected void processPath(PathData src, PathData dst) throws IOException { + if (src.stat.isSymlink()) { + // TODO: remove when FileContext is supported, this needs to either + // copy the symlink or deref the symlink + throw new PathOperationException(src.toString()); + } else if (src.stat.isFile()) { + copyFileToTargetWithFastCp(src, dst); + } else if (src.stat.isDirectory() && !isRecursive()) { + throw new PathIsDirectoryException(src.toString()); + } + } + } + /** * Copy local files to a remote filesystem */ diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java index 1b42290cedc5e..5a05cc96d30eb 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java @@ -137,6 +137,7 @@ public Token[] addDelegationTokens(String renewer, Credentials creds) void setQuota(Path f, long namespaceQuota, long storagespaceQuota); void setQuotaByStorageType(Path f, StorageType type, long quota); StorageStatistics getStorageStatistics(); + public void fastCopy(Path src, Path dst, boolean overwrite) throws IOException; /* Not passed through as the inner implementation will miss features diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java index 26d0361d6a255..c817094225f3c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java @@ -234,6 +234,8 @@ public Collection getAllStoragePolicies() public Collection 
getTrashRoots(boolean allUsers) throws IOException; StorageStatistics getStorageStatistics(); + public void fastCopy(Path src, Path dst, boolean overwrite) throws IOException; + FutureDataInputStreamBuilder openFile(Path path) throws IOException, UnsupportedOperationException; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 6182d852c35e9..f785b7f06f43b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -693,6 +693,12 @@ public HdfsDataOutputStream next(final FileSystem fs, final Path p) }.resolve(this, absF); } + @Override + public void fastCopy(Path src, Path dst, boolean overwrite) throws IOException { + FastCopy fastCopy = new FastCopy(getConf(), src, dst, overwrite); + fastCopy.copyFile(); + } + /** * Same as create(), except fails if parent directory doesn't already exist. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FastCopy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FastCopy.java new file mode 100644 index 0000000000000..42ed56e712985 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FastCopy.java @@ -0,0 +1,558 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.concurrent.HadoopExecutors; + +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_FAST_COPY_BLOCK_EXECUTOR_POOLSIZE; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_FAST_COPY_BLOCK_EXECUTOR_POOLSIZE_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_FAST_COPY_BLOCK_WAIT_TIME_MS; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_FAST_COPY_BLOCK_WAIT_TIME_MS_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_FAST_COPY_FILE_WAIT_TIME_MS; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_FAST_COPY_FILE_WAIT_TIME_MS_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MIN_KEY; + +/** + * There is a need to perform fast file copy on HDFS (primarily for the purpose + * of HBase Snapshot). The fast copy mechanism for a file works as follows : + *

+ * 1) Query metadata for all blocks of the source file. + *

+ * 2) For each block 'b' of the file, find out its datanode locations. + *

+ * 3) For each block of the file, add an empty block to the nameSystem for the + * destination file. + *

+ * 4) For each location of the block, instruct the datanode to make a local copy + * of that block. + *

+ * 5) Once each datanode has copied over its respective blocks, they report + * to the nameNode about it. + *

+ * 6) Wait for all blocks to be copied and exit. + *
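+ * A minimal usage sketch (the namespace URIs below are only illustrative examples):
+ *
+ *   FastCopy fastCopy = new FastCopy(conf, new Path("hdfs://ns1/src/file"),
+ *       new Path("hdfs://ns2/dst/file"), true);
+ *   fastCopy.copyFile();
+ *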

+ * This would speed up the copying process considerably by removing top of the + * rack data transfers. + **/ + +public class FastCopy { + public static final Logger LOG = LoggerFactory.getLogger(FastCopy.class); + protected Configuration conf; + // Map used to store the status of each block. + private final Map blockStatusMap = new ConcurrentHashMap<>(); + private final FastCopyFileStatus fileStatus; + private final short minReplication; + // The time for which to wait for a block to be reported to the nameNode. + private final long BLK_WAIT_TIME; + // Maximum time to wait for a file copy to complete. + public final long FILE_WAIT_TIME; + private final EnumSet flag; + + private final String src; + private final String dst; + private final ExecutorService copyBlockExecutor; + private volatile IOException copyBlockException = null; + public int copyBlockExecutorPoolSize; + + private final DFSClient srcDFSClient; + private final DistributedFileSystem dstFs; + + public FastCopy(Configuration conf, Path sourcePath, Path dstPath, boolean overwrite) throws IOException { + this.conf = conf; + FILE_WAIT_TIME = + conf.getInt(DFS_FAST_COPY_FILE_WAIT_TIME_MS, DFS_FAST_COPY_FILE_WAIT_TIME_MS_DEFAULT); + BLK_WAIT_TIME = + conf.getInt(DFS_FAST_COPY_BLOCK_WAIT_TIME_MS, DFS_FAST_COPY_BLOCK_WAIT_TIME_MS_DEFAULT); + minReplication = (short) conf.getInt(DFS_NAMENODE_REPLICATION_MIN_KEY, 1); + + if (overwrite) { + flag = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); + } else { + flag = EnumSet.of(CreateFlag.CREATE); + } + + DistributedFileSystem srcFs = (DistributedFileSystem) sourcePath.getFileSystem(conf); + this.dstFs = (DistributedFileSystem) dstPath.getFileSystem(conf); + this.srcDFSClient = srcFs.getClient(); + this.src = + sourcePath.makeQualified(srcFs.getUri(), srcFs.getWorkingDirectory()).toUri().getPath(); + this.dst = + dstPath.makeQualified(dstFs.getUri(), dstFs.getWorkingDirectory()).toUri().getPath(); + this.fileStatus = new FastCopyFileStatus(this.dst); + // This controls the number of concurrent blocks that would be copied per + // file. So if we are concurrently copying 5 files at a time by setting + // THREAD_POOL_SIZE to 5 and allowing 5 concurrent file copies and this + // value is set at 5, we would have 25 blocks in all being copied by the + // tool in parallel. 
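+ // The per-file pool size below is read from dfs.fast.copy.block.executor.poolsize (default 2).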
+ this.copyBlockExecutorPoolSize = conf.getInt(DFS_FAST_COPY_BLOCK_EXECUTOR_POOLSIZE, + DFS_FAST_COPY_BLOCK_EXECUTOR_POOLSIZE_DEFAULT); + this.copyBlockExecutor = HadoopExecutors.newFixedThreadPool(this.copyBlockExecutorPoolSize); + } + + private class CopyBlockCrossNamespace implements Runnable { + private final ExtendedBlock source; + private final ExtendedBlock target; + private final DatanodeInfo sourceDn; + private final DatanodeInfo targetDn; + private final Token sourceBlockToken; + private final Token targetToken; + private final boolean isECFile; + private final DFSOutputStream outputStream; + + public CopyBlockCrossNamespace(ExtendedBlock source, ExtendedBlock target, + DatanodeInfo sourceDn, DatanodeInfo targetDn, Token sourceBlockToken, + Token targetToken, boolean isECFile, DFSOutputStream outputStream) { + this.source = source; + this.target = target; + this.sourceDn = sourceDn; + this.targetDn = targetDn; + this.sourceBlockToken = sourceBlockToken; + this.targetToken = targetToken; + this.isECFile = isECFile; + this.outputStream = outputStream; + } + + @Override + public void run() { + copyBlockReplica(); + } + + private void copyBlockReplica() { + boolean isError = false; + try { + outputStream.copyBlockCrossNamespace(source, sourceBlockToken, sourceDn, target, + targetToken, targetDn); + } catch (IOException e) { + String errMsg = "Fast Copy : Failed for Copying block " + source.getBlockName() + " from " + + sourceDn.getInfoAddr() + " to " + target.getBlockName() + " on " + + targetDn.getInfoAddr(); + LOG.warn(errMsg, e); + isError = true; + } + updateBlockStatus(target, isError, isECFile); + } + + /** + * Updates the status of a block. If the block is in the + * {@link FastCopy#blockStatusMap} we are still waiting for the block to + * reach the desired replication level. + * + * @param b the block whose status needs to be updated. + * @param isError whether the block had an error. + */ + private void updateBlockStatus(ExtendedBlock b, boolean isError, boolean isECFile) { + if (isECFile) { + b = new ExtendedBlock(b.getBlockPoolId(), + b.getBlockId() - StripedBlockUtil.getBlockIndex(b.getLocalBlock())); + } + + synchronized (blockStatusMap) { + BlockStatus bStatus = blockStatusMap.get(b); + if (bStatus == null) { + return; + } + if (isError) { + bStatus.addBadReplica(); + if (bStatus.isBadBlock()) { + blockStatusMap.remove(b); + copyBlockException = + new IOException("All replicas are bad for block : " + b.getBlockName()); + } + } else { + bStatus.addGoodReplica(); + // We are removing the block from the blockStatusMap, this indicates + // that the block has reached the desired replication, so now we + // update the fileStatusMap. Note that this will happen only once + // for each block. + if (bStatus.isGoodBlock()) { + blockStatusMap.remove(b); + updateFileStatus(); + } + } + } + } + + /** + * Updates the file status by incrementing the total number of blocks done + * for this file by 1. + */ + private void updateFileStatus() { + synchronized (fileStatus) { + fileStatus.addBlock(); + } + } + } + + /** + * Stores the status of a single block, the number of replicas that are bad + * and the total number of expected replicas. 
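+ * For a replicated block the copy is treated as successful once at least
+ * minReplication good replicas have been reported; for an erasure-coded
+ * block group every non-empty internal block must be copied successfully.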
+ */ + public class BlockStatus { + private final short totalReplicas; + private short badReplicas; + private short goodReplicas; + private final boolean isECFile; + + public BlockStatus(short totalReplicas, boolean isECFile) { + this.totalReplicas = totalReplicas; + this.badReplicas = 0; + this.goodReplicas = 0; + this.isECFile = isECFile; + } + + public void addBadReplica() { + this.badReplicas++; + } + + public boolean isBadBlock() { + return (badReplicas >= totalReplicas); + } + + public void addGoodReplica() { + this.goodReplicas++; + } + + public boolean isGoodBlock() { + if (isECFile) { + return (this.goodReplicas >= totalReplicas); + } + return (this.goodReplicas >= minReplication); + } + } + + /** + * This is used for status reporting by the Fast Copy tool + */ + public static class FastCopyFileStatus { + // The file that data is being copied to. + private final String file; + // The total number of blocks done till now. + private int blocksDone; + + public FastCopyFileStatus(String file) { + this(file, 0); + } + + public FastCopyFileStatus(String file, int blocksDone) { + this.file = file; + this.blocksDone = blocksDone; + } + + public String getFileName() { + return this.file; + } + + public int getBlocksDone() { + return this.blocksDone; + } + + public void addBlock() { + this.blocksDone++; + } + + public String toString() { + return "Copying " + file + " has finished " + blocksDone + " blocks."; + } + } + + /** + * Aligns the source and destination locations such that common locations + * appear at the same index. + * + * @param dstLocations the destination dataNodes + * @param srcLocations the source dataNodes + */ + public void alignDataNodes(DatanodeInfo[] dstLocations, DatanodeInfo[] srcLocations) { + for (int i = 0; i < dstLocations.length; i++) { + for (int j = 0; j < srcLocations.length; j++) { + if (i == j) + continue; + if (dstLocations[i].equals(srcLocations[j])) { + if (i < j) { + swap(i, j, srcLocations); + } else { + swap(i, j, dstLocations); + } + break; + } + } + } + } + + private void swap(int i, int j, DatanodeInfo[] arr) { + DatanodeInfo tmp = arr[i]; + arr[i] = arr[j]; + arr[j] = tmp; + } + + /** + * Copies all the replicas for a single replicated block + * + * @param src the source block + * @param dst the destination block + */ + private void startCopyBlockThread(LocatedBlock src, LocatedBlock dst, DFSOutputStream out) { + // Sorting source and destination locations so that we don't rely at all + // on the ordering of the locations that we receive from the NameNode. + DatanodeInfo[] dstLocations = dst.getLocations(); + DatanodeInfo[] srcLocations = src.getLocations(); + alignDataNodes(dstLocations, srcLocations); + + // We use minimum here, since its better for the NameNode to handle the + // extra locations in either list. The locations that match up are the + // ones we have chosen in our tool, so we handle copies for only those. 
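+ // One CopyBlockCrossNamespace task is submitted per index below, pairing
+ // srcLocations[i] with dstLocations[i] after the alignment above.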
+ short blocksToCopy = (short) Math.min(srcLocations.length, dstLocations.length); + + ExtendedBlock srcBlock = src.getBlock(); + ExtendedBlock dstBlock = dst.getBlock(); + initializeBlockStatus(dstBlock, blocksToCopy, false); + for (int i = 0; i < blocksToCopy; i++) { + copyBlockExecutor.submit( + new CopyBlockCrossNamespace(srcBlock, dstBlock, srcLocations[i], dstLocations[i], + src.getBlockToken(), dst.getBlockToken(), false, out)); + } + } + + private void startCopyBlockThread(LocatedBlock srcLocatedBlock, + LocatedBlock destinationLocatedBlock, ErasureCodingPolicy erasureCodingPolicy, + DFSOutputStream out) { + LocatedBlock[] srcBlocks = + StripedBlockUtil.parseStripedBlockGroup((LocatedStripedBlock) srcLocatedBlock, + erasureCodingPolicy.getCellSize(), erasureCodingPolicy.getNumDataUnits(), + erasureCodingPolicy.getNumParityUnits()); + + LocatedBlock[] dstBlocks = + StripedBlockUtil.parseStripedBlockGroup((LocatedStripedBlock) destinationLocatedBlock, + erasureCodingPolicy.getCellSize(), erasureCodingPolicy.getNumDataUnits(), + erasureCodingPolicy.getNumParityUnits()); + + short notEmptyBlocks = 0; + for (LocatedBlock s : srcBlocks) { + if (s != null) { + notEmptyBlocks++; + } + } + + initializeBlockStatus(destinationLocatedBlock.getBlock(), notEmptyBlocks, true); + + for (int i = 0; i < srcBlocks.length; i++) { + if (srcBlocks[i] != null) { + copyBlockExecutor.submit( + new CopyBlockCrossNamespace(srcBlocks[i].getBlock(), dstBlocks[i].getBlock(), + srcBlocks[i].getLocations()[0], dstBlocks[i].getLocations()[0], + srcBlocks[i].getBlockToken(), dstBlocks[i].getBlockToken(), true, out)); + } + } + } + + /** + * Waits for the blocks of the file to be completed to a particular + * threshold. + * + * @param blocksAdded the number of blocks already added to the nameNode. + * @throws IOException throw IOException if timeout or copyBlockException has exception. + */ + private void waitForBlockCopy(int blocksAdded) throws IOException { + long startTime = Time.monotonicNow(); + + while (true) { + // If the dataNodes are not lagging or this is the first block that will + // be added to the nameNode, no need to wait longer. + int blocksDone = fileStatus.getBlocksDone(); + if (blocksAdded == blocksDone || blocksAdded == 0) { + break; + } + if (copyBlockException != null) { + throw copyBlockException; + } + if (Time.monotonicNow() - startTime > BLK_WAIT_TIME) { + throw new IOException("Timeout waiting for block to be copied."); + } + sleepFor(100); + } + } + + /** + * Initializes the block status map with information about a block. + * + * @param b the block to be added to the {@link FastCopy#blockStatusMap} + * @param totalReplicas the number of replicas for b + */ + private void initializeBlockStatus(ExtendedBlock b, short totalReplicas, boolean isECFile) { + BlockStatus bStatus = new BlockStatus(totalReplicas, isECFile); + blockStatusMap.put(b, bStatus); + } + + /** + * Shuts down the block rpc executor. + */ + private void terminateExecutor() throws IOException { + copyBlockExecutor.shutdown(); + try { + copyBlockExecutor.awaitTermination(FILE_WAIT_TIME, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } + } + + /** + * Sleeps the current thread for ms milliseconds. 
+ * + * @param ms the number of milliseconds to sleep the current thread + * @throws IOException if it encountered an InterruptedException + */ + private void sleepFor(long ms) throws IOException { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + throw new IOException(e.getMessage()); + } + } + + /** + * Copy the file. + */ + public void copyFile() throws IOException { + // Get source file information and create empty destination file. + HdfsFileStatus srcFileStatus = srcDFSClient.getFileInfo(src); + if (srcFileStatus == null) { + throw new FileNotFoundException("File : " + src + " does not exist"); + } + + LOG.info("Start to copy {} to {}.", src, dst); + try { + LocatedBlocks blocks = srcDFSClient.getLocatedBlocks(src, 0, srcFileStatus.getLen()); + List blocksList = blocks.getLocatedBlocks(); + LOG.debug("FastCopy : Block locations retrieved for {} : {}.", src, blocksList); + + ErasureCodingPolicy erasureCodingPolicy = srcFileStatus.getErasureCodingPolicy(); + HdfsDataOutputStream stream = + dstFs.create(new Path(dst), srcFileStatus.getPermission(), flag, + dstFs.getClient().getConf().getIoBufferSize(), srcFileStatus.getReplication(), + srcFileStatus.getBlockSize(), null, null, null, + erasureCodingPolicy == null ? null : erasureCodingPolicy.getName(), null); + DFSOutputStream out = (DFSOutputStream) stream.getWrappedStream(); + + // Instruct each datanode to create a copy of the respective block. + int blocksAdded = 0; + ExtendedBlock previous = null; + LocatedBlock dstLocatedBlock; + // Loop through each block and create copies. + for (LocatedBlock srcLocatedBlock : blocksList) { + UserGroupInformation.getCurrentUser().addToken(srcLocatedBlock.getBlockToken()); + dstLocatedBlock = erasureCodingPolicy == null ? + copyBlocks(previous, srcLocatedBlock, out) : + copyBlocks(erasureCodingPolicy, previous, srcLocatedBlock, out); + + blocksAdded++; + // Wait for the block copies to reach a threshold. 
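+ // waitForBlockCopy returns once the datanodes have caught up with the blocks
+ // added so far, throws if some block failed on all of its replicas, and times
+ // out after dfs.fast.copy.block.wait.time.ms.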
+ waitForBlockCopy(blocksAdded); + + previous = dstLocatedBlock.getBlock(); + previous.setNumBytes(srcLocatedBlock.getBlockSize()); + } + out.setUserAssignmentLastBlock(previous); + stream.close(); + } catch (IOException e) { + LOG.error("Failed to copy src:{} to dst: {} .", src, dst); + throw new IOException(e); + } finally { + terminateExecutor(); + } + } + + private LocatedBlock copyBlocks(ExtendedBlock previous, LocatedBlock srcLocatedBlock, + DFSOutputStream out) throws IOException { + String[] favoredNodes = new String[srcLocatedBlock.getLocations().length]; + for (int i = 0; i < srcLocatedBlock.getLocations().length; i++) { + favoredNodes[i] = srcLocatedBlock.getLocations()[i].getHostName() + ":" + + srcLocatedBlock.getLocations()[i].getXferPort(); + } + LOG.debug("FavoredNodes for {}: {}.", srcLocatedBlock, Arrays.toString(favoredNodes)); + + LocatedBlock dstLocatedBlock = + DFSOutputStream.addBlock(null, out.getDfsClient(), dst, previous, out.getFileId(), + favoredNodes, out.getAddBlockFlags()); + if (dstLocatedBlock == null) { + throw new IOException(dst + " get null located block from dst nameNode."); + } + //blocksAdded++; + LOG.debug("Fast Copy : Block {} added to {}.", dstLocatedBlock.getBlock(), dst); + startCopyBlockThread(srcLocatedBlock, dstLocatedBlock, out); + return dstLocatedBlock; + } + + private LocatedBlock copyBlocks(ErasureCodingPolicy erasureCodingPolicy, ExtendedBlock previous, + LocatedBlock srcLocatedBlock, DFSOutputStream out) throws IOException { + LocatedBlock[] srcBlocks = + StripedBlockUtil.parseStripedBlockGroup((LocatedStripedBlock) srcLocatedBlock, + erasureCodingPolicy.getCellSize(), erasureCodingPolicy.getNumDataUnits(), + erasureCodingPolicy.getNumParityUnits()); + String[] favoredNodes = new String[srcBlocks.length]; + + for (int i = 0; i < srcBlocks.length; i++) { + if (srcBlocks[i] != null) { + favoredNodes[i] = srcBlocks[i].getLocations()[0].getHostName() + ":" + + srcBlocks[i].getLocations()[0].getXferPort(); + } else { + favoredNodes[i] = ""; + } + } + LOG.debug("FavoredNodes for {}: {}.", srcLocatedBlock, Arrays.toString(favoredNodes)); + LocatedBlock dstLocatedBlock = + DFSOutputStream.addBlock(null, out.getDfsClient(), dst, previous, out.getFileId(), + favoredNodes, out.getAddBlockFlags()); + if (dstLocatedBlock == null) { + throw new IOException(dst + " get null located block from dst nameNode."); + } + //blocksAdded++; + LOG.debug("Fast Copy : Block {} added to {}.", dstLocatedBlock.getBlock(), dst); + startCopyBlockThread(srcLocatedBlock, dstLocatedBlock, erasureCodingPolicy, out); + return dstLocatedBlock; + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 2044530506757..8e6bd5c6deb03 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -296,6 +296,13 @@ public interface HdfsClientConfigKeys { int DFS_CLIENT_CONGESTION_BACKOFF_MAX_TIME_DEFAULT = DFS_CLIENT_CONGESTION_BACKOFF_MEAN_TIME_DEFAULT * 10; + String DFS_FAST_COPY_FILE_WAIT_TIME_MS = "dfs.fast.copy.file.wait.time.ms"; + int DFS_FAST_COPY_FILE_WAIT_TIME_MS_DEFAULT = 10 * 60 * 1000; + String DFS_FAST_COPY_BLOCK_WAIT_TIME_MS = "dfs.fast.copy.block.wait.time.ms"; + int 
DFS_FAST_COPY_BLOCK_WAIT_TIME_MS_DEFAULT = 5 * 60 * 1000; + String DFS_FAST_COPY_BLOCK_EXECUTOR_POOLSIZE = "dfs.fast.copy.block.executor.poolsize"; + int DFS_FAST_COPY_BLOCK_EXECUTOR_POOLSIZE_DEFAULT = 2; + /** * These are deprecated config keys to client code. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 76853f5e414e2..3db2bad21aafc 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -6437,6 +6437,30 @@ </property> +<property> +  <name>dfs.fast.copy.file.wait.time.ms</name> +  <value>600000</value> +  <description> +    Default timeout value in milliseconds for FastCopy when copying a file. +  </description> +</property> + +<property> +  <name>dfs.fast.copy.block.wait.time.ms</name> +  <value>300000</value> +  <description> +    Default timeout value in milliseconds for FastCopy when copying a single block of a file. +  </description> +</property> + +<property> +  <name>dfs.fast.copy.block.executor.poolsize</name> +  <value>2</value> +  <description> +    Number of threads used by the FastCopy block copy executor. +  </description> +</property> + <property> <name>dfs.permissions.allow.owner.set.quota</name> <value>false</value> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestFsShellFastCopy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestFsShellFastCopy.java new file mode 100644 index 0000000000000..3b72e141661a9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/TestFsShellFastCopy.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop; + +import java.io.IOException; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.FastCopy; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestFsShellFastCopy { + private Configuration conf = null; + private MiniDFSCluster cluster = null; + private DistributedFileSystem srcDFS = null; + private DistributedFileSystem dstDFS = null; + private FsShell fsShell; + + private static final long BLOCKSIZE = 1024 * 1024; + + private final String prefix = "/fastcopy/"; + private final String replicatedRootDir = prefix + "replicated/"; + private final String ecFileRootDir = prefix + "ec/"; + private final String suffix = "BlocksFile"; + + @Before + public void setup() throws IOException { + conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE); + + cluster = + new MiniDFSCluster.Builder(conf).nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2)) + .numDataNodes(6).build(); + cluster.waitActive(); + + srcDFS = cluster.getFileSystem(0); + dstDFS = cluster.getFileSystem(1); + + srcDFS.mkdirs(new Path(replicatedRootDir), FsPermission.getFileDefault()); + dstDFS.mkdirs(new Path(replicatedRootDir), FsPermission.getFileDefault()); + + enableECPolicies(); + fsShell = new FsShell(conf); + } + + @After + public void shutdown() throws Exception { + if (cluster != null) { + cluster.shutdown(); + } + } + + private void initFiles(String dir, int numFiles) throws IOException { + for (int i = 0; i < numFiles; i++) { + Path path = new Path(dir + i + suffix); + DFSTestUtil.createFile(srcDFS, path, i * BLOCKSIZE, (short) 3, 0L); + } + } + + private void cleanFiles(DistributedFileSystem dfs, String dir) throws IOException { + dfs.delete(new Path(dir), true); + } + + public void enableECPolicies() throws IOException { + DFSTestUtil.enableAllECPolicies(srcDFS); + ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies.getPolicies().get(1); + srcDFS.mkdirs(new Path(ecFileRootDir), FsPermission.getFileDefault()); + srcDFS.getClient().setErasureCodingPolicy(ecFileRootDir, ecPolicy.getName()); + + DFSTestUtil.enableAllECPolicies(dstDFS); + ecPolicy = SystemErasureCodingPolicies.getPolicies().get(1); + + dstDFS.mkdirs(new Path(ecFileRootDir), FsPermission.getFileDefault()); + dstDFS.getClient().setErasureCodingPolicy(ecFileRootDir, ecPolicy.getName()); + } + + @Test + public void testFastCopy() throws Exception { + int numFiles = 4; + initFiles(replicatedRootDir, numFiles); + + for (int i = 0; i < numFiles; i++) { + FastCopy fcp = 
new FastCopy(conf, new Path(srcDFS.getUri() + replicatedRootDir + i + suffix), + new Path(dstDFS.getUri() + replicatedRootDir + i + suffix), false); + fcp.copyFile(); + } + + for (int i = 0; i < numFiles; i++) { + Path path = new Path(replicatedRootDir + i + suffix); + assertTrue(dstDFS.getFileStatus(path).isFile()); + assertEquals(dstDFS.getFileChecksum(path), srcDFS.getFileChecksum(path)); + } + + cleanFiles(srcDFS, replicatedRootDir); + cleanFiles(dstDFS, replicatedRootDir); + } + + @Test + public void testFastCopyWithEC() throws Exception { + int numFiles = 4; + initFiles(ecFileRootDir, numFiles); + + for (int i = 0; i < numFiles; i++) { + FastCopy fcp = new FastCopy(conf, new Path(srcDFS.getUri() + ecFileRootDir + i + suffix), + new Path(dstDFS.getUri() + ecFileRootDir + i + suffix), false); + fcp.copyFile(); + } + + for (int i = 0; i < numFiles; i++) { + Path path = new Path(ecFileRootDir + i + suffix); + assertTrue(dstDFS.getFileStatus(path).isFile()); + assertEquals(dstDFS.getFileChecksum(path), srcDFS.getFileChecksum(path)); + } + + cleanFiles(srcDFS, ecFileRootDir); + cleanFiles(dstDFS, ecFileRootDir); + } + + @Test + public void testDFSFastCp() throws Exception { + int numFiles = 4; + initFiles(replicatedRootDir, numFiles); + + int ret = fsShell.run(new String[] {"-fastcp", "-t", "3", srcDFS.getUri() + replicatedRootDir, + dstDFS.getUri() + prefix}); + assertEquals(0, ret); + + for (int i = 0; i < numFiles; i++) { + Path path = new Path(replicatedRootDir + i + suffix); + assertTrue(dstDFS.getFileStatus(path).isFile()); + assertEquals(dstDFS.getFileChecksum(path), srcDFS.getFileChecksum(path)); + } + } + + @Test + public void testDFSFastCpWithECFiles() throws Exception { + int numFiles = 12; + initFiles(ecFileRootDir, numFiles); + + int ret = fsShell.run(new String[] {"-fastcp", "-t", "3", srcDFS.getUri() + ecFileRootDir, + dstDFS.getUri() + prefix}); + assertEquals(0, ret); + + for (int i = 0; i < numFiles; i++) { + Path path = new Path(ecFileRootDir + i + suffix); + assertTrue(dstDFS.getFileStatus(path).isFile()); + assertEquals(dstDFS.getFileChecksum(path), srcDFS.getFileChecksum(path)); + } + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java index 800747ac82728..fd8cfdbb99ca4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java @@ -20,7 +20,11 @@ import static org.junit.Assert.*; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.EnumSet; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CreateFlag; @@ -28,14 +32,21 @@ import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSCluster.Builder; import org.apache.hadoop.hdfs.net.DFSNetworkTopology; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage; +import 
org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.net.StaticMapping; import org.junit.After; From 205d3428c1a813aefcc94e13807664161a0cfa7d Mon Sep 17 00:00:00 2001 From: LiuGuH <444506464@qq.com> Date: Fri, 28 Jun 2024 09:37:46 +0800 Subject: [PATCH 3/4] HDFS-17582. Distcp support fastcopy --- .../java/org/apache/hadoop/hdfs/FastCopy.java | 11 +- .../apache/hadoop/tools/DistCpConstants.java | 1 + .../apache/hadoop/tools/DistCpContext.java | 4 + .../hadoop/tools/DistCpOptionSwitch.java | 6 + .../apache/hadoop/tools/DistCpOptions.java | 15 +++ .../apache/hadoop/tools/OptionsParser.java | 2 + .../hadoop/tools/mapred/CopyMapper.java | 13 +- .../mapred/RetriableFileCopyCommand.java | 4 +- .../mapred/RetriableFileFastCopyCommand.java | 62 ++++++++++ .../hadoop/tools/TestDistCpOptions.java | 12 ++ .../apache/hadoop/tools/TestDistCpSystem.java | 112 ++++++++++++++---- .../tools/TestDistCpSystemWithECFiles.java | 67 +++++++++++ .../hadoop/tools/TestOptionsParser.java | 15 +++ .../hadoop/tools/mapred/TestCopyMapper.java | 41 ++++++- .../mapred/TestCopyMapperCompositeCrc.java | 1 + 15 files changed, 334 insertions(+), 32 deletions(-) create mode 100644 hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileFastCopyCommand.java create mode 100644 hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystemWithECFiles.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FastCopy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FastCopy.java index 42ed56e712985..47d5577394f23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FastCopy.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/FastCopy.java @@ -101,6 +101,9 @@ public class FastCopy { private final DFSClient srcDFSClient; private final DistributedFileSystem dstFs; + private long chunkOffset = 0; + private long chunkLength = Long.MAX_VALUE; + public FastCopy(Configuration conf, Path sourcePath, Path dstPath, boolean overwrite) throws IOException { this.conf = conf; @@ -134,6 +137,12 @@ public FastCopy(Configuration conf, Path sourcePath, Path dstPath, boolean overw this.copyBlockExecutor = HadoopExecutors.newFixedThreadPool(this.copyBlockExecutorPoolSize); } + public FastCopy(Configuration conf, Path sourcePath, Path dstPath, boolean overwrite, long chunkOffset, long chunkLength) throws IOException { + this(conf,sourcePath,dstPath,overwrite); + this.chunkOffset = chunkOffset; + this.chunkLength = chunkLength; + } + private class CopyBlockCrossNamespace implements Runnable { private final ExtendedBlock source; private final ExtendedBlock target; @@ -466,7 +475,7 @@ public void copyFile() throws IOException { LOG.info("Start to copy {} to {}.", src, dst); try { - LocatedBlocks blocks = srcDFSClient.getLocatedBlocks(src, 0, srcFileStatus.getLen()); + LocatedBlocks blocks = srcDFSClient.getLocatedBlocks(src, chunkOffset, 
chunkLength); List blocksList = blocks.getLocatedBlocks(); LOG.debug("FastCopy : Block locations retrieved for {} : {}.", src, blocksList); diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index b05ce10a96f9c..f57fa70ed0300 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -65,6 +65,7 @@ private DistCpConstants() { public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps"; public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing"; public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy"; + public static final String CONF_LABEL_USE_FAST_COPY = "distcp.use.fast.copy"; public static final String CONF_LABEL_SKIP_CRC = "distcp.skip.crc"; public static final String CONF_LABEL_OVERWRITE = "distcp.copy.overwrite"; public static final String CONF_LABEL_APPEND = "distcp.copy.append"; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java index 8c443f66a0529..a46edb662469f 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java @@ -99,6 +99,10 @@ public boolean shouldSkipCRC() { return options.shouldSkipCRC(); } + public boolean shouldUseFastCopy() { + return options.shouldUseFastCopy(); + } + public boolean shouldBlock() { return options.shouldBlock(); } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index becd537f68b41..0ec5502cd00e1 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -153,6 +153,12 @@ public enum DistCpOptionSwitch { new Option("strategy", true, "Copy strategy to use. Default is " + "dividing work based on file sizes")), + /** + * Copy files using FastCopy. + */ + USE_FASTCOPY(DistCpConstants.CONF_LABEL_USE_FAST_COPY, + new Option("fastcopy", false, "Copy files using FastCopy.")), + /** + * Skip CRC checks between source and target, when determining what + * files need to be copied. 
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index 2745d828c361b..5b03e90cf6417 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -161,6 +161,7 @@ public final class DistCpOptions { private final boolean directWrite; private final boolean useIterator; + private final boolean useFastCopy; private final boolean updateRoot; @@ -230,6 +231,7 @@ private DistCpOptions(Builder builder) { this.directWrite = builder.directWrite; this.useIterator = builder.useIterator; + this.useFastCopy = builder.useFastCopy; this.updateRoot = builder.updateRoot; } @@ -290,6 +292,10 @@ public boolean shouldSkipCRC() { return skipCRC; } + public boolean shouldUseFastCopy() { + return useFastCopy; + } + public boolean shouldBlock() { return blocking; } @@ -406,6 +412,8 @@ public void appendToConf(Configuration conf) { String.valueOf(useRdiff)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC, String.valueOf(skipCRC)); + DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.USE_FASTCOPY, + String.valueOf(useFastCopy)); if (mapBandwidth > 0) { DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH, String.valueOf(mapBandwidth)); @@ -532,6 +540,8 @@ public static class Builder { private boolean updateRoot = false; + private boolean useFastCopy = false; + public Builder(List sourcePaths, Path targetPath) { Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(), "Source paths should not be null or empty!"); @@ -816,6 +826,11 @@ public Builder withUpdateRoot(boolean updateRootAttrs) { this.updateRoot = updateRootAttrs; return this; } + + public Builder withUseFastCopy(boolean useFastCopy) { + this.useFastCopy = useFastCopy; + return this; + } } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index 49ee09f400400..81f1a6a550a96 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -118,6 +118,8 @@ public static DistCpOptions parse(String[] args) command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch())) .withUseIterator( command.hasOption(DistCpOptionSwitch.USE_ITERATOR.getSwitch())) + .withUseFastCopy( + command.hasOption(DistCpOptionSwitch.USE_FASTCOPY.getSwitch())) .withUpdateRoot( command.hasOption(DistCpOptionSwitch.UPDATE_ROOT.getSwitch())); diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index ad17e574ca9b8..a7a6cb86b6fbf 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -97,6 +97,7 @@ public enum ChecksumComparison { private boolean verboseLog = false; private boolean directWrite = false; private boolean useModTimeToUpdate; + private boolean useFastCopy = false; private EnumSet preserve = EnumSet.noneOf(FileAttribute.class); private FileSystem targetFS = null; @@ -129,6 +130,7 @@ public void setup(Context context) throws IOException, 
InterruptedException { useModTimeToUpdate = conf.getBoolean(DistCpConstants.CONF_LABEL_UPDATE_MOD_TIME, CONF_LABEL_UPDATE_MOD_TIME_DEFAULT); + useFastCopy = conf.getBoolean(DistCpOptionSwitch.USE_FASTCOPY.getConfigLabel(), false); targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); Path targetFinalPath = new Path(conf.get( @@ -272,9 +274,14 @@ private void copyFileWithRetry(String description, throws IOException, InterruptedException { long bytesCopied; try { - bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description, - action, directWrite).execute(sourceFileStatus, target, context, - fileAttributes, sourceStatus); + if (!useFastCopy) { + bytesCopied = + (Long) new RetriableFileCopyCommand(skipCrc, description, action, directWrite).execute( + sourceFileStatus, target, context, fileAttributes, sourceStatus); + } else { + bytesCopied = (Long) new RetriableFileFastCopyCommand(skipCrc, description, action, + directWrite).execute(sourceFileStatus, target, context, fileAttributes, sourceStatus); + } } catch (Exception e) { context.setStatus("Copy Failure: " + sourceFileStatus.getPath()); throw new IOException("File copy failed: " + sourceFileStatus.getPath() + diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 84bb001008637..ed275d6631133 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -66,7 +66,7 @@ public class RetriableFileCopyCommand extends RetriableCommand { private static Logger LOG = LoggerFactory.getLogger(RetriableFileCopyCommand.class); private boolean skipCrc = false; private boolean directWrite = false; - private FileAction action; + protected FileAction action; /** * Constructor, taking a description of the action. @@ -192,7 +192,7 @@ private ChecksumOpt getChecksumOpt(EnumSet fileAttributes, } @SuppressWarnings("checkstyle:parameternumber") - private long copyToFile(Path targetPath, FileSystem targetFS, + protected long copyToFile(Path targetPath, FileSystem targetFS, CopyListingFileStatus source, long sourceOffset, Mapper.Context context, EnumSet fileAttributes, final FileChecksum sourceChecksum, FileStatus sourceStatus) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileFastCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileFastCopyCommand.java new file mode 100644 index 0000000000000..46ce267297786 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileFastCopyCommand.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools.mapred; + +import java.io.IOException; +import java.util.EnumSet; + +import org.apache.hadoop.fs.FileChecksum; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.FastCopy; +import org.apache.hadoop.mapreduce.Mapper.Context; +import org.apache.hadoop.tools.CopyListingFileStatus; +import org.apache.hadoop.tools.DistCpOptions.FileAttribute; +import org.apache.hadoop.tools.mapred.CopyMapper.FileAction; + +public class RetriableFileFastCopyCommand extends RetriableFileCopyCommand { + public RetriableFileFastCopyCommand(String description, FileAction action) { + super(description, action); + } + + public RetriableFileFastCopyCommand(boolean skipCrc, String description, FileAction action) { + super(skipCrc, description, action); + } + + public RetriableFileFastCopyCommand(boolean skipCrc, String description, FileAction action, + boolean directWrite) { + super(skipCrc, description, action, directWrite); + } + + @Override + protected long copyToFile(Path targetPath, FileSystem targetFS, CopyListingFileStatus source, + long sourceOffset, Context context, EnumSet fileAttributes, + FileChecksum sourceChecksum, FileStatus sourceStatus) throws IOException { + FastCopy fastCopy = new FastCopy(context.getConfiguration(), source.getPath(), targetPath, + action == FileAction.OVERWRITE || action == FileAction.APPEND, source.getChunkOffset(), + source.getChunkLength()); + fastCopy.copyFile(); + + if (action == FileAction.APPEND) { + return source.getLen() - sourceOffset; + } + return source.getSizeToCopy(); + } +} diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java index d126bfdc4f975..a07776b3270ad 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java @@ -134,6 +134,18 @@ public void testSetSkipCRC() { Assert.assertTrue(options.shouldSkipCRC()); } + @Test + public void testUseFastCopy() { + final DistCpOptions.Builder builder = new DistCpOptions.Builder( + Collections.singletonList(new Path("hdfs://localhost:8020/source")), + new Path("hdfs://localhost:8020/target/")); + Assert.assertFalse(builder.build().shouldUseFastCopy()); + + final DistCpOptions options = builder.withUseFastCopy(true) + .build(); + Assert.assertTrue(options.shouldUseFastCopy()); + } + @Test public void testSetAtomicCommit() { final DistCpOptions.Builder builder = new DistCpOptions.Builder( diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java index 64c6800f9446a..35ee7dcc62ddb 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import 
org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.tools.util.DistCpTestUtils; import org.apache.hadoop.util.ToolRunner; @@ -63,10 +64,10 @@ public class TestDistCpSystem { private static final String SRCDAT = "srcdat"; private static final String DSTDAT = "dstdat"; - private static final long BLOCK_SIZE = 1024; + protected static long BLOCK_SIZE = 1024; - private static MiniDFSCluster cluster; - private static Configuration conf; + protected static MiniDFSCluster cluster; + protected static Configuration conf; private class FileEntry { String path; @@ -128,13 +129,14 @@ private void createFiles(DistributedFileSystem fs, String topdir, DFSTestUtil.createFile(fs, newPath, bufSize, fileSize, BLOCK_SIZE, replicationFactor, seed); } else { + ErasureCodingPolicy ecPolicy = fs.getErasureCodingPolicy(new Path("/")); + int num = ecPolicy == null ? 1 : ecPolicy.getNumDataUnits(); // Create a variable length block file, by creating // one block of half block size at the chunk boundary - long seg1 = chunkSize * BLOCK_SIZE - BLOCK_SIZE / 2; + long seg1 = chunkSize * BLOCK_SIZE * num - BLOCK_SIZE / 2; long seg2 = fileSize - seg1; - DFSTestUtil.createFile(fs, newPath, bufSize, - seg1, BLOCK_SIZE, replicationFactor, seed); - DFSTestUtil.appendFileNewBlock(fs, newPath, (int)seg2); + DFSTestUtil.createFile(fs, newPath, bufSize, seg1, BLOCK_SIZE, replicationFactor, seed); + DFSTestUtil.appendFileNewBlock(fs, newPath, (int) seg2); } } seed = System.currentTimeMillis() + rand.nextLong(); @@ -160,7 +162,7 @@ private static FileStatus[] getFileStatus(FileSystem fs, /** delete directory and everything underneath it.*/ - private static void deldir(FileSystem fs, String topdir) throws IOException { + protected static void deldir(FileSystem fs, String topdir) throws IOException { fs.delete(new Path(topdir), true); } @@ -336,6 +338,10 @@ private void copyAndVerify(final DistributedFileSystem fs, } private void chunkCopy(FileEntry[] srcFiles) throws Exception { + chunkCopy(srcFiles, false); + } + + private void chunkCopy(FileEntry[] srcFiles, boolean fastCopy) throws Exception { final String testRoot = "/testdir"; final String testSrcRel = SRCDAT; final String testSrc = testRoot + "/" + testSrcRel; @@ -358,9 +364,14 @@ private void chunkCopy(FileEntry[] srcFiles) throws Exception { createDestDir(fs, testDst, srcStats, srcFiles); - String[] args = new String[] {"-pugp", "-blocksperchunk", - String.valueOf(chunkSize), - nnUri + testSrc, nnUri + testDst}; + String[] args; + if (!fastCopy) { + args = new String[] {"-pugp", "-blocksperchunk", String.valueOf(chunkSize), nnUri + testSrc, + nnUri + testDst}; + } else { + args = new String[] {"-fastcopy", "-pugp", "-blocksperchunk", String.valueOf(chunkSize), + nnUri + testSrc, nnUri + testDst}; + } copyAndVerify(fs, srcFiles, srcStats, testDst, args); // Do it again @@ -378,11 +389,13 @@ private void chunkCopy(FileEntry[] srcFiles) throws Exception { } // get file status after modifying file srcStats = getFileStatus(fs, testRoot, srcFiles); - - args = new String[] {"-pugp", "-update", "-blocksperchunk", - String.valueOf(chunkSize), - nnUri + testSrc, nnUri + testDst + "/" + testSrcRel}; - + if (!fastCopy) { + args = new String[] {"-pugp", "-update", "-blocksperchunk", String.valueOf(chunkSize), + nnUri + testSrc, nnUri + testDst + "/" + testSrcRel}; + } else { + args = new String[] {"-fastcopy", 
"-pugp", "-update", "-blocksperchunk", + String.valueOf(chunkSize), nnUri + testSrc, nnUri + testDst + "/" + testSrcRel}; + } copyAndVerify(fs, srcFiles, srcStats, testDst, args); deldir(fs, testRoot); @@ -398,6 +411,7 @@ public void testRecursiveChunkCopy() throws Exception { new FileEntry(SRCDAT + "/dir1/file1", false) }; chunkCopy(srcFiles); + chunkCopy(srcFiles, true); } @Test @@ -407,10 +421,16 @@ public void testChunkCopyOneFile() throws Exception { new FileEntry(SRCDAT + "/file0", false) }; chunkCopy(srcFiles); + chunkCopy(srcFiles, true); } @Test public void testDistcpLargeFile() throws Exception { + testDistcpLargeFile(false); + testDistcpLargeFile(true); + } + + public void testDistcpLargeFile(boolean fastCopy) throws Exception { FileEntry[] srcfiles = { new FileEntry(SRCDAT, true), new FileEntry(SRCDAT + "/file", false) @@ -444,12 +464,15 @@ public void testDistcpLargeFile() throws Exception { for (int i = 0; i < srcfiles.length; i++) { fs.setOwner(srcstats[i].getPath(), "u" + i, null); } - String[] args = new String[] { - "-blocksperchunk", - String.valueOf(chunkSize), - nnUri + testSrc, - nnUri + testDst - }; + + String[] args; + if (fastCopy) { + args = new String[] {"-fastcopy", "-blocksperchunk", String.valueOf(chunkSize), nnUri + testSrc, + nnUri + testDst}; + } else { + args = new String[] {"-blocksperchunk", String.valueOf(chunkSize), nnUri + testSrc, + nnUri + testDst}; + } LOG.info("_____ running distcp: " + args[0] + " " + args[1]); ToolRunner.run(conf, new DistCp(), args); @@ -464,6 +487,51 @@ public void testDistcpLargeFile() throws Exception { deldir(fs, testRoot); } + @Test + public void testDistcpFastCopy() throws Exception { + FileEntry[] srcfiles = { + new FileEntry(SRCDAT, true), + new FileEntry(SRCDAT + "/file0", false), + new FileEntry(SRCDAT + "/sub1/file1", false), + new FileEntry(SRCDAT + "/sub2/file2", false), + new FileEntry(SRCDAT + "/sub3/file3", false), + new FileEntry(SRCDAT + "/sub3/file4", false) + }; + + final String testRoot = "/testdir"; + final String testSrcRel = SRCDAT; + final String testSrc = testRoot + "/" + testSrcRel; + final String testDstRel = DSTDAT; + final String testDst = testRoot + "/" + testDstRel; + + String nnUri = FileSystem.getDefaultUri(conf).toString(); + DistributedFileSystem fs = + (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf); + fs.mkdirs(new Path(testRoot)); + fs.mkdirs(new Path(testSrc)); + fs.mkdirs(new Path(testDst)); + createFiles(fs, testRoot, srcfiles, -1); + + String[] args = new String[] { + "-pug", + "-fastcopy", + nnUri + testSrc, + nnUri + testDst + }; + + LOG.info("_____ running distcp: " + args[0] + " " + args[1]); + ToolRunner.run(conf, new DistCp(), args); + + FileStatus[] srcstat = getFileStatus(fs, testRoot, srcfiles); + FileStatus[] dststat = getFileStatus(fs, testDst, srcfiles); + + for (int i=0; i< srcstat.length; i++) { + compareFiles(fs, srcstat[i], dststat[i]); + } + + deldir(fs, testRoot); + } + @Test public void testPreserveUseNonEmptyDir() throws Exception { String testRoot = "/testdir." 
+ getMethodName(); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystemWithECFiles.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystemWithECFiles.java new file mode 100644 index 0000000000000..b7995b31a43e6 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystemWithECFiles.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools; + +import java.io.IOException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; + +/** + * A JUnit test for copying files recursively. + */ + +public class TestDistCpSystemWithECFiles extends TestDistCpSystem { + static DistributedFileSystem dfs; + + @BeforeClass + public static void beforeClass() throws IOException { + BLOCK_SIZE = 1024 * 1024; + conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + //conf.set("dfs.checksum.combine.mode", COMPOSITE_CRC.toString()); + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).build(); + cluster.waitActive(); + dfs = cluster.getFileSystem(); + enableECPolicies(); + } + + @AfterClass + public static void afterClass() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + + public static void enableECPolicies() throws IOException { + DFSTestUtil.enableAllECPolicies(dfs); + ErasureCodingPolicy ecPolicy = SystemErasureCodingPolicies.getPolicies().get(1); + dfs.getClient().setErasureCodingPolicy("/", ecPolicy.getName()); + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index 1ffdd89073dec..fe7370d33cd54 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -152,6 +152,21 @@ public void testParseSkipCRC() { Assert.assertTrue(options.shouldSkipCRC()); } + @Test + public void testParseUseFastCopy() { + DistCpOptions options = OptionsParser.parse(new String[] { + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertFalse(options.shouldUseFastCopy()); + + options = OptionsParser.parse(new String[] { + "-update", + "-fastcopy", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertTrue(options.shouldUseFastCopy()); + } + @Test public void testParseAtomicCommit() { DistCpOptions options = OptionsParser.parse(new String[] { diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java index 780d82df2bce3..01a5c781b42d7 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java @@ -21,8 +21,6 @@ import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.io.PrintWriter; -import 
java.io.StringWriter; import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.EnumSet; @@ -44,6 +42,7 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; @@ -76,7 +75,8 @@ public class TestCopyMapper { private static final int DEFAULT_FILE_SIZE = 1024; private static final long NON_DEFAULT_BLOCK_SIZE = 4096; - private static MiniDFSCluster cluster; + protected static MiniDFSCluster cluster; + protected static DataNode datanode; private static final String SOURCE_PATH = "/tmp/source"; private static final String TARGET_PATH = "/tmp/target"; @@ -88,6 +88,7 @@ public static void setup() throws Exception { .numDataNodes(1) .format(true) .build()); + datanode = cluster.getDataNodes().get(0); } /** @@ -287,14 +288,32 @@ public void testCopyWithDifferentChecksumType() throws Exception { @Test(timeout=40000) public void testRun() throws Exception { + long numFastCopy1 = datanode.getMetrics().getBlocksReplicatedViaHardlink(); + testCopy(false, true); + long numFastCopy2 = datanode.getMetrics().getBlocksReplicatedViaHardlink(); + Assert.assertTrue(numFastCopy2 - numFastCopy1 > 0); + testCopy(false); + long numFastCopy3 = datanode.getMetrics().getBlocksReplicatedViaHardlink(); + Assert.assertEquals(0, numFastCopy3 - numFastCopy2); } @Test public void testCopyWithAppend() throws Exception { + long numFastCopy1 = datanode.getMetrics().getBlocksReplicatedViaHardlink(); + testCopyWithAppend(true); + long numFastCopy2 = datanode.getMetrics().getBlocksReplicatedViaHardlink(); + Assert.assertTrue(numFastCopy2 - numFastCopy1 > 0); + + testCopyWithAppend(false); + long numFastCopy3 = datanode.getMetrics().getBlocksReplicatedViaHardlink(); + Assert.assertEquals(0, numFastCopy3 - numFastCopy2); + } + + public void testCopyWithAppend(boolean fastCopy) throws Exception { final FileSystem fs = cluster.getFileSystem(); // do the first distcp - testCopy(false); + testCopy(false, fastCopy); // start appending data to source appendSourceData(); @@ -310,6 +329,10 @@ public void testCopyWithAppend() throws Exception { // Enable append context.getConfiguration().setBoolean( DistCpOptionSwitch.APPEND.getConfigLabel(), true); + if (fastCopy) { + context.getConfiguration().setBoolean( + DistCpOptionSwitch.USE_FASTCOPY.getConfigLabel(), true); + } copyMapper.setup(context); int numFiles = 0; @@ -334,6 +357,9 @@ public void testCopyWithAppend() throws Exception { .getValue()); Assert.assertEquals(numFiles, stubContext.getReporter(). 
getCounter(CopyMapper.Counter.COPY).getValue()); + if (fastCopy) { + return; + } rb = getMetrics(cluster.getDataNodes().get(0).getMetrics().name()); /* * added as part of HADOOP-15292 to ensure that multiple readBlock() @@ -346,6 +372,10 @@ public void testCopyWithAppend() throws Exception { } private void testCopy(boolean preserveChecksum) throws Exception { + testCopy(preserveChecksum, false); + } + + private void testCopy(boolean preserveChecksum, boolean fastCopy) throws Exception { deleteState(); if (preserveChecksum) { createSourceDataWithDifferentChecksumType(); @@ -372,6 +402,9 @@ private void testCopy(boolean preserveChecksum) throws Exception { } configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(), DistCpUtils.packAttributes(fileAttributes)); + if (fastCopy) { + configuration.setBoolean(DistCpOptionSwitch.USE_FASTCOPY.getConfigLabel(), true); + } copyMapper.setup(context); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java index 6ed86e385d317..e37f1c242b7e5 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapperCompositeCrc.java @@ -36,6 +36,7 @@ public static void setup() throws Exception { .numDataNodes(1) .format(true) .build()); + datanode = cluster.getDataNodes().get(0); } @Override From e5022fe6c4f04a0f7b60b418367ba6d9468da925 Mon Sep 17 00:00:00 2001 From: LiuGuH <444506464@qq.com> Date: Fri, 26 Jul 2024 18:11:36 +0800 Subject: [PATCH 4/4] HDFS-17592. FastCopy supports data copy between different nameservices without federation.
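
For reference, the two pieces this patch adds are gathered here as a minimal sketch for readability; all names follow the diff below, and the enclosing DataTransfer#run() and DataXceiver#copyBlockCrossNamespace bodies are elided. The client-supplied target block token is reused by the source DataNode instead of a locally generated one, and the source-side verification of that token is gated behind a new key, since a DataNode generally cannot verify tokens issued by the NameNode of a different, non-federated cluster.

    // Sketch only; surrounding DataNode/DataXceiver context is elided.
    // In DataTransfer#run(): prefer the token issued by the target namespace, if present.
    Token<BlockTokenIdentifier> accessToken = targetBlockToken != null
        ? targetBlockToken
        : getBlockAccessToken(target, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
            targetStorageTypes, targetStorageIds);

    // In DataXceiver#copyBlockCrossNamespace(): only verify the target token when enabled.
    if (dnConf.getConf().getBoolean(DFS_DATANODE_FAST_COPY_CHECK_TARGET_BLOCK_ACCESS_ENABLE,
        DFS_DATANODE_FAST_COPY_CHECK_TARGET_BLOCK_ACCESS_ENABLE_DEFAULT)) {
      checkAccess(reply, true, targetBlk, targetBlockToken, Op.COPY_BLOCK_CROSSNAMESPACE,
          BlockTokenIdentifier.AccessMode.WRITE);
    }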
--- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 4 ++++ .../hadoop/hdfs/server/datanode/DataNode.java | 15 +++++++++------ .../hadoop/hdfs/server/datanode/DataXceiver.java | 11 ++++++++--- .../src/main/resources/hdfs-default.xml | 8 ++++++++ 4 files changed, 29 insertions(+), 9 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 1119a25d3feb9..166d6f04d10c4 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -496,6 +496,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long DFS_DATANODE_PROCESS_COMMANDS_THRESHOLD_DEFAULT = TimeUnit.SECONDS.toMillis(2); + public static final String DFS_DATANODE_FAST_COPY_CHECK_TARGET_BLOCK_ACCESS_ENABLE = + "dfs.datanode.fast.copy.check.target.block.access.enable"; + public static final boolean DFS_DATANODE_FAST_COPY_CHECK_TARGET_BLOCK_ACCESS_ENABLE_DEFAULT = false; + public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check"; public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 4161c3531e929..ebc120958ce33 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -3022,6 +3022,7 @@ private class DataTransfer implements Runnable { */ private DataTransferThrottler throttler; private boolean copyBlockCrossNamespace; + private Token targetBlockToken; /** * Connect to the first item in the target list. Pass along the @@ -3056,10 +3057,11 @@ private class DataTransfer implements Runnable { DataTransfer(DatanodeInfo[] targets, StorageType[] targetStorageTypes, String[] targetStorageIds, ExtendedBlock source, ExtendedBlock target, - BlockConstructionStage stage, final String clientname) { + BlockConstructionStage stage, final String clientname, Token targetBlockToken) { this(targets, targetStorageTypes, targetStorageIds, source, stage, clientname); this.target = target; this.copyBlockCrossNamespace = true; + this.targetBlockToken = targetBlockToken; } /** @@ -3087,7 +3089,8 @@ public void run() { // // Header info // - Token accessToken = + Token accessToken = targetBlockToken != null ? 
+ targetBlockToken : getBlockAccessToken(target, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), targetStorageTypes, targetStorageIds); @@ -4411,7 +4414,7 @@ public BlockPoolManager getBlockPoolManager() { } public void copyBlockCrossNamespace(ExtendedBlock sourceBlk, ExtendedBlock targetBlk, - DatanodeInfo targetDn) throws IOException { + DatanodeInfo targetDn, Token targetBlockToken) throws IOException { if (!data.isValidBlock(sourceBlk)) { // block does not exist or is under-construction String errStr = @@ -4436,7 +4439,7 @@ public void copyBlockCrossNamespace(ExtendedBlock sourceBlk, ExtendedBlock targe result = copyBlockCrossNamespaceExecutor.submit(new LocalBlockCopy(sourceBlk, targetBlk)); } else { result = copyBlockCrossNamespaceExecutor.submit( - new DataCopy(targetDn, sourceBlk, targetBlk).getDataTransfer()); + new DataCopy(targetDn, sourceBlk, targetBlk, targetBlockToken).getDataTransfer()); } try { result.get(getDnConf().getCopyBlockCrossNamespaceSocketTimeout(), TimeUnit.MILLISECONDS); @@ -4449,7 +4452,7 @@ public void copyBlockCrossNamespace(ExtendedBlock sourceBlk, ExtendedBlock targe private class DataCopy { private final DataTransfer dataTransfer; - DataCopy(DatanodeInfo targetDn, ExtendedBlock sourceBlk, ExtendedBlock targetBlk) { + DataCopy(DatanodeInfo targetDn, ExtendedBlock sourceBlk, ExtendedBlock targetBlk, Token targetBlockToken) { FsVolumeImpl volume = (FsVolumeImpl) data.getVolume(sourceBlk); StorageType storageType = volume.getStorageType(); String storageId = volume.getStorageID(); @@ -4459,7 +4462,7 @@ private class DataCopy { String[] targetStorageIds = new String[] {storageId}; dataTransfer = new DataTransfer(targets, targetStorageTypes, targetStorageIds, sourceBlk, targetBlk, - PIPELINE_SETUP_CREATE, ""); + PIPELINE_SETUP_CREATE, "", targetBlockToken); } public DataTransfer getDataTransfer() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index b2aced4d69e88..d8c655943e88c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -89,6 +89,8 @@ import java.util.Arrays; import java.util.concurrent.TimeUnit; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAST_COPY_CHECK_TARGET_BLOCK_ACCESS_ENABLE; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAST_COPY_CHECK_TARGET_BLOCK_ACCESS_ENABLE_DEFAULT; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR; @@ -1099,11 +1101,14 @@ public void copyBlockCrossNamespace(ExtendedBlock sourceBlk, DataOutputStream reply = getBufferedOutputStream(); checkAccess(reply, true, sourceBlk, sourceBlockToken, Op.COPY_BLOCK_CROSSNAMESPACE, BlockTokenIdentifier.AccessMode.READ); - checkAccess(reply, true, targetBlk, targetBlockToken, Op.COPY_BLOCK_CROSSNAMESPACE, - BlockTokenIdentifier.AccessMode.WRITE); + if (dnConf.getConf().getBoolean(DFS_DATANODE_FAST_COPY_CHECK_TARGET_BLOCK_ACCESS_ENABLE, + DFS_DATANODE_FAST_COPY_CHECK_TARGET_BLOCK_ACCESS_ENABLE_DEFAULT)) { + checkAccess(reply, true, 
targetBlk, targetBlockToken, Op.COPY_BLOCK_CROSSNAMESPACE, + BlockTokenIdentifier.AccessMode.WRITE); + } try { - datanode.copyBlockCrossNamespace(sourceBlk, targetBlk, targetDatanode); + datanode.copyBlockCrossNamespace(sourceBlk, targetBlk, targetDatanode, targetBlockToken); sendResponse(SUCCESS, null); } catch (IOException ioe) { LOG.warn("copyBlockCrossNamespace from {} to {} to {} received exception,", sourceBlk, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 3db2bad21aafc..eb43591adea84 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -6461,6 +6461,14 @@ + <property> + <name>dfs.datanode.fast.copy.check.target.block.access.enable</name> + <value>false</value> + <description> + Whether to check the access token of the target block. If true, fastcopy will not work between different nameservices that are not part of the same federated cluster. + </description> + </property> + dfs.permissions.allow.owner.set.quota false
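
As a usage note (not part of the patches): a small, self-contained sketch of how a caller might request fastcopy, based on the -fastcopy switch and shouldUseFastCopy() exercised in TestOptionsParser above. The hdfs://ns1 and hdfs://ns2 URIs are placeholders for two different nameservices; everything else follows the classes touched in these diffs.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tools.DistCpOptionSwitch;
    import org.apache.hadoop.tools.DistCpOptions;
    import org.apache.hadoop.tools.OptionsParser;

    public class FastCopyUsageSketch {
      public static void main(String[] args) {
        // Parse arguments the same way the distcp CLI would; -fastcopy maps to USE_FASTCOPY.
        DistCpOptions options = OptionsParser.parse(new String[] {
            "-update", "-fastcopy",
            "hdfs://ns1/source/first",   // placeholder source in nameservice ns1
            "hdfs://ns2/target/"});      // placeholder target in nameservice ns2
        System.out.println("fastcopy enabled: " + options.shouldUseFastCopy());

        // Programmatic equivalent, as used by TestCopyMapper above.
        Configuration conf = new Configuration();
        conf.setBoolean(DistCpOptionSwitch.USE_FASTCOPY.getConfigLabel(), true);
      }
    }

On the source DataNodes, dfs.datanode.fast.copy.check.target.block.access.enable (added above, default false) controls whether the target block token is verified before the copy is attempted.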