Tutorial 6: going Spark

This section provides a tutorial on how to use Apache Spark in Eskapade. Spark works ‘out of the box’ in the Eskapade docker/vagrant image. For details on how to set up a custom Spark installation, see the Spark section in the Appendix.

In this tutorial we essentially redo Tutorial 1, but use Spark instead of Pandas for the data processing. The following paragraphs describe, step by step, how to run a Spark job, use existing links, and write your own links for Spark queries.

Note

To get familiar with Spark in Eskapade you can follow the exercises in python/eskapadespark/tutorials/tutorial_6.py.

Running the tutorial macro

The very first step to run the tutorial Spark job is:

$ eskapade_run python/eskapadespark/tutorials/tutorial_6.py

Eskapade will start a Spark session, do nothing, and quit - there are no chains/links defined yet. The Spark session is created via the SparkManager which, like the DataStore, is a singleton that configures and controls Spark sessions centrally. It is activated through the magic line:

process_manager.service(SparkManager).create_session(include_eskapade_modules=True)
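
Once the session exists, any link can obtain it again from the same SparkManager service. Below is a minimal sketch, assuming SparkManager is importable from the eskapadespark package and exposes a get_session() accessor, as in the example macros:

from eskapade import process_manager
from eskapadespark import SparkManager  # assumed import path, as used in the tutorial macros

# create (or configure) the central Spark session via the SparkManager singleton
sm = process_manager.service(SparkManager)
sm.create_session(include_eskapade_modules=True)

# later, e.g. inside a link, retrieve the very same session again
spark = sm.get_session()  # assumed accessor; yields a plain pyspark.sql.SparkSession
print(spark.version)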

Note that when the Spark session is created, the following line appears in the logs:

Adding Python modules to egg archive <PATH_TO_ESKAPADE>/lib/es_python_modules.egg

This is the SparkManager ensuring that all Eskapade source code is uploaded to, and available on, the Spark cluster when running in a distributed environment. To include the Eskapade code, the argument include_eskapade_modules needs to be set to True (by default it is False).

If you get an ImportError: No module named pyspark, then most likely SPARK_HOME and PYTHONPATH are not set up correctly. For details, see the Spark section in the Appendix.

Reading data

Spark can read data from various sources, e.g. local disk, HDFS, and Hive tables. Eskapade provides the SparkDfReader link that uses the pyspark.sql.DataFrameReader to read flat CSV files into Spark DataFrames, RDDs, and Pandas DataFrames. To read in the Tutorial data, the following link should be added to the Data chain:

data = Chain('Data')
reader = SparkDfReader(name='Read_LA_ozone', store_key='data', read_methods=['csv'])
reader.read_meth_args['csv'] = (DATA_FILE_PATH,)
reader.read_meth_kwargs['csv'] = dict(sep=',', header=True, inferSchema=True)
data.add(reader)

The DataStore only holds a pointer to the Spark dataframe, which itself lives in (distributed) memory. This differs from a Pandas dataframe, which is stored in the DataStore in its entirety: a Spark dataframe residing on the cluster may not fit in the memory of the machine running Eskapade. This also means that Spark dataframes are never written to disk in DataStore pickles!
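
For illustration, a follow-up link (or an interactive session after the run) can pick up the dataframe by its store key and inspect it with regular PySpark calls. A minimal sketch, assuming the store key 'data' used above:

from eskapade import process_manager, DataStore

ds = process_manager.service(DataStore)

# ds['data'] is a pyspark.sql.DataFrame: only this handle lives in the DataStore,
# the records themselves remain distributed over the Spark executors
df = ds['data']
df.printSchema()
print('number of records: {:d}'.format(df.count()))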

Spark Streaming

Eskapade supports the use of Spark Streaming, as demonstrated in the word count example tutorials/esk610_spark_streaming_wordcount.py. The data is processed in (near) real-time as micro batches of RDDs, so-called discretized streams, where the stream originates from either new incoming files or a network connection. As with regular Spark queries, various transformations can be defined and applied in subsequent Eskapade links.

For details on Spark Streaming, see also https://spark.apache.org/docs/2.1.1/streaming-programming-guide.html.
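
The transformations in such a streaming job are ordinary Spark Streaming calls. The sketch below is not the literal esk610 code, but a self-contained word count on a toy in-memory stream that shows the typical pattern:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName='dstream_wordcount_sketch')
ssc = StreamingContext(sc, batchDuration=1)  # 1-second micro batches

# toy input stream built from an in-memory RDD; in esk610 the stream
# originates from new files in a directory or from a TCP socket instead
lines = ssc.queueStream([sc.parallelize(['Hello world', 'Hello world'])])

# classic word count on the discretized stream (DStream)
counts = (lines.flatMap(lambda line: line.split(' '))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination(timeout=5)
ssc.stop(stopSparkContext=True)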

File stream

The word count example using the file stream method can be run by executing the following commands in two different terminals:

terminal 1 $ eskapade_run -c stream_type='file' python/eskapadespark/tutorials/esk610_spark_streaming_wordcount.py

terminal 2 $ mkdir /tmp/eskapade_stream_test/
terminal 2 $ for ((i=0; i<=100; i++)); do echo "Hello world" > /tmp/eskapade_stream_test/dummy_$(printf %05d ${i}); sleep 0.2; done

Here the bash for-loop creates a new file containing 'Hello world' in the /tmp/eskapade_stream_test directory every 0.2 seconds. Spark Streaming picks up and processes these files, and in terminal 1 a word count of the processed data is displayed. Output is stored in results/esk610_spark_streaming/data/v0/dstream/wordcount. Note that only new files in /tmp/eskapade_stream_test are processed; do not forget to delete this directory afterwards.
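
Under the hood, the 'file' mode boils down to a text file stream that monitors this directory. A minimal sketch (not the literal esk610 code), assuming the same directory as above:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName='file_stream_sketch')
ssc = StreamingContext(sc, batchDuration=1)

# every new file appearing in the monitored directory becomes part of the stream;
# files already present when the stream starts are ignored
lines = ssc.textFileStream('/tmp/eskapade_stream_test')
lines.count().pprint()

ssc.start()
ssc.awaitTermination()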

TCP stream

The word count example using the TCP stream method can be run by executing the following commands in two different terminals:

terminal 1 $ eskapade_run -c stream_type='tcp' python/eskapadespark/tutorials/esk610_spark_streaming_wordcount.py

terminal 2 $ nc -lk 9999

Here nc (netcat) serves data on port 9999, and Spark Streaming connects to this port to process the incoming data. In terminal 2 arbitrary words can be typed (followed by enter), and in terminal 1 a word count of the processed data is displayed. Output is stored in results/esk610_spark_streaming/data/v0/dstream/wordcount.
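
Similarly, the 'tcp' mode connects to the port on which netcat is listening. A minimal sketch (not the literal esk610 code), assuming the host and port used above:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName='tcp_stream_sketch')
ssc = StreamingContext(sc, batchDuration=1)

# connect as a client to localhost:9999, where `nc -lk 9999` serves the typed words
lines = ssc.socketTextStream('localhost', 9999)
lines.pprint()

ssc.start()
ssc.awaitTermination()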