HADOOP-18296. Memory fragmentation in ChecksumFileSystem readVectored() by steveloughran · Pull Request #7732 · apache/hadoop

Open · wants to merge 6 commits into branch-3.4

Changes from all commits
ChecksumFileSystem.java
@@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -57,6 +58,7 @@
import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
import static org.apache.hadoop.io.Sizes.S_0;

/****************************************************************
* Abstract Checksumed FileSystem.
@@ -101,6 +103,14 @@ public void setVerifyChecksum(boolean verifyChecksum) {
this.verifyChecksum = verifyChecksum;
}

/**
* Is checksum verification enabled?
* @return true if files are to be verified through checksums.
*/
public boolean getVerifyChecksum() {
return verifyChecksum;
}

@Override
public void setWriteChecksum(boolean writeChecksum) {
this.writeChecksum = writeChecksum;
@@ -165,7 +175,8 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) {

/*******************************************************
* For open()'s FSInputStream
* It verifies that data matches checksums.
* It verifies that data matches checksums iff the data
* file has matching checksums.
*******************************************************/
private static class ChecksumFSInputChecker extends FSInputChecker implements
IOStatisticsSource, StreamCapabilities {
@@ -426,8 +437,18 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes,
return data;
}

/**
* Turn off range merging to make buffer recycling more likely (but not guaranteed).
* @return 0, always
*/
@Override
public int maxReadSizeForVectorReads() {
return S_0;
}

/**
* Vectored read.
* <p>
* If the file has no checksums: delegate to the underlying stream.
* If the file is checksummed: calculate the checksum ranges as
* well as the data ranges, read both, and validate the checksums
@@ -448,10 +469,12 @@ public void readVectored(final List<? extends FileRange> ranges,
final Consumer<ByteBuffer> release) throws IOException {

// If the stream doesn't have checksums, just delegate.
if (sums == null) {
if (delegateVectorReadsToInner()) {
LOG.debug("No checksums for vectored read, delegating to inner stream");
datas.readVectored(ranges, allocate);
return;
}
LOG.debug("Checksum vectored read for {} ranges", ranges.size());
final long length = getFileLength();
final List<? extends FileRange> sorted = validateAndSortRanges(ranges,
Optional.of(length));
@@ -489,9 +512,27 @@ public void readVectored(final List<? extends FileRange> ranges,
}
}

private boolean delegateVectorReadsToInner() {
return sums == null;
}

/**
* For this stream, declare that buffer slicing may take place
* during vectored reads; otherwise delegate to the inner stream.
* @param capability string to query the stream support for.
* @return true for the sliced-buffers probe if checksum validation
* is taking place; false if there are no checksums to validate.
* All other probes are passed to the wrapped stream.
*/
@Override
public boolean hasCapability(String capability) {
return datas.hasCapability(capability);
switch (capability.toLowerCase(Locale.ENGLISH)) {
// slicing can take place during coalescing and checksumming
case StreamCapabilities.VECTOREDIO_BUFFERS_SLICED:
return !delegateVectorReadsToInner();
default:
return datas.hasCapability(capability);
}
}
}
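A minimal caller-side sketch (not part of this PR; the path and range offsets are illustrative) showing how the new capability probe and the vectored read fit together:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

public class VectoredReadExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path path = new Path("/tmp/example.dat");      // illustrative path
    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(0, 4096),
        FileRange.createFileRange(65536, 4096));
    try (FSDataInputStream in = fs.open(path)) {
      // When this returns true, the buffers handed back may be sliced
      // views of larger allocations, so they must not be pooled/recycled.
      boolean sliced = in.hasCapability(
          StreamCapabilities.VECTOREDIO_BUFFERS_SLICED);
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer buffer = range.getData().get();  // waits for completion
        System.out.printf("offset=%d bytes=%d sliced=%s%n",
            range.getOffset(), buffer.remaining(), sliced);
      }
    }
  }
}
```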

@@ -1142,6 +1183,9 @@ public boolean hasPathCapability(final Path path, final String capability)
case CommonPathCapabilities.FS_APPEND:
case CommonPathCapabilities.FS_CONCAT:
return false;
case StreamCapabilities.VECTOREDIO_BUFFERS_SLICED:
return getVerifyChecksum();

default:
return super.hasPathCapability(p, capability);
}
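The path-level probe mirrors the stream-level one. A short sketch of querying it, assuming checksum verification is left at its default (enabled):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

public class SlicedCapabilityProbe {
  public static void main(String[] args) throws Exception {
    FileSystem local = FileSystem.getLocal(new Configuration());
    // True while verifyChecksum is enabled (the default); false once
    // setVerifyChecksum(false) is called, as reads then delegate to the
    // raw stream and buffers come back exactly as allocated.
    boolean mayBeSliced = local.hasPathCapability(new Path("/tmp"),
        StreamCapabilities.VECTOREDIO_BUFFERS_SLICED);
    System.out.println("buffers may be sliced: " + mayBeSliced);
  }
}
```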
LocalFileSystem.java
@@ -27,6 +27,8 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.local.LocalConfigKeys.LOCAL_FS_VERIFY_CHECKSUM;

/****************************************************************
* Implement the FileSystem API for the checksumed local filesystem.
*
@@ -50,6 +52,10 @@ public void initialize(URI name, Configuration conf) throws IOException {
if (!scheme.equals(fs.getUri().getScheme())) {
swapScheme = scheme;
}
final boolean checksum = conf.getBoolean(LOCAL_FS_VERIFY_CHECKSUM, true);
setVerifyChecksum(checksum);
LOG.debug("Checksum verification enabled={}", checksum);

}

/**
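A sketch of switching verification off through the new key; the constant comes from the LocalConfigKeys import above, and newInstanceLocal is used here only to ensure initialize() runs against this configuration:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;

import static org.apache.hadoop.fs.local.LocalConfigKeys.LOCAL_FS_VERIFY_CHECKSUM;

public class DisableLocalChecksumVerification {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Default is true; false makes initialize() call setVerifyChecksum(false),
    // after which vectored reads delegate straight to the inner raw stream.
    conf.setBoolean(LOCAL_FS_VERIFY_CHECKSUM, false);
    LocalFileSystem fs = FileSystem.newInstanceLocal(conf);
    System.out.println("verifying=" + fs.getVerifyChecksum());  // false
  }
}
```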
LocalFileSystemConfigKeys.java
@@ -42,5 +42,6 @@ public class LocalFileSystemConfigKeys extends CommonConfigurationKeys {
public static final String LOCAL_FS_CLIENT_WRITE_PACKET_SIZE_KEY =
"file.client-write-packet-size";
public static final int LOCAL_FS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;

}

RawLocalFileSystem.java
@@ -72,6 +72,7 @@
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
import static org.apache.hadoop.fs.VectoredReadUtils.hasVectorIOCapability;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList;
import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -293,15 +294,12 @@ public FileDescriptor getFileDescriptor() throws IOException {

@Override
public boolean hasCapability(String capability) {
// a bit inefficient, but intended to make it easier to add
// new capabilities.
switch (capability.toLowerCase(Locale.ENGLISH)) {
case StreamCapabilities.IOSTATISTICS:
case StreamCapabilities.IOSTATISTICS_CONTEXT:
case StreamCapabilities.VECTOREDIO:
return true;
default:
return false;
return hasVectorIOCapability(capability);
Contributor: Does this actually change the logic? It seems like hasVectorIOCapability checks StreamCapabilities.VECTOREDIO, just like the old code did.
}
}

@@ -400,7 +398,9 @@ private void initiateRead() {
for(int i = 0; i < ranges.size(); ++i) {
FileRange range = ranges.get(i);
buffers[i] = allocateRelease.getBuffer(false, range.getLength());
channel.read(buffers[i], range.getOffset(), i, this);
final ByteBuffer buffer = buffers[i];
Contributor: Any specific reason for this refactoring and making it final?
LOG.debug("Reading file range {} into buffer {}", range, System.identityHashCode(buffer));
channel.read(buffer, range.getOffset(), i, this);
}
}

@@ -416,6 +416,8 @@ public void completed(Integer result, Integer rangeIndex) {
public void completed(Integer result, Integer rangeIndex) {
FileRange range = ranges.get(rangeIndex);
ByteBuffer buffer = buffers[rangeIndex];
LOG.debug("Completed read range {} into buffer {} outcome={} remaining={}",
range, System.identityHashCode(buffer), result, buffer.remaining());
if (result == -1) {
// no data was read back.
failed(new EOFException("Read past End of File"), rangeIndex);
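The new debug statements key on object identity rather than buffer contents, since ByteBuffer.equals() and hashCode() compare contents and cannot tell a slice from the region it was cut from. A JDK-only illustration:

```java
import java.nio.ByteBuffer;

public class BufferIdentityDemo {
  public static void main(String[] args) {
    ByteBuffer parent = ByteBuffer.allocate(8192);
    ByteBuffer slice = parent.slice();   // zero-copy view of the same memory
    // equals()/hashCode() compare contents, so they cannot distinguish a
    // slice from its parent region; identityHashCode tracks the objects.
    System.out.printf("parent=%d slice=%d shareArray=%s%n",
        System.identityHashCode(parent),
        System.identityHashCode(slice),
        parent.array() == slice.array()); // true: one backing allocation
  }
}
```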
StreamCapabilities.java
@@ -86,6 +86,15 @@ public interface StreamCapabilities {
*/
String VECTOREDIO = "in:readvectored";

/**
* When performing vectored IO operations, are the buffers returned by readVectored()
* potentially sliced subsets of buffers allocated by the allocate() function
* passed in the read requests?
* If true, a returned buffer may share its backing memory with other ranges,
* so it must not be recycled or released independently.
*/
String VECTOREDIO_BUFFERS_SLICED = "fs.capability.vectoredio.sliced";

/**
* Stream abort() capability implemented by {@link Abortable#abort()}.
* This matches the Path Capability
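A sketch of the hazard this capability flags, using Hadoop's ElasticByteBufferPool purely for illustration: returning a sliced view to a pool recycles memory that other buffers still reference.

```java
import java.nio.ByteBuffer;

import org.apache.hadoop.io.ElasticByteBufferPool;

public class SliceRecyclingHazard {
  public static void main(String[] args) {
    ElasticByteBufferPool pool = new ElasticByteBufferPool();
    ByteBuffer merged = pool.getBuffer(false, 8192); // one coalesced read
    ByteBuffer range = merged.slice();               // per-range view
    // Only safe when VECTOREDIO_BUFFERS_SLICED is false: a pooled slice's
    // backing memory may be handed to another caller while `merged` and any
    // sibling slices are still in use.
    pool.putBuffer(range);
  }
}
```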
VectoredReadUtils.java
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
@@ -476,6 +477,22 @@ public static ByteBuffer sliceTo(ByteBuffer readData, long readOffset,
return readData;
}

/**
* Default vector IO probes.
* These are the capabilities which streams that leave vector IO
* to the default methods should declare when queried for vector capabilities.
* @param capability capability to probe for.
* @return true if the given capability holds for vectored IO features.
*/
public static boolean hasVectorIOCapability(String capability) {
switch (capability.toLowerCase(Locale.ENGLISH)) {
case StreamCapabilities.VECTOREDIO:
return true;
default:
return false;
}
}

/**
* private constructor.
*/
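A minimal sketch (hypothetical stream, not from this PR) of the delegation pattern RawLocalFileSystem adopts above: answer stream-specific probes first, then fall back to the shared helper:

```java
import java.util.Locale;

import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.VectoredReadUtils;

/** Hypothetical stream: stream-specific probes first, shared probe last. */
public class ExampleCapabilityStream implements StreamCapabilities {
  @Override
  public boolean hasCapability(String capability) {
    switch (capability.toLowerCase(Locale.ENGLISH)) {
    case StreamCapabilities.IOSTATISTICS:
      return true;  // capability this stream implements itself
    default:
      // picks up StreamCapabilities.VECTOREDIO and any future defaults
      return VectoredReadUtils.hasVectorIOCapability(capability);
    }
  }
}
```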