Configuring Deserialization Error Handling

In streaming and stream processing applications, data quality is critical, and a single bad record can cause a number of issues. Ideally, all records in a message broker are well formed. In practice, problems do occur, such as unrecognized payloads being accidentally published into a Kafka topic or a memory error corrupting a record. Streaming applications that attempt to consume these records will fail to deserialize them, which in most cases causes the application to fail. In this tutorial, we'll show the different ways users can configure their queries to handle such failures in DeltaStream.

Deserialization error handling is configured by providing the source.deserialization.error.handling property in the WITH clause for a query's sources. There are three modes to choose from (see the source properties documentation for details):

  • TERMINATE

  • IGNORE

  • IGNORE_AND_LOG

In the following sections, we'll cover each of these modes in detail for the scenario where a malformed record has made its way into a Kafka topic, which will be the source for one of our DSQL queries.

Assume we have the following pageviews Stream defined by the DDL below:

CREATE STREAM pageviews (
    viewtime BIGINT,
    userid VARCHAR,
    pageid VARCHAR
) WITH ('store'='kafka_store', 'topic'='pageviews', 'value.format'='JSON');

The pageviews topic contains the following records:

{ "viewtime":1660931394412, "userid":"User_1", "pageid":"Page_22" }
{ "viewtime":"malformed_viewtime", "userid":"User_6", "pageid":"Page_32" }
{ "viewtime":1660931396413, "userid":"User_2", "pageid":"Page_96" }

TERMINATE

The TERMINATE mode is the default option and the strictest. With this mode, any records that the query fails to deserialize will cause the query to terminate.

Query using the default value of the source.deserialization.error.handling property:

CREATE STREAM pageviews_copy AS
SELECT * FROM pageviews;

Query explicitly setting the source.deserialization.error.handling property to TERMINATE:

CREATE STREAM pageviews_copy AS
SELECT * FROM pageviews
WITH ('source.deserialization.error.handling'='TERMINATE');

The output of these queries will contain the following records:

{ "viewtime":1660931394412, "userid":"User_1", "pageid":"Page_22" }

Note that only one of the three input records shows up in the output. Since the first record is well-formed JSON and matches the value types specified in the pageviews DDL, the query successfully deserializes that record and produces the output. The second record, however, has an unexpected value for the viewtime column: the string "malformed_viewtime", where the query expects to deserialize a BIGINT. This causes a deserialization error for the source, and since we chose the TERMINATE error handling mode, the error causes the entire query to fail and stops any further progress.
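The behavior described above can be imitated with a short Python sketch. It is not DeltaStream's implementation: the SCHEMA mapping, the deserialize_pageview helper, and the run_terminate loop are hypothetical names that only mirror the pageviews DDL and the TERMINATE semantics described in this section.

```python
import json

# Raw values as they appear in the pageviews topic.
RAW_RECORDS = [
    b'{ "viewtime":1660931394412, "userid":"User_1", "pageid":"Page_22" }',
    b'{ "viewtime":"malformed_viewtime", "userid":"User_6", "pageid":"Page_32" }',
    b'{ "viewtime":1660931396413, "userid":"User_2", "pageid":"Page_96" }',
]

# Assumed mapping of the DDL types: BIGINT -> int, VARCHAR -> str.
SCHEMA = {"viewtime": int, "userid": str, "pageid": str}


def deserialize_pageview(raw: bytes) -> dict:
    """Parse one JSON value and check every field against SCHEMA."""
    record = json.loads(raw)  # raises ValueError on invalid JSON
    if not isinstance(record, dict):
        raise ValueError("value is not a JSON object")
    for field, expected in SCHEMA.items():
        value = record.get(field)
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected):
            raise ValueError(f"{field}: expected {expected.__name__}, got {value!r}")
    return record


def run_terminate(raw_records):
    """Emulate TERMINATE: the first deserialization error stops processing."""
    output, error = [], None
    for raw in raw_records:
        try:
            output.append(deserialize_pageview(raw))
        except ValueError as exc:
            error = exc
            break  # nothing after the bad record is processed
    return output, error


output, error = run_terminate(RAW_RECORDS)
print(output)
print("terminated:", error)
```

Only the User_1 record reaches the output in this sketch. The loop stops at the second record, so the third record is never read even though it is well formed.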

IGNORE

The IGNORE mode can be used to instruct queries to continue if they encounter deserialization errors. Any records that fail to be deserialized from a query's source with IGNORE mode will simply be skipped.

Query setting the source.deserialization.error.handling property to IGNORE:

CREATE STREAM pageviews_copy AS
SELECT * FROM pageviews
WITH ('source.deserialization.error.handling'='IGNORE');

The output of this query will contain the following records:

{ "viewtime":1660931394412, "userid":"User_1", "pageid":"Page_22" }
{ "viewtime":1660931396413, "userid":"User_2", "pageid":"Page_96" }

Two of the three input records show up in the output. As explained in the previous section, the first and third records are well-formed JSON records that the query successfully deserializes, while the second record fails deserialization. Since the source is using the IGNORE error handling mode, the second record is dropped and doesn't appear in the output. The query meanwhile continues processing records, which is why the third record appears in the output. Note that unlike the TERMINATE mode, the query doesn't fail and continues to process any additional records that appear in the source.
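The skip-and-continue behavior can be illustrated with a small Python generator. This is a sketch only, assuming a simplified type check on viewtime. The names deserialize_pageview and read_ignoring_errors are hypothetical and are not part of DeltaStream.

```python
import json

# Raw values as they appear in the pageviews topic.
RAW_RECORDS = [
    b'{ "viewtime":1660931394412, "userid":"User_1", "pageid":"Page_22" }',
    b'{ "viewtime":"malformed_viewtime", "userid":"User_6", "pageid":"Page_32" }',
    b'{ "viewtime":1660931396413, "userid":"User_2", "pageid":"Page_96" }',
]


def deserialize_pageview(raw: bytes) -> dict:
    """Simplified check: viewtime must be an integer (BIGINT in the DDL)."""
    record = json.loads(raw)
    if not isinstance(record, dict):
        raise ValueError("value is not a JSON object")
    viewtime = record.get("viewtime")
    if isinstance(viewtime, bool) or not isinstance(viewtime, int):
        raise ValueError(f"viewtime is not a BIGINT: {viewtime!r}")
    return record


def read_ignoring_errors(raw_records):
    """Emulate IGNORE: yield good records and silently drop the rest."""
    for raw in raw_records:
        try:
            yield deserialize_pageview(raw)
        except ValueError:
            continue  # the bad record is skipped and processing goes on


copied = list(read_ignoring_errors(RAW_RECORDS))
print([record["userid"] for record in copied])
```

Running the sketch prints the user IDs of the first and third records. Nothing about the dropped record is kept, which is the gap the IGNORE_AND_LOG mode addresses.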

IGNORE_AND_LOG

The IGNORE_AND_LOG mode is similar to the IGNORE mode in that it skips any records that the query fails to deserialize, but in this mode the skipped records are also logged. Logging these records is useful for users who want to know which payloads were problematic and potentially replay or act upon any skipped records.

The error log for this mode is specified by the required source.deserialization.error.log.topic property and the optional source.deserialization.error.log.store property, as well as store-specific properties. If the error log topic already exists, it is used. If the topic does not exist, it is created according to the additional store-specific properties listed below. If the error log store is not specified, the store to which the source relation belongs is used by default.

The additional properties for Kafka:

  • source.deserialization.error.log.topic.partitions

  • source.deserialization.error.log.topic.replicas

The additional properties for Kinesis:

  • source.deserialization.error.log.topic.shards
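The resolution rules above (required topic, optional store with a default, creation properties used only when the topic is missing) can be summarized in a short Python sketch. The resolve_error_log function, its return shape, and the existing_topics set are hypothetical and serve only to illustrate the rules. They do not describe how DeltaStream resolves these properties internally.

```python
PREFIX = "source.deserialization.error.log."


def resolve_error_log(props: dict, source_store: str, existing_topics: set) -> dict:
    """Work out where skipped records would be logged.

    props           -- properties from the query's WITH clause
    source_store    -- store of the source relation (the default log store)
    existing_topics -- hypothetical set of (store, topic) pairs that already exist
    """
    topic = props.get(PREFIX + "topic")
    if not topic:
        raise ValueError(PREFIX + "topic is required for IGNORE_AND_LOG")

    # The log store is optional and falls back to the source relation's store.
    store = props.get(PREFIX + "store", source_store)

    # Creation properties (partitions, replicas, shards) matter only
    # when the topic has to be created.
    must_create = (store, topic) not in existing_topics
    creation_prefix = PREFIX + "topic."
    creation = {}
    if must_create:
        creation = {
            key[len(creation_prefix):]: value
            for key, value in props.items()
            if key.startswith(creation_prefix)
        }
    return {"store": store, "topic": topic, "create": must_create, "creation": creation}


props = {
    "source.deserialization.error.handling": "IGNORE_AND_LOG",
    "source.deserialization.error.log.topic": "pageviews_copy_errlog",
    "source.deserialization.error.log.topic.partitions": "1",
    "source.deserialization.error.log.topic.replicas": "3",
}
plan = resolve_error_log(props, source_store="kafka_store", existing_topics=set())
print(plan)
```

With no store property set and no existing topic, the sketch resolves to kafka_store and marks the topic for creation with one partition and three replicas.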

Kafka source Relation

This subsection focuses on how IGNORE_AND_LOG behaves for a query whose source Relation is backed by a Kafka topic. We can reuse our pageviews Stream in this example, as it is associated with the Kafka Store named kafka_store.

Query setting the source.deserialization.error.handling property to IGNORE_AND_LOG:

CREATE STREAM pageviews_copy AS
SELECT * FROM pageviews
WITH (
    'source.deserialization.error.handling'='IGNORE_AND_LOG',
    'source.deserialization.error.log.topic'='pageviews_copy_errlog',
    'source.deserialization.error.log.topic.partitions'='1',
    'source.deserialization.error.log.topic.replicas'='3'
);

Since we do not set the source.deserialization.error.log.store property, the error log topic defaults to the same store as the pageviews Stream, kafka_store.

The output of this query will contain the following records:

{ "viewtime":1660931394412, "userid":"User_1", "pageid":"Page_22" }
{ "viewtime":1660931396413, "userid":"User_2", "pageid":"Page_96" }

The error log topic pageviews_copy_errlog will contain the following records:

{
  "partition": 0,
  "offset": 1,
  "topic": "pageviews",
  "store": "store_test",
  // "key" field would also be here if the source record contained a key
  "value": "eyAidmlld3RpbWUiOiJtYWxmb3JtZWRfdmlld3RpbWUiLCAidXNlcmlkIjoiVXNlcl82IiwgInBhZ2VpZCI6IlBhZ2VfMzIiIH0=",
  "timestamp": 1693341135576
}

As in the IGNORE case, only the first and third input records show up in the output. The second record failed deserialization and is skipped in the output, but information about it appears in the error log. Common metadata such as the original record's partition, offset, topic, store, and timestamp is included. Note that the value field has been encoded with base64: since the query could not deserialize the original record, the Kafka record's entire value byte array is base64-encoded and that String is included in the JSON error log payload. In this example, the original Kafka record doesn't contain a key, but if it did, the key would also be base64-encoded and a corresponding key field would appear in the JSON payload.
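To make the encoding step concrete, the following Python sketch assembles an entry with the same fields as the sample above. The build_kafka_error_entry function is a hypothetical helper, not DeltaStream code, and the metadata values are copied from the sample entry.

```python
import base64
import json


def build_kafka_error_entry(topic, store, partition, offset, timestamp, value, key=None):
    """Assemble an error log entry shaped like the sample above (hypothetical helper)."""
    entry = {"partition": partition, "offset": offset, "topic": topic, "store": store}
    if key is not None:
        # A key, when present, is base64-encoded the same way as the value.
        entry["key"] = base64.b64encode(key).decode("ascii")
    # The raw value bytes are encoded as-is, since they could not be deserialized.
    entry["value"] = base64.b64encode(value).decode("ascii")
    entry["timestamp"] = timestamp
    return entry


bad_value = b'{ "viewtime":"malformed_viewtime", "userid":"User_6", "pageid":"Page_32" }'
entry = build_kafka_error_entry(
    topic="pageviews",
    store="store_test",  # copied from the sample entry
    partition=0,
    offset=1,
    timestamp=1693341135576,
    value=bad_value,
)
print(json.dumps(entry, indent=2))
```

Encoding the raw bytes rather than a parsed form means the log preserves exactly what was in the topic, including payloads that are not valid JSON at all.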

We can validate that the value in the error log is what we expect by manually decoding the field:

$ echo "eyAidmlld3RpbWUiOiJtYWxmb3JtZWRfdmlld3RpbWUiLCAidXNlcmlkIjoiVXNlcl82IiwgInBhZ2VpZCI6IlBhZ2VfMzIiIH0=" | base64 -d | jq .
{
  "viewtime": "malformed_viewtime",
  "userid": "User_6",
  "pageid": "Page_32"
}
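For acting on skipped records programmatically, the same decoding can be done in Python and followed by a check of which field caused the failure. This is a minimal sketch: the diagnose_error_entry function and the SCHEMA mapping (BIGINT as int, VARCHAR as str) are assumptions made for illustration, not a DeltaStream API.

```python
import base64
import json

# Assumed mapping of the pageviews DDL types: BIGINT -> int, VARCHAR -> str.
SCHEMA = {"viewtime": int, "userid": str, "pageid": str}


def diagnose_error_entry(entry: dict) -> list:
    """Return the reasons the logged value does not fit SCHEMA."""
    raw = base64.b64decode(entry["value"])
    try:
        record = json.loads(raw)
    except ValueError as exc:  # covers invalid JSON and invalid UTF-8
        return [f"value is not valid JSON: {exc}"]
    if not isinstance(record, dict):
        return ["value is not a JSON object"]
    problems = []
    for field, expected in SCHEMA.items():
        value = record.get(field)
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected):
            problems.append(f"{field}: expected {expected.__name__}, got {value!r}")
    return problems


entry = {
    "partition": 0,
    "offset": 1,
    "topic": "pageviews",
    "value": "eyAidmlld3RpbWUiOiJtYWxmb3JtZWRfdmlld3RpbWUiLCAidXNlcmlkIjoiVXNlcl82IiwgInBhZ2VpZCI6IlBhZ2VfMzIiIH0=",
}
problems = diagnose_error_entry(entry)
print(problems)
```

For the sample entry, the only reported problem is the viewtime field, which holds a string where an integer is expected.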

Kinesis source Relation

This subsection focuses on how IGNORE_AND_LOG behaves for a query whose source Relation is backed by a Kinesis stream. Consider a new pageviews_kinesis Stream created by the following DDL:

CREATE STREAM pageviews_kinesis (
    viewtime BIGINT,
    userid VARCHAR,
    pageid VARCHAR
) WITH ('store'='kinesis_store', 'topic'='pageviews', 'value.format'='JSON');

Query setting the source.deserialization.error.handling property to IGNORE_AND_LOG:

CREATE STREAM pageviews_copy AS
SELECT * FROM pageviews_kinesis
WITH (
    'source.deserialization.error.handling'='IGNORE_AND_LOG',
    'source.deserialization.error.log.store'='alt_kinesis_store', 
    'source.deserialization.error.log.topic'='pageviews_copy_errlog',
    'source.deserialization.error.log.topic.shards'='1'
);

Notice that here we specify a particular store for the error log topic by setting the source.deserialization.error.log.store property to alt_kinesis_store.

The output of this query will contain the following records:

{ "viewtime":1660931394412, "userid":"User_1", "pageid":"Page_22" }
{ "viewtime":1660931396413, "userid":"User_2", "pageid":"Page_96" }

The error log topic pageviews_copy_errlog will contain the following records:

{
  "partitionKey": "some_partition_key",
  "seqNum": "49644062644797676009557574763794547223012948382276648962",
  "shardId": "shardId-000000000000",
  "store": "store_test",
  "stream": "pageviews_kinesis",
  "data": "eyAidmlld3RpbWUiOiJtYWxmb3JtZWRfdmlld3RpbWUiLCAidXNlcmlkIjoiVXNlcl82IiwgInBhZ2VpZCI6IlBhZ2VfMzIiIH0=",
  "timestamp": 1693341135576
}

As in the IGNORE case, only the first and third input records show up in the output. The second record failed deserialization and is skipped in the output, but information about it appears in the error log. Common metadata such as the original record's partition key, sequence number, shard ID, store, and timestamp is included. Note that the data field has been encoded with base64: since the query could not deserialize the original record, the Kinesis record's entire data byte array is base64-encoded and that String is included in the JSON error log payload.

We can validate that the data in the error log is what we expect by manually decoding the field:

$ echo "eyAidmlld3RpbWUiOiJtYWxmb3JtZWRfdmlld3RpbWUiLCAidXNlcmlkIjoiVXNlcl82IiwgInBhZ2VpZCI6IlBhZ2VfMzIiIH0=" | base64 -d | jq .
{
  "viewtime": "malformed_viewtime",
  "userid": "User_6",
  "pageid": "Page_32"
}
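Since the Kafka entries carry the payload in a value field and the Kinesis entries carry it in a data field, tooling that reads both kinds of error log needs to handle either name. The sketch below shows one way to do that in Python. The extract_payload helper is hypothetical, and the example entries keep only a few of the fields from the samples in this tutorial.

```python
import base64


def extract_payload(entry: dict) -> bytes:
    """Return the original record bytes from a Kafka or Kinesis error log entry."""
    if "value" in entry:    # Kafka-style entry
        encoded = entry["value"]
    elif "data" in entry:   # Kinesis-style entry
        encoded = entry["data"]
    else:
        raise KeyError("entry has neither a 'value' nor a 'data' field")
    return base64.b64decode(encoded)


ENCODED = "eyAidmlld3RpbWUiOiJtYWxmb3JtZWRfdmlld3RpbWUiLCAidXNlcmlkIjoiVXNlcl82IiwgInBhZ2VpZCI6IlBhZ2VfMzIiIH0="

kafka_entry = {"topic": "pageviews", "partition": 0, "offset": 1, "value": ENCODED}
kinesis_entry = {"shardId": "shardId-000000000000", "data": ENCODED}

for entry in (kafka_entry, kinesis_entry):
    print(extract_payload(entry).decode("utf-8"))
```

Both entries decode to the same malformed pageview record, so any downstream inspection or replay logic can be shared between the two store types.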
