eskapadespark.links package¶
Submodules¶
eskapadespark.links.daily_summary module¶
Project: Eskapade - A python-based package for data analysis.
Class: DailySummary
Created: 2018-03-08
- Description:
Each feature given from the input df will by default correspond to 6 columns in the output: min, mean, max, stddev, count, and sum. The columns are named like ‘feature_stddev_0d’ (0d since we look 0 days back into the past).
The new dataframe will also contain the column new_date_col with the date, and all the identifying columns given in partitionby_cols.
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.daily_summary.
DailySummary
(**kwargs)¶ Bases:
escore.core.element.Link
Creates daily summary information from a timeseries dataframe.
Each feature given from the input df will by default correspond to 6 columns in the output: min, mean, max, stddev, count, and sum. The columns are named like ‘feature_stddev_0d’ (0d since we look 0 days back into the past).
The new dataframe will also contain the column new_date_col with the date, and all the identifying columns given in partitionby_cols.
-
__init__
(**kwargs)¶ Initialize an instance.
Parameters: - name (str) – name of link
- read_key (str) – key of input data to read from data store
- store_key (str) – key of output data to store in data store
- feature_cols (list/dict) – columns to take daily aggregates of. If a list, all columns in the list are aggregated with min, mean, max, stddev, count, and sum. If a dict, the keys are the column names to aggregate and the values are lists of aggregation functions to apply; these must be built-in Spark aggregation functions.
- new_date_col (str) – name of the ‘date’ column which will be created (default ‘date’)
- datetime_col (str) – name of column with datetime information in the dataframe
- partitionby_cols (list) – identifying columns to partition by before aggregating
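A minimal configuration sketch for DailySummary, assuming the usual Eskapade pattern of constructing a link with keyword arguments and adding it to a chain in a macro; the DataStore keys and column names ('transactions', 'amount', 'customer_id', …) are hypothetical:

from eskapadespark.links import DailySummary

# Daily aggregates of the hypothetical 'amount' column, per customer.
summary = DailySummary(
    name='daily_summary',
    read_key='transactions',          # hypothetical DataStore key of the input dataframe
    store_key='daily_transactions',   # where the daily summary dataframe is stored
    feature_cols=['amount'],          # aggregated with min, mean, max, stddev, count and sum
    new_date_col='date',
    datetime_col='event_time',
    partitionby_cols=['customer_id'],
)
# In a macro the link is then added to a chain, e.g. chain.add(summary).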
-
execute
()¶ Execute the link.
Returns: status code of execution Return type: StatusCode
-
finalize
()¶ Finalize the link.
Returns: status code of finalization Return type: StatusCode
-
initialize
()¶ Initialize the link.
Returns: status code of initialization Return type: StatusCode
-
eskapadespark.links.find_days_until_event module¶
Project: Eskapade - A python-based package for data analysis.
Class: FindDaysUntilEvent
Created: 2018-03-08
- Description:
- Will create a new column (name given by countdown_col_name) containing the number of days between the current row and the next date on which event_col is greater than 0. The dataframe must include a column that has a date or datetime.
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.find_days_until_event.
FindDaysUntilEvent
(**kwargs)¶ Bases:
escore.core.element.Link
Find the number of days until an event in a spark dataframe.
Will create a new column (name given by countdown_col_name) containing the number of days between the current row and the next date on which event_col is greater than 0. The dataframe must include a column that has a date or datetime.
-
__init__
(**kwargs)¶ Find the number of days until a particular event in an ordered dataframe.
Parameters: - name (str) – name of link
- read_key (str) – key of input data to read from data store
- store_key (str) – key of output data to store in data store
- datetime_col (str) – column with datetime information
- event_col (str) – the column containing the events (0 for rows with no events, >0 otherwise)
- countdown_col_name (str) – column where the number of days until the next event will be stored
- partitionby_cols (str) – columns to partition the countdown by
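A short configuration sketch for FindDaysUntilEvent, with hypothetical DataStore keys and column names:

from eskapadespark.links import FindDaysUntilEvent

countdown = FindDaysUntilEvent(
    name='days_until_failure',
    read_key='sensor_readings',            # hypothetical input dataframe in the DataStore
    store_key='sensor_readings_countdown',
    datetime_col='reading_date',
    event_col='failure',                   # 0 on rows without an event, >0 on event days
    countdown_col_name='days_until_failure',
)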
-
execute
()¶ Execute the link.
Returns: status code of execution Return type: StatusCode
-
finalize
()¶ Finalize the link.
Returns: status code of finalization Return type: StatusCode
-
initialize
()¶ Initialize the link.
Returns: status code of initialization Return type: StatusCode
-
eskapadespark.links.rdd_group_mapper module¶
Project: Eskapade - A python-based package for data analysis.
Class: RddGroupMapper
Created: 2017/06/20
- Description:
- Apply a map function on groups in a Spark RDD
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.rdd_group_mapper.
RddGroupMapper
(**kwargs)¶ Bases:
escore.core.element.Link
Apply a map function on groups in a Spark RDD.
Group rows of key-value pairs in a Spark RDD by key and apply a custom map function on the group values. By default, the group key and the value returned by the map function form a single row in the output RDD. If the “flatten_output_groups” flag is set, the returned value is interpreted as an iterable and a row is created for each item.
Optionally, a map function is applied on the rows of the input RDD, for example to create the group key-value pairs. Similarly, a function may be specified to map the key-value pairs resulting from the group map.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- read_key (str) – key of the input data in the data store
- store_key (str) – key of the output data frame in the data store
- group_map – map function for group values
- input_map – map function for input rows; optional, e.g. to create group key-value pairs
- result_map – map function for output group values; optional, e.g. to flatten group key-value pairs
- flatten_output_groups (bool) – create a row for each item in the group output values (default is False)
- num_group_partitions (int) – number of partitions for group map (optional, no repartitioning by default)
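A minimal sketch of how this link might be configured; the group_map simply sums the values of each key, and the DataStore keys are hypothetical:

from eskapadespark.links import RddGroupMapper

# Input: an RDD of (key, value) pairs stored under 'kv_rdd' (hypothetical key).
mapper = RddGroupMapper(
    name='sum_per_key',
    read_key='kv_rdd',
    store_key='sum_per_key_rdd',
    group_map=sum,                  # applied to the iterable of values of each group
    flatten_output_groups=False,    # keep one output row per group key
)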
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
eskapadespark.links.spark_configurator module¶
Project: Eskapade - A python-based package for data analysis.
Class: SparkConfigurator
Created: 2017/06/07
- Description:
- This link stops a running Spark session and starts a new one with the configuration provided to the link.
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.spark_configurator.
SparkConfigurator
(**kwargs)¶ Bases:
escore.core.element.Link
Set configuration settings of SparkContext.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- spark_settings (iterable) – list of key/value pairs specifying the Spark configuration
- log_level (str) – verbosity level of the SparkContext
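A small configuration sketch; the settings shown are standard Spark configuration keys, and the key/value pairs are passed as a list of tuples as described above:

from eskapadespark.links import SparkConfigurator

conf_link = SparkConfigurator(
    name='spark_configurator',
    spark_settings=[('spark.app.name', 'my_eskapade_app'),
                    ('spark.executor.memory', '2g'),
                    ('spark.sql.shuffle.partitions', '8')],
    log_level='WARN',
)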
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
eskapadespark.links.spark_data_to_csv module¶
Project: Eskapade - A python-based package for data analysis.
Class: SparkDataToCsv
Created: 2015-11-16
- Description:
- Write Spark data to local CSV files
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.spark_data_to_csv.
SparkDataToCsv
(**kwargs)¶ Bases:
escore.core.element.Link
Write Spark data to local CSV files.
Data to write to CSV are provided as a Spark RDD or a Spark data frame. The data are written to a configurable number of CSV files in the specified output directory.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link instance
- read_key (str) – data-store key of the Spark data
- output_path (str) – directory path of the output CSV file(s)
- mode (str) – write mode if data already exist (“overwrite”, “ignore”, “error”)
- compression_codec (str) – compression-codec class (e.g., ‘org.apache.hadoop.io.compress.GzipCodec’)
- sep (str) – CSV separator string
- header (tuple|bool) – column names to write as CSV header or boolean to indicate if names must be determined from input data frame
- num_files (int) – requested number of output files
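A minimal configuration sketch for SparkDataToCsv, with a hypothetical DataStore key and output path:

from eskapadespark.links import SparkDataToCsv

to_csv = SparkDataToCsv(
    name='write_csv',
    read_key='output_df',       # hypothetical key of a Spark dataframe or RDD
    output_path='output/csv',   # hypothetical local output directory
    mode='overwrite',
    sep=',',
    header=True,                # derive column names from the input dataframe
    num_files=1,
)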
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
eskapadespark.links.spark_df_converter module¶
Project: Eskapade - A python-based package for data analysis.
Class: SparkDfConverter
Created: 2017/06/15
- Description:
- Convert a Spark data frame into data of a different format
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.spark_df_converter.
SparkDfConverter
(**kwargs)¶ Bases:
escore.core.element.Link
Link to convert a Spark data frame into a different format.
A data frame from the data store is converted into data of a different format and/or transformed. The format conversion is controlled by the “output_format” argument. The data frame can either be unchanged (“df”, default) or converted into a Spark RDD of tuples (“RDD”), a list of tuples (“list”), or a Pandas data frame (“pd”).
After the format conversion, the data can be transformed by functions specified by the “process_methods” argument. These functions will be sequentially applied to the output of the previous function. Each function is specified by either a callable object or a string. A string will be interpreted as the name of an attribute of the dataset type.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- read_key (str) – key of the input data in the data store
- store_key (str) – key of the output data frame in the data store
- schema_key (str) – key to store the data-frame schema in the data store
- output_format (str) – data format to store: {“df” (default), “RDD”, “list”, “pd”}
- preserve_col_names (bool) – preserve column names for non-data-frame output formats (default is True)
- process_methods (iterable) – methods to apply sequentially on the produced data
- process_meth_args (dict) – positional arguments for process methods
- process_meth_kwargs (dict) – keyword arguments for process methods
- fail_missing_data (bool) – fail execution if the input data frame is missing (default is “True”)
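A minimal sketch converting a Spark dataframe in the DataStore to a Pandas dataframe; the keys are hypothetical:

from eskapadespark.links import SparkDfConverter

converter = SparkDfConverter(
    name='df_to_pandas',
    read_key='spark_df',            # hypothetical input key
    store_key='pandas_df',          # hypothetical output key
    schema_key='spark_df_schema',   # the original data-frame schema is kept in the DataStore
    output_format='pd',             # convert to a Pandas data frame
    fail_missing_data=True,
)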
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize SparkDfConverter.
-
eskapadespark.links.spark_df_creator module¶
Project: Eskapade - A python-based package for data analysis.
Class: SparkDfCreator
Created: 2017/06/13
- Description:
- Create a Spark data frame from generic input data
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.spark_df_creator.
SparkDfCreator
(**kwargs)¶ Bases:
escore.core.element.Link
Link to create a Spark dataframe from generic input data.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- read_key (str) – key of the input data in the data store
- store_key (str) – key of the output data frame in the data store
- schema – schema to create data frame if input data have a different format
- process_methods (iterable) – methods to apply sequentially on the produced data frame
- process_meth_args (dict) – positional arguments for process methods
- process_meth_kwargs (dict) – keyword arguments for process methods
- fail_missing_data (bool) – fail execution if data are missing (default is “True”)
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
eskapadespark.links.spark_df_reader module¶
Project: Eskapade - A python-based package for data analysis.
Class: SparkDfReader
Created: 2016/11/08
- Description:
- Read data into a Spark data frame
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.spark_df_reader.
SparkDfReader
(**kwargs)¶ Bases:
escore.core.element.Link
Link to read data into a Spark dataframe.
Data are read with the Spark-SQL data-frame reader (pyspark.sql.DataFrameReader). The read method to be applied on the reader instance (load, parquet, csv, …) can be specified by the user, including its arguments. Besides the read method, other functions to be applied on the reader (schema, option, …) and/or on the resulting data frame (filter, select, repartition, …) can also be included.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- store_key (str) – key of data to store in data store
- read_methods (iterable) – methods to apply sequentially on data-frame reader and data frame
- read_meth_args (dict) – positional arguments for read methods
- read_meth_kwargs (dict) – keyword arguments for read methods
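A sketch of reading a CSV file into a Spark dataframe with this link. The per-method argument layout shown (dicts keyed by read-method name) is an assumption based on the parameter descriptions above; the file path and DataStore key are hypothetical:

from eskapadespark.links import SparkDfReader

reader = SparkDfReader(
    name='read_csv',
    store_key='input_df',                      # hypothetical DataStore key for the result
    read_methods=['csv'],                      # method of pyspark.sql.DataFrameReader to call
    read_meth_args={'csv': ('data/input.csv',)},                              # assumed layout
    read_meth_kwargs={'csv': dict(sep='|', header=True, inferSchema=True)},   # assumed layout
)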
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
eskapadespark.links.spark_df_writer module¶
Project: Eskapade - A python-based package for data analysis.
Class: SparkDfWriter
Created: 2016/11/08
- Description:
- Write data from a Spark data frame
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.spark_df_writer.
SparkDfWriter
(**kwargs)¶ Bases:
escore.core.element.Link
Link to write data from a Spark dataframe.
Data are written with the Spark-SQL data-frame writer (pyspark.sql.DataFrameWriter). The write method to be applied (save, parquet, csv, …) can be specified by the user, including its arguments. Besides the write method, other functions to be applied on the writer (format, option, …) can also be included.
If the input format is not a Spark data frame, an attempt is made to convert to a data frame. This works for lists, Spark RDDs, and Pandas data frames. A schema may be specified for the created data frame.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- read_key (str) – key of input data in data store
- schema – schema to create data frame if input data have a different format
- write_methods (iterable) – methods to apply sequentially on data-frame writer
- write_meth_args (dict) – positional arguments for write methods
- write_meth_kwargs (dict) – keyword arguments for write methods
- num_files (int) – requested number of output files
- fail_missing_data (bool) – fail execution if data are missing (default is “True”)
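A sketch of writing a dataframe to Parquet; the same caveat about the assumed per-method argument layout applies, and the DataStore key and output path are hypothetical:

from eskapadespark.links import SparkDfWriter

writer = SparkDfWriter(
    name='write_parquet',
    read_key='output_df',                       # hypothetical key of the dataframe to write
    write_methods=['parquet'],                  # method of pyspark.sql.DataFrameWriter to call
    write_meth_args={'parquet': ('output/my_table.parquet',)},   # assumed per-method layout
    num_files=1,
)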
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
eskapadespark.links.spark_execute_query module¶
Project: Eskapade - A python-based package for data analysis.
Class: SparkExecuteQuery
Created: 2017/11/08
- Description:
- SparkExecuteQuery applies a SQL-query to one or more objects in the DataStore and adds the output of the query to the DataStore as a Spark dataframe, RDD or Pandas dataframe.
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.spark_execute_query.
SparkExecuteQuery
(**kwargs)¶ Bases:
escore.core.element.Link
Defines the content of link SparkExecuteQuery.
Applies a SQL-query to one or more objects in the DataStore. Such SQL-queries can for instance be used to filter Spark dataframes. All objects in the DataStore are registered as SQL temporary views. The output of the query can be added to the DataStore as a Spark dataframe (default), RDD or Pandas dataframe.
-
__init__
(**kwargs)¶ Store the configuration of link SparkExecuteQuery.
Parameters: - name (str) – name of link
- store_key (str) – key of data to store in data store
- output_format (str) – data format to store: {“df” (default), “rdd”, “pd”}
- query (str) – a string containing a SQL-query.
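A short sketch. Since all DataStore objects are registered as SQL temporary views under their keys, the hypothetical key 'transactions' can be referenced directly in the query:

from eskapadespark.links import SparkExecuteQuery

query_link = SparkExecuteQuery(
    name='select_large_amounts',
    store_key='large_transactions',   # hypothetical output key
    output_format='df',
    query='SELECT customer_id, amount FROM transactions WHERE amount > 100',
)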
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
eskapadespark.links.spark_histogrammar_filler module¶
Project: Eskapade - A python-based package for data analysis.
Class: SparkHistogrammarFiller
Created: 2017/06/09
- Description:
- Algorithm to fill histogrammar sparse-bin histograms from a Spark dataframe. It is possible to do cleaning of these histograms by rejecting certain keys or removing inconsistent data types. Timestamp columns are converted to nanoseconds before the binning is applied.
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.spark_histogrammar_filler.
SparkHistogrammarFiller
(**kwargs)¶ Bases:
eskapade.analysis.links.hist_filler.HistogrammarFiller
Fill histogrammar sparse-bin histograms with Spark.
Algorithm to fill histogrammar style sparse-bin and category histograms with Spark. It is possible to do after-filling cleaning of these histograms by rejecting certain keys or removing inconsistent data types. Timestamp columns are converted to nanoseconds before the binning is applied. Final histograms are stored in the datastore.
Example is available in: tutorials/esk605_hgr_filler_plotter.py.
-
__init__
(**kwargs)¶ Initialize link instance.
Store and do basic check on the attributes of link HistogrammarFiller.
Parameters: - name (str) – name of link
- read_key (str) – key of input data to read from data store
- store_key (str) – key of output data to store histograms in data store
- columns (list) – columns to pick up from input data (default is all columns)
- bin_specs (dict) – dictionaries used for rebinning numeric or timestamp columns
Example bin_specs dictionary is:
>>> bin_specs = {'x': {'bin_width': 1, 'bin_offset': 0},
...              'y': {'bin_edges': [0, 2, 3, 4, 5, 7, 8]}}
Parameters: - var_dtype (dict) – dict of datatypes of the columns to study from dataframe (if not provided, try to determine datatypes directly from dataframe)
- quantity (dict) – dictionary of lambda functions of how to parse certain columns
Example quantity dictionary is:
>>> quantity = {'y': lambda x: x}
Parameters: - store_at_finalize (bool) – store histograms in datastore at finalize(), not at execute() (useful when looping over datasets, default is False)
- drop_keys (dict) – dictionary used for dropping specific keys from bins dictionaries of histograms
Example drop_keys dictionary is:
>>> drop_keys = {'x': [1, 4, 8, 19],
...              'y': ['apple', 'pear', 'tomato'],
...              'x:y': [(1, 'apple'), (19, 'tomato')]}
-
assert_dataframe
(df)¶ Check that input data is a filled Spark data frame.
Parameters: df – input Spark data frame
-
construct_empty_hist
(df, columns)¶ Create an (empty) histogram of right type.
Create a multi-dim histogram by iterating through the columns in reverse order and passing a single-dim hist as input to the next column.
Parameters: - df – input dataframe
- columns (list) – histogram columns
Returns: created histogram
Return type: histogrammar.Count
-
fill_histogram
(idf, columns)¶ Fill input histogram with column(s) of input dataframe.
Parameters: - idf – input data frame used for filling histogram
- columns (list) – histogram column(s)
-
get_all_columns
(data)¶ Retrieve all columns / keys from input data.
Parameters: data – input data sample (pandas dataframe or dict) Returns: list of columns Return type: list
-
get_data_type
(df, col)¶ Get data type of dataframe column.
Parameters: - df – input data frame
- col (str) – column
-
process_and_store
()¶ Process and store spark-based histogram objects.
-
process_columns
(df)¶ Process columns before histogram filling.
Specifically, in this case convert timestamp columns to nanoseconds
Parameters: df – input data frame Returns: output data frame with converted timestamp columns Return type: DataFrame
-
-
eskapadespark.links.spark_histogrammar_filler.
hgr_patch_histogram
(hist)¶ Apply a set of patches to a histogrammar histogram.
Parameters: hist – histogrammar histogram to patch up.
-
eskapadespark.links.spark_histogrammar_filler.
hgr_reset_quantity
(hist, new_quantity=<function unit_func>)¶ Reset quantity attribute of histogrammar histogram.
If the quantity refers to a Spark dataframe, the histogram cannot be pickled, because a Spark dataframe cannot be pickled. Here the quantity of a (filled) histogram is reset to a neutral lambda function.
Parameters: - hist – histogrammar histogram to reset quantity of.
- new_quantity – new quantity function to reset hist.quantity to. default is lambda x: x.
-
eskapadespark.links.spark_histogrammar_filler.
unit_func
(x)¶ Dummy quantity function for histogrammar objects
Parameters: x – value Returns: the same value
eskapadespark.links.spark_streaming_controller module¶
Project: Eskapade - A python-based package for data analysis.
Class: SparkStreamingController
Created: 2017/07/12
- Description:
- Link to start/stop Spark Stream.
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.spark_streaming_controller.
SparkStreamingController
(**kwargs)¶ Bases:
escore.core.element.Link
Defines the content of link SparkStreamingController.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- read_key (str) – key of input data to read from data store
- store_key (str) – key of output data to store in data store
- timeout (int) – the amount of time (in seconds) for running the Spark Streaming Context
-
execute
()¶ Execute the link.
-
finalize
()¶ Finalize the link.
-
initialize
()¶ Initialize the link.
-
eskapadespark.links.spark_streaming_wordcount module¶
Project: Eskapade - A python-based package for data analysis.
Class: SparkStreamingWordCount
Created: 2017/07/12
- Description:
- The Spark Streaming word count example derived from: https://spark.apache.org/docs/latest/streaming-programming-guide.html
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.spark_streaming_wordcount.
SparkStreamingWordCount
(**kwargs)¶ Bases:
escore.core.element.Link
Counts words in UTF8 encoded delimited text.
Text is received from the network every second. To run this on your local machine, you need to first run a Netcat server
$ nc -lk 9999
and then run the example (in a second terminal)
$ eskapade_run tutorials/esk610_spark_streaming_wordcount.py
NB: hostname and port can be adapted in the macro.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- read_key (str) – key of input data to read from data store
- store_key (str) – key of output data to store in data store
-
execute
()¶ Execute the link.
-
finalize
()¶ Finalize the link.
-
initialize
()¶ Initialize the link.
-
eskapadespark.links.spark_streaming_writer module¶
Project: Eskapade - A python-based package for data analysis.
Class: SparkStreamingWriter
Created: 2017/07/12
- Description:
- This link writes Spark Stream DStream data to disk. The path specifies the directory on either local disk or HDFS where files are stored. Each processed RDD batch will be stored in a separate file (hence the number of files can increase rapidly).
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.spark_streaming_writer.
SparkStreamingWriter
(**kwargs)¶ Bases:
escore.core.element.Link
Link to write Spark Stream to disk.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- read_key (str) – key of input data to read from data store
- store_key (str) – key of output data to store in data store
- path (str) – the directory path of the output files (local disk or HDFS)
- suffix (str) – the suffix of the file names in the output directory
- repartition (int) – repartition RDD to number of files (default: single file per batch)
-
execute
()¶ Execute the link.
-
finalize
()¶ Finalize the link.
-
initialize
()¶ Initialize the link.
-
eskapadespark.links.spark_with_column module¶
Project: Eskapade - A python-based package for data analysis.
Class: SparkWithColumn
Created: 2018-03-08
- Description:
- SparkWithColumn adds the output of a column expression (column operation, sql.functions function, or udf) to a dataframe.
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.spark_with_column.
SparkWithColumn
(**kwargs)¶ Bases:
escore.core.element.Link
Create a new column from columns in a Spark dataframe
SparkWithColumn adds the output of a column expression (column operation, sql.functions function, or udf) to a dataframe.
-
__init__
(**kwargs)¶ Initialize SparkWithColumn instance
Parameters: - name (str) – name of link
- read_key (str) – key of data to read from data store
- store_key (str) – key of data to store in data store
- new_col_name (str) – name of newly created column
- new_col (Column) – the column object to be included in the dataframe, resulting from a column expression
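A minimal sketch adding a derived column with a pyspark.sql.functions expression. It assumes a Spark session has already been started earlier in the macro (building a Column expression requires an active SparkContext); the keys and column names are hypothetical:

from pyspark.sql import functions as f
from eskapadespark.links import SparkWithColumn

with_col = SparkWithColumn(
    name='add_log_amount',
    read_key='transactions',             # hypothetical input dataframe key
    store_key='transactions_enriched',
    new_col_name='log_amount',
    new_col=f.log(f.col('amount') + 1),  # column expression to append
)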
-
execute
()¶ Execute the link.
Returns: status code of execution Return type: StatusCode
-
finalize
()¶ Finalize the link.
Returns: status code of finalization Return type: StatusCode
-
initialize
()¶ Initialize the link.
Returns: status code of initialization Return type: StatusCode
-
eskapadespark.links.sparkgeneralfuncprocessor module¶
Project: Eskapade - A python-based package for data analysis.
Class: SparkGeneralFuncProcessor
Created: 2016/11/08
- Description:
- Processor for applying pandas function on a Spark dataframe.
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.sparkgeneralfuncprocessor.
SparkGeneralFuncProcessor
(**kwargs)¶ Bases:
escore.core.element.Link
Processor for applying pandas function on a Spark dataframe.
The Spark API is not (yet) as rich as the pandas API, so sometimes pandas is needed to implement the desired algorithm. This link defines a general approach for applying an advanced pandas-based function to a Spark dataframe. The Spark dataframe is grouped and the general function is applied to each group in parallel. Inside the general function a pandas dataframe can be created as follows: pandas_df = pd.DataFrame(list(group), columns=cols). For examples, see the functions in the deutils.analysishelper module.
This link uses the pyspark.RDD.groupByKey() function instead of pyspark.RDD.reduceByKey(), because all data of one group must be on a single node in order to build a pandas dataframe from that group.
-
__init__
(**kwargs)¶ Initialize link instance.
Store the configuration of link SparkGeneralFuncProcessor.
Parameters: - name (str) – name of link
- read_key (str) – key of data to read from data store; it should contain a Spark dataframe or Spark RDD
- store_key (str) – key of data to store in data store
- groupby (list) – Spark dataframe columns to group by
- columns (list) – the columns of the Spark dataframe or RDD; obligatory for an RDD, optional for a Spark dataframe
- generalfunc (func) – the general function, to be defined by the user. Its arguments are a list of tuples (the rows of one RDD group), the column names, and, if necessary, keyword arguments. It should return a list of native Python types.
- function_args (dict) – keyword arguments for the general function
- nb_partitions (int) – number of partitions for repartitioning after groupByKey
- return_map (func) – function used by the map on the RDD after generalfunc is applied. The default returns a tuple of the groupby columns (row[0]) and the list returned by generalfunc (row[1]).
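A sketch of a user-defined general function and the corresponding link configuration, following the pattern described above (a pandas dataframe built from the rows of one group); the DataStore keys and column names are hypothetical:

import pandas as pd
from eskapadespark.links import SparkGeneralFuncProcessor

def mean_amount(group, cols, **kwargs):
    """Return the mean of the hypothetical 'amount' column as a list of native Python types."""
    pdf = pd.DataFrame(list(group), columns=cols)
    return [float(pdf['amount'].mean())]

proc = SparkGeneralFuncProcessor(
    name='mean_amount_per_customer',
    read_key='transactions',               # hypothetical input dataframe key
    store_key='mean_amount_per_customer',
    groupby=['customer_id'],
    columns=['customer_id', 'amount'],
    generalfunc=mean_amount,
)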
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
eskapadespark.links.sparkhister module¶
Project: Eskapade - A python-based package for data analysis.
Class: SparkHister
Created: 2016/11/08
- Description:
- Algorithm to create histograms from the columns of a Spark dataframe.
- Authors:
- KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
-
class
eskapadespark.links.sparkhister.
SparkHister
(name='HiveHister')¶ Bases:
escore.core.element.Link
Defines the content of link SparkHister.
-
__init__
(name='HiveHister')¶ Initialize link instance.
Store the configuration of link SparkHister.
Parameters: - name (str) – name of link
- read_key (str) – key of data to read from data store
- store_key (str) – key of data to store in data store
- columns (list) – columns of the Spark dataframe to make a histogram from
- bins (dict) – the bin edges of the histogram
- convert_for_mongo (bool) – if True, the data structure of the result is converted so that it can be stored in MongoDB
-
execute
()¶ Execute the link.
-
finalize
()¶ Finalize the link.
-
initialize
()¶ Initialize the link.
-
Module contents¶
-
class
eskapadespark.links.
RddGroupMapper
(**kwargs)¶ Bases:
escore.core.element.Link
Apply a map function on groups in a Spark RDD.
Group rows of key-value pairs in a Spark RDD by key and apply a custom map function on the group values. By default, the group key and the value returned by the map function form a single row in the output RDD. If the “flatten_output_groups” flag is set, the returned value is interpreted as an iterable and a row is created for each item.
Optionally, a map function is applied on the rows of the input RDD, for example to create the group key-value pairs. Similarly, a function may be specified to map the key-value pairs resulting from the group map.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- read_key (str) – key of the input data in the data store
- store_key (str) – key of the output data frame in the data store
- group_map – map function for group values
- input_map – map function for input rows; optional, e.g. to create group key-value pairs
- result_map – map function for output group values; optional, e.g. to flatten group key-value pairs
- flatten_output_groups (bool) – create a row for each item in the group output values (default is False)
- num_group_partitions (int) – number of partitions for group map (optional, no repartitioning by default)
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
-
class
eskapadespark.links.
SparkConfigurator
(**kwargs)¶ Bases:
escore.core.element.Link
Set configuration settings of SparkContext.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- spark_settings (iterable) – list of key/value pairs specifying the Spark configuration
- log_level (str) – verbosity level of the SparkContext
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
-
class
eskapadespark.links.
SparkDataToCsv
(**kwargs)¶ Bases:
escore.core.element.Link
Write Spark data to local CSV files.
Data to write to CSV are provided as a Spark RDD or a Spark data frame. The data are written to a configurable number of CSV files in the specified output directory.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link instance
- read_key (str) – data-store key of the Spark data
- output_path (str) – directory path of the output CSV file(s)
- mode (str) – write mode if data already exist (“overwrite”, “ignore”, “error”)
- compression_codec (str) – compression-codec class (e.g., ‘org.apache.hadoop.io.compress.GzipCodec’)
- sep (str) – CSV separator string
- header (tuple|bool) – column names to write as CSV header or boolean to indicate if names must be determined from input data frame
- num_files (int) – requested number of output files
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
-
class
eskapadespark.links.
SparkDfConverter
(**kwargs)¶ Bases:
escore.core.element.Link
Link to convert a Spark data frame into a different format.
A data frame from the data store is converted into data of a different format and/or transformed. The format conversion is controlled by the “output_format” argument. The data frame can either be unchanged (“df”, default) or converted into a Spark RDD of tuples (“RDD”), a list of tuples (“list”), or a Pandas data frame (“pd”).
After the format conversion, the data can be transformed by functions specified by the “process_methods” argument. These functions will be sequentially applied to the output of the previous function. Each function is specified by either a callable object or a string. A string will be interpreted as the name of an attribute of the dataset type.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- read_key (str) – key of the input data in the data store
- store_key (str) – key of the output data frame in the data store
- schema_key (str) – key to store the data-frame schema in the data store
- output_format (str) – data format to store: {“df” (default), “RDD”, “list”, “pd”}
- preserve_col_names (bool) – preserve column names for non-data-frame output formats (default is True)
- process_methods (iterable) – methods to apply sequentially on the produced data
- process_meth_args (dict) – positional arguments for process methods
- process_meth_kwargs (dict) – keyword arguments for process methods
- fail_missing_data (bool) – fail execution if the input data frame is missing (default is “True”)
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize SparkDfConverter.
-
-
class
eskapadespark.links.
SparkDfCreator
(**kwargs)¶ Bases:
escore.core.element.Link
Link to create a Spark dataframe from generic input data.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- read_key (str) – key of the input data in the data store
- store_key (str) – key of the output data frame in the data store
- schema – schema to create data frame if input data have a different format
- process_methods (iterable) – methods to apply sequentially on the produced data frame
- process_meth_args (dict) – positional arguments for process methods
- process_meth_kwargs (dict) – keyword arguments for process methods
- fail_missing_data (bool) – fail execution if data are missing (default is “True”)
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
-
class
eskapadespark.links.
SparkDfReader
(**kwargs)¶ Bases:
escore.core.element.Link
Link to read data into a Spark dataframe.
Data are read with the Spark-SQL data-frame reader (pyspark.sql.DataFrameReader). The read method to be applied on the reader instance (load, parquet, csv, …) can be specified by the user, including its arguments. Besides the read method, other functions to be applied on the reader (schema, option, …) and/or on the resulting data frame (filter, select, repartition, …) can also be included.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- store_key (str) – key of data to store in data store
- read_methods (iterable) – methods to apply sequentially on data-frame reader and data frame
- read_meth_args (dict) – positional arguments for read methods
- read_meth_kwargs (dict) – keyword arguments for read methods
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
-
class
eskapadespark.links.
SparkDfWriter
(**kwargs)¶ Bases:
escore.core.element.Link
Link to write data from a Spark dataframe.
Data are written with the Spark-SQL data-frame writer (pyspark.sql.DataFrameWriter). The write method to be applied (save, parquet, csv, …) can be specified by the user, including its arguments. Besides the write method, other functions to be applied on the writer (format, option, …) can also be included.
If the input format is not a Spark data frame, an attempt is made to convert to a data frame. This works for lists, Spark RDDs, and Pandas data frames. A schema may be specified for the created data frame.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- read_key (str) – key of input data in data store
- schema – schema to create data frame if input data have a different format
- write_methods (iterable) – methods to apply sequentially on data-frame writer
- write_meth_args (dict) – positional arguments for write methods
- write_meth_kwargs (dict) – keyword arguments for write methods
- num_files (int) – requested number of output files
- fail_missing_data (bool) – fail execution if data are missing (default is “True”)
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
-
class
eskapadespark.links.
SparkExecuteQuery
(**kwargs)¶ Bases:
escore.core.element.Link
Defines the content of link SparkExecuteQuery.
Applies a SQL-query to one or more objects in the DataStore. Such SQL-queries can for instance be used to filter Spark dataframes. All objects in the DataStore are registered as SQL temporary views. The output of the query can be added to the DataStore as a Spark dataframe (default), RDD or Pandas dataframe.
-
__init__
(**kwargs)¶ Store the configuration of link SparkExecuteQuery.
Parameters: - name (str) – name of link
- store_key (str) – key of data to store in data store
- output_format (str) – data format to store: {“df” (default), “rdd”, “pd”}
- query (str) – a string containing a SQL-query.
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
-
class
eskapadespark.links.
SparkHistogrammarFiller
(**kwargs)¶ Bases:
eskapade.analysis.links.hist_filler.HistogrammarFiller
Fill histogrammar sparse-bin histograms with Spark.
Algorithm to fill histogrammar style sparse-bin and category histograms with Spark. It is possible to do after-filling cleaning of these histograms by rejecting certain keys or removing inconsistent data types. Timestamp columns are converted to nanoseconds before the binning is applied. Final histograms are stored in the datastore.
Example is available in: tutorials/esk605_hgr_filler_plotter.py.
-
__init__
(**kwargs)¶ Initialize link instance.
Store and do basic check on the attributes of link HistogrammarFiller.
Parameters: - name (str) – name of link
- read_key (str) – key of input data to read from data store
- store_key (str) – key of output data to store histograms in data store
- columns (list) – columns to pick up from input data (default is all columns)
- bin_specs (dict) – dictionaries used for rebinning numeric or timestamp columns
Example bin_specs dictionary is:
>>> bin_specs = {'x': {'bin_width': 1, 'bin_offset': 0},
...              'y': {'bin_edges': [0, 2, 3, 4, 5, 7, 8]}}
Parameters: - var_dtype (dict) – dict of datatypes of the columns to study from dataframe (if not provided, try to determine datatypes directly from dataframe)
- quantity (dict) – dictionary of lambda functions of how to parse certain columns
Example quantity dictionary is:
>>> quantity = {'y': lambda x: x}
Parameters: - store_at_finalize (bool) – store histograms in datastore at finalize(), not at execute() (useful when looping over datasets, default is False)
- drop_keys (dict) – dictionary used for dropping specific keys from bins dictionaries of histograms
Example drop_keys dictionary is:
>>> drop_keys = {'x': [1, 4, 8, 19],
...              'y': ['apple', 'pear', 'tomato'],
...              'x:y': [(1, 'apple'), (19, 'tomato')]}
-
assert_dataframe
(df)¶ Check that input data is a filled Spark data frame.
Parameters: df – input Spark data frame
-
construct_empty_hist
(df, columns)¶ Create an (empty) histogram of right type.
Create a multi-dim histogram by iterating through the columns in reverse order and passing a single-dim hist as input to the next column.
Parameters: - df – input dataframe
- columns (list) – histogram columns
Returns: created histogram
Return type: histogrammar.Count
-
fill_histogram
(idf, columns)¶ Fill input histogram with column(s) of input dataframe.
Parameters: - idf – input data frame used for filling histogram
- columns (list) – histogram column(s)
-
get_all_columns
(data)¶ Retrieve all columns / keys from input data.
Parameters: data – input data sample (pandas dataframe or dict) Returns: list of columns Return type: list
-
get_data_type
(df, col)¶ Get data type of dataframe column.
Parameters: - df – input data frame
- col (str) – column
-
process_and_store
()¶ Process and store spark-based histogram objects.
-
process_columns
(df)¶ Process columns before histogram filling.
Specifically, in this case convert timestamp columns to nanoseconds
Parameters: df – input data frame Returns: output data frame with converted timestamp columns Return type: DataFrame
-
-
class
eskapadespark.links.
SparkStreamingController
(**kwargs)¶ Bases:
escore.core.element.Link
Defines the content of link SparkStreamingController.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- read_key (str) – key of input data to read from data store
- store_key (str) – key of output data to store in data store
- timeout (int) – the amount of time (in seconds) for running the Spark Streaming Context
-
execute
()¶ Execute the link.
-
finalize
()¶ Finalize the link.
-
initialize
()¶ Initialize the link.
-
-
class
eskapadespark.links.
SparkStreamingWordCount
(**kwargs)¶ Bases:
escore.core.element.Link
Counts words in UTF8 encoded delimited text.
Text is received from the network every second. To run this on your local machine, you need to first run a Netcat server
$ nc -lk 9999
and then run the example (in a second terminal)
$ eskapade_run tutorials/esk610_spark_streaming_wordcount.py
NB: hostname and port can be adapted in the macro.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- read_key (str) – key of input data to read from data store
- store_key (str) – key of output data to store in data store
-
execute
()¶ Execute the link.
-
finalize
()¶ Finalize the link.
-
initialize
()¶ Initialize the link.
-
-
class
eskapadespark.links.
SparkStreamingWriter
(**kwargs)¶ Bases:
escore.core.element.Link
Link to write Spark Stream to disk.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- read_key (str) – key of input data to read from data store
- store_key (str) – key of output data to store in data store
- path (str) – the directory path of the output files (local disk or HDFS)
- suffix (str) – the suffix of the file names in the output directory
- repartition (int) – repartition RDD to number of files (default: single file per batch)
-
execute
()¶ Execute the link.
-
finalize
()¶ Finalize the link.
-
initialize
()¶ Initialize the link.
-
-
class
eskapadespark.links.
SparkWithColumn
(**kwargs)¶ Bases:
escore.core.element.Link
Create a new column from columns in a Spark dataframe
SparkWithColumn adds the output of a column expression (column operation, sql.functions function, or udf) to a dataframe.
-
__init__
(**kwargs)¶ Initialize SparkWithColumn instance
Parameters: - name (str) – name of link
- read_key (str) – key of data to read from data store
- store_key (str) – key of data to store in data store
- new_col_name (str) – name of newly created column
- new_col (Column) – the column object to be included in the dataframe, resulting from a column expression
-
execute
()¶ Execute the link.
Returns: status code of execution Return type: StatusCode
-
finalize
()¶ Finalize the link.
Returns: status code of finalization Return type: StatusCode
-
initialize
()¶ Initialize the link.
Returns: status code of initialization Return type: StatusCode
-
-
class
eskapadespark.links.
SparkGeneralFuncProcessor
(**kwargs)¶ Bases:
escore.core.element.Link
Processor for applying pandas function on a Spark dataframe.
The Spark API is not (yet) as rich as the pandas API, so sometimes pandas is needed to implement the desired algorithm. This link defines a general approach for applying an advanced pandas-based function to a Spark dataframe. The Spark dataframe is grouped and the general function is applied to each group in parallel. Inside the general function a pandas dataframe can be created as follows: pandas_df = pd.DataFrame(list(group), columns=cols). For examples, see the functions in the deutils.analysishelper module.
This link uses the pyspark.RDD.groupByKey() function instead of pyspark.RDD.reduceByKey(), because all data of one group must be on a single node in order to build a pandas dataframe from that group.
-
__init__
(**kwargs)¶ Initialize link instance.
Store the configuration of link SparkGeneralFuncProcessor.
Parameters: - name (str) – name of link
- read_key (str) – key of data to read from data store; it should contain a Spark dataframe or Spark RDD
- store_key (str) – key of data to store in data store
- groupby (list) – Spark dataframe columns to group by
- columns (list) – the columns of the Spark dataframe or RDD; obligatory for an RDD, optional for a Spark dataframe
- generalfunc (func) – the general function, to be defined by the user. Its arguments are a list of tuples (the rows of one RDD group), the column names, and, if necessary, keyword arguments. It should return a list of native Python types.
- function_args (dict) – keyword arguments for the general function
- nb_partitions (int) – number of partitions for repartitioning after groupByKey
- return_map (func) – function used by the map on the RDD after generalfunc is applied. The default returns a tuple of the groupby columns (row[0]) and the list returned by generalfunc (row[1]).
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
-
class
eskapadespark.links.
SparkHister
(name='HiveHister')¶ Bases:
escore.core.element.Link
Defines the content of link SparkHister.
-
__init__
(name='HiveHister')¶ Initialize link instance.
Store the configuration of link SparkHister.
Parameters: - name (str) – name of link
- read_key (str) – key of data to read from data store
- store_key (str) – key of data to store in data store
- columns (list) – columns of the Spark dataframe to make a histogram from
- bins (dict) – the bin edges of the histogram
- convert_for_mongo (bool) – if True, the data structure of the result is converted so that it can be stored in MongoDB
-
execute
()¶ Execute the link.
-
finalize
()¶ Finalize the link.
-
initialize
()¶ Initialize the link.
-
-
class
eskapadespark.links.
DailySummary
(**kwargs)¶ Bases:
escore.core.element.Link
Creates daily summary information from a timeseries dataframe.
Each feature given from the input df will by default correspond to 6 columns in the output: min, mean, max, stddev, count, and sum. The columns are named like ‘feature_stddev_0d’ (0d since we look 0 days back into the past).
The new dataframe will also contain the column new_date_col with the date, and all the identifying columns given in partitionby_cols.
-
__init__
(**kwargs)¶ Initialize an instance.
Parameters: - name (str) – name of link
- read_key (str) – key of input data to read from data store
- store_key (str) – key of output data to store in data store
- feature_cols (list/dict) – columns to take daily aggregates of. If a list, all columns in the list are aggregated with min, mean, max, stddev, count, and sum. If a dict, the keys are the column names to aggregate and the values are lists of aggregation functions to apply; these must be built-in Spark aggregation functions.
- new_date_col (str) – name of the ‘date’ column which will be created (default ‘date’)
- datetime_col (str) – name of column with datetime information in the dataframe
- partitionby_cols (list) – identifying columns to partition by before aggregating
-
execute
()¶ Execute the link.
Returns: status code of execution Return type: StatusCode
-
finalize
()¶ Finalize the link.
Returns: status code of finalization Return type: StatusCode
-
initialize
()¶ Initialize the link.
Returns: status code of initialization Return type: StatusCode
-
-
class
eskapadespark.links.
FindDaysUntilEvent
(**kwargs)¶ Bases:
escore.core.element.Link
Find the number of days until an event in a spark dataframe.
Will create a new column (name given by countdown_col_name) containing the number of days between the current row and the next date on which event_col is greater than 0. The dataframe must include a column that has a date or datetime.
-
__init__
(**kwargs)¶ Find the number of days until a particular event in an ordered dataframe.
Parameters: - name (str) – name of link
- read_key (str) – key of input data to read from data store
- store_key (str) – key of output data to store in data store
- datetime_col (str) – column with datetime information
- event_col (str) – the column containing the events (0 for rows with no events, >0 otherwise)
- countdown_col_name (str) – column where the number of days until the next event will be stored
- partitionby_cols (str) – columns to partition the countdown by
-
execute
()¶ Execute the link.
Returns: status code of execution Return type: StatusCode
-
finalize
()¶ Finalize the link.
Returns: status code of finalization Return type: StatusCode
-
initialize
()¶ Initialize the link.
Returns: status code of initialization Return type: StatusCode
-
-
class
eskapadespark.links.
HiveReader
(**kwargs)¶ Bases:
escore.core.element.Link
Link to read a Hive table into a Spark dataframe.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - databaseName (str) – name of the hive database
- tableName (str) – name of the hive table
- store_key (str) – key of data to store in data store
- columns (list) – Hive columns to read; if empty, all columns will be queried
- selection (str) – where clause of the Hive query
- limit (str) – limit clause of the Hive query
- processFuncs (dict) – Spark process functions to apply after the query
- full_query (str) – if not empty, only this query string is executed
- hive_sql_file (str) – path to a hive.sql file; if not empty, the query in this file is executed
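A small configuration sketch; the database, table, and column names are hypothetical:

from eskapadespark.links import HiveReader

hive_reader = HiveReader(
    databaseName='analytics',                  # hypothetical Hive database
    tableName='customers',                     # hypothetical Hive table
    store_key='customers_df',
    columns=['customer_id', 'signup_date'],
    selection="country = 'NL'",                # where clause of the Hive query
    limit='1000',                              # limit clause
)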
-
add_proc_func
(func, **kwargs)¶
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-
-
class
eskapadespark.links.
HiveWriter
(**kwargs)¶ Bases:
escore.core.element.Link
Link to write a dataframe in the datastore into a Hive table.
-
__init__
(**kwargs)¶ Initialize link instance.
Parameters: - name (str) – name of link
- read_key (str) – key of data to read from data store
- db (str) – hive database name
- table (str) – hive table name
- schemSpec (dict) – schema of Hive types, required when writing a Spark RDD
- prefix (str) – prefix for Hive column names
- column_names_not_to_change (list) – column names not to give the prefix
- columns (list) – columns to store in Hive; if empty, all columns will be stored
- not_columns (list) – columns not to store in Hive
- change_column_names (list) – the only columns to add the prefix to
-
execute
()¶ Execute the link.
-
initialize
()¶ Initialize the link.
-