Using Multiple Stores in Queries

In DeltaStream, a Store is where user streaming data resides; Apache Kafka and Amazon Kinesis are two examples of such Stores. DeltaStream reads data from streaming Stores, performs the desired computation, and writes the results back to the same Store or to another. In this tutorial, we will demonstrate how users with multiple Stores can easily move data from one Store to another and perform joins between data residing in different Stores.

Before beginning this tutorial, it is helpful to know how to create Stores and Relations, each of which is covered in its own tutorial.

Setup

In this tutorial, we will assume you have the following setup:

  • Two Stores: one Kafka Store called kafka_store and one Kinesis Store called kinesis_store

  • A Stream in kinesis_store called pageviews_kinesis

  • A Stream in kafka_store called users_kafka

By navigating to the Stores tab on the left menu, you'll see that we have already created a Kafka Store and a Kinesis Store.
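
For reference, creating these Stores looks roughly like the following. This is only a sketch: the regions and connection URIs are placeholders, and the full set of properties (including authentication) is covered in the Store creation tutorial.

```sql
-- Sketch only: region and URI values are illustrative placeholders.
CREATE STORE kafka_store WITH (
  'type' = KAFKA,
  'access_region' = "AWS us-east-1",
  'uris' = 'kafka.broker1.url:9092'
);

CREATE STORE kinesis_store WITH (
  'type' = KINESIS,
  'access_region' = "AWS us-east-1",
  'uris' = 'https://kinesis.us-east-1.amazonaws.com'
);
```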

Now that we have established our two Stores, each with an associated Relation, we can move on to writing queries.

Move Data from One Store to Another

Suppose we are trying to migrate data out of Kinesis and into Kafka. We can easily create a new Stream that selects all the columns from an existing Stream, while specifying the Store that backs the new Stream. In our setup, we have a Stream belonging to our Kinesis Store called pageviews_kinesis. Using a CREATE STREAM AS SELECT (CSAS) query, we can create a new Stream in the Kafka Store that is essentially a copy of pageviews_kinesis. We'll call this new Stream pageviews_kafka.
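
A minimal sketch of such a CSAS query follows. The store, topic.partitions, and topic.replicas properties are the ones discussed below; the partition and replica counts here are illustrative values.

```sql
CREATE STREAM pageviews_kafka
  WITH (
    'store' = 'kafka_store',   -- back the new Stream with the Kafka Store
    'topic.partitions' = 1,    -- illustrative sizing for the new Kafka topic
    'topic.replicas' = 2       -- illustrative sizing for the new Kafka topic
  ) AS
SELECT * FROM pageviews_kinesis;
```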

Notice in the CSAS query above that we include the store property in the WITH clause of the sink Stream, with a value of kafka_store. Since a CSAS query creates a new Stream, specifying this property tells the query which Store should back the new Stream. If the store property is omitted, the query defaults to the session's current Store. We are also required to include the topic.replicas and topic.partitions properties, as they are necessary when creating a new Kafka topic. If the source in this query were a Stream backed by a Kafka topic, then the source's topic.replicas and topic.partitions values would be used by default. But since we are reading from a Kinesis-backed Stream, DeltaStream cannot infer the values of these properties, so they must be set explicitly in the query.

By navigating to Stores > kafka_store > Topics, we will see that a new Kafka topic called pageviews_kafka has been created. This is where the streaming data for the newly created pageviews_kafka Stream is stored. By clicking on that Topic, we can print and see the records flowing through. These records are a copy of the pageviews_kinesis records.
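
If you prefer the SQL workspace to the UI, the same inspection can be done with a PRINT TOPIC statement; a sketch, assuming the session's current Store is kafka_store:

```sql
-- Assumes the session's current Store is kafka_store;
-- prints records from the topic as they arrive.
PRINT TOPIC pageviews_kafka;
```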

Join Data from Sources Belonging to Different Stores
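
Suppose we now want to enrich pageviews with user information by joining the Kinesis-backed pageviews_kinesis Stream with the Kafka-backed users_kafka Stream, writing the results to a new Stream called pageviews_enriched. A minimal sketch of such a CSAS query follows; the column names (userid, pageid, gender, regionid) and the 5-minute join window are assumptions for illustration, not part of the original setup.

```sql
CREATE STREAM pageviews_enriched
  WITH ('store' = 'kinesis_store') AS  -- sink Stream backed by the Kinesis Store
SELECT
  p.userid,
  p.pageid,
  u.gender,
  u.regionid
FROM pageviews_kinesis p
JOIN users_kafka u
  WITHIN 5 MINUTES                     -- assumed window for the stream-stream join
  ON u.userid = p.userid;
```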

Notice in the CSAS above that we specify the output Store to be kinesis_store in the WITH clause, similar to what we did in the section above. However, even though we are creating a new Kinesis-backed Stream, pageviews_enriched, we notably don't need to provide the topic.shards property. The default value is inferred from the left-most source when possible: since the sink Stream and the left-most source are both backed by Kinesis streams, pageviews_kinesis's topic.shards value is applied to pageviews_enriched.
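
If the sink should be sized differently from the source, the inferred value can presumably be overridden by setting topic.shards explicitly in the WITH clause. A hypothetical variant (the sink name and shard count are illustrative):

```sql
-- Hypothetical: override the inferred shard count for the new Kinesis stream.
CREATE STREAM pageviews_enriched_sharded
  WITH ('store' = 'kinesis_store', 'topic.shards' = 4) AS
SELECT p.userid, p.pageid, u.gender
FROM pageviews_kinesis p
JOIN users_kafka u
  WITHIN 5 MINUTES
  ON u.userid = p.userid;
```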

By navigating to Stores > kinesis_store > Topics, we will see that a new Kinesis stream called pageviews_enriched has been created. This is where the streaming data for the newly created pageviews_enriched Stream is stored. By clicking on that Topic, we can print and see the records flowing through. These records are the result of the join between pageviews_kinesis and users_kafka.
