CREATE CHANGELOG

Syntax

CREATE CHANGELOG changelog_name (
   column_name data_type [NOT NULL] [, ...],
   PRIMARY KEY (column_name [, ...])
) WITH (changelog_parameter = value [, ...]);

Description

A Changelog is a sequence of partitioned and partially ordered events (we use events and records synonymously). A Changelog is a relational representation of data in streaming Stores, such as the data in an Apache Kafka topic or an Amazon Kinesis stream.

A Changelog defines a PRIMARY KEY for its records, which is used to represent how records with the same PRIMARY KEY change over time. Records in a Changelog correlate with each other based on the PRIMARY KEY: a record is an insert record if it is the first record appended to the Changelog with its PRIMARY KEY, or an upsert record if a previous record with the same PRIMARY KEY has already been inserted into the Changelog.
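For illustration, consider a hypothetical sequence of records arriving in a Changelog whose PRIMARY KEY is userid (the column names here are made up for this sketch):

-- Records arrive in order; the PRIMARY KEY is userid.
-- (userid='u1', pageid='home')   -> insert: first record with key 'u1'
-- (userid='u2', pageid='news')   -> insert: first record with key 'u2'
-- (userid='u1', pageid='sports') -> upsert: replaces the current value for key 'u1'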

A Changelog is a type of Relation. Each Relation belongs to a Schema in a Database, so the fully qualified name of the Relation would be <database>.<schema>.<relation>.

Arguments

changelog_name

This specifies the name of the new Changelog. For case-sensitive names, the name must be wrapped in double quotes; otherwise, the lowercase name will be used.
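For example, the following two statements are a minimal sketch of this rule; the Topic name, column, and JSON value.format are assumptions for this example. The first creates a Changelog whose case-sensitive name is preserved, while the second creates one under the lowercase name userevents:

-- Quoted name: created with the case-sensitive name "UserEvents"
CREATE CHANGELOG "UserEvents" (
   userid VARCHAR,
   PRIMARY KEY(userid)
) WITH ('topic'='user_events', 'value.format'='json');

-- Unquoted name: created with the lowercase name userevents
CREATE CHANGELOG UserEvents (
   userid VARCHAR,
   PRIMARY KEY(userid)
) WITH ('topic'='user_events', 'value.format'='json');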

column_name

This is the name of a column to be created in the new Changelog. For case-sensitive names, the name must be wrapped in double quotes; otherwise, the lowercase name will be used.

data_type

This refers to the data type of the column, which can include array specifiers. For more information on the data types supported by DeltaStream, refer to the data types reference.

NOT NULL

Defines a constraint on the column, ensuring it cannot contain NULL values.

PRIMARY KEY (column_name [, …​])

The PRIMARY KEY constraint specifies that the given column(s) of the Changelog can contain only unique (non-duplicate), non-null values.

WITH (changelog_parameter = value [, …​ ])

This clause specifies Changelog Parameters.

Changelog Parameters


topic

The name of the Topic that stores the records for this Changelog.

Required: No
Default value: Lowercase changelog_name
Type: String

store

Name of the store that hosts the Topic for this Changelog.

Required: No
Default value: User’s default Store name
Type: String
Valid values: See LIST STORES.

value.format

Format of the message value in the Topic.

Required: Yes
Type: String
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE

timestamp

Name of the column in the Changelog to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins. If the type of this column is BIGINT, the values are expected to be epoch milliseconds in UTC.

Required: No
Default value: Record’s timestamp
Type: String
Valid values: Must be of type BIGINT or TIMESTAMP. See Data Types.
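As a minimal sketch of this parameter, the following assumes a pageviews Topic whose viewtime column holds epoch milliseconds; declaring it as the timestamp makes time-based operations over this Changelog use viewtime instead of the record’s timestamp:

CREATE CHANGELOG pageviews_per_user (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR,
   PRIMARY KEY(userid)
) WITH (
   'topic'='pageviews',
   'value.format'='json',
   'timestamp'='viewtime'
);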

timestamp.format

The format to use for TIMESTAMP typed fields. See Data Types.

Required: No
Default value: sql
Type: String
Valid values: sql, iso8601
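For instance, in the hedged sketch below, an orders Topic (a name assumed for this example) carries ISO-8601 strings in its event_time field, so the column is declared as TIMESTAMP and the format is set to iso8601:

CREATE CHANGELOG orders_log (
   event_time TIMESTAMP,
   orderid VARCHAR,
   PRIMARY KEY(orderid)
) WITH (
   'topic'='orders',
   'value.format'='json',
   'timestamp'='event_time',
   'timestamp.format'='iso8601'
);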

Kafka Specific Parameters

Parameters to be used if the associated Store is type KAFKA:


topic.partitions

The number of partitions to use when creating the Topic, if applicable. If the Topic already exists, then this value must be equal to the number of partitions in the existing Kafka Topic.

Required: Yes, unless Topic already exists
Default value: Leftmost source Relation Topic’s partition count
Type: Integer
Valid values: [1, ...]

topic.replicas

The number of replicas to use when creating the Topic, if applicable. If the Topic already exists, then this value must be equal to the number of replicas in the existing Kafka Topic.

Required: Yes, unless Topic already exists
Default value: Leftmost source Relation Topic's replica count
Type: Integer
Valid values: [1, ...]
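For example, the sketch below creates a Changelog whose backing Kafka Topic does not yet exist, so the partition and replica counts for the new Topic are supplied explicitly (the Topic name, counts, and literal syntax for integer values are assumptions for this example):

CREATE CHANGELOG user_actions (
   actiontime BIGINT,
   userid VARCHAR,
   action VARCHAR,
   PRIMARY KEY(userid)
) WITH (
   'topic'='user_actions',
   'topic.partitions'=3,
   'topic.replicas'=2,
   'value.format'='json'
);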

key.format

Format of the message key in the Topic.

Required: No, unless key.type is set
Default value: None
Type: String
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE

key.type

Declares the names and data types of the columns in the message key.

Required: No, unless key.format is set
Default value: None
Type: String
Valid values: See STRUCT in Data Types.

delivery.guarantee

The fault tolerance guarantees applied when producing to this Changelog.

Required: No
Default value: at_least_once
Type: String
Valid values:

  • exactly_once: Produces to the Changelog using Kafka transactions, which are committed when the query takes a checkpoint. On the consumer side, setting the Kafka consumer isolation.level configuration to read_committed ensures that only committed records are seen. Since records aren’t committed until the query takes a checkpoint, this setting adds some delay.

  • at_least_once: Ensures that records are output to the Changelog at least once. During query checkpointing, the query waits for confirmation of successful writes from the Kafka broker. If the query runs into issues, duplicate records are possible, since the query reprocesses old data.

  • none: There is no fault tolerance guarantee when producing to the Changelog. If there are issues on the Kafka broker, records may be lost; if there are issues with the query, output records may be duplicated.

Kinesis Specific Parameters

Parameters to be used if the associated Store is type KINESIS:


topic.shards

The number of shards to use when creating the Topic, if applicable. If the Topic already exists, then this value must be equal to the number of shards in the existing Kinesis Data Stream.

Required: Yes, unless Topic already exists
Default value: Leftmost source Relation Topic’s shard count
Type: Integer
Valid values: [1, ...]
Alias: kinesis.shards

Kinesis stores provide a delivery guarantee of at_least_once when producing events into a sink Topic.
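As a hedged sketch for a Kinesis-backed Changelog, the following assumes a Store named my_kinesis_store of type KINESIS and creates a new clicks stream with two shards (all names and counts here are illustrative):

CREATE CHANGELOG clicks_log (
   clicktime BIGINT,
   userid VARCHAR,
   url VARCHAR,
   PRIMARY KEY(userid)
) WITH (
   'store'='my_kinesis_store',
   'topic'='clicks',
   'topic.shards'=2,
   'value.format'='json'
);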

Examples

Create a new Changelog

The following creates a new Changelog, user_last_page. This Changelog reads from a Topic named pageviews and has a value.format of JSON. Note that this query also specifies userid as the PRIMARY KEY for the Changelog:

CREATE CHANGELOG user_last_page (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR,
   PRIMARY KEY(userid)
)
WITH (
   'topic'='pageviews',
   'value.format'='json'
);

Create a new Changelog for an existing Topic

The following creates a new users Changelog for the existing users Topic in the current Store. Since no topic parameter is provided, the name of the Changelog is used as the name of the Topic that hosts the records. This DDL also declares the structure of the existing records in the users Topic, with a PRIMARY KEY for handling updates:

CREATE CHANGELOG "users" (
    registertime BIGINT,
    userid VARCHAR,
    regionid VARCHAR,
    gender VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<
        phone VARCHAR,
        city VARCHAR,
        "state" VARCHAR,
        zipcode VARCHAR>,
    PRIMARY KEY(userid)
) WITH ( 'value.format'='json' );

Create a new Changelog with a multi-column Primary Key

The following creates a new Changelog, pagevisits. This Changelog reads from a Topic named pageviews and has a value.format of JSON. Note that this query also specifies (userid, pageid) as the PRIMARY KEY for the Changelog:

CREATE CHANGELOG pagevisits (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR,
   PRIMARY KEY(userid, pageid)
) WITH ( 'topic'='pageviews', 'value.format'='json' );

Create a new Changelog specifying key and timestamp

The following creates a new Changelog, LatestPageVisitor, in the Database DataBase and the Schema Schema2. This Changelog reads from a Topic named case_sensitive_pageviews in the Store OtherStore and has a value.format of AVRO and a key.format of PROTOBUF. Since key.format is included, key.type must also be provided; in this example its value is STRUCT<"PageId" VARCHAR>. Note that this query also specifies PageId as the PRIMARY KEY for the Changelog, and that many of the columns are wrapped in double quotes, indicating they are case-sensitive. The case-insensitive column CaseInsensitiveCol is lowercased to caseinsensitivecol when the Relation is created. The parameters also set the timestamp for this Relation, so queries that process data with this Relation as their source will use the ViewTime column as the event’s timestamp:

CREATE CHANGELOG "DataBase"."Schema2"."LatestPageVisitor" (
   "ViewTime" BIGINT,
   "userID" VARCHAR,
   "PageId" VARCHAR,
   "CaseSensitiveCol" BIGINT,
   CaseInsensitiveCol BIGINT,
   PRIMARY KEY("PageId")
) WITH (
   'topic'='case_sensitive_pageviews',
   'store'='OtherStore',
   'value.format'='avro',
   'key.format'='protobuf',
   'key.type'='STRUCT<"PageId" VARCHAR>',
   'timestamp'='ViewTime'
);

Create a new Changelog specifying Kafka delivery guarantee

The following creates a new Changelog, user_exactly_once. This Changelog reads from a Topic named users and has a delivery.guarantee of exactly_once, overriding the default value of at_least_once. A user may want this configuration if their application can tolerate higher latency but cannot tolerate duplicate records. When this Changelog is used as the sink in an INSERT INTO query, the query uses the delivery.guarantee specified here.

CREATE CHANGELOG user_exactly_once (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR,
   PRIMARY KEY(userid)
)
WITH (
   'topic'='users',
   'value.format'='json',
   'delivery.guarantee'='exactly_once'
);

Create a new Changelog with `NOT NULL` columns

The following creates a new Changelog, users_log. Two columns in this Changelog are defined with the NOT NULL constraint: registertime and contactinfo. This means that in any valid record in this Changelog, these two columns cannot contain null values.

CREATE CHANGELOG users_log (
    registertime BIGINT NOT NULL,
    userid VARCHAR, 
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR> NOT NULL,
    PRIMARY KEY(userid)
)
WITH (
   'topic'='users', 
    'key.format'='json', 
    'key.type'='STRUCT<userid VARCHAR>', 
    'value.format'='json'
);
