eskapadespark package

Submodules

eskapadespark.data_conversion module

Project: Eskapade - A python-based package for data analysis.

Module: spark_analysis.data_conversion

Created: 2017/05/30

Description: Converters between Spark, Pandas, and Python data formats

Authors: KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

eskapadespark.data_conversion.create_spark_df(spark, data, schema=None, process_methods=None, **kwargs)

Create a Spark data frame from data in a different format.

A Spark data frame is created with either a specified schema or a schema inferred from the input data. The schema can be specified with the keyword argument “schema”.

Functions to transform the data frame after creation can be specified by the keyword argument “process_methods”. The value of this argument is an iterable of (function, arguments, keyword arguments) tuples to apply.

The data frame is created with the createDataFrame function of the SparkSession. Remaining keyword arguments are passed to this function.

>>> spark = pyspark.sql.SparkSession.builder.getOrCreate()
>>> df = create_spark_df(spark,
...                      [[1, 1.1, 'one'], [2, 2.2, 'two']],
...                      schema=['int', 'float', 'str'],
...                      process_methods=[('repartition', (), {'numPartitions': 6})])
>>> df.show()
+---+-----+---+
|int|float|str|
+---+-----+---+
|  2|  2.2|two|
|  1|  1.1|one|
+---+-----+---+
Parameters:
  • spark (pyspark.sql.SparkSession) – SparkSession instance
  • data – input dataset
  • schema – schema of created data frame
  • process_methods (iterable) – methods to apply on the data frame after creation
Returns: created data frame
Return type: pyspark.sql.DataFrame
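
Because the frame is ultimately built with SparkSession.createDataFrame, other input formats such as a pandas DataFrame are expected to work as well. The snippet below is a sketch of that usage and is not part of the original documentation:

>>> import pandas as pd
>>> import pyspark.sql
>>> from eskapadespark.data_conversion import create_spark_df
>>> spark = pyspark.sql.SparkSession.builder.getOrCreate()
>>> pdf = pd.DataFrame({'int': [1, 2], 'float': [1.1, 2.2], 'str': ['one', 'two']})
>>> df = create_spark_df(spark, pdf)  # schema inferred from the pandas dtypes
>>> df.count()
2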

eskapadespark.data_conversion.df_schema(schema_spec)

Create Spark data-frame schema.

Create a schema for a Spark data frame from a dictionary of (name, data type) pairs, describing the columns. Data types are specified by Python types or by Spark-SQL types from the pyspark.sql.types module.

>>> from collections import OrderedDict as odict
>>> schema_dict = odict()
>>> schema_dict['foo'] = pyspark.sql.types.IntegerType()
>>> schema_dict['bar'] = odict([('descr', str), ('val', float)])
>>> print(schema_dict)
OrderedDict([('foo', IntegerType), ('bar', OrderedDict([('descr', <class 'str'>), ('val', <class 'float'>)]))])
>>> spark = pyspark.sql.SparkSession.builder.getOrCreate()
>>> df = spark.createDataFrame([(1, ('one', 1.1)), (2, ('two', 2.2))], schema=df_schema(schema_dict))
>>> df.show()
+---+---------+
|foo|      bar|
+---+---------+
|  1|[one,1.1]|
|  2|[two,2.2]|
+---+---------+
Parameters: schema_spec (dict) – schema specification
Returns: data-frame schema
Return type: pyspark.sql.types.StructType
Raises: TypeError if data type is specified incorrectly

eskapadespark.data_conversion.hive_table_from_df(spark, df, db, table)

Create a Hive table from a Spark data frame.

Parameters:
  • spark (pyspark.sql.SparkSession) – SparkSession instance
  • df (pyspark.sql.DataFrame) – input data frame
  • db (str) – database for table
  • table (str) – name of table
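
A usage sketch (not from the original documentation); the database and table names below are placeholders, and the session is assumed to have Hive support enabled:

>>> import pyspark.sql
>>> from eskapadespark.data_conversion import hive_table_from_df
>>> spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()
>>> df = spark.createDataFrame([(1, 'one'), (2, 'two')], schema=['id', 'name'])
>>> hive_table_from_df(spark, df, db='my_db', table='my_table')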

eskapadespark.decorators module

Project: Eskapade - A python-based package for data analysis.

Module: spark_analysis.decorators

Created: 2017/05/24

Description: Decorators for Spark objects

Authors: KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

eskapadespark.decorators.spark_cls_reduce(self)

Reduce function for Spark classes.

Spark objects connected to distributed data cannot be stored in pickle files. This custom reduce function instead enables pickling of a string representation of the Spark object.
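
A minimal sketch of the idea (not the package's actual implementation): a __reduce__-style function that makes pickling yield the object's string representation.

>>> import pickle
>>> def reduce_to_str(obj):
...     """Reduce an object to the pickle of its string representation."""
...     return str, (str(obj),)
>>> class Wrapper:
...     __reduce__ = reduce_to_str
>>> restored = pickle.loads(pickle.dumps(Wrapper()))  # restored is a plain string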

eskapadespark.exceptions module

Project: Eskapade - A python-based package for data analysis.

Created: 2017/03/31

Description: Eskapade exceptions

Authors: KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

exception eskapadespark.exceptions.MissingJayDeBeApiError(message='', required_by='')

Bases: escore.exceptions.MissingPackageError

Exception raised if JayDeBeAPI is missing.

__init__(message='', required_by='')

Set missing-package arguments.

Parameters:
  • message (str) – message to show when raised
  • required_by (str) – info on component that requires the package

exception eskapadespark.exceptions.MissingPy4jError(message='', required_by='')

Bases: escore.exceptions.MissingPackageError

Exception raised if Py4J is missing.

__init__(message='', required_by='')

Set missing-package arguments.

Parameters:
  • message (str) – message to show when raised
  • required_by (str) – info on component that requires the package

exception eskapadespark.exceptions.MissingSparkError(message='', required_by='')

Bases: escore.exceptions.MissingPackageError

Exception raised if Spark is missing.

__init__(message='', required_by='')

Set missing-package arguments.

Parameters:
  • message (str) – message to show when raised
  • required_by (str) – info on component that requires the package
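
A plausible usage sketch (illustrative only, not code from the package): raise the exception from an import guard around the optional dependency.

>>> from eskapadespark.exceptions import MissingSparkError
>>> try:
...     import pyspark
... except ImportError:
...     raise MissingSparkError(required_by='eskapadespark.spark_manager')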

eskapadespark.functions module

Project: Eskapade - A python-based package for data analysis.

Module: spark_analysis.functions

Created: 2017/05/24

Description: Collection of Spark functions defined for Eskapade

Authors: KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

eskapadespark.functions.calc_asym(var1, var2)

Calculate asymmetry.

Calculate asymmetry between variables 1 and 2:

>>> (var2 - var1) / (abs(var1) + abs(var2))

Returns: asymmetry value
Return type: float
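
For example, assuming plain numeric inputs:

>>> from eskapadespark.functions import calc_asym
>>> calc_asym(1.0, 3.0)  # (3.0 - 1.0) / (1.0 + 3.0)
0.5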

eskapadespark.functions.is_inf(x)

Test if value is infinite.

eskapadespark.functions.is_nan(x)

Test if value is NaN/null/None.

eskapadespark.functions.spark_query_func(spec)

Get Eskapade Spark-query function.

Get a function that returns a string to be used as a function in a Spark SQL query:

>>> count_fun = spark_query_func('count')
>>> count_fun()
'count(*)'
>>> cov_fun = spark_query_func('cov')
>>> cov_fun('x', 'y')
'covar_pop(if(is_nan(x) or is_inf(x), NULL, x),if(is_nan(y) or is_inf(y), NULL, y))'
>>> my_fun = spark_query_func('my_func::count(if({0:s} == 0, 1, NULL))')
>>> my_fun.__name__
'my_func'
>>> my_fun('my_var')
'count(if(my_var == 0, 1, NULL))'
Parameters: spec (str) – function specification: “name” or “name::definition”
Returns: query function
eskapadespark.functions.spark_sql_func(name, default_func=None)

Get Spark SQL function.

Get a function from pyspark.sql.functions by name. If the function does not exist in the SQL-functions module, the default function is returned instead, if one is specified.

Parameters:
  • name (str) – name of function
  • default_func – default function
Returns: Spark SQL function
Raises: RuntimeError if function does not exist
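
A hedged usage sketch, assuming the lookup resolves names in pyspark.sql.functions and otherwise falls back to default_func:

>>> import pyspark.sql.functions as sqlf
>>> from eskapadespark.functions import spark_sql_func
>>> mean_func = spark_sql_func('mean')  # expected to be pyspark.sql.functions.mean
>>> first_func = spark_sql_func('no_such_name', default_func=sqlf.first)  # falls back to the default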

eskapadespark.functions.to_date_time(dt, tz_in=None, tz_out=None)

Convert value to date/time object.

Parameters:
  • dt – value representing a date/time (parsed by pandas.Timestamp)
  • tz_in – time zone to localize date/time value to (parsed by pandas.Timestamp.tz_localize)
  • tz_out – time zone to convert date/time value into (parsed by pandas.Timestamp.tz_convert)
Returns: date/time object
Return type: datetime.datetime
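
For example, localizing a naive timestamp to Amsterdam time and converting it to UTC:

>>> from eskapadespark.functions import to_date_time
>>> dt = to_date_time('2017-05-30 12:00:00', tz_in='Europe/Amsterdam', tz_out='UTC')
>>> # dt is expected to be the timezone-aware datetime 2017-05-30 10:00:00+00:00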

eskapadespark.functions.to_timestamp(dt, tz_in=None)

Convert value to Unix timestamp (ns).

Parameters:
  • dt – value representing a date/time (parsed by pandas.Timestamp)
  • tz_in – time zone to localize date/time value to (parsed by pandas.Timestamp.tz_localize)
Returns: Unix timestamp (ns)
Return type: int
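
For example, one second past the Unix epoch:

>>> from eskapadespark.functions import to_timestamp
>>> ts = to_timestamp('1970-01-01 00:00:01', tz_in='UTC')
>>> # ts is expected to equal 1000000000 (nanoseconds)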

eskapadespark.resources module

Used by autodoc_mock_imports.

eskapadespark.spark_manager module

Project: Eskapade - A python-based package for data analysis.

Created: 2017/02/27

Class: SparkManager

Description: Process service for managing Spark operations

Authors: KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

class eskapadespark.spark_manager.SparkManager(config_path=None)

Bases: escore.core.process_services.ProcessService, escore.core.mixin.ConfigMixin

Process service for managing Spark operations.

__init__(config_path=None)

Initialize Spark manager instance.

create_session(enable_hive_support=False, include_eskapade_modules=False, **conf_kwargs)

Get or create Spark session.

Return the Spark-session instance, creating it if it does not exist yet. If no Spark configuration has been set yet, it is created as well; in that case, all keyword arguments are passed to the _create_spark_conf method.

Parameters:
  • enable_hive_support (bool) – switch for enabling Spark Hive support
  • include_eskapade_modules (bool) – switch to include the Eskapade modules in the Spark job submission (optional; default is False)
finish()

Stop Spark session.

get_session()

Get Spark session.

Return the running Spark session and check whether the Spark context is still alive.

spark_streaming_context

Spark Streaming Context.
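
A hedged sketch of a typical session lifecycle; in a full Eskapade run the manager is normally obtained as a process service rather than constructed directly, so the direct construction below is illustrative only.

>>> from eskapadespark.spark_manager import SparkManager
>>> sm = SparkManager()
>>> spark = sm.create_session(enable_hive_support=False)
>>> spark.range(3).count()
3
>>> sm.finish()  # stops the Spark session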

eskapadespark.version module

THIS FILE IS AUTO-GENERATED BY ESKAPADE SETUP.PY.

Module contents