APPLICATION

Syntax

BEGIN APPLICATION application_name
    statement;
    statement;
    ...
    statement;
END APPLICATION;

Description

By using an APPLICATION, you can create a set of DSQL statements (including DDL and Query statements) and run them as a unit of work with all-or-nothing effect:

  • In case of DDL statements, either all of them succeed or none depending on whether there is a failure in processing statements. This means the metastore will be updated, only if all statements in the Application succeed.

  • In case of Query statements, all queries will be run in a single job. This helps achieving better efficiency and resource utilization at runtime.

The order of statements in an application matters.

Supported Statements

Currently, below statement types are supported in applications:

Benefits of Application

Applications help users achieve better efficiency and overall cost reduction in two ways:

What Happens during an Application Failure

If a failure happens when processing one of the DDL or Query statements (if any) in an application (for example, due to a parse error), then the whole application fails. This means none of the entities will be created and no query will start.

If a runtime failure happens when the application job is running (for example, due to an authentication error to access a store), then all queries in the application will fail; However all the relation(s) which were created at the beginning of application via DDL statements (if any) will remain.

Virtual Relation

Here is the syntax to define a virtual relation:

BEGIN APPLICATION application_name
...
CREATE VIRTUAL STREAM virtual.public.stream_name 
AS select_statement;
...
CREATE VIRTUAL CHANGELOG virtual.public.changelog_name 
AS select_statement;
...
END APPLICATION;

Virtual relations are for defining intermediate computation results which are used in one or more subsequent queries in the same Application. They help simplify the computation logic and do some common computation among queries only once and use it multiple times, at no extra cost.

Examples

For the following examples, assume a Stream named pageviews and has been created using the below CREATE STREAM statement:

CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews',
  'value.format' = 'json'
);

Application with multiple statements

BEGIN APPLICATION app
  
  CREATE STREAM pageviews_2 AS 
    SELECT * FROM pageviews WHERE userid = 'User_2';
  
  CREATE CHANGELOG view_log AS
    SELECT pageid, count(userid) AS cnt
    FROM pageviews
    GROUP BY pageid;
    
END APPLICATION;

The example Application below has 5 statements. New streams are defined and some are used in other queries in the same Application. For example, the INSERT INTO statement is reading data from pv_copy, which is defined using a CREATE STREAM AS SELECT statement and writes into pageviews2 which is defined with another CSAS in the same Application. Note that the order of Statements matter in Application. Therefore, statements defining pv_copy and pageviews2 should appear before the INSERT INTO statement, that is using them, in the Application body. Moreover, the users Stream is defined via a CREATE STREAM statement and is used in the JOIN query and last CSAS statement. Again, data records are read once from pageviews and users Topics and are used in the queries that are referring to them in their FROM clause. All the queries in the Application run within a single job.

BEGIN APPLICATION app
  
  CREATE STREAM "users" (
    registertime BIGINT,
    userid VARCHAR,
    regionid VARCHAR,
    gender VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>)
    WITH ('topic'='users', 'value.format'='json');

  CREATE STREAM pv_copy AS 
    SELECT * FROM pageviews;

  CREATE STREAM pageviews2 AS 
    SELECT userid, pageid FROM pageviews WHERE userid = 'User_2' OR userid = 'User_3';

  INSERT INTO pageviews2 SELECT userid, pageid FROM pv_copy;

  CREATE STREAM pvusers AS 
  SELECT p.userid AS pv_uid, u.gender, p.pageid, u.interests[1] AS top_interest  
  FROM pageviews p JOIN "users" u 
  WITHIN 10 seconds ON p.userid = u.userid;

  CREATE STREAM ca_users AS 
  SELECT userid, contactinfo->city, interests[2] AS hobby 
  FROM "users" WHERE contactinfo->state = 'CA';
    
END APPLICATION;

Application with virtual relations

In the example Application below, we assume the pageviews Stream is already defined in DeltaStream and we have a topic, named users in our Store. In this Application, we first create a Changelog on the users topic to track changes in users' information. Given that we are interested in pageviews done by 3 specific users, we can create a virtual Stream: virtual.public.v1 to filter records for those users from pageviews and use them in subsequent queries. We first find pageviews done by these 3 users every 30 seconds using a Tumble window function on virtual.public.v1 and write the results into a new topic visit_freq. Then we create a new virtual Stream: virtual.public.v2 using a temporal join between virtual.public.v1 and users_log Changelog to extend each record with its user's information. We then use virtual.public.v2 records in two subsequent queries: One to find the popular_pages visited by female users among our users of interest and a second query to find pages visited by different users in a given duration and enrich the results by latest information about the visiting users.

Note that although we have 5 CAS statements in this Application, only 3 new topics will be created in the configured Store: visit_freq, popular_pages and cross_region_pages . Virtual relations' records are not persisted in any topic and are only fed to subsequent queries which are referring them.

BEGIN APPLICATION app

 CREATE CHANGELOG users_log
    (registertime BIGINT, userid VARCHAR, gender VARCHAR, interests ARRAY<VARCHAR>, contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>, PRIMARY KEY(userid))
    WITH ('topic'='users', 'key.format'='json', 'key.type'='STRUCT<userid VARCHAR>', 'value.format'='json');

 CREATE VIRTUAL STREAM virtual.public.v1 AS 
   SELECT * FROM pageviews
   WHERE userid = 'User_1' OR userid = 'User_2' OR userid = 'User_3';
 
 CREATE CHANGELOG visit_freq WITH ('topic.partitions' = 1, 'topic.replicas' = 1) AS
  SELECT window_start, window_end, userid, count(pageid) AS cnt
  FROM TUMBLE(virtual.public.v1, SIZE 30 SECONDS)
  GROUP BY window_start, window_end, userid;

 CREATE VIRTUAL STREAM virtual.public.v2 AS
  SELECT v.userid, v.pageid, u.gender, u.interests[1] AS hobby, u.contactinfo->zipcode AS zipcode
   FROM virtual.public.v1 v JOIN users_log u
   ON u.userid = v.userid;

 CREATE CHANGELOG popular_pages WITH ('topic.partitions' = 1, 'topic.replicas' = 1) AS
  SELECT pageid, count(DISTINCT userid) AS cnt
  FROM virtual.public.v2
  WHERE gender = 'FEMALE'
  GROUP BY pageid
  HAVING count(DISTINCT userid) > 2;

  CREATE STREAM cross_region_pages WITH ('topic.partitions' = 1, 'topic.replicas' = 1) AS
   SELECT l.pageid,
    l.zipcode AS zipcode_1, l.hobby AS hobby_1,
    r.zipcode AS zipcode_2, r.hobby AS hobby_2
   FROM virtual.public.v2 l JOIN virtual.public.v2 r
   WITHIN 5 SECONDS ON l.pageid = r.pageid
   WHERE (l.userid <> r.userid);

END APPLICATION;

Last updated