From 86d6bacafd412602e80e5085f33e906aa40d5a4c Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 9 Jun 2025 19:11:32 +0100 Subject: [PATCH 1/7] HADOOP-18296. Memory fragmentation in ChecksumFileSystem readVectored() --- .../apache/hadoop/fs/ChecksumFileSystem.java | 50 +++- .../org/apache/hadoop/fs/LocalFileSystem.java | 6 + .../hadoop/fs/LocalFileSystemConfigKeys.java | 7 + .../apache/hadoop/fs/RawLocalFileSystem.java | 12 +- .../apache/hadoop/fs/StreamCapabilities.java | 9 + .../apache/hadoop/fs/VectoredReadUtils.java | 17 ++ .../fs/impl/TrackingByteBufferPool.java | 229 ++++++++++++++++++ .../main/java/org/apache/hadoop/io/Sizes.java | 3 + .../markdown/filesystem/fsdatainputstream.md | 46 +++- .../viewfs/TestViewFileSystemDelegation.java | 3 +- 10 files changed, 370 insertions(+), 12 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index 45b3f90feaacf..7d204699091d1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.List; +import java.util.Locale; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -57,6 +58,7 @@ import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable; +import static org.apache.hadoop.io.Sizes.S_0; /**************************************************************** * Abstract Checksumed FileSystem. @@ -101,6 +103,14 @@ public void setVerifyChecksum(boolean verifyChecksum) { this.verifyChecksum = verifyChecksum; } + /** + * Is checksum verification enabled? + * @return true if files are to be verified through checksums. + */ + public boolean getVerifyChecksum() { + return verifyChecksum; + } + @Override public void setWriteChecksum(boolean writeChecksum) { this.writeChecksum = writeChecksum; @@ -165,7 +175,8 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) { /******************************************************* * For open()'s FSInputStream - * It verifies that data matches checksums. + * It verifies that data matches checksums iff the data + * file has matching checksums. *******************************************************/ private static class ChecksumFSInputChecker extends FSInputChecker implements IOStatisticsSource, StreamCapabilities { @@ -426,8 +437,18 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes, return data; } + /** + * Turn off range merging to make buffer recycling more likely (but not guaranteed). + * @return 0, always + */ + @Override + public int maxReadSizeForVectorReads() { + return S_0; + } + /** * Vectored read. + *

* If the file has no checksums: delegate to the underlying stream. * If the file is checksummed: calculate the checksum ranges as * well as the data ranges, read both, and validate the checksums @@ -448,10 +469,12 @@ public void readVectored(final List ranges, final Consumer release) throws IOException { // If the stream doesn't have checksums, just delegate. - if (sums == null) { + if (delegateVectorReadsToInner()) { + LOG.debug("No checksums for vectored read, delegating to inner stream"); datas.readVectored(ranges, allocate); return; } + LOG.debug("Checksum vectored read for {} ranges", ranges.size()); final long length = getFileLength(); final List sorted = validateAndSortRanges(ranges, Optional.of(length)); @@ -489,9 +512,27 @@ public void readVectored(final List ranges, } } + private boolean delegateVectorReadsToInner() { + return sums == null; + } + + /** + * For this stream, declare that range merging may take place; + * otherwise delegate to the inner stream. + * @param capability string to query the stream support for. + * @return true for sliced vector IO if checksum validation + * is taking place. False if no checksums are present for the validation. + * For all other probes: pass to the wrapped stream + */ @Override public boolean hasCapability(String capability) { - return datas.hasCapability(capability); + switch (capability.toLowerCase(Locale.ENGLISH)) { + // slicing can take place during coalescing and checksumming + case StreamCapabilities.VECTOREDIO_BUFFERS_SLICED: + return !delegateVectorReadsToInner(); + default: + return datas.hasCapability(capability); + } } } @@ -1142,6 +1183,9 @@ public boolean hasPathCapability(final Path path, final String capability) case CommonPathCapabilities.FS_APPEND: case CommonPathCapabilities.FS_CONCAT: return false; + case StreamCapabilities.VECTOREDIO_BUFFERS_SLICED: + return getVerifyChecksum(); + default: return super.hasPathCapability(p, capability); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java index 590cbd9a49ece..da651ebb1f864 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java @@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import static org.apache.hadoop.fs.LocalFileSystemConfigKeys.LOCAL_FS_VERIFY_CHECKSUM; + /**************************************************************** * Implement the FileSystem API for the checksumed local filesystem. 
* @@ -50,6 +52,10 @@ public void initialize(URI name, Configuration conf) throws IOException { if (!scheme.equals(fs.getUri().getScheme())) { swapScheme = scheme; } + final boolean checksum = conf.getBoolean(LOCAL_FS_VERIFY_CHECKSUM, true); + setVerifyChecksum(checksum); + LOG.debug("Checksum verification enabled: {}", checksum); + } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemConfigKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemConfigKeys.java index 90f9cf282e016..74843e5ae2868 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemConfigKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemConfigKeys.java @@ -42,5 +42,12 @@ public class LocalFileSystemConfigKeys extends CommonConfigurationKeys { public static final String LOCAL_FS_CLIENT_WRITE_PACKET_SIZE_KEY = "file.client-write-packet-size"; public static final int LOCAL_FS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024; + + /** + * Verify checksums on read -default is true. + *

+ * {@value}. + */ + public static final String LOCAL_FS_VERIFY_CHECKSUM = "file.verify-checksum"; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index d5f545b460d7e..c7a84c365adba 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -72,6 +72,7 @@ import org.apache.hadoop.util.StringUtils; import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED; +import static org.apache.hadoop.fs.VectoredReadUtils.hasVectorIOCapability; import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList; import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; @@ -293,15 +294,12 @@ public FileDescriptor getFileDescriptor() throws IOException { @Override public boolean hasCapability(String capability) { - // a bit inefficient, but intended to make it easier to add - // new capabilities. switch (capability.toLowerCase(Locale.ENGLISH)) { case StreamCapabilities.IOSTATISTICS: case StreamCapabilities.IOSTATISTICS_CONTEXT: - case StreamCapabilities.VECTOREDIO: return true; default: - return false; + return hasVectorIOCapability(capability); } } @@ -400,7 +398,9 @@ private void initiateRead() { for(int i = 0; i < ranges.size(); ++i) { FileRange range = ranges.get(i); buffers[i] = allocateRelease.getBuffer(false, range.getLength()); - channel.read(buffers[i], range.getOffset(), i, this); + final ByteBuffer buffer = buffers[i]; + LOG.debug("Reading file range {} into buffer {}", range, System.identityHashCode(buffer)); + channel.read(buffer, range.getOffset(), i, this); } } @@ -416,6 +416,8 @@ private void initiateRead() { public void completed(Integer result, Integer rangeIndex) { FileRange range = ranges.get(rangeIndex); ByteBuffer buffer = buffers[rangeIndex]; + LOG.debug("Completed read range {} into buffer {} outcome={} remaining={}" + , range, System.identityHashCode(buffer), result, buffer.remaining()); if (result == -1) { // no data was read back. failed(new EOFException("Read past End of File"), rangeIndex); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index 93ed57ef83057..eee99d305b9c2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -86,6 +86,15 @@ public interface StreamCapabilities { */ String VECTOREDIO = "in:readvectored"; + /** + * When performing vectored IO operations, are the buffers returned by readVectored() + * potentially sliced subsets of buffers allocated by the allocate() function + * passed in the read requests? + * If true, this means that the returned buffers may be sliced subsets of the + * allocated buffers. + */ + String VECTOREDIO_BUFFERS_SLICED = "fs.capability.vectoredio.sliced"; + /** * Stream abort() capability implemented by {@link Abortable#abort()}. 
* This matches the Path Capability diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java index 6adcba39a3fa5..a4c2d69b63bc9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Locale; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -476,6 +477,22 @@ public static ByteBuffer sliceTo(ByteBuffer readData, long readOffset, return readData; } + /** + * Default vector IO probes. + * These are capabilities which streams that leave vector IO + * to the default methods should return when queried for vector capabilities. + * @param capability capability to probe for. + * @return true if the given capability holds for vectored IO features. + */ + public static boolean hasVectorIOCapability(String capability) { + switch (capability.toLowerCase(Locale.ENGLISH)) { + case StreamCapabilities.VECTOREDIO: + return true; + default: + return false; + } + } + /** * private constructor. */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java new file mode 100644 index 0000000000000..bc3792a5c5e7a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.fs.impl; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.io.ByteBufferPool; + +/** + * A wrapper {@link ByteBufferPool} implementation that tracks whether all allocated buffers are released. It + * throws the related exception at {@link #close()} if any buffer remains un-released. It also clears the buffers at + * release so if they continued being used it'll generate errors. + *

+ * To be used for testing only. + *

+ * The stacktraces of the allocation are not stored by default because it significantly decreases the unit test + * execution performance. Configuring this class to log at DEBUG will trigger their collection. + * @see ByteBufferAllocationStacktraceException + *

+ * Adapted from Parquet class {@code org.apache.parquet.bytes.TrackingByteBufferAllocator}. + */ +public final class TrackingByteBufferPool implements ByteBufferPool, AutoCloseable { + + /** + + */ + private static final boolean DEBUG = true; + private static final Logger LOG = LoggerFactory.getLogger(TrackingByteBufferPool.class); + + /** + * Wrap an existing allocator with this tracking allocator. + * @param allocator allocator to wrap. + * @return a new allocator. + */ + public static TrackingByteBufferPool wrap(ByteBufferPool allocator) { + return new TrackingByteBufferPool(allocator); + } + + /** + * Key for the tracker map. + */ + private static class Key { + + private final int hashCode; + private final ByteBuffer buffer; + + Key(ByteBuffer buffer) { + hashCode = System.identityHashCode(buffer); + this.buffer = buffer; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Key key = (Key) o; + return this.buffer == key.buffer; + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public String toString() { + return buffer.toString(); + } + } + + public static class LeakDetectorHeapByteBufferPoolException extends RuntimeException { + + private LeakDetectorHeapByteBufferPoolException(String msg) { + super(msg); + } + + private LeakDetectorHeapByteBufferPoolException(String msg, Throwable cause) { + super(msg, cause); + } + + private LeakDetectorHeapByteBufferPoolException( + String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + } + + /** + * Strack trace of allocation as saved in the tracking map. + */ + public static final class ByteBufferAllocationStacktraceException + extends LeakDetectorHeapByteBufferPoolException { + + private static final ByteBufferAllocationStacktraceException WITHOUT_STACKTRACE = + new ByteBufferAllocationStacktraceException(false); + + /** + * Create a stack trace for the map, either using the shared static one + * or a dynamically created one. + * @return a stack + */ + private static ByteBufferAllocationStacktraceException create() { + return LOG.isDebugEnabled() + ? new ByteBufferAllocationStacktraceException() + : WITHOUT_STACKTRACE; + } + + private ByteBufferAllocationStacktraceException() { + super("Allocation stacktrace of the first ByteBuffer:"); + } + + private ByteBufferAllocationStacktraceException(boolean unused) { + super( + "Log org.apache.hadoop.fs.impl.TrackingByteBufferPool at DEBUG for full stack traces", + null, + false, + false); + } + } + + /** + * Exception raised in {@link TrackingByteBufferPool#putBuffer(ByteBuffer)} if the + * buffer to release was not in the hash map. + */ + public static final class ReleasingUnallocatedByteBufferException extends LeakDetectorHeapByteBufferPoolException { + + private ReleasingUnallocatedByteBufferException() { + super("Releasing a ByteBuffer instance that is not allocated by this buffer pool or already been released"); + } + } + + /** + * Exception raised in {@link TrackingByteBufferPool#close()} if there was an unreleased buffer. 
+ */ + public static class LeakedByteBufferException extends LeakDetectorHeapByteBufferPoolException { + + private LeakedByteBufferException(int count, ByteBufferAllocationStacktraceException e) { + super(count + " ByteBuffer object(s) is/are remained unreleased after closing this buffer pool.", e); + } + } + + /** + * Tracker of allocations. + *

+ * The key maps by the object id of the buffer, and refers to either a common stack trace + * or one dynamically created for each allocation. + */ + private final Map allocated = new HashMap<>(); + + /** + * Wrapped buffer pool. + */ + private final ByteBufferPool allocator; + + /** + * private constructor. + * @param allocator pool allocator. + */ + private TrackingByteBufferPool(ByteBufferPool allocator) { + this.allocator = allocator; + } + + @Override + public ByteBuffer getBuffer(final boolean direct, final int size) { + ByteBuffer buffer = allocator.getBuffer(direct, size); + final ByteBufferAllocationStacktraceException ex = ByteBufferAllocationStacktraceException.create(); + final Key key = new Key(buffer); + allocated.put(key, ex); + LOG.debug("Creating ByteBuffer:{} size {} {}", key.hashCode(), size, buffer, ex); + return buffer; + } + + @Override + public void putBuffer(ByteBuffer b) throws ReleasingUnallocatedByteBufferException { + Objects.requireNonNull(b); + final Key key = new Key(b); + LOG.debug("Releasing ByteBuffer: {}: {}", key.hashCode(), b); + if (allocated.remove(key) == null) { + throw new ReleasingUnallocatedByteBufferException(); + } + allocator.putBuffer(b); + // Clearing the buffer so subsequent access would probably generate errors + b.clear(); + } + + /** + * Expect all buffers to be released -if not, log unreleased ones + * and then raise an exception with the stack trace of the first + * unreleased buffer. + * @throws LeakedByteBufferException if at least one was unsued. + */ + @Override + public void close() throws LeakedByteBufferException { + if (!allocated.isEmpty()) { + allocated.keySet().forEach(key -> + LOG.warn("Unreleased ByteBuffer {}; {}", key.hashCode(), key)); + LeakedByteBufferException ex = new LeakedByteBufferException( + allocated.size(), + allocated.values().iterator().next()); + allocated.clear(); // Drop the references to the ByteBuffers, so they can be gc'd + throw ex; + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Sizes.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Sizes.java index bf2dc78741f51..7bfce520910c5 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Sizes.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Sizes.java @@ -31,6 +31,9 @@ @InterfaceStability.Evolving public final class Sizes { + /** 0 bytes: {@value}. Here to make it easy to find use of zero in constants. */ + public static final int S_0 = 0; + /** 2^8 bytes: {@value}. */ public static final int S_256 = 256; diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md index 92ef696db7a24..828b5ff59a1c5 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md @@ -647,7 +647,7 @@ For details see [HADOOP-19291](https://issues.apache.org/jira/browse/HADOOP-1929 For reliable use with older hadoop releases with the API: sort the list of ranges and check for overlaps before calling `readVectored()`. 
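As a sketch of that caller-side check (the `sortAndCheck` helper below is illustrative, not part of the Hadoop API; it assumes only the public `FileRange` accessors):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.hadoop.fs.FileRange;

/** Illustrative caller-side guard for older releases; not a Hadoop class. */
public final class RangeGuard {

  private RangeGuard() {
  }

  /** Sort ranges by offset and reject overlaps before calling readVectored(). */
  public static List<FileRange> sortAndCheck(List<? extends FileRange> input) {
    List<FileRange> sorted = new ArrayList<>(input);
    sorted.sort(Comparator.comparingLong(FileRange::getOffset));
    for (int i = 1; i < sorted.size(); i++) {
      FileRange prev = sorted.get(i - 1);
      FileRange next = sorted.get(i);
      if (prev.getOffset() + prev.getLength() > next.getOffset()) {
        throw new IllegalArgumentException(
            "Overlapping ranges: " + prev + " and " + next);
      }
    }
    return sorted;
  }
}
```

Newer releases perform equivalent validation internally through
`VectoredReadUtils.validateAndSortRanges()`.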
-*Direct Buffer Reads*
+#### Direct Buffer Reads

 Releases without
 [HADOOP-19101](https://issues.apache.org/jira/browse/HADOOP-19101)
 _Vectored Read into off-heap buffer broken in fallback implementation_ can read data
@@ -665,8 +665,48 @@ support through an explicit `hasCapability()` probe:
 Stream.hasCapability("in:readvectored")
 ```

-Given the HADOOP-18296 problem with `ChecksumFileSystem` and direct buffers, across all releases,
-it is best to avoid using this API in production with direct buffers.
+#### Buffer Slicing
+
+[HADOOP-18296](https://issues.apache.org/jira/browse/HADOOP-18296),
+_Memory fragmentation in ChecksumFileSystem Vectored IO implementation_
+highlights that `ChecksumFileSystem` (which the default implementation of `file://`
+subclasses) may return buffers which are sliced subsets of buffers allocated
+through the `allocate()` function passed in.
+
+This will happen during reads with and without range coalescing.
+
+Checksum verification may be disabled by setting the option
+`file.verify-checksum` to false (Hadoop 3.4.2 and later).
+
+```xml
+<property>
+  <name>file.verify-checksum</name>
+  <value>false</value>
+</property>
+```
+
+(As you would expect, disabling checksum verification means that errors
+reading data may not be detected during the read operation.
+Use with care in production.)
+
+Filesystem instances which slice buffers during vector read operations
+MUST declare this by returning `true`
+to the path capabilities probe `fs.capability.vectoredio.sliced`,
+and for the open stream in its `hasCapability()` method.
+
+The local filesystem will not slice buffers if the checksum file
+`filename + ".crc"` is not found. This is not declared in the
+filesystem `hasPathCapability(filename, "fs.capability.vectoredio.sliced")`
+call, as no checks for the checksum file are made then.
+This cannot be relied on in production, but it may be useful when
+testing for buffer recycling with Hadoop releases 3.4.1 and earlier.
+
+*Implementors' Notes*
+
+* Don't slice buffers. `ChecksumFileSystem` has to be considered an outlier which
+  needs to be addressed in future.
+* Always free buffers in error handling code paths.

## `void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate, Consumer<ByteBuffer> release)`

diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java
index 3a60d6ecdda94..fc9b128178672 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java
@@ -157,7 +157,8 @@ public URI getUri() {
   public void setVerifyChecksum(boolean verifyChecksum) {
     this.verifyChecksum = verifyChecksum;
   }
-
+
+  @Override
   public boolean getVerifyChecksum(){
     return verifyChecksum;
   }

From 995fcc4b320aaf123fcb122ef908731804259dfa Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Tue, 10 Jun 2025 17:36:55 +0100
Subject: [PATCH 2/7] HADOOP-18296. Buffer slicing in checksum fs

Wire up TrackingByteBufferPool to vector read tests, with test suite
AbstractContractVectoredReadTest adding a new test testBufferSlicing()
to generate conditions which may trigger slicing.

Only streams which declare that they may slice buffers are permitted
to return buffers to the pool which aren't known about, or to close
the pool with outstanding entries.
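In outline, the test wiring behaves like this sketch (`fs`, `path` and
`ranges` are assumed fixture names; the real assertions live in
testBufferSlicing()):

```java
TrackingByteBufferPool pool =
    TrackingByteBufferPool.wrap(new WeakReferencedElasticByteBufferPool());
try (FSDataInputStream in = fs.openFile(path).build().get()) {
  in.readVectored(ranges, size -> pool.getBuffer(true, size), pool::putBuffer);
  // ...await each range's data, then return the buffers via pool.putBuffer()...
}
// Throws LeakedByteBufferException if any buffer was never returned.
pool.close();
```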
So: no fix, just public declaration of behavior and test to verify that no other streams are doing it. --- .../org/apache/hadoop/fs/LocalFileSystem.java | 2 +- .../apache/hadoop/fs/RawLocalFileSystem.java | 4 +- .../fs/impl/TrackingByteBufferPool.java | 145 +++++++++++++++--- .../AbstractContractVectoredReadTest.java | 68 +++++++- .../TestLocalFSContractVectoredRead.java | 7 + 5 files changed, 196 insertions(+), 30 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java index da651ebb1f864..a41e5ec9b88d9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java @@ -54,7 +54,7 @@ public void initialize(URI name, Configuration conf) throws IOException { } final boolean checksum = conf.getBoolean(LOCAL_FS_VERIFY_CHECKSUM, true); setVerifyChecksum(checksum); - LOG.debug("Checksum verification enabled: {}", checksum); + LOG.debug("Checksum verification enabled={}", checksum); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index c7a84c365adba..3bd93a4f459c3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -416,8 +416,8 @@ private void initiateRead() { public void completed(Integer result, Integer rangeIndex) { FileRange range = ranges.get(rangeIndex); ByteBuffer buffer = buffers[rangeIndex]; - LOG.debug("Completed read range {} into buffer {} outcome={} remaining={}" - , range, System.identityHashCode(buffer), result, buffer.remaining()); + LOG.debug("Completed read range {} into buffer {} outcome={} remaining={}", + range, System.identityHashCode(buffer), result, buffer.remaining()); if (result == -1) { // no data was read back. failed(new EOFException("Read past End of File"), rangeIndex); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java index bc3792a5c5e7a..60da7f2f99ad7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java @@ -16,12 +16,14 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.hadoop.fs.impl; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,24 +31,23 @@ import org.apache.hadoop.io.ByteBufferPool; /** - * A wrapper {@link ByteBufferPool} implementation that tracks whether all allocated buffers are released. It - * throws the related exception at {@link #close()} if any buffer remains un-released. It also clears the buffers at - * release so if they continued being used it'll generate errors. + * A wrapper {@link ByteBufferPool} implementation that tracks whether all allocated buffers + * are released. + *

+ * It throws the related exception at {@link #close()} if any buffer remains un-released. + * It also clears the buffers at release so if they continued being used it'll generate errors. *

* To be used for testing only. *

- * The stacktraces of the allocation are not stored by default because it significantly decreases the unit test - * execution performance. Configuring this class to log at DEBUG will trigger their collection. + * The stacktraces of the allocation are not stored by default because + * it can significantly decreases the unit test performance. + * Configuring this class to log at DEBUG will trigger their collection. * @see ByteBufferAllocationStacktraceException *

 * Adapted from Parquet class {@code org.apache.parquet.bytes.TrackingByteBufferAllocator}.
 */
public final class TrackingByteBufferPool implements ByteBufferPool, AutoCloseable {

-  /**
-
-   */
-  private static final boolean DEBUG = true;
   private static final Logger LOG = LoggerFactory.getLogger(TrackingByteBufferPool.class);

   /**
@@ -60,10 +61,13 @@ public static TrackingByteBufferPool wrap(ByteBufferPool allocator) {

   /**
    * Key for the tracker map.
+   * This uses the identity hash code of the buffer as the hash code
+   * for the map.
    */
   private static class Key {

     private final int hashCode;
+
     private final ByteBuffer buffer;

     Key(ByteBuffer buffer) {
@@ -105,7 +109,10 @@ private LeakDetectorHeapByteBufferPoolException(String msg, Throwable cause) {
     }

     private LeakDetectorHeapByteBufferPoolException(
-        String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        String message,
+        Throwable cause,
+        boolean enableSuppression,
+        boolean writableStackTrace) {
       super(message, cause, enableSuppression, writableStackTrace);
     }
   }
@@ -116,6 +123,9 @@ private LeakDetectorHeapByteBufferPoolException(
   public static final class ByteBufferAllocationStacktraceException
       extends LeakDetectorHeapByteBufferPoolException {

+    /**
+     * Single stack trace instance to use when DEBUG is not enabled.
+     */
     private static final ByteBufferAllocationStacktraceException WITHOUT_STACKTRACE =
         new ByteBufferAllocationStacktraceException(false);

@@ -134,9 +144,12 @@ private ByteBufferAllocationStacktraceException() {
       super("Allocation stacktrace of the first ByteBuffer:");
     }

+    /**
+     * Private constructor for the singleton {@link #WITHOUT_STACKTRACE},
+     * telling developers how to see a trace per buffer.
+     */
     private ByteBufferAllocationStacktraceException(boolean unused) {
-      super(
-          "Log org.apache.hadoop.fs.impl.TrackingByteBufferPool at DEBUG for full stack traces",
+      super("Log org.apache.hadoop.fs.impl.TrackingByteBufferPool at DEBUG for stack traces",
           null,
           false,
           false);
     }
   }
@@ -147,20 +160,36 @@ private ByteBufferAllocationStacktraceException(boolean unused) {
    * Exception raised in {@link TrackingByteBufferPool#putBuffer(ByteBuffer)} if the
    * buffer to release was not in the hash map.
    */
-  public static final class ReleasingUnallocatedByteBufferException extends LeakDetectorHeapByteBufferPoolException {
+  public static final class ReleasingUnallocatedByteBufferException
+      extends LeakDetectorHeapByteBufferPoolException {

-    private ReleasingUnallocatedByteBufferException() {
-      super("Releasing a ByteBuffer instance that is not allocated by this buffer pool or already been released");
+    private ReleasingUnallocatedByteBufferException(final ByteBuffer b) {
+      super(String.format("Releasing a ByteBuffer instance that is not allocated"
+          + " by this buffer pool or already been released: %s size %d", b, b.capacity()));
     }
   }

   /**
-   * Exception raised in {@link TrackingByteBufferPool#close()} if there was an unreleased buffer.
+   * Exception raised in {@link TrackingByteBufferPool#close()} if there
+   * was an unreleased buffer.
*/ - public static class LeakedByteBufferException extends LeakDetectorHeapByteBufferPoolException { + public static final class LeakedByteBufferException + extends LeakDetectorHeapByteBufferPoolException { + + private final int count; private LeakedByteBufferException(int count, ByteBufferAllocationStacktraceException e) { - super(count + " ByteBuffer object(s) is/are remained unreleased after closing this buffer pool.", e); + super(count + " ByteBuffer object(s) is/are remained unreleased" + + " after closing this buffer pool.", e); + this.count = count; + } + + /** + * Get the number of unreleased buffers. + * @return number of unreleased buffers + */ + public int getCount() { + return count; } } @@ -170,13 +199,28 @@ private LeakedByteBufferException(int count, ByteBufferAllocationStacktraceExcep * The key maps by the object id of the buffer, and refers to either a common stack trace * or one dynamically created for each allocation. */ - private final Map allocated = new HashMap<>(); + private final Map allocated = + new HashMap<>(); /** * Wrapped buffer pool. */ private final ByteBufferPool allocator; + /** + * Number of buffer allocations. + *

+ * This is incremented in {@link #getBuffer(boolean, int)}. + */ + private final AtomicInteger bufferAllocations = new AtomicInteger(); + + /** + * Number of buffer releases. + *

+ * This is incremented in {@link #putBuffer(ByteBuffer)}. + */ + private final AtomicInteger bufferReleases = new AtomicInteger(); + /** * private constructor. * @param allocator pool allocator. @@ -185,34 +229,87 @@ private TrackingByteBufferPool(ByteBufferPool allocator) { this.allocator = allocator; } + public int getBufferAllocations() { + return bufferAllocations.get(); + } + + public int getBufferReleases() { + return bufferReleases.get(); + } + + /** + * Get a buffer from the pool. + *

+   * This increments the {@link #bufferAllocations} counter and stores the
+   * singleton or local allocation stack trace in the {@link #allocated} map.
+   * @param direct whether to allocate a direct buffer or not
+   * @param size size of the buffer to allocate
+   * @return a ByteBuffer instance
+   */
   @Override
-  public ByteBuffer getBuffer(final boolean direct, final int size) {
+  public synchronized ByteBuffer getBuffer(final boolean direct, final int size) {
+    bufferAllocations.incrementAndGet();
     ByteBuffer buffer = allocator.getBuffer(direct, size);
-    final ByteBufferAllocationStacktraceException ex = ByteBufferAllocationStacktraceException.create();
+    final ByteBufferAllocationStacktraceException ex =
+        ByteBufferAllocationStacktraceException.create();
     final Key key = new Key(buffer);
     allocated.put(key, ex);
     LOG.debug("Creating ByteBuffer:{} size {} {}", key.hashCode(), size, buffer, ex);
     return buffer;
   }

+  /**
+   * Release a buffer back to the pool.
+   *

+ * This increments the {@link #bufferReleases} counter and removes the + * buffer from the {@link #allocated} map. + *

+ * If the buffer was not allocated by this pool, it throws + * {@link ReleasingUnallocatedByteBufferException}. + * + * @param b buffer to release + * @throws ReleasingUnallocatedByteBufferException if the buffer was not allocated by this pool + */ @Override - public void putBuffer(ByteBuffer b) throws ReleasingUnallocatedByteBufferException { + public synchronized void putBuffer(ByteBuffer b) + throws ReleasingUnallocatedByteBufferException { + + bufferReleases.incrementAndGet(); Objects.requireNonNull(b); final Key key = new Key(b); LOG.debug("Releasing ByteBuffer: {}: {}", key.hashCode(), b); if (allocated.remove(key) == null) { - throw new ReleasingUnallocatedByteBufferException(); + throw new ReleasingUnallocatedByteBufferException(b); } allocator.putBuffer(b); // Clearing the buffer so subsequent access would probably generate errors b.clear(); } + /** + * Check if the buffer is in the pool. + * @param b buffer + * @return true if the buffer is in the pool + */ + public boolean containsBuffer(ByteBuffer b) { + Objects.requireNonNull(b); + final Key key = new Key(b); + return allocated.containsKey(key); + } + + /** + * Get the number of allocated buffers. + * @return number of allocated buffers + */ + public int size() { + return allocated.size(); + } + /** * Expect all buffers to be released -if not, log unreleased ones * and then raise an exception with the stack trace of the first * unreleased buffer. - * @throws LeakedByteBufferException if at least one was unsued. + * @throws LeakedByteBufferException if at least one buffer was not released */ @Override public void close() throws LeakedByteBufferException { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java index e32107be656b7..d81bd5cc3ceb6 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.TrackingByteBufferPool; import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool; import org.apache.hadoop.util.concurrent.HadoopExecutors; @@ -52,13 +53,16 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR; +import static org.apache.hadoop.fs.StreamCapabilities.VECTOREDIO_BUFFERS_SLICED; import static org.apache.hadoop.fs.contract.ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.range; import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; - import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static 
org.apache.hadoop.io.Sizes.S_128K; +import static org.apache.hadoop.io.Sizes.S_4K; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture; import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; @@ -74,7 +78,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac private static final Logger LOG = LoggerFactory.getLogger(AbstractContractVectoredReadTest.class); - public static final int DATASET_LEN = 64 * 1024; + public static final int DATASET_LEN = S_128K; protected static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt"; @@ -91,6 +95,8 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac private final String bufferType; + private final boolean isDirect; + /** * Path to the vector file. */ @@ -110,7 +116,7 @@ public static List params() { protected AbstractContractVectoredReadTest(String bufferType) { this.bufferType = bufferType; - final boolean isDirect = !"array".equals(bufferType); + this.isDirect = !"array".equals(bufferType); this.allocate = size -> pool.getBuffer(isDirect, size); } @@ -619,4 +625,60 @@ protected void verifyExceptionalVectoredRead( }); } } + + @Test + public void testBufferSlicing() throws Throwable { + describe("Test buffer slicing behavior in vectored IO"); + + final int numBuffers = 8; + final int bufferSize = S_4K; + long offset = 0; + final List fileRanges = new ArrayList<>(); + for (int i = 0; i < numBuffers; i++) { + fileRanges.add(FileRange.createFileRange(offset, bufferSize)); + // increment and add a non-binary-aligned gap, so as to force + // offsets to be misaligned with possible page sizes. + offset += bufferSize + 4000; + } + TrackingByteBufferPool pool = TrackingByteBufferPool.wrap(getPool()); + int unknownBuffers = 0; + boolean slicing; + try (FSDataInputStream in = openVectorFile()) { + slicing = in.hasCapability(VECTOREDIO_BUFFERS_SLICED); + LOG.info("Slicing is {} for vectored IO with stream {}", slicing, in); + in.readVectored(fileRanges, s -> pool.getBuffer(isDirect, s), pool::putBuffer); + + // check that all buffers are from the the pool, unless they are sliced. + for (FileRange res : fileRanges) { + CompletableFuture data = res.getData(); + ByteBuffer buffer = awaitFuture(data); + Assertions.assertThat(buffer) + .describedAs("Buffer must not be null") + .isNotNull(); + Assertions.assertThat(slicing || pool.containsBuffer(buffer)) + .describedAs("Buffer must be from the pool") + .isTrue(); + try { + pool.putBuffer(buffer); + } catch (TrackingByteBufferPool.ReleasingUnallocatedByteBufferException e) { + // this can happen if the buffer was sliced, as it is not in the pool. 
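+          // (a sliced buffer is a view of a larger pooled buffer, so only the
+          // original allocation can ever be returned to the pool)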
+ if (!slicing) { + throw e; + } + LOG.info("Sliced buffer detected: {}", buffer); + unknownBuffers++; + } + } + } + try { + pool.close(); + } catch (TrackingByteBufferPool.LeakedByteBufferException e) { + if (!slicing) { + throw e; + } + LOG.info("Slicing is enabled; we saw leaked buffers: {} after {} releases of unknown bufferfs", + e.getCount(), unknownBuffers); + } + + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java index 23cfcce75a2c9..9e0867597b325 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java @@ -74,6 +74,13 @@ public void testChecksumValidationDuringVectoredReadSmallFile() throws Exception validateCheckReadException(testPath, length, smallFileRanges); } + /** + * Verify that checksum validation works through vectored reads. + * @param testPath path to the file to be tested + * @param length length of the file to be created + * @param ranges ranges to be read from the file + * @throws Exception any exception other than ChecksumException + */ private void validateCheckReadException(Path testPath, int length, List ranges) throws Exception { From 3c855c0ab1a4fd65ab2035614a2d3d187c372462 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 11 Jun 2025 10:42:49 +0100 Subject: [PATCH 3/7] explicitly disable analytics on the classic vector read tests --- .../contract/s3a/ITestS3AContractVectoredRead.java | 12 +++++------- .../java/org/apache/hadoop/fs/s3a/S3ATestUtils.java | 13 +++++++++++++ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index d41974dab91cb..02f0291835381 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -62,7 +62,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableAnalyticsAccelerator; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; import static org.apache.hadoop.io.Sizes.S_1M; @@ -89,14 +89,12 @@ protected AbstractFSContract createContract(Configuration conf) { } /** - * Analytics Accelerator Library for Amazon S3 does not support Vectored Reads. - * @throws Exception + * Create a configuration. 
+ * @return a configuration */ @Override - public void setup() throws Exception { - super.setup(); - skipIfAnalyticsAcceleratorEnabled(getContract().getConf(), - "Analytics Accelerator does not support vectored reads"); + protected Configuration createConfiguration() { + return disableAnalyticsAccelerator(super.createConfiguration()); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 269e3a14190d2..d74c84463252e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -108,6 +108,7 @@ import static org.apache.hadoop.fs.impl.FlagSet.createFlagSet; import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_S3; import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Analytics; +import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Classic; import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Prefetch; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion; @@ -1876,6 +1877,18 @@ public static Configuration enableAnalyticsAccelerator(Configuration conf) { return conf; } + /** + * Disable analytics stream for S3A S3AFileSystem in tests. + * @param conf Configuration to update + * @return patched config + */ + public static Configuration disableAnalyticsAccelerator(Configuration conf) { + removeBaseAndBucketOverrides(conf, + INPUT_STREAM_TYPE); + conf.setEnum(INPUT_STREAM_TYPE, Classic); + return conf; + } + /** * Probe for a filesystem having a specific stream type; * this is done through filesystem capabilities. From 0ea6638067bf6800d37bdd284b61008bf5f6549e Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 11 Jun 2025 16:28:56 +0100 Subject: [PATCH 4/7] review and checkstyle --- .../hadoop/fs/LocalFileSystemConfigKeys.java | 2 +- .../hadoop/fs/impl/TrackingByteBufferPool.java | 3 ++- .../src/main/resources/core-default.xml | 18 ++++++++++++++++++ .../markdown/filesystem/fsdatainputstream.md | 4 ++-- .../AbstractContractVectoredReadTest.java | 13 +++++++------ 5 files changed, 30 insertions(+), 10 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemConfigKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemConfigKeys.java index 74843e5ae2868..0c4988dad5361 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemConfigKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemConfigKeys.java @@ -48,6 +48,6 @@ public class LocalFileSystemConfigKeys extends CommonConfigurationKeys { *

   * {@value}.
   */
-  public static final String LOCAL_FS_VERIFY_CHECKSUM = "file.verify-checksum";
+  public static final String LOCAL_FS_VERIFY_CHECKSUM = "fs.file.checksum.verify";
 }

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java
index 60da7f2f99ad7..a60185a5b512b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java
@@ -98,7 +98,8 @@ public String toString() {
     }
   }

-  public static class LeakDetectorHeapByteBufferPoolException extends RuntimeException {
+  public static class LeakDetectorHeapByteBufferPoolException
+      extends RuntimeException {

     private LeakDetectorHeapByteBufferPoolException(String msg) {
       super(msg);

diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 4104e3043149e..72576c62c8c05 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1364,6 +1364,24 @@
     File space usage statistics refresh interval in msec.
   </description>
 </property>

+<property>
+  <name>fs.file.checksum.verify</name>
+  <value>true</value>
+  <description>
+    Should data read through the local filesystem (file://) URLs be verified against
+    the checksums stored in the associated checksum files?
+    Setting this to false skips loading the checksum files, reading data in checksum-aligned
+    blocks and verifying checksums. This may improve performance
+    when reading data, though it pushes the responsibility of detecting errors
+    into the file formats themselves, or the underlying storage system.
+    Even when verification is enabled, files without associated checksum files
+    .$FILENAME.crc are never verified.
+    When checksum verification is disabled, vector reads of data will always return
+    buffers that are the buffers allocated through the buffer allocator
+    passed in to the API call and not sliced subsets thereof.
+  </description>
+</property>
+
 <property>
   <name>fs.automatic.close</name>
   <value>true</value>

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
index 828b5ff59a1c5..6d3f8b57b434b 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
@@ -676,11 +676,11 @@ through the `allocate()` function passed in.
 This will happen during reads with and without range coalescing.

 Checksum verification may be disabled by setting the option
-`file.verify-checksum` to false (Hadoop 3.4.2 and later).
+`fs.file.checksum.verify` to false (Hadoop 3.4.2 and later).
 ```xml
-<property>
-  <name>file.verify-checksum</name>
-  <value>false</value>
-</property>
+<property>
+  <name>fs.file.checksum.verify</name>
+  <value>false</value>
+</property>
 ```

diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index d81bd5cc3ceb6..a9c39ef0d68cf 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -640,13 +640,13 @@ public void testBufferSlicing() throws Throwable {
       // offsets to be misaligned with possible page sizes.
       offset += bufferSize + 4000;
     }
-    TrackingByteBufferPool pool = TrackingByteBufferPool.wrap(getPool());
+    TrackingByteBufferPool trackerPool = TrackingByteBufferPool.wrap(getPool());
     int unknownBuffers = 0;
     boolean slicing;
     try (FSDataInputStream in = openVectorFile()) {
       slicing = in.hasCapability(VECTOREDIO_BUFFERS_SLICED);
       LOG.info("Slicing is {} for vectored IO with stream {}", slicing, in);
-      in.readVectored(fileRanges, s -> pool.getBuffer(isDirect, s), pool::putBuffer);
+      in.readVectored(fileRanges, s -> trackerPool.getBuffer(isDirect, s), trackerPool::putBuffer);

       // check that all buffers are from the the pool, unless they are sliced.
       for (FileRange res : fileRanges) {
@@ -655,11 +655,11 @@ public void testBufferSlicing() throws Throwable {
         Assertions.assertThat(buffer)
             .describedAs("Buffer must not be null")
             .isNotNull();
-        Assertions.assertThat(slicing || pool.containsBuffer(buffer))
+        Assertions.assertThat(slicing || trackerPool.containsBuffer(buffer))
             .describedAs("Buffer must be from the pool")
             .isTrue();
         try {
-          pool.putBuffer(buffer);
+          trackerPool.putBuffer(buffer);
         } catch (TrackingByteBufferPool.ReleasingUnallocatedByteBufferException e) {
           // this can happen if the buffer was sliced, as it is not in the pool.
           if (!slicing) {
@@ -671,12 +671,13 @@ public void testBufferSlicing() throws Throwable {
       }
     }
     try {
-      pool.close();
+      trackerPool.close();
     } catch (TrackingByteBufferPool.LeakedByteBufferException e) {
       if (!slicing) {
         throw e;
       }
-      LOG.info("Slicing is enabled; we saw leaked buffers: {} after {} releases of unknown bufferfs",
+      LOG.info("Slicing is enabled; we saw leaked buffers: {} after {}"
+          + " releases of unknown buffers",
           e.getCount(), unknownBuffers);
     }

From b51f2e851651d45dfa5a49f2911147da7f5c0646 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Mon, 16 Jun 2025 16:52:35 +0100
Subject: [PATCH 5/7] HADOOP-18296.
review feedback --- .../fs/impl/TrackingByteBufferPool.java | 89 ++++++------------- .../s3a/ITestS3AContractVectoredRead.java | 3 +- 2 files changed, 27 insertions(+), 65 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java index a60185a5b512b..67fe8b42fd1d2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java @@ -20,9 +20,8 @@ package org.apache.hadoop.fs.impl; import java.nio.ByteBuffer; -import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.Map; -import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; @@ -30,6 +29,9 @@ import org.apache.hadoop.io.ByteBufferPool; +import static java.lang.System.identityHashCode; +import static java.util.Objects.requireNonNull; + /** * A wrapper {@link ByteBufferPool} implementation that tracks whether all allocated buffers * are released. @@ -40,7 +42,7 @@ * To be used for testing only. *

* The stacktraces of the allocation are not stored by default because - * it can significantly decreases the unit test performance. + * it can significantly decrease the unit test performance. * Configuring this class to log at DEBUG will trigger their collection. * @see ByteBufferAllocationStacktraceException *

@@ -59,45 +61,6 @@ public static TrackingByteBufferPool wrap(ByteBufferPool allocator) {
     return new TrackingByteBufferPool(allocator);
   }
 
-  /**
-   * Key for the tracker map.
-   * This uses the identity hash code of the buffer as the hash code
-   * for the map.
-   */
-  private static class Key {
-
-    private final int hashCode;
-
-    private final ByteBuffer buffer;
-
-    Key(ByteBuffer buffer) {
-      hashCode = System.identityHashCode(buffer);
-      this.buffer = buffer;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      Key key = (Key) o;
-      return this.buffer == key.buffer;
-    }
-
-    @Override
-    public int hashCode() {
-      return hashCode;
-    }
-
-    @Override
-    public String toString() {
-      return buffer.toString();
-    }
-  }
-
   public static class LeakDetectorHeapByteBufferPoolException
       extends RuntimeException {
 
@@ -166,7 +129,8 @@ public static final class ReleasingUnallocatedByteBufferException
 
     private ReleasingUnallocatedByteBufferException(final ByteBuffer b) {
       super(String.format("Releasing a ByteBuffer instance that is not allocated"
-          + " by this buffer pool or already been released: %s size %d", b, b.capacity()));
+          + " by this buffer pool or already been released: %s size %d; hash code %s",
+          b, b.capacity(), identityHashCode(b)));
     }
   }
 
@@ -200,8 +164,8 @@ public int getCount() {
    * The key maps by the object id of the buffer, and refers to either a common stack trace
    * or one dynamically created for each allocation.
    */
-  private final Map<Key, ByteBufferAllocationStacktraceException> allocated =
-      new HashMap<>();
+  private final Map<ByteBuffer, ByteBufferAllocationStacktraceException> allocated =
+      new IdentityHashMap<>();
 
   /**
    * Wrapped buffer pool.
@@ -253,9 +217,9 @@ public synchronized ByteBuffer getBuffer(final boolean direct, final int size) {
     ByteBuffer buffer = allocator.getBuffer(direct, size);
     final ByteBufferAllocationStacktraceException ex =
         ByteBufferAllocationStacktraceException.create();
-    final Key key = new Key(buffer);
-    allocated.put(key, ex);
-    LOG.debug("Creating ByteBuffer:{} size {} {}", key.hashCode(), size, buffer, ex);
+    allocated.put(buffer, ex);
+    LOG.debug("Creating ByteBuffer:{} size {} {}",
+        identityHashCode(buffer), size, buffer, ex);
     return buffer;
   }
 
@@ -268,34 +232,31 @@ public synchronized ByteBuffer getBuffer(final boolean direct, final int size) {
    * If the buffer was not allocated by this pool, it throws
    * {@link ReleasingUnallocatedByteBufferException}.
    *
-   * @param b buffer to release
+   * @param buffer buffer to release
    * @throws ReleasingUnallocatedByteBufferException if the buffer was not allocated by this pool
    */
   @Override
-  public synchronized void putBuffer(ByteBuffer b)
+  public synchronized void putBuffer(ByteBuffer buffer)
       throws ReleasingUnallocatedByteBufferException {
     bufferReleases.incrementAndGet();
-    Objects.requireNonNull(b);
-    final Key key = new Key(b);
-    LOG.debug("Releasing ByteBuffer: {}: {}", key.hashCode(), b);
-    if (allocated.remove(key) == null) {
-      throw new ReleasingUnallocatedByteBufferException(b);
+    requireNonNull(buffer);
+    LOG.debug("Releasing ByteBuffer: {}: {}", identityHashCode(buffer), buffer);
+    if (allocated.remove(buffer) == null) {
+      throw new ReleasingUnallocatedByteBufferException(buffer);
     }
-    allocator.putBuffer(b);
+    allocator.putBuffer(buffer);
     // Clearing the buffer so subsequent access would probably generate errors
-    b.clear();
+    buffer.clear();
   }
 
   /**
    * Check if the buffer is in the pool.
-   * @param b buffer
+   * @param buffer buffer
    * @return true if the buffer is in the pool
    */
-  public boolean containsBuffer(ByteBuffer b) {
-    Objects.requireNonNull(b);
-    final Key key = new Key(b);
-    return allocated.containsKey(key);
+  public boolean containsBuffer(ByteBuffer buffer) {
+    return allocated.containsKey(requireNonNull(buffer));
   }
 
   /**
@@ -315,8 +276,8 @@ public int size() {
   @Override
   public void close() throws LeakedByteBufferException {
     if (!allocated.isEmpty()) {
-      allocated.keySet().forEach(key ->
-          LOG.warn("Unreleased ByteBuffer {}; {}", key.hashCode(), key));
+      allocated.keySet().forEach(buffer ->
+          LOG.warn("Unreleased ByteBuffer {}; {}", identityHashCode(buffer), buffer));
       LeakedByteBufferException ex = new LeakedByteBufferException(
           allocated.size(), allocated.values().iterator().next());
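Why `IdentityHashMap` is the right structure here, for reviewers who have not hit this pitfall before: `java.nio.ByteBuffer` defines `equals()` and `hashCode()` over the *remaining content*, so a plain `HashMap` keyed on buffers conflates distinct-but-identical buffers, and loses entries as soon as a buffer's position moves. A minimal, self-contained illustration (not part of the patch):

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class ByteBufferKeyDemo {
  public static void main(String[] args) {
    ByteBuffer b1 = ByteBuffer.allocate(16);
    ByteBuffer b2 = ByteBuffer.allocate(16);

    // Content-based equality: two distinct empty buffers are "equal",
    // so the second put() replaces the first entry.
    Map<ByteBuffer, String> contentKeyed = new HashMap<>();
    contentKeyed.put(b1, "first");
    contentKeyed.put(b2, "second");
    System.out.println(contentKeyed.size());           // 1

    // Worse: writing to the buffer changes its hash code, so the
    // stored entry can no longer be looked up afterwards.
    b1.putInt(42);
    System.out.println(contentKeyed.containsKey(b1));  // false

    // Reference equality plus the identity hash code is exactly the
    // semantics the tracker needs, and is stable across buffer use.
    Map<ByteBuffer, String> identityKeyed = new IdentityHashMap<>();
    identityKeyed.put(b1, "first");
    identityKeyed.put(b2, "second");
    b2.putInt(42);
    System.out.println(identityKeyed.size());          // 2
    System.out.println(identityKeyed.containsKey(b2)); // true
  }
}
```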
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
index 02f0291835381..48b76b162aaa6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -71,7 +71,8 @@ import static org.apache.hadoop.test.MoreAsserts.assertEqual;
 
 /**
- * S3A contract tests for vectored reads.
+ * S3A contract tests for vectored reads through the classic input stream.
+ * <p>
  * This is a complex suite as it really is testing the store, so measurements of
  * what IO took place is also performed if the input stream is suitable for this.
  */
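For context on the API surface this suite exercises, a sketch of a pooled vectored read; the file path is illustrative, and the buffer round trip shown is only safe when the stream does not return sliced buffers, which is what the `fs.capability.vectoredio.sliced` probe added in this series reports:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class VectoredReadSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path path = new Path("/tmp/data.bin");   // illustrative test file
    ElasticByteBufferPool pool = new ElasticByteBufferPool();

    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(0, 1024),
        FileRange.createFileRange(8192, 1024));

    try (FSDataInputStream in = fs.open(path)) {
      // Allocate each range's buffer from the pool.
      in.readVectored(ranges, size -> pool.getBuffer(false, size));
      for (FileRange range : ranges) {
        ByteBuffer buffer = range.getData().get();   // await completion
        // ... consume the buffer ...
        // Returning it to the pool is only correct when the stream
        // guarantees unsliced buffers; probe the capability first.
        pool.putBuffer(buffer);
      }
    }
  }
}
```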
From de78242f0ae51dc0cff0e5775270a011e0945ed0 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Mon, 16 Jun 2025 17:01:14 +0100
Subject: [PATCH 6/7] HADOOP-18296. test failure

---
 .../main/java/org/apache/hadoop/fs/LocalFileSystem.java      | 2 +-
 .../org/apache/hadoop/fs/LocalFileSystemConfigKeys.java      | 6 ------
 .../java/org/apache/hadoop/fs/local/LocalConfigKeys.java     | 7 +++++++
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java
index a41e5ec9b88d9..703e7a6392e81 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java
@@ -27,7 +27,7 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 
-import static org.apache.hadoop.fs.LocalFileSystemConfigKeys.LOCAL_FS_VERIFY_CHECKSUM;
+import static org.apache.hadoop.fs.local.LocalConfigKeys.LOCAL_FS_VERIFY_CHECKSUM;
 
 /****************************************************************
  * Implement the FileSystem API for the checksumed local filesystem.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemConfigKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemConfigKeys.java
index 0c4988dad5361..53d049cb1633c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemConfigKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemConfigKeys.java
@@ -43,11 +43,5 @@ public class LocalFileSystemConfigKeys extends CommonConfigurationKeys {
       "file.client-write-packet-size";
   public static final int LOCAL_FS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
 
-  /**
-   * Verify checksums on read -default is true.
-   * <p>
-   * {@value}.
-   */
-  public static final String LOCAL_FS_VERIFY_CHECKSUM = "fs.file.checksum.verify";
 }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java
index e93858ff1e63b..255b05cf04551 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java
@@ -57,6 +57,13 @@ public class LocalConfigKeys extends CommonConfigurationKeys {
       DataChecksum.Type.CRC32;
   public static final String KEY_PROVIDER_URI_DEFAULT = "";
 
+  /**
+   * Verify checksums on read; default is true.
+   * <p>
+   * {@value}.
+   */
+  public static final String LOCAL_FS_VERIFY_CHECKSUM = "fs.file.checksum.verify";
+
   public static FsServerDefaults getServerDefaults() throws IOException {
     return new FsServerDefaults(
         BLOCK_SIZE_DEFAULT,
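A short sketch of how the relocated key is meant to be used, assuming the LocalFileSystem wiring added earlier in this series and the getVerifyChecksum() accessor introduced on ChecksumFileSystem:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;

public class DisableLocalChecksumVerification {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Turn off CRC verification for file:// reads. With verification off,
    // readVectored() hands back exactly the buffers the caller allocated,
    // never sliced subsets of them.
    conf.setBoolean("fs.file.checksum.verify", false);

    LocalFileSystem fs = FileSystem.getLocal(conf);
    System.out.println("verifying checksums: " + fs.getVerifyChecksum());
  }
}
```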
From b66370163a1ccd4a44b28e68b08e1c9fe79b2f1a Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Wed, 16 Jul 2025 19:18:49 +0100
Subject: [PATCH 7/7] HADOOP-18296. review feedback and test failure.

Note that this is a 3.4.x PR, so it doesn't have to deal with the JUnit5
migration. I will do that afterwards, once I've got the existing ITests
working again.
---
 .../java/org/apache/hadoop/fs/CommonConfigurationKeys.java    | 6 ++++++
 .../main/java/org/apache/hadoop/fs/LocalFileSystem.java       | 2 +-
 .../org/apache/hadoop/fs/LocalFileSystemConfigKeys.java       | 1 -
 .../main/java/org/apache/hadoop/fs/StreamCapabilities.java    | 1 +
 .../java/org/apache/hadoop/fs/local/LocalConfigKeys.java      | 7 -------
 .../hadoop-common/src/main/resources/core-default.xml         | 2 +-
 .../src/site/markdown/filesystem/fsdatainputstream.md         | 2 +-
 7 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 31b6654afc578..e0832fc0576b1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -505,4 +505,10 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
 
   public static final String HADOOP_SECURITY_RESOLVER_IMPL =
       "hadoop.security.resolver.impl";
+  /**
+   * Verify checksums on read; default is true.
+   * <p>
+   * {@value}.
+   */
+  public static final String LOCAL_FS_VERIFY_CHECKSUM = "fs.file.checksum.verify";
 }
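The consumer-side contract, restated as code: before recycling buffers from a vectored read, check whether the stream may return slices. A hedged sketch (the helper name is illustrative; the capability string is the one defined in this series):

```java
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SlicedBufferProbe {
  // True when buffers returned by readVectored() may be sliced views,
  // in which case they must not be returned to a buffer pool.
  static boolean mayReturnSlicedBuffers(FileSystem fs, Path path,
      FSDataInputStream in) throws Exception {
    return fs.hasPathCapability(path, "fs.capability.vectoredio.sliced")
        || in.hasCapability("fs.capability.vectoredio.sliced");
  }
}
```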
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java
index 703e7a6392e81..e912d2245bed4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java
@@ -27,7 +27,7 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 
-import static org.apache.hadoop.fs.local.LocalConfigKeys.LOCAL_FS_VERIFY_CHECKSUM;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.LOCAL_FS_VERIFY_CHECKSUM;
 
 /****************************************************************
  * Implement the FileSystem API for the checksumed local filesystem.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemConfigKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemConfigKeys.java
index 53d049cb1633c..90f9cf282e016 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemConfigKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemConfigKeys.java
@@ -42,6 +42,5 @@ public class LocalFileSystemConfigKeys extends CommonConfigurationKeys {
   public static final String LOCAL_FS_CLIENT_WRITE_PACKET_SIZE_KEY =
       "file.client-write-packet-size";
   public static final int LOCAL_FS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
-
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
index eee99d305b9c2..955040d91a36e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
@@ -87,6 +87,7 @@ public interface StreamCapabilities {
   String VECTOREDIO = "in:readvectored";
 
   /**
+   * Probe for vector IO implementation details: {@value}.
    * When performing vectored IO operations, are the buffers returned by readVectored()
    * potentially sliced subsets of buffers allocated by the allocate() function
    * passed in the read requests?
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java
index 255b05cf04551..e93858ff1e63b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/local/LocalConfigKeys.java
@@ -57,13 +57,6 @@ public class LocalConfigKeys extends CommonConfigurationKeys {
       DataChecksum.Type.CRC32;
   public static final String KEY_PROVIDER_URI_DEFAULT = "";
 
-  /**
-   * Verify checksums on read; default is true.
-   * <p>
-   * {@value}.
-   */
-  public static final String LOCAL_FS_VERIFY_CHECKSUM = "fs.file.checksum.verify";
-
   public static FsServerDefaults getServerDefaults() throws IOException {
     return new FsServerDefaults(
         BLOCK_SIZE_DEFAULT,
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 72576c62c8c05..34f3d13975677 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1376,7 +1376,7 @@
     into the file formats themselves, or the underlying storage system.
     Even when verification is enabled, files without associated checksum files
     .$FILENAME.crc are never be verified.
-    When checksum verification is disabled, vector reads of date will always returne
+    When fs.file.checksum.verify is false, vector reads of data will always return
     buffers that are the buffers allocated through the buffer allocator
     passed in to the API call and not sliced subsets thereof.
   </description>
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
index 6d3f8b57b434b..7f1ed1d82d2bf 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
@@ -689,7 +689,7 @@ Checksum verification may be disabled by setting the option
 reading data may not be detected during the read operation.
 Use with care in production.)
 
-Filesystem instances which spit buffersduring vector read operations
+Filesystem instances which split buffers during vector read operations
 MUST declare this by returning `true` to the path capabilities probe
 `fs.capability.vectoredio.sliced`, and for the open stream in its
 `hasCapability()` method.
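Finally, to make the intended test-time use of the tracking pool concrete: a sketch built from the wrap()/getBuffer()/putBuffer()/close() surface shown above; the wrapped pool choice is illustrative:

```java
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.impl.TrackingByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class TrackingPoolSketch {
  public static void main(String[] args) throws Exception {
    TrackingByteBufferPool pool =
        TrackingByteBufferPool.wrap(new ElasticByteBufferPool());

    ByteBuffer buffer = pool.getBuffer(false, 1024);
    // ... pass the buffer to readVectored() and consume the result ...

    // Must be the exact instance handed out: releasing a sliced duplicate
    // raises ReleasingUnallocatedByteBufferException.
    pool.putBuffer(buffer);

    // Fails with LeakedByteBufferException if any buffer is unreleased.
    pool.close();
  }
}
```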