Creating Relations to Structure Raw Data

In DeltaStream, Stores provide a layer of abstraction around the raw streaming data, but before queries can process that data, its structure must be described. Relations define the metadata and data format that describe the structure of the data in its native format.

Understanding the Data

Let’s say we have defined an Apache Kafka Store, and it has a few Topics:

demodb.public/msk# LIST TOPICS;
      Topic name       
-----------------------
  ds_syslogs      
  ds_pageviews         
  ds_shipments         
  ds_users             

Let’s assume all Topics are in JSON format. See CREATE STORE and UPDATE TOPIC for using other serialization formats, and refer to a Relation’s DDL statement, e.g. CREATE STREAM, for information about data formats.

We can inspect the Topics to understand what kind of data we have. Here is the ds_pageviews Topic:

demodb.public/msk# PRINT TOPIC ds_pageviews;
{"userid":"User_7"} | {"viewtime":1677196372920,"userid":"User_7","pageid":"Page_82"}
{"userid":"User_3"} | {"viewtime":1677196372962,"userid":"User_3","pageid":"Page_97"}
{"userid":"User_6"} | {"viewtime":1677196373021,"userid":"User_6","pageid":"Page_80"}
{"userid":"User_1"} | {"viewtime":1677196373081,"userid":"User_1","pageid":"Page_73"}
{"userid":"User_2"} | {"viewtime":1677196373122,"userid":"User_2","pageid":"Page_35"}
{"userid":"User_7"} | {"viewtime":1677196373182,"userid":"User_7","pageid":"Page_58"}

And here is the ds_users Topic:

demodb.public/msk# PRINT TOPIC ds_users;
{"userid":"User_6"} | {"registertime":1677196517022,"userid":"User_6","regionid":"Region_9","gender":"OTHER","interests":["News","Movies"],"contactinfo":{"phone":"6503889999","city":"Palo Alto","state":"CA","zipcode":"94301"}}
{"userid":"User_8"} | {"registertime":1677196517619,"userid":"User_8","regionid":"Region_5","gender":"FEMALE","interests":["News","Movies"],"contactinfo":{"phone":"6502215368","city":"San Carlos","state":"CA","zipcode":"94070"}}
{"userid":"User_1"} | {"registertime":1677196518042,"userid":"User_1","regionid":"Region_3","gender":"FEMALE","interests":["News","Movies"],"contactinfo":{"phone":"9492229999","city":"Irvine","state":"CA","zipcode":"92617"}}
{"userid":"User_4"} | {"registertime":1677196518620,"userid":"User_4","regionid":"Region_6","gender":"OTHER","interests":["News","Movies"],"contactinfo":{"phone":"6503889999","city":"Palo Alto","state":"CA","zipcode":"94301"}}

Defining Relations

Now that we know what the data looks like in our Topics, we can attach a structure to each one so that it can be referenced in queries. In our example, ds_pageviews is a continuous stream of immutable page-view events from our users, so we define a Stream for it:

CREATE STREAM pageviews (
    viewtime BIGINT, userid VARCHAR, pageid VARCHAR
) WITH ('topic'='ds_pageviews', 'value.format'='JSON');

See CREATE STREAM for more information.

Since the ds_users Topic hosts user information that changes over time, we define a Changelog for it to capture ongoing changes to each userid:

CREATE CHANGELOG users_log (
    registertime BIGINT, userid VARCHAR, regionid VARCHAR, gender VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>,
    PRIMARY KEY(userid)
) WITH ('topic'='ds_users', 'key.format'='JSON', 'key.type'='STRUCT<userid VARCHAR>', 'value.format'='JSON');

See CREATE CHANGELOG for more information.

For certain applications, it may be more useful to have access to a snapshot of the resulting data. See Materialized View and CREATE MATERIALIZED VIEW AS for more information on how to create a view for the data.
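
As a rough illustration of such a snapshot, the sketch below keeps a running count of page views per user on top of the pageviews Stream defined earlier. The view name pageviews_per_user and the alias view_count are hypothetical and chosen only for this example; consult CREATE MATERIALIZED VIEW AS for the authoritative syntax and options.

```sql
-- Sketch only: pageviews_per_user and view_count are hypothetical names.
-- The view is kept up to date as new records arrive in pageviews.
CREATE MATERIALIZED VIEW pageviews_per_user AS
    SELECT userid, COUNT(*) AS view_count
    FROM pageviews
    GROUP BY userid;
```

Querying the view should then return the latest count for each userid rather than the full history of events.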

Once Relations are defined, they can be listed through their Database and Schema:

demodb.public/msk# LIST RELATIONS;
          Name         |       Type       |  Owner   |      Created at      |      Updated at       
-----------------------+------------------+----------+----------------------+-----------------------
  users_log            | Changelog        | sysadmin | 2023-01-12T20:41:00Z | 2023-01-12T20:41:00Z  
  pageviews            | Stream           | sysadmin | 2023-01-12T20:39:02Z | 2023-01-12T20:39:02Z  

They can also be described using their Database and Schema:

demodb.public/msk# DESCRIBE RELATION pageviews;
    Name    |  Type  |                           Metadata                           |                 Columns                  |      Details       | Primary key |  Owner   |      Created at      |      Updated at       
------------+--------+--------------------------------------------------------------+------------------------------------------+--------------------+-------------+----------+----------------------+-----------------------
  pageviews | Stream | {value.format : json,store : msk,topic : pageviews}          | viewtime  BIGINT                         | store=msk          |             | sysadmin | 2023-01-12T20:39:02Z | 2023-01-12T20:39:02Z  
            |        |                                                              | userid  VARCHAR                          | topic=pageviews    |             |          |                      |                       
            |        |                                                              | pageid  VARCHAR                          |                    |             |          |                      |                       

Using Relations

When Relations are defined for Topics, they become available to DeltaStream as consumable entities. For example, they can be used in interactive queries:

demodb.public/msk# SELECT * FROM pageviews;
^C to exit
Waiting for sandbox to be in running state 'defined'..................
Waiting for interactive query('defined') to be in running state......
Interactive query is running
{"userid":"User_5"} | {"viewtime":1677274911334,"userid":"User_5","pageid":"Page_14"}
{"userid":"User_8"} | {"viewtime":1677274911528,"userid":"User_8","pageid":"Page_65"}
{"userid":"User_9"} | {"viewtime":1677274911766,"userid":"User_9","pageid":"Page_49"}
{"userid":"User_3"} | {"viewtime":1677274911812,"userid":"User_3","pageid":"Page_21"}
{"userid":"User_3"} | {"viewtime":1677274912412,"userid":"User_3","pageid":"Page_25"}
{"userid":"User_1"} | {"viewtime":1677274912569,"userid":"User_1","pageid":"Page_56"}
{"userid":"User_6"} | {"viewtime":1677274912819,"userid":"User_6","pageid":"Page_20"}
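
An interactive query can be run against the users_log Changelog in the same way. The sketch below projects a few columns, including one field of the nested contactinfo STRUCT. It assumes that the -> operator is how a STRUCT field is read, and the alias city is hypothetical.

```sql
-- Sketch only: assumes the -> operator for reading a field of a STRUCT column.
SELECT userid, regionid, contactinfo->city AS city
FROM users_log;
```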

They can also be used in persistent queries, where they continuously serve as a source or sink:

CREATE STREAM user2_views
    AS SELECT userid, pageid
    FROM pageviews
    WHERE userid = 'User_2';
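
A persistent query can also read from more than one Relation. The sketch below enriches each page view with attributes from the users_log Changelog, assuming DeltaStream accepts a join between a Stream and a Changelog on the Changelog's primary key. The Stream name enriched_pageviews and the aliases p and u are hypothetical.

```sql
-- Sketch only: enriched_pageviews, p, and u are hypothetical names.
-- pageviews (Stream) and users_log (Changelog) are the sources;
-- the new enriched_pageviews Stream is the sink.
CREATE STREAM enriched_pageviews AS
    SELECT p.viewtime, p.userid AS userid, p.pageid, u.regionid, u.gender
    FROM pageviews p
    JOIN users_log u ON u.userid = p.userid;
```

Here the Stream supplies the events and the Changelog supplies the current state for each userid, which is the reason ds_users was modeled as a Changelog rather than a Stream.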
