Scio


Ecclesiastical Latin IPA: /ˈʃi.o/, [ˈʃiː.o], [ˈʃi.i̯o]

Verb: I can, know, understand, have knowledge.

Scio is a Scala API for Google Cloud Dataflow, inspired by Spark and Scalding. See the current API documentation for more information.

Getting Started

First install the Google Cloud SDK and create a Google Cloud Storage bucket for your project, e.g. gs://my-bucket. Make sure it's in the same region as the BigQuery datasets you want to access and where you want Dataflow to launch workers on GCE.

Building Scio

Scio is built using SBT. To build Scio and publish artifacts locally, run:

git clone git@github.com:spotify/scio.git
cd scio
sbt publish-local

Running the Examples

You can execute the examples locally from SBT. By default, pipelines are executed using the DirectPipelineRunner and the local filesystem is used for input and output.

The -Dbigquery.project=<BILLING_PROJECT> argument is required to compile the typed BigQuery example since the underlying macro makes BigQuery requests at compile time.

neville@localhost scio $ sbt -Dbigquery.project=<BILLING_PROJECT>
[info] ...
> project scio-examples
[info] ...
> runMain com.spotify.scio.examples.WordCount
--input=<INPUT FILE PATTERN>
--output=<OUTPUT DIRECTORY>

Note that unlike Hadoop, Scio/Dataflow input should be file patterns rather than directories, i.e. gs://bucket/path/part-*.txt and not gs://bucket/path. Output, on the other hand, should be a directory just like in Hadoop, so gs://bucket/path will produce files like gs://bucket/path/part-00000-of-00005.txt.
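
For reference, the bundled WordCount is approximately the following pipeline (a simplified sketch of WordCount.scala; the tokenization and output formatting may differ slightly from the actual example):

import com.spotify.scio._

object WordCount {
  def main(cmdlineArgs: Array[String]): Unit = {
    // Parse --input/--output and create the pipeline context
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.textFile(args("input"))                            // input is a file pattern
      .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
      .countByValue                                       // SCollection[(String, Long)]
      .map { case (word, count) => s"$word: $count" }
      .saveAsTextFile(args("output"))                     // output is a directory

    sc.close()                                            // run the pipeline
  }
}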

Use the BlockingDataflowPipelineRunner or DataflowPipelineRunner to execute pipelines on the Google Cloud Dataflow service using managed resources in Google Cloud Platform. The former blocks the main process in ScioContext#close() until the job completes, while the latter submits the job and returns immediately.

neville@localhost scio $ sbt -Dbigquery.project=<BILLING_PROJECT>
[info] ...
> project scio-examples
[info] ...
> runMain com.spotify.scio.examples.WordCount
--project=<YOUR CLOUD PLATFORM PROJECT NAME>
--stagingLocation=<YOUR CLOUD STORAGE LOCATION>
--zone=<GCE AVAILABILITY ZONE>
--runner=BlockingDataflowPipelineRunner
--input=<INPUT FILE PATTERN>
--output=<OUTPUT DIRECTORY>

The Cloud Storage staging location should be in the form gs://bucket/path/to/staging/directory. The Cloud Platform project refers to the project name (not the project number).

The GCE availability zone should be in the same region as the BigQuery datasets and the GCS bucket.

BigQuery Settings

You may need a few extra settings to use BigQuery queries as pipeline input.

sbt -Dbigquery.project=<PROJECT-NAME> -Dbigquery.staging_dataset.location=<LOCATION>
  • bigquery.project: the GCP project used to make BigQuery requests at compile time (see the sketch below).
  • bigquery.staging_dataset.location: geographical location of the BigQuery staging dataset, e.g. US or EU; it must match the location of the source tables and GCS buckets.
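
As an illustration of why compile-time access is needed: the scio-bigquery macro makes BigQuery requests at compile time to fetch the result schema of the annotated query and generates a matching case class. A minimal sketch, using the public weather sample queried by TypedBigQueryTornadoes.scala (names here are illustrative):

import com.spotify.scio._
import com.spotify.scio.bigquery._

object TypedBigQueryExample {
  // The query is validated and its schema fetched at compile time,
  // which is why -Dbigquery.project must be set when compiling.
  @BigQueryType.fromQuery(
    "SELECT tornado, month FROM [clouddataflow-readonly:samples.weather_stations]")
  class WeatherRow

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.typedBigQuery[WeatherRow]()     // SCollection[WeatherRow]
      .map(_.toString)                 // WeatherRow is a generated case class
      .saveAsTextFile(args("output"))
    sc.close()
  }
}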

Options

More Dataflow-specific pipeline options can be found in DataflowPipelineOptions and its super-interfaces. Some of the more useful ones come from DataflowPipelineWorkerPoolOptions:

  • --numWorkers: Number of workers to use when executing the Dataflow job.
  • --autoscalingAlgorithm: [Experimental] The autoscaling algorithm to use for the worker pool. NONE: does not change the size of the worker pool. THROUGHPUT_BASED: autoscale the worker pool based on throughput (up to maxNumWorkers). (default=NONE)
  • --maxNumWorkers: [Experimental] The maximum number of workers to use when worker pool autoscaling is enabled. (default=20)
  • --diskSizeGb: Remote worker disk size, in gigabytes, or 0 to use the default size.
  • --workerMachineType: Machine type to create Dataflow worker VMs as. See https://cloud.google.com/compute/docs/machine-types for a list of valid options. If unset, the Dataflow service will choose a reasonable default.
  • --network: GCE network for launching workers.

Scio, Spark and Scalding

The Scio API is heavily influenced by Spark but there are some minor differences.

  • SCollection is equivalent to Spark's RDD.
  • PairSCollectionFunctions and DoubleSCollectionFunctions are specialized versions of SCollection and equivalent to Spark's PairRDDFunctions and DoubleRDDFunctions.
  • Execution planning is static and happens before the job is submitted. There is no driver node in a Dataflow cluster, so one can only perform the equivalent of Spark transformations (RDD → RDD) but not actions (RDD → driver local memory).
  • There is no broadcast either, but the pattern of RDD → driver via an action and driver → RDD via broadcast can be replaced with SCollection.asSingletonSideInput and SCollection.withSideInputs (see the sketch after this list).
  • There is no DStream (continuous series of RDDs) like in Spark Streaming. Values in an SCollection are windowed based on their timestamps and the windowing operation. The same API works in both batch (a single global window by default) and streaming mode. Aggregation-type transformations that produce an SCollection with a single value under the global window instead produce one value per window when a non-global window is defined.
  • SCollection has extra methods for side input, side output, and windowing.
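
For example, the Spark pattern of collecting a small result to the driver and broadcasting it back can be expressed with a side input, roughly as follows (a minimal sketch with made-up values):

import com.spotify.scio._

object SideInputExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    val data = sc.parallelize(Seq(1.0, 5.0, 10.0, 100.0))

    // Equivalent of "action to driver + broadcast" in Spark: compute a single
    // value and make it available to every worker as a side input.
    val meanSI = data.mean.asSingletonSideInput

    data
      .withSideInputs(meanSI)                                // SCollectionWithSideInput
      .map { (x, ctx) => s"$x / mean = ${x / ctx(meanSI)}" } // read it via the context
      .toSCollection                                         // back to a plain SCollection
      .saveAsTextFile(args("output"))

    sc.close()
  }
}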

Some features may look familiar to Scalding users.

  • Args is a simple command line argument parser similar to the one in Scalding.
  • Powerful transforms are possible with sum, sumByKey, aggregate, aggregateByKey using Algebird Semigroups and Aggregators, as sketched after this list.
  • MultiJoin and coGroup of up to 22 sources.
  • JobTest for end-to-end pipeline testing.
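
A rough sketch of a few of these (input paths and names are illustrative):

import com.spotify.scio._
import com.spotify.scio.util.MultiJoin
import com.twitter.algebird.Aggregator

object ScaldingStyleExample {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    val words = sc.textFile(args("input")).flatMap(_.split("\\s+").filter(_.nonEmpty))

    // sumByKey uses an implicit Algebird Semigroup for the value type (Long here)
    val counts = words.map((_, 1L)).sumByKey

    // aggregate with an Algebird Aggregator, e.g. the longest word
    val longest = words.aggregate(Aggregator.maxBy((w: String) => w.length))

    // MultiJoin joins up to 22 keyed sources
    val lengths = words.distinct.map(w => (w, w.length))
    val joined = MultiJoin(counts, lengths)   // SCollection[(String, (Long, Int))]

    joined.map { case (w, (n, len)) => s"$w\t$n\t$len" }
      .saveAsTextFile(args("output"))
    sc.close()
  }
}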

SCollection has a few variations.

Scio also offers some additional features.

  • Each worker can pull files from Google Cloud Storage via DistCache to be used in transforms locally, similar to Hadoop distributed cache. See DistCacheExample.scala.
  • Type safe BigQuery IO via Scala macros. Case classes and converters are generated at compile time based on BQ schema. This eliminates the error prone process of handling generic JSON objects. See TypedBigQueryTornadoes.scala.
  • Sinks (saveAs* methods) return Future[Tap[T]], which can be opened either in another pipeline as SCollection[T] or directly as Iterator[T] once the current pipeline completes. This enables complex pipeline orchestration; see WordCountOrchestration.scala and the sketch below.
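
A minimal sketch of the Tap pattern, loosely following WordCountOrchestration.scala and assuming the Future[Tap[T]] API described above (paths and names are illustrative):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

import com.spotify.scio._
import com.spotify.scio.io.Tap

object OrchestrationExample {
  def count(sc: ScioContext, path: String): Future[Tap[(String, Long)]] =
    sc.textFile(path)
      .flatMap(_.split("\\s+").filter(_.nonEmpty))
      .countByValue
      .materialize                 // Future[Tap[(String, Long)]]

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    val f1 = count(sc, args("input1"))
    val f2 = count(sc, args("input2"))
    sc.close()                     // run the first pipeline

    // Once the first job completes, open the Taps in a second pipeline
    val done = for (t1 <- f1; t2 <- f2) yield {
      val (sc2, _) = ContextAndArgs(cmdlineArgs)
      (t1.open(sc2) ++ t2.open(sc2))
        .sumByKey
        .map { case (w, n) => s"$w: $n" }
        .saveAsTextFile(args("output"))
      sc2.close()
    }
    Await.result(done, Duration.Inf)
  }
}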

Artifacts

Scio includes the following artifacts:

  • scio-core: core library
  • scio-test: test utilities, add to your project as a "test" dependency
  • scio-bigquery: Add-on for BigQuery, included in scio-core but can also be used standalone
  • scio-bigtable: Add-on for Bigtable
  • scio-extra: Extra utilities for working with collections, Breeze, etc.
  • scio-hdfs: Add-on for HDFS

To access HDFS from a Scio job, Hadoop configuration files (core-site.xml, hdfs-site.xml, etc.) must be present in src/main/resources, and --network should be set to one that has access to the Hadoop cluster.
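
For illustration only, a job reading from and writing to HDFS might look roughly like the following. The hdfsTextFile and saveAsHdfsTextFile enrichments and the com.spotify.scio.hdfs import are assumptions based on the add-on's naming; check the scio-hdfs API docs for the exact methods:

import com.spotify.scio._
import com.spotify.scio.hdfs._   // assumed: enrichments provided by scio-hdfs

object HdfsWordCount {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.hdfsTextFile(args("input"))               // assumed scio-hdfs read method
      .flatMap(_.split("\\s+").filter(_.nonEmpty))
      .countByValue
      .map { case (w, n) => s"$w\t$n" }
      .saveAsHdfsTextFile(args("output"))        // assumed scio-hdfs write method
    sc.close()
  }
}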

License

Copyright 2016 Spotify AB.

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0
