CREATE CHANGELOG AS SELECT

Syntax

CREATE CHANGELOG
    changelog_name
[WITH (changelog_parameter = value [, ... ])]
AS select_statement;

Description

CREATE CHANGELOG AS is essentially a combination of two statements, as illustrated below:

  • A DDL statement that creates a new Changelog.

  • An INSERT INTO statement that runs a SELECT statement and inserts the results into the newly created Changelog.
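As a minimal sketch of this equivalence, assuming a source Changelog users_log whose rows carry a userid key and a registertime column (the column list and PRIMARY KEY below are hypothetical; in practice they are derived from the SELECT statement's output schema):

CREATE CHANGELOG users_clog AS SELECT * FROM users_log;

behaves roughly like the two-step sequence:

CREATE CHANGELOG users_clog (
  userid VARCHAR,
  registertime BIGINT,
  PRIMARY KEY (userid)
);

INSERT INTO users_clog SELECT * FROM users_log;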

Arguments

changelog_name

This specifies the name of the new Changelog. Optionally, use <database_name>.<schema_name> as the prefix to the name to create the Relation in that scope. For case-sensitive names, the name must be wrapped in double quotes; otherwise, the lowercase name will be used.

WITH (changelog_parameter = value [, …​ ])

Optionally, this clause specifies the Changelog Parameters, described below.

select_statement

This specifies the SELECT statement to run.

Changelog Parameters

topic

The name of the Topic that holds the data for the newly created sink Changelog. If a Topic with this name doesn’t exist in the Store, it is created. If this parameter isn’t provided, a Topic named after the lowercase changelog_name is used or created in the corresponding Store.

Required: No Default value: Lowercase changelog_name. Type: String Valid values: See LIST TOPICS

store

The name of the Store that hosts the Topic for this Changelog.

Required: No Default value: User’s default Store. Type: String Valid values: See LIST STORES

value.format

Format of a message’s value in the Topic.

Required: No Default value: Value format of the leftmost source Relation. Type: String Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE

timestamp

Name of the column in the Changelog to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins.

Required: No Default value: Record’s timestamp. Type: String Valid values: Must be of type BIGINT or TIMESTAMP. See Data Types.

timestamp.format

The format to use for TIMESTAMP typed fields. See Data Types.

Required: No Default value: sql Type: String Valid values: sql, iso8601
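As a minimal sketch, the following writes TIMESTAMP typed fields using the ISO-8601 format (the source Changelog orders_log and its columns are hypothetical):

CREATE CHANGELOG orders_iso
  WITH ('timestamp.format' = 'iso8601')
AS SELECT * FROM orders_log;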

Kafka Specific Parameters

topic.partitions

The number of partitions to use when creating the Kafka topic, if applicable.

Required: Yes, unless Topic already exists. Default value: Leftmost source Relation Topic’s partition count. Type: Integer Valid values: [1, ...]

topic.replicas

The number of replicas to use when creating the Kafka topic, if applicable.

Required: Yes, unless Topic already exists. Default value: Leftmost source Relation Topic’s replica count. Type: Integer Valid values: [1, ...]

key.format

Format of a message’s key in the Topic.

Required: No Default value: Key format from the leftmost source Relation’s key (if any) or the same as value.format. Type: KEY_FORMAT Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE

key.type

Declares the type of a message’s key in the Topic.

Required: No, unless key.format is set and there is no default value. Default value: For certain query semantics (e.g. queries using JOIN or GROUP BY), a generated key type is used by default. For queries that do not generate a key type, the key type from the leftmost source Relation’s key is used by default (if any). See Row Key Definition. Type: String Valid values: See STRUCT in Data Types
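As a sketch, the following sets an explicit key format and key type on the sink Changelog (the users_log source and the userid column are hypothetical; see Row Key Definition for how keys are derived):

CREATE CHANGELOG users_keyed
  WITH ('key.format' = 'JSON', 'key.type' = 'STRUCT<userid VARCHAR>')
AS SELECT * FROM users_log;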

delivery.guarantee

The fault tolerance guarantees applied when producing to this Changelog.

Required: No Default value: at_least_once Type: String Valid values:

  • exactly_once: Produces to the Changelog using Kafka transactions. These transactions are committed when the query takes a checkpoint. On the consumer side, setting the Kafka consumer isolation.level configuration to read_committed ensures that only committed records are seen. Since records aren't committed until the query takes a checkpoint, this setting adds some delay.

  • at_least_once: Ensures that records are output to the Changelog at least once. During query checkpointing, the query waits for confirmation of successful writes from the Kafka broker. If there are issues with the query, duplicate records are possible because the query reprocesses old data.

  • none: There is no fault tolerance guarantee when producing to the Changelog. If there are issues on the Kafka broker, records may be lost; if there are issues with the query, output records may be duplicated.

Kinesis Specific Parameters

topic.shards

The number of shards to use when creating the Kinesis stream, if applicable.

Required: Yes, unless Topic already exists. Default value: Leftmost source Relation Topic’s shard count. Type: Integer Valid values: [1, ...] Alias: kinesis.shards

Kinesis stores provide a delivery guarantee of at_least_once when producing events into a sink Topic.

Examples

Create a copy Changelog

The following creates a replica of the source Changelog.

CREATE CHANGELOG users_clog AS SELECT * FROM users_log;

Create a Changelog in a specific Schema within a default Database

The following creates a replica of the source Changelog, but the new Relation belongs to the Schema named schema2 in the session’s current Database.

CREATE CHANGELOG schema2.users_log_copy AS SELECT * FROM users_log;

Create a Changelog in a specific Schema and Database

The following creates a replica of the source Changelog, but the new Relation belongs to the Schema named schema2 in the Database named db.

CREATE CHANGELOG db.schema2.users_log_copy AS SELECT * FROM users_log;

Create a case-sensitive Changelog

The following creates a replica of the source Changelog, and the new sink Relation has a case-sensitive name.

CREATE CHANGELOG "Users" AS SELECT * FROM users_log;

Create a case-sensitive Changelog in a case-sensitive Schema and Database

The following creates a replica of the source Changelog. The new sink Relation has a case-sensitive name and is in a case-sensitive Database and Schema.

CREATE CHANGELOG "DataBase"."Schema"."Users" AS SELECT * FROM users_log;

Create a new Changelog that is backed by a specific Topic

The following creates a replica of the source Changelog, but this new Changelog is associated with the specified Topic called userstwo.

CREATE CHANGELOG
  users2
  WITH ('topic' = 'userstwo')
AS SELECT * FROM users_log;

Copy data from one Store to another

The following moves data from a Kafka Store to a Kinesis Store. The query creates a replica of the source Changelog, but this new Changelog is associated with the specified Store called kinesis_store.

CREATE CHANGELOG
  users_kinesis
  WITH ('store' = 'kinesis_store')
AS SELECT * FROM users_kafka;

Convert data from JSON to Avro

The following creates a replica of the source Changelog that has a data format of JSON, but the new sink Changelog has a data format of Avro for its value and key.

CREATE CHANGELOG users_avro
  WITH ('value.format' = 'avro', 'key.format' = 'AVRO') AS 
SELECT * FROM users_json;

Simple projection to a Kafka topic with a specific number of partitions and replicas

The following is a simple projection query where the sink Kafka topic has a specific number of partitions and replicas set.

CREATE CHANGELOG users2
  WITH ('topic.partitions' = '5', 'topic.replicas' = '3') AS 
SELECT
  registertime,
  userid AS uid,
  interests[1] AS top_interest
FROM users_log;

Simple projection to a Kinesis stream with a specific number of shards

The following is a simple projection query where the sink Kinesis stream has a specific number of shards set.

CREATE CHANGELOG
  users2
  WITH ('topic.shards' = '4')
AS SELECT
  registertime,
  userid AS uid,
  interests[1] AS top_interest
FROM users_log;

Create a Changelog from aggregation

Aggregations of data on Streams result in a CHANGELOG output Relation type. The PRIMARY KEY for the following would be (userid).

CREATE CHANGELOG
  visitlogs
  WITH ('topic' = 'pglogs')
AS SELECT
  userid,
  count(pageid) AS pgcount
FROM pageviews
GROUP BY userid;

Create a Changelog from HOP window aggregation

Aggregations of data on Streams result in a CHANGELOG output Relation type. The PRIMARY KEY for the following would be (window_start, window_end, userid, pageid).

CREATE CHANGELOG
  averagetime
AS SELECT 
  window_start, 
  window_end, 
  userid, 
  pageid, 
  avg(viewtime) AS avgtime 
FROM HOP(pageviews, SIZE 8 SECONDS, ADVANCE BY 4 SECONDS)
GROUP BY
  window_start, 
  window_end, 
  userid,
  pageid;

Create a Changelog with specifying the timestamp column

The statement below creates a new Changelog, called userslogs2, from the existing Changelog named userslog. The timestamp Changelog parameter, specified in the WITH clause, marks the registertime column in userslogs2 as the timestamp column. Therefore, any subsequent query that refers to userslogs2 in its FROM clause uses this column for time-based operations.

CREATE CHANGELOG userslogs2
  WITH ('timestamp' = 'registertime') AS
SELECT userid, registertime, contactInfo['city'] AS city 
FROM userslog;

Create a Changelog with specifying the Kafka delivery guarantee

The statement below creates a new Changelog, called users_exactly_once, from the existing Changelog userslog. The delivery.guarantee Changelog parameter, specified in the WITH clause, overrides the default delivery.guarantee of at_least_once with exactly_once. A user may want this configuration if their application can tolerate higher latencies but cannot tolerate duplicate outputs.

CREATE CHANGELOG users_exactly_once 
  WITH ('delivery.guarantee'='exactly_once') AS
SELECT *
FROM userslog;
