Kafka custom serializer/deserializer implementation
By default, the Kafka implementation serializes and deserializes ClipboardPages to and from JSON strings.
This document describes how to implement a custom Java class and use it in your Kafka data set, so that you can apply your own serialization logic and data formats.
The PegaSerde interface
You must create a Java class that implements the PegaSerde interface, located in the package com.pega.platform.kafka.serde:
/**
* The interface for wrapping a serializer and deserializer for {@link com.pega.pegarules.pub.clipboard.ClipboardPage} objects.
*
* A class that implements this interface is expected to have a constructor with no parameter.
*/
public interface PegaSerde {
/**
* Configure this class, which will configure the underlying serializer and deserializer.
*
* @param tools PublicAPI
* @param configs configs in key/value pairs
*/
void configure(PublicAPI tools, Map<String, ?> configs);
/**
* Convert {@link com.pega.pegarules.pub.clipboard.ClipboardPage} into a byte array.
*
* @param tools PublicAPI associated with the request
* @param clipboardPage page to be serialized into bytes
* @return serialized bytes
*/
byte[] serialize(PublicAPI tools, ClipboardPage clipboardPage);
/**
* Deserialize a byte array into a {@link com.pega.pegarules.pub.clipboard.ClipboardPage} object.
*
* @param tools PublicAPI associated with the request
* @param data serialized bytes
* @return deserialized typed data
*/
ClipboardPage deserialize(PublicAPI tools, byte[] data);
}
Configure
void configure(PublicAPI tools, Map<String, ?> configs);
The configure method is called with a map of key-value pairs that you can use to pass configuration options to your class. This lets you parameterize the initialization of your class while still providing the required public, zero-argument constructor.
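Because the configs map is typed Map<String, ?>, it can help to centralize how options are read, defaulted, and validated. The following is a minimal sketch. The helper class SerdeOptions and its method names are hypothetical, and the sketch calls toString() on each value rather than assuming a particular value type.

```java
import java.util.Map;

/**
 * Hypothetical helper for reading options from the configs map passed to
 * PegaSerde.configure(). Option names used with it are illustrative only.
 */
final class SerdeOptions {
    private SerdeOptions() {}

    /** Returns the option as a string, or fails hard if it is missing. */
    static String requireString(Map<String, ?> configs, String key) {
        Object value = configs.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing required config option '" + key + "'");
        }
        return value.toString();
    }

    /** Returns the option as a string, or the given default if it is absent. */
    static String optionalString(Map<String, ?> configs, String key, String defaultValue) {
        Object value = configs.get(key);
        return value == null ? defaultValue : value.toString();
    }

    /** Parses an integer option, using the default if absent and failing hard on bad input. */
    static int optionalInt(Map<String, ?> configs, String key, int defaultValue) {
        Object value = configs.get(key);
        if (value == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(value.toString().trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Config option '" + key + "' must be an integer but was '" + value + "'", e);
        }
    }
}
```

Inside configure you might then write, for example, className = SerdeOptions.requireString(configs, "classname"). An option such as "delimiter" would only be present if you add it yourself as an additional option on the Kafka data set.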
Serialize
byte[] serialize(PublicAPI tools, ClipboardPage clipboardPage);
The serialize method converts a ClipboardPage into a byte array in your chosen data format. The bytes are then handed to the Kafka producer. For example, to send JSON, build a JSON string from the ClipboardPage and convert it to a byte array with an explicit character encoding (such as UTF-8), so that Kafka consumers can read it as JSON.
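To make the "build a string, then convert it to bytes" step concrete, here is a minimal sketch that writes a flat JSON object. A Map<String, String> stands in for the values you would read from the ClipboardPage (for example with clipboardPage.getString("CustomerId")), so the sketch runs without Pega classes. The class FlatJsonWriter is hypothetical, and nested pages and lists are not handled.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: serializes a flat map of string fields into UTF-8 JSON bytes. */
final class FlatJsonWriter {
    private FlatJsonWriter() {}

    static byte[] toJsonBytes(Map<String, String> fields) {
        StringBuilder json = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> field : fields.entrySet()) {
            if (!first) {
                json.append(',');
            }
            first = false;
            appendQuoted(json, field.getKey());
            json.append(':');
            if (field.getValue() == null) {
                json.append("null"); // missing property value becomes JSON null
            } else {
                appendQuoted(json, field.getValue());
            }
        }
        json.append('}');
        // Encode with an explicit charset instead of the platform default.
        return json.toString().getBytes(StandardCharsets.UTF_8);
    }

    /** Appends a JSON string literal, escaping quotes, backslashes, and control characters. */
    private static void appendQuoted(StringBuilder out, String s) {
        out.append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"': out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '\t': out.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        out.append('"');
    }
}
```

Hand-written escaping keeps the sketch dependency-free. In a real implementation, a JSON library such as Jackson (bundled into your fat jar) is likely the more robust choice.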
Deserialize
ClipboardPage deserialize(PublicAPI tools, byte[] data);
The deserialize method converts a byte array received from the Kafka consumer, in your chosen data format, into a ClipboardPage. For example, to receive JSON, decode the byte array into a String (using the same character encoding the producer used) or parse it into a JSON object, and use the result to populate a ClipboardPage.
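A plain new String(data, StandardCharsets.UTF_8) silently replaces invalid bytes with U+FFFD, which can hide corrupt records. The following minimal sketch decodes strictly instead, so deserialize fails hard on bad input. The class StrictUtf8 is hypothetical, and treating an empty payload as an error is an assumption you may need to relax for your topic.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: decodes a record payload as UTF-8, failing hard on malformed bytes. */
final class StrictUtf8 {
    private StrictUtf8() {}

    static String decode(byte[] data) {
        // Assumption: a missing or empty payload is treated as malformed.
        if (data == null || data.length == 0) {
            throw new IllegalArgumentException("Record has no payload to deserialize");
        }
        try {
            // A new decoder per call: CharsetDecoder instances are not thread safe.
            return StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(data))
                    .toString();
        } catch (CharacterCodingException e) {
            throw new IllegalArgumentException("Record payload is not valid UTF-8", e);
        }
    }
}
```

The decoded string can then be parsed and its values set on a page created with tools.createPage.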
The PegaSerde contract
All PegaSerde implementations must adhere to the following:
- The implementation must have a public constructor with no parameters.
- The implementation must implement the PegaSerde interface (located in com.pega.platform.kafka.serde).
- Methods should fail hard by throwing an exception instead of returning null. Returning null also results in an exception, but that exception carries no serialization-specific context.
- The serialize and deserialize methods must be thread safe.
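One practical way to meet the thread-safety requirement is to share only immutable helpers across calls. The following minimal sketch formats and parses timestamp fields with a shared java.time.format.DateTimeFormatter, which is immutable and thread safe, unlike java.text.SimpleDateFormat. The class TimestampField and the choice of ISO-8601 instants are assumptions made for illustration.

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper for timestamp fields used from concurrent serialize/deserialize calls. */
final class TimestampField {
    // DateTimeFormatter is immutable and thread safe, so one shared instance is fine.
    // A shared java.text.SimpleDateFormat would NOT be safe here: it is not synchronized.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ISO_INSTANT;

    private TimestampField() {}

    static String format(Instant instant) {
        return FORMAT.format(instant);
    }

    static Instant parse(String text) {
        // Throws DateTimeParseException on bad input: it fails hard rather than returning null.
        return FORMAT.parse(text, Instant::from);
    }
}
```

More generally, keep per-record state in local variables and treat fields set in configure as read-only afterwards.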
Additionally, the following invariants apply:
- One PegaSerde instance is created per data set.
- The configure method is called once, with a configs map that contains the following keys:
  - "classname": the name of the class on which the Kafka data set is created
  - "topicname": the topic name configured in the Kafka data set
  - Any additional configuration options configured in the Kafka data set (see the Configure Kafka data set section below)
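Since configure is called once with "classname", "topicname", and any additional options, one approach is to capture all of them in a single immutable object. The following is a minimal sketch. The class SerdeSettings is hypothetical, and it assumes the additional options can be represented as strings.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical immutable snapshot of the settings passed to PegaSerde.configure(). */
final class SerdeSettings {
    final String className;
    final String topicName;
    final Map<String, String> extraOptions;

    private SerdeSettings(String className, String topicName, Map<String, String> extraOptions) {
        this.className = className;
        this.topicName = topicName;
        this.extraOptions = extraOptions;
    }

    static SerdeSettings from(Map<String, ?> configs) {
        Object className = configs.get("classname");
        Object topicName = configs.get("topicname");
        if (className == null || topicName == null) {
            throw new IllegalArgumentException("configs must contain 'classname' and 'topicname'");
        }
        // Everything except the two built-in keys came from the Kafka data set configuration.
        Map<String, String> extra = new HashMap<>();
        for (Map.Entry<String, ?> entry : configs.entrySet()) {
            String key = entry.getKey();
            if (!key.equals("classname") && !key.equals("topicname")) {
                extra.put(key, String.valueOf(entry.getValue()));
            }
        }
        return new SerdeSettings(className.toString(), topicName.toString(),
                Collections.unmodifiableMap(extra));
    }
}
```

Assigning the result to a single field in configure means that serialize and deserialize read one consistent, read-only snapshot. This fits the thread-safety requirement above.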
Example PegaSerde implementation
The following trivial PegaSerde implementation uses a simple CSV data format of alternating keys and values: "key1,value1,key2,value2". It uses Guava's Joiner and Splitter, so Guava must be included in your jar.
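import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.pega.pegarules.pub.clipboard.ClipboardPage;
import com.pega.pegarules.pub.runtime.PublicAPI;
import com.pega.platform.kafka.serde.PegaSerde;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class CsvPegaSerde implements PegaSerde {
    // Set once in configure() and only read afterwards.
    private String className;

    @Override
    public void configure(PublicAPI tools, Map<String, ?> configs) {
        className = configs.get("classname").toString();
    }

    @Override
    public byte[] serialize(PublicAPI tools, ClipboardPage clipboardPage) {
        String customerId = clipboardPage.getString("CustomerId");
        String customerAge = clipboardPage.getString("CustomerAge");
        // Use the property names as keys so that deserialize() sets the same properties.
        String value = Joiner.on(",").join("CustomerId", customerId, "CustomerAge", customerAge);
        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public ClipboardPage deserialize(PublicAPI tools, byte[] data) {
        String value = new String(data, StandardCharsets.UTF_8);
        List<String> values = Splitter.on(",").splitToList(value);
        // Fail hard on malformed input instead of returning null or a partial page.
        if (values.size() % 2 != 0) {
            throw new IllegalArgumentException(
                    "Expected comma-separated key/value pairs but found " + values.size() + " items");
        }
        ClipboardPage page = tools.createPage(className, "");
        for (int i = 0; i < values.size(); i += 2) {
            page.getProperty(values.get(i)).setValue(values.get(i + 1));
        }
        return page;
    }
}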
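Because CsvPegaSerde depends on Pega classes, it cannot be run on its own. The following minimal sketch reproduces the same "key1,value1,key2,value2" format in plain Java so that the encoding and validation logic can be unit tested before you package the jar. A Map<String, String> stands in for the ClipboardPage, and the class KeyValueCsv is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java version of the key/value CSV format, testable without Pega. */
final class KeyValueCsv {
    private KeyValueCsv() {}

    static byte[] encode(Map<String, String> fields) {
        List<String> items = new ArrayList<>();
        for (Map.Entry<String, String> field : fields.entrySet()) {
            String key = field.getKey();
            String value = field.getValue();
            // The format has no escaping, so a comma in a key or value would corrupt the record.
            if (key.contains(",") || value.contains(",")) {
                throw new IllegalArgumentException("Field '" + key + "' contains a comma");
            }
            items.add(key);
            items.add(value);
        }
        return String.join(",", items).getBytes(StandardCharsets.UTF_8);
    }

    static Map<String, String> decode(byte[] data) {
        String text = new String(data, StandardCharsets.UTF_8);
        Map<String, String> fields = new LinkedHashMap<>();
        if (text.isEmpty()) {
            return fields; // mirrors encode() of an empty map
        }
        // A limit of -1 keeps trailing empty strings, so "k," yields ["k", ""].
        String[] items = text.split(",", -1);
        if (items.length % 2 != 0) {
            throw new IllegalArgumentException("Expected key/value pairs but found " + items.length + " items");
        }
        for (int i = 0; i < items.length; i += 2) {
            fields.put(items[i], items[i + 1]);
        }
        return fields;
    }
}
```

Rejecting commas inside values is the simplest choice for a format without quoting. If your data can contain commas, a real CSV library or a different format is likely a better fit.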
Creating ClipboardPages
Internally, records are represented by ClipboardPage objects, each of which holds a collection of ClipboardProperty objects. The data flow source must therefore deserialize data into ClipboardPages for processing by downstream shapes, and the destination must serialize ClipboardPage objects into a byte array to be consumed by Kafka.
You create a ClipboardPage object through the Pega PublicAPI (also known as tools) by invoking the createPage(String classname, String pageName) method:
ClipboardPage page = tools.createPage(className, "");
You can add values to the ClipboardPage by setting them on a named property, which is created if it does not exist:
page.getProperty("property_name").setValue("property_value");
For more ways of using ClipboardPages refer to the API documentation.
Creating and adding your class to Pega Platform
Use the following steps to create a Java project with your PegaSerde implementation and add it to your Pega Platform application.
1. Create a new Java project or use an existing one.
2. Add prpublic.jar as a dependency to your project.
  - You can get prpublic.jar from the prweb/WEB-INF/lib directory on your Tomcat server, as described in this article.
3. Create a new Java class that implements the com.pega.platform.kafka.serde.PegaSerde interface.
  - If you get the exception "class file for com.pega.ibm.icu.math.BigDecimal not found", also add pricu2jdk.jar as a dependency, in the same way as in step 2.
4. Build your project as a "fat" jar so that all dependencies are included.
5. Import your jar into Pega Platform: go to 'Configure > Application > Distribution > Import' and follow the steps in the wizard. Leave the Codeset name and version at their defaults (see screenshot below).
6. In your prbootstrap.properties file, set com.pega.pegarules.bootstrap.codeset.version.Customer=06-01-01, as described in this article.
7. Restart your system.
8. Configure your Kafka data set as described in the section below.
Configure Kafka data set
To configure your Kafka data set to use your PegaSerde implementation:
- Create a regular data set of type Kafka.
- Configure the 'Message Format' section to use the Custom message format.
- Specify the fully qualified class name of your Java class.
- Optionally, specify any additional configuration options to be passed to your PegaSerde configure method.
Metrics
When detailed metrics are enabled for a data flow run (see Data Flow Run with Detailed Metrics), serialization and deserialization latencies are shown in the output and source shape statistics, respectively. See the screenshot below.
Troubleshooting
Creating the Kafka data set:
Class x cannot be found on the classpath.
This means that the class that you are trying to use as the PegaSerde implementation cannot be found. This can be caused by:
- The fully qualified class name is mistyped. The fully qualified name is the package name and the class name joined with '.', for example: com.pega.dsm.kafka.impl.serde.JsonSerde
- The jar containing the PegaSerde implementation has not been added to the classpath. See the section of this document that describes how to add it.
Class x does not have a public, zero argument, constructor.
The PegaSerde implementation class is instantiated through the Java reflection API, so it must have a public constructor with no parameters.
Failed to instantiate class.
This error can have several causes, all of which mean that creating the class or running the code in its constructor failed.
Check the logs for the underlying exception that prevented the class from being initialized.
Class x is not of type PegaSerde.
When you see this message on the Kafka data set rule form, it means that your implementation class was instantiated successfully but cannot be cast to the PegaSerde type. Make sure that your class implements the com.pega.platform.kafka.serde.PegaSerde interface described in this document, and check the logs for a more detailed description of the underlying problem.
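Before importing your jar, you can check locally for the four errors above. The following minimal sketch is a hypothetical pre-flight check that loads a class by name, looks up its public zero-argument constructor, instantiates it, and checks its type. It is not the platform's own code. In a real project, you would pass PegaSerde.class as expectedType and run it with your jar on the classpath.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;

/** Hypothetical local check that mirrors the Kafka data set class errors described above. */
final class SerdeClassCheck {
    private SerdeClassCheck() {}

    static String check(String className, Class<?> expectedType) {
        Class<?> clazz;
        try {
            clazz = Class.forName(className);
        } catch (ClassNotFoundException e) {
            return "Class " + className + " cannot be found on the classpath.";
        } catch (LinkageError e) {
            // For example, a static initializer that throws.
            return "Failed to instantiate class: " + e;
        }
        Constructor<?> constructor;
        try {
            constructor = clazz.getConstructor(); // looks up public constructors only
        } catch (NoSuchMethodException e) {
            return "Class " + className + " does not have a public, zero argument, constructor.";
        }
        Object instance;
        try {
            instance = constructor.newInstance();
        } catch (InvocationTargetException e) {
            return "Failed to instantiate class: " + e.getCause(); // the constructor itself threw
        } catch (InstantiationException | IllegalAccessException e) {
            return "Failed to instantiate class: " + e;
        }
        if (!expectedType.isInstance(instance)) {
            return "Class " + className + " is not of type " + expectedType.getSimpleName() + ".";
        }
        return "OK";
    }
}
```

The messages above are paraphrased from this section. The wording shown by Pega Platform may differ.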
Serialization and deserialization exceptions
Serializer/Deserializer returned null value.
This means that the PegaSerde implementation returned a null value while serializing or deserializing. Implementations should always fail hard by throwing an exception; see The PegaSerde contract section.
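Failing hard is most useful when the exception says what went wrong. The following minimal sketch is a hypothetical exception type that records the topic name (available from the "topicname" config key) and a short hex preview of the offending payload. It assumes that logging the first few bytes of a record is acceptable for your data. If records can contain sensitive values, leave the preview out.

```java
/**
 * Hypothetical exception for deserialization failures that carries the context a plain
 * "returned null value" error lacks: the topic and a short preview of the payload.
 */
final class RecordDeserializationException extends RuntimeException {
    private static final long serialVersionUID = 1L;
    private static final int PREVIEW_BYTES = 32;

    RecordDeserializationException(String topicName, byte[] data, String reason, Throwable cause) {
        super("Cannot deserialize record from topic '" + topicName + "': " + reason
                + " (payload preview: " + preview(data) + ")", cause);
    }

    /** Hex-encodes up to PREVIEW_BYTES bytes so binary or badly encoded payloads stay readable. */
    static String preview(byte[] data) {
        if (data == null) {
            return "<null>";
        }
        StringBuilder hex = new StringBuilder();
        int n = Math.min(data.length, PREVIEW_BYTES);
        for (int i = 0; i < n; i++) {
            hex.append(String.format("%02x", data[i] & 0xff));
        }
        if (data.length > n) {
            hex.append("... (").append(data.length).append(" bytes)");
        }
        return hex.toString();
    }
}
```

In deserialize, you would throw this exception (wrapping the original parse exception as the cause) instead of returning null. The message then appears in the logs next to the failing record.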
Other (de)serialization exceptions
These errors usually point to an exception thrown inside your serialize or deserialize implementation. The cause is either a bug in the code or a problem with the input record(s).
Check the logs for more information.