PredictionIO's DASE architecture brings the separation-of-concerns design principle to predictive engine development. DASE stands for the following components of an engine:
- Data - includes Data Source and Data Preparator
- Algorithm(s)
- Serving
- Evaluator
Let's look at the code and see how you can customize the engine you built from the Product Ranking Template.
The Engine Design
As you can see from the Quick Start, MyProductRanking takes a JSON prediction query, e.g. { "user": "u2", "items": ["i1", "i3", "i10", "i2", "i5", "i31", "i9"] }, and returns a JSON predicted result. In MyProductRanking/src/main/scala/Engine.scala, the Query case class defines the format of such a query:

```scala
case class Query(
  user: String,
  items: List[String]
) extends Serializable
```
The PredictedResult case class defines the format of the predicted result, such as

```json
{
  "itemScores": [
    {"item": "i5", "score": 1.0038217983580324},
    {"item": "i3", "score": 0.00598658734782459},
    {"item": "i2", "score": 0.004048103059012265},
    {"item": "i9", "score": -1.966935819737517E-4},
    {"item": "i1", "score": -0.0016841195307744916},
    {"item": "i31", "score": -0.0019770986240634503},
    {"item": "i10", "score": -0.0031498317618844918}
  ],
  "isOriginal": false
}
```

with:

```scala
case class PredictedResult(
  itemScores: Array[ItemScore],
  isOriginal: Boolean // set to true if the items are not ranked at all.
) extends Serializable

case class ItemScore(
  item: String,
  score: Double
) extends Serializable
```
Finally, ProductRankingEngine is the Engine Factory that defines the components this engine will use: Data Source, Data Preparator, Algorithm(s) and Serving components.

```scala
object ProductRankingEngine extends IEngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map("als" -> classOf[ALSAlgorithm]),
      classOf[Serving])
  }
}
```
Spark MLlib
The PredictionIO Product Ranking Engine Template integrates Spark's MLlib ALS algorithm under the DASE architecture. We will take a closer look at the DASE code below.
The MLlib ALS algorithm takes training data of RDD type, i.e. RDD[Rating], and trains a model, which is a MatrixFactorizationModel object.
See the Spark MLlib documentation on collaborative filtering to learn more about MLlib's ALS algorithm.
Data
In the DASE architecture, data is prepared by two components in sequence: Data Source and Data Preparator. They take data from the data store and prepare it for the Algorithm.
Data Source
In MyProductRanking/src/main/scala/DataSource.scala, the readTraining method of class DataSource reads and selects data from the Event Store (data store of the Event Server). It returns TrainingData.

```scala
case class DataSourceParams(appName: String) extends Params

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {

    // create a RDD of (entityID, User)
    val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(...) ...

    // create a RDD of (entityID, Item)
    val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(...) ...

    // get all "user" "view" "item" events
    val viewEventsRDD: RDD[ViewEvent] = PEventStore.find(...) ...

    new TrainingData(
      users = usersRDD,
      items = itemsRDD,
      viewEvents = viewEventsRDD
    )
  }
}
```
PredictionIO automatically loads the parameters of datasource specified in MyProductRanking/engine.json, including appName, to dsp.
In engine.json:

```json
{
  ...
  "datasource": {
    "params" : {
      "appName": "MyApp1"
    }
  },
  ...
}
```
In readTraining(), PEventStore is an object which provides functions to access data collected by the PredictionIO Event Server.
This Product Ranking Engine Template requires "user" and "item" entities that are set by events.
PEventStore.aggregateProperties(...) aggregates the properties of the user and item entities that are set, unset, or deleted by the special events $set, $unset and $delete. Please refer to the Event API for more details on using these events.
The following code aggregates the properties of user and then maps each result to a User() object.

```scala
// create a RDD of (entityID, User)
val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(
  appName = dsp.appName,
  entityType = "user"
)(sc).map { case (entityId, properties) =>
  val user = try {
    // placeholder for expanding user properties
    User()
  } catch {
    case e: Exception => {
      logger.error(s"Failed to get properties ${properties} of" +
        s" user ${entityId}. Exception: ${e}.")
      throw e
    }
  }
  (entityId, user)
}.cache()
```
In the template, the User() object is a simple placeholder for you to customize and expand.
Similarly, the following code aggregates the properties of item and then maps each result to an Item() object.

```scala
// create a RDD of (entityID, Item)
val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(
  appName = dsp.appName,
  entityType = "item"
)(sc).map { case (entityId, properties) =>
  val item = try {
    // placeholder for expanding item properties
    Item()
  } catch {
    case e: Exception => {
      logger.error(s"Failed to get properties ${properties} of" +
        s" item ${entityId}. Exception: ${e}.")
      throw e
    }
  }
  (entityId, item)
}.cache()
```
In the template, the Item() object is a simple placeholder for you to customize and expand.
PEventStore.find(...) specifies the events that you want to read. In this case, "user view item" events are read and then each is mapped to a ViewEvent() object.

```scala
// get all "user" "view" "item" events
val viewEventsRDD: RDD[ViewEvent] = PEventStore.find(
  appName = dsp.appName,
  entityType = Some("user"),
  eventNames = Some(List("view")),
  // targetEntityType is optional field of an event.
  targetEntityType = Some(Some("item")))(sc)
  // PEventStore.find() returns RDD[Event]
  .map { event =>
    val viewEvent = try {
      event.event match {
        case "view" => ViewEvent(
          user = event.entityId,
          item = event.targetEntityId.get,
          t = event.eventTime.getMillis)
        case _ => throw new Exception(s"Unexpected event ${event} is read.")
      }
    } catch {
      case e: Exception => {
        logger.error(s"Cannot convert ${event} to ViewEvent." +
          s" Exception: ${e}.")
        throw e
      }
    }
    viewEvent
  }.cache()
```
The ViewEvent case class is defined as:

```scala
case class ViewEvent(user: String, item: String, t: Long)
```
TrainingData contains RDDs of User, Item and ViewEvent objects. The class definition of TrainingData is:

```scala
class TrainingData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val viewEvents: RDD[ViewEvent]
) extends Serializable { ... }
```
PredictionIO then passes the returned TrainingData object to Data Preparator.
Data Preparator
In MyProductRanking/src/main/scala/Preparator.scala, the prepare method of class Preparator takes TrainingData as its input and performs any necessary feature selection and data processing tasks. At the end, it returns PreparedData, which should contain the data the Algorithm needs.
By default, prepare simply copies the unprocessed TrainingData to PreparedData:

```scala
class Preparator
  extends PPreparator[TrainingData, PreparedData] {

  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    new PreparedData(
      users = trainingData.users,
      items = trainingData.items,
      viewEvents = trainingData.viewEvents)
  }
}

class PreparedData(
  val users: RDD[(String, User)],
  val items: RDD[(String, Item)],
  val viewEvents: RDD[ViewEvent]
) extends Serializable
```
PredictionIO passes the returned PreparedData object to the Algorithm's train function.
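As an illustration of the kind of processing prepare could perform instead of a plain copy, the following minimal sketch drops view events older than a cutoff. It works on plain Scala collections rather than RDDs so that it runs on its own; PreparatorSketch, recentOnly and the cutoff parameter are hypothetical names and are not part of the template.

```scala
// Hypothetical sketch: a time-based filter on local collections (no Spark, no PredictionIO).
object PreparatorSketch {
  // Same shape as the template's ViewEvent, redefined here to keep the sketch self-contained.
  final case class ViewEvent(user: String, item: String, t: Long)

  // Keep only the events whose timestamp (in milliseconds) is at or after the cutoff.
  def recentOnly(events: Seq[ViewEvent], cutoffMillis: Long): Seq[ViewEvent] =
    events.filter(_.t >= cutoffMillis)
}
```

On an RDD[ViewEvent] the same predicate could be passed to filter, since RDDs offer a filter method with the same shape.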
Algorithm
In MyProductRanking/src/main/scala/ALSAlgorithm.scala, the two methods of the algorithm class are train and predict. train is responsible for training the predictive model; predict is responsible for using this model to make predictions.
train(...)
train is called when you run pio train. This is where the MLlib ALS algorithm, i.e. ALS.trainImplicit(), is used to train a predictive model.

```scala
def train(sc: SparkContext, data: PreparedData): ALSModel = {
  ...
  // create User and item's String ID to integer index BiMap
  val userStringIntMap = BiMap.stringInt(data.users.keys)
  val itemStringIntMap = BiMap.stringInt(data.items.keys)

  val mllibRatings = data.viewEvents
    .map { r =>
      // Convert user and item String IDs to Int index for MLlib
      val uindex = userStringIntMap.getOrElse(r.user, -1)
      val iindex = itemStringIntMap.getOrElse(r.item, -1)

      if (uindex == -1)
        logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
          + " to Int index.")

      if (iindex == -1)
        logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
          + " to Int index.")

      ((uindex, iindex), 1)
    }.filter { case ((u, i), v) =>
      // keep events with valid user and item index
      (u != -1) && (i != -1)
    }.reduceByKey(_ + _) // aggregate all view events of same user-item pair
    .map { case ((u, i), v) =>
      // MLlibRating requires integer index for user and item
      MLlibRating(u, i, v)
    }

  // MLLib ALS cannot handle empty training data.
  require(!mllibRatings.take(1).isEmpty,
    s"mllibRatings cannot be empty." +
    " Please check if your events contain valid user and item ID.")

  // seed for MLlib ALS
  val seed = ap.seed.getOrElse(System.nanoTime)

  val m = ALS.trainImplicit(
    ratings = mllibRatings,
    rank = ap.rank,
    iterations = ap.numIterations,
    lambda = ap.lambda,
    blocks = -1,
    alpha = 1.0,
    seed = seed)

  new ALSModel(
    rank = m.rank,
    userFeatures = m.userFeatures.collectAsMap.toMap,
    productFeatures = m.productFeatures.collectAsMap.toMap,
    userStringIntMap = userStringIntMap,
    itemStringIntMap = itemStringIntMap
  )
}
```
Working with Spark MLlib's ALS.trainImplicit(...)
MLlib ALS does not support String user IDs and item IDs. ALS.trainImplicit therefore expects Rating objects whose IDs are integers. For clarity, you can first rename MLlib's integer-only Rating to MLlibRating:

```scala
import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
```
In order to use MLlib's ALS algorithm, we need to convert the viewEvents into MLlibRating objects. There are two things we need to handle:
- Map the user and item String IDs of each ViewEvent to Integer IDs, as required by MLlibRating.
- A ViewEvent is an implicit event that does not have an explicit rating value. ALS.trainImplicit() supports implicit preference: a higher rating value in an MLlibRating means higher confidence that the user prefers the item. Hence we can count how many times the user has viewed the item and use that count as the confidence level.
You create a bi-directional map with BiMap.stringInt, which maps each String record to an Integer index.

```scala
val userStringIntMap = BiMap.stringInt(data.users.keys)
val itemStringIntMap = BiMap.stringInt(data.items.keys)
```
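To make the role of such a map concrete, here is a minimal sketch of a bi-directional String-to-Int index built from plain Scala maps. StringIntIndex and its stringInt builder are hypothetical stand-ins for illustration only; they are not PredictionIO's BiMap implementation, and the way indices are assigned here (order of first appearance) is an assumption.

```scala
// Hypothetical stand-in for a String <-> Int bi-directional map (not PredictionIO's BiMap).
object StringIntIndexSketch {
  final case class StringIntIndex(forward: Map[String, Int]) {
    // Inverse lookup table, built once from the forward map.
    val inverse: Map[Int, String] = forward.map { case (k, v) => (v, k) }

    // Look up the index of an ID, falling back to a default for unknown IDs.
    def getOrElse(key: String, default: Int): Int = forward.getOrElse(key, default)
  }

  // Assign consecutive indices to the distinct IDs, in order of first appearance.
  def stringInt(ids: Seq[String]): StringIntIndex =
    StringIntIndex(ids.distinct.zipWithIndex.toMap)
}
```

The forward direction is what training needs (String ID to Int index), while the inverse direction lets you translate an index back to the original ID.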
Then convert the user and item String IDs in each ViewEvent to Int with these BiMaps. We use the default -1 if the user or item String ID cannot be found in the BiMap, and filter out the events with an invalid user or item ID afterwards. After filtering, we use reduceByKey() to add up all values for the same key (uindex, iindex) and finally map each result to an MLlibRating object.

```scala
val mllibRatings = data.viewEvents
  .map { r =>
    // Convert user and item String IDs to Int index for MLlib
    val uindex = userStringIntMap.getOrElse(r.user, -1)
    val iindex = itemStringIntMap.getOrElse(r.item, -1)

    if (uindex == -1)
      logger.info(s"Couldn't convert nonexistent user ID ${r.user}"
        + " to Int index.")

    if (iindex == -1)
      logger.info(s"Couldn't convert nonexistent item ID ${r.item}"
        + " to Int index.")

    ((uindex, iindex), 1)
  }.filter { case ((u, i), v) =>
    // keep events with valid user and item index
    (u != -1) && (i != -1)
  }.reduceByKey(_ + _) // aggregate all view events of same user-item pair
  .map { case ((u, i), v) =>
    // MLlibRating requires integer index for user and item
    MLlibRating(u, i, v)
  }
```
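The same map, filter and aggregate steps can be tried without Spark. The sketch below is a local-collection analogue under that assumption: it uses plain Map lookups in place of the BiMaps and groupBy in place of reduceByKey. ViewCountSketch and countViews are hypothetical names.

```scala
// Local-collection analogue of the RDD pipeline (assumption: no Spark involved).
object ViewCountSketch {
  // Same shape as the template's ViewEvent, redefined to keep the sketch self-contained.
  final case class ViewEvent(user: String, item: String, t: Long)

  // Returns ((userIndex, itemIndex) -> viewCount) for events whose IDs are both known.
  def countViews(
      events: Seq[ViewEvent],
      userIndex: Map[String, Int],
      itemIndex: Map[String, Int]): Map[(Int, Int), Int] =
    events
      .map(e => (userIndex.getOrElse(e.user, -1), itemIndex.getOrElse(e.item, -1)))
      .filter { case (u, i) => u != -1 && i != -1 } // drop unknown users or items
      .groupBy(identity)                            // plays the role of reduceByKey(_ + _)
      .map { case (key, hits) => (key, hits.size) }
}
```

Each resulting count corresponds to the rating value that the template passes to MLlibRating as the confidence level.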
In addition to RDD[MLlibRating], ALS.trainImplicit takes the following configurable parameters: rank, iterations, lambda and seed. The template passes blocks and alpha as fixed values.
The values of the configurable parameters are specified in the algorithms section of MyProductRanking/engine.json:

```json
{
  ...
  "algorithms": [
    {
      "name": "als",
      "params": {
        "rank": 10,
        "numIterations": 20,
        "lambda": 0.01,
        "seed": 3
      }
    }
  ]
  ...
}
```
PredictionIO will automatically load these values into the constructor parameter ap, which has a corresponding case class ALSAlgorithmParams:

```scala
case class ALSAlgorithmParams(
  rank: Int,
  numIterations: Int,
  lambda: Double,
  seed: Option[Long]) extends Params
```
The seed parameter is optional. The MLlib ALS algorithm uses it internally to generate random values. If the seed is not specified, the current value of System.nanoTime is used, and hence each training run may produce different results. Specify a fixed value for the seed if you want deterministic results (for example, when you are testing).
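The effect of a fixed seed can be seen in isolation with scala.util.Random. The following is a minimal sketch, not MLlib's internal code: resolveSeed mirrors the ap.seed.getOrElse(System.nanoTime) expression from train, while SeedSketch and initialValues are hypothetical names standing in for whatever random values the algorithm draws.

```scala
import scala.util.Random

// Hypothetical sketch of optional-seed handling (not MLlib internals).
object SeedSketch {
  // A fixed seed if one is given, otherwise a time-based one.
  def resolveSeed(seed: Option[Long]): Long = seed.getOrElse(System.nanoTime)

  // Draw n pseudo-random values from a generator initialized with the resolved seed.
  def initialValues(seed: Option[Long], n: Int): Vector[Double] = {
    val rng = new Random(resolveSeed(seed))
    Vector.fill(n)(rng.nextDouble())
  }
}
```

Calling initialValues(Some(3L), 5) twice yields the same vector, whereas passing None will generally produce different values on each call.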
ALS.trainImplicit() then returns a MatrixFactorizationModel, which contains two RDDs: userFeatures and productFeatures. They correspond to the user × latent-features matrix and the item × latent-features matrix, respectively. In this case, we use both the userFeatures and productFeatures matrices to rank the items for the user. These matrices are stored as a local model. The ALSModel class is defined as:

```scala
class ALSModel(
  val rank: Int,
  val userFeatures: Map[Int, Array[Double]],
  val productFeatures: Map[Int, Array[Double]],
  val userStringIntMap: BiMap[String, Int],
  val itemStringIntMap: BiMap[String, Int]
) extends Serializable { ... }
```
PredictionIO will automatically store the returned model, i.e. ALSModel in this example.
predict(...)
predict is called when you send a JSON query to http://localhost:8000/queries.json. PredictionIO converts the query, such as { "user": "u2", "items": ["i1", "i3", "i10", "i2", "i5", "i31", "i9"] }, to the Query class you defined previously.
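To calculate the ranked scores of the items, we first look up the feature vector of the user (if the user exists). Then we look up the feature vectors of the items in the query (if the items exist). The score of each item is the dot product of the user and item feature vectors. The items are then sorted by score in descending order. Items without a feature vector get a score of 0, and if the user or all of the items are unknown, the items are returned unranked with isOriginal set to true.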
```scala
def predict(model: ALSModel, query: Query): PredictedResult = {

  val itemStringIntMap = model.itemStringIntMap
  val productFeatures = model.productFeatures

  // default itemScores array if items are not ranked at all
  lazy val notRankedItemScores =
    query.items.map(i => ItemScore(i, 0)).toArray

  model.userStringIntMap.get(query.user).map { userIndex =>
    // lookup userFeature for the user
    model.userFeatures.get(userIndex)
  }.flatten // flatten Option[Option[Array[Double]]] to Option[Array[Double]]
  .map { userFeature =>
    val scores: Vector[Option[Double]] = query.items.toVector
      .par // convert to parallel collection for parallel lookup
      .map { iid =>
        // convert query item id to index
        val featureOpt: Option[Array[Double]] = itemStringIntMap.get(iid)
          // productFeatures may not contain the item
          .map (index => productFeatures.get(index))
          // flatten Option[Option[Array[Double]]] to Option[Array[Double]]
          .flatten

        featureOpt.map(f => dotProduct(f, userFeature))
      }.seq // convert back to sequential collection

    // check if all scores is None (get rid of all None and see if empty)
    val isAllNone = scores.flatten.isEmpty

    if (isAllNone) {
      logger.info(s"No productFeature for all items ${query.items}.")
      PredictedResult(
        itemScores = notRankedItemScores,
        isOriginal = true
      )
    } else {
      // sort the score
      val ord = Ordering.by[ItemScore, Double](_.score).reverse
      val sorted = query.items.zip(scores).map{ case (iid, scoreOpt) =>
        ItemScore(
          item = iid,
          score = scoreOpt.getOrElse[Double](0)
        )
      }.sorted(ord).toArray

      PredictedResult(
        itemScores = sorted,
        isOriginal = false
      )
    }
  }.getOrElse {
    logger.info(s"No userFeature found for user ${query.user}.")
    PredictedResult(
      itemScores = notRankedItemScores,
      isOriginal = true
    )
  }
}
```
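The scoring step can be sketched on its own. The dotProduct helper called in predict is not shown in this section, so the version below is only one hypothetical way to write it. For brevity the sketch keys item features by String ID rather than by Int index, which is a simplification of the template; RankingSketch and rank are hypothetical names.

```scala
// Hypothetical sketch of dot-product scoring and ranking (plain Scala, no PredictionIO).
object RankingSketch {
  // Same shape as the template's ItemScore, redefined to keep the sketch self-contained.
  final case class ItemScore(item: String, score: Double)

  // Dot product of two equally sized feature vectors.
  def dotProduct(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "feature vectors must have the same length")
    a.zip(b).map { case (x, y) => x * y }.sum
  }

  // Score each queried item against the user vector; items without features get 0.
  def rank(
      userFeature: Array[Double],
      itemFeatures: Map[String, Array[Double]],
      items: List[String]): List[ItemScore] =
    items
      .map(i => ItemScore(i, itemFeatures.get(i).map(dotProduct(_, userFeature)).getOrElse(0.0)))
      .sortBy(-_.score) // highest score first
}
```

For a user vector (1.0, 2.0) and item vectors i1 = (0.5, 0.5) and i2 = (2.0, 1.0), the scores are 1.5 and 4.0, so i2 is ranked ahead of i1, and an item with no features follows with a score of 0.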
PredictionIO passes the returned PredictedResult object to Serving.
Serving
The serve method of class Serving processes the predicted result. It is also responsible for combining multiple predicted results into one if you have more than one predictive model. Serving then returns the final predicted result, which PredictionIO converts to a JSON response automatically.
In MyProductRanking/src/main/scala/Serving.scala:

```scala
class Serving
  extends LServing[Query, PredictedResult] {

  override
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {
    predictedResults.head
  }
}
```
When you send a JSON query to http://localhost:8000/queries.json, the PredictedResult objects from all models are passed to serve as a sequence, i.e. Seq[PredictedResult].
An engine can train multiple models if you specify more than one Algorithm component in object ProductRankingEngine inside Engine.scala. Since only one ALSAlgorithm is implemented by default, this Seq contains one element.
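If you do add more algorithms, serve has to merge their results. The sketch below shows one possible strategy, averaging each item's score across all results; it is an assumption for illustration, not behavior provided by the template. ServingSketch and combine are hypothetical names, and the case classes are redefined so that the example runs without PredictionIO.

```scala
// Hypothetical sketch: merge several predicted results by averaging scores per item.
object ServingSketch {
  final case class ItemScore(item: String, score: Double)
  final case class PredictedResult(itemScores: Array[ItemScore], isOriginal: Boolean)

  def combine(results: Seq[PredictedResult]): PredictedResult = {
    require(results.nonEmpty, "at least one predicted result is required")
    val averaged = results
      .flatMap(_.itemScores)   // pool the scores from every algorithm
      .groupBy(_.item)         // collect all scores of the same item
      .map { case (item, scores) => ItemScore(item, scores.map(_.score).sum / scores.size) }
      .toArray
      .sortBy(-_.score)        // highest average first
    // The merged list counts as unranked only if every algorithm left it unranked.
    PredictedResult(averaged, results.forall(_.isOriginal))
  }
}
```

Inside serve, predictedResults.head could then be replaced by a call to such a function. Averaging only makes sense when the algorithms produce scores on comparable scales.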