From b03b21179470723fbc4165eada5fcbc0907558bb Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Tue, 21 Apr 2020 21:57:29 +0530 Subject: [PATCH 1/7] HADOOP-16965. Refactor abfs stream configuration. (#1956) Contributed by Mukund Thakur. (cherry picked from commit 8031c66295b530dcaae9e00d4f656330bc3b3952) --- .../fs/azurebfs/AzureBlobFileSystemStore.java | 31 +++++--- .../fs/azurebfs/services/AbfsInputStream.java | 20 +++--- .../services/AbfsInputStreamContext.java | 70 +++++++++++++++++++ .../azurebfs/services/AbfsOutputStream.java | 11 ++- .../services/AbfsOutputStreamContext.java | 68 ++++++++++++++++++ .../azurebfs/services/AbfsStreamContext.java | 26 +++++++ 6 files changed, 200 insertions(+), 26 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 0b2b3b1c57cb4..67e39815ad712 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -75,7 +75,9 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext; import org.apache.hadoop.fs.azurebfs.services.AbfsPermission; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AuthType; @@ -362,9 +364,15 @@ public OutputStream createFile(final Path path, final boolean overwrite, final F client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), 0, - abfsConfiguration.getWriteBufferSize(), - abfsConfiguration.isFlushEnabled(), - abfsConfiguration.isOutputStreamFlushDisabled()); + populateAbfsOutputStreamContext()); + } + + private AbfsOutputStreamContext populateAbfsOutputStreamContext() { + return new AbfsOutputStreamContext() + .withWriteBufferSize(abfsConfiguration.getWriteBufferSize()) + .enableFlush(abfsConfiguration.isFlushEnabled()) + .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled()) + .build(); } public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask) @@ -402,11 +410,18 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist null); } - // Add statistics for InputStream return new AbfsInputStream(client, statistics, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength, - abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(), - abfsConfiguration.getTolerateOobAppends(), eTag); + populateAbfsInputStreamContext(), + eTag); + } + + private AbfsInputStreamContext populateAbfsInputStreamContext() { + return new AbfsInputStreamContext() + .withReadBufferSize(abfsConfiguration.getReadBufferSize()) + 
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth()) + .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends()) + .build(); } public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws @@ -435,9 +450,7 @@ public OutputStream openFileForWrite(final Path path, final boolean overwrite) t client, AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), offset, - abfsConfiguration.getWriteBufferSize(), - abfsConfiguration.isFlushEnabled(), - abfsConfiguration.isOutputStreamFlushDisabled()); + populateAbfsOutputStreamContext()); } public void rename(final Path source, final Path destination) throws diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index fe48cb9323712..d0b9d4bd54663 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -55,21 +55,19 @@ public class AbfsInputStream extends FSInputStream { private boolean closed = false; public AbfsInputStream( - final AbfsClient client, - final Statistics statistics, - final String path, - final long contentLength, - final int bufferSize, - final int readAheadQueueDepth, - final boolean tolerateOobAppends, - final String eTag) { + final AbfsClient client, + final Statistics statistics, + final String path, + final long contentLength, + final AbfsInputStreamContext abfsInputStreamContext, + final String eTag) { this.client = client; this.statistics = statistics; this.path = path; this.contentLength = contentLength; - this.bufferSize = bufferSize; - this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); - this.tolerateOobAppends = tolerateOobAppends; + this.bufferSize = abfsInputStreamContext.getReadBufferSize(); + this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth(); + this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends(); this.eTag = eTag; this.readAheadEnabled = true; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java new file mode 100644 index 0000000000000..cba719101694c --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
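populateAbfsInputStreamContext() above now assembles all read-side settings through a fluent builder, and the context class that follows normalizes a negative read-ahead queue depth to Runtime.getRuntime().availableProcessors(). A minimal usage sketch of the new API (values are illustrative, not ABFS defaults):

    AbfsInputStreamContext inCtx = new AbfsInputStreamContext()
        .withReadBufferSize(4 * 1024 * 1024)  // illustrative 4 MB read buffer
        .withReadAheadQueueDepth(-1)          // negative -> availableProcessors()
        .withTolerateOobAppends(false)
        .build();
    int depth = inCtx.getReadAheadQueueDepth(); // already normalized, never negative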
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +/** + * Class to hold extra input stream configs. + */ +public class AbfsInputStreamContext extends AbfsStreamContext { + + private int readBufferSize; + + private int readAheadQueueDepth; + + private boolean tolerateOobAppends; + + public AbfsInputStreamContext() { + } + + public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) { + this.readBufferSize = readBufferSize; + return this; + } + + public AbfsInputStreamContext withReadAheadQueueDepth( + final int readAheadQueueDepth) { + this.readAheadQueueDepth = (readAheadQueueDepth >= 0) + ? readAheadQueueDepth + : Runtime.getRuntime().availableProcessors(); + return this; + } + + public AbfsInputStreamContext withTolerateOobAppends( + final boolean tolerateOobAppends) { + this.tolerateOobAppends = tolerateOobAppends; + return this; + } + + public AbfsInputStreamContext build() { + // Validation of parameters to be done here. + return this; + } + + public int getReadBufferSize() { + return readBufferSize; + } + + public int getReadAheadQueueDepth() { + return readAheadQueueDepth; + } + + public boolean isTolerateOobAppends() { + return tolerateOobAppends; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index e68400f048144..3be958676498e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -80,18 +80,17 @@ public AbfsOutputStream( final AbfsClient client, final String path, final long position, - final int bufferSize, - final boolean supportFlush, - final boolean disableOutputStreamFlush) { + AbfsOutputStreamContext abfsOutputStreamContext) { this.client = client; this.path = path; this.position = position; this.closed = false; - this.supportFlush = supportFlush; - this.disableOutputStreamFlush = disableOutputStreamFlush; + this.supportFlush = abfsOutputStreamContext.isEnableFlush(); + this.disableOutputStreamFlush = abfsOutputStreamContext + .isDisableOutputStreamFlush(); this.lastError = null; this.lastFlushOffset = 0; - this.bufferSize = bufferSize; + this.bufferSize = abfsOutputStreamContext.getWriteBufferSize(); this.buffer = byteBufferPool.getBuffer(false, bufferSize).array(); this.bufferIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java new file mode 100644 index 0000000000000..0be97c5d9f1f2 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +/** + * Class to hold extra output stream configs. + */ +public class AbfsOutputStreamContext extends AbfsStreamContext { + + private int writeBufferSize; + + private boolean enableFlush; + + private boolean disableOutputStreamFlush; + + public AbfsOutputStreamContext() { + } + + public AbfsOutputStreamContext withWriteBufferSize( + final int writeBufferSize) { + this.writeBufferSize = writeBufferSize; + return this; + } + + public AbfsOutputStreamContext enableFlush(final boolean enableFlush) { + this.enableFlush = enableFlush; + return this; + } + + public AbfsOutputStreamContext disableOutputStreamFlush( + final boolean disableOutputStreamFlush) { + this.disableOutputStreamFlush = disableOutputStreamFlush; + return this; + } + + public AbfsOutputStreamContext build() { + // Validation of parameters to be done here. + return this; + } + + public int getWriteBufferSize() { + return writeBufferSize; + } + + public boolean isEnableFlush() { + return enableFlush; + } + + public boolean isDisableOutputStreamFlush() { + return disableOutputStreamFlush; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java new file mode 100644 index 0000000000000..ee77f595fed4b --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +/** + * Base stream configuration class which is going + * to store common configs among input and output streams. 
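AbfsOutputStreamContext mirrors the same builder pattern, which lets populateAbfsOutputStreamContext() build one context from AbfsConfiguration and share it between createFile() and openFileForWrite(). A usage sketch with illustrative values:

    AbfsOutputStreamContext outCtx = new AbfsOutputStreamContext()
        .withWriteBufferSize(8 * 1024 * 1024)  // illustrative 8 MB write buffer
        .enableFlush(true)
        .disableOutputStreamFlush(false)
        .build();

In both context classes build() currently just returns this; the in-line comment reserves it as the future hook for parameter validation.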
+ */ +public abstract class AbfsStreamContext { +} From 957e75d9c67e4d86e10c567d4e18b5d694e0dc8a Mon Sep 17 00:00:00 2001 From: Mehakmeet Singh Date: Thu, 23 Apr 2020 18:05:39 +0530 Subject: [PATCH 2/7] HADOOP-16914 Adding Output Stream Counters in ABFS (#1899) Contributed by Mehakmeet Singh. (cherry picked from commit 459eb2ad6d5bc6b21462e728fb334c6e30e14c39) --- .../fs/azurebfs/AzureBlobFileSystemStore.java | 2 + .../azurebfs/services/AbfsOutputStream.java | 57 +++++ .../services/AbfsOutputStreamContext.java | 12 + .../services/AbfsOutputStreamStatistics.java | 77 ++++++ .../AbfsOutputStreamStatisticsImpl.java | 176 ++++++++++++++ .../azurebfs/AbstractAbfsIntegrationTest.java | 25 ++ .../azurebfs/AbstractAbfsTestWithTimeout.java | 17 ++ .../ITestAbfsOutputStreamStatistics.java | 229 ++++++++++++++++++ .../TestAbfsOutputStreamStatistics.java | 176 ++++++++++++++ 9 files changed, 771 insertions(+) create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 67e39815ad712..ced4871fad5ce 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -78,6 +78,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl; import org.apache.hadoop.fs.azurebfs.services.AbfsPermission; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AuthType; @@ -372,6 +373,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext() { .withWriteBufferSize(abfsConfiguration.getWriteBufferSize()) .enableFlush(abfsConfiguration.isFlushEnabled()) .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled()) + .withStreamStatistics(new AbfsOutputStreamStatisticsImpl()) .build(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 3be958676498e..f07c2f7ed7c4d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -35,6 +35,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; @@ -47,6 +49,7 @@ * The BlobFsOutputStream for Rest AbfsClient. */ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCapabilities { + private final AbfsClient client; private final String path; private long position; @@ -76,6 +79,11 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa private final ElasticByteBufferPool byteBufferPool = new ElasticByteBufferPool(); + private final AbfsOutputStreamStatistics outputStreamStatistics; + + private static final Logger LOG = + LoggerFactory.getLogger(AbfsOutputStream.class); + public AbfsOutputStream( final AbfsClient client, final String path, @@ -94,6 +102,7 @@ public AbfsOutputStream( this.buffer = byteBufferPool.getBuffer(false, bufferSize).array(); this.bufferIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); + this.outputStreamStatistics = abfsOutputStreamContext.getStreamStatistics(); this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); @@ -261,6 +270,9 @@ public synchronized void close() throws IOException { threadExecutor.shutdownNow(); } } + if (LOG.isDebugEnabled()) { + LOG.debug("Closing AbfsOutputStream ", toString()); + } } @Override @@ -284,16 +296,20 @@ private synchronized void writeCurrentBufferToService() throws IOException { if (bufferIndex == 0) { return; } + outputStreamStatistics.writeCurrentBuffer(); final byte[] bytes = buffer; final int bytesLength = bufferIndex; + outputStreamStatistics.bytesToUpload(bytesLength); buffer = byteBufferPool.getBuffer(false, bufferSize).array(); bufferIndex = 0; final long offset = position; position += bytesLength; if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { + long start = System.currentTimeMillis(); waitForTaskToComplete(); + outputStreamStatistics.timeSpentTaskWait(start, System.currentTimeMillis()); } final Future job = completionService.submit(new Callable() { @@ -306,6 +322,11 @@ public Void call() throws Exception { } }); + if (job.isCancelled()) { + outputStreamStatistics.uploadFailed(bytesLength); + } else { + outputStreamStatistics.uploadSuccessful(bytesLength); + } writeOperations.add(new WriteOperation(job, offset, bytesLength)); // Try to shrink the queue @@ -367,6 +388,8 @@ private synchronized void shrinkWriteOperationQueue() throws IOException { writeOperations.peek().task.get(); lastTotalAppendOffset += writeOperations.peek().length; writeOperations.remove(); + // Incrementing statistics to indicate queue has been shrunk. + outputStreamStatistics.queueShrunk(); } } catch (Exception e) { if (e.getCause() instanceof AzureBlobFileSystemException) { @@ -414,4 +437,38 @@ private static class WriteOperation { public synchronized void waitForPendingUploads() throws IOException { waitForTaskToComplete(); } + + /** + * Getter method for AbfsOutputStream statistics. + * + * @return statistics for AbfsOutputStream. + */ + @VisibleForTesting + public AbfsOutputStreamStatistics getOutputStreamStatistics() { + return outputStreamStatistics; + } + + /** + * Getter to get the size of the task queue. + * + * @return the number of writeOperations in AbfsOutputStream. + */ + @VisibleForTesting + public int getWriteOperationsSize() { + return writeOperations.size(); + } + + /** + * Appending AbfsOutputStream statistics to base toString(). + * + * @return String with AbfsOutputStream statistics. 
+ */ + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(super.toString()); + sb.append("AbfsOuputStream@").append(this.hashCode()).append("){"); + sb.append(outputStreamStatistics.toString()); + sb.append("}"); + return sb.toString(); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java index 0be97c5d9f1f2..e0aefbf33b2e0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java @@ -29,6 +29,8 @@ public class AbfsOutputStreamContext extends AbfsStreamContext { private boolean disableOutputStreamFlush; + private AbfsOutputStreamStatistics streamStatistics; + public AbfsOutputStreamContext() { } @@ -49,6 +51,12 @@ public AbfsOutputStreamContext disableOutputStreamFlush( return this; } + public AbfsOutputStreamContext withStreamStatistics( + final AbfsOutputStreamStatistics streamStatistics) { + this.streamStatistics = streamStatistics; + return this; + } + public AbfsOutputStreamContext build() { // Validation of parameters to be done here. return this; @@ -65,4 +73,8 @@ public boolean isEnableFlush() { public boolean isDisableOutputStreamFlush() { return disableOutputStreamFlush; } + + public AbfsOutputStreamStatistics getStreamStatistics() { + return streamStatistics; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java new file mode 100644 index 0000000000000..c9fe0dd45525d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatistics.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Interface for {@link AbfsOutputStream} statistics. + */ +@InterfaceStability.Unstable +public interface AbfsOutputStreamStatistics { + + /** + * Number of bytes to be uploaded. + * + * @param bytes number of bytes to upload. + */ + void bytesToUpload(long bytes); + + /** + * Records a successful upload and the number of bytes uploaded. + * + * @param bytes number of bytes that were successfully uploaded. + */ + void uploadSuccessful(long bytes); + + /** + * Records that upload is failed and the number of bytes. + * + * @param bytes number of bytes that failed to upload. 
+ */ + void uploadFailed(long bytes); + + /** + * Time spent in waiting for tasks to be completed in the blocking queue. + * + * @param start millisecond at which the wait for task to be complete begins. + * @param end millisecond at which the wait is completed for the task. + */ + void timeSpentTaskWait(long start, long end); + + /** + * Number of times task queue is shrunk. + */ + void queueShrunk(); + + /** + * Number of times buffer is written to the service after a write operation. + */ + void writeCurrentBuffer(); + + /** + * Method to form a string of all AbfsOutputStream statistics and their + * values. + * + * @return AbfsOutputStream statistics. + */ + @Override + String toString(); + +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java new file mode 100644 index 0000000000000..cd5a29e217ce5 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamStatisticsImpl.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +/** + * OutputStream statistics implementation for Abfs. + */ +public class AbfsOutputStreamStatisticsImpl + implements AbfsOutputStreamStatistics { + private long bytesToUpload; + private long bytesUploadSuccessful; + private long bytesUploadFailed; + /** + * Counter to get the total time spent while waiting for tasks to complete + * in the blocking queue inside the thread executor. + */ + private long timeSpentOnTaskWait; + /** + * Counter to get the total number of queue shrink operations done {@code + * AbfsOutputStream#shrinkWriteOperationQueue()} by AbfsOutputStream to + * remove the write operations which were successfully done by + * AbfsOutputStream from the task queue. + */ + private long queueShrunkOps; + /** + * Counter to get the total number of times the current buffer is written + * to the service {@code AbfsOutputStream#writeCurrentBufferToService()} via + * AbfsClient and appended to the data store by AbfsRestOperation. + */ + private long writeCurrentBufferOperations; + + /** + * Records the need to upload bytes and increments the total bytes that + * needs to be uploaded. + * + * @param bytes total bytes to upload. Negative bytes are ignored. + */ + @Override + public void bytesToUpload(long bytes) { + if (bytes > 0) { + bytesToUpload += bytes; + } + } + + /** + * Records the total bytes successfully uploaded through AbfsOutputStream. + * + * @param bytes number of bytes that were successfully uploaded. Negative + * bytes are ignored. 
+ */ + @Override + public void uploadSuccessful(long bytes) { + if (bytes > 0) { + bytesUploadSuccessful += bytes; + } + } + + /** + * Records the total bytes failed to upload through AbfsOutputStream. + * + * @param bytes number of bytes failed to upload. Negative bytes are ignored. + */ + @Override + public void uploadFailed(long bytes) { + if (bytes > 0) { + bytesUploadFailed += bytes; + } + } + + /** + * {@inheritDoc} + * + * Records the total time spent waiting for a task to complete. + * + * When the thread executor has a task queue + * {@link java.util.concurrent.BlockingQueue} of size greater than or + * equal to 2 times the maxConcurrentRequestCounts then, it waits for a + * task in that queue to finish, then do the next task in the queue. + * + * This time spent while waiting for the task to be completed is being + * recorded in this counter. + * + * @param startTime time(in milliseconds) before the wait for task to be + * completed is begin. + * @param endTime time(in milliseconds) after the wait for the task to be + * completed is done. + */ + @Override + public void timeSpentTaskWait(long startTime, long endTime) { + timeSpentOnTaskWait += endTime - startTime; + } + + /** + * {@inheritDoc} + * + * Records the number of times AbfsOutputStream try to remove the completed + * write operations from the beginning of write operation task queue. + */ + @Override + public void queueShrunk() { + queueShrunkOps++; + } + + /** + * {@inheritDoc} + * + * Records the number of times AbfsOutputStream writes the buffer to the + * service via the AbfsClient and appends the buffer to the service. + */ + @Override + public void writeCurrentBuffer() { + writeCurrentBufferOperations++; + } + + public long getBytesToUpload() { + return bytesToUpload; + } + + public long getBytesUploadSuccessful() { + return bytesUploadSuccessful; + } + + public long getBytesUploadFailed() { + return bytesUploadFailed; + } + + public long getTimeSpentOnTaskWait() { + return timeSpentOnTaskWait; + } + + public long getQueueShrunkOps() { + return queueShrunkOps; + } + + public long getWriteCurrentBufferOperations() { + return writeCurrentBufferOperations; + } + + /** + * String to show AbfsOutputStream statistics values in AbfsOutputStream. + * + * @return String with AbfsOutputStream statistics. 
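Since AbfsOutputStreamStatisticsImpl is a plain Java object with no Azure dependency, its aggregation rules can be exercised directly, which is what the unit tests later in this patch do. A short sketch of the behaviour documented above:

    AbfsOutputStreamStatisticsImpl stats = new AbfsOutputStreamStatisticsImpl();
    stats.bytesToUpload(4096);
    stats.uploadSuccessful(4096);
    stats.uploadFailed(-1);              // negative values are ignored
    stats.timeSpentTaskWait(100L, 150L); // accumulates 50 ms of wait time
    stats.queueShrunk();
    stats.writeCurrentBuffer();
    System.out.println(stats);           // "OutputStream Statistics{...}"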
+ */ + @Override public String toString() { + final StringBuilder outputStreamStats = new StringBuilder( + "OutputStream Statistics{"); + outputStreamStats.append(", bytes_upload=").append(bytesToUpload); + outputStreamStats.append(", bytes_upload_successfully=") + .append(bytesUploadSuccessful); + outputStreamStats.append(", bytes_upload_failed=") + .append(bytesUploadFailed); + outputStreamStats.append(", time_spent_task_wait=") + .append(timeSpentOnTaskWait); + outputStreamStats.append(", queue_shrunk_ops=").append(queueShrunkOps); + outputStreamStats.append(", write_current_buffer_ops=") + .append(writeCurrentBufferOperations); + outputStreamStats.append("}"); + return outputStreamStats.toString(); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index 04be4f458c0ce..5b4defb9d0b4f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -33,6 +33,8 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore; import org.apache.hadoop.fs.azure.NativeAzureFileSystem; @@ -41,6 +43,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.utils.UriUtils; import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import static org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.WASB_ACCOUNT_NAME_DOMAIN_SUFFIX; @@ -237,6 +240,11 @@ protected String getTestUrl() { protected void setFileSystemName(String fileSystemName) { this.fileSystemName = fileSystemName; } + + protected String getMethodName() { + return methodName.getMethodName(); + } + protected String getFileSystemName() { return fileSystemName; } @@ -341,4 +349,21 @@ protected Path path(String filepath) throws IOException { new Path(getTestPath(), filepath)); } + /** + * Generic create File and enabling AbfsOutputStream Flush. + * + * @param fs AzureBlobFileSystem that is initialised in the test. + * @param path Path of the file to be created. + * @return AbfsOutputStream for writing. 
+ * @throws AzureBlobFileSystemException + */ + protected AbfsOutputStream createAbfsOutputStreamWithFlushEnabled( + AzureBlobFileSystem fs, + Path path) throws AzureBlobFileSystemException { + AzureBlobFileSystemStore abfss = fs.getAbfsStore(); + abfss.getAbfsConfiguration().setDisableOutputStreamFlush(false); + + return (AbfsOutputStream) abfss.createFile(path, + true, FsPermission.getDefault(), FsPermission.getUMask(fs.getConf())); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsTestWithTimeout.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsTestWithTimeout.java index fee90abeabc9e..ab7bc82370615 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsTestWithTimeout.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsTestWithTimeout.java @@ -23,6 +23,8 @@ import org.junit.Rule; import org.junit.rules.TestName; import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_TIMEOUT; @@ -31,6 +33,9 @@ * This class does not attempt to bind to Azure. */ public class AbstractAbfsTestWithTimeout extends Assert { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractAbfsTestWithTimeout.class); + /** * The name of the current method. */ @@ -67,4 +72,16 @@ public void nameThread() { protected int getTestTimeoutMillis() { return TEST_TIMEOUT; } + + /** + * Describe a test in the logs. + * + * @param text text to print + * @param args arguments to format in the printing + */ + protected void describe(String text, Object... args) { + LOG.info("\n\n{}: {}\n", + methodName.getMethodName(), + String.format(text, args)); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java new file mode 100644 index 0000000000000..09cbfde1bebfb --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsOutputStreamStatistics.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; + +import org.junit.Test; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl; + +/** + * Test AbfsOutputStream statistics. 
+ */ +public class ITestAbfsOutputStreamStatistics + extends AbstractAbfsIntegrationTest { + private static final int OPERATIONS = 10; + + public ITestAbfsOutputStreamStatistics() throws Exception { + } + + /** + * Tests to check bytes uploaded successfully in {@link AbfsOutputStream}. + */ + @Test + public void testAbfsOutputStreamUploadingBytes() throws IOException { + describe("Testing bytes uploaded successfully by AbfsOutputSteam"); + final AzureBlobFileSystem fs = getFileSystem(); + Path uploadBytesFilePath = path(getMethodName()); + String testBytesToUpload = "bytes"; + + try ( + AbfsOutputStream outForSomeBytes = createAbfsOutputStreamWithFlushEnabled( + fs, uploadBytesFilePath) + ) { + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatisticsForUploadBytes = + getAbfsOutputStreamStatistics(outForSomeBytes); + + //Test for zero bytes To upload. + assertEquals("Mismatch in bytes to upload", 0, + abfsOutputStreamStatisticsForUploadBytes.getBytesToUpload()); + + outForSomeBytes.write(testBytesToUpload.getBytes()); + outForSomeBytes.flush(); + abfsOutputStreamStatisticsForUploadBytes = + getAbfsOutputStreamStatistics(outForSomeBytes); + + //Test for bytes to upload. + assertEquals("Mismatch in bytes to upload", + testBytesToUpload.getBytes().length, + abfsOutputStreamStatisticsForUploadBytes.getBytesToUpload()); + + //Test for successful bytes uploaded. + assertEquals("Mismatch in successful bytes uploaded", + testBytesToUpload.getBytes().length, + abfsOutputStreamStatisticsForUploadBytes.getBytesUploadSuccessful()); + + } + + try ( + AbfsOutputStream outForLargeBytes = createAbfsOutputStreamWithFlushEnabled( + fs, uploadBytesFilePath)) { + + for (int i = 0; i < OPERATIONS; i++) { + outForLargeBytes.write(testBytesToUpload.getBytes()); + } + outForLargeBytes.flush(); + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + getAbfsOutputStreamStatistics(outForLargeBytes); + + //Test for bytes to upload. + assertEquals("Mismatch in bytes to upload", + OPERATIONS * (testBytesToUpload.getBytes().length), + abfsOutputStreamStatistics.getBytesToUpload()); + + //Test for successful bytes uploaded. + assertEquals("Mismatch in successful bytes uploaded", + OPERATIONS * (testBytesToUpload.getBytes().length), + abfsOutputStreamStatistics.getBytesUploadSuccessful()); + + } + } + + /** + * Tests to check correct values of queue shrunk operations in + * AbfsOutputStream. + * + * After writing data, AbfsOutputStream doesn't upload the data until + * flushed. Hence, flush() method is called after write() to test queue + * shrink operations. + */ + @Test + public void testAbfsOutputStreamQueueShrink() throws IOException { + describe("Testing queue shrink operations by AbfsOutputStream"); + final AzureBlobFileSystem fs = getFileSystem(); + Path queueShrinkFilePath = path(getMethodName()); + String testQueueShrink = "testQueue"; + + try (AbfsOutputStream outForOneOp = createAbfsOutputStreamWithFlushEnabled( + fs, queueShrinkFilePath)) { + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + getAbfsOutputStreamStatistics(outForOneOp); + + //Test for shrinking queue zero time. + assertEquals("Mismatch in queue shrunk operations", 0, + abfsOutputStreamStatistics.getQueueShrunkOps()); + + } + + /* + * After writing in the loop we flush inside the loop to ensure the write + * operation done in that loop is considered to be done which would help + * us triggering the shrinkWriteOperationQueue() method each time after + * the write operation. 
+ * If we call flush outside the loop, then it will take all the write + * operations inside the loop as one write operation. + * + */ + try ( + AbfsOutputStream outForLargeOps = createAbfsOutputStreamWithFlushEnabled( + fs, queueShrinkFilePath)) { + for (int i = 0; i < OPERATIONS; i++) { + outForLargeOps.write(testQueueShrink.getBytes()); + outForLargeOps.flush(); + } + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + getAbfsOutputStreamStatistics(outForLargeOps); + /* + * After a write operation is done, it is in a task queue where it is + * removed. Hence, to get the correct expected value we get the size of + * the task queue from AbfsOutputStream and subtract it with total + * write operations done to get the number of queue shrinks done. + * + */ + assertEquals("Mismatch in queue shrunk operations", + OPERATIONS - outForLargeOps.getWriteOperationsSize(), + abfsOutputStreamStatistics.getQueueShrunkOps()); + } + + } + + /** + * Tests to check correct values of write current buffer operations done by + * AbfsOutputStream. + * + * After writing data, AbfsOutputStream doesn't upload data till flush() is + * called. Hence, flush() calls were made after write(). + */ + @Test + public void testAbfsOutputStreamWriteBuffer() throws IOException { + describe("Testing write current buffer operations by AbfsOutputStream"); + final AzureBlobFileSystem fs = getFileSystem(); + Path writeBufferFilePath = path(getMethodName()); + String testWriteBuffer = "Buffer"; + + try (AbfsOutputStream outForOneOp = createAbfsOutputStreamWithFlushEnabled( + fs, writeBufferFilePath)) { + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + getAbfsOutputStreamStatistics(outForOneOp); + + //Test for zero time writing buffer to service. + assertEquals("Mismatch in write current buffer operations", 0, + abfsOutputStreamStatistics.getWriteCurrentBufferOperations()); + + outForOneOp.write(testWriteBuffer.getBytes()); + outForOneOp.flush(); + + abfsOutputStreamStatistics = getAbfsOutputStreamStatistics(outForOneOp); + + //Test for one time writing buffer to service. + assertEquals("Mismatch in write current buffer operations", 1, + abfsOutputStreamStatistics.getWriteCurrentBufferOperations()); + } + + try ( + AbfsOutputStream outForLargeOps = createAbfsOutputStreamWithFlushEnabled( + fs, writeBufferFilePath)) { + + /* + * Need to flush each time after we write to actually write the data + * into the data store and thus, get the writeCurrentBufferToService() + * method triggered and increment the statistic. + */ + for (int i = 0; i < OPERATIONS; i++) { + outForLargeOps.write(testWriteBuffer.getBytes()); + outForLargeOps.flush(); + } + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + getAbfsOutputStreamStatistics(outForLargeOps); + //Test for 10 times writing buffer to service. + assertEquals("Mismatch in write current buffer operations", + OPERATIONS, + abfsOutputStreamStatistics.getWriteCurrentBufferOperations()); + } + } + + /** + * Method to get the AbfsOutputStream statistics. + * + * @param out AbfsOutputStream whose statistics is needed. + * @return AbfsOutputStream statistics implementation class to get the + * values of the counters. 
+ */ + private static AbfsOutputStreamStatisticsImpl getAbfsOutputStreamStatistics( + AbfsOutputStream out) { + return (AbfsOutputStreamStatisticsImpl) out.getOutputStreamStatistics(); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java new file mode 100644 index 0000000000000..58f00233710f8 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsOutputStreamStatistics.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.util.Random; + +import org.junit.Test; + +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl; + +/** + * Unit tests for AbfsOutputStream statistics. + */ +public class TestAbfsOutputStreamStatistics + extends AbstractAbfsIntegrationTest { + + private static final int LOW_RANGE_FOR_RANDOM_VALUE = 49; + private static final int HIGH_RANGE_FOR_RANDOM_VALUE = 9999; + private static final int OPERATIONS = 10; + + public TestAbfsOutputStreamStatistics() throws Exception { + } + + /** + * Tests to check number of bytes failed to upload in + * {@link AbfsOutputStream}. + */ + @Test + public void testAbfsOutputStreamBytesFailed() { + describe("Testing number of bytes failed during upload in AbfsOutputSteam"); + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + new AbfsOutputStreamStatisticsImpl(); + + //Test for zero bytes uploaded. + assertEquals("Mismatch in number of bytes failed to upload", 0, + abfsOutputStreamStatistics.getBytesUploadFailed()); + + //Populating small random value for bytesFailed. + int randomBytesFailed = new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE); + abfsOutputStreamStatistics.uploadFailed(randomBytesFailed); + //Test for bytes failed to upload. + assertEquals("Mismatch in number of bytes failed to upload", + randomBytesFailed, abfsOutputStreamStatistics.getBytesUploadFailed()); + + //Reset statistics for the next test. + abfsOutputStreamStatistics = new AbfsOutputStreamStatisticsImpl(); + + /* + * Entering multiple random values for bytesFailed to check correct + * summation of values. + */ + int expectedBytesFailed = 0; + for (int i = 0; i < OPERATIONS; i++) { + randomBytesFailed = new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE); + abfsOutputStreamStatistics.uploadFailed(randomBytesFailed); + expectedBytesFailed += randomBytesFailed; + } + //Test for bytes failed to upload. 
+ assertEquals("Mismatch in number of bytes failed to upload", + expectedBytesFailed, abfsOutputStreamStatistics.getBytesUploadFailed()); + } + + /** + * Tests to check time spent on waiting for tasks to be complete on a + * blocking queue in {@link AbfsOutputStream}. + */ + @Test + public void testAbfsOutputStreamTimeSpentOnWaitTask() { + describe("Testing time Spent on waiting for task to be completed in " + + "AbfsOutputStream"); + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + new AbfsOutputStreamStatisticsImpl(); + + //Test for initial value of timeSpentWaitTask. + assertEquals("Mismatch in time spent on waiting for tasks to complete", 0, + abfsOutputStreamStatistics.getTimeSpentOnTaskWait()); + + int smallRandomStartTime = + new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE); + int smallRandomEndTime = + new Random().nextInt(LOW_RANGE_FOR_RANDOM_VALUE) + + smallRandomStartTime; + int smallDiff = smallRandomEndTime - smallRandomStartTime; + abfsOutputStreamStatistics + .timeSpentTaskWait(smallRandomStartTime, smallRandomEndTime); + //Test for small random value of timeSpentWaitTask. + assertEquals("Mismatch in time spent on waiting for tasks to complete", + smallDiff, abfsOutputStreamStatistics.getTimeSpentOnTaskWait()); + + //Reset statistics for the next test. + abfsOutputStreamStatistics = new AbfsOutputStreamStatisticsImpl(); + + /* + * Entering multiple values for timeSpentTaskWait() to check the + * summation is happening correctly. Also calculating the expected result. + */ + int expectedRandomDiff = 0; + for (int i = 0; i < OPERATIONS; i++) { + int largeRandomStartTime = + new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE); + int largeRandomEndTime = new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE) + + largeRandomStartTime; + abfsOutputStreamStatistics + .timeSpentTaskWait(largeRandomStartTime, largeRandomEndTime); + expectedRandomDiff += largeRandomEndTime - largeRandomStartTime; + } + + /* + * Test to check correct value of timeSpentTaskWait after multiple + * random values are passed in it. + */ + assertEquals("Mismatch in time spent on waiting for tasks to complete", + expectedRandomDiff, + abfsOutputStreamStatistics.getTimeSpentOnTaskWait()); + } + + /** + * Unit Tests to check correct values of queue shrunk operations in + * AbfsOutputStream. + * + */ + @Test + public void testAbfsOutputStreamQueueShrink() { + describe("Testing queue shrink operations by AbfsOutputStream"); + + AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = + new AbfsOutputStreamStatisticsImpl(); + + //Test for shrinking queue zero time. + assertEquals("Mismatch in queue shrunk operations", 0, + abfsOutputStreamStatistics.getQueueShrunkOps()); + + abfsOutputStreamStatistics.queueShrunk(); + + //Test for shrinking queue 1 time. + assertEquals("Mismatch in queue shrunk operations", 1, + abfsOutputStreamStatistics.getQueueShrunkOps()); + + //Reset statistics for the next test. + abfsOutputStreamStatistics = new AbfsOutputStreamStatisticsImpl(); + + /* + * Entering random values for queueShrunkOps and checking the correctness + * of summation for the statistic. + */ + int randomQueueValues = new Random().nextInt(HIGH_RANGE_FOR_RANDOM_VALUE); + for (int i = 0; i < randomQueueValues * OPERATIONS; i++) { + abfsOutputStreamStatistics.queueShrunk(); + } + /* + * Test for random times incrementing queue shrunk operations. 
+ */ + assertEquals("Mismatch in queue shrunk operations", + randomQueueValues * OPERATIONS, + abfsOutputStreamStatistics.getQueueShrunkOps()); + } +} From 6fa99fae78e3293f4b5435dc0bbc555bd3525e2f Mon Sep 17 00:00:00 2001 From: Mehakmeet Singh Date: Tue, 2 Jun 2020 18:31:35 +0100 Subject: [PATCH 3/7] HADOOP-17016. Adding Common Counters in ABFS (#1991). Contributed by: Mehakmeet Singh. Change-Id: Ib84e7a42f28e064df4c6204fcce33e573360bf42 (cherry picked from commit 7f486f0258943f1dbda7fe5c08be4391e284df28) --- .../apache/hadoop/fs/StorageStatistics.java | 1 + .../fs/azurebfs/AbfsInstrumentation.java | 279 ++++++++++++++++++ .../hadoop/fs/azurebfs/AbfsStatistic.java | 93 ++++++ .../fs/azurebfs/AzureBlobFileSystem.java | 75 ++++- .../fs/azurebfs/services/AbfsCounters.java | 66 +++++ .../azurebfs/AbstractAbfsIntegrationTest.java | 15 + .../fs/azurebfs/ITestAbfsStatistics.java | 258 ++++++++++++++++ .../fs/azurebfs/TestAbfsStatistics.java | 61 ++++ 8 files changed, 840 insertions(+), 8 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java index d987ad084d3ef..a120c297ea7e9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java @@ -49,6 +49,7 @@ public interface CommonStatisticNames { String OP_DELETE = "op_delete"; String OP_EXISTS = "op_exists"; String OP_GET_CONTENT_SUMMARY = "op_get_content_summary"; + String OP_GET_DELEGATION_TOKEN = "op_get_delegation_token"; String OP_GET_FILE_CHECKSUM = "op_get_file_checksum"; String OP_GET_FILE_STATUS = "op_get_file_status"; String OP_GET_STATUS = "op_get_status"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java new file mode 100644 index 0000000000000..9094c4065de0c --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java @@ -0,0 +1,279 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricStringBuilder; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsTag; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableMetric; + +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; + +/** + * Instrumentation of Abfs counters. + */ +public class AbfsInstrumentation implements AbfsCounters { + + /** + * Single context for all the Abfs counters to separate them from other + * counters. + */ + private static final String CONTEXT = "AbfsContext"; + /** + * The name of a field added to metrics records that uniquely identifies a + * specific FileSystem instance. + */ + private static final String REGISTRY_ID = "AbfsID"; + /** + * The name of a field added to metrics records that indicates the hostname + * portion of the FS URL. + */ + private static final String METRIC_BUCKET = "AbfsBucket"; + + private final MetricsRegistry registry = + new MetricsRegistry("abfsMetrics").setContext(CONTEXT); + + private static final AbfsStatistic[] STATISTIC_LIST = { + CALL_CREATE, + CALL_OPEN, + CALL_GET_FILE_STATUS, + CALL_APPEND, + CALL_CREATE_NON_RECURSIVE, + CALL_DELETE, + CALL_EXIST, + CALL_GET_DELEGATION_TOKEN, + CALL_LIST_STATUS, + CALL_MKDIRS, + CALL_RENAME, + DIRECTORIES_CREATED, + DIRECTORIES_DELETED, + FILES_CREATED, + FILES_DELETED, + ERROR_IGNORED + }; + + public AbfsInstrumentation(URI uri) { + UUID fileSystemInstanceId = UUID.randomUUID(); + registry.tag(REGISTRY_ID, + "A unique identifier for the instance", + fileSystemInstanceId.toString()); + registry.tag(METRIC_BUCKET, "Hostname from the FS URL", uri.getHost()); + + for (AbfsStatistic stats : STATISTIC_LIST) { + createCounter(stats); + } + } + + /** + * Look up a Metric from registered set. + * + * @param name name of metric. + * @return the metric or null. + */ + private MutableMetric lookupMetric(String name) { + return getRegistry().get(name); + } + + /** + * Look up counter by name. + * + * @param name name of counter. + * @return counter if found, else null. + */ + private MutableCounterLong lookupCounter(String name) { + MutableMetric metric = lookupMetric(name); + if (metric == null) { + return null; + } + if (!(metric instanceof MutableCounterLong)) { + throw new IllegalStateException("Metric " + name + + " is not a MutableCounterLong: " + metric); + } + return (MutableCounterLong) metric; + } + + /** + * Create a counter in the registry. + * + * @param stats AbfsStatistic whose counter needs to be made. + * @return counter or null. + */ + private MutableCounterLong createCounter(AbfsStatistic stats) { + return registry.newCounter(stats.getStatName(), + stats.getStatDescription(), 0L); + } + + /** + * {@inheritDoc} + * + * Increment a statistic with some value. + * + * @param statistic AbfsStatistic need to be incremented. + * @param value long value to be incremented by. 
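Because the constructor only needs a URI to tag the MetricsRegistry, these counters can be driven without a live filesystem. A hedged sketch (imports of java.net.URI and java.util.Map assumed; the account URI is hypothetical):

    AbfsCounters counters = new AbfsInstrumentation(
        URI.create("abfs://container@account.dfs.core.windows.net"));
    counters.incrementCounter(AbfsStatistic.FILES_CREATED, 1);
    counters.incrementCounter(AbfsStatistic.CALL_CREATE, 1);
    String rendered = counters.formString("{", "=", "}", true); // used by toString()
    Map<String, Long> snapshot = counters.toMap();              // used by tests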
+ */ + @Override + public void incrementCounter(AbfsStatistic statistic, long value) { + MutableCounterLong counter = lookupCounter(statistic.getStatName()); + if (counter != null) { + counter.incr(value); + } + } + + /** + * Getter for MetricRegistry. + * + * @return MetricRegistry or null. + */ + private MetricsRegistry getRegistry() { + return registry; + } + + /** + * {@inheritDoc} + * + * Method to aggregate all the counters in the MetricRegistry and form a + * string with prefix, separator and suffix. + * + * @param prefix string that would be before metric. + * @param separator string that would be between metric name and value. + * @param suffix string that would be after metric value. + * @param all gets all the values even if unchanged. + * @return a String with all the metrics and their values. + */ + @Override + public String formString(String prefix, String separator, String suffix, + boolean all) { + + MetricStringBuilder metricStringBuilder = new MetricStringBuilder(null, + prefix, separator, suffix); + registry.snapshot(metricStringBuilder, all); + return metricStringBuilder.toString(); + } + + /** + * {@inheritDoc} + * + * Creating a map of all the counters for testing. + * + * @return a map of the metrics. + */ + @VisibleForTesting + @Override + public Map toMap() { + MetricsToMap metricBuilder = new MetricsToMap(null); + registry.snapshot(metricBuilder, true); + return metricBuilder.getMap(); + } + + protected static class MetricsToMap extends MetricsRecordBuilder { + private final MetricsCollector parent; + private final Map map = + new HashMap<>(); + + MetricsToMap(MetricsCollector parent) { + this.parent = parent; + } + + @Override + public MetricsRecordBuilder tag(MetricsInfo info, String value) { + return this; + } + + @Override + public MetricsRecordBuilder add(MetricsTag tag) { + return this; + } + + @Override + public MetricsRecordBuilder add(AbstractMetric metric) { + return this; + } + + @Override + public MetricsRecordBuilder setContext(String value) { + return this; + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo info, int value) { + return tuple(info, value); + } + + @Override + public MetricsRecordBuilder addCounter(MetricsInfo info, long value) { + return tuple(info, value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, int value) { + return tuple(info, value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, long value) { + return tuple(info, value); + } + + public MetricsToMap tuple(MetricsInfo info, long value) { + return tuple(info.name(), value); + } + + public MetricsToMap tuple(String name, long value) { + map.put(name, value); + return this; + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, float value) { + return tuple(info, (long) value); + } + + @Override + public MetricsRecordBuilder addGauge(MetricsInfo info, double value) { + return tuple(info, (long) value); + } + + @Override + public MetricsCollector parent() { + return parent; + } + + /** + * Get the map. + * + * @return the map of metrics. 
+     */
+    public Map<String, Long> getMap() {
+      return map;
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
new file mode 100644
index 0000000000000..a9867aa12b85e
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames;
+
+/**
+ * Statistics collected in Abfs.
+ * Available as metrics in {@link AbfsInstrumentation}.
+ */
+public enum AbfsStatistic {
+
+  CALL_CREATE(CommonStatisticNames.OP_CREATE,
+      "Calls of create()."),
+  CALL_OPEN(CommonStatisticNames.OP_OPEN,
+      "Calls of open()."),
+  CALL_GET_FILE_STATUS(CommonStatisticNames.OP_GET_FILE_STATUS,
+      "Calls of getFileStatus()."),
+  CALL_APPEND(CommonStatisticNames.OP_APPEND,
+      "Calls of append()."),
+  CALL_CREATE_NON_RECURSIVE(CommonStatisticNames.OP_CREATE_NON_RECURSIVE,
+      "Calls of createNonRecursive()."),
+  CALL_DELETE(CommonStatisticNames.OP_DELETE,
+      "Calls of delete()."),
+  CALL_EXIST(CommonStatisticNames.OP_EXISTS,
+      "Calls of exist()."),
+  CALL_GET_DELEGATION_TOKEN(CommonStatisticNames.OP_GET_DELEGATION_TOKEN,
+      "Calls of getDelegationToken()."),
+  CALL_LIST_STATUS(CommonStatisticNames.OP_LIST_STATUS,
+      "Calls of listStatus()."),
+  CALL_MKDIRS(CommonStatisticNames.OP_MKDIRS,
+      "Calls of mkdirs()."),
+  CALL_RENAME(CommonStatisticNames.OP_RENAME,
+      "Calls of rename()."),
+  DIRECTORIES_CREATED("directories_created",
+      "Total number of directories created through the object store."),
+  DIRECTORIES_DELETED("directories_deleted",
+      "Total number of directories deleted through the object store."),
+  FILES_CREATED("files_created",
+      "Total number of files created through the object store."),
+  FILES_DELETED("files_deleted",
+      "Total number of files deleted from the object store."),
+  ERROR_IGNORED("error_ignored",
+      "Errors caught and ignored.");
+
+  private String statName;
+  private String statDescription;
+
+  /**
+   * Constructor of AbfsStatistic to set statistic name and description.
+   *
+   * @param statName Name of the statistic.
+   * @param statDescription Description of the statistic.
+   */
+  AbfsStatistic(String statName, String statDescription) {
+    this.statName = statName;
+    this.statDescription = statDescription;
+  }
+
+  /**
+   * Getter for statistic name.
+   *
+   * @return Name of statistic.
+   */
+  public String getStatName() {
+    return statName;
+  }
+
+  /**
+   * Getter for statistic description.
+   *
+   * @return Description of statistic.
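+   *
+   * <p>Editor's illustration, with values taken from the enum above:
+   * <pre>{@code
+   *   CALL_OPEN.getStatName();        // "op_open"
+   *   CALL_OPEN.getStatDescription(); // "Calls of open()."
+   * }</pre>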
+ */ + public String getStatDescription() { + return statDescription; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 7180a4769ce85..10d45f8a4bd62 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -69,6 +70,7 @@ import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizationException; import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizer; import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager; +import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; @@ -78,6 +80,8 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; +import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; + /** * A {@link org.apache.hadoop.fs.FileSystem} for reading and writing files stored on Windows Azure @@ -93,6 +97,7 @@ public class AzureBlobFileSystem extends FileSystem { private boolean delegationTokenEnabled = false; private AbfsDelegationTokenManager delegationTokenManager; private AbfsAuthorizer authorizer; + private AbfsCounters instrumentation; @Override public void initialize(URI uri, Configuration configuration) @@ -106,7 +111,7 @@ public void initialize(URI uri, Configuration configuration) this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration); final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration(); - + instrumentation = new AbfsInstrumentation(uri); this.setWorkingDirectory(this.getHomeDirectory()); if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) { @@ -142,6 +147,11 @@ public String toString() { sb.append("uri=").append(uri); sb.append(", user='").append(abfsStore.getUser()).append('\''); sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\''); + if (instrumentation != null) { + sb.append(", Statistics: {").append(instrumentation.formString("{", "=", + "}", true)); + sb.append("}"); + } sb.append('}'); return sb.toString(); } @@ -158,7 +168,7 @@ public URI getUri() { @Override public FSDataInputStream open(final Path path, final int bufferSize) throws IOException { LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, bufferSize); - + statIncrement(CALL_OPEN); Path qualifiedPath = makeQualified(path); performAbfsAuthCheck(FsAction.READ, qualifiedPath); @@ -180,6 +190,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi overwrite, blockSize); + statIncrement(CALL_CREATE); trailingPeriodCheck(f); Path qualifiedPath = makeQualified(f); @@ -188,6 +199,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi try { OutputStream outputStream = abfsStore.createFile(qualifiedPath, overwrite, permission == null ? 
FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf())); + statIncrement(FILES_CREATED); return new FSDataOutputStream(outputStream, statistics); } catch(AzureBlobFileSystemException ex) { checkException(f, ex); @@ -201,6 +213,7 @@ public FSDataOutputStream createNonRecursive(final Path f, final FsPermission pe final boolean overwrite, final int bufferSize, final short replication, final long blockSize, final Progressable progress) throws IOException { + statIncrement(CALL_CREATE_NON_RECURSIVE); final Path parent = f.getParent(); final FileStatus parentFileStatus = tryGetFileStatus(parent); @@ -244,7 +257,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr "AzureBlobFileSystem.append path: {} bufferSize: {}", f.toString(), bufferSize); - + statIncrement(CALL_APPEND); Path qualifiedPath = makeQualified(f); performAbfsAuthCheck(FsAction.WRITE, qualifiedPath); @@ -260,7 +273,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr public boolean rename(final Path src, final Path dst) throws IOException { LOG.debug( "AzureBlobFileSystem.rename src: {} dst: {}", src.toString(), dst.toString()); - + statIncrement(CALL_RENAME); trailingPeriodCheck(dst); Path parentFolder = src.getParent(); @@ -328,7 +341,7 @@ public boolean rename(final Path src, final Path dst) throws IOException { public boolean delete(final Path f, final boolean recursive) throws IOException { LOG.debug( "AzureBlobFileSystem.delete path: {} recursive: {}", f.toString(), recursive); - + statIncrement(CALL_DELETE); Path qualifiedPath = makeQualified(f); performAbfsAuthCheck(FsAction.WRITE, qualifiedPath); @@ -354,7 +367,7 @@ public boolean delete(final Path f, final boolean recursive) throws IOException public FileStatus[] listStatus(final Path f) throws IOException { LOG.debug( "AzureBlobFileSystem.listStatus path: {}", f.toString()); - + statIncrement(CALL_LIST_STATUS); Path qualifiedPath = makeQualified(f); performAbfsAuthCheck(FsAction.READ, qualifiedPath); @@ -367,6 +380,24 @@ public FileStatus[] listStatus(final Path f) throws IOException { } } + /** + * Increment of an Abfs statistic. + * + * @param statistic AbfsStatistic that needs increment. + */ + private void statIncrement(AbfsStatistic statistic) { + incrementStatistic(statistic); + } + + /** + * Method for incrementing AbfsStatistic by a long value. + * + * @param statistic the Statistic to be incremented. + */ + private void incrementStatistic(AbfsStatistic statistic) { + instrumentation.incrementCounter(statistic, 1); + } + /** * Performs a check for (.) until root in the path to throw an exception. * The purpose is to differentiate between dir/dir1 and dir/dir1. @@ -396,7 +427,7 @@ private void trailingPeriodCheck(Path path) throws IllegalArgumentException { public boolean mkdirs(final Path f, final FsPermission permission) throws IOException { LOG.debug( "AzureBlobFileSystem.mkdirs path: {} permissions: {}", f, permission); - + statIncrement(CALL_MKDIRS); trailingPeriodCheck(f); final Path parentFolder = f.getParent(); @@ -411,6 +442,7 @@ public boolean mkdirs(final Path f, final FsPermission permission) throws IOExce try { abfsStore.createDirectory(qualifiedPath, permission == null ? 
FsPermission.getDirDefault() : permission, FsPermission.getUMask(getConf()));
+      statIncrement(DIRECTORIES_CREATED);
       return true;
     } catch (AzureBlobFileSystemException ex) {
       checkException(f, ex, AzureServiceErrorCode.PATH_ALREADY_EXISTS);
@@ -427,12 +459,13 @@ public synchronized void close() throws IOException {
     super.close();
     LOG.debug("AzureBlobFileSystem.close");
     this.isClosed = true;
+    LOG.debug("Closing Abfs: " + toString());
   }
 
   @Override
   public FileStatus getFileStatus(final Path f) throws IOException {
     LOG.debug("AzureBlobFileSystem.getFileStatus path: {}", f);
-
+    statIncrement(CALL_GET_FILE_STATUS);
     Path qualifiedPath = makeQualified(f);
     performAbfsAuthCheck(FsAction.READ, qualifiedPath);
@@ -570,6 +603,11 @@ private boolean deleteRoot() throws IOException {
             @Override
             public Void call() throws Exception {
               delete(fs.getPath(), fs.isDirectory());
+              if (fs.isDirectory()) {
+                statIncrement(DIRECTORIES_DELETED);
+              } else {
+                statIncrement(FILES_DELETED);
+              }
               return null;
             }
           });
@@ -943,11 +981,25 @@ public void access(final Path path, final FsAction mode) throws IOException {
     }
   }
 
+  /**
+   * Increments the exists() call counter before delegating to the
+   * superclass, for statistic collection.
+   *
+   * @param f source path.
+   * @return true if the path exists.
+   * @throws IOException if the underlying getFileStatus() call fails.
+   */
+  @Override
+  public boolean exists(Path f) throws IOException {
+    statIncrement(CALL_EXIST);
+    return super.exists(f);
+  }
+
   private FileStatus tryGetFileStatus(final Path f) {
     try {
       return getFileStatus(f);
     } catch (IOException ex) {
       LOG.debug("File not found {}", f);
+      statIncrement(ERROR_IGNORED);
       return null;
     }
   }
@@ -964,6 +1016,7 @@ private boolean fileSystemExists() throws IOException {
       // there is no way to get the storage error code
       // workaround here is to check its status code.
     } catch (FileNotFoundException e) {
+      statIncrement(ERROR_IGNORED);
       return false;
     }
   }
@@ -1135,6 +1188,7 @@ private Throwable getRootCause(Throwable throwable) {
    */
   @Override
   public synchronized Token<?> getDelegationToken(final String renewer) throws IOException {
+    statIncrement(CALL_GET_DELEGATION_TOKEN);
     return this.delegationTokenEnabled ? this.delegationTokenManager.getDelegationToken(renewer)
         : super.getDelegationToken(renewer);
   }
@@ -1199,4 +1253,9 @@ private void performAbfsAuthCheck(FsAction action, Path... paths)
       }
     }
   }
+
+  @VisibleForTesting
+  Map<String, Long> getInstrumentationMap() {
+    return instrumentation.toMap();
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
new file mode 100644
index 0000000000000..87b1af4f0620c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
+
+/**
+ * An interface for Abfs counters.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface AbfsCounters {
+
+  /**
+   * Increment an AbfsStatistic by a long value.
+   *
+   * @param statistic AbfsStatistic to be incremented.
+   * @param value the value to increment the statistic by.
+   */
+  void incrementCounter(AbfsStatistic statistic, long value);
+
+  /**
+   * Form a String of all the statistics, presented in an organized manner.
+   *
+   * @param prefix the prefix to be set.
+   * @param separator the separator between the statistic name and value.
+   * @param suffix the suffix to be used.
+   * @param all whether to display all the statistics, even if unchanged.
+   * @return String of all the statistics and their values.
+   */
+  String formString(String prefix, String separator, String suffix,
+      boolean all);
+
+  /**
+   * Convert all the statistics into a key-value pair map to be used for
+   * testing.
+   *
+   * @return map with statistic name as key and statistic value as the map
+   * value.
+   */
+  @VisibleForTesting
+  Map<String, Long> toMap();
+
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index 5b4defb9d0b4f..104f3f5c348d0 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.net.URI;
 import java.util.Hashtable;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 
@@ -366,4 +367,18 @@ protected AbfsOutputStream createAbfsOutputStreamWithFlushEnabled(
     return (AbfsOutputStream) abfss.createFile(path, true,
         FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
   }
+
+  /**
+   * Custom assertion for an AbfsStatistic, taking the statistic, its
+   * expected value and the map of statistics as parameters.
+   * @param statistic the AbfsStatistic which needs to be asserted.
+   * @param expectedValue the expected value of the statistic.
+   * @param metricMap map of (String, Long) with the statistic name as key and
+   *                  the statistic value as the map value.
+   */
+  protected void assertAbfsStatistics(AbfsStatistic statistic,
+      long expectedValue, Map<String, Long> metricMap) {
+    assertEquals("Mismatch in " + statistic.getStatName(), expectedValue,
+        (long) metricMap.get(statistic.getStatName()));
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java
new file mode 100644
index 0000000000000..c88dc847a3f9a
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.Map; + +import org.junit.Test; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; +import org.apache.hadoop.fs.permission.FsPermission; + +/** + * Tests AzureBlobFileSystem Statistics. + */ +public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest { + + private static final int NUMBER_OF_OPS = 10; + + public ITestAbfsStatistics() throws Exception { + } + + /** + * Testing the initial value of statistics. + */ + @Test + public void testInitialStatsValues() throws IOException { + describe("Testing the initial values of Abfs counters"); + + AbfsCounters abfsCounters = + new AbfsInstrumentation(getFileSystem().getUri()); + Map metricMap = abfsCounters.toMap(); + + for (Map.Entry entry : metricMap.entrySet()) { + String key = entry.getKey(); + Long value = entry.getValue(); + + //Verify if initial value of statistic is 0. + checkInitialValue(key, value); + } + } + + /** + * Testing statistics by creating files and directories. + */ + @Test + public void testCreateStatistics() throws IOException { + describe("Testing counter values got by creating directories and files in" + + " Abfs"); + + AzureBlobFileSystem fs = getFileSystem(); + Path createFilePath = path(getMethodName()); + Path createDirectoryPath = path(getMethodName() + "Dir"); + + fs.mkdirs(createDirectoryPath); + fs.createNonRecursive(createFilePath, FsPermission + .getDefault(), false, 1024, (short) 1, 1024, null); + + Map metricMap = fs.getInstrumentationMap(); + /* + Test of statistic values after creating a directory and a file ; + getFileStatus is called 1 time after creating file and 1 time at time of + initialising. + */ + assertAbfsStatistics(AbfsStatistic.CALL_CREATE, 1, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_CREATE_NON_RECURSIVE, 1, metricMap); + assertAbfsStatistics(AbfsStatistic.FILES_CREATED, 1, metricMap); + assertAbfsStatistics(AbfsStatistic.DIRECTORIES_CREATED, 1, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_MKDIRS, 1, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_GET_FILE_STATUS, 2, metricMap); + + //re-initialising Abfs to reset statistic values. + fs.initialize(fs.getUri(), fs.getConf()); + + /* + Creating 10 directories and files; Directories and files can't be created + with same name, hence + i to give unique names. + */ + for (int i = 0; i < NUMBER_OF_OPS; i++) { + fs.mkdirs(path(getMethodName() + "Dir" + i)); + fs.createNonRecursive(path(getMethodName() + i), + FsPermission.getDefault(), false, 1024, (short) 1, + 1024, null); + } + + metricMap = fs.getInstrumentationMap(); + /* + Test of statistics values after creating 10 directories and files; + getFileStatus is called 1 time at initialise() plus number of times file + is created. 
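+
+    For example (editor's note): with NUMBER_OF_OPS = 10, op_create and
+    op_mkdirs end at 10 each, while op_get_file_status ends at
+    1 + 10 = 11, which is what the assertions below verify.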
+ */ + assertAbfsStatistics(AbfsStatistic.CALL_CREATE, NUMBER_OF_OPS, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_CREATE_NON_RECURSIVE, NUMBER_OF_OPS, + metricMap); + assertAbfsStatistics(AbfsStatistic.FILES_CREATED, NUMBER_OF_OPS, metricMap); + assertAbfsStatistics(AbfsStatistic.DIRECTORIES_CREATED, NUMBER_OF_OPS, + metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_MKDIRS, NUMBER_OF_OPS, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_GET_FILE_STATUS, + 1 + NUMBER_OF_OPS, metricMap); + } + + /** + * Testing statistics by deleting files and directories. + */ + @Test + public void testDeleteStatistics() throws IOException { + describe("Testing counter values got by deleting directory and files " + + "in Abfs"); + + AzureBlobFileSystem fs = getFileSystem(); + /* + This directory path needs to be root for triggering the + directories_deleted counter. + */ + Path createDirectoryPath = path("/"); + Path createFilePath = path(getMethodName()); + + /* + creating a directory and a file inside that directory. + The directory is root. Hence, no parent. This allows us to invoke + deleteRoot() method to see the population of directories_deleted and + files_deleted counters. + */ + fs.mkdirs(createDirectoryPath); + fs.create(path(createDirectoryPath + getMethodName())); + fs.delete(createDirectoryPath, true); + + Map metricMap = fs.getInstrumentationMap(); + + /* + Test for op_delete, files_deleted, op_list_status. + since directory is delete recursively op_delete is called 2 times. + 1 file is deleted, 1 listStatus() call is made. + */ + assertAbfsStatistics(AbfsStatistic.CALL_DELETE, 2, metricMap); + assertAbfsStatistics(AbfsStatistic.FILES_DELETED, 1, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_LIST_STATUS, 1, metricMap); + + /* + creating a root directory and deleting it recursively to see if + directories_deleted is called or not. + */ + fs.mkdirs(createDirectoryPath); + fs.create(createFilePath); + fs.delete(createDirectoryPath, true); + metricMap = fs.getInstrumentationMap(); + + //Test for directories_deleted. + assertAbfsStatistics(AbfsStatistic.DIRECTORIES_DELETED, 1, metricMap); + } + + /** + * Testing statistics of open, append, rename and exists method calls. + */ + @Test + public void testOpenAppendRenameExists() throws IOException { + describe("Testing counter values on calling open, append and rename and " + + "exists methods on Abfs"); + + AzureBlobFileSystem fs = getFileSystem(); + Path createFilePath = path(getMethodName()); + Path destCreateFilePath = path(getMethodName() + "New"); + + fs.create(createFilePath); + fs.open(createFilePath); + fs.append(createFilePath); + assertTrue(fs.rename(createFilePath, destCreateFilePath)); + + Map metricMap = fs.getInstrumentationMap(); + //Testing single method calls to open, append and rename. + assertAbfsStatistics(AbfsStatistic.CALL_OPEN, 1, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_APPEND, 1, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_RENAME, 1, metricMap); + + //Testing if file exists at path. + assertTrue(String.format("File with name %s should exist", + destCreateFilePath), + fs.exists(destCreateFilePath)); + assertFalse(String.format("File with name %s should not exist", + createFilePath), + fs.exists(createFilePath)); + + metricMap = fs.getInstrumentationMap(); + //Testing exists() calls. + assertAbfsStatistics(AbfsStatistic.CALL_EXIST, 2, metricMap); + + //re-initialising Abfs to reset statistic values. 
+ fs.initialize(fs.getUri(), fs.getConf()); + + fs.create(destCreateFilePath); + + for (int i = 0; i < NUMBER_OF_OPS; i++) { + fs.open(destCreateFilePath); + fs.append(destCreateFilePath); + } + + metricMap = fs.getInstrumentationMap(); + + //Testing large number of method calls to open, append. + assertAbfsStatistics(AbfsStatistic.CALL_OPEN, NUMBER_OF_OPS, metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_APPEND, NUMBER_OF_OPS, metricMap); + + for (int i = 0; i < NUMBER_OF_OPS; i++) { + // rename and then back to earlier name for no error while looping. + assertTrue(fs.rename(destCreateFilePath, createFilePath)); + assertTrue(fs.rename(createFilePath, destCreateFilePath)); + + //check if first name is existing and 2nd is not existing. + assertTrue(String.format("File with name %s should exist", + destCreateFilePath), + fs.exists(destCreateFilePath)); + assertFalse(String.format("File with name %s should not exist", + createFilePath), + fs.exists(createFilePath)); + + } + + metricMap = fs.getInstrumentationMap(); + + /* + Testing exists() calls and rename calls. Since both were called 2 + times in 1 loop. 2*numberOfOps is expectedValue. + */ + assertAbfsStatistics(AbfsStatistic.CALL_RENAME, 2 * NUMBER_OF_OPS, + metricMap); + assertAbfsStatistics(AbfsStatistic.CALL_EXIST, 2 * NUMBER_OF_OPS, + metricMap); + + } + + /** + * Method to check initial value of the statistics which should be 0. + * + * @param statName name of the statistic to be checked. + * @param statValue value of the statistic. + */ + private void checkInitialValue(String statName, long statValue) { + assertEquals("Mismatch in " + statName, 0, statValue); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java new file mode 100644 index 0000000000000..20d96fadef6e7 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.Map; + +import org.junit.Test; + +import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; + +/** + * Unit tests for Abfs common counters. + */ +public class TestAbfsStatistics extends AbstractAbfsIntegrationTest { + + private static final int LARGE_OPS = 100; + + public TestAbfsStatistics() throws Exception { + } + + /** + * Tests for op_get_delegation_token and error_ignore counter values. 
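+   *
+   * <p>Editor's sketch of the expected end state (with LARGE_OPS = 100, as
+   * defined in this class; names from the AbfsStatistic enum):
+   * <pre>{@code
+   *   instrumentation.toMap().get("op_get_delegation_token"); // 100L
+   *   instrumentation.toMap().get("error_ignored");           // 100L
+   * }</pre>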
+ */ + @Test + public void testInitializeStats() throws IOException { + describe("Testing the counter values after Abfs is initialised"); + + AbfsCounters instrumentation = + new AbfsInstrumentation(getFileSystem().getUri()); + + //Testing summation of the counter values. + for (int i = 0; i < LARGE_OPS; i++) { + instrumentation.incrementCounter(AbfsStatistic.CALL_GET_DELEGATION_TOKEN, 1); + instrumentation.incrementCounter(AbfsStatistic.ERROR_IGNORED, 1); + } + + Map metricMap = instrumentation.toMap(); + + assertAbfsStatistics(AbfsStatistic.CALL_GET_DELEGATION_TOKEN, LARGE_OPS, + metricMap); + assertAbfsStatistics(AbfsStatistic.ERROR_IGNORED, LARGE_OPS, metricMap); + + } +} From 0ca5415cd58722ade0720e8133765a18bcb0f10c Mon Sep 17 00:00:00 2001 From: Mehakmeet Singh Date: Fri, 19 Jun 2020 18:33:49 +0530 Subject: [PATCH 4/7] HADOOP-17065. Add Network Counters to ABFS (#2056) Contributed by Mehakmeet Singh. (cherry picked from commit 3472c3efc0014237d0cc4d9a989393b8513d2ab6) --- ...rumentation.java => AbfsCountersImpl.java} | 13 +- .../hadoop/fs/azurebfs/AbfsStatistic.java | 20 +- .../fs/azurebfs/AzureBlobFileSystem.java | 15 +- .../fs/azurebfs/AzureBlobFileSystemStore.java | 13 +- .../fs/azurebfs/services/AbfsClient.java | 13 +- .../AbfsClientThrottlingAnalyzer.java | 7 +- .../AbfsClientThrottlingIntercept.java | 14 +- .../azurebfs/services/AbfsRestOperation.java | 24 +- .../azurebfs/AbstractAbfsIntegrationTest.java | 3 +- .../azurebfs/ITestAbfsNetworkStatistics.java | 253 ++++++++++++++++++ .../fs/azurebfs/ITestAbfsStatistics.java | 2 +- .../azurebfs/TestAbfsNetworkStatistics.java | 67 +++++ .../fs/azurebfs/TestAbfsStatistics.java | 2 +- .../fs/azurebfs/services/TestAbfsClient.java | 2 +- 14 files changed, 420 insertions(+), 28 deletions(-) rename hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/{AbfsInstrumentation.java => AbfsCountersImpl.java} (96%) create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java similarity index 96% rename from hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java rename to hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java index 9094c4065de0c..57cc3eada4847 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java @@ -41,7 +41,7 @@ /** * Instrumentation of Abfs counters. 
*/ -public class AbfsInstrumentation implements AbfsCounters { +public class AbfsCountersImpl implements AbfsCounters { /** * Single context for all the Abfs counters to separate them from other @@ -78,10 +78,17 @@ public class AbfsInstrumentation implements AbfsCounters { DIRECTORIES_DELETED, FILES_CREATED, FILES_DELETED, - ERROR_IGNORED + ERROR_IGNORED, + CONNECTIONS_MADE, + SEND_REQUESTS, + GET_RESPONSES, + BYTES_SENT, + BYTES_RECEIVED, + READ_THROTTLES, + WRITE_THROTTLES }; - public AbfsInstrumentation(URI uri) { + public AbfsCountersImpl(URI uri) { UUID fileSystemInstanceId = UUID.randomUUID(); registry.tag(REGISTRY_ID, "A unique identifier for the instance", diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java index a9867aa12b85e..2935cd754315d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java @@ -22,7 +22,7 @@ /** * Statistic which are collected in Abfs. - * Available as metrics in {@link AbfsInstrumentation}. + * Available as metrics in {@link AbfsCountersImpl}. */ public enum AbfsStatistic { @@ -57,7 +57,23 @@ public enum AbfsStatistic { FILES_DELETED("files_deleted", "Total number of files deleted from the object store."), ERROR_IGNORED("error_ignored", - "Errors caught and ignored."); + "Errors caught and ignored."), + + //Network statistics. + CONNECTIONS_MADE("connections_made", + "Total number of times a connection was made with the data store."), + SEND_REQUESTS("send_requests", + "Total number of times http requests were sent to the data store."), + GET_RESPONSES("get_responses", + "Total number of times a response was received."), + BYTES_SENT("bytes_sent", + "Total bytes uploaded."), + BYTES_RECEIVED("bytes_received", + "Total bytes received."), + READ_THROTTLES("read_throttles", + "Total number of times a read operation is throttled."), + WRITE_THROTTLES("write_throttles", + "Total number of times a write operation is throttled."); private String statName; private String statDescription; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 10d45f8a4bd62..b40f215133f8b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -97,7 +97,7 @@ public class AzureBlobFileSystem extends FileSystem { private boolean delegationTokenEnabled = false; private AbfsDelegationTokenManager delegationTokenManager; private AbfsAuthorizer authorizer; - private AbfsCounters instrumentation; + private AbfsCounters abfsCounters; @Override public void initialize(URI uri, Configuration configuration) @@ -109,9 +109,10 @@ public void initialize(URI uri, Configuration configuration) LOG.debug("Initializing AzureBlobFileSystem for {}", uri); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); - this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration); + abfsCounters = new AbfsCountersImpl(uri); + this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), + configuration, abfsCounters); final AbfsConfiguration abfsConfiguration = 
abfsStore.getAbfsConfiguration(); - instrumentation = new AbfsInstrumentation(uri); this.setWorkingDirectory(this.getHomeDirectory()); if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) { @@ -147,8 +148,8 @@ public String toString() { sb.append("uri=").append(uri); sb.append(", user='").append(abfsStore.getUser()).append('\''); sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\''); - if (instrumentation != null) { - sb.append(", Statistics: {").append(instrumentation.formString("{", "=", + if (abfsCounters != null) { + sb.append(", Statistics: {").append(abfsCounters.formString("{", "=", "}", true)); sb.append("}"); } @@ -395,7 +396,7 @@ private void statIncrement(AbfsStatistic statistic) { * @param statistic the Statistic to be incremented. */ private void incrementStatistic(AbfsStatistic statistic) { - instrumentation.incrementCounter(statistic, 1); + abfsCounters.incrementCounter(statistic, 1); } /** @@ -1256,6 +1257,6 @@ private void performAbfsAuthCheck(FsAction action, Path... paths) @VisibleForTesting Map getInstrumentationMap() { - return instrumentation.toMap(); + return abfsCounters.toMap(); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index ced4871fad5ce..25aad7bc096ba 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -73,6 +73,7 @@ import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer; import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext; @@ -132,8 +133,9 @@ public class AzureBlobFileSystemStore { private final UserGroupInformation userGroupInformation; private final IdentityTransformer identityTransformer; - public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration) - throws IOException { + public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, + Configuration configuration, + AbfsCounters abfsCounters) throws IOException { this.uri = uri; String[] authorityParts = authorityParts(uri); final String fileSystemName = authorityParts[0]; @@ -163,7 +165,7 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration c this.authType = abfsConfiguration.getAuthType(accountName); boolean usingOauth = (authType == AuthType.OAuth); boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? 
true : isSecureScheme; - initializeClient(uri, fileSystemName, accountName, useHttps); + initializeClient(uri, fileSystemName, accountName, useHttps, abfsCounters); this.identityTransformer = new IdentityTransformer(abfsConfiguration.getRawConfiguration()); } @@ -927,7 +929,8 @@ public boolean isAtomicRenameKey(String key) { return isKeyForDirectorySet(key, azureAtomicRenameDirSet); } - private void initializeClient(URI uri, String fileSystemName, String accountName, boolean isSecure) throws AzureBlobFileSystemException { + private void initializeClient(URI uri, String fileSystemName, + String accountName, boolean isSecure, AbfsCounters abfsCounters) throws AzureBlobFileSystemException { if (this.client != null) { return; } @@ -960,7 +963,7 @@ private void initializeClient(URI uri, String fileSystemName, String accountName this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()), - tokenProvider); + tokenProvider, abfsCounters); } private String getOctalNotation(FsPermission fsPermission) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index b961b5ceebc92..7149455595eee 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -59,12 +59,14 @@ public class AbfsClient { private final String userAgent; private final AccessTokenProvider tokenProvider; + private final AbfsCounters abfsCounters; public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, final AbfsConfiguration abfsConfiguration, final ExponentialRetryPolicy exponentialRetryPolicy, - final AccessTokenProvider tokenProvider) { + final AccessTokenProvider tokenProvider, + final AbfsCounters abfsCounters) { this.baseUrl = baseUrl; this.sharedKeyCredentials = sharedKeyCredentials; String baseUrlString = baseUrl.toString(); @@ -85,6 +87,7 @@ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredent this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName); this.tokenProvider = tokenProvider; + this.abfsCounters = abfsCounters; } public String getFileSystem() { @@ -608,4 +611,12 @@ String initializeUserAgent(final AbfsConfiguration abfsConfiguration, URL getBaseUrl() { return baseUrl; } + + /** + * Getter for abfsCounters from AbfsClient. + * @return AbfsCounters instance. + */ + protected AbfsCounters getAbfsCounters() { + return abfsCounters; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java index f1e5aaae6835c..e1a799b7a2648 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java @@ -114,16 +114,19 @@ public void addBytesTransferred(long count, boolean isFailedOperation) { /** * Suspends the current storage operation, as necessary, to reduce throughput. + * @return true if Thread sleeps(Throttling occurs) else false. 
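+   * <p>Editor's caller sketch, mirroring the intercept change later in this
+   * patch (illustrative only):
+   * <pre>{@code
+   *   if (singleton.readThrottler.suspendIfNecessary()) {
+   *     abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
+   *   }
+   * }</pre>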
*/ - public void suspendIfNecessary() { + public boolean suspendIfNecessary() { int duration = sleepDuration; if (duration > 0) { try { Thread.sleep(duration); + return true; } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } + return false; } @VisibleForTesting @@ -269,4 +272,4 @@ static class AbfsOperationMetrics { this.operationsSuccessful = new AtomicLong(); } } -} \ No newline at end of file +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java index 1c6ce17a38c3c..7303e833418db 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.azurebfs.AbfsStatistic; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; /** @@ -103,17 +104,24 @@ static void updateMetrics(AbfsRestOperationType operationType, * uses this to suspend the request, if necessary, to minimize errors and * maximize throughput. */ - static void sendingRequest(AbfsRestOperationType operationType) { + static void sendingRequest(AbfsRestOperationType operationType, + AbfsCounters abfsCounters) { if (!isAutoThrottlingEnabled) { return; } switch (operationType) { case ReadFile: - singleton.readThrottler.suspendIfNecessary(); + if (singleton.readThrottler.suspendIfNecessary() + && abfsCounters != null) { + abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1); + } break; case Append: - singleton.writeThrottler.suspendIfNecessary(); + if (singleton.writeThrottler.suspendIfNecessary() + && abfsCounters != null) { + abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1); + } break; default: break; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 54fe14ab2b4a5..5ad609c88fac8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.azurebfs.AbfsStatistic; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; @@ -62,6 +63,7 @@ public class AbfsRestOperation { private int bufferLength; private AbfsHttpOperation result; + private AbfsCounters abfsCounters; public AbfsHttpOperation getResult() { return result; @@ -87,6 +89,7 @@ public AbfsHttpOperation getResult() { this.requestHeaders = requestHeaders; this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method) || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method)); + this.abfsCounters = client.getAbfsCounters(); } /** @@ -114,6 +117,7 @@ public AbfsHttpOperation getResult() { this.buffer = buffer; this.bufferOffset = bufferOffset; this.bufferLength = bufferLength; + this.abfsCounters = client.getAbfsCounters(); } 
/** @@ -149,6 +153,7 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS try { // initialize the HTTP request and open the connection httpOperation = new AbfsHttpOperation(url, method, requestHeaders); + incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1); // sign the HTTP request if (client.getAccessToken() == null) { @@ -161,14 +166,19 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS client.getAccessToken()); } - AbfsClientThrottlingIntercept.sendingRequest(operationType); + AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters); if (hasRequestBody) { // HttpUrlConnection requires httpOperation.sendRequest(buffer, bufferOffset, bufferLength); + incrementCounter(AbfsStatistic.SEND_REQUESTS, 1); + incrementCounter(AbfsStatistic.BYTES_SENT, bufferLength); } httpOperation.processResponse(buffer, bufferOffset, bufferLength); + incrementCounter(AbfsStatistic.GET_RESPONSES, 1); + incrementCounter(AbfsStatistic.BYTES_RECEIVED, + httpOperation.getBytesReceived()); } catch (IOException ex) { if (ex instanceof UnknownHostException) { LOG.warn(String.format("Unknown host name: %s. Retrying to resolve the host name...", httpOperation.getUrl().getHost())); @@ -208,4 +218,16 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS return true; } + + /** + * Incrementing Abfs counters with a long value. + * + * @param statistic the Abfs statistic that needs to be incremented. + * @param value the value to be incremented by. + */ + private void incrementCounter(AbfsStatistic statistic, long value) { + if (abfsCounters != null) { + abfsCounters.incrementCounter(statistic, value); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index 104f3f5c348d0..d23b41e00e5e3 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -376,9 +376,10 @@ protected AbfsOutputStream createAbfsOutputStreamWithFlushEnabled( * @param metricMap map of (String, Long) with statistics name as key and * statistics value as map value. */ - protected void assertAbfsStatistics(AbfsStatistic statistic, + protected long assertAbfsStatistics(AbfsStatistic statistic, long expectedValue, Map metricMap) { assertEquals("Mismatch in " + statistic.getStatName(), expectedValue, (long) metricMap.get(statistic.getStatName())); + return expectedValue; } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java new file mode 100644 index 0000000000000..904fdf3f7c16e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.junit.Test; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; + +public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestAbfsNetworkStatistics.class); + private static final int LARGE_OPERATIONS = 10; + + public ITestAbfsNetworkStatistics() throws Exception { + } + + /** + * Testing connections_made, send_request and bytes_send statistics in + * {@link AbfsRestOperation}. + */ + @Test + public void testAbfsHttpSendStatistics() throws IOException { + describe("Test to check correct values of statistics after Abfs http send " + + "request is done."); + + AzureBlobFileSystem fs = getFileSystem(); + Map metricMap; + Path sendRequestPath = path(getMethodName()); + String testNetworkStatsString = "http_send"; + long connectionsMade, requestsSent, bytesSent; + + /* + * Creating AbfsOutputStream will result in 1 connection made and 1 send + * request. + */ + try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, + sendRequestPath)) { + out.write(testNetworkStatsString.getBytes()); + + /* + * Flushes all outstanding data (i.e. the current unfinished packet) + * from the client into the service on all DataNode replicas. + */ + out.hflush(); + + metricMap = fs.getInstrumentationMap(); + + /* + * Testing the network stats with 1 write operation. + * + * connections_made : 3(getFileSystem()) + 1(AbfsOutputStream) + 2(flush). + * + * send_requests : 1(getFileSystem()) + 1(AbfsOutputStream) + 2(flush). + * + * bytes_sent : bytes wrote in AbfsOutputStream. + */ + connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE, + 6, metricMap); + requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 4, + metricMap); + bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT, + testNetworkStatsString.getBytes().length, metricMap); + + } + + // To close the AbfsOutputStream 1 connection is made and 1 request is sent. + connectionsMade++; + requestsSent++; + + try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, + sendRequestPath)) { + + for (int i = 0; i < LARGE_OPERATIONS; i++) { + out.write(testNetworkStatsString.getBytes()); + + /* + * 1 flush call would create 2 connections and 2 send requests. + * when hflush() is called it will essentially trigger append() and + * flush() inside AbfsRestOperation. Both of which calls + * executeHttpOperation() method which creates a connection and sends + * requests. + */ + out.hflush(); + } + + metricMap = fs.getInstrumentationMap(); + + /* + * Testing the network stats with Large amount of bytes sent. 
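+       * (Editor's note: with LARGE_OPERATIONS = 10, the breakdown below
+       * works out to connectionsMade + 21 connections and
+       * requestsSent + 21 send requests.)
+       *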
+ * + * connections made : connections_made(Last assertion) + 1 + * (AbfsOutputStream) + LARGE_OPERATIONS * 2(flush). + * + * send requests : requests_sent(Last assertion) + 1(AbfsOutputStream) + + * LARGE_OPERATIONS * 2(flush). + * + * bytes sent : bytes_sent(Last assertion) + LARGE_OPERATIONS * (bytes + * wrote each time). + * + */ + assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE, + connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap); + assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, + requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap); + assertAbfsStatistics(AbfsStatistic.BYTES_SENT, + bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length), + metricMap); + + } + + } + + /** + * Testing get_response and bytes_received in {@link AbfsRestOperation}. + */ + @Test + public void testAbfsHttpResponseStatistics() throws IOException { + describe("Test to check correct values of statistics after Http " + + "Response is processed."); + + AzureBlobFileSystem fs = getFileSystem(); + Path getResponsePath = path(getMethodName()); + Map metricMap; + String testResponseString = "some response"; + long getResponses, bytesReceived; + + FSDataOutputStream out = null; + FSDataInputStream in = null; + try { + + /* + * Creating a File and writing some bytes in it. + * + * get_response : 3(getFileSystem) + 1(OutputStream creation) + 2 + * (Writing data in Data store). + * + */ + out = fs.create(getResponsePath); + out.write(testResponseString.getBytes()); + out.hflush(); + + // open would require 1 get response. + in = fs.open(getResponsePath); + // read would require 1 get response and also get the bytes received. + int result = in.read(); + + // Confirming read isn't -1. + LOG.info("Result of read operation : {}", result); + + metricMap = fs.getInstrumentationMap(); + + /* + * Testing values of statistics after writing and reading a buffer. + * + * get_responses - 6(above operations) + 1(open()) + 1 (read()). + * + * bytes_received - This should be equal to bytes sent earlier. + */ + getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 8, + metricMap); + // Testing that bytes received is equal to bytes sent. + long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName()); + bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, + bytesSend, + metricMap); + + } finally { + IOUtils.cleanupWithLogger(LOG, out, in); + } + + // To close the streams 1 response is received. + getResponses++; + + try { + + /* + * Creating a file and writing buffer into it. Also recording the + * buffer for future read() call. + * This creating outputStream and writing requires 2 * + * (LARGE_OPERATIONS) get requests. + */ + StringBuilder largeBuffer = new StringBuilder(); + out = fs.create(getResponsePath); + for (int i = 0; i < LARGE_OPERATIONS; i++) { + out.write(testResponseString.getBytes()); + out.hflush(); + largeBuffer.append(testResponseString); + } + + // Open requires 1 get_response. + in = fs.open(getResponsePath); + + /* + * Reading the file which was written above. This read() call would + * read bytes equal to the bytes that was written above. + * Get response would be 1 only. + */ + in.read(0, largeBuffer.toString().getBytes(), 0, + largeBuffer.toString().getBytes().length); + + metricMap = fs.getInstrumentationMap(); + + /* + * Testing the statistics values after writing and reading a large buffer. 
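+       * (Editor's note: with LARGE_OPERATIONS = 10, the breakdown below
+       * evaluates to getResponses + 23 responses, while bytes_received grows
+       * by 10 copies of the test string.)
+       *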
+ * + * get_response : get_responses(Last assertion) + 1 + * (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing + * LARGE_OPERATIONS times) + 1(open()) + 1(read()). + * + * bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS * + * bytes wrote each time (bytes_received is equal to bytes wrote in the + * File). + * + */ + assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, + bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length), + metricMap); + assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, + getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap); + + } finally { + IOUtils.cleanupWithLogger(LOG, out, in); + } + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java index c88dc847a3f9a..42205807c1b3e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java @@ -45,7 +45,7 @@ public void testInitialStatsValues() throws IOException { describe("Testing the initial values of Abfs counters"); AbfsCounters abfsCounters = - new AbfsInstrumentation(getFileSystem().getUri()); + new AbfsCountersImpl(getFileSystem().getUri()); Map metricMap = abfsCounters.toMap(); for (Map.Entry entry : metricMap.entrySet()) { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java new file mode 100644 index 0000000000000..0639cf2f82b9a --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.Map; + +import org.junit.Test; + +import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; + +public class TestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest { + + private static final int LARGE_OPERATIONS = 1000; + + public TestAbfsNetworkStatistics() throws Exception { + } + + /** + * Test to check correct values of read and write throttling statistics in + * {@code AbfsClientThrottlingAnalyzer}. 
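+   *
+   * <p>Editor's sketch of the expected outcome (LARGE_OPERATIONS = 1000 in
+   * this class; names from the AbfsStatistic enum):
+   * <pre>{@code
+   *   statistics.toMap().get("read_throttles");  // 1000L
+   *   statistics.toMap().get("write_throttles"); // 1000L
+   * }</pre>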
+ */ + @Test + public void testAbfsThrottlingStatistics() throws IOException { + describe("Test to check correct values of read throttle and write " + + "throttle statistics in Abfs"); + + AbfsCounters statistics = + new AbfsCountersImpl(getFileSystem().getUri()); + + /* + * Calling the throttle methods to check correct summation and values of + * the counters. + */ + for (int i = 0; i < LARGE_OPERATIONS; i++) { + statistics.incrementCounter(AbfsStatistic.READ_THROTTLES, 1); + statistics.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1); + } + + Map metricMap = statistics.toMap(); + + /* + * Test to check read and write throttle statistics gave correct values for + * 1000 calls. + */ + assertAbfsStatistics(AbfsStatistic.READ_THROTTLES, LARGE_OPERATIONS, + metricMap); + assertAbfsStatistics(AbfsStatistic.WRITE_THROTTLES, LARGE_OPERATIONS, + metricMap); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java index 20d96fadef6e7..f831d2d4cd26b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java @@ -43,7 +43,7 @@ public void testInitializeStats() throws IOException { describe("Testing the counter values after Abfs is initialised"); AbfsCounters instrumentation = - new AbfsInstrumentation(getFileSystem().getUri()); + new AbfsCountersImpl(getFileSystem().getUri()); //Testing summation of the counter values. for (int i = 0; i < LARGE_OPS; i++) { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java index 420093307033c..ea2d1f2883ab7 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java @@ -43,7 +43,7 @@ private void validateUserAgent(String expectedPattern, AbfsConfiguration config, boolean includeSSLProvider) { AbfsClient client = new AbfsClient(baseUrl, null, - config, null, null); + config, null, null, null); String sslProviderName = null; if (includeSSLProvider) { sslProviderName = SSLSocketFactoryEx.getDefaultFactory().getProviderName(); From 75a398dbb9f4c7819e863e4b84e40f19aaf989cb Mon Sep 17 00:00:00 2001 From: Mehakmeet Singh Date: Tue, 8 Sep 2020 14:44:23 +0530 Subject: [PATCH 5/7] HADOOP-17229. 
No update of bytes received counter value after response failure occurs in ABFS (#2264)

Contributed by Mehakmeet Singh

(cherry picked from commit 0d855159f0956af3070a1a173c8a6cb2c71a1ea3)
---
 .../azurebfs/services/AbfsRestOperation.java  |  8 ++++--
 .../azurebfs/ITestAbfsNetworkStatistics.java  | 28 +++++++++++++++++++
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index 5ad609c88fac8..9d299d75dc45e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -177,8 +177,12 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS
       httpOperation.processResponse(buffer, bufferOffset, bufferLength);
 
       incrementCounter(AbfsStatistic.GET_RESPONSES, 1);
-      incrementCounter(AbfsStatistic.BYTES_RECEIVED,
-          httpOperation.getBytesReceived());
+      // Only increment the bytesReceived counter when the status code is 2XX.
+      if (httpOperation.getStatusCode() >= HttpURLConnection.HTTP_OK
+          && httpOperation.getStatusCode() <= HttpURLConnection.HTTP_PARTIAL) {
+        incrementCounter(AbfsStatistic.BYTES_RECEIVED,
+            httpOperation.getBytesReceived());
+      }
     } catch (IOException ex) {
       if (ex instanceof UnknownHostException) {
         LOG.warn(String.format("Unknown host name: %s. Retrying to resolve the host name...", httpOperation.getUrl().getHost()));
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
index 904fdf3f7c16e..f5c0638d3ed7d 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java
@@ -27,6 +27,7 @@
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
@@ -250,4 +251,31 @@ public void testAbfsHttpResponseStatistics() throws IOException {
     }
   }
 
+  /**
+   * Testing the bytes_received counter value when a response failure occurs.
+   */
+  @Test
+  public void testAbfsHttpResponseFailure() throws IOException {
+    describe("Test to check the values of the bytes received counter when a "
+        + "response failure occurs");
+
+    AzureBlobFileSystem fs = getFileSystem();
+    Path responseFailurePath = path(getMethodName());
+    Map<String, Long> metricMap;
+    FSDataOutputStream out = null;
+
+    try {
+      // Create an empty file.
+      out = fs.create(responseFailurePath);
+      // Re-create the file on the same path with overwrite=false; this
+      // causes a response failure with status code 409.
+      out = fs.create(responseFailurePath, false);
+    } catch (FileAlreadyExistsException faee) {
+      metricMap = fs.getInstrumentationMap();
+      // Assert after catching the 409 error to check the counter values.
+      assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, 0, metricMap);
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, out);
+    }
+  }
 }
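Before the next patch, a quick illustration of how these counters are read in practice. This is a hedged sketch, not part of any patch: it assumes a test context like the ones above, where getFileSystem(), getInstrumentationMap() and AbfsStatistic are available, and the path name is made up.

    AzureBlobFileSystem fs = getFileSystem();
    long before = fs.getInstrumentationMap()
        .get(AbfsStatistic.BYTES_RECEIVED.getStatName());
    try {
      // overwrite=false against a pre-existing path fails with HTTP 409.
      fs.create(new Path("/alreadyThere"), false);
    } catch (FileAlreadyExistsException e) {
      long after = fs.getInstrumentationMap()
          .get(AbfsStatistic.BYTES_RECEIVED.getStatName());
      // With HADOOP-17229 applied, the failed (non-2xx) response adds nothing.
      assert after == before;
    }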
From 784336829fb9f756ef83df1ac95fac59d1dfb2fb Mon Sep 17 00:00:00 2001
From: Sneha Vijayarajan
Date: Wed, 26 Aug 2020 00:31:35 +0530
Subject: [PATCH 6/7] HADOOP-17215: Support for conditional overwrite.

Contributed by Sneha Vijayarajan

DETAILS:

    This change adds config key "fs.azure.enable.conditional.create.overwrite"
    with a default of true. When enabled, if create(path, overwrite: true) is
    invoked and the file exists, the ABFS driver will first obtain its etag and
    then attempt to overwrite the file on the condition that the etag matches.
    The purpose of this is to mitigate the non-idempotency of this method.
    Specifically, in the event of a network error or similar, the client will
    retry, and this can result in the file being created more than once, which
    may result in data loss. In essence this is like a poor man's file handle,
    and will be addressed more thoroughly in the future when support for lease
    is added to ABFS.

TEST RESULTS:

namespace.enabled=true
auth.type=SharedKey
-------------------
$mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
Tests run: 87, Failures: 0, Errors: 0, Skipped: 0
Tests run: 457, Failures: 0, Errors: 0, Skipped: 42
Tests run: 207, Failures: 0, Errors: 0, Skipped: 24

namespace.enabled=true
auth.type=OAuth
-------------------
$mvn -T 1C -Dparallel-tests=abfs -Dscale -DtestsThreadCount=8 clean verify
Tests run: 87, Failures: 0, Errors: 0, Skipped: 0
Tests run: 457, Failures: 0, Errors: 0, Skipped: 74
Tests run: 207, Failures: 0, Errors: 0, Skipped: 140

(cherry picked from commit e31a636e922a8fdbe0aa7cca53f6de7175e97254)
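Before the diffstat, a brief usage sketch (not part of the patch): the new check is on by default, so only deployments that want the previous unconditional overwrite behavior need to act. The account URI below is a placeholder.

    Configuration conf = new Configuration();
    // Default is true; set to false to restore the old single-request overwrite.
    conf.setBoolean("fs.azure.enable.conditional.create.overwrite", false);
    FileSystem fs = FileSystem.newInstance(
        URI.create("abfs://container@account.dfs.core.windows.net/"), conf);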
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java |   8 +
 .../fs/azurebfs/AzureBlobFileSystemStore.java |  92 +++++-
 .../azurebfs/constants/ConfigurationKeys.java |   4 +
 .../constants/FileSystemConfigurations.java   |   1 +
 ...urrentWriteOperationDetectedException.java |  32 ++
 .../fs/azurebfs/services/AbfsClient.java      |   9 +-
 .../azurebfs/ITestAbfsNetworkStatistics.java  |  34 ++-
 .../ITestAzureBlobFileSystemCreate.java       | 281 ++++++++++++++++++
 .../ITestAzureBlobFileSystemMkDir.java        |  60 ++++
 .../fs/azurebfs/services/TestAbfsClient.java  |  55 ++++
 10 files changed, 564 insertions(+), 12 deletions(-)
 create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index ccc546352908b..aaaa9362be684 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -134,6 +134,10 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES)
   private String azureAtomicDirs;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE,
+      DefaultValue = DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE)
+  private boolean enableConditionalCreateOverwrite;
+
   @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
       DefaultValue = DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION)
   private boolean createRemoteFileSystemDuringInitialization;
@@ -427,6 +431,10 @@ public String getAzureAtomicRenameDirs() {
     return this.azureAtomicDirs;
   }
 
+  public boolean isConditionalCreateOverwriteEnabled() {
+    return this.enableConditionalCreateOverwrite;
+  }
+
   public boolean getCreateRemoteFileSystemDuringInitialization() {
     return this.createRemoteFileSystemDuringInitialization;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 25aad7bc096ba..fb2ae9de2172d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -61,6 +61,7 @@
 import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidFileSystemPropertyException;
@@ -359,9 +360,28 @@ public OutputStream createFile(final Path path, final boolean overwrite, final F
         umask.toString(),
         isNamespaceEnabled);
 
-    client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true, overwrite,
-        isNamespaceEnabled ? getOctalNotation(permission) : null,
-        isNamespaceEnabled ? getOctalNotation(umask) : null);
+    // If "fs.azure.enable.conditional.create.overwrite" is enabled and this
+    // is a create request with overwrite=true, create follows a different
+    // flow.
+    boolean triggerConditionalCreateOverwrite = false;
+    if (overwrite
+        && abfsConfiguration.isConditionalCreateOverwriteEnabled()) {
+      triggerConditionalCreateOverwrite = true;
+    }
+
+    if (triggerConditionalCreateOverwrite) {
+      conditionalCreateOverwriteFile(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
+          isNamespaceEnabled ? getOctalNotation(permission) : null,
+          isNamespaceEnabled ? getOctalNotation(umask) : null
+      );
+
+    } else {
+      client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), true,
+          overwrite,
+          isNamespaceEnabled ? getOctalNotation(permission) : null,
+          isNamespaceEnabled ? getOctalNotation(umask) : null,
+          null);
+    }
 
     return new AbfsOutputStream(
         client,
@@ -379,6 +399,70 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext() {
         .build();
   }
 
+  /**
+   * The conditional create-overwrite flow ensures that an overwrite of an
+   * existing file happens only if the eTag of that file matches.
+   * @param relativePath relative path of the file to be created
+   * @param permission permission in octal notation, or null
+   * @param umask umask in octal notation, or null
+   * @return the executed create operation
+   * @throws AzureBlobFileSystemException
+   */
+  private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePath,
+      final String permission,
+      final String umask) throws AzureBlobFileSystemException {
+    AbfsRestOperation op;
+
+    try {
+      // Trigger a create with overwrite=false first so that eTag fetch can be
+      // avoided for cases when no pre-existing file is present (major portion
+      // of create file traffic falls into the case of no pre-existing file).
+      op = client.createPath(relativePath, true,
+          false, permission, umask, null);
+    } catch (AbfsRestOperationException e) {
+      if (e.getStatusCode() == HttpURLConnection.HTTP_CONFLICT) {
+        // File pre-exists, fetch eTag
+        try {
+          op = client.getPathStatus(relativePath);
+        } catch (AbfsRestOperationException ex) {
+          if (ex.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+            // This is a parallel-access case: the file that was just found to
+            // be present has gone missing before this request.
+            throw new ConcurrentWriteOperationDetectedException(
+                "Parallel access to the create path detected. Failing request "
+                    + "to honor single writer semantics");
+          } else {
+            throw ex;
+          }
+        }
+
+        String eTag = op.getResult()
+            .getResponseHeader(HttpHeaderConfigurations.ETAG);
+
+        try {
+          // Overwrite only if the eTag matches the file properties fetched before.
+          op = client.createPath(relativePath, true,
+              true, permission, umask, eTag);
+        } catch (AbfsRestOperationException ex) {
+          if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
+            // This is a parallel-access case: the file with this eTag was just
+            // queried, and a precondition failure can happen only if another
+            // file with a different eTag got created in the meantime.
+            throw new ConcurrentWriteOperationDetectedException(
+                "Parallel access to the create path detected. Failing request "
+                    + "to honor single writer semantics");
+          } else {
+            throw ex;
+          }
+        }
+      } else {
+        throw e;
+      }
+    }
+
+    return op;
+  }
+
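A caller-side sketch (again not part of the patch) of what the new failure mode looks like. It assumes the exception propagates out of create() as an IOException subclass, which it is via AzureBlobFileSystemException; the path and payload are illustrative.

    byte[] data = "hello".getBytes();
    try (FSDataOutputStream out = fs.create(new Path("/report.csv"), true)) {
      out.write(data);
    } catch (ConcurrentWriteOperationDetectedException e) {
      // Another writer created or replaced the file between the overwrite=false
      // probe and the eTag-conditioned overwrite; retry or surface to the caller.
    }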
   public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask)
       throws AzureBlobFileSystemException {
     boolean isNamespaceEnabled = getIsNamespaceEnabled();
@@ -391,7 +475,7 @@ public void createDirectory(final Path path, final FsPermission permission, fina
     client.createPath(AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), false, true,
         isNamespaceEnabled ? getOctalNotation(permission) : null,
-        isNamespaceEnabled ? getOctalNotation(umask) : null);
+        isNamespaceEnabled ? getOctalNotation(umask) : null, null);
   }
 
   public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statistics statistics)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index b9e8fadfd3105..36c929ac6f547 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -50,6 +50,10 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
   public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
   public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
+  /** This config ensures that, during a create with overwrite, an existing file
+   * is overwritten only if there is a match on the eTag of the existing file.
+   */
+  public static final String FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = "fs.azure.enable.conditional.create.overwrite";
   public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
   /** Provides a config control to enable or disable ABFS Flush operations -
    *  HFlush and HSync. Default is true.
**/ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 0a0ad379bb560..526df03600d86 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -54,6 +54,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = false; public static final String DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES = "/hbase"; + public static final boolean DEFAULT_FS_AZURE_ENABLE_CONDITIONAL_CREATE_OVERWRITE = true; public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1; public static final boolean DEFAULT_ENABLE_FLUSH = true; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java new file mode 100644 index 0000000000000..79813ddfe6400 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/ConcurrentWriteOperationDetectedException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.contracts.exceptions; + +/** + * Thrown when a concurrent write operation is detected. 
+ */ +@org.apache.hadoop.classification.InterfaceAudience.Public +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class ConcurrentWriteOperationDetectedException + extends AzureBlobFileSystemException { + + public ConcurrentWriteOperationDetectedException(String message) { + super(message); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 7149455595eee..33abffa9c89be 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -220,7 +220,7 @@ public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException } public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite, - final String permission, final String umask) throws AzureBlobFileSystemException { + final String permission, final String umask, final String eTag) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); if (!overwrite) { requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR)); @@ -234,6 +234,10 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, umask)); } + if (eTag != null && !eTag.isEmpty()) { + requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag)); + } + final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY); @@ -536,7 +540,8 @@ private URL createRequestUrl(final String query) throws AzureBlobFileSystemExcep return createRequestUrl(EMPTY_STRING, query); } - private URL createRequestUrl(final String path, final String query) + @VisibleForTesting + protected URL createRequestUrl(final String path, final String query) throws AzureBlobFileSystemException { final String base = baseUrl.toString(); String encodedPath = path; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java index f5c0638d3ed7d..b2a17cf64544e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java @@ -95,9 +95,18 @@ public void testAbfsHttpSendStatistics() throws IOException { connectionsMade++; requestsSent++; + try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, sendRequestPath)) { + // Is a file overwrite case + long createRequestCalls = 1; + long createTriggeredGFSForETag = 0; + if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) { + createRequestCalls += 1; + createTriggeredGFSForETag = 1; + } + for (int i = 0; i < LARGE_OPERATIONS; i++) { out.write(testNetworkStatsString.getBytes()); @@ -126,10 +135,13 @@ public void testAbfsHttpSendStatistics() throws IOException { * wrote each time). 
* */ + + connectionsMade += createRequestCalls + createTriggeredGFSForETag; + requestsSent += createRequestCalls; assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE, - connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap); + connectionsMade + LARGE_OPERATIONS * 2, metricMap); assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, - requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap); + requestsSent + LARGE_OPERATIONS * 2, metricMap); assertAbfsStatistics(AbfsStatistic.BYTES_SENT, bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length), metricMap); @@ -202,13 +214,21 @@ public void testAbfsHttpResponseStatistics() throws IOException { try { /* - * Creating a file and writing buffer into it. Also recording the - * buffer for future read() call. + * Creating a file and writing buffer into it. + * This is a file recreate, so it will trigger + * 2 extra calls if create overwrite is off by default. + * Also recording the buffer for future read() call. * This creating outputStream and writing requires 2 * * (LARGE_OPERATIONS) get requests. */ StringBuilder largeBuffer = new StringBuilder(); out = fs.create(getResponsePath); + + long createRequestCalls = 1; + if (this.getConfiguration().isConditionalCreateOverwriteEnabled()) { + createRequestCalls += 2; + } + for (int i = 0; i < LARGE_OPERATIONS; i++) { out.write(testResponseString.getBytes()); out.hflush(); @@ -233,7 +253,8 @@ public void testAbfsHttpResponseStatistics() throws IOException { * * get_response : get_responses(Last assertion) + 1 * (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing - * LARGE_OPERATIONS times) + 1(open()) + 1(read()). + * LARGE_OPERATIONS times) + 1(open()) + 1(read()) + + * 1 (createOverwriteTriggeredGetForeTag). * * bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS * * bytes wrote each time (bytes_received is equal to bytes wrote in the @@ -244,7 +265,8 @@ public void testAbfsHttpResponseStatistics() throws IOException { bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length), metricMap); assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, - getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap); + getResponses + 2 + createRequestCalls + 2 * LARGE_OPERATIONS, + metricMap); } finally { IOUtils.cleanupWithLogger(LOG, out, in); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java index e315ad2f6e312..bdd022a8f7bb2 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java @@ -21,19 +21,47 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.FilterOutputStream; +import java.io.OutputStream; +import java.lang.reflect.Field; import java.util.EnumSet; +import java.util.UUID; +import java.util.concurrent.Callable; import org.apache.hadoop.test.LambdaTestUtils; import org.junit.Test; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.test.GenericTestUtils; +import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConcurrentWriteOperationDetectedException;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.TestAbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+
+import static java.net.HttpURLConnection.HTTP_CONFLICT;
+import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_OK;
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
 
 /**
  * Test create operation.
@@ -202,4 +230,257 @@ public void call() throws Exception {
       });
   }
 
+  /**
+   * Tests whether the number of connections made for:
+   * 1. create overwrite=false of a file that doesn't pre-exist
+   * 2. create overwrite=false of a file that pre-exists
+   * 3. create overwrite=true of a file that doesn't pre-exist
+   * 4. create overwrite=true of a file that pre-exists
+   * matches the expectation when run against both combinations of
+   * fs.azure.enable.conditional.create.overwrite=true and
+   * fs.azure.enable.conditional.create.overwrite=false
+   * @throws Throwable
+   */
+  @Test
+  public void testDefaultCreateOverwriteFileTest() throws Throwable {
+    testCreateFileOverwrite(true);
+    testCreateFileOverwrite(false);
+  }
+
+  public void testCreateFileOverwrite(boolean enableConditionalCreateOverwrite)
+      throws Throwable {
+    final AzureBlobFileSystem currentFs = getFileSystem();
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set("fs.azure.enable.conditional.create.overwrite",
+        Boolean.toString(enableConditionalCreateOverwrite));
+
+    final AzureBlobFileSystem fs =
+        (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
+            config);
+
+    long totalConnectionMadeBeforeTest = fs.getInstrumentationMap()
+        .get(CONNECTIONS_MADE.getStatName());
+
+    int createRequestCount = 0;
+    final Path nonOverwriteFile = new Path("/NonOverwriteTest_FileName_"
+        + UUID.randomUUID().toString());
+
+    // Case 1: Not Overwrite - File does not pre-exist
+    // create should be successful
+    fs.create(nonOverwriteFile, false);
+
+    // One request to server to create path should be issued
+    createRequestCount++;
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + createRequestCount,
+        fs.getInstrumentationMap());
+
+    // Case 2: Not Overwrite - File pre-exists
+    intercept(FileAlreadyExistsException.class, new Callable<FSDataOutputStream>() {
+      @Override
+      public FSDataOutputStream call() throws Exception {
+        return fs.create(nonOverwriteFile, false);
+      }
+    });
+
+    // One request to server to create path should be issued
+    createRequestCount++;
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + createRequestCount,
+        fs.getInstrumentationMap());
+
+    final Path overwriteFilePath = new Path("/OverwriteTest_FileName_"
+        + UUID.randomUUID().toString());
+
+    // Case 3: Overwrite - File does not pre-exist
+    // 
create should be successful + fs.create(overwriteFilePath, true); + + // One request to server to create path should be issued + createRequestCount++; + + assertAbfsStatistics( + CONNECTIONS_MADE, + totalConnectionMadeBeforeTest + createRequestCount, + fs.getInstrumentationMap()); + + // Case 4: Overwrite - File pre-exists + fs.create(overwriteFilePath, true); + + if (enableConditionalCreateOverwrite) { + // Three requests will be sent to server to create path, + // 1. create without overwrite + // 2. GetFileStatus to get eTag + // 3. create with overwrite + createRequestCount += 3; + } else { + createRequestCount++; + } + + assertAbfsStatistics( + CONNECTIONS_MADE, + totalConnectionMadeBeforeTest + createRequestCount, + fs.getInstrumentationMap()); + } + + /** + * Test negative scenarios with Create overwrite=false as default + * With create overwrite=true ending in 3 calls: + * A. Create overwrite=false + * B. GFS + * C. Create overwrite=true + * + * Scn1: A fails with HTTP409, leading to B which fails with HTTP404, + * detect parallel access + * Scn2: A fails with HTTP409, leading to B which fails with HTTP500, + * fail create with HTTP500 + * Scn3: A fails with HTTP409, leading to B and then C, + * which fails with HTTP412, detect parallel access + * Scn4: A fails with HTTP409, leading to B and then C, + * which fails with HTTP500, fail create with HTTP500 + * Scn5: A fails with HTTP500, fail create with HTTP500 + */ + @Test + public void testNegativeScenariosForCreateOverwriteDisabled() + throws Throwable { + + final AzureBlobFileSystem currentFs = getFileSystem(); + Configuration config = new Configuration(this.getRawConfiguration()); + config.set("fs.azure.enable.conditional.create.overwrite", + Boolean.toString(true)); + + final AzureBlobFileSystem fs = + (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(), + config); + + // Get mock AbfsClient with current config + AbfsClient + mockClient + = TestAbfsClient.getMockAbfsClient( + fs.getAbfsStore().getClient(), + fs.getAbfsStore().getAbfsConfiguration()); + + AzureBlobFileSystemStore abfsStore = fs.getAbfsStore(); + abfsStore = setAzureBlobSystemStoreField(abfsStore, "client", mockClient); + + AbfsRestOperation successOp = mock( + AbfsRestOperation.class); + AbfsHttpOperation http200Op = mock( + AbfsHttpOperation.class); + when(http200Op.getStatusCode()).thenReturn(HTTP_OK); + when(successOp.getResult()).thenReturn(http200Op); + + AbfsRestOperationException conflictResponseEx + = getMockAbfsRestOperationException(HTTP_CONFLICT); + AbfsRestOperationException serverErrorResponseEx + = getMockAbfsRestOperationException(HTTP_INTERNAL_ERROR); + AbfsRestOperationException fileNotFoundResponseEx + = getMockAbfsRestOperationException(HTTP_NOT_FOUND); + AbfsRestOperationException preConditionResponseEx + = getMockAbfsRestOperationException(HTTP_PRECON_FAILED); + + doThrow(conflictResponseEx) // Scn1: GFS fails with Http404 + .doThrow(conflictResponseEx) // Scn2: GFS fails with Http500 + .doThrow( + conflictResponseEx) // Scn3: create overwrite=true fails with Http412 + .doThrow( + conflictResponseEx) // Scn4: create overwrite=true fails with Http500 + .doThrow( + serverErrorResponseEx) // Scn5: create overwrite=false fails with Http500 + .when(mockClient) + .createPath(any(String.class), eq(true), eq(false), any(String.class), + any(String.class), (String) eq(null)); + + doThrow(fileNotFoundResponseEx) // Scn1: GFS fails with Http404 + .doThrow(serverErrorResponseEx) // Scn2: GFS fails with Http500 + .doReturn(successOp) // Scn3: 
create overwrite=true fails with Http412 + .doReturn(successOp) // Scn4: create overwrite=true fails with Http500 + .when(mockClient) + .getPathStatus(any(String.class)); + + doThrow( + preConditionResponseEx) // Scn3: create overwrite=true fails with Http412 + .doThrow( + serverErrorResponseEx) // Scn4: create overwrite=true fails with Http500 + .when(mockClient) + .createPath(any(String.class), eq(true), eq(true), any(String.class), + any(String.class), (String) eq(null)); + + // Scn1: GFS fails with Http404 + // Sequence of events expected: + // 1. create overwrite=false - fail with conflict + // 2. GFS - fail with File Not found + // Create will fail with ConcurrentWriteOperationDetectedException + validateCreateFileException(ConcurrentWriteOperationDetectedException.class, + abfsStore); + + // Scn2: GFS fails with Http500 + // Sequence of events expected: + // 1. create overwrite=false - fail with conflict + // 2. GFS - fail with Server error + // Create will fail with 500 + validateCreateFileException(AbfsRestOperationException.class, abfsStore); + + // Scn3: create overwrite=true fails with Http412 + // Sequence of events expected: + // 1. create overwrite=false - fail with conflict + // 2. GFS - pass + // 3. create overwrite=true - fail with Pre-Condition + // Create will fail with ConcurrentWriteOperationDetectedException + validateCreateFileException(ConcurrentWriteOperationDetectedException.class, + abfsStore); + + // Scn4: create overwrite=true fails with Http500 + // Sequence of events expected: + // 1. create overwrite=false - fail with conflict + // 2. GFS - pass + // 3. create overwrite=true - fail with Server error + // Create will fail with 500 + validateCreateFileException(AbfsRestOperationException.class, abfsStore); + + // Scn5: create overwrite=false fails with Http500 + // Sequence of events expected: + // 1. 
create overwrite=false - fail with server error
+    // Create will fail with 500
+    validateCreateFileException(AbfsRestOperationException.class, abfsStore);
+  }
+
+  private AzureBlobFileSystemStore setAzureBlobSystemStoreField(
+      final AzureBlobFileSystemStore abfsStore,
+      final String fieldName,
+      Object fieldObject) throws Exception {
+
+    Field abfsClientField = AzureBlobFileSystemStore.class.getDeclaredField(
+        fieldName);
+    abfsClientField.setAccessible(true);
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    modifiersField.setAccessible(true);
+    modifiersField.setInt(abfsClientField,
+        abfsClientField.getModifiers() & ~java.lang.reflect.Modifier.FINAL);
+    abfsClientField.set(abfsStore, fieldObject);
+    return abfsStore;
+  }
+
+  private void validateCreateFileException(
+      final Class<? extends Throwable> exceptionClass,
+      final AzureBlobFileSystemStore abfsStore)
+      throws Exception {
+    final FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL,
+        FsAction.ALL);
+    final FsPermission umask = new FsPermission(FsAction.NONE, FsAction.NONE,
+        FsAction.NONE);
+    final Path testPath = new Path("testFile");
+    intercept(exceptionClass, new Callable<OutputStream>() {
+      @Override
+      public OutputStream call() throws Exception {
+        return abfsStore.createFile(testPath, true, permission, umask);
+      }
+    });
+  }
+
+  private AbfsRestOperationException getMockAbfsRestOperationException(int status) {
+    return new AbfsRestOperationException(status, "", "", new Exception());
+  }
 }
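One detail worth calling out, since the five scenarios above all go through the same two stubbed methods: Mockito applies consecutive stubbing, so each matching invocation consumes the next doThrow/doReturn action in the chain, which is what lets the five validateCreateFileException calls walk through Scn1..Scn5 in order. A minimal, self-contained illustration of that behavior (generic Mockito, not ABFS-specific):

    import java.util.List;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    @SuppressWarnings("unchecked")
    List<String> stub = (List<String>) mock(List.class);
    when(stub.get(0))
        .thenThrow(new IllegalStateException("first call"))
        .thenReturn("second call");
    // stub.get(0) throws once, then returns "second call" on every later call.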
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
index 382d3966485f1..de476a6abce9b 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemMkDir.java
@@ -18,12 +18,18 @@
 
 package org.apache.hadoop.fs.azurebfs;
 
+import java.util.UUID;
+
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
 
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+
 /**
  * Test mkdir operation.
  */
@@ -45,4 +51,58 @@ public void testCreateDirWithExistingDir() throws Exception {
   public void testCreateRoot() throws Exception {
     assertMkdirs(getFileSystem(), new Path("/"));
   }
+
+  /**
+   * Test mkdir for both settings of fs.azure.enable.conditional.create.overwrite
+   * @throws Exception
+   */
+  @Test
+  public void testDefaultCreateOverwriteDirTest() throws Throwable {
+    // the config fs.azure.enable.conditional.create.overwrite should have no
+    // effect on mkdirs
+    testCreateDirOverwrite(true);
+    testCreateDirOverwrite(false);
+  }
+
+  public void testCreateDirOverwrite(boolean enableConditionalCreateOverwrite)
+      throws Throwable {
+    final AzureBlobFileSystem currentFs = getFileSystem();
+    Configuration config = new Configuration(this.getRawConfiguration());
+    config.set("fs.azure.enable.conditional.create.overwrite",
+        Boolean.toString(enableConditionalCreateOverwrite));
+
+    final AzureBlobFileSystem fs =
+        (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
+            config);
+
+    long totalConnectionMadeBeforeTest = fs.getInstrumentationMap()
+        .get(CONNECTIONS_MADE.getStatName());
+
+    int mkdirRequestCount = 0;
+    final Path dirPath = new Path("/DirPath_"
+        + UUID.randomUUID().toString());
+
+    // Case 1: Dir does not pre-exist
+    fs.mkdirs(dirPath);
+
+    // One request to server
+    mkdirRequestCount++;
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + mkdirRequestCount,
+        fs.getInstrumentationMap());
+
+    // Case 2: Dir pre-exists
+    // Mkdir on existing Dir path will not lead to failure
+    fs.mkdirs(dirPath);
+
+    // One request to server
+    mkdirRequestCount++;
+
+    assertAbfsStatistics(
+        CONNECTIONS_MADE,
+        totalConnectionMadeBeforeTest + mkdirRequestCount,
+        fs.getInstrumentationMap());
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
index ea2d1f2883ab7..b5a22badb2bff 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
+import java.lang.reflect.Field;
 import java.net.URL;
 import java.util.regex.Pattern;
 
@@ -30,6 +31,11 @@
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
 import org.apache.hadoop.util.VersionInfo;
 
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
 /**
  * Test useragent of abfs client.
* @@ -91,4 +97,53 @@ public void verifyUserAgentWithSSLProvider() throws Exception { validateUserAgent(expectedUserAgentPattern, new URL("https://azure.com"), abfsConfiguration, true); } + + public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance, + AbfsConfiguration abfsConfig) throws Exception { + + AbfsClient client = mock(AbfsClient.class); + when(client.getRetryPolicy()).thenReturn( + new ExponentialRetryPolicy(1)); + + when(client.createDefaultUriQueryBuilder()).thenCallRealMethod(); + when(client.createRequestUrl(anyString(), anyString())).thenCallRealMethod(); + when(client.getAccessToken()).thenCallRealMethod(); + when(client.getSharedKeyCredentials()).thenCallRealMethod(); + when(client.createDefaultHeaders()).thenCallRealMethod(); + + // override baseurl + client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration", + abfsConfig); + + // override baseurl + client = TestAbfsClient.setAbfsClientField(client, "baseUrl", + baseAbfsClientInstance.getBaseUrl()); + + // override auth provider + client = TestAbfsClient.setAbfsClientField(client, "tokenProvider", + abfsConfig.getTokenProvider()); + + // override user agent + String userAgent = "APN/1.0 Azure Blob FS/3.4.0-SNAPSHOT (PrivateBuild " + + "JavaJRE 1.8.0_252; Linux 5.3.0-59-generic/amd64; openssl-1.0; " + + "UNKNOWN/UNKNOWN) MSFT"; + client = TestAbfsClient.setAbfsClientField(client, "userAgent", userAgent); + + return client; + } + + private static AbfsClient setAbfsClientField( + final AbfsClient client, + final String fieldName, + Object fieldObject) throws Exception { + + Field field = AbfsClient.class.getDeclaredField(fieldName); + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, + field.getModifiers() & ~java.lang.reflect.Modifier.FINAL); + field.set(client, fieldObject); + return client; + } } From 22805692d6bac3a5505ff9e9fc1cc724e3241c82 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Mon, 23 Nov 2020 17:22:01 +0000 Subject: [PATCH 7/7] HADOOP-17325. WASB Test Failures Contributed by Ayush Saxena and Steve Loughran Change-Id: I4bb76815bc1d11d1804dc67bafde68b6a995b974 (cherry picked from commit 07b7d073884720d8423be1dfdcfa60ec8833d2ae) --- .../fs/azure/InMemoryBlockBlobStore.java | 7 ++++- .../hadoop/fs/azure/MockStorageInterface.java | 30 +++++++++++++------ .../hadoop/fs/azure/TestBlobMetadata.java | 4 ++- 3 files changed, 30 insertions(+), 11 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java index b8971c488c45a..7ddeabe242ef6 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/InMemoryBlockBlobStore.java @@ -25,6 +25,8 @@ import java.util.HashMap; import java.util.Map; +import static java.util.Objects.requireNonNull; + /** * A simple memory key-value store to help mock the Windows Azure Storage * implementation for unit testing. 
@@ -163,7 +165,10 @@ public synchronized boolean exists(String key) { @SuppressWarnings("unchecked") public synchronized HashMap getMetadata(String key) { - return (HashMap) blobs.get(key).metadata.clone(); + Entry entry = requireNonNull(blobs.get(key), "entry for " + key); + return (HashMap) requireNonNull(entry.metadata, + "metadata for " + key) + .clone(); } public synchronized HashMap getContainerMetadata() { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java index d5f6437d960ab..feca8b14b9b4a 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java @@ -37,6 +37,7 @@ import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.net.URLCodec; import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.fs.Path; import org.apache.http.client.utils.URIBuilder; import com.microsoft.azure.storage.AccessCondition; @@ -80,7 +81,7 @@ public InMemoryBlockBlobStore getBackingStore() { * Mocks the situation where a container already exists before WASB comes in, * i.e. the situation where a user creates a container then mounts WASB on the * pre-existing container. - * + * * @param uri * The URI of the container. * @param metadata @@ -137,9 +138,20 @@ private static String convertUriToDecodedString(URI uri) { private static URI convertKeyToEncodedUri(String key) { try { - return new URIBuilder().setPath(key).build(); + Path p = new Path(key); + URI unEncodedURI = p.toUri(); + return new URIBuilder().setPath(unEncodedURI.getPath()) + .setScheme(unEncodedURI.getScheme()).build(); } catch (URISyntaxException e) { - throw new AssertionError("Failed to encode key: " + key); + int i = e.getIndex(); + String details; + if (i >= 0) { + details = " -- \"" + e.getInput().charAt(i) + "\""; + } else { + details = ""; + } + throw new AssertionError("Failed to encode key: " + key + + ": " + e + details); } } @@ -148,8 +160,8 @@ public CloudBlobContainerWrapper getContainerReference(String name) throws URISyntaxException, StorageException { String fullUri; URIBuilder builder = new URIBuilder(baseUriString); - fullUri = builder.setPath(builder.getPath() + "/" + name).toString(); - + String path = builder.getPath() == null ? "" : builder.getPath() + "/"; + fullUri = builder.setPath(path + name).toString(); MockCloudBlobContainerWrapper container = new MockCloudBlobContainerWrapper( fullUri, name); // Check if we have a pre-existing container with that name, and prime @@ -354,11 +366,11 @@ protected MockCloudBlobWrapper(URI uri, HashMap metadata, this.uri = uri; this.metadata = metadata; this.properties = new BlobProperties(); - + this.properties=updateLastModifed(this.properties); this.properties=updateLength(this.properties,length); } - + protected BlobProperties updateLastModifed(BlobProperties properties){ try{ Method setLastModified =properties.getClass(). @@ -371,7 +383,7 @@ protected BlobProperties updateLastModifed(BlobProperties properties){ } return properties; } - + protected BlobProperties updateLength(BlobProperties properties,int length) { try{ Method setLength =properties.getClass(). 
@@ -383,7 +395,7 @@ protected BlobProperties updateLength(BlobProperties properties,int length) { } return properties; } - + protected void refreshProperties(boolean getMetadata) { if (backingStore.exists(convertUriToDecodedString(uri))) { byte[] content = backingStore.getContent(convertUriToDecodedString(uri)); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java index 30c102839cb1e..832e7ec05a0af 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java @@ -202,8 +202,10 @@ public void testPermissionMetadata() throws Exception { Path selfishFile = new Path("/noOneElse"); fs.create(selfishFile, justMe, true, 4096, fs.getDefaultReplication(), fs.getDefaultBlockSize(), null).close(); + String mockUri = AzureBlobStorageTestAccount.toMockUri(selfishFile); + assertNotNull("converted URI", mockUri); HashMap metadata = backingStore - .getMetadata(AzureBlobStorageTestAccount.toMockUri(selfishFile)); + .getMetadata(mockUri); assertNotNull(metadata); String storedPermission = metadata.get("hdi_permission"); assertEquals(getExpectedPermissionString("rw-------"), storedPermission);