CREATE TABLE AS SELECT

Syntax

CREATE TABLE
    table_name
[WITH (table_parameter = value [, ... ])] 
AS select_statement;

Description

CREATE TABLE AS is a statement that:

  • Generates a DDL statement to create a new Table.

  • Launches a new query to write the results of the SELECT statement into the newly created Table.
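
For example, the following minimal statement (a sketch that assumes a source Stream named pageviews and a default Store that requires no additional table parameters, so the WITH clause is omitted) both creates the new Table and launches the query that populates it:

CREATE TABLE pageviews_copy
AS SELECT * FROM pageviews;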

Arguments

table_name

This specifies the name of the new Table. Optionally, use <database_name>.<schema_name> as the prefix to the name to create the Relation in that namespace. For case-sensitive names, the name must be wrapped in double quotes; otherwise, the lowercase name will be used.
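
For example, the following sketch (mydb, myschema, and the pageviews Stream are illustrative names, and any required table parameters are omitted) creates a case-sensitive Table named PageViewsCopy in the mydb.myschema namespace; without the double quotes, the lowercase name pageviewscopy would be used:

CREATE TABLE mydb.myschema."PageViewsCopy"
AS SELECT * FROM pageviews;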

WITH (<table_parameter> = <value> [, …​ ])

Optionally, this clause specifies Table Parameters; see the Table Parameters section below.

select_statement

This statement specifies the SELECT statement to run.

Table Parameters

store

The name of the Store that hosts the Topic for this Table.

Required: No
Default value: User’s default Store.
Type: String
Valid values: See LIST STORES

Snowflake Specific Parameters

snowflake.db.name

Required: Yes
Default value: None
Type: String
Valid values: Database names available from LIST ENTITIES.

snowflake.schema.name

Required: Yes
Default value: None
Type: String
Valid values: Schema names available from LIST ENTITIES under the snowflake.db.name database name.

snowflake.table.name

snowflake.buffer.millis

Snowflake Stores provide an at_least_once delivery guarantee when producing events into a sink Table, and they write to Snowflake tables in insert-only mode.
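
As a combined sketch of the parameters above (the Store name sfstore, the DELTA_STREAMING database, the PUBLIC schema, the sink table name PV_SINK, and the 1000-millisecond buffer value are all illustrative assumptions):

CREATE TABLE "PV_SINK" WITH (
  'store' = 'sfstore',
  'snowflake.db.name' = 'DELTA_STREAMING',
  'snowflake.schema.name' = 'PUBLIC',
  'snowflake.table.name' = 'PV_SINK',
  'snowflake.buffer.millis' = '1000'
) AS SELECT * FROM pageviews;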

Databricks Specific Parameters

databricks.catalog.name

Required: Yes
Default value: None
Type: String
Valid values: Catalog names available from LIST ENTITIES.

databricks.schema.name

Required: Yes
Default value: None
Type: String
Valid values: Schema names available from LIST ENTITIES under the databricks.catalog.name catalog name.

databricks.table.name

table.data.file.location

The S3 directory location where the Delta-formatted data is written. The credentials for writing to S3 are provided during Store creation (see CREATE STORE). Note that the S3 bucket in the location specified by this parameter must match the databricks.cloud.s3.bucket property defined in the Store.

Required: Yes
Default value: None
Type: String

Databricks Stores provide an exactly_once delivery guarantee when producing events into a sink Table, and they write to Databricks tables in insert-only mode.

Examples

Create a copy of a Stream in a Snowflake Table

The following creates a replica of the source Stream, pageviews, in the Snowflake Table PV_TABLE:

CREATE TABLE "PV_TABLE" WITH (
  'store' = 'sfstore',
  'snowflake.db.name' = 'DELTA_STREAMING',
  'snowflake.schema.name' = 'PUBLIC'
) AS SELECT * FROM pageviews;

Create a stream of changes for a Changelog in a Snowflake Table

The following CTAS query creates a new Snowflake Table to store the incremental changes resulting from a grouping aggregation on the transactions Stream:

CREATE TABLE "CC_TYPE_USAGE" WITH (
  'store' = 'sfstore',
  'snowflake.db.name' = 'DELTA_STREAMING',
  'snowflake.schema.name' = 'PUBLIC'
) AS SELECT
  cc_type AS "CREDIT_TYPE",
  tx_time AS "TRANSACTION_TS",
  tx_id AS "TRANSACTION_ID"
FROM transactions
GROUP BY cc_type;

This query stores all changes to the grouping column cc_type in the sink Table CC_TYPE_USAGE.

Create a copy of a Stream in a Databricks Table

The following creates a replica of the source Stream, pageviews, in the Databricks Table pageviews_db:

CREATE TABLE pageviews_db WITH (
  'store' = 'databricks_store', 
  'databricks.catalog.name' = 'catalog1', 
  'databricks.schema.name' = 'schema1', 
  'databricks.table.name' = 'pageviews', 
  'table.data.file.location' = 's3://mybucket/test/0/pageviews'
) AS SELECT * FROM pageviews;

Upon issuing this query, a Databricks Table is created as catalog1.schema1.pageviews, which uses s3://mybucket/test/0/pageviews as its external location. The query writes the Delta-formatted Parquet files and updates the Delta log in that S3 location.
