8000 Create suro-file and suro-s3 module for less dependencies of suro-server by metacret · Pull Request #78 · Netflix/suro · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Create suro-file and suro-s3 module for less dependencies of suro-server #78

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 20, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ project(':suro-server') {
compile project(':suro-core')
compile project(':suro-client')
compile project(':suro-kafka')

compile 'net.java.dev.jets3t:jets3t:0.9.0'
compile project(':suro-localfile')
compile project(':suro-s3')

compile 'org.eclipse.jetty:jetty-servlet:7.6.7.v20120910'
compile 'com.sun.jersey.contribs:jersey-guice:1.11'
compile 'com.sun.jersey:jersey-bundle:1.11'
compile 'com.amazonaws:aws-java-sdk:1.4.7'
compile 'javax.servlet:servlet-api:2.5'
compile 'com.amazonaws:aws-java-sdk:1.4.7'

compile 'commons-cli:commons-cli:1.2'

Expand Down Expand Up @@ -174,3 +174,25 @@ project(':suro-kafka') {
testCompile project(':suro-client')
}
}

// Build configuration for the suro-s3 module.
project(':suro-s3') {
dependencies {
// Sibling modules this module compiles against.
compile project(':suro-core')
compile project(':suro-localfile')

// External S3/AWS client libraries (JetS3t and the AWS SDK), pinned to fixed versions.
compile 'net.java.dev.jets3t:jets3t:0.9.0'
compile 'com.amazonaws:aws-java-sdk:1.4.7'

// Puts the compiled test classes of suro-client (its test source set output)
// on this module's test compile classpath.
testCompile project(':suro-client').sourceSets.test.output
}
}

// Build configuration for the suro-localfile module.
project(':suro-localfile') {
dependencies {
// Sibling modules this module compiles against.
compile project(':suro-core')

compile project(':suro-client')

// In addition to the main suro-client classes above, puts suro-client's
// compiled test classes (its test source set output) on the test compile classpath.
testCompile project(':suro-client').sourceSets.test.output
}
}
4 changes: 4 additions & 0 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
#Mon May 19 18:35:38 PDT 2014
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
rootProject.name='suro'
include 'suro-core', 'suro-client', 'suro-server', 'suro-integration-test', 'suro-kafka'
include 'suro-core', 'suro-client', 'suro-server', 'suro-integration-test', 'suro-kafka', 'suro-s3', 'suro-localfile'
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.netflix.suro.queue;

/**
 * Control surface for pausing and resuming traffic intake.
 *
 * <p>{@code LocalFileSink} holds an instance of this interface and calls
 * {@link #stopTakingTraffic()} when its disk space check reports insufficient
 * space, and {@link #startTakingTraffic()} otherwise.
 */
public interface TrafficController {
/**
 * Requests that traffic no longer be taken.
 */
void stopTakingTraffic();

/**
 * Requests that traffic be taken again.
 */
void startTakingTraffic();

/**
 * Returns the controller's status as an integer code.
 *
 * <p>NOTE(review): the meaning of the individual codes is not defined by this
 * interface; confirm against the implementations.
 *
 * @return the current status code
 */
int getStatus();
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,29 +42,29 @@ public interface FileWriter {
* {@link FileNameFormatter}
*
* @param outputDir the directory where the file is located
* @throws IOException
* @throws java.io.IOException
*/
void open(String outputDir) throws IOException;

/**
* @return the file length
* @throws IOException
* @throws java.io.IOException
*/
long getLength() throws IOException;
void writeTo(Message message) throws IOException;

/**
* Flush all data to the disk
*
* @throws IOException
* @throws java.io.IOException
*/
void sync() throws IOException;

/**
* Close the current file, create and open the new file.
*
* @param newPath The path that points to the newly rotated file.
* @throws IOException
* @throws java.io.IOException
*/
void rotate(String newPath) throws IOException;
FileSystem getFS();
Expand All @@ -77,7 +77,7 @@ public interface FileWriter {
*
* @param oldName The name of the file when it is not done.
* @param newName The new name of the file after it is marked as done.
* @throws IOException
* @throws java.io.IOException
*/
void setDone(String oldName, String newName) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public FileWriterBase(String codecClass, Logger log, Configuration conf) {
*
* @param oldName
* @param newName
* @throws IOException
* @throws java.io.IOException
*/
public void setDone(String oldName, String newName) throws IOException {
Path oldPath = new Path(oldName);
Expand Down Expand Up @@ -104,7 +104,7 @@ public static CompressionCodec createCodecInstance(String codecClass) throws Cla
*
* @param newPath
* @return
* @throws IOException
* @throws java.io.IOException
*/
public SequenceFile.Writer createSequenceFile(String newPath) throws IOException {
if (codec != null) {
Expand All @@ -125,7 +125,7 @@ fs, conf, new Path(newPath),
*
* @param path
* @return
* @throws IOException
* @throws java.io.IOException
*/
public FSDataOutputStream createFSDataOutputStream(String path) throws IOException {
return fs.create(new Path(path), false);
Expand All @@ -137,7 +137,7 @@ public FSDataOutputStream createFSDataOutputStream(String path) throws IOExcepti
*
* @param outputStream
* @return
* @throws IOException
* @throws java.io.IOException
*/
public DataOutputStream createDataOutputStream(FSDataOutputStream outputStream) throws IOException {
if (codec != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.netflix.suro.message.MessageContainer;
import com.netflix.suro.sink.notice.QueueNotice;
import com.netflix.suro.sink.notice.Notice;
import com.netflix.suro.queue.MessageSetProcessorManager;
import com.netflix.suro.queue.TrafficController;
import com.netflix.suro.sink.QueuedSink;
import com.netflix.suro.sink.Sink;
import com.netflix.suro.queue.MemoryQueue4Sink;
Expand All @@ -51,8 +51,8 @@
/**
* LocalFileSink appends messages to the file in local file system and rotates
* the file when the file size reaches to the threshold or in the regular basis
* whenever it comes earlier. When {@link SpaceChecker} checks not enough disk
* space, it triggers {@link com.netflix.suro.queue.MessageSetProcessorManager} not to take the traffic anymore.
* whenever it comes earlier. When {@link com.netflix.suro.sink.localfile.LocalFileSink.SpaceChecker} checks not enough disk
* space, it triggers {@link com.netflix.suro.queue.TrafficController} not to take the traffic anymore.
*
* @author jbae
*/
Expand All @@ -71,7 +71,7 @@ public class LocalFileSink extends QueuedSink implements Sink {
private final int minPercentFreeDisk;
private final Notice<String> notice;

private MessageSetProcessorManager messageSetProcessorManager;
private TrafficController trafficController;
private SpaceChecker spaceChecker;

private String filePath;
Expand All @@ -97,7 +97,7 @@ public LocalFileSink(
@JsonProperty("queue4Sink") MessageQueue4Sink queue4Sink,
@JsonProperty("batchSize") int batchSize,
@JsonProperty("batchTimeout") int batchTimeout,
@JacksonInject("queueManager") MessageSetProcessorManager messageSetProcessorManager,
@JacksonInject("queueManager") TrafficController trafficController,
@JacksonInject("spaceChecker") SpaceChecker spaceChecker) {
if (!outputDir.endsWith("/")) {
outputDir += "/";
Expand All @@ -110,7 +110,7 @@ public LocalFileSink(
this.rotationPeriod = new Period(rotationPeriod == null ? "PT2m" : rotationPeriod);
this.minPercentFreeDisk = minPercentFreeDisk == 0 ? 85 : minPercentFreeDisk;
this.notice = notice == null ? new QueueNotice<String>() : notice;
this.messageSetProcessorManager = messageSetProcessorManager;
this.trafficController = trafficController;
this.spaceChecker = spaceChecker;

Monitors.registerObject(LocalFileSink.class.getSimpleName() + "-" + outputDir.replace('/', '_'), this);
Expand All @@ -127,8 +127,23 @@ public void open() {
if (spaceChecker == null) {
spaceChecker = new SpaceChecker(minPercentFreeDisk, outputDir);
}
if (messageSetProcessorManager == null) {
messageSetProcessorManager = new MessageSetProcessorManager();
if (trafficController == null) {
trafficController = new TrafficController() {
@Override
public void stopTakingTraffic() {

}

@Override
public void startTakingTraffic() {

}

@Override
public int getStatus() {
return 0;
}
};
}

notice.init();
Expand Down Expand Up @@ -191,17 +206,17 @@ private void rotate() throws IOException {
nextRotation = new DateTime().plus(rotationPeriod).getMillis();

if (!spaceChecker.hasEnoughSpace()) {
messageSetProcessorManager.stopTakingTraffic();
trafficController.stopTakingTraffic();
} else {
messageSetProcessorManager.startTakingTraffic();
trafficController.startTakingTraffic();
}
}

/**
* Before polling messages from the queue, it should check whether to rotate
* the file and start to write to new file.
*
* @throws IOException
* @throws java.io.IOException
*/
@Override
protected void beforePolling() throws IOException {
Expand All @@ -218,7 +233,7 @@ protected void beforePolling() throws IOException {
* commit the queue and clear messages
*
* @param msgList
* @throws IOException
* @throws java.io.IOException
*/
@Override
protected void write(List<Message> msgList) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.netflix.suro.sink.localfile;

import com.netflix.suro.SuroPlugin;
import com.netflix.suro.sink.SuroSink;
import com.netflix.suro.sink.notice.NoNotice;
import com.netflix.suro.sink.notice.QueueNotice;

/**
 * Plugin that registers the sink and notice types of this module.
 *
 * <p>Sink types registered: {@link LocalFileSink} and {@link SuroSink}.
 * Notice types registered: {@link NoNotice} and {@link QueueNotice}.
 * Each type is registered under its own {@code TYPE} constant.
 */
public class SuroSinkPlugin extends SuroPlugin {
    /**
     * Registers the sink types first, then the notice types.
     */
    @Override
    protected void configure() {
        // Sinks
        addSinkType(LocalFileSink.TYPE, LocalFileSink.class);
        addSinkType(SuroSink.TYPE, SuroSink.class);

        // Notices
        addNoticeType(NoNotice.TYPE, NoNotice.class);
        addNoticeType(QueueNotice.TYPE, QueueNotice.class);
    }
}
Loading
0