CREATE STORE

Syntax

CREATE STORE
    store_name
WITH (store_parameter = value [, ...]);

Description

DeltaStream processes streaming data that is stored in streaming stores such as Apache Kafka and Amazon Kinesis. To access such data, the first step is to connect to such data stores. This is done using CREATE STORE statement. It defines a new Store with connection details to connect to a remote data source. Currently DeltaStream supports Kafka (Confluent Cloud, Amazon MSK, RedPanda, etc.) and Amazon Kinesis. Support for other streaming stores such as Google Pub/Sub and Apache Pulsar are coming soon.

Arguments

store_name

Name of the Store to define. For case-sensitive names, the name must be wrapped in double quotes, otherwise, the lowercased name will be used.

WITH (store_parameter = value [, …​ ])

This clause specifies Store parameters; see Store Parameters for more information.

Store Parameters

Parameter NameDescription

type

Required. Specifies the Store type.

Type: STORE_TYPE

Valid values: KAFKA, KINESIS, SNOWFLAKE, DATABRICKS, POSTGRESQL

access_region

Required, unless specified in properties.file. Specifies the region of the Store. In order to improve latency and reduce data transfer costs, the region should be the same cloud and region that the physical Store is running in. Type: String Valid values: See LIST REGIONS

uris

Required, unless specified in properties.file. List of comma separated host:port URIs to connect to the store. Type: String

tls.disabled

Optional. Specifies if the store should be accessed over TLS. Default value: TRUE

Type: Boolean

Valid values: TRUE or FALSE

tls.verify_server_hostname

Optional. Specifies if the server CNAME should be validated against the certificate. Default value: TRUE Type: Boolean Valid values: TRUE or FALSE

tls.ca_cert_file

Optional. Path to a CA certificate file in PEM format. Default value: Public CA chains. Type: String Valid values: Path to a SSL certificate in PEM format

tls.cipher_suites

Optional. Comma separated list of cipher suites to use when establishing a TLS connection. Default value: [] (all supported cipher suites are enabled) Type: List Valid values: Full cipher suite names describing algorithm content

tls.protocols

Optional. Comma separated list TLS protocol versions to use while establishing a TLS connection. Default value: TLSv1.2,TLSv1.1,TLSv1 Type: List Valid values: TLS protocols with version

schema_registry.name

Optional. Name of a Schema Registry to associate with the store. A Schema Registry must first be created using the CREATE SCHEMA_REGISTRY DDL statement. Only one Schema Registry can be associated with a store. Default value: None Type: String Valid values: See LIST SCHEMA_REGISTRIES

properties.file

Optional. The file path to a yaml file containing any store parameter. Default value: None Type: String Valid values: File path in current user's filesystem

Kafka Specific Parameters

Parameters to be used if type is KAFKA:

Parameter NameDescription

kafka.sasl.hash_function

Optional. SASL hash function to use when authenticating with Apache Kafka brokers. Default value: NONE. Type: HASH_FUNCTION Valid values: NONE, PLAIN, SHA256, and SHA512

kafka.sasl.username

Required if kafka.sasl.hash_function is not NONE, optional otherwise. Username to use when authenticating with Apache Kafka brokers. Default value: None Type: String

kafka.sasl.password

Required if kafka.sasl.hash_function is not NONE, optional otherwise. Password to use when authenticating with Apache Kafka brokers. Default value: None Type: String

tls.client.cert_file

Required if kafka.sasl.hash_function is SHA256 or SHA512, optional otherwise. Path to a client certificate file in PEM format. Default value: None Type: String Valid value: Path to a SSL certificate in PEM format

tls.client.key_file

Required if kafka.sasl.hash_function is SHA256 or SHA512, optional otherwise. Path to the client key file in PEM format. Default value: None Type: String Valid value: Path to a SSL certificate in PEM format

Kinesis Specific Parameters

Parameters to be used if type is KINESIS:

Parameter NameDescription

kinesis.access_key_id

AWS IAM access key to use when authenticating with an Amazon Kinesis service. Required: If static AWS credentials are required for authenticating with the Amazon Kinesis service. Default value: None Type: String

kinesis.secret_access_key

AWS IAM secret access key to use when authenticating with an Amazon Kinesis service. Required: If static AWS credentials are required for authenticating with the Amazon Kinesis service. Default value: None Type: String

Snowflake Specific Parameters

Parameters to be used if type is SNOWFLAKE:

Parameter NameDescription

snowflake.account_id

Snowflake account identifier assigned to the Snowflake account. Required: Yes Default value: None Type: String

snowflake.cloud.region

Snowflake cloud region name, where the account resources operate in. Required: Yes

Default value: None Type: String Valid values:

  • AWS us-east-1

  • AWS us-east-2

  • AWS us-west-1

  • AWS us-west-2

snowflake.role_name

Access control role to use for the Store operations after connecting to Snowflake. Required: Yes

Default value: None Type: String

snowflake.username

User login name for the Snowflake account. Required: Yes Default value: None Type: String

snowflake.warehouse_name

Warehouse name to use for queries and other store operations that require compute resources. Required: Yes Default value: None Type: String

snowflake.client.key_file

Path to the Snowflake account's private key in PEM format. Required: Yes Default value: None Type: String

snowflake.client.key_passphrase

Passphrase for decrypting the Snowflake account's private key.

Required: No Default value: None Type: String

Databricks Specific Parameters

Parameters to be used if type is DATABRICKS:

Parameter NameDescription

databricks.app_token

Databricks personal access token used when authenticating with a Databricks workspace. Required: Yes Default value: None Type: String

databricks.warehouse_id

The identifier for a Databricks SQL Warehouse belonging to a Databricks workspace. This Warehouse will be used to create and query Tables in Databricks. Required: Yes

Default value: None Type: String

aws.access_key_id

AWS access key ID used for writing data to S3. Required: Yes

Default value: None Type: String

aws.secret_access_key

AWS secret access key used for writing data to S3.

Required: Yes Default value: None Type: String

databricks.cloud.s3.bucket

The AWS S3 bucket that CREATE TABLE AS SELECT queries will write data to. Required: Yes Default value: None Type: String

databricks.cloud.region

The cloud region that the databricks.cloud.s3.bucket belongs to. Required: Yes Default value: None Type: String

Valid values:

  • AWS us-east-1

  • AWS us-east-2

  • AWS us-west-1

  • AWS us-west-2

PostgreSQL Specific Parameters

Parameter NameDescription

postgres.username

Username to connect to the database instance specified with the store's uris parameter. Required: Yes Default value: None Type: String

postgres.password

Password to connect to the database instance using the store's username parameter. Required: Yes

Default value: None Type: String

Examples

Create a Kafka Store with credentials

The following creates a new Kafka store with name my_kafka_store:

CREATE STORE 
    my_kafka_store 
WITH ( 
    'type' = KAFKA, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092', 
    'tls.ca_cert_file' = '/certs/us-east-1/self-signed-kafka-ca.crt'
);

Create a Kafka Store with credentials from a file

The following creates a new Kafka store with name MyKafkaStore++:

CREATE STORE 
    "MyKafkaStore++" 
WITH ( 
    'type' = KAFKA,
    'properties.file' = '/User/user1/deltastream/kafka_store/properties.yaml'
);
$ cat /User/user1/deltastream/kafka_store/properties.yaml
uris: "http://uri1,http://uri2"
access_region: "AWS us-east-1"
kafka.sasl.hash_function: PLAIN
kafka.sasl.username: "ABCDEFGH12345678"
kafka.sasl.password: "kafkasaslpassword"

Create a Kinesis Store with credentials

The following statement creates a new Kinesis store with name my_kinesis_store:

CREATE STORE 
    my_kinesis_store 
WITH ( 
    'type' = KINESIS, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'https://url.to.kinesis.aws:4566', 
    'kinesis.access_key_id' = 'testkey', 
    'kinesis.secret_access_key' = 'testsecret'
);

Create a Kafka Store with a Schema Registry

The following statement creates a new Kafka store with a Schema Registry named sr. Note that the store name is case-sensitive and thus has quotes around it:

CREATE STORE
    "kafkaStoreWithSR"
WITH (
    'type' = KAFKA, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092', 
    'schema_registry.name' = sr
);

Create a Confluent Kafka Store with credentials

The following creates a new Confluent Cloud Kafka store with the case-sensitive name ConfluentCloudKafkaStore:

CREATE STORE "ConfluentCloudKafkaStore" 
WITH ( 
    'type' = KAFKA,
    'access_region' = "AWS us-east-1",
    'uris' = 'abc-12345.us-east-1.aws.confluent.cloud:9092',
    'kafka.sasl.hash_function' = PLAIN,
    'kafka.sasl.username' = 'credentials_username',
    'kafka.sasl.password' = 'credentials_password'
);

Create a Snowflake Store

CREATE STORE sf 
WITH ( 
    'type' = SNOWFLAKE,
    'access_region' = "AWS us-east-1",
    'uris' = 'https://my-account.snowflakecomputing.com',
    'snowflake.account_id' = 'my-account',
    'snowflake.role_name' = 'ACCOUNTADMIN',
    'snowflake.username' = 'STREAMING_USER',
    'snowflake.warehouse_name' = 'COMPUTE_WH',
    'snowflake.client.key_file' = '/path/to/pk/my_account_rsa.p8'
);

Create a Snowflake Store with client key passphrase

CREATE STORE sf 
WITH ( 
    'type' = SNOWFLAKE,
    'access_region' = "AWS us-east-1",
    'uris' = 'https://my-account.snowflakecomputing.com',
    'snowflake.account_id' = 'my-account',
    'snowflake.role_name' = 'ACCOUNTADMIN',
    'snowflake.username' = 'STREAMING_USER',
    'snowflake.warehouse_name' = 'COMPUTE_WH',
    'snowflake.client.key_file' = '/path/to/pk/my_account_rsa.p8',
    'properties.file' = '/path/to/deltastream/snowflake_store/properties.yaml'
;
$ cat /path/to/deltastream/snowflake_store/properties.yaml
snowflake.client.key_passphrase: "my$account$$key$$$phrase"

Create a Databricks Store

CREATE STORE databricks_store WITH (
  'type' = DATABRICKS,
  'access_region' = "AWS us-east-1", 
  'uris' = 'https://dbc-12345678-1234.cloud.databricks.com', 
  'databricks.app_token' = 'dapiabcdefghijklmnopqrstuvw123456789', 
  'databricks.warehouse_id' = 'abcdefgh1234567', 
  'aws.access_key_id' = 'AWS_ACCESS_KEY', 
  'aws.secret_access_key' = 'AWS_SECRET_ACCESS_KEY', 
  'databricks.cloud.s3.bucket' = 'mybucket', 
  'databricks.cloud.region' = 'AWS us-west-2'
);

Create a PostgreSQL Store

CREATE STORE ps_store WITH (
  'type' = POSTGRESQL,
  'access_region' = "AWS us-east-1",
  'uris' = 'postgresql://mystore.com:5432/demo', 
  'postgres.username' = 'user',
  'postgres.password' = 'password'
);

Last updated