Machine Learning with Spark

June 21, 2018 - Spark, AWS, EMR

This is part 2 in a series exploring Spark. Setting up a Spark Cluster on AWS is part 1. Part 3 is Nicer Machine Learning with Spark

Part 1: Getting Ready to do Work

This tutorial will assume that you already have Spark installed and the dataset of all airline flights between 1987-2007 loaded into HDFS. If that's not true, you can go to my previous post Setting Up a Spark Cluster to build your own Spark Cluster.

Let's also discuss the problem we're going to try to solve. Our goal is to build a machine learning model that predicts how much a flight will be delayed by, given the airport, month, day of the week, etc. Let's start out by noting this is almost certainly not going to work well - if it was that predictable, people would already be exploiting that when booking flights. Instead, we're going to use this as an excuse to learn the syntax of doing machine learning in Spark.

Part 2: Loading the Data

In the terminal where you've SSH'd into your EMR cluster, run pySpark. This is going to create an interactive Python session that's already connected to Spark, and also create a variable called spark that tells Python how to interact with Spark. We can use that variable, which is known as a SparkSession to read in our data like so:
df ='com.databricks.spark.csv')\
.options(header='true', inferschema='true').load(‘hdfs:///data/*.csv')
After this runs, we'll have our data in Spark and can start pre-processing it to be ready to do machine learning.

Part 3: Preprocessing the Data

This data is cool, but it's not really ready for machine learning yet. If we take a look at df.printSchema() we'll note the same issues we saw in the previous blog post: several columns we expect to be numeric are being treated as strings. To rectify that, we need to tell Spark to convert those columns to floats. To do that, we can run:
from pyspark.sql.types import FloatType
df = df.withColumn('DepDelay', df['DepDelay'].cast(FloatType()))
df = df.withColumn('DepTime', df['DepTime'].cast(FloatType()))
i Each of these lines tells Spark to create a new column with the following syntax
df.withColumn(new column name, what to fill the column with)
In this case, we're replacing the old column with itself after converting everything in it to a Float number. Essentially, we're saying "take whatever is in this column, convert it to a number if you can, then put that back in the dataframe under the same name."

That's the first major hurdle cleared. We can't do machine learning if we're treating numeric values as strings. Next, we need to select just a few of the columns out to use in our model, and also handle the fact that our conversion from string probably left a few spots in the data where there are now "NaNs" or "Not a numbers." NaNs happened when we asked Spark to translate strings to number because if Spark gets something like three-thirty-seven it can't figure out how to change that to a number and just replaces it with the Spark equivalent of, "You screwed up here" which is a NaN. For simplicity, let's just grab a small subset of information about the flight. Namely: the time of departure, the airport of departure, the airline company, and our target the departure delay. Then let's drop all of the rows with NaNs in any of those values.
df =['Year','Month','DayofMonth','DayOfWeek',\
df =
So that's another hurdle - we've handled the NaNs and also dropped the uninteresting columns. Now let's deal with the Origin and UniqueCarrier columns. Both of those columns are categorical. For Origin we have the airport code for each possible airport, so we can't just convert that to a number directly. Same thing for UniqueCarrier, it's the acronym for the company running the flight. We'll need to convert these into dummy columns.

In Spark, we can do this by first giving every unique value in the column a specific indext, then One Hot Encoding those labels. Let's set up a labeler and an encoder for both of the categorical columns.
from import StringIndexer, OneHotEncoder
si = StringIndexer(inputCol='Origin', outputCol='OriginCode')
ohe = OneHotEncoder(inputCol='OriginCode',outputCol='OriginVec')
si2 = StringIndexer(inputCol='UniqueCarrier', outputCol='UniqueCarrierCode')
ohe2 = OneHotEncoder(inputCol='UniqueCarrierCode',outputCol='UniqueCarrierVec')
These will take in the input column, which is the current name of the column, then make an output column called whatever we named the outputCol. I've set these up so we can chain them together to make a nice pipeline for data, by using the output name of the label step as the input for the encoding step.

The last piece of the puzzle is where Spark will look a bit different for seasoned Python Machine Learner-ers. Spark expects that all of our input features are going to be packed up into a single vector for each row, all tucked into a single column. So instead of having:
        | feat0 | feat1 | feat2 |
        |  0.1  |  0.5  |  3.1  |
        |  1.2  |  0.3  |  1.3  |
        |       . . . . .       |
        |  1.1  |  0.1  |  2.1  |
We actually need to convert this to a format that looks like:
        |        features       |
        |    [0.1, 0.5, 3.1]    |
        |    [1.2, 0.3, 1.3]    |
        |       . . . . .       |
        |    [1.1, 0.1, 2.1]    |
Spark has a tool to do this called VectorAssembler. We're going to tell VectorAssembler what columns to pack together, and what to call the resulting column. Then we'll be ready to put all the pieces together into a pipeline.
from import VectorAssembler
va = VectorAssembler(inputCols=['Year','Month','DayofMonth','DayOfWeek','DepTime',\
'OriginVec','UniqueCarrierVec'], outputCol='features')
Note that we're passing our encoded vectors as the input, not the raw Origin or UniqueCarrier columns!. Now we just need to get our model ready.

Part 4: Preparing the ML Pipeline

Let's start with a pretty robust model that will be forgiving if we're a bit dumb with our settings: Random Forest Regression.
from import RandomForestRegressor
rf = RandomForestRegressor(featuresCol='features', labelCol='DepDelay', numTrees=10)
We'll start by telling it to use 10 trees. That's honestly not enough to do the job well, but like I said: tutorial. We just want to get a feel for how to do the job. So that's our last major component ready to go, now we just need to chain all the pieces together. Luckily for us, Spark has a tool for that called a Pipeline. We just need to create a pipeline object and tell it what order to go through all the tools we made. So for instance, we know we need to convert all the labels to encoded vectors before we move on to acually packing them into a "feature vector." So let's create a Pipeline and stack all the pieces together; then actually fit it to our dataframe.
from import Pipeline
pipe = Pipeline(stages=[si,si2,ohe,ohe2,va,rf])
model =
That will take a bit to run, but when it finishes, we will have a trained model! We're now ready to see how our model performs.

Part 5: Measuring Model Performance

To measure our model, we're going to rely on some built in regression measurement methods from Spark. The first thing we need to do though, is get our predictions together (note that by default, the predictions will be added to the dataframe under a new column called "prediction").
predictions = model.transform(df)
preds_vs_labels ="prediction", "DepDelay")
preds_vs_labels = preds_vs_labels.withColumn('label', preds_vs_labels['DepDelay'])
There's an extra step there where I convert DepDelay to be called label. That's a bit of house keeping because the Regression evaluator we're going to use looks for a column called 'label' by default. Now let's spawn a couple evaluators to measure Root Mean Squared Error (RMSE, which is the default result in teh evaluator) and R-squared (R2).
from import RegressionEvaluator
evaluator = RegressionEvaluator()
evaluator2 = RegressionEvaluator(metricName='r2')
These are now waiting for us to put in a set of predictions and actual labels. Let's add those in and see what we get back out.
print("R2 of predictions by model = " + str(evaluator2.evaluate(preds_vs_labels)))
print("RMSE of predictions by model = " + str(evaluator.evaluate(preds_vs_labels)))
For me, when I run this on just a few of the years (1990 & 1991), I get:
R2 of predictions by model = 0.043218518010994966
RMSE of predictions by model = 21.14839415926492
which basically tells me, your model is no better than just always guessing the average delay of ~8 minutes... you're still going to have an average error of about +/-20 minutes.

So yes, this model isn't good. However, this pipeline IS the machine learning pipeline for Spark. First you get your data and clean it up, then you vector assembler, then you pipeline everything together, fit, and evaluate. All the tools are here, and you can chain these together to work on any ML problem. You can also swap models and preprocessors in and out, as long as you maintain your pipeline. So go forth, and do ML at scale!