PredictionIO's DASE architecture brings the separation-of-concerns design principle to predictive engine development. DASE stands for the following components of an engine:

  • Data - includes Data Source and Data Preparator
  • Algorithm(s)
  • Serving
  • Evaluator

Let's look at the code and see how you can customize the engine you built from the Classification Engine Template.

Evaluator is not covered in this tutorial. See Evaluation Explained for details on using evaluation.

The Engine Design

As you can see from the Quick Start, MyClassification takes a JSON prediction query, e.g. { "attr0":4, "attr1":3, "attr2":8 }, and returns a JSON predicted result.

For template versions earlier than v0.3.1, the query is an array of feature values: { "features": [4, 3, 8] }

In MyClassification/src/main/scala/Engine.scala, the Query class defines the format of the query, such as { "attr0":4, "attr1":3, "attr2":8 }:

class Query(
  val attr0 : Double,
  val attr1 : Double,
  val attr2 : Double
) extends Serializable

The PredictedResult class defines the format of the predicted result, such as {"label":2.0}:

class PredictedResult(
  val label: Double
) extends Serializable

Finally, ClassificationEngine is the Engine Factory that defines the components this engine will use: Data Source, Data Preparator, Algorithm(s) and Serving components.

object ClassificationEngine extends IEngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map("naive" -> classOf[NaiveBayesAlgorithm]),
      classOf[Serving])
  }
}
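
The keys of the algorithm map are what the "name" field of each entry under algorithms in engine.json refers to. As a rough illustration of that name-based lookup (not PredictionIO's actual mechanism), here is a minimal sketch in plain Scala. Algo, NaiveBayesAlgo, registry and build are hypothetical names invented for this example.

```scala
object AlgorithmRegistrySketch {
  // Hypothetical stand-in for an algorithm component.
  trait Algo { def describe: String }

  class NaiveBayesAlgo(lambda: Double) extends Algo {
    def describe: String = s"naive bayes (lambda=$lambda)"
  }

  // Name -> factory. This plays the role of
  // Map("naive" -> classOf[NaiveBayesAlgorithm]) in the engine factory.
  val registry: Map[String, Double => Algo] = Map(
    "naive" -> ((lambda: Double) => new NaiveBayesAlgo(lambda))
  )

  // Look up an algorithm by the name given in the configuration.
  // Returns None when no algorithm is registered under that name.
  def build(name: String, lambda: Double): Option[Algo] =
    registry.get(name).map(factory => factory(lambda))
}
```

In this sketch, build("naive", 1.0) yields an algorithm instance, while an unregistered name yields None.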

Spark MLlib

Spark's MLlib NaiveBayes algorithm takes training data of RDD type, i.e. RDD[LabeledPoint], and trains a model, which is a NaiveBayesModel object.

PredictionIO's MLlib Classification engine template, on which MyClassification is based, integrates this algorithm under the DASE architecture. We will take a closer look at the DASE code below.

See the MLlib documentation to learn more about the NaiveBayes algorithm.

Data

In the DASE architecture, data is prepared by two components sequentially: Data Source and Data Preparator. Together they take data from the data store and prepare RDD[LabeledPoint] for the NaiveBayes algorithm.

Data Source

In MyClassification/src/main/scala/DataSource.scala, the readTraining method of class DataSource reads and selects data from the Event Server's data store and returns TrainingData.

case class DataSourceParams(appName: String) extends Params

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData, EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {

    val labeledPoints: RDD[LabeledPoint] = PEventStore.aggregateProperties(
      appName = dsp.appName,
      entityType = "user",
      // only keep entities with these required properties defined
      required = Some(List("plan", "attr0", "attr1", "attr2")))(sc)
      // aggregateProperties() returns RDD pair of
      // entity ID and its aggregated properties
      .map { case (entityId, properties) =>
        try {
          LabeledPoint(properties.get[Double]("plan"),
            Vectors.dense(Array(
              properties.get[Double]("attr0"),
              properties.get[Double]("attr1"),
              properties.get[Double]("attr2")
            ))
          )
        } catch {
          case e: Exception => {
            logger.error(s"Failed to get properties ${properties} of" +
              s" ${entityId}. Exception: ${e}.")
            throw e
          }
        }
      }.cache()

    new TrainingData(labeledPoints)
  }
}

PEventStore is an object that provides functions for accessing data collected through the Event Server. PEventStore.aggregateProperties aggregates the event records of the four properties (attr0, attr1, attr2 and plan) for each user.
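
To make the aggregation step more concrete, here is a minimal sketch using plain Scala collections rather than Spark. It assumes a simplified model in which every event sets some properties on an entity and later events overwrite earlier values. SetEvent and aggregate are hypothetical names, and the real PEventStore.aggregateProperties handles more than this.

```scala
object AggregatePropertiesSketch {
  // Hypothetical, simplified event: it only sets properties on one entity.
  final case class SetEvent(
    entityId: String,
    time: Long,
    properties: Map[String, Double])

  // Merge each entity's events in time order, then keep only the entities
  // that have every required property defined.
  def aggregate(
    events: Seq[SetEvent],
    required: Seq[String]): Map[String, Map[String, Double]] =
    events
      .groupBy(_.entityId)
      .map { case (entityId, entityEvents) =>
        // Later events overwrite values set by earlier ones.
        val merged = entityEvents
          .sortBy(_.time)
          .foldLeft(Map.empty[String, Double])((acc, e) => acc ++ e.properties)
        entityId -> merged
      }
      .filter { case (_, properties) => required.forall(properties.contains) }
}
```

An entity that is missing any of the required properties is dropped, which corresponds to the required = Some(List(...)) argument in readTraining.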

PredictionIO automatically loads the datasource parameters specified in MyClassification/engine.json, including appName, into dsp.

In engine.json:

{
  ...
  "datasource": {
    "params": {
      "appName": "MyApp1"
    }
  },
  ...
}

In the sample text data file, columns are delimited by commas (,). The first column contains the labels and the second column contains the features.

The class definition of TrainingData is:

class TrainingData(
  val labeledPoints: RDD[LabeledPoint]
) extends Serializable

PredictionIO passes the returned TrainingData object to Data Preparator.

Data Preparator

In MyClassification/src/main/scala/Preparator.scala, the prepare method of class Preparator takes TrainingData as input. It then conducts any necessary feature selection and data processing tasks. At the end, it returns PreparedData, which should contain the data the Algorithm needs. For MLlib NaiveBayes, this is RDD[LabeledPoint].

By default, prepare simply copies the unprocessed TrainingData to PreparedData:

class PreparedData(
  val labeledPoints: RDD[LabeledPoint]
) extends Serializable

class Preparator
  extends PPreparator[TrainingData, PreparedData] {

  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    new PreparedData(trainingData.labeledPoints)
  }
}

PredictionIO passes the returned PreparedData object to Algorithm's train function.
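
As an illustration of the kind of feature selection prepare could perform, the sketch below keeps only a chosen subset of feature columns. It uses plain Scala collections instead of RDDs so that it runs without Spark. Point and selectFeatures are hypothetical names; in a real Preparator the same per-point transformation would typically be applied with RDD.map.

```scala
object PreparatorSketch {
  // Hypothetical stand-in for MLlib's LabeledPoint: a label plus features.
  final case class Point(label: Double, features: Array[Double])

  // Keep only the feature columns whose indices are listed in `keep`,
  // in the order given. Labels are left unchanged.
  def selectFeatures(points: Seq[Point], keep: Seq[Int]): Seq[Point] =
    points.map { p =>
      Point(p.label, keep.map(i => p.features(i)).toArray)
    }
}
```

If features are dropped or reordered at this stage, the predict method has to build its feature vector in the same way, otherwise the model would see different inputs at training and prediction time.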

Algorithm

In MyClassification/src/main/scala/NaiveBayesAlgorithm.scala, the two methods of the algorithm class are train and predict. train is responsible for training a predictive model, which PredictionIO stores. predict is responsible for using this model to make predictions.

train(...)

train is called when you run pio train. This is where the MLlib NaiveBayes algorithm, i.e. NaiveBayes.train, is used to train a predictive model.

def train(sc: SparkContext, data: PreparedData): NaiveBayesModel = {
    NaiveBayes.train(data.labeledPoints, ap.lambda)
}

In addition to RDD[LabeledPoint] (i.e. data.labeledPoints), NaiveBayes.train takes one parameter: lambda, the additive smoothing parameter.

The value of this parameter is specified in the algorithms section of MyClassification/engine.json:

{
  ...
  "algorithms": [
    {
      "name": "naive",
      "params": {
        "lambda": 1.0
      }
    }
  ]
  ...
}

PredictionIO automatically loads these values into the constructor parameter ap, which has a corresponding case class AlgorithmParams:

case class AlgorithmParams(
  lambda: Double
) extends Params

NaiveBayes.train then returns a NaiveBayesModel. PredictionIO automatically stores the returned model.
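
To give a feel for what the lambda parameter does, the sketch below applies the usual additive smoothing formula for a multinomial model to the per-feature sums of one class. This is an illustration of the general technique under that assumption, not MLlib's code, and MLlib's own implementation may differ in detail. SmoothingSketch and smoothed are hypothetical names.

```scala
object SmoothingSketch {
  // featureSums(j) is the total of feature j over all training points of
  // one class. With additive smoothing, the estimated probability is
  //   (featureSums(j) + lambda) / (sum of featureSums + lambda * numFeatures)
  def smoothed(featureSums: Array[Double], lambda: Double): Array[Double] = {
    val denominator = featureSums.sum + lambda * featureSums.length
    featureSums.map(s => (s + lambda) / denominator)
  }
}
```

With feature sums (3, 0, 1) and lambda = 1.0 the estimates are 4/7, 1/7 and 2/7, so the feature that never occurred in this class still gets a non-zero probability. With lambda = 0.0 its estimate would be exactly zero.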

predict(...)

The predict method is called when you send a JSON query to http://localhost:8000/queries.json. PredictionIO converts the query, such as { "attr0":4, "attr1":3, "attr2":8 }, to the Query class you defined previously.

The predictive model NaiveBayesModel of MLlib NaiveBayes offers a function called predict. predict takes a dense vector of features. It predicts the label of the item represented by this feature vector.

  def predict(model: NaiveBayesModel, query: Query): PredictedResult = {
    val label = model.predict(Vectors.dense(
        query.attr0, query.attr1, query.attr2
    ))
    new PredictedResult(label)
  }

You defined the class PredictedResult earlier on this page.

PredictionIO passes the returned PredictedResult object to Serving.
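
The predict step can be read as two small operations: turn the Query into a feature vector in the same order used for training (attr0, attr1, attr2), then ask the model for a label. The sketch below shows this in plain Scala without Spark. The model is represented by an arbitrary function from features to a label, which is an assumption made purely for illustration; toFeatures is a hypothetical helper.

```scala
object PredictSketch {
  class Query(
    val attr0: Double,
    val attr1: Double,
    val attr2: Double
  ) extends Serializable

  class PredictedResult(val label: Double) extends Serializable

  // The feature order must match the order used when building the
  // training data: attr0, attr1, attr2.
  def toFeatures(query: Query): Array[Double] =
    Array(query.attr0, query.attr1, query.attr2)

  // `model` is a stand-in for a trained model: any function that maps a
  // feature array to a label.
  def predict(model: Array[Double] => Double, query: Query): PredictedResult =
    new PredictedResult(model(toFeatures(query)))
}
```

Keeping the Query-to-features conversion in one place makes it easier to keep training and prediction consistent when attributes are added or removed.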

Serving

The serve method of class Serving processes the predicted results. It is also responsible for combining multiple predicted results into one if you have more than one predictive model. Serving then returns the final predicted result, which PredictionIO automatically converts to a JSON response.

In MyClassification/src/main/scala/Serving.scala:

class Serving
  extends LServing[Query, PredictedResult] {

  override
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {
    predictedResults.head
  }
}

When you send a JSON query to http://localhost:8000/queries.json, the PredictedResult from every model is passed to serve as a sequence, i.e. Seq[PredictedResult].

An engine can train multiple models if you specify more than one Algorithm component in object ClassificationEngine inside Engine.scala. Since only one NaiveBayesAlgorithm is implemented by default, this Seq contains one element.

In this case, serve simply returns the predicted result of the first, and the only, algorithm, i.e. predictedResults.head.
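
If the engine were configured with several algorithms, serve would need some rule for combining their results. One possible rule, shown here only as a sketch, is a majority vote over the predicted labels. The classes are redefined locally so that the example runs on its own without PredictionIO, and the voting rule itself is an assumption, not something the template provides.

```scala
object ServingSketch {
  class Query(
    val attr0: Double,
    val attr1: Double,
    val attr2: Double
  ) extends Serializable

  class PredictedResult(val label: Double) extends Serializable

  // Return the label predicted by the largest number of models.
  // Ties are broken arbitrarily in this sketch.
  def serve(
    query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {
    require(predictedResults.nonEmpty, "at least one predicted result is needed")
    val (winningLabel, _) = predictedResults
      .groupBy(_.label)
      .maxBy { case (_, results) => results.size }
    new PredictedResult(winningLabel)
  }
}
```

With a single algorithm this behaves like predictedResults.head, since the only label present wins the vote.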

Congratulations! You have just learned how to customize and build a production-ready engine. Have fun!