HADOOP-16058 S3A to support terasort by steveloughran · Pull Request #577 · apache/hadoop · GitHub

HADOOP-16058 S3A to support terasort #577

Closed
@@ -128,25 +128,31 @@ public int run(String[] args) throws Exception {
usage();
}
}

FileSystem fs = FileSystem.get(getConf());
if (bigMapInput == null || outputPath == null) {
// report usage and exit
usage();
// this stops IDES warning about unset local variables.
return -1;
}

JobConf jobConf = new JobConf(getConf(), BigMapOutput.class);

jobConf.setJobName("BigMapOutput");
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
jobConf.setOutputFormat(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(jobConf, bigMapInput);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
outputPath.getFileSystem(jobConf).delete(outputPath, true);
FileOutputFormat.setOutputPath(jobConf, outputPath);
jobConf.setMapperClass(IdentityMapper.class);
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setOutputKeyClass(BytesWritable.class);
jobConf.setOutputValueClass(BytesWritable.class);

if (createInput) {
createBigMapInputFile(jobConf, fs, bigMapInput, fileSizeInMB);
createBigMapInputFile(jobConf,
bigMapInput.getFileSystem(jobConf),
bigMapInput,
fileSizeInMB);
}

Date startTime = new Date();
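The substantive change in this hunk is the move away from FileSystem.get(getConf()), which always returns the default filesystem, to resolving the filesystem from each path. A minimal sketch, not from this patch and with an illustrative s3a bucket name, of why that matters when the job's paths live on a non-default store:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsLookupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path output = new Path("s3a://example-bucket/terasort/out");

    // Returns whatever fs.defaultFS points at (often hdfs:// or file://),
    // regardless of where the output path actually lives.
    FileSystem defaultFs = FileSystem.get(conf);

    // Resolves the filesystem from the path's own scheme, so an s3a://
    // output path yields an S3AFileSystem even when the default FS is HDFS.
    FileSystem outputFs = output.getFileSystem(conf);

    System.out.println("default FS: " + defaultFs.getUri());
    System.out.println("output  FS: " + outputFs.getUri());
  }
}

The one-line change in the following hunk applies the same pattern to BASE_DIR.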
@@ -284,7 +284,7 @@ public int run(String[] args) throws Exception {
}

JobConf jobConf = setupJob(numMaps, numReduces, jarFile);
FileSystem fs = FileSystem.get(jobConf);
FileSystem fs = BASE_DIR.getFileSystem(jobConf);
Path inputFile = new Path(INPUT_DIR, "input_" + (new Random()).nextInt() + ".txt");
generateTextFile(fs, inputFile, inputLines, inputSortOrder);

@@ -30,10 +30,8 @@
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.slf4j.Logger;
@@ -45,7 +43,6 @@
public class TeraOutputFormat extends FileOutputFormat<Text,Text> {
private static final Logger LOG =
LoggerFactory.getLogger(TeraOutputFormat.class);
private OutputCommitter committer = null;

/**
* Set the requirement for a final sync before the stream is closed.
@@ -145,12 +142,4 @@ public RecordWriter<Text,Text> getRecordWriter(TaskAttemptContext job
return new TeraRecordWriter(fileOut, job);
}

public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException {
if (committer == null) {
Path output = getOutputPath(context);
committer = new FileOutputCommitter(output, context);
}
return committer;
}
}
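Removing the getOutputCommitter() override means TeraOutputFormat now inherits FileOutputFormat's committer lookup instead of hard-wiring FileOutputCommitter, which is what allows an S3A committer to be bound to s3a:// destinations. A hedged sketch of the kind of job configuration that would then take effect; the keys and values follow the S3A committer documentation and are assumptions here, not part of this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CommitterConfigSketch {
  public static Job newSortJob(Configuration conf) throws Exception {
    // Bind a committer factory to the s3a scheme (assumed key/value pair).
    conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
    // Select which S3A committer that factory should create.
    conf.set("fs.s3a.committer.name", "directory");

    Job job = Job.getInstance(conf, "terasort-style job");
    // With no override in the output format, FileOutputFormat consults the
    // factory registered for the destination path's scheme.
    FileOutputFormat.setOutputPath(job, new Path("s3a://example-bucket/sorted"));
    return job;
  }
}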
@@ -321,7 +321,7 @@ public int run(String[] args) throws Exception {
try {
TeraInputFormat.writePartitionFile(job, partitionFile);
} catch (Throwable e) {
LOG.error(e.getMessage());
LOG.error("{}", e.getMessage(), e);
return -1;
}
job.addCacheFile(partitionUri);
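The logging change is small but deliberate: with SLF4J, passing the Throwable as the final argument after the format arguments records the full stack trace instead of only the message. A minimal illustration of the two forms:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

  static void report(Throwable e) {
    LOG.error(e.getMessage());           // old form: message only, stack trace lost
    LOG.error("{}", e.getMessage(), e);  // new form: message plus full stack trace
  }
}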
@@ -61,7 +61,7 @@ private void runTeraGen(Configuration conf, Path sortInput)
String[] genArgs = {NUM_ROWS, sortInput.toString()};

// Run TeraGen
assertEquals(ToolRunner.run(conf, new TeraGen(), genArgs), 0);
assertEquals(0, ToolRunner.run(conf, new TeraGen(), genArgs));
}

private void runTeraSort(Configuration conf,
@@ -71,7 +71,7 @@ private void runTeraSort(Configuration conf,
String[] sortArgs = {sortInput.toString(), sortOutput.toString()};

// Run Sort
assertEquals(ToolRunner.run(conf, new TeraSort(), sortArgs), 0);
assertEquals(0, ToolRunner.run(conf, new TeraSort(), sortArgs));
}

private void runTeraValidator(Configuration job,
@@ -80,7 +80,7 @@ private void runTeraValidator(Configuration job,
String[] svArgs = {sortOutput.toString(), valOutput.toString()};

// Run Tera-Validator
assertEquals(ToolRunner.run(job, new TeraValidate(), svArgs), 0);
assertEquals(0, ToolRunner.run(job, new TeraValidate(), svArgs));
}

@Test
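These three test fixes all swap the arguments into JUnit's (expected, actual) order, so a failing run reports the exit code the tool actually returned. A small sketch of the difference in the failure message:

import static org.junit.Assert.assertEquals;

public class AssertOrderSketch {
  public static void main(String[] args) {
    int rc = 1; // pretend ToolRunner.run(...) returned a failure code
    // Reversed arguments would report "expected:<1> but was:<0>", which misleads.
    assertEquals(0, rc); // reports "expected:<0> but was:<1>"
  }
}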
4 changes: 4 additions & 0 deletions hadoop-tools/hadoop-aws/pom.xml
@@ -186,6 +186,7 @@
<exclude>**/ITestS3AHuge*.java</exclude>
<!-- this sets out to overload DynamoDB, so must be run standalone -->
<exclude>**/ITestDynamoDBMetadataStoreScale.java</exclude>
<exclude>**/ITestTerasort*.java</exclude>
</excludes>
</configuration>
</execution>
@@ -220,6 +221,9 @@
<include>**/ITestS3AEncryptionSSEC*.java</include>
<!-- this sets out to overload DynamoDB, so must be run standalone -->
<include>**/ITestDynamoDBMetadataStoreScale.java</include>
<!-- the terasort tests both work with a file in the same path in -->
<!-- the local FS. Running them sequentially guarantees isolation -->
<include>**/ITestTerasort*.java</include>
</includes>
</configuration>
</execution>
@@ -18,8 +18,10 @@

package org.apache.hadoop.fs.s3a.commit;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

@@ -30,6 +32,7 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -50,6 +53,7 @@
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;

/**
@@ -75,6 +79,7 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {

private InconsistentAmazonS3Client inconsistentClient;


/**
* Should the inconsistent S3A client be used?
* Default value: true.
@@ -436,4 +441,63 @@ public static TaskAttemptContext taskAttemptForJob(JobId jobId,
jContext.getConfiguration(),
TypeConverter.fromYarn(attemptID));
}


/**
* Load in the success data marker: this guarantees that an S3A
* committer was used.
* @param fs filesystem
* @param outputPath path of job
* @param committerName name of committer to match
* @return the success data
* @throws IOException IO failure
*/
public static SuccessData validateSuccessFile(final S3AFileSystem fs,
final Path outputPath, final String committerName) throws IOException {
SuccessData successData = null;
try {
successData = loadSuccessFile(fs, outputPath);
} catch (FileNotFoundException e) {
// either the output path is missing or, if it's the success file,
// somehow the relevant committer wasn't picked up.
String dest = outputPath.toString();
LOG.error("No _SUCCESS file found under {}", dest);
List<String> files = new ArrayList<>();
applyLocatedFiles(fs.listFiles(outputPath, true),
(status) -> {
files.add(status.getPath().toString());
LOG.error("{} {}", status.getPath(), status.getLen());
});
throw new AssertionError("No _SUCCESS file in " + dest
+ "; found : " + files.stream().collect(Collectors.joining("\n")),
e);
}
String commitDetails = successData.toString();
LOG.info("Committer name " + committerName + "\n{}",
commitDetails);
LOG.info("Committer statistics: \n{}",
successData.dumpMetrics(" ", " = ", "\n"));
LOG.info("Diagnostics\n{}",
successData.dumpDiagnostics(" ", " = ", "\n"));
assertEquals("Wrong committer in " + commitDetails,
committerName, successData.getCommitter());
return successData;
}

/**
* Load a success file; fail if the file is empty/nonexistent.
* @param fs filesystem
* @param outputPath directory containing the success file.
* @return the loaded file.
* @throws IOException failure to find/load the file
* @throws AssertionError file is 0-bytes long
*/
public static SuccessData loadSuccessFile(final S3AFileSystem fs,
final Path outputPath) throws IOException {
Path success = new Path(outputPath, _SUCCESS);
FileStatus status = fs.getFileStatus(success);
assertTrue("0 byte success file - not a s3guard committer " + success,
status.getLen() > 0);
return SuccessData.load(fs, success);
}
}
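A hedged sketch of how a test might use the new helper; the subclass, test path and committer name are illustrative assumptions, not part of this patch. After a job commits its output, validateSuccessFile() asserts that a non-empty _SUCCESS JSON marker exists and was written by the named committer:

import org.junit.Test;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.commit.files.SuccessData;

public class ITestExampleCommit extends AbstractCommitITest {

  @Test
  public void testSuccessMarkerWritten() throws Throwable {
    // Per-test destination path, via the helper inherited from the test base classes.
    Path dest = path("example-output");
    // ... run a job that commits its output to dest ...
    SuccessData success = validateSuccessFile(getFileSystem(), dest, "directory");
    // validateSuccessFile() already asserted the committer name; the returned
    // SuccessData also carries metrics and diagnostics for further checks.
  }
}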