How to use Spark and Pandas to prepare big data
If you want to train machine learning models, you may need to prepare your data ahead of time. Data preparation can include cleaning your data, adding new columns, removing columns, combining columns, grouping rows, sorting rows, and more.
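As a quick illustration of these operations, here is a minimal Pandas sketch on toy data (the column names and values are made up for illustration):

```python
import pandas as pd

# Toy transaction data; columns are hypothetical.
df = pd.DataFrame({
    'user_id': [1, 1, 2, 2, 2],
    'amount': [10.0, 20.0, 5.0, None, 15.0],
})

df = df.dropna(subset=['amount'])            # cleaning: drop rows with missing values
df['amount_usd_cents'] = df['amount'] * 100  # adding a new column
totals = (
    df.groupby('user_id', as_index=False)['amount_usd_cents']
      .sum()                                 # grouping rows
      .sort_values('amount_usd_cents', ascending=False)  # sorting rows
)
print(totals)
```

Each of these steps is a one-liner in Pandas; the rest of this article is about running this kind of logic at scale with Spark.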
Once you write your data preparation code, there are a few ways to execute it:
- Download the data onto your local computer and run a script to transform it
- Download the data onto a server, upload a script, and run the script on the remote server
- Run complex transformations on data in a data warehouse using a SQL-like language
- Use a Spark job with some logic from your script to transform the data
We’ll walk through option 4: using a Spark job to prepare data for machine learning models.
Prerequisites
Apache Spark is one of the most actively developed open-source projects in big data. The following code examples require that you have Spark set up and can execute Python code using the PySpark library. The examples also require that your data is in Amazon S3 (Simple Storage Service). All of this can be set up on AWS EMR (Elastic MapReduce).
Outline
- How to use PySpark to load data from Amazon S3
- Write Python code to transform data
How to use PySpark to load data from Amazon S3
PySpark is “an interface for Apache Spark in Python. It not only allows you to write Spark applications using Python APIs, but also provides the PySpark shell for interactively analyzing your data in a distributed environment.”
Feature sets and training sets (the data used to train machine learning models) in this example are stored as CSV files in Amazon S3.
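To make this concrete, here is what such a CSV file might contain, read locally with Pandas (the column names and values are hypothetical):

```python
import io
import pandas as pd

# A hypothetical slice of a user-profile feature set, as it might be stored in S3.
csv_data = """user_id,date,active_days,purchases
1,2023-01-01,5,2
2,2023-01-01,3,0
"""

# Reading with a header row and comma delimiter, mirroring the Spark load below.
df = pd.read_csv(io.StringIO(csv_data), delimiter=',')
print(df)
```
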
Here are the high level steps in the code:
- Load data from S3 files; we will use CSV (comma separated values) file format in this example.
- Group the data together by some column(s).
- Apply a Python function to each group; we will define this function in the next section.
```python
from pyspark.sql import SparkSession


def load_data(spark, s3_location):
    """
    spark:
        Spark session
    s3_location:
        S3 bucket name and object prefix
    """
    return (
        spark
        .read
        .options(
            delimiter=',',
            header=True,
            inferSchema=False,
        )
        .csv(s3_location)
    )


with SparkSession.builder.appName('Mage').getOrCreate() as spark:
    # 1. Load data from S3 files
    df = load_data(spark, 's3://feature-sets/users/profiles/v1/*')

    # 2. Group data by 'user_id' column
    grouped = df.groupby('user_id')

    # 3. Apply function named 'custom_transformation_function';
    #    we will define this function later in this article
    df_transformed = grouped.apply(custom_transformation_function)
```
Write Python code to transform data
Here are the high level steps in the code:
- Define Pandas UDF (user defined function)
- Define schema
- Write code logic to be run on grouped data
Define Pandas UDF (user defined function)
Pandas is “a fast, powerful, flexible and easy to use open source data analysis and manipulation tool, built on top of the Python programming language”.
A Pandas user-defined function (UDF) is built on top of Apache Arrow. Pandas UDFs improve performance by letting developers scale their workloads while using the Pandas API inside Apache Spark: the function body works with regular Pandas objects, and Apache Arrow handles exchanging the data between Spark and Python.
```python
from pyspark.sql.functions import pandas_udf, PandasUDFType


@pandas_udf(
    SCHEMA_COMING_SOON,
    PandasUDFType.GROUPED_MAP,
)
def custom_transformation_function(df):
    pass
```
Define schema
Using a Pandas UDF requires that we define the schema of the data structure that the custom function returns.
```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructField,
    StructType,
)

"""
StructField arguments:
First argument: column name
Second argument: column type
Third argument: True if this column can have null values
"""
SCHEMA_COMING_SOON = StructType([
    StructField('user_id', IntegerType(), True),
    StructField('date', StringType(), True),
    StructField('number_of_rows', IntegerType(), True),
])


@pandas_udf(
    SCHEMA_COMING_SOON,
    PandasUDFType.GROUPED_MAP,
)
def custom_transformation_function(df):
    pass
```
Write code logic to be run on grouped data
Once your data has been grouped, your custom code logic can be executed on each group in parallel. Notice how the function named `custom_transformation_function` returns a Pandas DataFrame with 3 columns: `user_id`, `date`, and `number_of_rows`. These 3 columns have their types explicitly defined in the schema passed to the `@pandas_udf` decorator.
```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructField,
    StructType,
)

"""
StructField arguments:
First argument: column name
Second argument: column type
Third argument: True if this column can have null values
"""
SCHEMA_COMING_SOON = StructType([
    StructField('user_id', IntegerType(), True),
    StructField('date', StringType(), True),
    StructField('number_of_rows', IntegerType(), True),
])


@pandas_udf(
    SCHEMA_COMING_SOON,
    PandasUDFType.GROUPED_MAP,
)
def custom_transformation_function(df):
    # Count the number of rows for each date in this group
    number_of_rows_by_date = (
        df.groupby('date')
          .size()
          .reset_index(name='number_of_rows')
    )
    # Every row in the group shares the same user_id
    number_of_rows_by_date['user_id'] = df['user_id'].iloc[0]
    return number_of_rows_by_date
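To see what this logic produces, here is the same Pandas transformation run locally on one toy group, the way Spark would hand it to the UDF (the sample values are made up; no Spark is required):

```python
import pandas as pd

# A toy group: all rows share one user_id, as Spark would pass it to the UDF.
df = pd.DataFrame({
    'user_id': [7, 7, 7],
    'date': ['2023-01-01', '2023-01-01', '2023-01-02'],
})

number_of_rows_by_date = (
    df.groupby('date')
      .size()
      .reset_index(name='number_of_rows')
)
number_of_rows_by_date['user_id'] = df['user_id'].iloc[0]
print(number_of_rows_by_date)
```

Testing the function body on a plain DataFrame like this is a convenient way to debug a grouped-map UDF before submitting a Spark job.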
Putting it all together
The last piece of code we add will save the transformed data to S3 as CSV files.
```python
(
    df_transformed.write
    .option('delimiter', ',')
    .option('header', True)
    .mode('overwrite')
    .csv('s3://feature-sets/users/profiles/transformed/v1/')
)
```
Here is the final code snippet that combines all the steps together:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructField,
    StructType,
)

"""
StructField arguments:
First argument: column name
Second argument: column type
Third argument: True if this column can have null values
"""
SCHEMA_COMING_SOON = StructType([
    StructField('user_id', IntegerType(), True),
    StructField('date', StringType(), True),
    StructField('number_of_rows', IntegerType(), True),
])


@pandas_udf(
    SCHEMA_COMING_SOON,
    PandasUDFType.GROUPED_MAP,
)
def custom_transformation_function(df):
    # Count the number of rows for each date in this group
    number_of_rows_by_date = (
        df.groupby('date')
          .size()
          .reset_index(name='number_of_rows')
    )
    # Every row in the group shares the same user_id
    number_of_rows_by_date['user_id'] = df['user_id'].iloc[0]
    return number_of_rows_by_date


def load_data(spark, s3_location):
    """
    spark:
        Spark session
    s3_location:
        S3 bucket name and object prefix
    """
    return (
        spark
        .read
        .options(
            delimiter=',',
            header=True,
            inferSchema=False,
        )
        .csv(s3_location)
    )


with SparkSession.builder.appName('Mage').getOrCreate() as spark:
    # 1. Load data from S3 files
    df = load_data(spark, 's3://feature-sets/users/profiles/v1/*')

    # 2. Group data by 'user_id' column
    grouped = df.groupby('user_id')

    # 3. Apply the function named 'custom_transformation_function'
    df_transformed = grouped.apply(custom_transformation_function)

    # 4. Save new transformed data to S3
    (
        df_transformed.write
        .option('delimiter', ',')
        .option('header', True)
        .mode('overwrite')
        .csv('s3://feature-sets/users/profiles/transformed/v1/')
    )
```
Conclusion
This is how you can run complex transformations on large amounts of data using Python and the Pandas library. The benefit of this approach is that you can take advantage of Spark’s ability to process large amounts of data in parallel while using Python and Pandas to express the complex transformation logic.