Tutorials

This section contains materials on how to use Eskapade-Spark. All command examples can be run from any directory with write access. For more in-depth explanations of the functionality of the code base, see the API docs.

All Spark Examples in Eskapade

All Eskapade-Spark example macros can be found in the tutorials directory. For ease of use, let’s make a shortcut to the directory containing the tutorials:

$ export TUTDIR=`pip show Eskapade-Spark | grep Location | awk '{ print $2"/eskapadespark/tutorials" }'`
$ ls -l $TUTDIR/

The numbering of the example macros follows the package structure:

  • esk600+: macros for processing Spark datasets and performing analysis with Spark.

These macros are briefly described below. You are encouraged to run all examples to see what they can do for you!

Example esk601: setting the spark configuration

Tutorial macro for configuring Spark in multiple ways.

$ eskapade_run $TUTDIR/esk601_spark_configuration.py
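For orientation, configuring a session directly in PySpark looks roughly like the sketch below. This is plain PySpark, not the contents of the macro itself; the app name, master and memory setting are placeholders.

from pyspark.sql import SparkSession

# build a session with a few explicit settings (values are placeholders)
spark = (SparkSession.builder
         .appName('esk601_sketch')
         .master('local[2]')
         .config('spark.driver.memory', '2g')
         .getOrCreate())

# inspect the resulting configuration
print(spark.sparkContext.getConf().getAll())
spark.stop()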

Example esk602: reading csv to a spark dataframe

Tutorial macro for reading CSV files into a Spark data frame.

$ eskapade_run $TUTDIR/esk602_read_csv_to_spark_df.py
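The underlying PySpark call is roughly the following sketch; this is not the macro itself, and the file path is a placeholder.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('esk602_sketch').getOrCreate()

# read a CSV file into a Spark DataFrame ('data.csv' is a placeholder path)
df = spark.read.csv('data.csv', sep=',', header=True, inferSchema=True)
df.printSchema()
df.show(5)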

Example esk603: writing spark data to csv

Tutorial macro for writing Spark data to a CSV file.

$ eskapade_run $TUTDIR/esk603_write_spark_data_to_csv.py
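In plain PySpark, writing a DataFrame to CSV looks roughly as follows; the output path and data are placeholders, and note that Spark writes a directory of part files rather than a single file.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('esk603_sketch').getOrCreate()
df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'label'])

# Spark writes a directory of part files ('/tmp/esk603_output' is a placeholder)
df.write.csv('/tmp/esk603_output', sep=',', header=True, mode='overwrite')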

Example esk604: executing queries

Tutorial macro for applying a SQL query to one or more objects in the DataStore. Such SQL queries can, for instance, be used to filter data.

$ eskapade_run $TUTDIR/esk604_spark_execute_query.py
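In plain PySpark such a query amounts to registering a DataFrame as a temporary view and filtering it with SQL; a minimal sketch (table and column names are placeholders, not taken from the macro):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('esk604_sketch').getOrCreate()
df = spark.createDataFrame([(1, 10.0), (2, 25.0), (3, 5.0)], ['id', 'value'])

# register the DataFrame as a temporary view and filter it with a SQL query
df.createOrReplaceTempView('mytable')
filtered = spark.sql('SELECT id, value FROM mytable WHERE value > 7')
filtered.show()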

Example esk605: creating Spark data frames from various input data

Tutorial macro for creating Spark data frames from different types of input data.

$ eskapade_run $TUTDIR/esk605_create_spark_df.py
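For reference, plain PySpark can build DataFrames from several input types; a sketch with placeholder data and column names:

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('esk605_sketch').getOrCreate()

# from a list of tuples with an explicit column list
df1 = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'label'])

# from a Pandas DataFrame
df2 = spark.createDataFrame(pd.DataFrame({'id': [3, 4], 'label': ['c', 'd']}))

# from an RDD of tuples
rdd = spark.sparkContext.parallelize([(5, 'e'), (6, 'f')])
df3 = spark.createDataFrame(rdd, ['id', 'label'])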

Example esk606: converting Spark data frames into different data types

Tutorial macro for converting Spark data frames into a different data type and applying transformation functions to the resulting data.

$ eskapade_run $TUTDIR/esk606_convert_spark_df.py
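The kind of conversions involved look roughly like this in plain PySpark (not the macro itself; the data are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('esk606_sketch').getOrCreate()
df = spark.createDataFrame([(1, 10.0), (2, 25.0)], ['id', 'value'])

# convert to an RDD of Rows and apply a transformation function
doubled = df.rdd.map(lambda row: (row.id, 2 * row.value))

# convert to a Pandas DataFrame (collects all data to the driver)
pdf = df.toPandas()
pdf['value'] = 2 * pdf['value']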

Example esk607: adding a new column to a Spark dataframe

Tutorial macro for adding a new column to a Spark dataframe by applying a Spark built-in or user-defined function to a selection of its columns.

$ eskapade_run $TUTDIR/esk607_spark_with_column.py
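In plain PySpark, adding columns with a built-in or user-defined function looks roughly like this sketch (column names and functions are placeholders, not taken from the macro):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.appName('esk607_sketch').getOrCreate()
df = spark.createDataFrame([(1, 10.0), (2, 25.0)], ['id', 'value'])

# built-in function: add a log-transformed column
df = df.withColumn('log_value', F.log(F.col('value')))

# user-defined function: add a rescaled column
rescale = F.udf(lambda x: x / 100.0, DoubleType())
df = df.withColumn('value_frac', rescale(F.col('value')))
df.show()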

Example esk608: making histograms of a Spark dataframe

Tutorial macro for making histograms of a Spark dataframe using the Histogrammar package.

$ eskapade_run $TUTDIR/esk608_spark_histogrammar.py

Example esk609: applying map functions on groups of rows

Tutorial macro for applying map functions on groups of rows in Spark data frames.

$ eskapade_run $TUTDIR/esk609_map_df_groups.py
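At the RDD level, applying a map function per group looks roughly like the following plain-PySpark sketch; the keys, values and aggregation function are placeholders.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('esk609_sketch').getOrCreate()
df = spark.createDataFrame([('a', 1.0), ('a', 2.0), ('b', 5.0)], ['key', 'value'])

# group rows by key at the RDD level and apply a map function to each group
grouped = df.rdd.map(lambda row: (row.key, row.value)).groupByKey()
sums = grouped.mapValues(lambda values: sum(values))
print(sums.collect())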

Example esk610: running Spark Streaming word count example

Tutorial macro running the Spark Streaming word count example in Eskapade, derived from:

https://spark.apache.org/docs/latest/streaming-programming-guide.html

Counts words in UTF-8 encoded, newline-delimited text received from a stream every second. The stream can come from either files or a network connection.

$ eskapade_run $TUTDIR/esk610_spark_streaming_wordcount.py

Example esk611: techniques for flattening a time-series in Spark

This macro demonstrates techniques for flattening a time-series in Spark.

$ eskapade_run $TUTDIR/esk611_flatten_time_series.py
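One common way to flatten a time series is to turn consecutive measurements into lagged columns with a window function. The sketch below is plain PySpark and not necessarily the exact technique used in the macro; the data and column names are placeholders.

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName('esk611_sketch').getOrCreate()
df = spark.createDataFrame(
    [('a', 1, 10.0), ('a', 2, 11.0), ('a', 3, 12.5)], ['id', 't', 'value'])

# add lagged values so each row carries a fixed-width slice of the series
w = Window.partitionBy('id').orderBy('t')
flat = (df.withColumn('value_lag1', F.lag('value', 1).over(w))
          .withColumn('value_lag2', F.lag('value', 2).over(w)))
flat.show()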

Tutorial 6: going Spark

This section provides a tutorial on how to use Apache Spark in Eskapade. Spark works 'out of the box' in the Eskapade docker/vagrant image. For details on how to set up a custom Spark configuration, see the Spark section in the Appendix.

In this tutorial we will essentially redo Tutorial 1, but use Spark instead of Pandas for data processing. The following paragraphs describe step by step how to run a Spark job, use existing links, and write your own links for Spark queries.

Note

To get familiar with Spark in Eskapade you can follow the exercises in python/eskapadespark/tutorials/tutorial_6.py.

Running the tutorial macro

The very first step to run the tutorial Spark job is:

$ eskapade_run python/eskapadespark/tutorials/tutorial_6.py

Eskapade will start a Spark session, do nothing, and quit - there are no chains/links defined yet. The Spark session is created via the SparkManager which, like the DataStore, is a singleton that configures and controls Spark sessions centrally. It is activated through the magic line:

process_manager.service(SparkManager).create_session(include_eskapade_modules=True)

Note that when the Spark session is created, the following line appears in the logs:

Adding Python modules to egg archive <PATH_TO_ESKAPADE>/lib/es_python_modules.egg

This is the SparkManager ensuring that all Eskapade source code is uploaded and available to the Spark cluster when running in a distributed environment. To include the Eskapade code, the argument include_eskapade_modules needs to be set to True (by default it is False).
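Putting this together, the session setup in a macro of your own would look roughly like the sketch below; the import paths are assumptions based on the eskapade and eskapadespark package layout, so check the API docs.

# minimal macro sketch; import paths are assumptions
from eskapade import process_manager
from eskapadespark import SparkManager

# create the central Spark session and ship the Eskapade modules to the cluster
process_manager.service(SparkManager).create_session(include_eskapade_modules=True)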

If you get an ImportError: No module named pyspark, then most likely SPARK_HOME and PYTHONPATH are not set up correctly. For details, see the Spark section in the Appendix.

Reading data

Spark can read data from various sources, e.g. local disk, HDFS and Hive tables. Eskapade provides the SparkDfReader link, which uses the pyspark.sql.DataFrameReader to read flat CSV files into Spark DataFrames, RDDs, and Pandas DataFrames. To read in the Tutorial data, the following link should be added to the Data chain:

data = Chain('Data')
# read the CSV file (DATA_FILE_PATH is defined elsewhere in the macro) into a
# Spark dataframe and store it in the DataStore under the key 'data'
reader = SparkDfReader(name='Read_LA_ozone', store_key='data', read_methods=['csv'])
# positional and keyword arguments passed on to DataFrameReader.csv()
reader.read_meth_args['csv'] = (DATA_FILE_PATH,)
reader.read_meth_kwargs['csv'] = dict(sep=',', header=True, inferSchema=True)
data.add(reader)

The DataStore holds a pointer to the Spark dataframe in (distributed) memory. This is different from a Pandas dataframe, which is stored in the DataStore in its entirety; a Spark dataframe residing on the cluster may not fit in the memory of the machine running Eskapade. This also means that Spark dataframes are never written to disk in DataStore pickles!
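In a link running after the reader, the dataframe can then be retrieved from the DataStore under its store key. A rough sketch, with the import path again an assumption:

# rough sketch of accessing the dataframe in a later link; imports are assumptions
from eskapade import process_manager, DataStore

ds = process_manager.service(DataStore)
df = ds['data']           # the Spark DataFrame stored by SparkDfReader
df.printSchema()
print(df.count())         # triggers an actual Spark job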

Spark Streaming

Eskapade supports the use of Spark Streaming, as demonstrated in the word count example tutorials/esk610_spark_streaming_wordcount.py. The data is processed in (near) real-time as micro batches of RDDs, so-called discretized streams, where the stream originates from either new incoming files or a network connection. As with regular Spark queries, various transformations can be defined and applied in subsequent Eskapade links.

For details on Spark Streaming, see also https://spark.apache.org/docs/2.1.1/streaming-programming-guide.html.
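For orientation, the underlying DStream word-count pattern in plain PySpark looks roughly like the sketch below; it follows the standard Spark Streaming example and is not the Eskapade macro itself.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName='wordcount_sketch')
ssc = StreamingContext(sc, 1)   # micro batches of 1 second

# listen on a TCP socket; port 9999 matches the netcat example below
lines = ssc.socketTextStream('localhost', 9999)
counts = (lines.flatMap(lambda line: line.split(' '))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()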

File stream

The word count example using the file stream method can be run by executing the following commands in two different terminals:

terminal 1 $ eskapade_run -c stream_type='file' python/eskapadespark/tutorials/esk610_spark_streaming_wordcount.py

terminal 2 $ mkdir /tmp/eskapade_stream_test/
terminal 2 $ for ((i=0; i<=100; i++)); do echo "Hello world" > /tmp/eskapade_stream_test/dummy_$(printf %05d ${i}); sleep 0.2; done

The bash for-loop creates a new file containing "Hello world" in the /tmp/eskapade_stream_test directory every 0.2 seconds. Spark Streaming picks up and processes these files, and terminal 1 displays a word count of the processed data. Output is stored in results/esk610_spark_streaming/data/v0/dstream/wordcount. Only new files in /tmp/eskapade_stream_test are processed, so do not forget to delete this directory before rerunning the example.

TCP stream

The word count example using the TCP stream method can be run by executing the following commands in two different terminals:

terminal 1 $ eskapade_run -c stream_type='tcp' python/eskapadespark/tutorials/esk610_spark_streaming_wordcount.py

terminal 2 $ nc -lk 9999

Here nc (netcat) listens on port 9999 and streams whatever is typed to it; Spark Streaming connects to this port and processes the incoming data. In terminal 2, random words can be typed (followed by enter), and in terminal 1 a word count of the processed data will be displayed. Output is stored in results/esk610_spark_streaming/data/v0/dstream/wordcount.