Working with ProtoBuf Serialized Data and DeltaStream Descriptors

In streaming stores such as Apache Kafka and Amazon Kinesis, producers send data events as bytes that consumers must interpret. The most popular data serialization formats include JSON, ProtoBuf, and Apache Avro, all of which DeltaStream supports. This tutorial focuses on ProtoBuf and shows how to create and use a Descriptor to serialize and deserialize data.

Create a Descriptor

When working with ProtoBuf, you first define a ProtoBuf message and then generate a ProtoBuf file descriptor from that message. This ProtoBuf file descriptor is then used by DeltaStream to generate any necessary code for serializing and deserializing data that conforms to the ProtoBuf message’s structure. Let’s assume for this tutorial that our ProtoBuf message, which lives in the file p.proto, looks like the following:

syntax = "proto3";

message Pageviews {
  int64 viewtime = 1;
  string userid = 2;
  string pageid = 3;
}
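
To make concrete why consumers need a descriptor, the following is a minimal Python sketch (standard library only, not DeltaStream code) that hand-encodes a Pageviews record in the ProtoBuf wire format and then decodes it without any schema. The helper names (encode_varint, encode_pageviews, decode_fields) and the sample values are made up for illustration. The sketch assumes non-negative viewtime values and, unlike real proto3 encoders, always writes every field even when it holds a default value.

```python
# Illustration only: hand-rolled ProtoBuf wire encoding for the Pageviews message.
# Field numbers come from p.proto: viewtime=1 (int64), userid=2, pageid=3 (strings).

def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    out = bytearray()
    while True:
        low7 = n & 0x7F
        n >>= 7
        if n:
            out.append(low7 | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def decode_varint(data: bytes, pos: int):
    """Read a varint starting at pos; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def encode_pageviews(viewtime: int, userid: str, pageid: str) -> bytes:
    """Serialize one record; each tag is (field_number << 3) | wire_type."""
    out = bytearray()
    out += encode_varint((1 << 3) | 0) + encode_varint(viewtime)  # wire type 0: varint
    for field_number, text in ((2, userid), (3, pageid)):
        raw = text.encode("utf-8")
        out += encode_varint((field_number << 3) | 2)  # wire type 2: length-delimited
        out += encode_varint(len(raw)) + raw
    return bytes(out)


def decode_fields(data: bytes) -> list:
    """Decode without a schema: only (field_number, raw value) pairs are recoverable."""
    fields, pos = [], 0
    while pos < len(data):
        tag, pos = decode_varint(data, pos)
        field_number, wire_type = tag >> 3, tag & 0x07
        if wire_type == 0:
            value, pos = decode_varint(data, pos)
        elif wire_type == 2:
            length, pos = decode_varint(data, pos)
            value, pos = data[pos:pos + length], pos + length
        else:
            raise ValueError(f"wire type {wire_type} is not handled in this sketch")
        fields.append((field_number, value))
    return fields


# Hypothetical sample values, chosen only for the demonstration.
record = encode_pageviews(1700000000000, "User_1", "Page_10")
print(decode_fields(record))
# [(1, 1700000000000), (2, b'User_1'), (3, b'Page_10')]
```

The decoded output holds only field numbers and raw values. Field names such as viewtime, and the fact that fields 2 and 3 are strings rather than some other length-delimited type, come from the descriptor.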

We can then use protoc to generate a ProtoBuf file descriptor, pageviews_value.desc, from the message defined in p.proto (see the ProtoBuf documentation for more details):

$ protoc --descriptor_set_out pageviews_value.desc p.proto

Now we can create a DeltaStream Descriptor from this ProtoBuf file descriptor. In the CLI, this can be done using the CREATE DESCRIPTOR_SOURCE DDL. In the UI, we can add a descriptor with the following steps:

  1. From the left menu, click Descriptors > UPLOAD.

  2. After choosing the file containing your ProtoBuf file descriptor (pageviews_value.desc in our example), you have the option to name your descriptor. Then press UPLOAD.

  3. Done! The Descriptor has been added, and clicking on it shows the message names it contains (e.g. Pageviews in this example).
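
The message names contained in a descriptor file can also be checked locally before uploading. The following is a rough Python sketch using only the standard library. It is not how DeltaStream reads the file, and the function names (read_varint, iter_fields, message_names) are hypothetical. It relies on the field numbers published in ProtoBuf's descriptor.proto: FileDescriptorSet.file = 1, FileDescriptorProto.package = 2, FileDescriptorProto.message_type = 4, and DescriptorProto.name = 1. Only top-level messages are listed; nested messages are ignored.

```python
# Illustration only: list top-level message names in a serialized FileDescriptorSet.

def read_varint(buf: bytes, pos: int):
    """Read a base-128 varint starting at pos; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def iter_fields(buf: bytes):
    """Yield (field_number, wire_type, value) for each top-level field in buf."""
    pos = 0
    while pos < len(buf):
        tag, pos = read_varint(buf, pos)
        number, wire_type = tag >> 3, tag & 0x07
        if wire_type == 0:                      # varint
            value, pos = read_varint(buf, pos)
        elif wire_type == 1:                    # fixed 64-bit
            value, pos = buf[pos:pos + 8], pos + 8
        elif wire_type == 2:                    # length-delimited
            length, pos = read_varint(buf, pos)
            value, pos = buf[pos:pos + length], pos + length
        elif wire_type == 5:                    # fixed 32-bit
            value, pos = buf[pos:pos + 4], pos + 4
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        yield number, wire_type, value


def message_names(descriptor_set: bytes) -> list:
    """Return top-level message names, prefixed with the package when one is set."""
    names = []
    for number, wire_type, file_proto in iter_fields(descriptor_set):
        if number != 1 or wire_type != 2:       # FileDescriptorSet.file
            continue
        package, local = "", []
        for f_number, f_type, value in iter_fields(file_proto):
            if f_number == 2 and f_type == 2:   # FileDescriptorProto.package
                package = value.decode("utf-8")
            elif f_number == 4 and f_type == 2: # FileDescriptorProto.message_type
                for m_number, m_type, m_value in iter_fields(value):
                    if m_number == 1 and m_type == 2:  # DescriptorProto.name
                        local.append(m_value.decode("utf-8"))
        names += [f"{package}.{name}" if package else name for name in local]
    return names
```

With the file generated earlier, calling message_names(open("pageviews_value.desc", "rb").read()) should return a list containing Pageviews, matching what the UI shows for the uploaded Descriptor.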

Update a Topic with the Descriptor

  1. From the left menu, click Stores, select the Store, and then select the ProtoBuf Topic. In our example, we select the KafkaStore containing the ProtoBuf Topic pageviews_pb.

  2. Navigate to the DESCRIPTOR tab. Note that since this is a Kafka Store, which allows for keys, this page allows the user to assign a Key Descriptor and/or a Value Descriptor. For a Kinesis Store and other Stores that don’t allow for keys, there would only be a Value Descriptor option available.

  3. Open the dropdown menu and choose the relevant Descriptors to assign to this Topic. In our example, we assign the Pageviews Descriptor as the Value Descriptor and leave the Key Descriptor empty.

  4. Done! We’ve assigned our Descriptor to the relevant Topic, and now we can successfully run commands, such as PRINT TOPIC, and run queries with Relations using this Topic.

Queries with Descriptors and ProtoBuf

With Descriptors added to our Topic, we can now create a Relation for the Topic that specifies a key.format or value.format of PROTOBUF, as shown in the DDL example below. See CREATE STREAM for more details.

CREATE STREAM "pageviewsPB" (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)
    WITH ('topic'='pageviews_pb', 'value.format'='PROTOBUF');

We can also create new Relations using CREATE STREAM AS SELECT or CREATE CHANGELOG AS SELECT, specifying PROTOBUF as the data format for the sink Relation. The example below converts the JSON Stream pageviews_json into a Stream called pageviews_converted_to_proto that uses ProtoBuf for both its key and value formats.

CREATE STREAM pageviews_converted_to_proto WITH (
  'value.format' = 'PROTOBUF', 'key.format' = 'PROTOBUF'
) AS
SELECT * FROM pageviews_json;

When the sink Relation has a key or value format of PROTOBUF, the Descriptor for the sink Relation will automatically be created and assigned to the Topic. You can easily view your Descriptors in the left menu’s Descriptors tab or use the LIST DESCRIPTORS command in the CLI. Then if you want to use the Descriptor outside of DeltaStream, you can download the ProtoBuf descriptor by using the COPY DESCRIPTOR_SOURCE command.
