You can write event server plugins to handle input data. For example, a plugin can block invalid data, log events, collect statistics, or forward events to other processing systems. There are two types of event server plugin:

  • Input Blocker: When these plugins are present, events coming into the event server are passed through all loaded and active plugins before reaching the actual event store. The order of processing is not defined, so events may pass through these plugins in any order. One use case is validating input data and throwing an exception to prevent bad data from being stored. These plugins cannot transform the event.
  • Input Sniffer: When these plugins are present, events are broadcast to them in parallel. They do not block the event from reaching the event store. They are useful for logging, statistics, and forwarding to other processing systems.
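
The difference between the two plugin types can be illustrated with a small standalone model. This is only a sketch and not the event server's actual code: SimpleEvent, DispatchSketch, and submit are hypothetical names, and notifying sniffers after the event is stored is an assumption made for the example.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

// Hypothetical stand-in for an incoming event; NOT a PredictionIO type.
final case class SimpleEvent(name: String, entityId: String)

object DispatchSketch {
  type Handler = SimpleEvent => Unit

  // Runs every blocker on the calling thread; the first exception aborts
  // the submission, so `store` is never called for a rejected event.
  // Sniffers are only notified, each in its own Future, and their
  // failures never affect the result (assumed ordering: after the store).
  def submit(
      event: SimpleEvent,
      blockers: Seq[Handler],
      sniffers: Seq[Handler],
      store: SimpleEvent => Unit)(implicit ec: ExecutionContext): Try[Unit] =
    Try {
      blockers.foreach(blocker => blocker(event))
      store(event)
      sniffers.foreach(sniffer => Future(sniffer(event)))
    }
}
```

In this model a blocker that throws turns the result into a Failure and the event is never stored, while a sniffer that throws leaves the result untouched.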

Create an event server plugin

First, create an sbt project with the following build.sbt:

name := "pio-plugin-example"
version := "1.0"
scalaVersion := "2.11.11"
libraryDependencies += "org.apache.predictionio" %% "apache-predictionio-core" % "0.12.0-incubating"

Event server plugins must extend EventServerPlugin. Here is an example of an event server plugin:

package com.example

import org.apache.predictionio.data.api._

class MyEventServerPlugin extends EventServerPlugin {
  val pluginName = "my-eventserver-plugin"
  val pluginDescription = "an example of event server plug-in"

  // inputBlocker or inputSniffer
  val pluginType = EventServerPlugin.inputBlocker   

  // The plugin handles incoming events in this method.
  // An input blocker can reject invalid data
  // by throwing an exception here.
  override def process(
      eventInfo: EventInfo, 
      context: EventServerPluginContext): Unit = {
    println(eventInfo)
  }

  // Plug-in can handle requests to /plugins/<pluginType>/<pluginName>/* 
  // on the event server in this method.
  override def handleREST(
      appId: Int, 
      channelId: Option[Int], 
      arguments: Seq[String]): String = {
    """{"pluginName": "my-eventserver-plugin"}"""
  }
}
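
As a rough illustration of the validation use case, the check an input blocker performs can be kept in a small helper that throws on bad input. This is a minimal sketch: EventValidation, allowedEvents, and validate are hypothetical names, and the list of accepted event names is made up for the example.

```scala
// Hypothetical helper; not part of PredictionIO.
object EventValidation {
  // Event names this example application accepts (an illustrative list).
  val allowedEvents: Set[String] = Set("view", "buy", "rate")

  // Throws IllegalArgumentException when the event should be rejected.
  def validate(eventName: String, entityId: String): Unit = {
    require(allowedEvents.contains(eventName), s"unsupported event: $eventName")
    require(entityId.trim.nonEmpty, "entityId must not be blank")
  }
}

// Inside an input blocker, process could delegate to the helper, e.g.
//   EventValidation.validate(eventInfo.event.event, eventInfo.event.entityId)
// The field names on EventInfo are an assumption here; check them against
// your PredictionIO version.
```

Keeping the check outside the plugin class makes it testable without starting the event server.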

Plugins are loaded by ServiceLoader, so you must create META-INF/services/org.apache.predictionio.data.api.EventServerPlugin (under src/main/resources in an sbt project) with the following content:

com.example.MyEventServerPlugin

Finally, run sbt package to package the plugin as a jar file. In this case, the plugin jar file is generated at target/scala-2.11/pio-plugin-example_2.11-1.0.jar. Copy this file to PIO_HOME/plugins.

When you start (or restart) the event server, this plugin should be enabled.

Plugin APIs of event server

The event server has several plugin-related APIs:

  • /plugins.json: Show all enabled plugins.
  • /plugins/inputblocker/<pluginName>/*: Handled by a corresponding input blocker plugin.
  • /plugins/inputsniffer/<pluginName>/*: Handled by a corresponding input sniffer plugin.

For example, if you send the following request to the event server:

curl -XGET "http://localhost:7070/plugins.json?accessKey=$ACCESS_KEY"

The event server should respond with the following JSON:

{
  "plugins": {
    "inputblockers": {
      "my-eventserver-plugin": {
        "name": "my-eventserver-plugin",
        "description": "an example of event server plug-in",
        "class": "com.example.MyEventServerPlugin"
      }
    },
    "inputsniffers": {}
  }
}
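
The wildcard routes listed above are served by each plugin's handleREST method. One possible way to structure that method is to pattern match on its arguments parameter. The sketch below assumes that arguments holds the path segments after the plugin name, which this page does not state explicitly. RestRouting, route, and eventCount are hypothetical names.

```scala
// Hypothetical helper; not part of PredictionIO.
object RestRouting {
  // Maps the trailing path segments to a JSON string.
  // `eventCount` stands in for state the plugin collected in process.
  def route(appId: Int, arguments: Seq[String], eventCount: Long): String =
    arguments match {
      case Seq("count") => s"""{"appId": $appId, "count": $eventCount}"""
      case Seq("ping")  => """{"status": "ok"}"""
      case _            => """{"error": "unknown command"}"""
    }
}
```

A plugin's handleREST could then return RestRouting.route(appId, arguments, counter), where counter is whatever the plugin tracks internally.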