Row Metadata Functions

Description

DeltaStream’s row metadata functions give access to extra information about a record, besides its value columns. Each record in a Relation is created from a message read from the Relation’s source Topic. DeltaStream extracts the following information from each message and makes it available for the corresponding record:

  • Row timestamp: Each record has an associated timestamp that is set based on its original source message’s timestamp. The row timestamp value is of the BIGINT data type.

  • Row key: If a Row Key Definition is provided for a source Stream or Changelog that is backed by a Kafka topic, then its records have row keys. For a given record, the row key is set according to the key of its source Kafka message. The key.format and key.type parameters in the source Relation’s definition are used for this purpose. See Row Key Definition for more details.

  • Row metadata: Each record has a number of associated metadata fields. The names and data types of the metadata fields depend on the Store type, and their values are extracted from the record’s original source message. The table below lists them for each supported Store type. For each record, the row metadata is available as a STRUCT instance that encapsulates all metadata fields for that record.

Store Type | Row Metadata Items

Kafka

  • topic (VARCHAR): Name of the Kafka topic that backs the record’s Relation.

  • partition (INTEGER): Identifier of the Kafka partition that the record is stored in.

  • offset (BIGINT): Offset of the record in its Kafka partition.

  • timestamp_type (VARCHAR): Type of the record timestamp, assigned by Kafka.

Kinesis

  • stream (VARCHAR): Name of the Kinesis stream that stores the record.

  • partition_key (VARCHAR): Kinesis partition key for the record.

  • shard_id (VARCHAR): Identifier of the Kinesis shard that the record is stored in.

  • sequence_number (VARCHAR): Sequence number of the record in its Kinesis shard.

There are three row metadata functions available to access the above information about a record:

Function | Description

rowtime() | Returns the row timestamp of the record.

rowkey() | Returns the row key of the record if a Row Key Definition is provided for the source Relation and the Relation is backed by a Kafka topic. Otherwise, it returns NULL.

rowmeta() | Returns a value of the STRUCT data type that contains the row metadata items of the record.

Row metadata functions are called by name, in the same way as the Built-in Functions. They can appear in the SELECT, WHERE, and GROUP BY clauses of a SELECT statement. A row metadata function can be called with or without an argument, depending on the query:

  • If the FROM clause of the query is referring to only one Relation, a row metadata function call does not need an argument.

  • If the FROM clause of the query is referring to more than one Relation (for example, it is a JOIN), then a row metadata function call needs the alias or name of the Relation it is referring to as its argument. This is required to resolve the ambiguity in the function call: when a query references multiple Relations, the metadata information described above is available for the records of each Relation. The Relation name or alias passed as the argument clarifies which source Relation the function call is referring to.

Examples

In the following examples, assume two streams named pageviews and users have been created using the below DDL statements, both on topics stored in Kafka stores. Note that the users definition includes a row key. Therefore, each record in the users Stream has a row key, and according to the key definition, its value is a STRUCT with a single field, called userid, and of the VARCHAR data type.

CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews',
  'value.format' = 'json'
);
CREATE STREAM "users" (
  registertime BIGINT,
  userid VARCHAR,
  regionid VARCHAR,
  gender VARCHAR,
  interests ARRAY<VARCHAR>,
  contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>
) WITH (
  'topic' = 'users',
  'value.format' = 'json',
  'key.type' = 'STRUCT<userid VARCHAR>',
  'key.format' = 'json'
);

Row metadata function calls with select and filter

In the example below, the query uses the rowtime and rowmeta row metadata functions to access each record’s row timestamp and row metadata. Since row metadata is of the STRUCT data type, the query uses the -> operator to access individual row metadata items inside this STRUCT. The row metadata function calls in the SELECT clause extract the extra information from each record and add it as value columns to the query’s result. The row metadata function call in the WHERE clause filters the input records based on their partition values.

SELECT userid,
       rowtime() AS row_ts,
       rowmeta()->partition AS row_partition,
       rowmeta()->offset AS row_offset
FROM pageviews
WHERE rowmeta()->partition = 0;
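
The same pattern applies to a Relation backed by a Kinesis store, where the row metadata STRUCT carries the Kinesis items listed in the Description section rather than the Kafka ones. The query below is only a sketch: it assumes a hypothetical Stream named pageviews_kinesis, defined with the same columns as pageviews on a Kinesis store, and an illustrative partition key value of 'User_1'. Neither is part of the DDL above.

```sql
-- Sketch only: pageviews_kinesis is a hypothetical Stream on a Kinesis store,
-- and 'User_1' is an illustrative partition key value.
SELECT userid,
       rowtime() AS row_ts,
       rowmeta()->shard_id AS row_shard,
       rowmeta()->sequence_number AS row_seq
FROM pageviews_kinesis
WHERE rowmeta()->partition_key = 'User_1';
```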

Row metadata function call to access the row key

The users Stream has a row key (see the DDL statement above). Therefore, the query below uses the rowkey row metadata function to extract the value of this key in each record. Given that the row key for users is a STRUCT with one field called userid, the rowkey()->userid expression in the SELECT clause extracts the value of this field from the key’s STRUCT. Moreover, since the rowmeta row metadata function returns a STRUCT, the meta column in the query’s result is a STRUCT that contains all the row metadata items extracted from the source Relation for each record.

SELECT contactinfo->phone AS phone,
       rowtime() AS row_ts,
       rowkey()->userid AS uid,
       rowmeta() AS meta
FROM "users";
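
Because row metadata functions can also appear in the WHERE clause, the row key can be used for filtering as well as projection. The following is a minimal sketch, assuming that a key with userid 'User_1' exists in the users topic; that value is illustrative and not taken from the source data.

```sql
-- Sketch only: 'User_1' is an illustrative key value.
SELECT rowkey()->userid AS uid,
       regionid,
       rowmeta()->topic AS source_topic
FROM "users"
WHERE rowkey()->userid = 'User_1';
```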

Row metadata function call with grouping

The query below creates a new Changelog called userslogs by running GROUP BY and aggregation on the users Stream using a hopping window. Note that row metadata function calls are used in both the SELECT and GROUP BY clauses. In the SELECT clause, the query counts the row offsets and the zipcode values in each group. Each group is formed according to a hopping window’s start and end times along with the value of the userid field, extracted from the row key.

CREATE CHANGELOG userslogs AS
SELECT window_start,
       window_end,
       rowkey()->userid AS uid,
       count(rowmeta()->offset) AS offset_cnt,
       count(contactinfo->zipcode) AS zip_cnt
FROM HOP(users, size 6 second, advance by 3 seconds)
GROUP BY window_start, window_end, rowkey()->userid;

Row metadata function call with join

The query below runs an interval join between the pageviews and users streams. It uses row metadata function calls to access the row timestamp, row key, and row metadata in the records from both streams. Given that two Relations are referenced in this query, each row metadata function call requires the name or alias of the Relation it is referring to as its argument. The argument can be the alias that is defined for that Relation in the FROM clause (p for pageviews and u for users in this query), or it can be the name of the Relation. A Relation name can be specified alone (e.g. pageviews), or as a fully or partially qualified name that includes the database_name and/or schema_name in the format [<database_name>.<schema_name>.]<relation_name> (e.g. db1.public.pageviews).

SELECT p.userid AS pid, 
       rowkey(u)->userid AS u_key_uid,
       rowtime(p) AS pv_time,
       rowtime(u) AS u_time,
       rowmeta(pageviews)->offset AS p_offset,
       rowmeta(users)->offset AS u_offset
FROM pageviews p JOIN users u
WITHIN 1 minute
ON u.userid = p.userid;
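
As a variation on the query above, the argument can also be a qualified Relation name. The sketch below assumes that pageviews lives in a database named db1 and a schema named public, matching the db1.public.pageviews example name given earlier. Those names are assumptions and should be replaced with the ones from your own environment.

```sql
-- Sketch only: db1 and public are assumed database and schema names.
SELECT p.userid AS pid,
       rowtime(db1.public.pageviews) AS pv_time,
       rowkey(u)->userid AS u_key_uid
FROM pageviews p JOIN users u
WITHIN 1 minute
ON u.userid = p.userid;
```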
