Ecclesiastical Latin IPA: /ˈʃi.o/, [ˈʃiː.o], [ˈʃi.i̯o]
Verb: I can, know, understand, have knowledge.
Scio is a Scala API for Google Cloud Dataflow, inspired by Spark and Scalding. See the current API documentation for more information.
First install the Google Cloud SDK and create a Google Cloud Storage bucket for your project, e.g. `gs://my-bucket`. Make sure it's in the same region as the BigQuery datasets you want to access and where you want Dataflow to launch workers on GCE.
Scio is built using SBT. To build Scio and publish artifacts locally, run:
```
git clone git@github.com:spotify/scio.git
cd scio
sbt publish-local
```
You can execute the examples locally from SBT. By default, pipelines are executed with the `DirectPipelineRunner` and the local filesystem is used for input and output. The `-Dbigquery.project=<BILLING_PROJECT>` argument is required to compile the typed BigQuery example, since the underlying macro makes BigQuery requests at compile time.
```
neville@localhost scio $ sbt -Dbigquery.project=<BILLING_PROJECT>
[info] ...
> project scio-examples
[info] ...
> runMain com.spotify.scio.examples.WordCount
  --input=<INPUT FILE PATTERN>
  --output=<OUTPUT DIRECTORY>
```
Note that unlike Hadoop, Scio or Dataflow input should be file patterns and not directories, i.e. `gs://bucket/path/part-*.txt` and not `gs://bucket/path`. Output, on the other hand, should be a directory just like Hadoop, so `gs://bucket/path` will produce files like `gs://bucket/path/part-00000-of-00005.txt`.
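The WordCount example is roughly equivalent to the following minimal sketch (simplified here; the actual example in the repository handles more options):

```scala
import com.spotify.scio._

// Simplified word count; the full version lives in
// com.spotify.scio.examples.WordCount.
object MinimalWordCount {
  def main(cmdlineArgs: Array[String]): Unit = {
    // ContextAndArgs parses Dataflow pipeline options and custom arguments
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.textFile(args("input"))                            // file pattern, e.g. gs://bucket/path/part-*.txt
      .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty)) // tokenize lines into words
      .countByValue                                       // (word, count) pairs
      .map(kv => kv._1 + ": " + kv._2)
      .saveAsTextFile(args("output"))                     // output directory

    sc.close()                                            // run the pipeline
  }
}
```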
Use the `BlockingDataflowPipelineRunner` or `DataflowPipelineRunner` to execute pipelines on the Google Cloud Dataflow service using managed resources in the Google Cloud Platform. The former blocks the main process on `ScioContext#close()` until the job completes, while the latter submits the job and returns immediately.
```
neville@localhost scio $ sbt -Dbigquery.project=<BILLING_PROJECT>
[info] ...
> project scio-examples
[info] ...
> runMain com.spotify.scio.examples.WordCount
  --project=<YOUR CLOUD PLATFORM PROJECT NAME>
  --stagingLocation=<YOUR CLOUD STORAGE LOCATION>
  --zone=<GCE AVAILABILITY ZONE>
  --runner=BlockingDataflowPipelineRunner
  --input=<INPUT FILE PATTERN>
  --output=<OUTPUT DIRECTORY>
```
Your Cloud Storage staging location should be entered in the form `gs://bucket/path/to/staging/directory`. The Cloud Platform project should be referenced by name (not by number). The GCE availability zone should be in the same region as the BigQuery datasets and GCS bucket.
You may need a few extra settings to use BigQuery queries as pipeline input.
```
sbt -Dbigquery.project=<PROJECT-NAME> -Dbigquery.staging_dataset.location=<LOCATION>
```

- `bigquery.project`: GCP project to make BigQuery requests with at compile time.
- `bigquery.staging_dataset.location`: geographical location for the BigQuery staging dataset, e.g. `US` or `EU`; must be the same as the source tables and GCS buckets.
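These settings power the type safe BigQuery macros. A rough sketch of how the macro is used (the query targets a public Dataflow sample table; see TypedBigQueryTornadoes.scala in the examples for the full version):

```scala
import com.spotify.scio._
import com.spotify.scio.bigquery._

object TypedBigQueryJob {
  // The macro issues BigQuery requests at compile time (hence -Dbigquery.project)
  // and generates a case class `Row` matching the query's result schema.
  @BigQueryType.fromQuery(
    "SELECT tornado, month FROM [clouddataflow-readonly:samples.weather_stations]")
  class Row

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.typedBigQuery[Row]()            // SCollection[Row], fields are type checked
      .map(_.toString)
      .saveAsTextFile(args("output"))

    sc.close()
  }
}
```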
More Dataflow pipeline specific options can be found in `DataflowPipelineOptions` and its super interfaces. Some of the more useful ones come from `DataflowPipelineWorkerPoolOptions`:
- `--numWorkers`: number of workers to use when executing the Dataflow job.
- `--autoscalingAlgorithm`: [Experimental] the autoscaling algorithm to use for the worker pool. `NONE`: does not change the size of the worker pool. `THROUGHPUT_BASED`: autoscale the worker pool based on throughput (up to `--maxNumWorkers`). (default: `NONE`)
- `--maxNumWorkers`: [Experimental] the maximum number of workers to use with worker pool autoscaling. (default: 20)
- `--diskSizeGb`: remote worker disk size, in gigabytes, or 0 to use the default size.
- `--workerMachineType`: machine type to create Dataflow worker VMs as. See https://cloud.google.com/compute/docs/machine-types for a list of valid options. If unset, the Dataflow service will choose a reasonable default.
- `--network`: GCE network for launching workers.
The Scio API is heavily influenced by Spark but there are some minor differences.
- `SCollection` is equivalent to Spark's `RDD`.
- `PairSCollectionFunctions` and `DoubleSCollectionFunctions` are specialized versions of `SCollection` and equivalent to Spark's `PairRDDFunctions` and `DoubleRDDFunctions`.
- Execution planning is static and happens before the job is submitted. There is no driver node in a Dataflow cluster, and one can only perform the equivalent of Spark transformations (`RDD` → `RDD`) but not actions (`RDD` → driver local memory).
- There is no broadcast either, but the pattern of `RDD` → driver via action and driver → `RDD` via broadcast can be replaced with `SCollection.asSingletonSideInput` and `SCollection.withSideInputs` (see the sketch after this list).
- There is no `DStream` (continuous series of `RDD`s) like in Spark Streaming. Values in an `SCollection` are windowed based on timestamp and windowing operation. The same API works regardless of batch (single global window by default) or streaming mode. Aggregation type transformations that produce `SCollection`s of a single value under the global window will instead produce one value per window when a non-global window is defined.
- `SCollection` has extra methods for side input, side output, and windowing.
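A minimal sketch of the side input pattern mentioned above, assuming the API described in this section (the data and output path are illustrative):

```scala
import com.spotify.scio._

object SideInputSketch {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    val mean = sc.parallelize(Seq(1.0, 2.0, 3.0)).mean  // SCollection[Double] of one value
    val side = mean.asSingletonSideInput                // replicate it to all workers

    sc.parallelize(Seq(1.0, 10.0, 100.0))
      .withSideInputs(side)                 // values on the left, side input context on the right
      .map { (x, ctx) => x / ctx(side) }    // read the replicated value via the context
      .toSCollection                        // back to a plain SCollection[Double]
      .map(_.toString)
      .saveAsTextFile(args("output"))

    sc.close()
  }
}
```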
Some features may look familiar to Scalding users.
- `Args` is a simple command line argument parser similar to the one in Scalding.
- Powerful transforms are possible with `sum`, `sumByKey`, `aggregate`, `aggregateByKey` using Algebird `Semigroup`s and `Aggregator`s (see the sketch after this list).
- `MultiJoin` and coGroup of up to 22 sources.
- `JobTest` for end to end pipeline testing.
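A rough sketch of the Algebird-backed aggregations mentioned above (the data and output paths are illustrative):

```scala
import com.spotify.scio._
import com.twitter.algebird.Aggregator

object AlgebirdSketch {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // sumByKey picks up the implicit Algebird Semigroup[Long]
    sc.parallelize(Seq(("a", 1L), ("b", 2L), ("a", 3L)))
      .sumByKey
      .map(kv => kv._1 + ": " + kv._2)
      .saveAsTextFile(args("sums"))

    // aggregate with a composed Algebird Aggregator: max and min in one pass
    sc.parallelize(Seq(1, 5, 3))
      .aggregate(Aggregator.max[Int].join(Aggregator.min[Int]))
      .map(_.toString)
      .saveAsTextFile(args("extrema"))

    sc.close()
  }
}
```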
`SCollection` has a few variations.

- `SCollectionWithAccumulator` for accessing custom counters similar to those in Hadoop. See AccumulatorExample.scala and the sketch after this list.
- `SCollectionWithSideInput` for replicating small `SCollection`s to all left-hand side values in a large `SCollection`.
- `SCollectionWithSideOutput` for output to multiple `SCollection`s.
- `WindowedSCollection` for accessing window information.
- `SCollectionWithFanout` and `SCollectionWithHotKeyFanout` for fanout of skewed data.
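A very rough sketch of the accumulator pattern; the accumulator method names below are assumptions based on this era of the API, so treat AccumulatorExample.scala as the authoritative version:

```scala
import com.spotify.scio._

object AccumulatorSketch {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Assumed API: sumAccumulator/maxAccumulator create named custom counters
    val lineCount = sc.sumAccumulator[Long]("lineCount")
    val maxLen = sc.maxAccumulator[Int]("maxLineLength")

    sc.textFile(args("input"))
      .withAccumulator(lineCount, maxLen)       // SCollectionWithAccumulator
      .map { (line, ctx) =>
        ctx.addValue(lineCount, 1L)             // bump counters via the accumulator context
        ctx.addValue(maxLen, line.length)
        line
      }
      .toSCollection
      .saveAsTextFile(args("output"))

    sc.close()
  }
}
```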
Scio also offers some additional features.
- Each worker can pull files from Google Cloud Storage via `DistCache` to be used in transforms locally, similar to Hadoop distributed cache. See DistCacheExample.scala.
- Type safe BigQuery IO via Scala macros. Case classes and converters are generated at compile time based on BQ schema. This eliminates the error prone process of handling generic JSON objects. See TypedBigQueryTornadoes.scala.
- Sinks (`saveAs*` methods) return `Future[Tap[T]]` that can be opened either in another pipeline as `SCollection[T]` or directly as `Iterator[T]` once the current pipeline completes. This enables complex pipeline orchestration. See WordCountOrchestration.scala and the sketch after this list.
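A rough sketch of the `Future[Tap[T]]` pattern (data and paths are illustrative; see WordCountOrchestration.scala for the full version):

```scala
import com.spotify.scio._
import com.spotify.scio.io.Tap
import com.spotify.scio.values.SCollection
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

object TapSketch {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // saveAsTextFile returns a Future that completes once the sink is written
    val futureTap: Future[Tap[String]] =
      sc.parallelize(Seq("a", "b", "c")).saveAsTextFile(args("output"))

    sc.close()  // run the pipeline

    val tap: Tap[String] = Await.result(futureTap, Duration.Inf)

    // Option 1: read the data directly on this machine
    tap.value.foreach(println)          // Iterator[String]

    // Option 2: feed it into a new pipeline
    val sc2 = ScioContext()
    val lines: SCollection[String] = tap.open(sc2)
    lines.map(_.toUpperCase).saveAsTextFile(args("output") + "-upper")
    sc2.close()
  }
}
```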
Scio includes the following artifacts:
- `scio-core`: core library
- `scio-test`: test utilities, add to your project as a "test" dependency
- `scio-bigquery`: add-on for BigQuery, included in `scio-core` but can also be used standalone
- `scio-bigtable`: add-on for Bigtable
- `scio-extra`: extra utilities for working with collections, Breeze, etc.
- `scio-hdfs`: add-on for HDFS
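For example, a typical sbt setup might look like the following (the version number is a placeholder; use the version you published locally or a released one):

```scala
// build.sbt (illustrative; replace the version with an actual Scio release)
val scioVersion = "0.x.y"

libraryDependencies ++= Seq(
  "com.spotify" %% "scio-core" % scioVersion,
  "com.spotify" %% "scio-test" % scioVersion % "test"
)
```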
To access HDFS from a Scio job, Hadoop configuration files (`core-site.xml`, `hdfs-site.xml`, etc.) must be present in `src/main/resources`, and `--network` should be set to one that has access to the Hadoop cluster.
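A rough sketch of reading from and writing to HDFS with the `scio-hdfs` add-on; the package and method names here are assumptions based on this era of the API, so check the scio-hdfs scaladoc:

```scala
import com.spotify.scio._
import com.spotify.scio.hdfs._  // assumed package for the HDFS add-on

object HdfsSketch {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // Assumed API: hdfsTextFile / saveAsHdfsTextFile mirror textFile / saveAsTextFile
    sc.hdfsTextFile(args("input"))
      .filter(_.nonEmpty)
      .saveAsHdfsTextFile(args("output"))

    sc.close()
  }
}
```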
Copyright 2016 Spotify AB.
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0