JOIN

Syntax

FROM relation_reference
        [WITH(source_property = value [, ...])]
    join_type relation_reference
        [WITH(source_property = value [, ...])]
    [WITHIN size time_unit]
    ON join_criteria]

Description

JOIN is the SQL operator to combine records from two Relations according to a given join criteria. Join criteria, specified as the ON clause, can only be a Boolean comparison expression that declares equality of a pair of matching columns, one from each Relation, e.g. left.col = right.col. A Relation alias can be specified for each Relation in the join clause. The alias can then be used to refer to the Relation in other clauses in the query, such as SELECT or WHERE clauses. This is useful in case referred Relations have duplicate column names. DeltaStream has support for LEFT JOIN, INNER JOIN, and FULL JOIN. Currently, two types of join are supported: interval join and temporal join.

Arguments

Join Types

Join TypeDescription

JOIN

Returns records that have matching values in both Relations.

INNER JOIN

Same as JOIN.

FULL JOIN

Returns all records when there is a match in either the left or right Relation.

FULL OUTER JOIN

Same as FULL JOIN.

LEFT JOIN

Returns all records from the left Relation and the matched records from the right Relation.

LEFT OUTER JOIN

Same as LEFT JOIN.

RIGHT JOIN

Not supported.

RIGHT OUTER JOIN

Not supported.

Interval Join (Stream/Stream)

Interval join combines records coming from two Streams according to a join condition and a time dimension. The ON clause defines the join condition in the form of equality of two columns from the source Streams. The time dimension is specified using the WITHIN clause, which defines a time interval. Two records are matching if they have the same value for referred columns in the join criteria and the difference between their timestamps is valid according to the time interval in the WITHIN clause. For interval joins, INNER, LEFT, and FULL join types are supported.

Examples

Join two Streams

Given orders and shipments #_streams with the following example records:

           orders                               shipments
+-----------+--------+--------+  +--------------+---------+-----------+
| ordertime | itemid | price  |  | shipmenttime | orderid |      city |
+-----------+--------+--------+  +--------------+---------+-----------+
|      2000 |      2 |  18.23 |  |         6000 |       2 | Palo Alto |
|      3000 |      5 | 187.88 |  |        14000 |       5 |   Chicago |
|     10000 |      2 |  52.32 |  |        15000 |       2 |  New York |
|     12000 |      5 |  74.99 |  +--------------+---------+-----------+
|     13000 |      4 | 892.54 |
|     18000 |      2 | 123.90 |
|     23000 |      5 |  40.44 |
|     26000 |      2 |  12.88 |
+-----------+--------+--------+

When the following SELECT statement is run:

SELECT 
  o.ordertime,
  o.itemid AS oid, 
  s.orderid AS sid, 
  o.price, 
  s.city AS shipment_city 
FROM 
  orders o WITH ('timestamp'='ordertime')
  JOIN shipments s WITH ('timestamp'='shipmenttime')
  WITHIN 10 seconds ON o.itemid = s.orderid;

We get the following output:

Output:
+-----------+-----+-----+--------+---------------+
| ordertime | oid | sid | price  | shipment_city |
+-----------+-----+-----+--------+---------------+
|      2000 |   2 |   2 |  18.23 |     Palo Alto |
|     10000 |   2 |   2 |  52.32 |     Palo Alto |
|     10000 |   2 |   2 |  52.32 |      New York |
|     12000 |   5 |   5 |  74.99 |       Chicago |
|     18000 |   2 |   2 | 123.90 |      New York |
|     23000 |   5 |   5 |  40.44 |       Chicago |
+-----------+-----+-----+--------+---------------+

NOTE: The output results in this example are not fully deterministic as factors such as late arriving data can alter results.

The above example shows a sample of outputs for this join query. Since this is a streaming query, more records will be outputted as data arrives in the source. Notice that not every record from orders is joined and outputted. The reason for this is because the join type is JOIN and the join condition is that the orders record’s itemid matches the shipments record’s orderid within 10 seconds of the orders record’s ordertime. For example, the second orders record has an ordertime of 3000, but the only shipments record with the corresponding orderid has a shipmenttime of 14000, which is not within 10 seconds of the ordertime. Similarly, the orders record with itemid=2 and ordertime=26000 is not within 10 seconds of either of the shipments records with the corresponding orderid. However, the orders record with itemid=2 and ordertime=10000 is within 10 seconds of both shipments records with a corresponding orderid, so the orders record joins on both shipments records and produces 2 records in the output, one for each successful join.

Interval self-join

The following query performs a temporal join, joining the pageviews Stream with itself when the pageid values match and the userid values don’t match within 5 minutes of the left Relation’s timestamp. Essentially, the query finds all the pairs of unique users that visited the same pageid within 5 minutes of the event’s timestamp.

SELECT 
  a.userid AS userid1, 
  b.userid AS userid2, 
  a.pageid AS pgid 
FROM
  pageviews a WITH ('timestamp' = 'viewtime')
  JOIN pageviews b WITH ('timestamp' = 'viewtime')
  WITHIN 5 minutes ON a.pageid = b.pageid
WHERE a.userid != b.userid;

Temporal Join (Stream/Changelog)

A temporal join combines records coming from a Stream with a Changelog according to a join condition on records’ timestamps. A join condition is defined using the ON clause and specifies the equality of a pair of columns, one from the Stream side and one from the Changelog. Any column or field can be picked from the Stream, as long as it is defined with the NOT NULL constraint. The column from the Changelog has to be the primary column. Records’ timestamps are used to correlate records from the Stream side with the relevant version of a record from the Changelog side. For temporal joins, only INNER and LEFT join types are supported.

Examples

Join a Stream and Changelog

Given pageviews #_stream and users_log #_changelog with a PRIMARY KEY of userid with the following example records:

         pageviews                                   users_log
+----------+--------+---------+  +--------------+--------+----------+-----------------+
| viewtime | userid | pageid  |  | registertime | userid | regionid |    interests    |
+----------+--------+---------+  +--------------+--------+----------+-----------------+
|     2000 | User_2 | Page_34 |  |         6000 | User_2 | Region_1 | [News, Movies]  |
|     3000 | User_5 | Page_67 |  |        14000 | User_5 | Region_1 | [Games, Sports] |
|    10000 | User_2 | Page_90 |  |        15000 | User_2 | Region_1 | [Sports, News]  |
|    12000 | User_5 | Page_34 |  +--------------+--------+----------+-----------------+
|    13000 | User_4 | Page_89 |
|    18000 | User_2 | Page_21 |
|    23000 | User_5 | Page_76 |
|    26000 | User_2 | Page_23 |
+----------+--------+---------+

When the following SELECT statement is run:

SELECT 
  p.userid AS pvid, 
  u.userid AS uid, 
  u.gender, 
  p.pageid, 
  u.interests[1] AS top_interest 
FROM 
  pageviews p WITH ('timestamp'='viewtime')
  JOIN users_log u WITH ('timestamp'='registertime')
  ON u.userid = p.userid;

We get the following output:

Output:
+----------+--------+--------+---------+--------------+
| viewtime |  pvid  |  uid   | pageid  | top_interest |
+----------+--------+--------+---------+--------------+
|    10000 | User_2 | User_2 | Page_90 | News         |
|    18000 | User_2 | User_2 | Page_21 | Sports       |
|    23000 | User_5 | User_5 | Page_76 | Games        |
|    26000 | User_2 | User_2 | Page_23 | Sports       |
+----------+--------+--------+---------+--------------+

NOTE: The output results in this example are not fully deterministic as factors such as late arriving data can alter results.

The above example shows a sample of outputs for this join query. Since this is a streaming query, more records will be outputted as data arrives in the source. Notice that not every record from pageviews is joined and outputted. The reason for this is because the join type is JOIN and the join condition is that the pageviews record’s userid matches the users_log record’s userid at the event timestamp. For example, the first pageviews record has a viewtime of 2000, but there has yet to be any user_logs records with registertime <= 2000. Thus, there is no corresponding record to join with and nothing is returned. Notice the pageviews record with userid=User_2 and viewtime=10000 matches with the users_log record with userid=User_2 and registertime=6000, while the pageviews record with userid=User_2 and viewtime=18000 matches with the users_log record with userid=User_2 and registertime=15000. Think of the users_log Changelog as a constantly updating state of the world with a userid as the PRIMARY KEY. In this example, for each pageviews record, we are looking up the current information in the users_log Changelog for the corresponding userid. At timestamp 10000, User_2 has a registered top_interest as News. At timestamp 15000, User_2’s top_interest changes to Sports, so for pageviews records with userid=User_2 with a timestamp > 15000, the joined top interest will report Sports instead of News.

Temporal join with separate source properties

The following performs a temporal join between Stream pageviews and Changelog Users on equal userid values. The Stream on the left side of the join will start ingesting data starting from the epoch timestamp 1675225325286, and the right side of the join will start ingesting data from the earliest available records.

SELECT 
  p.userid AS "PvID", 
  u.userid AS uid, 
  u.gender, 
  p.pageid, 
  u.interests[1] AS top_interest 
FROM 
  pageviews p
  WITH (
    'starting.position' = 'timestamp', 
    'starting.position.timestamp.ms' = '1675225325286',
    'timestamp' = 'viewtime'
  ) 
JOIN "Users" u WITH ( 'starting.position' = 'earliest' )
ON u.userid = p.userid;

Temporal join on columns with non-primitive data types

The following shows an example of temporal join using a join criteria defined on a pair of columns whose data type is STRUCT (i.e. non-primitive). The example includes the DDL statements for the Changelog and Stream relations, along with the temporal join's query. Note that the contactinfo column in the users Stream is defined with the NOT NULL constraint. This requirement is essential for temporal join, as both columns involved in the join criteria must not contain NULL values.

-- DDL for the Changelog
CREATE CHANGELOG users_log (
  registertime BIGINT, 
  userid VARCHAR, 
  interests ARRAY<VARCHAR>, 
  contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>, 
  PRIMARY KEY(contactinfo)
) WITH (
  'topic'='users_log', 
  'value.format'='json');

-- DDL for the Stream
CREATE STREAM users (
  registertime BIGINT,
  userid VARCHAR, 
  interests ARRAY<VARCHAR>,
  contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR> NOT NULL)
WITH (
  'topic'='users', 
  'key.format'='json', 
  'key.type'='STRUCT<userid VARCHAR>', 
  'value.format'='json');
  
-- Temporal Join Query
SELECT
  l.userid AS uid, 
  l.contactinfo AS contact,
  r.interests AS hobbies
FROM users l JOIN users_log r 
ON l.contactinfo = r.contactinfo;

Last updated