Spark Cross-Validation with Multiple Pipelines

Cross-validation with Apache Spark Pipelines is commonly used to tune the hyperparameters of stages in a Pipeline. But what do you do if you want to evaluate more than one pipeline with different stages, e.g. using different types of classifiers? You would probably just run cross-validation on each pipeline separately and compare the results, which generally works fine. What you might not know is that the stages of a Pipeline are themselves a parameter and can be evaluated just like any other parameter, with a few caveats. So it is possible to put multiple pipelines into the Spark CrossValidator to automatically select the best one and make more efficient use of your data and caching. This post will show you how.

CV Example with Two Pipelines

This use case came out of an existing JIRA, SPARK-19979, and works for pipeline model selection with the Spark ML CrossValidator and TrainValidationSplit. Let’s start with an example where I want to construct a classification pipeline, but I am not tied to any particular classifier, just whichever one performs best under cross-validation according to my metric. First, I will define some pipeline stages and an empty pipeline. It is crucial to declare the pipeline empty because the stages are a parameter and will be added as part of the parameter grid.

import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.classification.{DecisionTreeClassifier, LogisticRegression}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{PCA, VectorIndexer}
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}

val vectorIndexer = new VectorIndexer()
    .setInputCol("features")
    .setOutputCol("indexedFeatures")

val pca = new PCA()
    .setInputCol(vectorIndexer.getOutputCol)
    .setOutputCol("pcaFeatures")

val dt = new DecisionTreeClassifier()
    .setLabelCol("label")
    .setFeaturesCol(vectorIndexer.getOutputCol)

val lr = new LogisticRegression()
    .setLabelCol("label")
    .setFeaturesCol(pca.getOutputCol)

val pipeline = new Pipeline()

Now let’s make an Array of stages for each of the pipelines we want and build a separate parameter grid for each of those pipelines.

val pipeline1 = Array[PipelineStage](vectorIndexer, dt)
val pipeline2 = Array[PipelineStage](vectorIndexer, pca, lr)

val pipeline1_grid = new ParamGridBuilder()
  .baseOn(pipeline.stages -> pipeline1)
  .addGrid(dt.maxDepth, Array(2, 5))
  .build()

val pipeline2_grid = new ParamGridBuilder()
  .baseOn(pipeline.stages -> pipeline2)
  .addGrid(pca.k, Array(10, 20))
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()

Here we make use of Spark’s ParamGridBuilder and its baseOn() method to build a grid in which every combination of param values has the pipeline stages fixed. It’s important to note that ParamGridBuilder simply builds an Array of ParamMaps with all combinations of param values. This means that once we have built a grid for each pipeline, we can just concatenate the Arrays to get the grid we want and then use that to run cross-validation.

val paramGrid = pipeline1_grid ++ pipeline2_grid

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(5)

// Run cross-validation, assuming "training" data exists
val cvModel = cv.fit(training)

// Get the best selected pipeline model
val pipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel]

The above code has an advantage over running cross-validation separately on each pipeline and manually comparing the results, because it allows each pipeline to reuse the cached data folds. Inside the CrossValidator.fit call, once the data is split into folds and divided into training and evaluation sets, it is cached. So by combining both pipelines into a single cv.fit, the cached data is reused, which can make a huge improvement in processing time.
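
As a side note, if you want to see how each candidate pipeline performed rather than only pulling out the winner, the cross-validated metric for every ParamMap is available on the fitted model. A minimal sketch, assuming the cvModel and paramGrid from above (the metric here is areaUnderROC, the default for BinaryClassificationEvaluator):

// avgMetrics is aligned with the estimator param maps, so zipping them
// shows the average cross-validated metric for every candidate pipeline
paramGrid.zip(cvModel.avgMetrics).foreach { case (params, metric) =>
  println(s"metric = $metric for:\n$params")
}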

Details on Constructing an Optimum Param Grid

The reason we construct the param grids separately is that the logistic regression pipeline additionally uses PCA to reduce dimensionality, while the decision tree pipeline does not. If we were to construct just a single grid, everything would still work, but we would be performing unnecessary work because the PCA parameters have no effect on the decision tree, which does not even use the resulting column. To illustrate this, let’s look at the output of our param grid.

paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
    dtc_da28416bb307-maxDepth: 2,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@218e188e
}, {
    dtc_da28416bb307-maxDepth: 5,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@218e188e
}, {
    pca_d39f26a3e7dd-k: 10,
    logreg_d3b3ce8022a9-regParam: 0.1,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@d3e77a3
}, {
    pca_d39f26a3e7dd-k: 10,
    logreg_d3b3ce8022a9-regParam: 0.01,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@d3e77a3
}, {
    pca_d39f26a3e7dd-k: 20,
    logreg_d3b3ce8022a9-regParam: 0.1,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@d3e77a3
}, {
    pca_d39f26a3e7dd-k: 20,
    logreg_d3b3ce8022a9-regParam: 0.01,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@d3e77a3
})

Each element in the array represents a pipeline model that will be evaluated at each cross-validation fold. There are 2 decision tree pipelines, one for each value of maxDepth we are evaluating, and 4 logistic regression pipelines covering all combinations of PCA k and LogisticRegression regParam. If instead we built one single grid including both pipelines, the grid would be much larger (16 vs. 6 models): every PCA setting is combined with the decision tree, and both classifier params, regParam and maxDepth, appear in every ParamMap even when they do not belong to the classifier in the PipelineModel being evaluated, so we end up doing much more work than is really needed.

val dumbGrid = new ParamGridBuilder()
    .addGrid(pipeline.stages, Array(pipeline1, pipeline2))
    .addGrid(dt.maxDepth, Array(2, 5))
    .addGrid(pca.k, Array(10, 20))
    .addGrid(lr.regParam, Array(0.1, 0.01))
    .build()

dumbGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
    pca_d39f26a3e7dd-k: 10,
    dtc_da28416bb307-maxDepth: 2,
    logreg_d3b3ce8022a9-regParam: 0.1,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@218e188e
}, {
    pca_d39f26a3e7dd-k: 10,
    dtc_da28416bb307-maxDepth: 2,
    logreg_d3b3ce8022a9-regParam: 0.1,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@d3e77a3
}, {
    pca_d39f26a3e7dd-k: 10,
    dtc_da28416bb307-maxDepth: 5,
    logreg_d3b3ce8022a9-regParam: 0.1,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@218e188e
}, {
    pca_d39f26a3e7dd-k: 10,
    dtc_da28416bb307-maxDepth: 5,
    logreg_d3b3ce8022a9-regParam: 0.1,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@d3e77a3
}, {
    pca_d39f26a3e7dd-k: 10,
    dtc_da28416bb307-maxDepth: 2,
    logreg_d3b3ce8022a9-regParam: 0.01,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@218e188e
}, {
    pca_d39f26a3e7dd-k: 10,
    dtc_da28416bb307-maxDepth: 2,
    logreg_d3b3ce8022a9-regParam: 0.01,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@d3e77a3
}, {
    pca_d39f26a3e7dd-k: 10,
    dtc_da28416bb307-maxDepth: 5,
    logreg_d3b3ce8022a9-regParam: 0.01,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@218e188e
}, {
    pca_d39f26a3e7dd-k: 10,
    dtc_da28416bb307-maxDepth: 5,
    logreg_d3b3ce8022a9-regParam: 0.01,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@d3e77a3
}, {
    pca_d39f26a3e7dd-k: 20,
    dtc_da28416bb307-maxDepth: 2,
    logreg_d3b3ce8022a9-regParam: 0.1,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@218e188e
}, {
    pca_d39f26a3e7dd-k: 20,
    dtc_da28416bb307-maxDepth: 2,
    logreg_d3b3ce8022a9-regParam: 0.1,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@d3e77a3
}, {
    pca_d39f26a3e7dd-k: 20,
    dtc_da28416bb307-maxDepth: 5,
    logreg_d3b3ce8022a9-regParam: 0.1,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@218e188e
}, {
    pca_d39f26a3e7dd-k: 20,
    dtc_da28416bb307-maxDepth: 5,
    logreg_d3b3ce8022a9-regParam: 0.1,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@d3e77a3
}, {
    pca_d39f26a3e7dd-k: 20,
    dtc_da28416bb307-maxDepth: 2,
    logreg_d3b3ce8022a9-regParam: 0.01,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@218e188e
}, {
    pca_d39f26a3e7dd-k: 20,
    dtc_da28416bb307-maxDepth: 2,
    logreg_d3b3ce8022a9-regParam: 0.01,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@d3e77a3
}, {
    pca_d39f26a3e7dd-k: 20,
    dtc_da28416bb307-maxDepth: 5,
    logreg_d3b3ce8022a9-regParam: 0.01,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@218e188e
}, {
    pca_d39f26a3e7dd-k: 20,
    dtc_da28416bb307-maxDepth: 5,
    logreg_d3b3ce8022a9-regParam: 0.01,
    pipeline_2e9bf0dddb22-stages: [Lorg.apache.spark.ml.PipelineStage;@d3e77a3
})
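
Just to make the difference concrete, comparing the sizes of the two grids shows the 16 vs. 6 candidate models mentioned above:

paramGrid.length  // 6 candidate models
dumbGrid.length   // 16 candidate models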

Further Optimizations

One final note: even with the above method, there is still some duplicated work due to the common VectorIndexer stage that exists in both pipelines. For a given cross-validation fold, the VectorIndexer transforms the same data for each PipelineModel being evaluated. There is no easy workaround for this except bringing the VectorIndexer out of the pipelines (a rough sketch is below), but that somewhat defeats the purpose of constructing pipelines in the first place. Optimizing pipelines to reduce this duplicated work is currently a work in progress that can be tracked at SPARK-19071.
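
For completeness, here is a rough sketch of that workaround: fit the VectorIndexer once up front and drop it from the candidate stage arrays. Note that this changes the semantics slightly, since the indexer is now fit on the full training set instead of within each fold. The names indexedTraining, pipeline1NoIdx and pipeline2NoIdx are just illustrative.

// Fit the shared VectorIndexer once and cache its output; dt and pca already
// read from its output column ("indexedFeatures"), so they can be reused as-is
val indexerModel = vectorIndexer.fit(training)
val indexedTraining = indexerModel.transform(training).cache()

// Candidate pipelines without the shared indexer stage
val pipeline1NoIdx = Array[PipelineStage](dt)
val pipeline2NoIdx = Array[PipelineStage](pca, lr)

val paramGridNoIdx =
  new ParamGridBuilder()
    .baseOn(pipeline.stages -> pipeline1NoIdx)
    .addGrid(dt.maxDepth, Array(2, 5))
    .build() ++
  new ParamGridBuilder()
    .baseOn(pipeline.stages -> pipeline2NoIdx)
    .addGrid(pca.k, Array(10, 20))
    .addGrid(lr.regParam, Array(0.1, 0.01))
    .build()

val cvModelNoIdx = cv.setEstimatorParamMaps(paramGridNoIdx).fit(indexedTraining)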

Written on August 18, 2017