From 325306d125757e46a75616be48c96f9e8b5b6935 Mon Sep 17 00:00:00 2001 From: Syed Shameerur Rahman Date: Sat, 27 Jan 2024 19:55:08 +0530 Subject: [PATCH 1/6] HADOOP-19047: Support InMemory Tracking Of S3A Magic Commits --- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 17 +++ .../hadoop/fs/s3a/commit/CommitConstants.java | 7 + .../fs/s3a/commit/MagicCommitIntegration.java | 20 +-- .../magic/InMemoryMagicCommitTracker.java | 126 ++++++++++++++++++ .../s3a/commit/magic/MagicCommitTracker.java | 106 +++++---------- .../commit/magic/MagicCommitTrackerUtils.java | 64 +++++++++ .../commit/magic/MagicS3GuardCommitter.java | 105 ++++++++++++--- .../commit/magic/S3MagicCommitTracker.java | 124 +++++++++++++++++ .../markdown/tools/hadoop-aws/committers.md | 7 + .../s3a/commit/AbstractITCommitProtocol.java | 15 ++- .../commit/TestMagicCommitTrackerUtils.java | 64 +++++++++ .../magic/ITestMagicCommitProtocol.java | 28 ++++ 12 files changed, 578 insertions(+), 105 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitTrackerUtils.java diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index a404fc1c21732..8e6d7bd27f799 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; +import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -231,6 +232,7 @@ import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS; +import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_NO_OVERWRITE; import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_OVERWRITE; @@ -3900,6 +3902,21 @@ public void access(final Path f, final FsAction mode) @Retries.RetryTranslated public FileStatus getFileStatus(final Path f) throws IOException { Path path = qualify(f); + if (isTrackMagicCommitsInMemoryEnabled(getConf()) && isMagicCommitPath(path)) { + // Some downstream apps might call getFileStatus for a magic path to get the file size. + // when commit data is stored in memory construct the dummy S3AFileStatus with correct + // file size fetched from the memory. 
+ if (InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().containsKey(path)) { + long len = InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().get(path); + return new S3AFileStatus(len, + 0L, + path, + getDefaultBlockSize(path), + username, + null, + null); + } + } return trackDurationAndSpan( INVOCATION_GET_FILE_STATUS, path, () -> innerGetFileStatus(path, false, StatusProbeEnum.ALL)); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java index 52df58d6a4b43..baec79fd79aa4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java @@ -242,6 +242,13 @@ private CommitConstants() { */ public static final int DEFAULT_COMMITTER_THREADS = 32; + + public static final String FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED = + "fs.s3a.committer.magic.track.commits.in.memory.enabled"; + + public static final boolean FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT = + false; + /** * Path in the cluster filesystem for temporary data: {@value}. * This is for HDFS, not the local filesystem. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java index e6524c91961dc..a2c11a15c13fe 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java @@ -20,17 +20,19 @@ import java.util.List; +import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker; +import org.apache.hadoop.fs.s3a.commit.magic.S3MagicCommitTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.Statistic; -import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker; import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation; import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics; import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*; +import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled; /** * Adds the code needed for S3A to support magic committers. 
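Before the committer-integration hunk below, it helps to see how the new toggle is meant to be used end to end. The following is a minimal sketch, not code from this patch: the bucket, job layout and magic file path are illustrative assumptions; only the `fs.s3a.committer.magic.track.commits.in.memory.enabled` key and the standard `FileSystem` API come from the patch itself.

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class InMemoryTrackingDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // the toggle added by this patch; default is false
    conf.setBoolean("fs.s3a.committer.magic.track.commits.in.memory.enabled", true);

    // hypothetical bucket and magic-path layout, for illustration only
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    Path magicFile = new Path(
        "s3a://example-bucket/out/__magic/job-0001/tasks/attempt_0001_m_000000_0/part-0000");

    // if this write is still tracked in memory, the length is served from the
    // tracker's map instead of a HEAD request against S3
    FileStatus status = fs.getFileStatus(magicFile);
    System.out.println("bytes written so far: " + status.getLen());
  }
}
```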
@@ -105,13 +107,15 @@ public PutTracker createTracker(Path path, String key, String pendingsetPath = key + CommitConstants.PENDING_SUFFIX; getStoreContext().incrementStatistic( Statistic.COMMITTER_MAGIC_FILES_CREATED); - tracker = new MagicCommitTracker(path, - getStoreContext().getBucket(), - key, - destKey, - pendingsetPath, - owner.getWriteOperationHelper(), - trackerStatistics); + if (isTrackMagicCommitsInMemoryEnabled(getStoreContext().getConfiguration())) { + tracker = new InMemoryMagicCommitTracker(path, getStoreContext().getBucket(), + key, destKey, pendingsetPath, owner.getWriteOperationHelper(), + trackerStatistics); + } else { + tracker = new S3MagicCommitTracker(path, getStoreContext().getBucket(), + key, destKey, pendingsetPath, owner.getWriteOperationHelper(), + trackerStatistics); + } LOG.debug("Created {}", tracker); } else { LOG.warn("File being created has a \"magic\" path, but the filesystem" diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java new file mode 100644 index 0000000000000..9ab5eca1fb577 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.magic; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.WriteOperationHelper; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; +import software.amazon.awssdk.services.s3.model.CompletedPart; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.extractTaskAttemptIdFromPath; + +/** + * InMemoryMagicCommitTracker stores the commit data in memory. + * The commit data and related data stores are flushed out from + * the memory when the task is committed or aborted. + */ +public class InMemoryMagicCommitTracker extends MagicCommitTracker { + + // stores taskAttemptId to commit data mapping + private static Map<String, List<SinglePendingCommit>> + taskAttemptIdToMpuMetdadataMap = new ConcurrentHashMap<>(); + + // stores the path to its length/size mapping + private static Map<Path, Long> taskAttemptIdToBytesWritten = new ConcurrentHashMap<>(); + + // stores taskAttemptId to path mapping + private static Map<String, List<Path>> taskAttemptIdToPath = new ConcurrentHashMap<>(); + + public InMemoryMagicCommitTracker(Path path, + String bucket, + String originalDestKey, + String destKey, + String pendingsetKey, + WriteOperationHelper writer, + PutTrackerStatistics trackerStatistics) { + super(path, bucket, originalDestKey, destKey, pendingsetKey, writer, trackerStatistics); + } + + @Override + public boolean aboutToComplete(String uploadId, + List<CompletedPart> parts, + long bytesWritten, + final IOStatistics iostatistics) + throws IOException { + Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId), + "empty/null upload ID: " + uploadId); + Preconditions.checkArgument(parts != null, "No uploaded parts list"); + Preconditions.checkArgument(!parts.isEmpty(), "No uploaded parts to save"); + + // build the commit summary + SinglePendingCommit commitData = new SinglePendingCommit(); + commitData.touch(System.currentTimeMillis()); + commitData.setDestinationKey(getDestKey()); + commitData.setBucket(getBucket()); + commitData.setUri(getPath().toUri().toString()); + commitData.setUploadId(uploadId); + commitData.setText(""); + commitData.setLength(bytesWritten); + commitData.bindCommitData(parts); + commitData.setIOStatistics(new IOStatisticsSnapshot(iostatistics)); + + // extract the taskAttemptId from the path + String taskAttemptId = extractTaskAttemptIdFromPath(getPath()); + + // store the commit data with taskAttemptId as the key + taskAttemptIdToMpuMetdadataMap.computeIfAbsent(taskAttemptId, + k -> Collections.synchronizedList(new ArrayList<>())).add(commitData); + + // store the bytesWritten (length) for the corresponding file + taskAttemptIdToBytesWritten.put(getPath(), bytesWritten); + + // store the mapping between taskAttemptId and path + // This information is used for removing entries from + // the map once the taskAttempt is completed/committed. + taskAttemptIdToPath.computeIfAbsent(taskAttemptId, + k -> Collections.synchronizedList(new ArrayList<>())).add(getPath()); + + LOG.info("commit metadata for {} parts in {}. size: {} byte(s) " + + "for the taskAttemptId: {} is stored in memory", + parts.size(), getPendingPartKey(), bytesWritten, taskAttemptId); + LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}", + getPath(), getPendingPartKey(), commitData); + + return false; + } + + public static Map<String, List<SinglePendingCommit>> getTaskAttemptIdToMpuMetdadataMap() { + return taskAttemptIdToMpuMetdadataMap; + } + + public static Map<Path, Long> getTaskAttemptIdToBytesWritten() { + return taskAttemptIdToBytesWritten; + } + + public static Map<String, List<Path>> getTaskAttemptIdToPath() { + return taskAttemptIdToPath; + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java index b2e703e1b088d..2e43966932823 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java @@ -18,37 +18,22 @@ package org.apache.hadoop.fs.s3a.commit.magic; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; import java.util.List; -import java.util.Map; import software.amazon.awssdk.services.s3.model.CompletedPart; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.Retries; -import org.apache.hadoop.fs.s3a.S3ADataBlocks; import org.apache.hadoop.fs.s3a.WriteOperationHelper; import org.apache.hadoop.fs.s3a.commit.PutTracker; -import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; -import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics; import org.apache.hadoop.fs.statistics.IOStatistics; -import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; -import org.apache.hadoop.util.Preconditions; import static java.util.Objects.requireNonNull; -import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT; -import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER; -import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; /** * Put tracker for Magic commits. @@ -65,7 +50,7 @@ public class MagicCommitTracker extends PutTracker { private final Path path; private final WriteOperationHelper writer; private final String bucket; - private static final byte[] EMPTY = new byte[0]; + protected static final byte[] EMPTY = new byte[0]; private final PutTrackerStatistics trackerStatistics; /** @@ -118,76 +103,21 @@ public boolean outputImmediatelyVisible() { /** * Complete operation: generate the final commit data, put it. - * @param uploadId Upload ID - * @param parts list of parts + * + * @param uploadId Upload ID + * @param parts list of parts * @param bytesWritten bytes written * @param iostatistics nullable IO statistics * @return false, indicating that the commit must fail. - * @throws IOException any IO problem. + * @throws IOException any IO problem. * @throws IllegalArgumentException bad argument */ @Override public boolean aboutToComplete(String uploadId, List<CompletedPart> parts, long bytesWritten, - final IOStatistics iostatistics) - throws IOException { - Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId), - "empty/null upload ID: "+ uploadId); - Preconditions.checkArgument(parts != null, - "No uploaded parts list"); - Preconditions.checkArgument(!parts.isEmpty(), - "No uploaded parts to save"); - - // put a 0-byte file with the name of the original under-magic path - // Add the final file length as a header - // this is done before the task commit, so its duration can be - // included in the statistics - Map<String, String> headers = new HashMap<>(); - headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten)); - PutObjectRequest originalDestPut = writer.createPutObjectRequest( - originalDestKey, - 0, - new PutObjectOptions(true, null, headers), false); - upload(originalDestPut, new ByteArrayInputStream(EMPTY)); - - // build the commit summary - SinglePendingCommit commitData = new SinglePendingCommit(); - commitData.touch(System.currentTimeMillis()); - commitData.setDestinationKey(getDestKey()); - commitData.setBucket(bucket); - commitData.setUri(path.toUri().toString()); - commitData.setUploadId(uploadId); - commitData.setText(""); - commitData.setLength(bytesWritten); - commitData.bindCommitData(parts); - commitData.setIOStatistics( - new IOStatisticsSnapshot(iostatistics)); - - byte[] bytes = commitData.toBytes(SinglePendingCommit.serializer()); - LOG.info("Uncommitted data pending to file {};" - + " commit metadata for {} parts in {}. size: {} byte(s)", - path.toUri(), parts.size(), pendingPartKey, bytesWritten); - LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}", - path, pendingPartKey, commitData); - PutObjectRequest put = writer.createPutObjectRequest( - pendingPartKey, - bytes.length, null, false); - upload(put, new ByteArrayInputStream(bytes)); + final IOStatistics iostatistics) throws IOException { return false; - - } - /** - * PUT an object. - * @param request the request - * @param inputStream input stream of data to be uploaded - * @throws IOException on problems - */ - @Retries.RetryTranslated - private void upload(PutObjectRequest request, InputStream inputStream) throws IOException { - trackDurationOfInvocation(trackerStatistics, COMMITTER_MAGIC_MARKER_PUT.getSymbol(), - () -> writer.putObject(request, PutObjectOptions.keepingDirs(), - new S3ADataBlocks.BlockUploadData(inputStream), false, null)); } @Override @@ -201,4 +131,28 @@ public String toString() { sb.append('}'); return sb.toString(); } + + public String getOriginalDestKey() { + return originalDestKey; + } + + public String getPendingPartKey() { + return pendingPartKey; + } + + public Path getPath() { + return path; + } + + public String getBucket() { + return bucket; + } + + public WriteOperationHelper getWriter() { + return writer; + } + + public PutTrackerStatistics getTrackerStatistics() { + return trackerStatistics; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java new file mode 100644 index 0000000000000..f923e4df40972 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.magic; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.MagicCommitPaths; + +import java.util.List; + +import static org.apache.hadoop.util.Preconditions.checkArgument; + +/** + * Utility class for the class {@link MagicCommitTracker} and its subclasses. + */ +public final class MagicCommitTrackerUtils { + + private MagicCommitTrackerUtils() { + } + + /** + * The magic path is of the following format. + * "s3://bucket-name/table-path/__magic_jobId/job-id/taskAttempt/id/tasks/taskAttemptId" + * So the third child from the "__magic" path will give the task attempt id. + * @param path Path + * @return taskAttemptId + */ + public static String extractTaskAttemptIdFromPath(Path path) { + List<String> elementsInPath = MagicCommitPaths.splitPathToElements(path); + List<String> childrenOfMagicPath = MagicCommitPaths.magicPathChildren(elementsInPath); + + checkArgument(childrenOfMagicPath.size() >= 3, "Magic Path is invalid"); + // 3rd child of the magic path is the taskAttemptId + return childrenOfMagicPath.get(3); + } + + /** + * Is tracking of magic commit data in-memory enabled. + * @param conf Configuration + * @return true if in memory tracking of commit data is enabled. + */ + public static boolean isTrackMagicCommitsInMemoryEnabled(Configuration conf) { + return conf.getBoolean( + CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED, + CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java index 518831b7d4330..a22ad38ea47de 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java @@ -18,7 +18,9 @@ package org.apache.hadoop.fs.s3a.commit.magic; +import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; @@ -48,8 +50,8 @@ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TASK_ATTEMPT_ID; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TEMP_DATA; import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*; -import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*; import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.*; +import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics; /** @@ -192,23 +194,9 @@ public void commitTask(TaskAttemptContext context) throws IOException { */ private PendingSet innerCommitTask( TaskAttemptContext context) throws IOException { - Path taskAttemptPath = getTaskAttemptPath(context); // load in all pending commits. - CommitOperations actions = getCommitOperations(); - PendingSet pendingSet; + PendingSet pendingSet = loadPendingCommits(context); try (CommitContext commitContext = initiateTaskOperation(context)) { - Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> - loaded = actions.loadSinglePendingCommits( - taskAttemptPath, true, commitContext); - pendingSet = loaded.getKey(); - List<Pair<LocatedFileStatus, IOException>> failures = loaded.getValue(); - if (!failures.isEmpty()) { - // At least one file failed to load - // revert all which did; report failure with first exception - LOG.error("At least one commit file could not be read: failing"); - abortPendingUploads(commitContext, pendingSet.getCommits(), true); - throw failures.get(0).getValue(); - } // patch in IDs String jobId = getUUID(); String taskId = String.valueOf(context.getTaskAttemptID()); @@ -248,6 +236,80 @@ private PendingSet innerCommitTask( return pendingSet; } + /** + * Loads pending commits from either memory or from the remote store (S3) based on the config. + * @param context TaskAttemptContext + * @return All pending commit data for the given TaskAttemptContext + * @throws IOException + * if there is an error trying to read the commit data + */ + protected PendingSet loadPendingCommits(TaskAttemptContext context) throws IOException { + PendingSet pendingSet = new PendingSet(); + if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) { + // load from memory + List<SinglePendingCommit> pendingCommits = loadPendingCommitsFromMemory(context); + + for (SinglePendingCommit singleCommit : pendingCommits) { + // aggregate stats + pendingSet.getIOStatistics() + .aggregate(singleCommit.getIOStatistics()); + // then clear so they aren't marshalled again. + singleCommit.getIOStatistics().clear(); + } + pendingSet.setCommits(pendingCommits); + } else { + // Load from remote store + CommitOperations actions = getCommitOperations(); + Path taskAttemptPath = getTaskAttemptPath(context); + try (CommitContext commitContext = initiateTaskOperation(context)) { + Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> loaded = + actions.loadSinglePendingCommits(taskAttemptPath, true, commitContext); + pendingSet = loaded.getKey(); + List<Pair<LocatedFileStatus, IOException>> failures = loaded.getValue(); + if (!failures.isEmpty()) { + // At least one file failed to load + // revert all which did; report failure with first exception + LOG.error("At least one commit file could not be read: failing"); + abortPendingUploads(commitContext, pendingSet.getCommits(), true); + throw failures.get(0).getValue(); + } + } + } + return pendingSet; + } + + private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContext context) + throws FileNotFoundException { + String taskAttemptId = String.valueOf(context.getTaskAttemptID()); + // get all the pending commit metadata associated with the taskAttemptId. + // This will also remove the entry from the map. + List<SinglePendingCommit> pendingCommits = + InMemoryMagicCommitTracker.getTaskAttemptIdToMpuMetdadataMap().remove(taskAttemptId); + // get all the path/files associated with the taskAttemptId. + // This will also remove the entry from the map. + List<Path> pathsAssociatedWithTaskAttemptId = + InMemoryMagicCommitTracker.getTaskAttemptIdToPath().remove(taskAttemptId); + + // for each path, remove the entry from the map; + // this is done so that there is no memory leak. + if (pathsAssociatedWithTaskAttemptId != null) { + for (Path path : pathsAssociatedWithTaskAttemptId) { + boolean cleared = + InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().remove(path) != null; + LOG.debug("Removing path: {} from the memory isSuccess: {}", path, cleared); + } + } else { + LOG.debug("No paths to remove for taskAttemptId: {}", taskAttemptId); + } + + if (pendingCommits == null || pendingCommits.isEmpty()) { + LOG.info("No commit data present for the taskAttemptId: {} in the memory", taskAttemptId); + return new ArrayList<>(); + } + + return pendingCommits; + } + /** * Abort a task. Attempt load then abort all pending files, * then try to delete the task attempt path.
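The tracker and committer together implement a register-then-drain pattern over static concurrent maps: `aboutToComplete` registers commit data under the task attempt id, and task commit/abort drains it with `remove()`. A standalone sketch of that pattern, with illustrative names rather than the patch's API:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RegisterDrainDemo {
  // taskAttemptId -> commit records, mirroring the tracker's static maps
  private static final Map<String, List<String>> COMMITS = new ConcurrentHashMap<>();

  // called once per closed file (aboutToComplete in the tracker)
  static void register(String taskAttemptId, String commitRecord) {
    COMMITS.computeIfAbsent(taskAttemptId,
        k -> Collections.synchronizedList(new ArrayList<>())).add(commitRecord);
  }

  // called once per task commit/abort; remove() both reads and clears the
  // entry, so a task attempt's data cannot leak after it completes
  static List<String> drain(String taskAttemptId) {
    List<String> records = COMMITS.remove(taskAttemptId);
    return records == null ? new ArrayList<>() : records;
  }

  public static void main(String[] args) {
    register("attempt_0_m_000000_0", "part-0000: uploadId=abc, 3 parts");
    register("attempt_0_m_000000_0", "part-0001: uploadId=def, 1 part");
    System.out.println(drain("attempt_0_m_000000_0")); // two records
    System.out.println(drain("attempt_0_m_000000_0")); // empty: already drained
  }
}
```

Because the drain is destructive, a second commit attempt in the same process finds nothing in the maps; this appears to be why the duplicated-commit test later in this series skips the FileNotFoundException expectation when in-memory tracking is on.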
@@ -264,9 +326,14 @@ public void abortTask(TaskAttemptContext context) throws IOException { try (DurationInfo d = new DurationInfo(LOG, "Abort task %s", context.getTaskAttemptID()); CommitContext commitContext = initiateTaskOperation(context)) { - getCommitOperations().abortAllSinglePendingCommits(attemptPath, - commitContext, - true); + if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) { + List<SinglePendingCommit> pendingCommits = loadPendingCommitsFromMemory(context); + for (SinglePendingCommit singleCommit : pendingCommits) { + commitContext.abortSingleCommit(singleCommit); + } + } else { + getCommitOperations().abortAllSinglePendingCommits(attemptPath, commitContext, true); + } } finally { deleteQuietly( attemptPath.getFileSystem(context.getConfiguration()), diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java new file mode 100644 index 0000000000000..c27abf66bfe22 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.magic; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.fs.s3a.S3ADataBlocks; +import org.apache.hadoop.fs.s3a.WriteOperationHelper; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; +import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; +import org.apache.hadoop.util.Preconditions; +import software.amazon.awssdk.services.s3.model.CompletedPart; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; + +/** + * Stores the commit data under the magic path. + */ +public class S3MagicCommitTracker extends MagicCommitTracker { + + public S3MagicCommitTracker(Path path, + String bucket, + String originalDestKey, + String destKey, + String pendingsetKey, + WriteOperationHelper writer, + PutTrackerStatistics trackerStatistics) { + super(path, bucket, originalDestKey, destKey, pendingsetKey, writer, trackerStatistics); + } + + @Override + public boolean aboutToComplete(String uploadId, + List<CompletedPart> parts, + long bytesWritten, + final IOStatistics iostatistics) + throws IOException { + Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId), + "empty/null upload ID: "+ uploadId); + Preconditions.checkArgument(parts != null, + "No uploaded parts list"); + Preconditions.checkArgument(!parts.isEmpty(), + "No uploaded parts to save"); + + // put a 0-byte file with the name of the original under-magic path + // Add the final file length as a header + // this is done before the task commit, so its duration can be + // included in the statistics + Map<String, String> headers = new HashMap<>(); + headers.put(X_HEADER_MAGIC_MARKER, Long.toString(bytesWritten)); + PutObjectRequest originalDestPut = getWriter().createPutObjectRequest( + getOriginalDestKey(), + 0, + new PutObjectOptions(true, null, headers), false); + upload(originalDestPut, new ByteArrayInputStream(EMPTY)); + + // build the commit summary + SinglePendingCommit commitData = new SinglePendingCommit(); + commitData.touch(System.currentTimeMillis()); + commitData.setDestinationKey(getDestKey()); + commitData.setBucket(getBucket()); + commitData.setUri(getPath().toUri().toString()); + commitData.setUploadId(uploadId); + commitData.setText(""); + commitData.setLength(bytesWritten); + commitData.bindCommitData(parts); + commitData.setIOStatistics( + new IOStatisticsSnapshot(iostatistics)); + + byte[] bytes = commitData.toBytes(SinglePendingCommit.serializer()); + LOG.info("Uncommitted data pending to file {};" + + " commit metadata for {} parts in {}. size: {} byte(s)", + getPath().toUri(), parts.size(), getPendingPartKey(), bytesWritten); + LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}", + getPath(), getPendingPartKey(), commitData); + PutObjectRequest put = getWriter().createPutObjectRequest( + getPendingPartKey(), + bytes.length, null, false); + upload(put, new ByteArrayInputStream(bytes)); + return false; + } + + /** + * PUT an object. + * @param request the request + * @param inputStream input stream of data to be uploaded + * @throws IOException on problems + */ + @Retries.RetryTranslated + private void upload(PutObjectRequest request, InputStream inputStream) throws IOException { + trackDurationOfInvocation(getTrackerStatistics(), COMMITTER_MAGIC_MARKER_PUT.getSymbol(), + () -> getWriter().putObject(request, PutObjectOptions.keepingDirs(), + new S3ADataBlocks.BlockUploadData(inputStream), false, null)); + } +} diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md index fb42d507b2d60..895815444932c 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md @@ -362,6 +362,13 @@ the magic directory path rewriting is enabled by default. The Magic Committer has not been field tested to the extent of Netflix's committer; consider it the least mature of the committers. +When there are only a few files to be written, the Magic committer can store the commit data in memory, which speeds up the task commit operation and also saves S3 requests. This can be enabled with the following property: +```xml +<property> + <name>fs.s3a.committer.magic.track.commits.in.memory.enabled</name> + <value>true</value> +</property> +``` ### Which Committer to Use? diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java index 67c88039aad1b..3a7cceb2369ee 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java @@ -82,6 +82,7 @@ import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE; import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_TASKS_SUCCEEDED; +import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -906,7 +907,14 @@ public void testCommitterWithDuplicatedCommit() throws Exception { assertNoMultipartUploadsPending(outDir); // commit task to fail on retry - expectFNFEonTaskCommit(committer, tContext); + // FNFE is not thrown by the Magic committer when + // in-memory commit data tracking is enabled, hence skip the check. + boolean skipExpectFNFE = committer instanceof MagicS3GuardCommitter && + isTrackMagicCommitsInMemoryEnabled(tContext.getConfiguration()); + + if (!skipExpectFNFE) { + expectFNFEonTaskCommit(committer, tContext); + } } /** @@ -1422,7 +1430,10 @@ public void testOutputFormatIntegration() throws Throwable { validateTaskAttemptPathDuringWrite(dest, expectedLength, jobData.getCommitter().getUUID()); recordWriter.close(tContext); // at this point - validateTaskAttemptPathAfterWrite(dest, expectedLength); + // Skip validation when commit data is stored in memory + if (!isTrackMagicCommitsInMemoryEnabled(conf)) { + validateTaskAttemptPathAfterWrite(dest, expectedLength); + } assertTrue("Committer does not have data to commit " + committer, committer.needsTaskCommit(tContext)); commitTask(committer, tContext); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitTrackerUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitTrackerUtils.java new file mode 100644 index 0000000000000..a08f8d2d34b70 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/TestMagicCommitTrackerUtils.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.fs.s3a.commit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR; +import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.junit.Before; +import org.junit.Test; + +import static junit.framework.TestCase.assertEquals; +import static org.apache.hadoop.fs.s3a.commit.AbstractCommitITest.randomJobId; + +/** + * Class to test {@link MagicCommitTrackerUtils}. + */ +public final class TestMagicCommitTrackerUtils { + + private String jobId; + private String attemptId; + private TaskAttemptID taskAttemptId; + private static final Path DEST_PATH = new Path("s3://dummyBucket/dummyTable"); + + + @Before + public void setup() throws Exception { + jobId = randomJobId(); + attemptId = "attempt_" + jobId + "_m_000000_0"; + taskAttemptId = TaskAttemptID.forName(attemptId); + } + + @Test + public void testExtractTaskAttemptIdFromPath() { + TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl( + new Configuration(), + taskAttemptId); + Path path = CommitUtilsWithMR + .getBaseMagicTaskAttemptPath(taskAttemptContext, "00001", DEST_PATH); + assertEquals("TaskAttemptId didn't match", attemptId, + MagicCommitTrackerUtils.extractTaskAttemptIdFromPath(path)); + + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java index fa963a4b97064..d18a4de2c476e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java @@ -20,8 +20,11 @@ import java.io.IOException; import java.net.URI; +import java.util.Arrays; +import java.util.Collection; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -39,6 +42,8 @@ import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; @@ -48,8 +53,11 @@ /** * Test the magic committer's commit protocol. */ +@RunWith(Parameterized.class) public class ITestMagicCommitProtocol extends AbstractITCommitProtocol { + private final boolean trackCommitsInMemory; + @Override protected String suitename() { return "ITestMagicCommitProtocol"; } @@ -71,6 +79,26 @@ public void setup() throws Exception { CommitUtils.verifyIsMagicCommitFS(getFileSystem()); } + @Parameterized.Parameters(name = "track-commit-in-memory-{0}") + public static Collection<Object[]> params() { + return Arrays.asList(new Object[][]{ + {false}, + {true} + }); + } + + public ITestMagicCommitProtocol(boolean trackCommitsInMemory) { + this.trackCommitsInMemory = trackCommitsInMemory; + } + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + conf.setBoolean(FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED, trackCommitsInMemory); + + return conf; + } + @Override public void assertJobAbortCleanedUp(JobData jobData) throws Exception { From e43fa54b2f57e5b288569a6b930ac2a7362e9db2 Mon Sep 17 00:00:00 2001 From: Syed Shameerur Rahman Date: Mon, 5 Feb 2024 14:32:49 +0530 Subject: [PATCH 2/6] Address PR review comments --- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 2 +- .../magic/InMemoryMagicCommitTracker.java | 19 ++++++++++--------- .../commit/magic/MagicS3GuardCommitter.java | 8 ++++++-- .../commit/magic/S3MagicCommitTracker.java | 19 ++++++++++--------- .../magic/ITestMagicCommitProtocol.java | 2 ++ 5 files changed, 29 insertions(+), 21 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 8e6d7bd27f799..8a01ab42780df 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -52,7 +52,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; -import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -118,6 +117,7 @@ import org.apache.hadoop.fs.s3a.auth.SignerManager; import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations; import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider; +import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker; import org.apache.hadoop.fs.s3a.impl.AWSCannedACL; import org.apache.hadoop.fs.s3a.impl.AWSHeaders; import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java index 9ab5eca1fb577..f9bcc1acdc2a0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java @@ -18,6 +18,15 @@ package org.apache.hadoop.fs.s3a.commit.magic; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import software.amazon.awssdk.services.s3.model.CompletedPart; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.WriteOperationHelper; @@ -25,15 +34,7 @@ import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; -import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; -import software.amazon.awssdk.services.s3.model.CompletedPart; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.util.Preconditions; import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.extractTaskAttemptIdFromPath; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java index a22ad38ea47de..4aeb12853017e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java @@ -278,8 +278,12 @@ protected PendingSet loadPendingCommits(TaskAttemptContext context) throws IOExc return pendingSet; } - private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContext context) - throws FileNotFoundException { + /** + * Loads the pending commits from the memory data structure for a given taskAttemptId. + * @param context TaskContext + * @return list of pending commits + */ + private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContext context) { String taskAttemptId = String.valueOf(context.getTaskAttemptID()); // get all the pending commit metadata associated with the taskAttemptId. // This will also remove the entry from the map.
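The test changes in this series (ITestMagicCommitProtocol above, ITestTerasortOnS3A in the next patch) all rely on JUnit 4's `Parameterized` runner to execute each case once with the new toggle off and once with it on. A minimal self-contained sketch of that pattern; the class name and test body are illustrative, only the configuration key comes from the patch:

```java
import java.util.Arrays;
import java.util.Collection;

import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class ToggleParameterizedTest {

  /** Run every test case once with the toggle off and once with it on. */
  @Parameterized.Parameters(name = "track-commit-in-memory-{0}")
  public static Collection<Object[]> params() {
    return Arrays.asList(new Object[][]{{false}, {true}});
  }

  private final boolean trackCommitsInMemory;

  public ToggleParameterizedTest(boolean trackCommitsInMemory) {
    this.trackCommitsInMemory = trackCommitsInMemory;
  }

  @Test
  public void testToggleIsApplied() {
    // a real suite would pass this Configuration to the committer under test
    Configuration conf = new Configuration(false);
    conf.setBoolean("fs.s3a.committer.magic.track.commits.in.memory.enabled",
        trackCommitsInMemory);
    Assert.assertEquals(trackCommitsInMemory,
        conf.getBoolean("fs.s3a.committer.magic.track.commits.in.memory.enabled", false));
  }
}
```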
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java index c27abf66bfe22..0ab3cee5201e7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java @@ -18,6 +18,16 @@ package org.apache.hadoop.fs.s3a.commit.magic; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import software.amazon.awssdk.services.s3.model.CompletedPart; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; + import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.Retries; @@ -29,15 +39,6 @@ import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot; import org.apache.hadoop.util.Preconditions; -import software.amazon.awssdk.services.s3.model.CompletedPart; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.X_HEADER_MAGIC_MARKER; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java index d18a4de2c476e..cbfc23a2a29b6 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java @@ -45,6 +45,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.getMagicJobPath; @@ -94,6 +95,7 @@ public ITestMagicCommitProtocol(boolean trackCommitsInMemory) { @Override protected Configuration createConfiguration() { Configuration conf = super.createConfiguration(); + removeBaseAndBucketOverrides(conf, FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED); conf.setBoolean(FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED, trackCommitsInMemory); return conf; From dcf2b129304d93038b9610f853e1907e6edfe924 Mon Sep 17 00:00:00 2001 From: Syed Shameerur Rahman Date: Mon, 12 Feb 2024 12:37:24 +0530 Subject: [PATCH 3/6] Fix import ordering, Test in ITestTerasortOnS3A and other minor fix --- .../apache/hadoop/fs/s3a/S3AFileSystem.java | 7 +-- .../hadoop/fs/s3a/commit/CommitConstants.java | 11 +++- .../fs/s3a/commit/MagicCommitIntegration.java | 4 +- .../magic/InMemoryMagicCommitTracker.java | 54 +++++++++++++------ .../s3a/commit/magic/MagicCommitTracker.java | 16 +++--- .../commit/magic/MagicCommitTrackerUtils.java | 4 +- .../commit/magic/MagicS3GuardCommitter.java | 5 +- .../commit/terasort/ITestTerasortOnS3A.java | 17 
++++-- 8 files changed, 77 insertions(+), 41 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 8a01ab42780df..0e2ae0f74dd0a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -232,6 +232,7 @@ import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS; import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME; import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_NO_OVERWRITE; @@ -3906,14 +3907,14 @@ public FileStatus getFileStatus(final Path f) throws IOException { // Some downstream apps might call getFileStatus for a magic path to get the file size. // when commit data is stored in memory construct the dummy S3AFileStatus with correct // file size fetched from the memory. - if (InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().containsKey(path)) { - long len = InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().get(path); + if (InMemoryMagicCommitTracker.getPathToBytesWritten().containsKey(path)) { + long len = InMemoryMagicCommitTracker.getPathToBytesWritten().get(path); return new S3AFileStatus(len, 0L, path, getDefaultBlockSize(path), username, - null, + MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME, null); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java index baec79fd79aa4..4f0005509937a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java @@ -58,6 +58,10 @@ private CommitConstants() { */ public static final String PENDINGSET_SUFFIX = ".pendingset"; + /** + * Etag name to be returned on non-committed S3 object: {@value}. + */ + public static final String MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME = "pending"; /** * Prefix to use for config options: {@value}. @@ -242,10 +246,15 @@ private CommitConstants() { */ public static final int DEFAULT_COMMITTER_THREADS = 32; - + /** + * Should Magic committer track all the pending commits in memory? + */ public static final String FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED = "fs.s3a.committer.magic.track.commits.in.memory.enabled"; + /** + * Default value for {@link #FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED}: {@value}. 
+ */ public static final boolean FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT = false; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java index a2c11a15c13fe..ba1dd400f6d7b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java @@ -20,14 +20,14 @@ import java.util.List; -import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker; -import org.apache.hadoop.fs.s3a.commit.magic.S3MagicCommitTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.Statistic; +import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker; +import org.apache.hadoop.fs.s3a.commit.magic.S3MagicCommitTracker; import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation; import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java index f9bcc1acdc2a0..992b251f7c0b0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java @@ -45,15 +45,24 @@ */ public class InMemoryMagicCommitTracker extends MagicCommitTracker { - // stores taskAttemptId to commit data mapping - private static Map> - taskAttemptIdToMpuMetdadataMap = new ConcurrentHashMap<>(); - - // stores the path to its length/size mapping - private static Map taskAttemptIdToBytesWritten = new ConcurrentHashMap<>(); - - // stores taskAttemptId to path mapping - private static Map> taskAttemptIdToPath = new ConcurrentHashMap<>(); + /** + * Map to store taskAttemptId, and it's corresponding list of pending commit data. + * The entries in the Map gets removed when a task commits or aborts. + */ + private final static Map> + TASK_ATTEMPT_ID_TO_MPU_METDADATA = new ConcurrentHashMap<>(); + + /** + * Map to store path of the file, and it's corresponding size. + * The entries in the Map gets removed when a task commits or aborts. + */ + private final static Map PATH_TO_BYTES_WRITTEN = new ConcurrentHashMap<>(); + + /** + * Map to store taskAttemptId, and list of paths to files written by it. + * The entries in the Map gets removed when a task commits or aborts. 
+ */ + private final static Map> TASK_ATTEMPT_ID_TO_PATH = new ConcurrentHashMap<>(); public InMemoryMagicCommitTracker(Path path, String bucket, @@ -92,16 +101,16 @@ public boolean aboutToComplete(String uploadId, String taskAttemptId = extractTaskAttemptIdFromPath(getPath()); // store the commit data with taskAttemptId as the key - taskAttemptIdToMpuMetdadataMap.computeIfAbsent(taskAttemptId, + TASK_ATTEMPT_ID_TO_MPU_METDADATA.computeIfAbsent(taskAttemptId, k -> Collections.synchronizedList(new ArrayList<>())).add(commitData); // store the byteswritten(length) for the corresponding file - taskAttemptIdToBytesWritten.put(getPath(), bytesWritten); + PATH_TO_BYTES_WRITTEN.put(getPath(), bytesWritten); // store the mapping between taskAttemptId and path // This information is used for removing entries from // the map once the taskAttempt is completed/committed. - taskAttemptIdToPath.computeIfAbsent(taskAttemptId, + TASK_ATTEMPT_ID_TO_PATH.computeIfAbsent(taskAttemptId, k -> Collections.synchronizedList(new ArrayList<>())).add(getPath()); LOG.info("commit metadata for {} parts in {}. size: {} byte(s) " @@ -113,15 +122,26 @@ public boolean aboutToComplete(String uploadId, return false; } - public static Map> getTaskAttemptIdToMpuMetdadataMap() { - return taskAttemptIdToMpuMetdadataMap; + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "InMemoryMagicCommitTracker{"); + sb.append(", Number of taskAttempts=").append(TASK_ATTEMPT_ID_TO_MPU_METDADATA.size()); + sb.append(", Number of files=").append(PATH_TO_BYTES_WRITTEN.size()); + sb.append('}'); + return sb.toString(); + } + + + public static Map> getTaskAttemptIdToMpuMetdadata() { + return TASK_ATTEMPT_ID_TO_MPU_METDADATA; } - public static Map getTaskAttemptIdToBytesWritten() { - return taskAttemptIdToBytesWritten; + public static Map getPathToBytesWritten() { + return PATH_TO_BYTES_WRITTEN; } public static Map> getTaskAttemptIdToPath() { - return taskAttemptIdToPath; + return TASK_ATTEMPT_ID_TO_PATH; } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java index 2e43966932823..62151658b5aaf 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java @@ -41,7 +41,7 @@ * uses any datatype in hadoop-mapreduce. */ @InterfaceAudience.Private -public class MagicCommitTracker extends PutTracker { +public abstract class MagicCommitTracker extends PutTracker { public static final Logger LOG = LoggerFactory.getLogger( MagicCommitTracker.class); @@ -103,22 +103,20 @@ public boolean outputImmediatelyVisible() { /** * Complete operation: generate the final commit data, put it. - * - * @param uploadId Upload ID - * @param parts list of parts + * @param uploadId Upload ID + * @param parts list of parts * @param bytesWritten bytes written * @param iostatistics nullable IO statistics * @return false, indicating that the commit must fail. - * @throws IOException any IO problem. + * @throws IOException any IO problem. 
   * @throws IllegalArgumentException bad argument
   */
  @Override
-  public boolean aboutToComplete(String uploadId,
+  public abstract boolean aboutToComplete(String uploadId,
      List<CompletedPart> parts,
      long bytesWritten,
-      final IOStatistics iostatistics) throws IOException {
-    return false;
-  }
+      IOStatistics iostatistics)
+      throws IOException;
 
   @Override
   public String toString() {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java
index f923e4df40972..2ceac1c8e03de 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java
@@ -18,13 +18,13 @@
 
 package org.apache.hadoop.fs.s3a.commit.magic;
 
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.MagicCommitPaths;
 
-import java.util.List;
-
 import static org.apache.hadoop.util.Preconditions.checkArgument;
 
 /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
index 4aeb12853017e..0338acffed879 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.fs.s3a.commit.magic;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -288,7 +287,7 @@ private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContex
     // get all the pending commit metadata associated with the taskAttemptId.
     // This will also remove the entry from the map.
     List<SinglePendingCommit> pendingCommits =
-        InMemoryMagicCommitTracker.getTaskAttemptIdToMpuMetdadataMap().remove(taskAttemptId);
+        InMemoryMagicCommitTracker.getTaskAttemptIdToMpuMetdadata().remove(taskAttemptId);
 
     // get all the path/files associated with the taskAttemptId.
     // This will also remove the entry from the map.
     List<Path> pathsAssociatedWithTaskAttemptId =
@@ -299,7 +298,7 @@ private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContex
     if (pathsAssociatedWithTaskAttemptId != null) {
       for (Path path : pathsAssociatedWithTaskAttemptId) {
         boolean cleared =
-            InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().remove(path) != null;
+            InMemoryMagicCommitTracker.getPathToBytesWritten().remove(path) != null;
         LOG.debug("Removing path: {} from the memory isSuccess: {}", path, cleared);
       }
     } else {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java
index d28ee5172b632..be52220833784 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.examples.terasort.TeraValidate;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
 import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
 import org.apache.hadoop.mapred.JobConf;
@@ -97,6 +98,9 @@ public class ITestTerasortOnS3A extends AbstractYarnClusterITest {
   /** Name of the committer for this run. */
   private final String committerName;
 
+  /** Should the magic committer track pending commits in-memory. */
+  private final boolean trackCommitsInMemory;
+
   /** Base path for all the terasort input and output paths. */
   private Path terasortPath;
 
@@ -117,12 +121,14 @@ public class ITestTerasortOnS3A extends AbstractYarnClusterITest {
   @Parameterized.Parameters(name = "{0}")
   public static Collection<Object[]> params() {
     return Arrays.asList(new Object[][]{
-        {DirectoryStagingCommitter.NAME},
-        {MagicS3GuardCommitter.NAME}});
+        {DirectoryStagingCommitter.NAME, false},
+        {MagicS3GuardCommitter.NAME, false},
+        {MagicS3GuardCommitter.NAME, true}});
   }
 
-  public ITestTerasortOnS3A(final String committerName) {
+  public ITestTerasortOnS3A(final String committerName, final boolean trackCommitsInMemory) {
     this.committerName = committerName;
+    this.trackCommitsInMemory = trackCommitsInMemory;
   }
 
   @Override
@@ -152,6 +158,9 @@ protected void applyCustomConfigOptions(JobConf conf) {
     conf.setBoolean(
         TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
         false);
+    conf.setBoolean(
+        CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
+        trackCommitsInMemory);
   }
 
   private int getExpectedPartitionCount() {
@@ -173,7 +182,7 @@ protected int getRowCount() {
    */
   private void prepareToTerasort() {
     // small sample size for faster runs
-    terasortPath = new Path("/terasort-" + committerName)
+    terasortPath = new Path("/terasort-" + committerName + "-" + trackCommitsInMemory)
         .makeQualified(getFileSystem());
     sortInput = new Path(terasortPath, "sortin");
     sortOutput = new Path(terasortPath, "sortout");

From 85361f11580b1b3099aa67af2ccd6687b696b756 Mon Sep 17 00:00:00 2001
From: Syed Shameerur Rahman
Date: Mon, 26 Feb 2024 13:41:45 +0530
Subject: [PATCH 4/6] HADOOP-19047: Support InMemory Tracking Of S3A Magic Commits

---
 .../org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
index d1943fa47773f..4cbffef521df5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
@@ -584,7 +584,7 @@ public SinglePendingCommit uploadFileToPendingCommit(File localFile,
         destKey,
         uploadId,
         partNumber,
-        size).build();
+        size).build();x
     // Read from the file input stream at current position.
     RequestBody body = RequestBody.fromInputStream(fileStream, size);
     UploadPartResponse response = writeOperations.uploadPart(part, body, statistics);

From f8e9a1ed3a5cb57247b63199a19a8fd36567faf5 Mon Sep 17 00:00:00 2001
From: Syed Shameerur Rahman
Date: Tue, 26 Mar 2024 15:04:49 +0530
Subject: [PATCH 5/6] Fix PR comments

---
 .../s3a/commit/magic/InMemoryMagicCommitTracker.java | 11 +++++------
 .../fs/s3a/commit/magic/MagicS3GuardCommitter.java   |  2 +-
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java
index 992b251f7c0b0..8e36b1e485ef7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java
@@ -49,8 +49,7 @@ public class InMemoryMagicCommitTracker extends MagicCommitTracker {
    * Map to store taskAttemptId and its corresponding list of pending commit data.
    * The entries in the map get removed when a task commits or aborts.
    */
-  private final static Map<String, List<SinglePendingCommit>>
-      TASK_ATTEMPT_ID_TO_MPU_METDADATA = new ConcurrentHashMap<>();
+  private final static Map<String, List<SinglePendingCommit>> TASK_ATTEMPT_ID_TO_MPU_METADATA = new ConcurrentHashMap<>();
 
   /**
    * Map to store the path of each file and its corresponding size.
@@ -101,7 +100,7 @@ public boolean aboutToComplete(String uploadId,
     String taskAttemptId = extractTaskAttemptIdFromPath(getPath());
 
     // store the commit data with taskAttemptId as the key
-    TASK_ATTEMPT_ID_TO_MPU_METDADATA.computeIfAbsent(taskAttemptId,
+    TASK_ATTEMPT_ID_TO_MPU_METADATA.computeIfAbsent(taskAttemptId,
         k -> Collections.synchronizedList(new ArrayList<>())).add(commitData);
 
     // store the bytes written (length) for the corresponding file
@@ -126,15 +125,15 @@ public boolean aboutToComplete(String uploadId,
   public String toString() {
     final StringBuilder sb = new StringBuilder(
         "InMemoryMagicCommitTracker{");
-    sb.append("Number of taskAttempts=").append(TASK_ATTEMPT_ID_TO_MPU_METDADATA.size());
+    sb.append("Number of taskAttempts=").append(TASK_ATTEMPT_ID_TO_MPU_METADATA.size());
     sb.append(", Number of files=").append(PATH_TO_BYTES_WRITTEN.size());
     sb.append('}');
     return sb.toString();
   }
 
-  public static Map<String, List<SinglePendingCommit>> getTaskAttemptIdToMpuMetdadata() {
-    return TASK_ATTEMPT_ID_TO_MPU_METDADATA;
+  public static Map<String, List<SinglePendingCommit>> getTaskAttemptIdToMpuMetadata() {
+    return TASK_ATTEMPT_ID_TO_MPU_METADATA;
   }
 
   public static Map<Path, Long> getPathToBytesWritten() {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
index 0338acffed879..5ed1a3abd4645 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -287,7 +287,7 @@ private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContex
     // get all the pending commit metadata associated with the taskAttemptId.
     // This will also remove the entry from the map.
     List<SinglePendingCommit> pendingCommits =
-        InMemoryMagicCommitTracker.getTaskAttemptIdToMpuMetdadata().remove(taskAttemptId);
+        InMemoryMagicCommitTracker.getTaskAttemptIdToMpuMetadata().remove(taskAttemptId);
 
     // get all the path/files associated with the taskAttemptId.
     // This will also remove the entry from the map.
     List<Path> pathsAssociatedWithTaskAttemptId =

From 8d739beced49d1469593671cd7412b4662ca2f04 Mon Sep 17 00:00:00 2001
From: Syed Shameerur Rahman
Date: Tue, 26 Mar 2024 17:08:28 +0530
Subject: [PATCH 6/6] remove unwanted changes

---
 .../org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
index 4cbffef521df5..d1943fa47773f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
@@ -584,7 +584,7 @@ public SinglePendingCommit uploadFileToPendingCommit(File localFile,
         destKey,
         uploadId,
         partNumber,
-        size).build();x
+        size).build();
     // Read from the file input stream at current position.
     RequestBody body = RequestBody.fromInputStream(fileStream, size);
     UploadPartResponse response = writeOperations.uploadPart(part, body, statistics);
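
A minimal usage sketch of the feature these patches add. Only the configuration key
fs.s3a.committer.magic.track.commits.in.memory.enabled, its default of false, and the
behavior described in the diffs above are taken from the patch series; the example class
name MagicInMemoryTrackingExample and the setup around it are illustrative assumptions,
not part of the patch.

  import org.apache.hadoop.conf.Configuration;

  /** Sketch: enabling in-memory tracking of magic commit metadata. */
  public class MagicInMemoryTrackingExample {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Key introduced in CommitConstants by this patch series; the default is
      // false, i.e. the magic committer keeps writing commit metadata to S3
      // as before.
      conf.setBoolean(
          "fs.s3a.committer.magic.track.commits.in.memory.enabled", true);
      // When enabled, InMemoryMagicCommitTracker stores the multipart-upload
      // commit metadata in static in-memory maps keyed by taskAttemptId, and
      // task commit reads (and removes) the pending commits from those maps
      // rather than from .pending files under the magic path.
      System.out.println("in-memory magic commit tracking: "
          + conf.getBoolean(
              "fs.s3a.committer.magic.track.commits.in.memory.enabled", false));
    }
  }

Because the tracker's maps are static and process-local, this mode assumes a task's
writers and its task committer share the same JVM, which is presumably why the flag
defaults to false and is exercised as an opt-in parameter in the terasort test above.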