HADOOP-19229. S3A/ABFS: Vector IO on cloud storage: increase threshold for range merging by steveloughran · Pull Request #7281 · apache/hadoop
Merged
@@ -26,6 +26,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

+import static org.apache.hadoop.io.Sizes.S_16K;
+import static org.apache.hadoop.io.Sizes.S_1M;

/**
* Stream that permits positional reading.
*
@@ -95,15 +98,15 @@ void readFully(long position, byte[] buffer, int offset, int length)
* @return the minimum number of bytes
*/
  default int minSeekForVectorReads() {
-    return 4 * 1024;
+    return S_16K;
  }

/**
* What is the largest size that we should group ranges together as?
* @return the number of bytes to read at once
*/
  default int maxReadSizeForVectorReads() {
-    return 1024 * 1024;
+    return S_1M;
  }

/**
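To ground the change, here is a minimal usage sketch of the vector IO API whose merge thresholds are raised above. The path, offsets and range sizes are illustrative, not from this PR; `FileRange.createFileRange` and `readVectored` are the real Hadoop APIs.

```java
// A minimal vectored-read sketch; path and ranges are invented.
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class VectoredReadSketch {
  public static void main(String[] args) throws Exception {
    Path path = new Path("s3a://example-bucket/data.bin");  // hypothetical path
    try (FileSystem fs = path.getFileSystem(new Configuration());
         FSDataInputStream in = fs.open(path)) {
      // Two 4 KiB ranges with an 8 KiB gap between them: under the new
      // 16 KiB minSeekForVectorReads() default they may be merged into
      // a single backend read, discarding the bytes in between.
      List<FileRange> ranges = Arrays.asList(
          FileRange.createFileRange(0, 4096),
          FileRange.createFileRange(12_288, 4096));
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer data = range.getData().join();  // block until filled
        System.out.println(range.getOffset() + ": " + data.remaining() + " bytes");
      }
    }
  }
}
```

Coalescing only happens when the gap between ranges is below `minSeekForVectorReads()` and the combined range stays under `maxReadSizeForVectorReads()`; raising both means fewer, larger reads against stores where a seek can cost as much as reading a hundred kilobytes.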
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.io;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * Sizes of binary values and some other common sizes.
 * This avoids having to remember the larger binary values,
 * and stops IDEs/style checkers complaining about numeric
 * values in source code.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class Sizes {

/** 2^8 bytes: {@value}. */
public static final int S_256 = 256;

/** 2^9 bytes: {@value}. */
public static final int S_512 = S_256 << 1;

/** 2^10 bytes - 1 KiB: {@value}. */
public static final int S_1K = S_512 << 1;

/** 2^11 bytes - 2 KiB: {@value}. */
public static final int S_2K = S_1K << 1;

/** 2^12 bytes - 4 KiB: {@value}. */
public static final int S_4K = S_2K << 1;

/** 2^13 bytes: {@value}. */
public static final int S_8K = S_4K << 1;

/** 2^14 bytes: {@value}. */
public static final int S_16K = S_8K << 1;

/** 2^15 bytes: {@value}. */
public static final int S_32K = S_16K << 1;

/** 2^16 bytes: {@value}. */
public static final int S_64K = S_32K << 1;

/** 2^17 bytes, 128 KiB: {@value}. */
public static final int S_128K = S_64K << 1;

/** 2^18 bytes, 256 KiB: {@value}. */
public static final int S_256K = S_128K << 1;

/** 2^19 bytes, 512 KiB: {@value}. */
public static final int S_512K = S_256K << 1;

/** 2^20 bytes, 1 MiB: {@value}. */
public static final int S_1M = S_512K << 1;

/** 2^21 bytes, 2 MiB: {@value}. */
public static final int S_2M = S_1M << 1;

/** 2^22 bytes, 4 MiB: {@value}. */
public static final int S_4M = S_2M << 1;

/** 2^23 bytes, 8 MiB: {@value}. */
public static final int S_8M = S_4M << 1;

/** 2^24 bytes, 16 MiB: {@value}. */
public static final int S_16M = S_8M << 1;

/** 2^25 bytes, 32 MiB: {@value}. */
public static final int S_32M = S_16M << 1;

/** 5 MiB: {@value}. */
public static final int S_5M = 5 * S_1M;

/** 10 MiB: {@value}. */
public static final int S_10M = 10 * S_1M;

}
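A short sketch of how the new constants read at a call site; the option key is the existing S3A one, the wrapper class is invented for illustration.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Sizes;

/** Illustrative only: Sizes constants in place of magic numbers. */
final class SizesSketch {
  static Configuration tuned() {
    Configuration conf = new Configuration();
    // Clearer than writing 262144 inline, and keeps checkstyle quiet.
    conf.setInt("fs.s3a.vectored.read.min.seek.size", Sizes.S_256K);
    return conf;
  }
}
```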
@@ -26,6 +26,9 @@
import java.time.Duration;
import java.util.concurrent.TimeUnit;

+import static org.apache.hadoop.io.Sizes.S_128K;
+import static org.apache.hadoop.io.Sizes.S_2M;

/**
* Constants used with the {@link S3AFileSystem}.
*
@@ -1545,14 +1548,14 @@ private Constants() {
"fs.s3a.vectored.read.max.merged.size";

  /**
-   * Default minimum seek in bytes during vectored reads : {@value}.
+   * Default minimum seek in bytes during vectored reads: {@value}.
   */
-  public static final int DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE = 4096; // 4K
+  public static final int DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE = S_128K;

/**
* Default maximum read size in bytes during vectored reads : {@value}.
*/
-  public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = 1048576; //1M
+  public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = S_2M;

/**
* Maximum number of range reads a single input stream can have
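For context, a sketch of how such defaults are typically resolved at runtime, assuming the standard `Configuration.getLongBytes` lookup, which accepts suffixed values such as `128K` or `2M`; the exact call site in `S3AFileSystem` may differ.

```java
import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.Constants.*;

/** Hypothetical lookup mirroring how S3A resolves these options. */
final class VectorReadDefaults {
  static long minSeek(Configuration conf) {
    // getLongBytes parses plain numbers and K/M/G suffixes alike.
    return conf.getLongBytes(AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
        DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);       // now 128 KiB
  }

  static long maxMerged(Configuration conf) {
    return conf.getLongBytes(AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
        DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE);  // now 2 MiB
  }
}
```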
@@ -67,7 +67,7 @@ on the client requirements.
```xml
<property>
<name>fs.s3a.vectored.read.min.seek.size</name>
-    <value>4K</value>
+    <value>128K</value>
<description>
What is the smallest reasonable seek in bytes such
that we group ranges together during vectored
@@ -76,7 +76,7 @@
</property>
<property>
<name>fs.s3a.vectored.read.max.merged.size</name>
-    <value>1M</value>
+    <value>2M</value>
<description>
What is the largest merged read size in bytes such
that we group ranges together during vectored read.
```

@@ -282,7 +282,7 @@ Fix: Use one of the dedicated [S3A Committers](committers.md).

## <a name="tuning"></a> Options to Tune

-### <a name="flags"></a> Performance Flags: `fs.s3a.performance.flag`
+### <a name="flags"></a> Performance Flags: `fs.s3a.performance.flags`

This option takes a comma-separated list of performance flags.
View it as the equivalent of the `-O` optimization flags C/C++ compilers offer.
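A sketch of setting the (correctly pluralised) option programmatically. The flag names `create`, `delete` and `mkdir` are the ones documented for recent releases; treat them as an assumption here rather than something this PR defines.

```java
import org.apache.hadoop.conf.Configuration;

/** Illustrative only: enabling S3A performance flags. */
final class PerformanceFlagsSketch {
  static Configuration withFlags() {
    Configuration conf = new Configuration();
    // Comma-separated list of flags to enable.
    conf.set("fs.s3a.performance.flags", "create,delete,mkdir");
    return conf;
  }
}
```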
@@ -61,8 +61,12 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.range;
import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
+import static org.apache.hadoop.io.Sizes.S_1M;
+import static org.apache.hadoop.io.Sizes.S_4K;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
import static org.apache.hadoop.test.MoreAsserts.assertEqual;

@@ -139,13 +143,13 @@ public void testEOFRanges416Handling() throws Exception {
public void testMinSeekAndMaxSizeConfigsPropagation() throws Exception {
Configuration conf = getFileSystem().getConf();
S3ATestUtils.removeBaseAndBucketOverrides(conf,
-        Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
-        Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);
+        AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
+        AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);
S3ATestUtils.disableFilesystemCaching(conf);
final int configuredMinSeek = 2 * 1024;
final int configuredMaxSize = 10 * 1024 * 1024;
-    conf.set(Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, "2K");
-    conf.set(Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, "10M");
+    conf.set(AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, "2K");
+    conf.set(AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, "10M");
try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
try (FSDataInputStream fis = openVectorFile(fs)) {
int newMinSeek = fis.minSeekForVectorReads();
@@ -162,8 +166,8 @@ public void testMinSeekAndMaxSizeConfigsPropagation() throws Exception {
public void testMinSeekAndMaxSizeDefaultValues() throws Exception {
Configuration conf = getFileSystem().getConf();
S3ATestUtils.removeBaseAndBucketOverrides(conf,
-        Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
-        Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE);
+        AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
+        AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE);
try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
try (FSDataInputStream fis = openVectorFile(fs)) {
int minSeek = fis.minSeekForVectorReads();
@@ -400,16 +404,25 @@ public void testMultiVectoredReadStatsCollection() throws Exception {
}
}

+  /**
+   * Create a test fs with no readahead.
+   * The vector IO ranges are set to the original small values,
+   * so ranges on small files are not coalesced.
+   * @return a filesystem
+   * @throws IOException failure to instantiate.
+   */
private S3AFileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
Configuration conf = getFileSystem().getConf();
// also resetting the min seek and max size values is important
// as this same test suite has test which overrides these params.
S3ATestUtils.removeBaseAndBucketOverrides(conf,
Constants.READAHEAD_RANGE,
-        Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
-        Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);
+        AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
+        AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);
S3ATestUtils.disableFilesystemCaching(conf);
conf.setInt(Constants.READAHEAD_RANGE, 0);
+    conf.setInt(AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, S_4K);
+    conf.setInt(AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, S_1M);
return S3ATestUtils.createTestFileSystem(conf);
}
}
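A note on why `removeBaseAndBucketOverrides` matters in these tests: S3A supports per-bucket option overrides, and one of those would silently shadow whatever the test sets on the base key. A sketch with an invented bucket name:

```java
import org.apache.hadoop.conf.Configuration;

/** Illustrative only: a per-bucket override that tests must strip. */
final class BucketOverrideSketch {
  static void shadow(Configuration conf) {
    // Per-bucket syntax: fs.s3a.bucket.<bucket>.<option minus "fs.s3a.">.
    // Left in place, this would win over conf.set(...) on the base key.
    conf.set("fs.s3a.bucket.example-bucket.vectored.read.min.seek.size", "4K");
  }
}
```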
@@ -26,6 +26,7 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.util.Preconditions;

@@ -53,6 +54,8 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
+import static org.apache.hadoop.io.Sizes.S_128K;
+import static org.apache.hadoop.io.Sizes.S_2M;
import static org.apache.hadoop.util.StringUtils.toLowerCase;

/**
@@ -891,4 +894,15 @@ long getLimit() {
BackReference getFsBackRef() {
return fsBackRef;
}

+  @Override
+  public int minSeekForVectorReads() {
+    return S_128K;
+  }
+
+  @Override
+  public int maxReadSizeForVectorReads() {
+    return S_2M;
+  }

}
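Because `FSDataInputStream` delegates `minSeekForVectorReads()` and `maxReadSizeForVectorReads()` to the stream it wraps, callers observe the new ABFS thresholds directly. A sketch with an invented `abfs://` path:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Sizes;

/** Illustrative only: the ABFS overrides as seen by a caller. */
final class AbfsThresholdSketch {
  static void check() throws Exception {
    Path path = new Path("abfs://container@account.dfs.core.windows.net/data.bin");
    try (FileSystem fs = path.getFileSystem(new Configuration());
         FSDataInputStream in = fs.open(path)) {
      // With this PR the ABFS stream reports the raised thresholds.
      assert in.minSeekForVectorReads() == Sizes.S_128K;
      assert in.maxReadSizeForVectorReads() == Sizes.S_2M;
    }
  }
}
```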