8000 HADOOP-19047: Support InMemory Tracking Of S3A Magic Commits by shameersss1 · Pull Request #6468 · apache/hadoop · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

HADOOP-19047: Support InMemory Tracking Of S3A Magic Commits #6468

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.apache.hadoop.fs.s3a.auth.SignerManager;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
import org.apache.hadoop.fs.s3a.impl.AWSCannedACL;
import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler;
Expand Down Expand Up @@ -231,6 +232,8 @@
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME;
import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_NO_OVERWRITE;
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_OVERWRITE;
Expand Down Expand Up @@ -3900,6 +3903,21 @@ public void access(final Path f, final FsAction mode)
@Retries.RetryTranslated
public FileStatus getFileStatus(final Path f) throws IOException {
Path path = qualify(f);
if (isTrackMagicCommitsInMemoryEnabled(getConf()) && isMagicCommitPath(path)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a bit of a hack. not saying that's bad, just wondering if there is a more elegant solution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this is an ugly hack. I couldn't find any better alternative. This will be used by downstream applications like Spark which want to get the size of the file written by the task. It is expected to be called in the same process that wrote the file and initiated the MPU upload.

// Some downstream apps might call getFileStatus for a magic path to get the file size.
// when commit data is stored in memory construct the dummy S3AFileStatus with correct
// file size fetched from the memory.
if (InMemoryMagicCommitTracker.getPathToBytesWritten().containsKey(path)) {
long len = InMemoryMagicCommitTracker.getPathToBytesWritten().get(path);
return new S3AFileStatus(len,
0L,
path,
getDefaultBlockSize(path),
username,
MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME,
null);
}
}
return trackDurationAndSpan(
INVOCATION_GET_FILE_STATUS, path, () ->
innerGetFileStatus(path, false, StatusProbeEnum.ALL));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ private CommitConstants() {
*/
public static final String PENDINGSET_SUFFIX = ".pendingset";

/**
* Etag name to be returned on non-committed S3 object: {@value}.
*/
public static final String MAGIC_COMMITTER_PENDING_OBJECT_ETAG_NAME = "pending";

/**
* Prefix to use for config options: {@value}.
Expand Down Expand Up @@ -242,6 +246,18 @@ private CommitConstants() {
*/
public static final int DEFAULT_COMMITTER_THREADS = 32;

/**
* Should Magic committer track all the pending commits in memory?
*/
public static final String FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED =
"fs.s3a.committer.magic.track.commits.in.memory.enabled";

/**
* Default value for {@link #FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED}: {@value}.
*/
public static final boolean FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT =
false;

/**
* Path in the cluster filesystem for temporary data: {@value}.
* This is for HDFS, not the local filesystem.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
import org.apache.hadoop.fs.s3a.commit.magic.S3MagicCommitTracker;
import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;

import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;

/**
* Adds the code needed for S3A to support magic committers.
Expand Down Expand Up @@ -105,13 +107,15 @@ public PutTracker createTracker(Path path, String key,
String pendingsetPath = key + CommitConstants.PENDING_SUFFIX;
getStoreContext().incrementStatistic(
Statistic.COMMITTER_MAGIC_FILES_CREATED);
tracker = new MagicCommitTracker(path,
getStoreContext().getBucket(),
key,
destKey,
pendingsetPath,
owner.getWriteOperationHelper(),
trackerStatistics);
if (isTrackMagicCommitsInMemoryEnabled(getStoreContext().getConfiguration())) {
tracker = new InMemoryMagicCommitTracker(path, getStoreContext().getBucket(),
key, destKey, pendingsetPath, owner.getWriteOperationHelper(),
trackerStatistics);
} else {
tracker = new S3MagicCommitTracker(path, getStoreContext().getBucket(),
key, destKey, pendingsetPath, owner.getWriteOperationHelper(),
trackerStatistics);
}
LOG.debug("Created {}", tracker);
} else {
LOG.warn("File being created has a \"magic\" path, but the filesystem"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.commit.magic;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import software.amazon.awssdk.services.s3.model.CompletedPart;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.util.Preconditions;

import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.extractTaskAttemptIdFromPath;

/**
* InMemoryMagicCommitTracker stores the commit data in memory.
* The commit data and related data stores are flushed out from
* the memory when the task is committed or aborted.
*/
public class InMemoryMagicCommitTracker extends MagicCommitTracker {

  /**
   * Map from taskAttemptId to its list of pending commit data.
   * Entries are removed when the task commits or aborts.
   */
  private static final Map<String, List<SinglePendingCommit>>
      TASK_ATTEMPT_ID_TO_MPU_METADATA = new ConcurrentHashMap<>();

  /**
   * Map from the path of a written file to its size in bytes.
   * Entries are removed when the task commits or aborts.
   */
  private static final Map<Path, Long> PATH_TO_BYTES_WRITTEN =
      new ConcurrentHashMap<>();

  /**
   * Map from taskAttemptId to the list of paths written by it.
   * Entries are removed when the task commits or aborts.
   */
  private static final Map<String, List<Path>> TASK_ATTEMPT_ID_TO_PATH =
      new ConcurrentHashMap<>();

  /**
   * Create a tracker which stores commit data in memory instead of
   * writing a .pending file to S3.
   * @param path magic path of the file being written
   * @param bucket destination bucket
   * @param originalDestKey key of the magic (in-flight) object
   * @param destKey final destination key
   * @param pendingsetKey key under which a .pending file would have been saved
   * @param writer helper used for write operations
   * @param trackerStatistics statistics collector
   */
  public InMemoryMagicCommitTracker(Path path,
      String bucket,
      String originalDestKey,
      String destKey,
      String pendingsetKey,
      WriteOperationHelper writer,
      PutTrackerStatistics trackerStatistics) {
    super(path, bucket, originalDestKey, destKey, pendingsetKey, writer,
        trackerStatistics);
  }

  /**
   * Build the {@link SinglePendingCommit} for the upload and record it,
   * the file length, and the taskAttemptId-to-path mapping in the static
   * in-memory maps; nothing is persisted to S3.
   * @param uploadId ID of the multipart upload; must be non-empty
   * @param parts list of completed parts; must be non-null and non-empty
   * @param bytesWritten total bytes written to the destination
   * @param iostatistics statistics of the upload
   * @return false, as the upload must not be completed yet
   * @throws IOException never thrown here; declared by the contract
   * @throws IllegalArgumentException if the preconditions fail
   */
  @Override
  public boolean aboutToComplete(String uploadId,
      List<CompletedPart> parts,
      long bytesWritten,
      final IOStatistics iostatistics)
      throws IOException {
    Preconditions.checkArgument(StringUtils.isNotEmpty(uploadId),
        "empty/null upload ID: " + uploadId);
    Preconditions.checkArgument(parts != null, "No uploaded parts list");
    Preconditions.checkArgument(!parts.isEmpty(), "No uploaded parts to save");

    // build the commit summary
    SinglePendingCommit commitData = new SinglePendingCommit();
    commitData.touch(System.currentTimeMillis());
    commitData.setDestinationKey(getDestKey());
    commitData.setBucket(getBucket());
    commitData.setUri(getPath().toUri().toString());
    commitData.setUploadId(uploadId);
    commitData.setText("");
    commitData.setLength(bytesWritten);
    commitData.bindCommitData(parts);
    commitData.setIOStatistics(new IOStatisticsSnapshot(iostatistics));

    // extract the taskAttemptId from the path
    String taskAttemptId = extractTaskAttemptIdFromPath(getPath());

    // store the commit data with taskAttemptId as the key
    TASK_ATTEMPT_ID_TO_MPU_METADATA.computeIfAbsent(taskAttemptId,
        k -> Collections.synchronizedList(new ArrayList<>())).add(commitData);

    // store the bytes written (length) for the corresponding file
    PATH_TO_BYTES_WRITTEN.put(getPath(), bytesWritten);

    // store the mapping between taskAttemptId and path.
    // This information is used for removing entries from
    // the maps once the taskAttempt is completed/committed.
    TASK_ATTEMPT_ID_TO_PATH.computeIfAbsent(taskAttemptId,
        k -> Collections.synchronizedList(new ArrayList<>())).add(getPath());

    LOG.info("commit metadata for {} parts in {}. size: {} byte(s) "
            + "for the taskAttemptId: {} is stored in memory",
        parts.size(), getPendingPartKey(), bytesWritten, taskAttemptId);
    LOG.debug("Closed MPU to {}, saved commit information to {}; data=:\n{}",
        getPath(), getPendingPartKey(), commitData);

    // false: the upload stays pending until the task is committed.
    return false;
  }

  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder(
        "InMemoryMagicCommitTracker{");
    // no leading comma after the opening brace
    sb.append("Number of taskAttempts=").append(TASK_ATTEMPT_ID_TO_MPU_METADATA.size());
    sb.append(", Number of files=").append(PATH_TO_BYTES_WRITTEN.size());
    sb.append('}');
    return sb.toString();
  }

  /**
   * @return live map of taskAttemptId to pending commit data.
   */
  public static Map<String, List<SinglePendingCommit>> getTaskAttemptIdToMpuMetadata() {
    return TASK_ATTEMPT_ID_TO_MPU_METADATA;
  }

  /**
   * @return live map of written file path to its length in bytes.
   */
  public static Map<Path, Long> getPathToBytesWritten() {
    return PATH_TO_BYTES_WRITTEN;
  }

  /**
   * @return live map of taskAttemptId to the paths it wrote.
   */
  public static Map<String, List<Path>> getTaskAttemptIdToPath() {
    return TASK_ATTEMPT_ID_TO_PATH;
  }
}
Loading
0