Starting with the Web App

This guide takes you through the steps to build an end-to-end streaming application with DeltaStream’s web application. Once you finish the steps in this guide, you will have hands-on experience with foundational concepts in DeltaStream and will be able to build similar applications. For this guide, we use topics in Apache Kafka, but the steps should be the same if your data is in another streaming store such as Amazon Kinesis or Redpanda.

We assume you have already created your account and signed in to DeltaStream, and that you have created an Organization or joined an existing one.

You will accomplish the following steps in this guide:

  • Connect to your streaming store (in this case, Apache Kafka) by creating a Store in DeltaStream.

  • Create your first Database.

  • Create Streams and Changelogs for your Kafka topics.

  • Create new Streams, Changelogs, and Materialized Views using DeltaStream’s continuous queries.

  • (Optional) Share your streaming data with other users in your organization.

Create a Store

First, create a Store in DeltaStream. A Store is a streaming store service, such as Apache Kafka or Amazon Kinesis, where your streaming data resides. To create a new Store, go to the Stores page from the main menu on the left side of the web app and press the + New Store button in the top right corner. You will see the New Store form as shown here:

You can now name your new Store and select the Store type, along with the availability zone for the Store in AWS. You should also provide the URL(s) for the Kafka cluster; if you are providing more than one URL, separate them with commas. After adding the broker URLs in the text box, press the + button. For this guide, since we are using an Amazon MSK cluster, let’s name the Store MSK.
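For example, two broker URLs would be entered as a single comma-separated value (the hostnames here are placeholders, not real endpoints):

broker1.example.com:9092,broker2.example.com:9092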

Once you have filled in all of the fields, press the NEXT button to go to the next page, where you would normally provide the credentials that allow DeltaStream to connect to your Store. You can leave the credentials page empty here, since the Kafka cluster we use for this guide is accessible to your account by default.

Once you save the new Store information, you will see your new Store in the Stores page as shown in the following image:

To make sure the new Store is set up correctly, click on it to expand it and go to the Topics section. You should see the list of Topics in the Store as shown here.

In the list of Topics, find the ds_pageviews Topic and click on it. Then click the Print button to see the messages arriving in this Topic in real time, as shown here.
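Each message in ds_pageviews is a JSON record whose fields match the pageviews Stream we define later in this guide. An illustrative record (the values here are made up) looks like this:

{"viewtime": 1690327134000, "userid": "User_2", "pageid": "Page_10"}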

Create a Database

DeltaStream provides a relational model on top of your streaming data. Similar to other relational systems, DeltaStream uses Databases and Schemas for namespacing and organizing your data. To create a new Database, go to the Catalog page from the main menu and click the + New Database button. Enter the Database name and click SAVE to create your first Database.

For this guide, we named our Database TestDB. Note that you can create as many Databases as you need. Once you create a Database, it will also have a Schema named public, but you can add more Schemas if you wish.
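If you prefer DDL, you should also be able to create the same Database from the SQL page; a minimal sketch, assuming the CREATE DATABASE statement:

CREATE DATABASE TestDB;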

Create Streams and Changelogs

We will now create Relations on top of our Kafka topics using DeltaStream’s DDL statements. If you want to treat the streaming data in a Topic as an append-only stream, where each event is independent, you define it as a Stream. In this guide we declare a Stream on the ds_pageviews Topic, since each pageview event is an independent event. To do so, go to the SQL page by choosing SQL from the main menu and write the DDL statement there. Here is the statement to create the pageviews Stream:

CREATE STREAM pageviews (
    viewtime BIGINT,
    userid VARCHAR,
    pageid VARCHAR
) WITH (
    'topic'='ds_pageviews',
    'value.format'='JSON'
);

Note that the above Stream will be created in the current Database and Schema, which are TestDB and public, respectively. Also, since no Store is specified in the WITH clause, DeltaStream will use the default Store, the MSK Store we created above, as the Store containing the ds_pageviews topic.
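If your session points at a different Database or Schema, you should be able to switch before running the DDL; a sketch, assuming DeltaStream’s USE statements:

USE DATABASE TestDB;
USE SCHEMA public;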

We then declare a Changelog for the ds_users topic. A Changelog indicates that we want to interpret the events in a Topic as UPSERT events; the events therefore must have a primary key, and each event is interpreted as an insert or update for the given primary key. Use the following statement on the SQL page to declare the users_log Changelog:

CREATE CHANGELOG users_log (
    registertime BIGINT,
    userid VARCHAR,
    regionid VARCHAR,
    gender VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>,
    PRIMARY KEY(userid)
) WITH (
    'topic'='ds_users',
    'key.format'='json',
    'key.type'='STRUCT<userid VARCHAR>',
    'value.format'='json'
);

Once you declare the pageviews Stream and the users_log Changelog, you should be able to see them in the public Schema of the TestDB Database by navigating to the Databases page from the main menu:

Run Queries

Once you have declared Streams and Changelogs, you can write continuous queries in SQL to process this streaming data in real time. Let’s start with interactive queries, where the results are streamed back to the user. You can use such queries to inspect your Streams and Changelogs or to build queries iteratively by inspecting their results. As an example, let’s inspect the pageviews Stream using the following interactive query:

SELECT * FROM pageviews;
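You can also build a query up iteratively, for example by filtering down to a single user before adding more logic; User_2 is one of the user IDs produced by the demo data:

SELECT * FROM pageviews WHERE userid = 'User_2';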

Once you run this query, DeltaStream compiles it into a streaming job, runs it, and streams the result back into the web app. You should see something like this on your screen:

Next, let’s write a persistent query that joins the pageviews Stream with the users_log Changelog to enrich each pageview event with the viewing user’s details, writing the result into a new Stream named csas_enriched_pv:

CREATE STREAM csas_enriched_pv AS 
SELECT 
    TO_TIMESTAMP_LTZ(viewtime, 3) AS viewtime,  
    p.userid AS userid, 
    pageid, 
    TO_TIMESTAMP_LTZ(registertime, 3) AS registertime, 
    regionid, 
    gender, 
    interests, 
    contactinfo
FROM pageviews p
    JOIN users_log u ON u.userid = p.userid;

Once you write the above query in the SQL page and click RUN, you should see confirmation that the query launched successfully, as shown here:

Under the hood, DeltaStream will compile and launch the query as an Apache Flink streaming job. You should be able to see the query along with its status in the Query Management page by clicking Queries in the main menu. Once the query successfully runs, you will have a new Kafka topic named csas_enriched_pv in your Kafka cluster, and a new Stream will also be added to the Streams in your Database, TestDB. You can examine the content of the new Stream by running the following query from the SQL page in the web app:

SELECT * FROM csas_enriched_pv;

The following image shows the result of running the above interactive query; note that the result is streamed to the client:

Now that we have the enriched pageviews Stream, let’s build a Materialized View that computes the number of pageviews per user. Type the following statement in the SQL page to create this Materialized View:

CREATE MATERIALIZED VIEW user_view_count AS 
SELECT
    userid, 
    COUNT(*) AS view_count 
FROM csas_enriched_pv 
GROUP BY userid;

Once you run the above statement, DeltaStream will launch a streaming job that runs the SELECT statement and materializes its result. The resulting Materialized View can then be queried the same way you would query a Materialized View in a traditional relational database; in DeltaStream, however, the data in the Materialized View is always kept fresh by the streaming job. The following is a simple query to get the current view count for the user with the userid of User_2:

SELECT * FROM user_view_count WHERE userid = 'User_2';

The result of the above query is a single row, as shown below:

As you see, the number of pageviews for User_2 is 3 at the time the above query runs. Now run the query again; you should see an updated pageview count for the user, which shows that every time you query a Materialized View, you get the most up-to-date result. DeltaStream makes sure the data in the view is continuously updated by the continuous query that declared the Materialized View. Here is an image of running the same query on the Materialized View a few seconds later.

As you see, the result is updated to 11 from the previous value of 3.
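Because the Materialized View behaves like a regular table at query time, you are not limited to point lookups. A sketch, assuming ordinary filter predicates are supported (the threshold here is arbitrary):

SELECT userid, view_count
FROM user_view_count
WHERE view_count > 10;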

You can see all of your Relations, such as Streams, Changelogs, and Materialized Views, along with your queries and their relationships to one another, in the Analytics page of the DeltaStream web app. As you can see in the image below, we have one query that joins pageviews and users_log to create a new Stream called csas_enriched_pv, and another query that creates a Materialized View named user_view_count from csas_enriched_pv.

Clean Up

Now that we have seen a basic use case, let’s clean up our environment. To do so, first terminate the queries, then drop the Streams, Changelogs, and Materialized Views we created. To terminate your queries, go to the Queries page. Then go to the corresponding Database and Schema to drop the Streams, Changelogs, and Materialized Views. Note that if a query uses a Stream, Changelog, or Materialized View, you must terminate that query before dropping the Relation.
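The same cleanup can also be expressed in SQL from the SQL page. A sketch, assuming DeltaStream’s TERMINATE QUERY and DROP statements; the query ID below is a placeholder for the IDs listed on the Queries page:

TERMINATE QUERY <query_id>;

DROP MATERIALIZED VIEW user_view_count;
DROP STREAM csas_enriched_pv;
DROP STREAM pageviews;
DROP CHANGELOG users_log;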
