eskapadespark package¶
Subpackages¶
- eskapadespark.links package
- Submodules
- eskapadespark.links.daily_summary module
- eskapadespark.links.find_days_until_event module
- eskapadespark.links.rdd_group_mapper module
- eskapadespark.links.spark_configurator module
- eskapadespark.links.spark_data_to_csv module
- eskapadespark.links.spark_df_converter module
- eskapadespark.links.spark_df_creator module
- eskapadespark.links.spark_df_reader module
- eskapadespark.links.spark_df_writer module
- eskapadespark.links.spark_execute_query module
- eskapadespark.links.spark_histogrammar_filler module
- eskapadespark.links.spark_streaming_controller module
- eskapadespark.links.spark_streaming_wordcount module
- eskapadespark.links.spark_streaming_writer module
- eskapadespark.links.spark_with_column module
- eskapadespark.links.sparkgeneralfuncprocessor module
- eskapadespark.links.sparkhister module
- Module contents
Submodules¶
eskapadespark.data_conversion module¶
Project: Eskapade - A python-based package for data analysis.
Module: spark_analysis.data_conversion
Created: 2017/05/30
Description: Converters between Spark, Pandas, and Python data formats
Authors: KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
eskapadespark.data_conversion.create_spark_df(spark, data, schema=None, process_methods=None, **kwargs)¶
Create a Spark data frame from data in a different format.
A Spark data frame is created with either a specified schema or a schema inferred from the input data. The schema can be specified with the keyword argument “schema”.
Functions to transform the data frame after creation can be specified by the keyword argument “process_methods”. The value of this argument is an iterable of (function, arguments, keyword arguments) tuples to apply.
The data frame is created with the createDataFrame function of the SparkSession. Remaining keyword arguments are passed to this function.
>>> spark = pyspark.sql.SparkSession.builder.getOrCreate()
>>> df = create_spark_df(spark,
>>>                      [[1, 1.1, 'one'], [2, 2.2, 'two']],
>>>                      schema=['int', 'float', 'str'],
>>>                      process_methods=[('repartition', (), {'numPartitions': 6})])
>>> df.show()
+---+-----+---+
|int|float|str|
+---+-----+---+
|  2|  2.2|two|
|  1|  1.1|one|
+---+-----+---+
Parameters:
- spark (pyspark.sql.SparkSession) – SparkSession instance
- data – input dataset
- schema – schema of created data frame
- process_methods (iterable) – methods to apply on the data frame after creation
Returns: created data frame
Return type: pyspark.sql.DataFrame
eskapadespark.data_conversion.df_schema(schema_spec)¶
Create Spark data-frame schema.
Create a schema for a Spark data frame from a dictionary of (name, data type) pairs, describing the columns. Data types are specified by Python types or by Spark-SQL types from the pyspark.sql.types module.
>>> from collections import OrderedDict as odict
>>> schema_dict = odict()
>>> schema_dict['foo'] = pyspark.sql.types.IntegerType()
>>> schema_dict['bar'] = odict([('descr', str), ('val', float)])
>>> print(schema_dict)
OrderedDict([('foo', IntegerType), ('bar', OrderedDict([('descr', <class 'str'>), ('val', <class 'float'>)]))])
>>> spark = pyspark.sql.SparkSession.builder.getOrCreate()
>>> df = spark.createDataFrame([(1, ('one', 1.1)), (2, ('two', 2.2))], schema=df_schema(schema_dict))
>>> df.show()
+---+---------+
|foo|      bar|
+---+---------+
|  1|[one,1.1]|
|  2|[two,2.2]|
+---+---------+
Parameters: schema_spec (dict) – schema specification
Returns: data-frame schema
Return type: pyspark.sql.types.StructType
Raises: TypeError if data type is specified incorrectly
eskapadespark.data_conversion.hive_table_from_df(spark, df, db, table)¶
Create a Hive table from a Spark data frame.
Parameters:
- spark (pyspark.sql.SparkSession) – SparkSession instance
- df (pyspark.sql.DataFrame) – input data frame
- db (str) – database for table
- table (str) – name of table
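A minimal usage sketch, assuming a Hive-enabled Spark session and an existing database; the database and table names are illustrative:
>>> spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()
>>> df = create_spark_df(spark, [[1, 'one'], [2, 'two']], schema=['id', 'name'])
>>> hive_table_from_df(spark, df, db='my_db', table='my_table')
>>> spark.sql('SELECT * FROM my_db.my_table').show()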
eskapadespark.decorators module¶
Project: Eskapade - A python-based package for data analysis.
Module: spark_analysis.decorators
Created: 2017/05/24
Description: Decorators for Spark objects
Authors: KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
eskapadespark.decorators.spark_cls_reduce(self)¶
Reduce function for Spark classes.
Spark objects connected to distributed data cannot be stored in pickle files. This custom reduce function enables pickling of a string representation of the Spark object.
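A sketch of the mechanism using a stand-in class; only spark_cls_reduce itself comes from this module, and the exact string produced depends on the implementation. Assigning the reduce function as __reduce__ makes pickle store a string stand-in instead of the live object:
>>> import pickle
>>> from eskapadespark.decorators import spark_cls_reduce
>>> class FakeSparkObject:
...     """Stand-in for a Spark class holding unpicklable JVM references."""
...     __reduce__ = spark_cls_reduce
>>> restored = pickle.loads(pickle.dumps(FakeSparkObject()))
>>> isinstance(restored, str)   # a string representation, not a live object
True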
eskapadespark.exceptions module¶
Project: Eskapade - A python-based package for data analysis.
Created: 2017/03/31
Description: Eskapade exceptions
Authors: KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
exception eskapadespark.exceptions.MissingJayDeBeApiError(message='', required_by='')¶
Bases: escore.exceptions.MissingPackageError
Exception raised if JayDeBeAPI is missing.
__init__(message='', required_by='')¶
Set missing-package arguments.
Parameters:
- message (str) – message to show when raised
- required_by (str) – info on component that requires the package
exception eskapadespark.exceptions.MissingPy4jError(message='', required_by='')¶
Bases: escore.exceptions.MissingPackageError
Exception raised if Py4J is missing.
__init__(message='', required_by='')¶
Set missing-package arguments.
Parameters:
- message (str) – message to show when raised
- required_by (str) – info on component that requires the package
exception eskapadespark.exceptions.MissingSparkError(message='', required_by='')¶
Bases: escore.exceptions.MissingPackageError
Exception raised if Spark is missing.
__init__(message='', required_by='')¶
Set missing-package arguments.
Parameters:
- message (str) – message to show when raised
- required_by (str) – info on component that requires the package
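A sketch of how such an exception is typically raised when an optional dependency is absent; the import guard and the required_by value are illustrative, not code from this module:
>>> from eskapadespark.exceptions import MissingSparkError
>>> try:
...     import pyspark
... except ImportError:
...     raise MissingSparkError(required_by='eskapadespark.spark_manager')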
eskapadespark.functions module¶
Project: Eskapade - A python-based package for data analysis.
Module: spark_analysis.functions
Created: 2017/05/24
Description: Collection of Spark functions defined for Eskapade
Authors: KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
eskapadespark.functions.calc_asym(var1, var2)¶
Calculate asymmetry.
Calculate asymmetry between variables 1 and 2:
>>> (var2 - var1) / (abs(var1) + abs(var2))
Returns: asymmetry value
Return type: float
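For example, applying the formula above (values chosen for illustration):
>>> calc_asym(1.0, 3.0)   # (3.0 - 1.0) / (1.0 + 3.0)
0.5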
eskapadespark.functions.is_inf(x)¶
Test if value is infinite.
eskapadespark.functions.is_nan(x)¶
Test if value is NaN/null/None.
eskapadespark.functions.spark_query_func(spec)¶
Get Eskapade Spark-query function.
Get a function that returns a string to be used as a function in a Spark SQL query:
>>> count_fun = spark_query_func('count')
>>> count_fun()
'count(*)'
>>> cov_fun = spark_query_func('cov')
>>> cov_fun('x', 'y')
'covar_pop(if(is_nan(x) or is_inf(x), NULL, x),if(is_nan(y) or is_inf(y), NULL, y))'
>>> my_fun = spark_query_func('my_func::count(if({0:s} == 0, 1, NULL))')
>>> my_fun.__name__
'my_func'
>>> my_fun('my_var')
'count(if(my_var == 0, 1, NULL))'
Parameters: spec (str) – function specification: “name” or “name::definition”
Returns: query function
eskapadespark.functions.spark_sql_func(name, default_func=None)¶
Get Spark SQL function.
Get a function from pyspark.sql.functions by name. If the function does not exist in the SQL-functions module, return the specified default function, if any.
Parameters:
- name (str) – name of function
- default_func – default function
Returns: Spark SQL function
Raises: RuntimeError if function does not exist
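A brief sketch of both lookup paths; the fallback lambda is illustrative and the reprs shown are indicative:
>>> spark_sql_func('upper')                       # found in pyspark.sql.functions
<function upper at ...>
>>> spark_sql_func('no_such_func', lambda c: c)   # not found: default is returned
<function <lambda> at ...>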
eskapadespark.functions.to_date_time(dt, tz_in=None, tz_out=None)¶
Convert value to date/time object.
Parameters:
- dt – value representing a date/time (parsed by pandas.Timestamp)
- tz_in – time zone to localize the date/time value to (parsed by pandas.Timestamp.tz_localize)
- tz_out – time zone to convert the date/time value into (parsed by pandas.Timestamp.tz_convert)
Returns: date/time object
Return type: datetime.datetime
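An illustrative call, assuming the pandas-based parsing described above; the output repr is indicative and depends on the installed timezone library:
>>> to_date_time('2017-05-24 12:00:00', tz_in='Europe/Amsterdam', tz_out='UTC')
datetime.datetime(2017, 5, 24, 10, 0, tzinfo=<UTC>)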
eskapadespark.functions.to_timestamp(dt, tz_in=None)¶
Convert value to Unix timestamp (ns).
Parameters:
- dt – value representing a date/time (parsed by pandas.Timestamp)
- tz_in – time zone to localize the date/time value to (parsed by pandas.Timestamp.tz_localize)
Returns: Unix timestamp (ns)
Return type: int
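An illustrative call, one second past the Unix epoch (nanosecond resolution, as documented):
>>> to_timestamp('1970-01-01 00:00:01', tz_in='UTC')
1000000000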
eskapadespark.resources module¶
eskapadespark.spark_manager module¶
Project: Eskapade - A python-based package for data analysis.
Created: 2017/02/27
Class: SparkManager
Description: Process service for managing Spark operations
Authors: KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands
Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.
class eskapadespark.spark_manager.SparkManager(config_path=None)¶
Bases: escore.core.process_services.ProcessService, escore.core.mixin.ConfigMixin
Process service for managing Spark operations.
__init__(config_path=None)¶
Initialize Spark manager instance.
create_session(enable_hive_support=False, include_eskapade_modules=False, **conf_kwargs)¶
Get or create Spark session.
Return the Spark-session instance, creating the session if it does not exist yet. If no Spark configuration has been set yet, it is created as well; in that case all keyword arguments are passed to the _create_spark_conf method.
Parameters:
- enable_hive_support (bool) – switch for enabling Spark Hive support
- include_eskapade_modules (bool) – switch to include Eskapade modules in the Spark job submission (optional, default False)
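A hedged usage sketch; in a full Eskapade run the SparkManager is normally obtained as a process service, so the direct construction here is illustrative:
>>> from eskapadespark.spark_manager import SparkManager
>>> sm = SparkManager()
>>> spark = sm.create_session(enable_hive_support=False)
>>> spark is sm.get_session()   # subsequent calls return the same session
True
>>> sm.finish()                 # stop the session when done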
finish()¶
Stop Spark session.
get_session()¶
Get Spark session.
Return the running Spark session, checking that the Spark context is still alive.
spark_streaming_context¶
Spark Streaming Context.
eskapadespark.version module¶
THIS FILE IS AUTO-GENERATED BY ESKAPADE SETUP.PY.