Tutorials¶
This section contains materials on how to use Eskapade-Spark. All command examples can be run from any directory with write access. For more in-depth explanations of the functionality of the code base, try the API docs.
All Spark Examples in Eskapade¶
All Eskapade-Spark example macros can be found in the tutorials directory. For ease of use, let’s make a shortcut to the directory containing the tutorials:
$ export TUTDIR=`pip show Eskapade-Spark | grep Location | awk '{ print $2"/eskapadespark/tutorials" }'`
$ ls -l $TUTDIR/
The numbering of the example macros follows the package structure:
esk600+
: macros for processing Spark datasets and performing analysis with Spark.
These macros are briefly described below. You are encouraged to run all examples to see what they can do for you!
Example esk601: setting the spark configuration¶
Tutorial macro for configuring Spark in multiple ways.
$ eskapade_run $TUTDIR/esk601_spark_configuration.py
Example esk602: reading csv to a spark dataframe¶
Tutorial macro for reading CSV files into a Spark data frame.
$ eskapade_run $TUTDIR/esk602_read_csv_to_spark_df.py
Example esk603: writing spark data to csv¶
Tutorial macro for writing Spark data to a CSV file.
$ eskapade_run $TUTDIR/esk603_write_spark_data_to_csv.py
Example esk604: executing queries¶
Tutorial macro for applying a SQL query to one or more objects in the DataStore. Such SQL queries can, for instance, be used to filter data.
$ eskapade_run $TUTDIR/esk604_spark_execute_query.py
Example esk605: creating Spark data frames from various input data¶
Tutorial macro for creating Spark data frames from different types of input data.
$ eskapade_run $TUTDIR/esk605_create_spark_df.py
Example esk606: converting Spark data frames into different data types¶
Tutorial macro for converting Spark data frames into a different data type and applying transformation functions to the resulting data.
$ eskapade_run $TUTDIR/esk606_convert_spark_df.py
Example esk607: adding a new column to a Spark dataframe¶
Tutorial macro for adding a new column to a Spark dataframe by applying a Spark built-in or user-defined function to a selection of columns in a Spark dataframe.
$ eskapade_run $TUTDIR/esk607_spark_with_column.py
Example esk608: making histograms of a Spark dataframe¶
Tutorial macro for making histograms of a Spark dataframe using the Histogrammar package.
$ eskapade_run $TUTDIR/esk608_spark_histogrammar.py
Example esk609: applying map functions on groups of rows¶
Tutorial macro for applying map functions on groups of rows in Spark data frames.
$ eskapade_run $TUTDIR/esk609_map_df_groups.py
Example esk610: running Spark Streaming word count example¶
Tutorial macro running Spark Streaming word count example in Eskapade, derived from:
https://spark.apache.org/docs/latest/streaming-programming-guide.html
Counts words in UTF-8 encoded, newline-delimited text received from a stream every second. The stream can come from either files or a network connection.
$ eskapade_run $TUTDIR/esk610_spark_streaming_wordcount.py
Example esk611: techniques for flattening a time-series in Spark¶
This macro demonstrates techniques for flattening a time-series in Spark.
$ eskapade_run $TUTDIR/esk611_flatten_time_series.py
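As a flavour of what flattening can mean here, the stand-alone pyspark sketch below adds a lagged value per key with a window function; this is a generic illustration only, not necessarily the technique used in the macro:

from pyspark.sql import SparkSession, Window, functions as F

# Build a small example time series: one value per (id, time).
spark = SparkSession.builder.master('local[1]').appName('flatten_sketch').getOrCreate()
df = spark.createDataFrame([('a', 1, 10.0), ('a', 2, 12.0), ('a', 3, 9.0)],
                           ['id', 'time', 'value'])

# Flatten by adding the previous value as an extra column per id, ordered in time.
w = Window.partitionBy('id').orderBy('time')
df.withColumn('value_lag1', F.lag('value', 1).over(w)).show()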
Tutorial 6: going Spark¶
This section provides a tutorial on how to use Apache Spark in Eskapade. Spark works ‘out of the box’ in the Eskapade docker/vagrant image. For details on how to set up a custom Spark configuration, see the Spark section in the Appendix.
In this tutorial we will basically redo Tutorial 1 but use Spark instead of Pandas for data processing. The following paragraphs describe step-by-step how to run a Spark job, use existing links and write your own links for Spark queries.
Note
To get familiar with Spark in Eskapade you can follow the exercises in python/eskapadespark/tutorials/tutorial_6.py.
Running the tutorial macro¶
The very first step to run the tutorial Spark job is:
$ eskapade_run python/eskapadespark/tutorials/tutorial_6.py
Eskapade will start a Spark session, do nothing, and quit - there are no chains/links defined yet.
The Spark session is created via the SparkManager which, like the DataStore, is a singleton that configures and controls Spark sessions centrally. It is activated through the magic line:
process_manager.service(SparkManager).create_session(include_eskapade_modules=True)
Note that when the Spark session is created, the following line appears in the logs:
Adding Python modules to egg archive <PATH_TO_ESKAPADE>/lib/es_python_modules.egg
This is the SparkManager ensuring that all Eskapade source code is uploaded and available to the Spark cluster when running in a distributed environment. To include the Eskapade code, the argument include_eskapade_modules needs to be set to True (by default it is False).
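For reference, a stand-alone macro would start the session roughly as follows (a minimal sketch; the imports are assumed to match those used in the tutorial macros):

from eskapade import process_manager
from eskapadespark import SparkManager

# Create (or retrieve) the central Spark session; include_eskapade_modules=True
# ships the Eskapade code to the cluster as an egg archive.
sm = process_manager.service(SparkManager)
spark = sm.create_session(include_eskapade_modules=True)

# The returned object is assumed to be the underlying pyspark SparkSession,
# so the usual Spark API is available, e.g. to inspect the configuration:
print(spark.sparkContext.getConf().getAll())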
If you get an ImportError: No module named pyspark, then most likely SPARK_HOME and PYTHONPATH are not set up correctly. For details, see the Spark section in the Appendix.
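A quick way to check the environment outside Eskapade is to see whether pyspark itself can be imported (a minimal sketch):

import os

# SPARK_HOME should point to your Spark installation; PYTHONPATH should
# contain its python/ directory so that pyspark can be imported.
print('SPARK_HOME =', os.environ.get('SPARK_HOME'))

import pyspark
print('pyspark version:', pyspark.__version__)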
Reading data¶
Spark can read data from various sources, e.g. local disk, HDFS, or Hive tables. Eskapade provides the SparkDfReader link, which uses the pyspark.sql.DataFrameReader to read flat CSV files into Spark DataFrames, RDDs, or Pandas DataFrames.
To read in the Tutorial data, the following link should be added to the Data chain:
data = Chain('Data')
reader = SparkDfReader(name='Read_LA_ozone', store_key='data', read_methods=['csv'])
reader.read_meth_args['csv'] = (DATA_FILE_PATH,)
reader.read_meth_kwargs['csv'] = dict(sep=',', header=True, inferSchema=True)
data.add(reader)
The DataStore holds a pointer to the Spark dataframe in (distributed) memory. This is different from a Pandas dataframe, where the entire dataframe is stored in the DataStore, because a Spark dataframe residing on the cluster may not fit entirely in the memory of the machine running Eskapade. This means that Spark dataframes are never written to disk in DataStore pickles!
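In other words, what you retrieve from the DataStore after the chain has run is a live Spark dataframe handle; operations on it are still executed on the cluster. A minimal sketch, assuming the store_key 'data' used above:

from eskapade import process_manager, DataStore

ds = process_manager.service(DataStore)
df = ds['data']          # pyspark.sql.DataFrame handle, not a local copy

df.printSchema()         # metadata only, no data is pulled to the driver
print(df.count())        # triggers a distributed computation on the cluster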
Using existing links¶
Spark has a large set of standard functions for Spark DataFrames and RDDs. Although the purpose of Eskapade is not to duplicate this functionality, a few links for generic functions are provided to facilitate specifying Spark queries directly in the macro, instead of hard-coding them in links. This is handy for keeping queries in a central place and for reducing code duplication, especially for smaller analysis steps. For example, the SparkExecuteQuery link takes any string containing SQL statements and performs a custom query with Spark on a dataframe.
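The kind of operation such a link wraps is a plain Spark SQL query. The following self-contained pyspark sketch (not the SparkExecuteQuery interface itself; see esk604 for that) shows the idea:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[1]').appName('query_sketch').getOrCreate()
df = spark.createDataFrame([(3, 250), (5, 100), (5, 60)], ['ozone', 'vis'])

# Register the dataframe as a temporary view so it can be queried with SQL.
df.createOrReplaceTempView('ozone_data')
spark.sql('SELECT * FROM ozone_data WHERE vis >= 100').show()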
Column transformations¶
To add two columns to the Tutorial data using the conversion functions defined earlier in the macro, two SparkWithColumn links need to be added to the Data chain, one for each additional column:
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType, FloatType
...
transform = SparkWithColumn(name='Transform_doy', read_key=reader.store_key,
                            store_key='transformed_data', col_select=['doy'],
                            func=udf(comp_date, TimestampType()), new_column='date')
data.add(transform)

transform = SparkWithColumn(name='Transform_vis', read_key=transform.store_key,
                            store_key='transformed_data', col_select=['vis'],
                            func=udf(mi_to_km, FloatType()), new_column='vis_km')
data.add(transform)
Note that the functions defined in the macro are converted to user-defined functions with pyspark.sql.functions.udf, and their output types are explicitly specified in terms of pyspark.sql.types. Omitting these type definitions can lead to obscure errors when executing the job.
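For completeness, the conversion functions referenced above could look roughly like the sketch below; the actual definitions live earlier in tutorial_6.py, so these bodies are illustrative assumptions only:

import datetime

def comp_date(day):
    """Convert a day-of-year number of the 1976 LA ozone data to a timestamp."""
    return datetime.datetime(1976, 1, 1) + datetime.timedelta(days=day - 1)

def mi_to_km(distance):
    """Convert miles to kilometres."""
    return distance * 1.60934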
Creating custom links¶
More complex queries deserve their own links, since links provide full flexibility with respect to specifying custom data operations. For this Tutorial, the ‘complex query’ is simply to print 42 rows of the Spark dataframe. Of course, more advanced Spark functions can be applied in a similar fashion. A link is created just as before, e.g.:
$ eskapade_generate_link --dir python/eskapadespark/links SparkDfPrinter
This creates the link python/eskapadespark/links/sparkdfprinter.py. Do not forget to include the import statements in the __init__.py file as indicated by the eskapade_generate_link command.
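The line to add typically looks like the following sketch; the eskapade_generate_link command prints the exact lines for your setup:

# in python/eskapadespark/links/__init__.py
from eskapadespark.links.sparkdfprinter import SparkDfPrinter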
The next step is to add the desired functionality to the link. In this case, the Spark dataframe needs to be retrieved from the DataStore and the show() method of that dataframe needs to be executed. The execute() method of the link is the right location for this:
def execute(self):
    """Execute the link.

    :returns: status code of execution
    :rtype: StatusCode
    """
    settings = process_manager.service(ConfigObject)
    ds = process_manager.service(DataStore)

    # --- your algorithm code goes here
    self.logger.debug('Now executing link: {link}.', link=self.name)

    df = ds[self.read_key]
    df.show(self.nrows)

    return StatusCode.Success
There is an additional attribute self.nrows which should be set in the link. By default, a generated link processes only the read_key and store_key arguments and fails if there are any residual kwargs.
To set the nrows attribute, add nrows to the key-value arguments in the __init__() method:
def __init__(self, **kwargs):
    ...
    self._process_kwargs(kwargs, read_key=None, store_key=None, nrows=1)
In order to configure Eskapade to run this link, the link needs to be added to a chain, e.g. Summary, in the python/eskapadespark/tutorials/tutorial_6.py macro. This should look similar to:
printer = SparkDfPrinter(name='Print_spark_df', read_key=transform.store_key, nrows=42)
summary.add(printer)
The name of the dataframe is the output name of the transform link, and the number of rows to print is specified by the nrows parameter.
Eskapade should now be ready to finally execute the macro and provide the desired output:
$ eskapade_run python/eskapadespark/tutorials/tutorial_6.py
* * * Welcome to Eskapade * * *
...
+-----+----+----+--------+----+----+---+---+---+---+--------------------+--------+
|ozone| vh|wind|humidity|temp| ibh|dpg|ibt|vis|doy| date| vis_km|
+-----+----+----+--------+----+----+---+---+---+---+--------------------+--------+
| 3|5710| 4| 28| 40|2693|-25| 87|250| 3|1976-01-03 00:00:...| 402.335|
| 5|5700| 3| 37| 45| 590|-24|128|100| 4|1976-01-04 00:00:...| 160.934|
| 5|5760| 3| 51| 54|1450| 25|139| 60| 5|1976-01-05 00:00:...| 96.5604|
...
| 6|5700| 4| 86| 55|2398| 21|121|200| 44|1976-02-13 00:00:...| 321.868|
| 4|5650| 5| 61| 41|5000| 51| 24|100| 45|1976-02-14 00:00:...| 160.934|
| 3|5610| 5| 62| 41|4281| 42| 52|250| 46|1976-02-15 00:00:...| 402.335|
+-----+----+----+--------+----+----+---+---+---+---+--------------------+--------+
only showing top 42 rows
...
* * * Leaving Eskapade. Bye! * * *
That’s it!
Spark Streaming¶
Eskapade supports the use of Spark Streaming, as demonstrated in the word count example tutorials/esk610_spark_streaming_wordcount.py.
The data is processed in (near) real-time as micro batches of RDDs, so-called discretized streaming, where the stream originates from either new incoming files or a network connection. As with regular Spark queries, various transformations can be defined and applied in subsequent Eskapade links.
For details on Spark Streaming, see also https://spark.apache.org/docs/2.1.1/streaming-programming-guide.html.
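As a reference for what happens under the hood, a minimal stand-alone pyspark word count over a TCP stream (a sketch following the Spark streaming guide above, not the Eskapade link implementation) looks like this:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName='wordcount_sketch')
ssc = StreamingContext(sc, batchDuration=1)   # 1-second micro batches

# Count words in lines received on localhost:9999 and print each batch.
lines = ssc.socketTextStream('localhost', 9999)
counts = (lines.flatMap(lambda line: line.split(' '))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()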
File stream¶
The word count example using the file stream method can be run by executing the following in two different terminals:
terminal 1 $ eskapade_run -c stream_type='file' python/eskapadespark/tutorials/esk610_spark_streaming_wordcount.py
terminal 2 $ mkdir /tmp/eskapade_stream_test/
terminal 2 $ for ((i=0; i<=100; i++)); do echo "Hello world" > /tmp/eskapade_stream_test/dummy_$(printf %05d ${i}); sleep 0.2; done
Here the bash for-loop creates a new file containing Hello world in the /tmp/eskapade_stream_test directory every 0.2 seconds.
Spark Streaming will pick up and process these files, and in terminal 1 a word count of the processed data will be displayed. Output is stored in results/esk610_spark_streaming/data/v0/dstream/wordcount.
Note that only new files in /tmp/eskapade_stream_test are processed, so do not forget to delete this directory before rerunning the example.
TCP stream¶
The word count example using the TCP stream method can be run by executing the following in two different terminals:
terminal 1 $ eskapade_run -c stream_type='tcp' python/eskapadespark/tutorials/esk610_spark_streaming_wordcount.py
terminal 2 $ nc -lk 9999
Here nc (netcat) streams data to port 9999, to which Spark Streaming listens and processes the incoming data. In terminal 2 random words can be typed (followed by enter), and in terminal 1 a word count of the processed data will be displayed. Output is stored in results/esk610_spark_streaming/data/v0/dstream/wordcount.