HADOOP-16965. Refactor abfs stream configuration. (#1956) by arjun4084346 · Pull Request #4171 · apache/hadoop · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

HADOOP-16965. Refactor abfs stream configuration. (#1956) #4171

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: branch-2.10
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
Expand Down Expand Up @@ -362,9 +364,15 @@ public OutputStream createFile(final Path path, final boolean overwrite, final F
client,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
0,
abfsConfiguration.getWriteBufferSize(),
abfsConfiguration.isFlushEnabled(),
abfsConfiguration.isOutputStreamFlushDisabled());
populateAbfsOutputStreamContext());
}

/**
 * Assembles the context object handed to every new AbfsOutputStream.
 * The write buffer size, the flush-enabled flag and the
 * output-stream-flush-disabled flag are all read from
 * {@code abfsConfiguration}.
 *
 * @return the context returned by {@code build()}
 */
private AbfsOutputStreamContext populateAbfsOutputStreamContext() {
  AbfsOutputStreamContext outputContext = new AbfsOutputStreamContext();
  outputContext = outputContext.withWriteBufferSize(
      abfsConfiguration.getWriteBufferSize());
  outputContext = outputContext.enableFlush(
      abfsConfiguration.isFlushEnabled());
  outputContext = outputContext.disableOutputStreamFlush(
      abfsConfiguration.isOutputStreamFlushDisabled());
  return outputContext.build();
}

public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask)
Expand Down Expand Up @@ -402,11 +410,18 @@ public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statist
null);
}

// Add statistics for InputStream
return new AbfsInputStream(client, statistics,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(),
abfsConfiguration.getTolerateOobAppends(), eTag);
populateAbfsInputStreamContext(),
eTag);
}

/**
 * Assembles the context object handed to every new AbfsInputStream.
 * The read buffer size, the read-ahead queue depth and the
 * tolerate-OOB-appends flag are all read from {@code abfsConfiguration}.
 *
 * @return the context returned by {@code build()}
 */
private AbfsInputStreamContext populateAbfsInputStreamContext() {
  AbfsInputStreamContext inputContext = new AbfsInputStreamContext();
  inputContext = inputContext.withReadBufferSize(
      abfsConfiguration.getReadBufferSize());
  inputContext = inputContext.withReadAheadQueueDepth(
      abfsConfiguration.getReadAheadQueueDepth());
  inputContext = inputContext.withTolerateOobAppends(
      abfsConfiguration.getTolerateOobAppends());
  return inputContext.build();
}

public OutputStream openFileForWrite(final Path path, final boolean overwrite) throws
Expand Down Expand Up @@ -435,9 +450,7 @@ public OutputStream openFileForWrite(final Path path, final boolean overwrite) t
client,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
offset,
abfsConfiguration.getWriteBufferSize(),
abfsConfiguration.isFlushEnabled(),
abfsConfiguration.isOutputStreamFlushDisabled());
populateAbfsOutputStreamContext());
}

public void rename(final Path source, final Path destination) throws
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,19 @@ public class AbfsInputStream extends FSInputStream {
private boolean closed = false;

public AbfsInputStream(
final AbfsClient client,
final Statistics statistics,
final String path,
final long contentLength,
final int bufferSize,
final int readAheadQueueDepth,
final boolean tolerateOobAppends,
final String eTag) {
final AbfsClient client,
final Statistics statistics,
final String path,
final long contentLength,
final AbfsInputStreamContext abfsInputStreamContext,
final String eTag) {
this.client = client;
this.statistics = statistics;
this.path = path;
this.contentLength = contentLength;
this.bufferSize = bufferSize;
this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
this.tolerateOobAppends = tolerateOobAppends;
this.bufferSize = abfsInputStreamContext.getReadBufferSize();
this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
this.readAheadEnabled = true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

/**
 * Carries the extra settings for an ABFS input stream: the read buffer
 * size, the read-ahead queue depth and the tolerate-OOB-appends flag.
 * Each {@code with*} setter stores its value and returns this same
 * instance so that calls can be chained.
 */
public class AbfsInputStreamContext extends AbfsStreamContext {

  private int readBufferSize;

  private int readAheadQueueDepth;

  private boolean tolerateOobAppends;

  /**
   * Creates a context; no field is assigned until a setter is called.
   */
  public AbfsInputStreamContext() {
  }

  /**
   * @return the read buffer size stored in this context
   */
  public int getReadBufferSize() {
    return readBufferSize;
  }

  /**
   * @return the read-ahead queue depth stored in this context
   */
  public int getReadAheadQueueDepth() {
    return readAheadQueueDepth;
  }

  /**
   * @return the tolerate-OOB-appends flag stored in this context
   */
  public boolean isTolerateOobAppends() {
    return tolerateOobAppends;
  }

  /**
   * Stores the read buffer size.
   *
   * @param readBufferSize value to store
   * @return this context
   */
  public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) {
    this.readBufferSize = readBufferSize;
    return this;
  }

  /**
   * Stores the read-ahead queue depth. A negative argument is replaced by
   * the processor count reported by the runtime.
   *
   * @param readAheadQueueDepth requested depth
   * @return this context
   */
  public AbfsInputStreamContext withReadAheadQueueDepth(
      final int readAheadQueueDepth) {
    if (readAheadQueueDepth < 0) {
      this.readAheadQueueDepth = Runtime.getRuntime().availableProcessors();
    } else {
      this.readAheadQueueDepth = readAheadQueueDepth;
    }
    return this;
  }

  /**
   * Stores the tolerate-OOB-appends flag.
   *
   * @param tolerateOobAppends value to store
   * @return this context
   */
  public AbfsInputStreamContext withTolerateOobAppends(
      final boolean tolerateOobAppends) {
    this.tolerateOobAppends = tolerateOobAppends;
    return this;
  }

  /**
   * Completes the chained configuration. No checks are performed yet.
   *
   * @return this context
   */
  public AbfsInputStreamContext build() {
    // Placeholder: parameter validation is meant to be added here.
    return this;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,17 @@ public AbfsOutputStream(
final AbfsClient client,
final String path,
final long position,
final int bufferSize,
final boolean supportFlush,
final boolean disableOutputStreamFlush) {
AbfsOutputStreamContext abfsOutputStreamContext) {
this.client = client;
this.path = path;
this.position = position;
this.closed = false;
this.supportFlush = supportFlush;
this.disableOutputStreamFlush = disableOutputStreamFlush;
this.supportFlush = abfsOutputStreamContext.isEnableFlush();
this.disableOutputStreamFlush = abfsOutputStreamContext
.isDisableOutputStreamFlush();
this.lastError = null;
this.lastFlushOffset = 0;
this.bufferSize = bufferSize;
this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
this.bufferIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

/**
* Class to hold extra output stream configs.
*/
public class AbfsOutputStreamContext extends AbfsStreamContext {< 9E7A /td>

private int writeBufferSize;

private boolean enableFlush;

private boolean disableOutputStreamFlush;

public AbfsOutputStreamContext() {
}

public AbfsOutputStreamContext withWriteBufferSize(
final int writeBufferSize) {
this.writeBufferSize = writeBufferSize;
return this;
}

public AbfsOutputStreamContext enableFlush(final boolean enableFlush) {
this.enableFlush = enableFlush;
return this;
}

public AbfsOutputStreamContext disableOutputStreamFlush(
final boolean disableOutputStreamFlush) {
this.disableOutputStreamFlush = disableOutputStreamFlush;
return this;
}

public AbfsOutputStreamContext build() {
// Validation of parameters to be done here.
return this;
}

public int getWriteBufferSize() {
return writeBufferSize;
}

public boolean isEnableFlush() {
return enableFlush;
}

public boolean isDisableOutputStreamFlush() {
return disableOutputStreamFlush;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

/**
 * Base class for the ABFS stream configuration holders. It is intended to
 * carry the settings common to input and output streams; at present it
 * declares no members of its own.
 */
public abstract class AbfsStreamContext {
}
0