HDFS-2139. Fast copy for HDFS. FastCopy (This is used for test cases, and in the future this will be split into subtask submissions.) by LiuGuH · Pull Request #6950 · apache/hadoop · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

HDFS-2139. Fast copy for HDFS. FastCopy (This is used for test cases, and in the future this will be split into subtask submissions.) #6950

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2544,6 +2544,10 @@ public void moveFromLocalFile(Path src, Path dst)
copyFromLocalFile(true, src, dst);
}

/**
 * Copy {@code src} to {@code dst} using a filesystem-specific fast path.
 * The base implementation has no such path and always rejects the call;
 * filesystems that support fast copy override this method.
 *
 * @param src source file
 * @param dst destination file
 * @param overwrite whether an existing destination may be replaced
 * @throws IOException declared for implementations that perform the copy
 * @throws UnsupportedOperationException always, in this base implementation
 */
public void fastCopy(Path src, Path dst, boolean overwrite) throws IOException {
  final String reason = "FileSystem does not support fastcopy method.";
  throw new UnsupportedOperationException(reason);
}

/**
* The src file is on the local disk. Add it to the filesystem at
* the given dst name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,14 @@ protected void copyFileToTarget(PathData src, PathData target)
IOUtils.closeStream(in);
}
}

/**
 * Fast-copy a single file and then apply the requested attribute
 * preservation to the copy.
 *
 * @param src the source file
 * @param target the destination file
 * @throws IOException if the path check, the copy, or preserving the
 *     attributes fails
 */
protected void copyFileToTargetWithFastCp(PathData src, PathData target)
    throws IOException {
  // Validate the source/target pair against /.reserved/raw first and learn
  // whether raw-namespace xattrs must be carried over.
  final boolean keepRawXattrs =
      checkPathsForReservedRaw(src.path, target.path);
  src.fs.setVerifyChecksum(verifyChecksum);
  fastCopyToTarget(src, target);
  preserveAttributes(src, target, keepRawXattrs);
}

/**
* Check the source and target paths to ensure that they are either both in
Expand Down Expand Up @@ -433,6 +441,24 @@ protected void copyStreamToTarget(InputStream in, PathData target)
}
}

/**
 * Copy {@code src} to {@code target} through the source filesystem's
 * {@code fastCopy}.
 *
 * <p>Unless direct writing (-d) is enabled, the data is first copied to
 * {@code <target>._COPYING_} and then renamed onto {@code target}.
 *
 * @param src the source file
 * @param target the destination file
 * @throws PathExistsException if {@code target} exists and is a directory,
 *     or exists and overwrite (-f) was not requested
 * @throws IOException if the copy or the final rename fails
 */
protected void fastCopyToTarget(PathData src, PathData target)
    throws IOException {
  if (target.exists && (target.stat.isDirectory() || !overwrite)) {
    throw new PathExistsException(target.toString());
  }
  TargetFileSystem targetFs = new TargetFileSystem(target.fs);
  try {
    PathData tempTarget = direct ? target : target.suffix("._COPYING_");
    targetFs.setWriteChecksum(writeChecksum);
    try {
      src.fs.fastCopy(src.path, tempTarget.path, overwrite);
    } finally {
      if (!direct) {
        // fastCopy writes the temp file without going through targetFs, so
        // nothing has tagged it for cleanup yet. Tag it here (after the copy
        // attempt, once the file may exist) so that targetFs.close() removes
        // a partial temp file if the copy or the rename below fails.
        targetFs.deleteOnExit(tempTarget.path);
      }
    }
    if (!direct) {
      targetFs.rename(tempTarget, target);
    }
  } finally {
    targetFs.close(); // last ditch effort to ensure temp file is removed
  }
}

/**
* Preserve the attributes of the source to the target.
* The method calls {@link #shouldPreserve(FileAttribute)} to check what
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.fs.PathOperationException;
import org.apache.hadoop.io.IOUtils;

/** Various commands for copy files */
Expand All @@ -44,6 +45,7 @@ class CopyCommands {
public static void registerCommands(CommandFactory factory) {
factory.addClass(Merge.class, "-getmerge");
factory.addClass(Cp.class, "-cp");
factory.addClass(FastCp.class, "-fastcp");
factory.addClass(CopyFromLocal.class, "-copyFromLocal");
factory.addClass(CopyToLocal.class, "-copyToLocal");
factory.addClass(Get.class, "-get");
Expand Down Expand Up @@ -209,7 +211,81 @@ private void popPreserveOption(List<String> args) {
}
}
}


/**
 * The {@code -fastcp} shell command. Option handling mirrors {@code -cp};
 * each regular file is handed to {@code copyFileToTargetWithFastCp}, which
 * uses the source filesystem's {@code fastCopy}.
 */
static class FastCp extends CopyCommandWithMultiThread {
  public static final String NAME = "fastcp";
  public static final String USAGE =
      "[-f] [-p | -p[topax]] [-d] [-t <thread count>]"
      + " [-q <thread pool queue size>] <src> ... <dst>";
  public static final String DESCRIPTION =
      "FastCopy files that match the file pattern <src> to a destination."
      + " When copying multiple files, the destination must be a "
      + "directory.\nFlags :\n"
      + " -p[topax] : Preserve file attributes [topx] (timestamps, "
      + "ownership, permission, ACL, XAttr). If -p is specified with "
      + "no arg, then preserves timestamps, ownership, permission. "
      + "If -pa is specified, then preserves permission also because "
      + "ACL is a super-set of permission. Determination of whether raw "
      + "namespace extended attributes are preserved is independent of "
      + "the -p flag.\n"
      + " -f : Overwrite the destination if it already exists.\n"
      + " -d : Skip creation of temporary file(<dst>._COPYING_).\n"
      + " -t <thread count> : Number of threads to be used, "
      + "default is 1.\n"
      + " -q <thread pool queue size> : Thread pool queue size to be "
      + "used, default is 1024.\n";

  @Override
  protected void processOptions(LinkedList<String> args) throws IOException {
    // -p[topax] is not registered with the CommandFormat below, so it is
    // removed from args before parsing.
    popPreserveOption(args);
    CommandFormat format = new CommandFormat(2, Integer.MAX_VALUE, "f", "d");
    format.addOptionWithValue("t");
    format.addOptionWithValue("q");
    format.parse(args);
    setDirectWrite(format.getOpt("d"));
    setOverwrite(format.getOpt("f"));
    setThreadCount(format.getOptValue("t"));
    setThreadPoolQueueSize(format.getOptValue("q"));
    // Directories are always traversed; there is no -r flag yet.
    setRecursive(true);
    getRemoteDestination(args);
  }

  /**
   * Remove the first {@code -p} / {@code -p<attrs>} argument (if any, and
   * only before a {@code --} marker) and record what it asks to preserve.
   */
  private void popPreserveOption(List<String> args) {
    Iterator<String> it = args.iterator();
    while (it.hasNext()) {
      String arg = it.next();
      if (arg.equals("--")) {
        // end of options: anything after this is a path
        return;
      }
      if (!arg.startsWith("-p")) {
        continue;
      }
      it.remove();
      String attrs = arg.substring(2);
      if (attrs.isEmpty()) {
        setPreserve(true);
      } else {
        for (char attr : attrs.toCharArray()) {
          preserve(FileAttribute.getAttribute(attr));
        }
      }
      return;
    }
  }

  @Override
  protected void processPath(PathData src, PathData dst) throws IOException {
    if (src.stat.isSymlink()) {
      // TODO: remove when FileContext is supported, this needs to either
      // copy the symlink or deref the symlink
      throw new PathOperationException(src.toString());
    }
    if (src.stat.isFile()) {
      copyFileToTargetWithFastCp(src, dst);
      return;
    }
    if (src.stat.isDirectory() && !isRecursive()) {
      throw new PathIsDirectoryException(src.toString());
    }
    // NOTE(review): -t/-q are parsed, but files here go straight to
    // copyFileToTargetWithFastCp; confirm whether the thread pool is used.
  }
}

/**
* Copy local files to a remote filesystem
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public Token<?>[] addDelegationTokens(String renewer, Credentials creds)
void setQuota(Path f, long namespaceQuota, long storagespaceQuota);
void setQuotaByStorageType(Path f, StorageType type, long quota);
StorageStatistics getStorageStatistics();
// Interface members are implicitly public; match the neighbouring declarations.
void fastCopy(Path src, Path dst, boolean overwrite) throws IOException;

/*
Not passed through as the inner implementation will miss features
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ public Collection<? extends BlockStoragePolicySpi> getAllStoragePolicies()
Collection<FileStatus> getTrashRoots(boolean allUsers) throws IOException;
StorageStatistics getStorageStatistics();

// Interface members are implicitly public, so no modifier is needed.
void fastCopy(Path src, Path dst, boolean overwrite) throws IOException;

FutureDataInputStreamBuilder openFile(Path path)
    throws IOException, UnsupportedOperationException;

Expand Down
E377
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
Expand Down Expand Up @@ -1956,6 +1957,26 @@ protected IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
socketFactory, getConf().isConnectToDnViaHostname(), this, blockToken);
}

/**
 * Send a copyBlockCrossNamespace request to {@code sourceDatanode}, asking it
 * to copy {@code sourceBlk} to {@code targetBlk} on {@code targetDatanode},
 * then read and check the datanode's response.
 *
 * @param sourceBlk the block to copy
 * @param sourceBlockToken token for {@code sourceBlk}; also passed when
 *     connecting to {@code sourceDatanode}
 * @param sourceDatanode the datanode the request is sent to
 * @param targetBlk the block to be written
 * @param targetBlockToken token for {@code targetBlk}
 * @param targetDatanode the datanode named as the copy destination
 * @throws IOException if connecting fails, the response cannot be read, or
 *     the response status is not successful
 */
protected void copyBlockCrossNamespace(ExtendedBlock sourceBlk,
    Token<BlockTokenIdentifier> sourceBlockToken, DatanodeInfo sourceDatanode,
    ExtendedBlock targetBlk, Token<BlockTokenIdentifier> targetBlockToken,
    DatanodeInfo targetDatanode) throws IOException {
  IOStreamPair pair =
      DFSUtilClient.connectToDN(sourceDatanode, getConf().getSocketTimeout(), conf, saslClient,
          socketFactory, getConf().isConnectToDnViaHostname(), this, sourceBlockToken);

  // Close both directions of the datanode connection whether the request
  // succeeds or fails, so no connection is leaked per copied block.
  // Resources close in reverse order: the output side first, then the input.
  try (DataInputStream reply = new DataInputStream(pair.in);
       DataOutputStream out = (DataOutputStream) pair.out) {
    new Sender(out).copyBlockCrossNamespace(sourceBlk, sourceBlockToken,
        targetBlk, targetBlockToken, targetDatanode);
    out.flush();

    BlockOpResponseProto proto =
        BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(reply));
    DataTransferProtoUtil.checkBlockOpStatus(proto,
        "copyBlockCrossNamespace " + sourceBlk + " to " + targetBlk + " from " + sourceDatanode
            + " to " + targetDatanode);
  }
}

/**
* Infer the checksum type for a replica by sending an OP_READ_BLOCK
* for the first byte of that replica. This is used for compatibility
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public class DFSOutputStream extends FSOutputSummer
private FileEncryptionInfo fileEncryptionInfo;
private int writePacketSize;
private boolean leaseRecovered = false;
private ExtendedBlock userAssignmentLastBlock;

/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
Expand Down Expand Up @@ -949,6 +950,9 @@ protected void recoverLease(boolean recoverLeaseOnCloseException) {
void completeFile() throws IOException {
// get last block before destroying the streamer
ExtendedBlock lastBlock = getStreamer().getBlock();
if (lastBlock == null) {
lastBlock = getUserAssignmentLastBlock();
}
try (TraceScope ignored = dfsClient.getTracer()
.newScope("DFSOutputStream#completeFile")) {
completeFile(lastBlock);
Expand Down Expand Up @@ -1095,6 +1099,14 @@ ExtendedBlock getBlock() {
return getStreamer().getBlock();
}

/**
 * Return the last block recorded through
 * {@link #setUserAssignmentLastBlock(ExtendedBlock)}.
 *
 * <p>completeFile() falls back to this value when the streamer has no block.
 *
 * @return the recorded block, or {@code null} if none was set
 */
public ExtendedBlock getUserAssignmentLastBlock() {
  return this.userAssignmentLastBlock;
}

/**
 * Record a block to be used as the file's last block when the stream is
 * completed and the streamer itself holds no block.
 *
 * @param userAssignmentLastBlock the block to record; may be {@code null}
 *     to clear a previously recorded value
 */
public void setUserAssignmentLastBlock(ExtendedBlock userAssignmentLastBlock) {
  final ExtendedBlock assigned = userAssignmentLastBlock;
  this.userAssignmentLastBlock = assigned;
}

@VisibleForTesting
public long getFileId() {
return fileId;
Expand Down Expand Up @@ -1199,4 +1211,16 @@ private static long calculateDelayForNextRetry(long previousDelay,
long maxDelay) {
return Math.min(previousDelay * 2, maxDelay);
}

/**
 * @return the {@code DFSClient} this output stream was created with
 */
public DFSClient getDfsClient() {
  return this.dfsClient;
}

/**
 * Forward a cross-namespace block copy request to the owning
 * {@code DFSClient}; all arguments are passed through unchanged.
 *
 * @throws IOException whatever the client's copyBlockCrossNamespace throws
 */
protected void copyBlockCrossNamespace(ExtendedBlock sourceBlk,
    Token<BlockTokenIdentifier> sourceBlockToken, DatanodeInfo sourceDatanode,
    ExtendedBlock targetBlk, Token<BlockTokenIdentifier> targetBlockToken,
    DatanodeInfo targetDatanode) throws IOException {
  this.dfsClient.copyBlockCrossNamespace(
      sourceBlk, sourceBlockToken, sourceDatanode,
      targetBlk, targetBlockToken, targetDatanode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,9 @@ protected synchronized void closeImpl() throws IOException {

try (TraceScope ignored =
dfsClient.getTracer().newScope("completeFile")) {
if (currentBlockGroup == null) {
currentBlockGroup = getUserAssignmentLastBlock();
}
completeFile(currentBlockGroup);
}
logCorruptBlocks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,10 @@ public FSDataOutputStream next(final FileSystem fs, final Path p)
* inherited policy.
*
*/
private HdfsDataOutputStream create(final Path f,
final FsPermission permission, final EnumSet<CreateFlag> flag,
final int bufferSize, final short replication, final long blockSize,
public HdfsDataOutputStream create(
final Path f, final FsPermission permission,
final EnumSet<CreateFlag> flag, final int bufferSize,
final short replication, final long blockSize,
final Progressable progress, final ChecksumOpt checksumOpt,
final InetSocketAddress[] favoredNodes, final String ecPolicyName,
final String storagePolicy)
Expand Down Expand Up @@ -692,6 +693,12 @@ public HdfsDataOutputStream next(final FileSystem fs, final Path p)
}.resolve(this, absF);
}

/**
 * Copy {@code src} to {@code dst} with {@code FastCopy}, configured from
 * this filesystem's configuration.
 *
 * @param src source file
 * @param dst destination file
 * @param overwrite whether an existing destination may be replaced
 * @throws IOException if the copy fails
 */
@Override
public void fastCopy(Path src, Path dst, boolean overwrite) throws IOException {
  new FastCopy(getConf(), src, dst, overwrite).copyFile();
}

/**
* Same as create(), except fails if parent directory doesn't already exist.
*/
Expand Down
Loading
Loading
0