This is a more advanced example; we recommend that you go through the DASE explanation first.

The default algorithm described in DASE uses user-to-item view events as training data. However, your application may have more than one type of event that you want to take into account, such as buy, rate, and like events. One way to incorporate other event types is to add another algorithm that processes these events, builds a separate model, and then have Serving combine the outputs of the multiple algorithms.

In this example, we will add another algorithm to process like/dislike events. The final PredictedResults will be the combined outputs of both algorithms.

This is just one of the ways to handle multiple types of events. We use this use case to demonstrate how to build an engine with multiple algorithms. You may also build a single algorithm that takes different events into account, without using multiple algorithms.

This example will demonstrate the following:

  • Read multiple types of events
  • Use positive and negative implicit events such as like and dislike with the MLlib ALS algorithm
  • Integrate multiple algorithms into one engine

The complete source code of this example can be found here.

Step 1. Read "like" and "dislike" events as TrainingData

Modify the following in DataSource.scala:

  • In addition to the original ViewEvent class, add a new class LikeEvent with a Boolean like field that indicates whether the event is a like or a dislike.
  • Add a new field likeEvents to the TrainingData class to store the RDD[LikeEvent].
  • Modify DataSource's readTraining() function to read "like" and "dislike" events from the Event Store.

The modification is shown below:

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  override
  def readTraining(sc: SparkContext): TrainingData = {

    ...

    // get all "user" "view" "item" events
    val viewEventsRDD: RDD[ViewEvent] = ...

    // ADDED
    // get all "user" "like" and "dislike" "item" events
    val likeEventsRDD: RDD[LikeEvent] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(List("like", "dislike")),
      // targetEntityType is optional field of an event.
      targetEntityType = Some(Some("item")))(sc)
      // PEventStore.find() returns RDD[Event]
      .map { event =>
        val likeEvent = try {
          event.event match {
            case "like" | "dislike" => LikeEvent(
              user = event.entityId,
              item = event.targetEntityId.get,
              t = event.eventTime.getMillis,
              like = (event.event == "like"))
            case _ => throw new Exception(s"Unexpected event ${event} is read.")
          }
        } catch {
          case e: Exception => {
            logger.error(s"Cannot convert ${event} to LikeEvent." +
              s" Exception: ${e}.")
            throw e
          }
        }
        likeEvent
      }.cache()

    new TrainingData(
      users = usersRDD,
      items = itemsRDD,
      viewEvents = viewEventsRDD,
      likeEvents = likeEventsRDD // ADDED
    )
  }
}

...

case class LikeEvent( // ADDED
  user: String,
  item: String,
  t: Long,
  like: Boolean // true: like. false: dislike
)

class TrainingData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val viewEvents: RDD[ViewEvent],
  val likeEvents: RDD[LikeEvent] // ADDED
) extends Serializable {
  override def toString = {
    s"users: [${users.count()} (${users.take(2).toList}...)]" +
    s"items: [${items.count()} (${items.take(2).toList}...)]" +
    s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)" +
    // ADDED
    s"likeEvents: [${likeEvents.count()}] (${likeEvents.take(2).toList}...)"
  }
}
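
The event-to-LikeEvent mapping inside readTraining() can also be checked in isolation, without Spark or the Event Store. The following is a minimal sketch of that mapping only: RawEvent and toLikeEvent are hypothetical names introduced for illustration (RawEvent stands in for the handful of Event fields the mapping reads) and are not part of the engine template.

```scala
object LikeEventMappingSketch {
  // Same fields as the LikeEvent class added to DataSource.scala.
  case class LikeEvent(user: String, item: String, t: Long, like: Boolean)

  // Hypothetical stand-in for the Event fields used by the mapping.
  case class RawEvent(
    event: String,
    entityId: String,
    targetEntityId: Option[String],
    eventTimeMillis: Long)

  // Hypothetical helper: "like" becomes like = true, "dislike" becomes
  // like = false, and any other event name is rejected.
  def toLikeEvent(e: RawEvent): LikeEvent = e.event match {
    case "like" | "dislike" =>
      LikeEvent(
        user = e.entityId,
        // .get throws if the event has no target item
        item = e.targetEntityId.get,
        t = e.eventTimeMillis,
        like = e.event == "like")
    case other =>
      throw new IllegalArgumentException(s"Unexpected event $other is read.")
  }
}
```

For instance, a "dislike" event from user "u1" on item "i1" maps to a LikeEvent with like = false, while a "view" event makes toLikeEvent throw.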

Step 2. Modify Preparator and PreparedData

In Preparator.scala, simply pass the newly added likeEvents from TrainingData to PreparedData, as shown in the code below:

class Preparator
  extends PPreparator[TrainingData, PreparedData] {

  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    new PreparedData(
      users = trainingData.users,
      items = trainingData.items,
      viewEvents = trainingData.viewEvents,
      likeEvents = trainingData.likeEvents) // ADDED
  }
}

class PreparedData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val viewEvents: RDD[ViewEvent],
  val likeEvents: RDD[LikeEvent] // ADDED
) extends Serializable

Step 3. Add a new algorithm to train model with likeEvents

For demonstration purposes, we also use MLlib ALS to train a model with likeEvents, so the new algorithm class shares much of its logic with the original algorithm. The only difference is the train() function: the original one trains the model with viewEvents, while the new one uses likeEvents. In this case, we can simply create a new algorithm that extends the original ALSAlgorithm class and overrides the train() function.

You may also create a completely new algorithm to process likeEvents or other types of events without extending any existing algorithm, or simply modify the existing algorithm to take the new events into account.

Using positive and negative implicit events (without explicit rating) with MLlib ALS

In the original ALSAlgorithm, the train() function counts how many times a user has viewed the same item and maps that count to an MLlibRating object. A like/dislike event, by contrast, is a Boolean, one-time preference, so it does not make sense to aggregate multiple like/dislike events from the same user on the same item. A user may, however, like an item and later change her mind and dislike it, or vice versa. In that case, we can simply use the user's latest like or dislike event for the item.

In addition, MLlib ALS can handle negative preferences with ALS.trainImplicit(). Hence, we can map a dislike to a rating of -1 and a like to a rating of 1.

Negative preferences do not work if you use ALS.train() instead, which is intended for explicit ratings such as "rate" events.
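
The "keep the latest event, then map it to 1 or -1" idea can be illustrated with plain Scala collections before looking at the Spark version. This is only a sketch under stated assumptions: latestRatings is a hypothetical helper, it uses groupBy and maxBy on an in-memory Seq instead of reduceByKey on an RDD, and it keys on the String IDs rather than on integer indices.

```scala
object LatestLikeSketch {
  // Same fields as the LikeEvent class added to DataSource.scala.
  case class LikeEvent(user: String, item: String, t: Long, like: Boolean)

  // Hypothetical helper: for each (user, item) pair keep only the event
  // with the largest timestamp, then map like to 1.0 and dislike to -1.0.
  def latestRatings(events: Seq[LikeEvent]): Map[(String, String), Double] =
    events
      .groupBy(e => (e.user, e.item))
      .map { case (key, evs) =>
        val latest = evs.maxBy(_.t)
        key -> (if (latest.like) 1.0 else -1.0)
      }
}
```

If user "u1" likes item "i1" at time 1000 and dislikes it at time 2000, the pair ("u1", "i1") ends up with a rating of -1.0, because only the later event is kept.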

In summary, this new LikeAlgorithm does the following:

  • Extends the original ALSAlgorithm class
  • Overrides train() to process the likeEvents in PreparedData
  • Uses the latest event if the user likes/dislikes the same item multiple times
  • Maps a dislike to an MLlibRating object with a rating of -1 and a like to a rating of 1
  • Uses the MLlibRating objects to train the ALSModel in the same way as the original ALSAlgorithm
  • Keeps the predict() function of the original ALSAlgorithm unchanged

It is shown in the code below:

package org.apache.predictionio.examples.similarproduct

import org.apache.predictionio.data.storage.BiMap

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}

import grizzled.slf4j.Logger

// ADDED
// Extend original ALSAlgorithm and override train() function to handle
// like and dislike events
class LikeAlgorithm(ap: ALSAlgorithmParams) extends ALSAlgorithm(ap) {

  @transient lazy override val logger = Logger[this.type]

  override
  def train(sc: SparkContext, data: PreparedData): ALSModel = {
    require(!data.likeEvents.take(1).isEmpty,
      s"likeEvents in PreparedData cannot be empty." +
      " Please check if DataSource generates TrainingData" +
      " and Preparator generates PreparedData correctly.")
    require(!data.users.take(1).isEmpty,
      s"users in PreparedData cannot be empty." +
      " Please check if DataSource generates TrainingData" +
      " and Preparator generates PreparedData correctly.")
    require(!data.items.take(1).isEmpty,
      s"items in PreparedData cannot be empty." +
      " Please check if DataSource generates TrainingData" +
      " and Preparator generates PreparedData correctly.")
    // create User and item's String ID to integer index BiMap
    val userStringIntMap = BiMap.stringInt(data.users.keys)
    val itemStringIntMap = BiMap.stringInt(data.items.keys)

    // collect Item as Map and convert ID to Int index
    val items: Map[Int, Item] = data.items.map { case (id, item) =>
      (itemStringIntMap(id), item)
    }.collectAsMap.toMap

    val mllibRatings = data.likeEvents
      .map { r =>
        // Convert user and item String IDs to Int index for MLlib
        val uindex = userStringIntMap.getOrElse(r.user, -1)
        val iindex = itemStringIntMap.getOrElse(r.item, -1)

        if (uindex == -1)
          logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
            + " to Int index.")

        if (iindex == -1)
          logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
            + " to Int index.")

        // key is (uindex, iindex) tuple, value is (like, t) tuple
        ((uindex, iindex), (r.like, r.t))
      }.filter { case ((u, i), v) =>
        // keep events with valid user and item index
        (u != -1) && (i != -1)
      }.reduceByKey { case (v1, v2) => // MODIFIED
        // A user may like an item and change to dislike it later,
        // or vice versa. Use the latest value for this case.
        val (like1, t1) = v1
        val (like2, t2) = v2
        // keep the latest value
        if (t1 > t2) v1 else v2
      }.map { case ((u, i), (like, t)) => // MODIFIED
        // With ALS.trainImplicit(), we can use a negative value to indicate
        // a negative signal (i.e. dislike)
        val r = if (like) 1 else -1
        // MLlibRating requires integer index for user and item
        MLlibRating(u, i, r)
      }
      .cache()

    // MLlib ALS cannot handle empty training data.
    require(!mllibRatings.take(1).isEmpty,
      s"mllibRatings cannot be empty." +
      " Please check if your events contain valid user and item ID.")
    // seed for MLlib ALS
    val seed = ap.seed.getOrElse(System.nanoTime)

    val m = ALS.trainImplicit(
      ratings = mllibRatings,
      rank = ap.rank,
      iterations = ap.numIterations,
      lambda = ap.lambda,
      blocks = -1,
      alpha = 1.0,
      seed = seed)

    new ALSModel(
      productFeatures = m.productFeatures.collectAsMap.toMap,
      itemStringIntMap = itemStringIntMap,
      items = items
    )
  }

}

Step 4. Modify Serving to combine multiple algorithms' outputs

When the engine is deployed, the Query is sent to all algorithms of the engine. The PredictedResults returned by all algorithms are passed to the Serving component for further processing, which is why the predictedResults argument of the serve() function has type Seq[PredictedResult].

In this example, the serve() function first standardizes the scores in each algorithm's PredictedResult (as z-scores) so that we can combine the scores of multiple algorithms by adding up the scores of the same item. It then takes the top N items, as specified in the query.
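
To make the standardize-then-sum step concrete, here is a minimal plain-Scala sketch of the same idea. It is not the engine's Serving code: standardize and combine are hypothetical helpers, ItemScore is redefined locally so the snippet is self-contained, the sample standard deviation (n - 1 divisor) is an assumption of this sketch, and the special case for queries asking for a single item is left out.

```scala
object CombineScoresSketch {
  // Local stand-in for the engine's ItemScore class.
  case class ItemScore(item: String, score: Double)

  // Hypothetical helper: convert one algorithm's scores to z-scores.
  // If there are fewer than two scores, or all scores are equal,
  // every item gets 0.0.
  def standardize(scores: Seq[ItemScore]): Seq[ItemScore] = {
    val n = scores.size
    if (n < 2) {
      scores.map(s => ItemScore(s.item, 0.0))
    } else {
      val mean = scores.map(_.score).sum / n
      val variance =
        scores.map(s => math.pow(s.score - mean, 2)).sum / (n - 1)
      val stdDev = math.sqrt(variance)
      scores.map { s =>
        val z = if (stdDev == 0) 0.0 else (s.score - mean) / stdDev
        ItemScore(s.item, z)
      }
    }
  }

  // Hypothetical helper: sum the standardized scores per item and
  // return the top `num` items, highest combined score first.
  def combine(results: Seq[Seq[ItemScore]], num: Int): Seq[ItemScore] =
    results
      .flatMap(standardize)
      .groupBy(_.item)
      .map { case (item, ss) => ItemScore(item, ss.map(_.score).sum) }
      .toSeq
      .sortBy(-_.score)
      .take(num)
}
```

Standardizing first matters because the raw scores of different algorithms may live on very different scales; summing them directly would let the algorithm with the larger scores dominate the ranking.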

Modify Serving.scala as shown below:

import org.apache.predictionio.controller.LServing

import breeze.stats.meanAndVariance
import breeze.stats.MeanAndVariance

class Serving
  extends LServing[Query, PredictedResult] {

  override
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {

    // MODIFIED
    val standard: Seq[Array[ItemScore]] = if (query.num == 1) {
      // if the query asks for only 1 item, don't standardize
      predictedResults.map(_.itemScores)
    } else {
      // Standardize the scores before combining them
      val mvList: Seq[MeanAndVariance] = predictedResults.map { pr =>
        meanAndVariance(pr.itemScores.map(_.score))
      }

      predictedResults.zipWithIndex
        .map { case (pr, i) =>
          pr.itemScores.map { is =>
            // standardize score (z-score)
            // if standard deviation is 0 (when all items have the same score,
            // meaning all items are ranked equally), return 0.
            val score = if (mvList(i).stdDev == 0) {
              0
            } else {
              (is.score - mvList(i).mean) / mvList(i).stdDev
            }

            ItemScore(is.item, score)
          }
        }
    }

    // sum the standardized score if same item
    val combined = standard.flatten // Array of ItemScore
      .groupBy(_.item) // groupBy item id
      .mapValues(itemScores => itemScores.map(_.score).reduce(_ + _))
      .toArray // array of (item id, score)
      .sortBy(_._2)(Ordering.Double.reverse)
      .take(query.num)
      .map { case (k,v) => ItemScore(k, v) }

    PredictedResult(combined)
  }
}

You may combine results of multiple algorithms in different ways based on your requirements.

Step 5. Modify Engine.scala and engine.json

Modify Engine.scala to include the new algorithm LikeAlgorithm class and give it the name "likealgo" as shown below:

...

object SimilarProductEngine extends EngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map(
        "als" -> classOf[ALSAlgorithm],
        "cooccurrence" -> classOf[CooccurrenceAlgorithm],
        "likealgo" -> classOf[LikeAlgorithm]), // ADDED
      classOf[Serving])
  }
}

...

Next, to train and deploy both algorithms for this engine, we also need to modify engine.json to include the new algorithm. The "algorithms" parameter is an array containing each algorithm's name and parameters. By default, it contains only the "als" algorithm and its parameters. Add another JSON object for the new algorithm, named "likealgo", together with its parameters to the "algorithms" array, as shown below:


{
  ...

  "algorithms": [
    {
      "name": "als",
      "params": {
        "rank": 10,
        "numIterations" : 20,
        "lambda": 0.01,
        "seed": 3
      }
    },
    {
      "name": "likealgo",
      "params": {
        "rank": 8,
        "numIterations" : 15,
        "lambda": 0.01,
        "seed": 3
      }
    }
  ]
}

You may notice that the parameters of the new "likealgo" contain the same fields as those of "als". This is simply because the LikeAlgorithm class extends the original ALSAlgorithm class and shares the same algorithm parameter class definition. If another algorithm you add has its own parameter class, you just need to specify its parameters inside its params field accordingly.

That's it! Now you have an engine configured with two algorithms.
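
As an illustration of why both entries carry the same fields, the sketch below writes the two "params" objects as instances of one shared parameter class. The class shape is an assumption inferred from the fields that train() reads (ap.rank, ap.numIterations, ap.lambda, ap.seed); in the engine the class would additionally extend PredictionIO's Params trait, which is omitted here so the snippet stays self-contained.

```scala
object AlgoParamsSketch {
  // Assumed shape of the shared parameter class (see the note above).
  case class ALSAlgorithmParams(
    rank: Int,
    numIterations: Int,
    lambda: Double,
    seed: Option[Long])

  // "params" of the "als" entry in engine.json
  val alsParams = ALSAlgorithmParams(
    rank = 10, numIterations = 20, lambda = 0.01, seed = Some(3L))

  // "params" of the "likealgo" entry in engine.json
  val likeParams = ALSAlgorithmParams(
    rank = 8, numIterations = 15, lambda = 0.01, seed = Some(3L))
}
```

Each algorithm still gets its own values, so "likealgo" can be tuned independently of "als" even though both use the same parameter class.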

Sample data with "like" and "dislike" events

For demonstration purposes, a sample import script is also provided so that you can quickly test this engine. The script is modified from the original one used in the Quick Start, with the addition of importing like and dislike events.

You can find the import script in data/import_eventserver.py.

Make sure you are in the App directory. Run the following command to import the data (replace the value of the access_key parameter with your Access Key):

$ python data/import_eventserver.py --access_key 3mZWDzci2D5YsqAnqNnXH9SB6Rg3dsTBs8iHkK6X2i54IQsIZI1eEeQQyMfs7b3F

If you see the error TypeError: __init__() got an unexpected keyword argument 'access_key', please update the Python SDK to the latest version.

You are ready to run pio build, train and deploy as described in the Quick Start.