Using Multiple Stores in Queries
In DeltaStream, a Store is where a user's streaming data resides. Apache Kafka and Amazon Kinesis are two examples of such Stores. DeltaStream reads data from streaming Stores, performs the desired computation, and writes the results to the same Store or a different one. In this tutorial, we will demonstrate how users with multiple Stores can move data from one Store to another and perform joins between data in different Stores.
Before beginning this tutorial, it is helpful to know how to create Stores and Relations; see the tutorials on those topics.
Setup
In this tutorial, we will assume you have the following setup:
Two Stores: one Kafka Store called MSK and one Kinesis Store called kinesis_store
A Stream in kinesis_store called pageviews_kinesis
A Stream in MSK called users_kafka
By navigating to the Stores tab on the left menu, you'll see that we have already created a Kafka Store and a Kinesis Store.
Now that we have established our two Stores, each with an associated Relation, we can move on to writing queries.
Move Data from One Store to Another
Suppose we are trying to migrate data out of Kinesis and into Kafka. We can create a Stream that selects all the columns from an existing Stream and specify the Store that backs the new Stream. In our setup, we have a Stream belonging to our Kinesis Store called pageviews_kinesis. Using a CREATE STREAM AS SELECT (CSAS) query, we can create a new Stream in the Kafka Store that is essentially a copy of the pageviews_kinesis Stream. We'll call this new Stream pageviews_kafka.
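The CSAS query for this migration might look like the following sketch. The property names store, topic.partitions, and topic.replicas come from the discussion in this section; the partition and replica values shown are illustrative assumptions, and the Store name should match your own Kafka Store (kafka_store in the text that follows, MSK in the setup list).

```sql
-- Sketch only: the numeric values are assumptions chosen for illustration.
CREATE STREAM pageviews_kafka
  WITH (
    'store' = 'kafka_store',     -- Store that backs the new Stream
    'topic.partitions' = 1,      -- assumed value; required for a new Kafka topic
    'topic.replicas' = 3         -- assumed value; required for a new Kafka topic
  )
AS SELECT * FROM pageviews_kinesis;
```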
Notice in the CSAS query above that we include the store property in the WITH clause of the sink Stream, with a value of kafka_store. Since a CSAS query creates a new Stream, this property tells the query which Store should back the new Stream. If the store property is omitted, the query defaults to the session's current Store. We must also include the topic.replicas and topic.partitions properties, as they are required when creating a new Kafka topic. If the source in this query were a Stream backed by a Kafka topic, the source's topic.replicas and topic.partitions values would be used by default. But since we are reading from a Kinesis-backed Stream, DeltaStream cannot infer these values, and they must be set explicitly in the query.
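For contrast, here is a sketch of the Kafka-to-Kafka case described above, where the topic properties can be left out because they are inherited from the source. The sink name users_kafka_copy is hypothetical and used only for illustration.

```sql
-- Sketch only: users_kafka_copy is a hypothetical Stream name.
-- The source users_kafka is Kafka-backed, so topic.partitions and
-- topic.replicas are not set here and default to the source's values.
CREATE STREAM users_kafka_copy
  WITH ('store' = 'kafka_store')
AS SELECT * FROM users_kafka;
```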
By navigating to Stores > kafka_store > Topics, we will see that a new Kafka topic called pageviews_kafka has been created. This is where the streaming data for the newly created pageviews_kafka Stream is stored. By clicking on that Topic, we can print and see the records flowing through. These records are a copy of the pageviews_kinesis records.
Join Data from Sources Belonging to Different Stores
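A CSAS query can also join sources that live in different Stores. The sketch below joins pageviews_kinesis (Kinesis) with users_kafka (Kafka) and writes the result to kinesis_store. The column names (userid, pageid, gender) and the five-minute join window are assumptions made for illustration, as is the exact form of the WITHIN clause; check them against your Streams' schemas and the DeltaStream SQL reference.

```sql
-- Sketch only: column names and the join window are assumed, not taken
-- from the actual schemas of these Streams.
CREATE STREAM pageviews_enriched
  WITH ('store' = 'kinesis_store')   -- sink Stream is backed by Kinesis
AS SELECT
  p.userid,
  p.pageid,
  u.gender
FROM pageviews_kinesis p             -- left-most source (Kinesis)
JOIN users_kafka u                   -- second source (Kafka)
  WITHIN 5 MINUTES                   -- assumed window for a Stream-to-Stream join
  ON p.userid = u.userid;
```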
Notice in the CSAS above that we specify the output Store to be kinesis_store in the WITH clause, similar to what we did in the previous section. However, even though we are creating a new Kinesis Stream, pageviews_enriched, we don't need to provide the topic.shards property. The default value is inferred from the left-most source when possible. The sink Stream and the left-most source are both backed by Kinesis Streams, so the topic.shards property of pageviews_kinesis will be applied to pageviews_enriched.
By navigating to Stores > kinesis_store > Topics, we will see that a new Kinesis Stream called pageviews_enriched has been created. This is where the streaming data for the newly created pageviews_enriched Stream is stored. By clicking on that Topic, we can print and see the records flowing through. These records are the result of the join between pageviews_kinesis and users_kafka.