Skip to content
This repository has been archived by the owner on Sep 2, 2022. It is now read-only.

pslcorp/beam-workshop

Repository files navigation

Apache Beam Workshop

Beam 101 workshop using Python (and the Spark runner).

Prerequisites

  • Python 3 (3.6, 3.7 or 3.8 are supported as the day of writing)
    • pip
    • venv
  • Docker (for running a standalone Spark cluster)

Installation

# First create and activate a virtual env
python3 -m venv .venv
source .venv/bin/activate

# Then install the dependencies of this project inside the virtual env
python3 -m pip install wheel
python3 -m pip install -r requirements.txt

# Finally download the beam_spark_job_server docker image so we can use it latter
docker pull apache/beam_spark_job_server

Basics

Beam is a high-level framework to create data-parallel processing pipelines that can either be batch or streaming.
The pipelines can be defined using any of the available SDKS (Python, Java, GO), and executed on multiple different execution engines, like: Dataflow, Spark & Flink.

Beam is particularly useful for embarrassingly parallel data processing tasks, in which the problem can be decomposed into many smaller bundles of data that can be processed independently and in parallel. For example, ETL tasks and pure data integration jobs.

PCollection

The main abstraction provided by Beam are the PCollections.
A PCollection represents a potentially distributed, multi-element data set, they "contain" the data of the pipeline and are you only mechanism to interact / transform it.
They can contain any type of data, but all elements of the same PCollection must be of the same type.
They are created by a pipeline and belong to such pipeline, thus you can not share PCollections between different pipelines.

PCollections are immutable. Once created, you cannot add, remove, or change individual elements. You might process each element of a PCollection and generate new one, but it does not consume or modify the original input collection.(*)

Beam avoids unnecessary copying of elements. Thus PCollection contents are logically immutable, not physically immutable. Changes to input elements may be visible to other DoFns executing within the same bundle, and may cause correctness issues.
As a rule, it’s not safe to modify values provided to a DoFn.

A PCollection can be either bounded or unbounded in size.
A bounded PCollection represents a data set of a known, fixed size. While an unbounded PCollection represents a data set of unlimited size. Whether a PCollection is bounded or unbounded depends on the source of the data set that it represents.
Beam uses windowing to divide a continuously updating unbounded PCollection into logical windows of finite size. Aggregation transforms (such as GroupByKey & Combine) work on a per-window basis.

For a complete catalog of all transformations check this.

Side inputs / Calculated values

In addition to the main input (the PCollection), you can provide additional inputs to a transformation in the form of side inputs.
A side input is an additional input that your transformation can access each time it processes an element in the input PCollection.
Such values might be determined by a different branch of your pipeline.

A windowed PCollection may be infinite and thus cannot be compressed into a single value (or single collection class). When you create a PCollectionView of a windowed PCollection, the PCollectionView represents a single entity per window.

Unit testing

The approach to unit test Beam pipelines is to separate all the logic (data transformations) from the input and output.
Then, you can use the TestPipeline and beam.Create to create an initial PCollection from a small & fixed set of data. Subsequently you will pipe your logic to this PCollection, to finally use auxiliary functions beam provides (like assert_that & equal_to) in order to validate the logic produces the expected results.

You can mix that with whatever testing framework you are using; for example, using unittest you can execute the test like this:

python3 -m unittest -v test_word_count.py

Spark runner

In order to run our Beam pipeline on a Spark cluster, all we need to do is change the pipeline options; your business logic remains the same!

The changes are:
Set the runner to PortableRunner
Configure the job_endpoint (we will use Docker latter to run this)
Finally, select the environment_type; for testing we will be using LOOPBACK

beam_options = PipelineOptions(
  runner = 'PortableRunner',
  job_endpoint = 'localhost:8099',
  environment_type = 'LOOPBACK'
)
with beam.Pipeline(options = beam_options) as pipeline:

Then let's start a Spark job server using Docker.

docker run -d --net=host apache/beam_spark_job_server

Now we can just run the python script as before.

Note: For production code you should use spark-submit in conjunction with the previous setup to properly launch your job into a Spark cluster.
Additionally you would need to configure and use a different environment_type, check: https://beam.apache.org/documentation/runtime/sdk-harness-config/

Extras

Releases

No releases published

Packages

No packages published

Languages