Python Topology DSL #84
Comments
I must say that the YAML DSL pyleus has is even more readable than a Python DSL would be. Unless we have a reason for wanting to make people use a Python DSL, I'd actually recommend trying to use the JSON format proposed in STORM-561, and generating the Clojure DSL from that until it gets supported directly in Storm. |
Don't you think the JSON format that is in STORM-561 is way too verbose? It would require things like fully-qualified Java class names for each component, etc. |
Good point. I only briefly skimmed it, so it didn't jump out at me immediately. I was just trying to avoid us having to develop yet another Storm DSL. Maybe we could just expand on the YAML DSL Pyleus has to allow for specifying what language a component is written in. We could generate the Clojure DSL in the short-term from the YAML and the verbose JSON in the long-term. |
Some commentary from Nathan Marz about this topic from STORM-561: "What I'm proposing instead is to ditch the idea of specifying topologies via configuration files and do it instead via an interpreted general purpose programming language (like Python). By using an interpreted language, you can construct and submit topologies without having to do a compilation, which is the entire purpose of this issue. You can use Java spouts and bolts in Python just as easily as you can within Java or from within a YAML or JSON file. I guarantee you can make a library for specifying topologies that's as nice as doing so via configuration files, except you never lose the power of a general purpose programming language." |
But that will mean needing to communicate with Nimbus via Thrift, won't it? The official Python Thrift library isn't Python 3 compliant (although thriftpy is, and I submitted a PR a while ago to make it compatible with the Storm Thrift files). |
Yes, I think he's in favor of doing the topology setup with Thrift (which, for me, means it would need to work with thriftpy since requiring local thrift installations is a non-starter, IMO). |
One other concern I have with this approach is that configuring topologies with Thrift means you can't use them with |
We have a little example for the Word Count Topology from examples:

```python
class WordCount(Topology):
    word_spout = WordSpout.spec(parallelism=2)
    word_count_bolt = WordCountBolt.spec(input=WordSpout,
                                         group_on="word",
                                         parallelism=8)
```

Working w/ @omus @cabiad @becitratul at PyCon sprints, this is a little sketch we started to put together. Any thoughts on this look-and-feel, @dan-blanchard @kbourgoin @emmett9001 @msukmanowsky? We are thinking that the components themselves would include a property for the list of streams and fields they are emitting. |
I think it's a nice start. That said, I think it would be more intuitive if Would the DSL only be for pure Python topologies? I could imagine expanding it to support both shell and Java bolts/spouts via |
@dan-blanchard I agree -- we actually had two variants here, one where it was specified as a string:

```python
word_count_bolt = WordCountBolt.spec(
    input=WordSpout)  #=> expands to ("word_spout", "default")
```

or:

So, the idea here was that if you point to the class, the assumption is there is only one instance of that class in your topology. (Otherwise, the topology is ill-defined and an error is thrown.) In the second variant, the idea is that there must be a field named in the Topology class which will resolve to that component -- and the "default" stream is implied. The final variant, the Makes sense? |
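The class-reference variant described above could be implemented roughly as follows. This is a hypothetical sketch, not streamparse's actual code; `resolve_input` and the shape of `specs` are invented for illustration:

```python
# Hypothetical sketch of resolving `input=SomeSpout` (a class reference) to
# the single spec in the topology that uses that class. If the class appears
# zero or more than one time, the topology is ill-defined and we raise.
def resolve_input(specs, component_cls):
    """specs maps topology field names to their component classes."""
    matches = [name for name, cls in specs.items() if cls is component_cls]
    if len(matches) != 1:
        raise ValueError("Topology ill-defined: %d instances of %s"
                         % (len(matches), component_cls.__name__))
    # The "default" stream is implied when only a class is given.
    return (matches[0], "default")
```

With a unique class the reference resolves cleanly; with duplicates it raises, which matches the "error thrown" behavior described above.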
@dan-blanchard Re: the question about pure Python topologies, that's all I'm starting with. Java and other shell components would come later. After all, the first step here is auto-generating a CLJ file, so for now, doing true multi-lang could still happen with some Storm / Clojure DSL fu. |
collaborating on this here: https://floobits.com/amontalenti/streamparse |
more progress! |
Plan:
Optional:
|
According to the document http://storm.apache.org/documentation/Clojure-DSL.html, a stream grouping can be one of the following (in detail: http://storm.apache.org/documentation/Concepts.html):

Part of defining a topology is specifying for each bolt which streams it should receive as input. A stream grouping defines how that stream should be partitioned among the bolt's tasks. There are eight built-in stream groupings in Storm, and you can implement a custom stream grouping by implementing the CustomStreamGrouping interface:

1. Shuffle grouping: Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples.
2. Fields grouping: The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"s may go to different tasks.
3. Partial Key grouping: The stream is partitioned by the fields specified in the grouping, like the Fields grouping, but is load-balanced between two downstream bolts, which provides better utilization of resources when the incoming data is skewed. This paper provides a good explanation of how it works and the advantages it provides.
4. All grouping: The stream is replicated across all the bolt's tasks. Use this grouping with care.
5. Global grouping: The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id.
6. None grouping: This grouping specifies that you don't care how the stream is grouped. Currently, none groupings are equivalent to shuffle groupings. Eventually, though, Storm will push down bolts with none groupings to execute in the same thread as the bolt or spout they subscribe from (when possible).
7. Direct grouping: This is a special kind of grouping. A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the [emitDirect](/javadoc/apidocs/backtype/storm/task/OutputCollector.html) methods. A bolt can get the task ids of its consumers by either using the provided TopologyContext or by keeping track of the output of the emit method in OutputCollector (which returns the task ids that the tuple was sent to).
8. Local or shuffle grouping: If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping. |
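The key property of a fields grouping (item 2 above) is easy to demonstrate: tuples with the same values for the grouped fields always map to the same downstream task. The function below is an illustrative sketch of that semantics only; the name and signature are hypothetical, not Storm's or streamparse's API:

```python
# Illustrative sketch of fields-grouping semantics: pick a task index by
# hashing only the grouped fields, so equal keys always land on the same
# task while different keys may spread across tasks.
def fields_grouping_task(values, field_names, group_fields, num_tasks):
    key = tuple(values[field_names.index(f)] for f in group_fields)
    return hash(key) % num_tasks
```

For example, two tuples with the same "user-id" but different counts get the same task index, no matter what the other fields are.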
Thank you, @becitratul -- this is now in this branch: This is the set of groupings will have the DSL support for now. Note that the enum |
The image on the left is looking a little better than the one on the right, eh? cc @kbourgoin @dan-blanchard @msukmanowsky |
👍 but it could be improved even more if you swapped out Vim and Linux. |
awesome! |
+1 |
I'm almost done with the validator. Expect to see something tomorrow. |
Could you give an example of an error that would come up if people override the The part that made me really want to move this stuff into the At the very least I think we should make it so:

```python
class WordSpout(Spout):
    output_fields = ['word']
```

That may be a slightly different issue though. |
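One way the proposed validator could enforce a class-level `output_fields` declaration is at class-creation time, via a metaclass. This is a sketch of the idea only, under the assumption that such a check is wanted; `SpoutMeta` and this base-class shape are invented here, not streamparse's implementation:

```python
# Hypothetical sketch: fail fast at class definition time if a concrete
# Spout subclass forgets to declare its output fields.
class SpoutMeta(type):
    def __new__(mcs, name, bases, namespace):
        cls = super().__new__(mcs, name, bases, namespace)
        # Only subclasses of Spout (not the abstract base itself) must
        # declare a non-empty class-level output_fields.
        if any(isinstance(b, SpoutMeta) for b in bases) \
                and not getattr(cls, "output_fields", None):
            raise TypeError("%s must declare class-level output_fields" % name)
        return cls

class Spout(metaclass=SpoutMeta):
    pass

class WordSpout(Spout):
    output_fields = ["word"]
```

Defining a subclass without `output_fields` then raises a `TypeError` immediately, instead of failing later at topology-submission time.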
I feel like it makes more sense when you consider that no one ever creates instances of these classes manually. They're created by |
I see the point about output fields. I just don't get why It would be like saying I could divine all sorts of contrived uses of init that would break when the class is instantiated in the topology builder rather than the remote deployment environment, but that isn't the crux of my concern. |
@kbourgoin, when you get a chance, your thoughts on what the DSL should look like would be greatly appreciated. You only need to read from this comment down. |
I am +1 to
Reviewing this thread from this comment on, topics are:

Object-Oriented Design

Using About subclassing

Domain-Specific Language Design

The differences in the DSL itself between the two approaches:

Topology Parameters

There are some parameters in the topology spec which are attributes of the component, not of the topology, at least in terms of sensible defaults. When writing a component, you are going to decide on (default) naming conventions for attributes like

Attributes like |
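The "component attribute with a topology-level override" idea above can be sketched in a few lines. All names here (`Component`, the dict-returning `spec`) are hypothetical scaffolding for illustration, not the proposed API:

```python
# Sketch: a component carries a sensible default for a topology parameter
# (here parallelism); the topology spec may override it per instance.
class Component:
    parallelism = 1  # framework-wide default

    @classmethod
    def spec(cls, **options):
        # Fall back to the component author's class-level default.
        options.setdefault("parallelism", cls.parallelism)
        return options

class WordCountBolt(Component):
    parallelism = 8  # component author's sensible default
```

The topology author then only specifies `parallelism` when they want something other than the component's default.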
I just realized this comment thread was never updated with the current state of things. Currently we have:

```python
class WordCount(Topology):
    word_spout = Spec(WordSpout, parallelism=2)
    word_bolt = Spec(WordCountBolt, source=word_spout,
                     group_on=Grouping.fields("word"),
                     parallelism=8)
```

So, @rduplain, what are your thoughts on |
The streamparse |
Good point. I think I initially argued against |
After some offline discussion with @rduplain I think we're approaching something that looks like this (for a really messy topology):

```python
class SuperComplexTopology(Topology):
    multi_spout1 = MultiSpout.spec(name='multi_spout1',
                                   streams=[Stream(fields=['foo', 'bar']),
                                            Stream(name='direct',
                                                   fields=['dir1', 'dir2'],
                                                   direct=True)],
                                   parallelism_hint=4)
    simple_spout = SimpleSpout.spec(parallelism_hint=2)
    batching_bolt = SomeBatchingBolt.spec(inputs=[multi_spout1, simple_spout],
                                          streams=[Stream(['sum'])],
                                          json_conf={'topology.tick.tuple.freq.secs': 1})
    directed_bolt = SomeDirectedBolt.spec(inputs={multi_spout1['direct']: Grouping.direct})
    perl_bolt = ShellBolt(execution_command='perl', script='bolt.pl') \
        .spec(inputs={simple_spout: Grouping.fields('junk')},
              streams=[Stream(fields=['field1', 'field2'])])
    ruby_bolt = ShellBolt(execution_command='ruby', script='bolt.rb') \
        .spec(inputs={simple_spout: Grouping.fields('junk')})
    java_bolt = JavaBolt(full_class_name='com.parsely.yucky.YuckyJavaBolt',
                         args_list=[45, 'arg2']) \
        .spec(inputs=[perl_bolt])
```

where

```python
class SimpleSpout(Spout):
    output_fields = ['junk']
```

The idea is that people could define It was suggested we could also make |
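A minimal `Stream` shape consistent with the usage in the example above would be something like the following. This is a sketch inferred from the calls shown (`Stream(['sum'])`, `Stream(name='direct', fields=[...], direct=True)`); the real class may differ:

```python
# Sketch of a Stream object: a named list of output fields, with "default"
# as the default stream name, optionally marked as a direct stream.
class Stream:
    def __init__(self, fields=None, name="default", direct=False):
        self.fields = list(fields or [])
        self.name = name
        self.direct = direct
```

This supports all three call shapes used in the topology example: positional fields only, keyword fields, and a named direct stream.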
Just a few thoughts:
+1 to kwargs for |
I agree that What is How do we know all of the keys available in |
With regard to |
@kbourgoin We were trying to keep the names the same as the underlying Thrift ones for consistency, but I guess that's not really necessary. Readability should be the goal in the end.
Streams defined as an argument to |
This is a Stream with one field called sum. I just put it in there because people can pass args positionally without names, as annoying as that can be.
The keys are the names of the streams. It seemed a little nicer than adding another classmethod called |
should look like |
+1 to removing For components that don't need configuration, does I see that the java bolt takes another bolt (not a stream) as its Is |
The default name for streams in Storm is "default", so it would be called "default" in that case.
Yeah, when given a list, it would use the default shuffle grouping, just like the Clojure DSL. |
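That list-vs-dict handling for `inputs` could be normalized in one small step. The function and the `"shuffle"` placeholder below are hypothetical, just to illustrate the rule described above:

```python
# Sketch: `inputs` given as a plain list means shuffle grouping for every
# source; a dict supplies explicit groupings per source.
def normalize_inputs(inputs):
    if isinstance(inputs, dict):
        return dict(inputs)
    return {source: "shuffle" for source in inputs}
```

So `inputs=[spout_a, spout_b]` and `inputs={spout_a: "shuffle", spout_b: "shuffle"}` end up equivalent, matching the Clojure DSL's default.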
No, because |
Hmm... maybe my example was a little too complex. @rduplain and I went into this assuming that |
After more deliberation with @kbourgoin offline, we're going to make the following changes from what I last gave:
I'll add some validation so people can't go crazy and mix and match. This leaves us with:

```python
class SuperComplexTopology(Topology):
    multi_spout1 = MultiSpout.spec(name='multi_spout1', parallelism=4)
    simple_spout = SimpleSpout.spec(parallelism=2)
    batching_bolt = SomeBatchingBolt.spec(inputs=[multi_spout1, simple_spout],
                                          conf={'topology.tick.tuple.freq.secs': 1})
    directed_bolt = SomeDirectedBolt.spec(inputs={multi_spout1['direct']: Grouping.direct})
    perl_bolt = ShellBolt(command='perl', script='bolt.pl',
                          outputs=[Stream(fields=['field1', 'field2'])]) \
        .spec(inputs={simple_spout: Grouping.fields('junk')})
    ruby_bolt = ShellBolt(command='ruby', script='bolt.rb', outputs=['foo']) \
        .spec(inputs={simple_spout: Grouping.fields('junk')})
    java_bolt = JavaBolt(full_class_name='com.parsely.yucky.YuckyJavaBolt',
                         args_list=[45, 'arg2'], outputs=['coffee']) \
        .spec(inputs=[perl_bolt])
```

where

```python
class MultiSpout(Spout):
    outputs = [Stream(fields=['foo', 'bar']),
               Stream(name='direct', fields=['dir1', 'dir2'], direct=True)]
    ...

class SimpleSpout(Spout):
    outputs = ['junk']
    ...

class SomeBatchingBolt(BatchingBolt):
    outputs = [Stream(['sum'])]
    ...
```

The only point of debate that I think might be left is how to handle

```python
class SuperComplexTopology(Topology):
    multi_spout1 = MultiSpout.spec(name='multi_spout1', parallelism=4)
    simple_spout = SimpleSpout.spec(parallelism=2)
    batching_bolt = SomeBatchingBolt.spec(inputs=[multi_spout1, simple_spout],
                                          conf={'topology.tick.tuple.freq.secs': 1})
    directed_bolt = SomeDirectedBolt.spec(inputs={multi_spout1['direct']: Grouping.direct})
    perl_bolt = ShellBolt.spec(command='perl', script='bolt.pl',
                               outputs=[Stream(fields=['field1', 'field2'])],
                               inputs={simple_spout: Grouping.fields('junk')})
    ruby_bolt = ShellBolt.spec(command='ruby', script='bolt.rb', outputs=['foo'],
                               inputs={simple_spout: Grouping.fields('junk')})
    java_bolt = JavaBolt.spec(full_class_name='com.parsely.yucky.YuckyJavaBolt',
                              args_list=[45, 'arg2'], outputs=['coffee'],
                              inputs=[perl_bolt])
```

For now I'm going to proceed with everything assuming we want the first approach, where |
I've changed my mind. I'm going forward assuming we don't want to instantiate |
+1. |
Oh, and this is super minor, but I'm going to say that if someone specifies a direct stream as an input in the list context (i.e., they didn't provide a grouping), we should default to the |
+1. Does that mean that |
Affirmative |
Done as of #199 being merged into master. |
We currently rely on `lein` and the Clojure DSL to build topologies. This is nice because the Clojure DSL is bundled in Storm and it allows us to freely mix Python components with JVM (and even other multi-lang) components. And via Clojure, we get local debugging for free via `LocalCluster`.

But it's also an impediment to new users coming to streamparse expecting "pure Python" support for Storm. See for example this Twitter conversation:
https://twitter.com/sarthakdev/status/539390816339247104
The Clojure DSL was chosen for expediency, but for pure Python topologies, a Python DSL might be even better and would let streamparse users avoid learning much about Java/Clojure. The recently-released pyleus approach to this problem is to provide a YAML DSL and a Java builder tool.
One approach to a Python DSL would be to leverage some new work going on in Storm core to make Topology configuration dynamic via JSON, as described in #81. Another option would be to have the Python DSL actually generate Clojure DSL, which would then be compiled. I haven't currently decided on the best course of action but I am personally interested in building the Python DSL to make streamparse more usable by Pythonistas out-of-the-box.
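The "generate Clojure DSL from Python" option amounts to rendering the topology description as text that `lein` can then compile. The sketch below shows the shape of such a renderer; the exact s-expression forms emitted here are illustrative placeholders, not the real Storm Clojure DSL:

```python
# Rough sketch: render a topology description as Clojure-DSL-style text.
# The emitted forms (spout-spec / bolt-spec) stand in for whatever the
# real DSL requires; only the string-generation approach is the point.
def render_clojure(spouts, bolts):
    lines = ["(defn make-topology []", "  (topology", "   {"]
    for name, parallelism in sorted(spouts.items()):
        lines.append('    "%s" (spout-spec %d)' % (name, parallelism))
    lines.append("   }")
    lines.append("   {")
    for name, parallelism in sorted(bolts.items()):
        lines.append('    "%s" (bolt-spec %d)' % (name, parallelism))
    lines.append("   }))")
    return "\n".join(lines)
```

The Python DSL would walk the `Topology` subclass, collect specs, and hand the result to a renderer like this before shelling out to the existing `lein`-based build.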