Detecting Patterns in Tables (Beta)
It is a common use case to search for a set of event patterns, especially in case of data streams. Flink comes with a complex event processing (CEP) library which allows for pattern detection in event streams. Furthermore, Flink’s SQL API provides a relational way of expressing queries with a large set of built-in functions and rule-based optimizations that can be used out of the box.
In December 2016, the International Organization for Standardization (ISO) released a new version of the SQL standard which includes Row Pattern Recognition in SQL (ISO/IEC TR 19075-5:2016). It allows Flink to consolidate CEP and SQL API using the MATCH_RECOGNIZE clause for complex event processing in SQL.
A MATCH_RECOGNIZE clause enables the following tasks:
- Logically partition and order the data that is used with the PARTITION BY and ORDER BY clauses.
- Define patterns of rows to seek using the PATTERN clause. These patterns use a syntax similar to that of regular expressions.
- The logical components of the row pattern variables are specified in the DEFINE clause.
- Define measures, which are expressions usable in other parts of the SQL query, in the MEASURES clause.
The following example illustrates the syntax for basic pattern recognition:
```sql
SELECT T.aid, T.bid, T.cid
FROM MyTable
    MATCH_RECOGNIZE (
      PARTITION BY userid
      ORDER BY proctime
      MEASURES
        A.id AS aid,
        B.id AS bid,
        C.id AS cid
      PATTERN (A B C)
      DEFINE
        A AS name = 'a',
        B AS name = 'b',
        C AS name = 'c'
    ) AS T
```
This page will explain each keyword in more detail and will illustrate more complex examples.
Attention Flink’s implementation of the MATCH_RECOGNIZE clause is a subset of the full standard. Only those features documented in the following sections are supported. Since the development is still in an early phase, please also take a look at the known limitations.
- Introduction and Examples
- Installation Guide
- SQL Semantics
- Examples
- Partitioning
- Order of Events
- Define & Measures
- Aggregations
- Defining a Pattern
- Greedy & Reluctant Quantifiers
- Time constraint
- Output Mode
- Pattern Navigation
- Pattern Variable Referencing
- Logical Offsets
- After Match Strategy
- Time attributes
- Controlling Memory Consumption
- Known Limitations
Introduction and Examples
Installation Guide
The pattern recognition feature uses Apache Flink’s CEP library internally. In order to be able to use the MATCH_RECOGNIZE clause, the library needs to be added as a dependency to your Maven project.
```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep_2.11</artifactId>
  <version>1.9.0</version>
</dependency>
```
Alternatively, you can also add the dependency to the cluster classpath (see the dependency section for more information).
If you want to use the MATCH_RECOGNIZE clause in the SQL Client, you don’t have to do anything as all the dependencies are included by default.
SQL Semantics
Every MATCH_RECOGNIZE query consists of the following clauses:
- PARTITION BY - defines the logical partitioning of the table; similar to a GROUP BY operation.
- ORDER BY - specifies how the incoming rows should be ordered; this is essential as patterns depend on an order.
- MEASURES - defines output of the clause; similar to a SELECT clause.
- ONE ROW PER MATCH - output mode which defines how many rows per match should be produced.
- AFTER MATCH SKIP - specifies where the next match should start; this is also a way to control how many distinct matches a single event can belong to.
- PATTERN - allows constructing patterns that will be searched for using a regular expression-like syntax.
- DEFINE - this section defines the conditions that the pattern variables must satisfy.
Attention Currently, the MATCH_RECOGNIZE clause can only be applied to an append table. Furthermore, it always produces an append table as well.
Examples
For our examples, we assume that a table Ticker has been registered. The table contains prices of stocks at a particular point in time.
The table has the following schema:
```text
Ticker
     |-- symbol: String                           # symbol of the stock
     |-- price: Long                              # price of the stock
     |-- tax: Long                                # tax liability of the stock
     |-- rowtime: TimeIndicatorTypeInfo(rowtime)  # point in time when the change to those values happened
```
For simplification, we only consider the incoming data for a single stock ACME. A ticker could look similar to the following table where rows are continuously appended.
```text
symbol         rowtime         price    tax
======  ====================  =======  =======
'ACME'  '01-Apr-11 10:00:00'   12      1
'ACME'  '01-Apr-11 10:00:01'   17      2
'ACME'  '01-Apr-11 10:00:02'   19      1
'ACME'  '01-Apr-11 10:00:03'   21      3
'ACME'  '01-Apr-11 10:00:04'   25      2
'ACME'  '01-Apr-11 10:00:05'   18      1
'ACME'  '01-Apr-11 10:00:06'   15      1
'ACME'  '01-Apr-11 10:00:07'   14      2
'ACME'  '01-Apr-11 10:00:08'   24      2
'ACME'  '01-Apr-11 10:00:09'   25      2
'ACME'  '01-Apr-11 10:00:10'   19      1
```
The task is now to find periods of a constantly decreasing price of a single ticker. For this, one could write a query like:
```sql
SELECT *
FROM Ticker
    MATCH_RECOGNIZE (
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            START_ROW.rowtime AS start_tstamp,
            LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
            LAST(PRICE_UP.rowtime) AS end_tstamp
        ONE ROW PER MATCH
        AFTER MATCH SKIP TO LAST PRICE_UP
        PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)
        DEFINE
            PRICE_DOWN AS
                (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
                    PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
            PRICE_UP AS
                PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
    ) MR;
```
The query partitions the Ticker table by the symbol column and orders it by the rowtime time attribute.
The PATTERN clause specifies that we are interested in a pattern with a starting event START_ROW that is followed by one or more PRICE_DOWN events and concluded with a PRICE_UP event. If such a pattern can be found, the next pattern match will be searched for starting at the last PRICE_UP event, as indicated by the AFTER MATCH SKIP TO LAST clause.
The DEFINE clause specifies the conditions that need to be met for a PRICE_DOWN and PRICE_UP event. Although the START_ROW pattern variable is not defined in the DEFINE clause, it has an implicit condition that always evaluates to TRUE.
A pattern variable PRICE_DOWN is defined as a row with a price that is smaller than the price of the last row that met the PRICE_DOWN condition. For the initial case, or when there is no last row that met the PRICE_DOWN condition, the price of the row should be smaller than the price of the preceding row in the pattern (referenced by START_ROW).
A pattern variable PRICE_UP is defined as a row with a price that is larger than the price of the last row that met the PRICE_DOWN condition.
This query produces a summary row for each period in which the price of a stock was continuously decreasing.
The exact representation of the output rows is defined in the MEASURES part of the query. The number of output rows is defined by the ONE ROW PER MATCH output mode.
```text
 symbol       start_tstamp       bottom_tstamp         end_tstamp
=========  ==================  ==================  ==================
ACME       01-APR-11 10:00:04  01-APR-11 10:00:07  01-APR-11 10:00:08
```
The resulting row describes a period of falling prices that started at 01-APR-11 10:00:04, reached its lowest price at 01-APR-11 10:00:07, and ended when the price increased again at 01-APR-11 10:00:08.
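The matching steps described above can be traced with a small plain-Python model. It is a sketch, not Flink's implementation: it assumes a single partition whose rows are already ordered by rowtime, it models PRICE_UP as described in the prose (a price above the last PRICE_DOWN row), and the names match_from and find_falling_periods are hypothetical.

```python
# ACME rows from the example table: (rowtime, price), already ordered by rowtime.
TICKER = [
    ("10:00:00", 12), ("10:00:01", 17), ("10:00:02", 19), ("10:00:03", 21),
    ("10:00:04", 25), ("10:00:05", 18), ("10:00:06", 15), ("10:00:07", 14),
    ("10:00:08", 24), ("10:00:09", 25), ("10:00:10", 19),
]


def match_from(rows, start):
    """Try START_ROW PRICE_DOWN+ PRICE_UP beginning at `start`.

    Returns (START_ROW index, last PRICE_DOWN index, PRICE_UP index) or None.
    """
    last_down = None  # index of the last row mapped to PRICE_DOWN
    for i in range(start + 1, len(rows)):
        price = rows[i][1]
        # The first PRICE_DOWN row is compared with START_ROW, later ones with the last PRICE_DOWN row.
        reference = rows[start][1] if last_down is None else rows[last_down][1]
        if price < reference:
            last_down = i                   # PRICE_DOWN (greedy +: keep extending)
        elif last_down is not None and price > rows[last_down][1]:
            return start, last_down, i      # PRICE_UP completes the match
        else:
            return None                     # the row fits neither variable
    return None                             # no PRICE_UP row has arrived yet


def find_falling_periods(rows):
    results, start = [], 0
    while start < len(rows):
        match = match_from(rows, start)
        if match is None:
            start += 1
            continue
        first, bottom, end = match
        # MEASURES start_tstamp, bottom_tstamp, end_tstamp
        results.append((rows[first][0], rows[bottom][0], rows[end][0]))
        start = end                         # AFTER MATCH SKIP TO LAST PRICE_UP
    return results


print(find_falling_periods(TICKER))  # [('10:00:04', '10:00:07', '10:00:08')]
```

Skipping to the PRICE_UP row lets that row act as the START_ROW of the next period. In this input, the attempt that starts at 10:00:09 is still waiting for a PRICE_UP row when the data ends, so it produces nothing.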
Partitioning
It is possible to look for patterns in partitioned data, e.g., trends for a single ticker or a particular user. This can be expressed using the PARTITION BY clause. The clause is similar to using GROUP BY for aggregations.
Attention It is highly advised to partition the incoming data because otherwise the MATCH_RECOGNIZE clause will be translated into a non-parallel operator to ensure global ordering.
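The effect of PARTITION BY on the introductory query (PATTERN (A B C), partitioned by userid and ordered by proctime) can be sketched in plain Python. The event data and the helpers partition_and_order and match_abc are hypothetical; the sketch only shows that strict contiguity is evaluated within each partition, not how Flink distributes the work.

```python
from collections import defaultdict

# Hypothetical events of two users, interleaved in arrival order.
EVENTS = [
    {"userid": 1, "proctime": 1, "id": 10, "name": "a"},
    {"userid": 2, "proctime": 2, "id": 20, "name": "a"},
    {"userid": 1, "proctime": 3, "id": 11, "name": "b"},
    {"userid": 2, "proctime": 4, "id": 21, "name": "b"},
    {"userid": 1, "proctime": 5, "id": 12, "name": "c"},
    {"userid": 2, "proctime": 6, "id": 22, "name": "c"},
]


def partition_and_order(rows, key, time):
    """PARTITION BY `key`, then ORDER BY `time` within each partition."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[key]].append(row)
    return {k: sorted(part, key=lambda r: r[time]) for k, part in partitions.items()}


def match_abc(rows):
    """Strictly contiguous A B C where A.name = 'a', B.name = 'b', C.name = 'c'."""
    matches, i = [], 0
    while i + 2 < len(rows):
        a, b, c = rows[i:i + 3]
        if (a["name"], b["name"], c["name"]) == ("a", "b", "c"):
            matches.append((a["id"], b["id"], c["id"]))
            i += 3  # continue after the last row of the match
        else:
            i += 1
    return matches


for userid, rows in partition_and_order(EVENTS, "userid", "proctime").items():
    print(userid, match_abc(rows))
```

Each user yields one match. Run over the unpartitioned, interleaved stream instead, match_abc(EVENTS) returns an empty list, because rows of the other user break the strict contiguity between A, B, and C.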
Order of Events
Apache Flink allows for searching for patterns based on time; either processing time or event time.
In case of event time, the events are sorted before they are passed to the internal pattern state machine. As a consequence, the produced output will be correct regardless of the order in which rows are appended to the table. Instead, the pattern is evaluated in the order specified by the time contained in each row.
The MATCH_RECOGNIZE clause assumes a time attribute with ascending ordering as the first argument to the ORDER BY clause.
For the example Ticker table, a definition like ORDER BY rowtime ASC, price DESC is valid but ORDER BY price, rowtime or ORDER BY rowtime DESC, price ASC is not.
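The ordering rule can be restated as a small check. This is a hypothetical helper written for illustration only, not Flink's planner validation; it assumes ORDER BY items are given as (column, direction) pairs with the SQL default direction ASC filled in.

```python
def is_valid_order_by(order_by, time_attributes):
    """The first ORDER BY item must be a time attribute in ascending order;
    later items are not restricted by this rule."""
    if not order_by:
        return False
    column, direction = order_by[0]
    return column in time_attributes and direction.upper() == "ASC"


TIME_ATTRIBUTES = {"rowtime"}
print(is_valid_order_by([("rowtime", "ASC"), ("price", "DESC")], TIME_ATTRIBUTES))  # True
print(is_valid_order_by([("price", "ASC"), ("rowtime", "ASC")], TIME_ATTRIBUTES))   # False
print(is_valid_order_by([("rowtime", "DESC"), ("price", "ASC")], TIME_ATTRIBUTES))  # False
```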
Define & Measures
The DEFINE and MEASURES keywords have similar meanings to the WHERE and SELECT clauses in a simple SQL query.
The MEASURES clause defines what will be included in the output of a matching pattern. It can project columns and define expressions for evaluation. The number of produced rows depends on the output mode setting.
The DEFINE clause specifies conditions that rows have to fulfill in order to be classified to a corresponding pattern variable. If a condition is not defined for a pattern variable, a default condition will be used which evaluates to true for every row.
For a more detailed explanation about expressions that can be used in those clauses, please have a look at the Pattern Navigation section.
Aggregations
Aggregations can be used in DEFINE and MEASURES clauses. Both built-in and custom user-defined functions are supported.
Aggregate functions are applied to each subset of rows mapped to a match. In order to understand how those subsets are evaluated, have a look at the Pattern Navigation section.
The task of the following example is to find the longest period of time for which the average price of a ticker stayed below a certain threshold. It shows how expressive MATCH_RECOGNIZE can become with aggregations. This task can be performed with the following query:
```sql
SELECT *
FROM Ticker
    MATCH_RECOGNIZE (
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            FIRST(A.rowtime) AS start_tstamp,
            LAST(A.rowtime) AS end_tstamp,
            AVG(A.price) AS avgPrice
        ONE ROW PER MATCH
        AFTER MATCH SKIP TO FIRST B
        PATTERN (A+ B)
        DEFINE
            A AS AVG(A.price) < 15
    ) MR;
```
Given this query and the following input values:
```text
symbol         rowtime         price    tax
======  ====================  =======  =======
'ACME'  '01-Apr-11 10:00:00'   12      1
'ACME'  '01-Apr-11 10:00:01'   17      2
'ACME'  '01-Apr-11 10:00:02'   13      1
'ACME'  '01-Apr-11 10:00:03'   16      3
'ACME'  '01-Apr-11 10:00:04'   25      2
'ACME'  '01-Apr-11 10:00:05'   2       1
'ACME'  '01-Apr-11 10:00:06'   4       1
'ACME'  '01-Apr-11 10:00:07'   10      2
'ACME'  '01-Apr-11 10:00:08'   15      2
'ACME'  '01-Apr-11 10:00:09'   25      2
'ACME'  '01-Apr-11 10:00:10'   30      1
```
The query will accumulate events as part of the pattern variable A as long as their average price stays below 15. For example, the limit is exceeded at 01-Apr-11 10:00:04. The following period exceeds the average price of 15 again at 01-Apr-11 10:00:10. Thus the results for said query will be:
```text
 symbol       start_tstamp       end_tstamp          avgPrice
=========  ==================  ==================  ============
ACME       01-APR-11 10:00:00  01-APR-11 10:00:03     14.5
ACME       01-APR-11 10:00:04  01-APR-11 10:00:09     13.5
```
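How A accumulates rows for the first match can be traced with a small plain-Python sketch. It models only the DEFINE condition of A together with a greedy A+ followed by an unconditional B, computes the average as a float, and uses the hypothetical name grow_a; it is not Flink's implementation.

```python
# ACME prices from the input table above, ordered by rowtime (10:00:00 .. 10:00:10).
PRICES = [12, 17, 13, 16, 25, 2, 4, 10, 15, 25, 30]


def grow_a(prices, start, threshold=15):
    """For A+ B beginning at `start`, return (index of the B row, AVG(A.price)),
    or None if no B row closes the match within the given rows."""
    a = []
    for i in range(start, len(prices)):
        candidate = a + [prices[i]]
        if sum(candidate) / len(candidate) < threshold:
            a = candidate                     # greedy A+: keep the row while AVG(A.price) < 15
        elif a:
            return i, sum(a) / len(a)         # B has no condition: the first row breaking A closes the match
        else:
            return None                       # the start row alone already violates A
    return None                               # still accumulating A rows; no B row yet


print(grow_a(PRICES, 0))  # (4, 14.5)
```

The running averages of the first four rows are 12, 14.5, 14, and 14.5; the row at 10:00:04 (price 25) would lift the average to 16.6, so it is mapped to B and the match ends with A covering 10:00:00 to 10:00:03.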
Note Aggregations can be applied to expressions, but only if they reference a single pattern variable. Thus SUM(A.price * A.tax) is a valid one, but AVG(A.price * B.tax) is not.
Attention DISTINCT aggregations are not supported.
Defining a Pattern
The MATCH_RECOGNIZE clause allows users to search for patterns in event streams using a powerful and expressive syntax that is somewhat similar to the widespread regular expression syntax.
Every pattern is constructed from basic building blocks, called pattern variables, to which operators (quantifiers and other modifiers) can be applied. The whole pattern must be enclosed in parentheses.
An example pattern could look like:
PATTERN (A B+ C* D)
One may use the following operators:
- Concatenation - a pattern like (A B) means that the contiguity is strict between A and B. Therefore, there can be no rows that were not mapped to A or B in between.
- Quantifiers - modify the number of rows that can be mapped to the pattern variable.
  - `*` - 0 or more rows
  - `+` - 1 or more rows
  - `?` - 0 or 1 rows
  - `{ n }` - exactly n rows (n > 0)
  - `{ n, }` - n or more rows (n ≥ 0)
  - `{ n, m }` - between n and m (inclusive) rows (0 ≤ n ≤ m, 0 < m)
  - `{ , m }` - between 0 and m (inclusive) rows (m > 0)
Attention Patterns that can potentially produce an empty match are not supported. Examples of such patterns are PATTERN (A*), PATTERN (A? B*), PATTERN (A{0,} B{0,} C*), etc.
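Whether a pattern can produce an empty match follows from the minimum number of rows each element requires under the quantifier list above: if every variable may be matched zero times, the whole pattern may match nothing. The helpers min_rows and can_match_empty are a hypothetical sketch of that check, not Flink's validation code; they accept only compact quantifier forms without inner spaces (e.g. A{0,} rather than A{ 0, }).

```python
import re

# A pattern variable, an optional quantifier, and an optional reluctant "?" suffix.
TOKEN = re.compile(r"([A-Za-z_]\w*)(\*|\+|\?|\{(\d*),?\d*\})?(\?)?")


def min_rows(token):
    """Minimum number of rows a single pattern element must consume."""
    m = TOKEN.fullmatch(token)
    if m is None:
        raise ValueError(f"unsupported pattern element: {token!r}")
    quantifier, lower = m.group(2), m.group(3)
    if quantifier is None or quantifier == "+":
        return 1                          # plain variable or "+"
    if quantifier in ("*", "?"):
        return 0
    return int(lower) if lower else 0     # {n}, {n,}, {n,m} -> n; {,m} -> 0


def can_match_empty(pattern_body):
    """True if every element may match zero rows, e.g. "A? B*"."""
    return all(min_rows(token) == 0 for token in pattern_body.split())


print(can_match_empty("A{0,} B{0,} C*"))  # True
print(can_match_empty("A B+ C* D"))       # False
```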
Greedy & Reluctant Quantifiers
Each quantifier can be either greedy (default behavior) or reluctant. Greedy quantifiers try to match as many rows as possible while reluctant quantifiers try to match as few as possible.
In order to illustrate the difference, one can view the following example with a query where a greedy quantifier is applied to the B variable:
```sql
SELECT *
FROM Ticker
    MATCH_RECOGNIZE(
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            C.price AS lastPrice
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A B* C)
        DEFINE
            A AS A.price > 10,
            B AS B.price < 15,
            C AS C.price > 12
    )
```
Given we have the following input:
```text
 symbol   tax   price          rowtime
======= ===== ======== =====================
 XYZ     1     10       2018-09-17 10:00:02
 XYZ     2     11       2018-09-17 10:00:03
 XYZ     1     12       2018-09-17 10:00:04
 XYZ     2     13       2018-09-17 10:00:05
 XYZ     1     14       2018-09-17 10:00:06
 XYZ     2     16       2018-09-17 10:00:07
```
The pattern above will produce the following output:
```text
 symbol   lastPrice
======== ===========
 XYZ      16
```
The same query where B* is modified to B*?, which means that B* should be reluctant, will produce:
```text
 symbol   lastPrice
======== ===========
 XYZ      13
 XYZ      16
```
The pattern variable B matches only to the row with price 12 instead of swallowing the rows with prices 12, 13, and 14.
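The two outputs can be reproduced with a simplified plain-Python model. At each row after A, the greedy variant keeps mapping rows to B while B's condition holds, whereas the reluctant variant closes the match with C as soon as C's condition holds. The model does not backtrack, which is enough for this input; it is an illustration under that assumption, not Flink's state machine, and the function names are hypothetical.

```python
# Prices of the XYZ rows from the input above, ordered by rowtime.
XYZ_PRICES = [10, 11, 12, 13, 14, 16]


def is_a(price):
    return price > 10   # A AS A.price > 10


def is_b(price):
    return price < 15   # B AS B.price < 15


def is_c(price):
    return price > 12   # C AS C.price > 12


def match_a_bstar_c(prices, start, reluctant):
    """Return the index of the C row of a match of A B* C starting at `start`, or None."""
    if not is_a(prices[start]):
        return None
    for i in range(start + 1, len(prices)):
        price = prices[i]
        if reluctant and is_c(price):
            return i          # reluctant B*?: close the match as early as possible
        if is_b(price):
            continue          # greedy B*: keep mapping rows to B while B holds
        if is_c(price):
            return i
        return None           # the row fits neither B nor C
    return None               # no C row yet


def last_prices(prices, reluctant=False):
    results, start = [], 0
    while start < len(prices):
        end = match_a_bstar_c(prices, start, reluctant)
        if end is None:
            start += 1
            continue
        results.append(prices[end])   # MEASURES C.price AS lastPrice
        start = end + 1               # AFTER MATCH SKIP PAST LAST ROW
    return results


print(last_prices(XYZ_PRICES))                  # [16]
print(last_prices(XYZ_PRICES, reluctant=True))  # [13, 16]
```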
Attention It is not possible to use a greedy quantifier for the last variable of a pattern. Thus, a pattern like (A B*) is not allowed. This can be easily worked around by introducing an artificial state (e.g. C) that has a negated condition of B. So you could use a query like:
```sql
PATTERN (A B* C)
DEFINE
    A AS condA(),
    B AS condB(),
    C AS NOT condB()
```
Attention The optional reluctant quantifier (A?? or A{0,1}?) is not supported right now.
Time constraint
Especially for streaming use cases, it is often required that a pattern finishes within a given period of time. This allows for limiting the overall state size that Flink has to maintain internally, even in case of greedy quantifiers.
Therefore, Flink SQL supports the additional (non-standard SQL) WITHIN clause for defining a time constraint for a pattern. The clause can be defined after the PATTERN clause and takes an interval of millisecond resolution.
If the time between the first and last event of a potential match is longer than the given value, such a match will not be appended to the result table.
Note It is generally encouraged to use the WITHIN clause as it helps Flink with efficient memory management. Underlying state can be pruned once the threshold is reached.
Attention However, the WITHIN clause is not part of the SQL standard. The recommended way of dealing with time constraints might change in the future.
The use of the WITHIN clause is illustrated in the following example query:
```sql
SELECT *
FROM Ticker
    MATCH_RECOGNIZE(
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            C.rowtime AS dropTime,
            A.price - C.price AS dropDiff
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A B* C) WITHIN INTERVAL '1' HOUR
        DEFINE
            B AS B.price > A.price - 10,
            C AS C.price < A.price - 10
    )
```
The query detects a price drop of more than 10 that happens within an interval of 1 hour.
Let’s assume the query is used to analyze the following ticker data:
```text
symbol         rowtime         price    tax
======  ====================  =======  =======
'ACME'  '01-Apr-11 10:00:00'   20      1
'ACME'  '01-Apr-11 10:20:00'   17      2
'ACME'  '01-Apr-11 10:40:00'   18      1
'ACME'  '01-Apr-11 11:00:00'   11      3
'ACME'  '01-Apr-11 11:20:00'   14      2
'ACME'  '01-Apr-11 11:40:00'   9       1
'ACME'  '01-Apr-11 12:00:00'   15      1
'ACME'  '01-Apr-11 12:20:00'   14      2
'ACME'  '01-Apr-11 12:40:00'   24      2
'ACME'  '01-Apr-11 13:00:00'   1       2
'ACME'  '01-Apr-11 13:20:00'   19      1
```
The query will produce the following results:
```text
symbol         dropTime         dropDiff
======  ====================  =============
'ACME'  '01-Apr-11 13:00:00'      14
```
The resulting row represents a price drop from 15 (at 01-Apr-11 12:00:00) to 1 (at 01-Apr-11 13:00:00). The dropDiff column contains the price difference.
Notice that even though prices also drop by higher values, for example, by 11 (between 01-Apr-11 10:00:00 and 01-Apr-11 11:40:00), the time difference between those two events is larger than 1 hour. Thus, they don’t produce a match.
Output Mode
The output mode describes how many rows should be emitted for every found match. The SQL standard describes two modes:
- ALL ROWS PER MATCH
- ONE ROW PER MATCH.
Currently, the only supported output mode is ONE ROW PER MATCH that will always produce one output summary row for each found match.
The schema of the output row will be a concatenation of [partitioning columns] + [measures columns] in that particular order.
The following example shows the output of a query defined as:
```sql
SELECT *
FROM Ticker
    MATCH_RECOGNIZE(
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            FIRST(A.price) AS startPrice,
            LAST(A.price) AS topPrice,
            B.price AS lastPrice
        ONE ROW PER MATCH
        PATTERN (A+ B)
        DEFINE
            A AS LAST(A.price, 1) IS NULL OR A.price > LAST(A.price, 1),
            B AS B.price < LAST(A.price)
    )
```
For the following input rows:
```text
 symbol   tax   price          rowtime
======== ===== ======== =====================
 XYZ      1     10       2018-09-17 10:00:02
 XYZ      2     12       2018-09-17 10:00:03
 XYZ      1     13       2018-09-17 10:00:04
 XYZ      2     11       2018-09-17 10:00:05
```
The query will produce the following output:
```text
 symbol   startPrice   topPrice   lastPrice
======== ============ ========== ===========
 XYZ      10           13         11
```
The pattern recognition is partitioned by the symbol column. Even though not explicitly mentioned in the MEASURES clause, the partitioning column is added at the beginning of the result.
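The shape of the summary row can be illustrated with a plain-Python model of this query for one partition. The function summarize is hypothetical; resuming after the last row of each match is an assumption of the sketch (the query does not state a skip strategy), and for this input only one match exists either way.

```python
def summarize(symbol, prices):
    """Model of PATTERN (A+ B) with the DEFINE conditions of the query above."""
    rows, start = [], 0
    while start < len(prices):
        a = [prices[start]]               # LAST(A.price, 1) IS NULL for the first A row
        end = None
        for i in range(start + 1, len(prices)):
            if prices[i] > a[-1]:
                a.append(prices[i])       # A: price above the previous A price
            elif prices[i] < a[-1]:
                end = i                   # B: price below LAST(A.price)
                break
            else:
                break                     # equal price: neither A nor B
        if end is None:
            start += 1
            continue
        # One output row per match: [partitioning columns] + [measures columns]
        rows.append({"symbol": symbol, "startPrice": a[0], "topPrice": a[-1], "lastPrice": prices[end]})
        start = end + 1                   # resume after the match (assumption of this sketch)
    return rows


print(summarize("XYZ", [10, 12, 13, 11]))
```

The printed dictionary keeps the column order symbol, startPrice, topPrice, lastPrice, mirroring the partitioning column followed by the measures.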
Pattern Navigation
The DEFINE and MEASURES clauses allow for navigating within the list of rows that (potentially)match a pattern.
This section discusses this navigation for declaring conditions or producing output results.
Pattern Variable Referencing
A pattern variable reference allows a set of rows mapped to a particular pattern variable in the DEFINE or MEASURES clauses to be referenced.
For example, the expression A.price describes a set of rows mapped so far to A plus the current row if we try to match the current row to A. If an expression in the DEFINE/MEASURES clause requires a single row (e.g. A.price or A.price > 10), it selects the last value belonging to the corresponding set.
If no pattern variable is specified (e.g. SUM(price)), an expression references the default pattern variable * which references all variables in the pattern. In other words, it creates a list of all the rows mapped so far to any variable plus the current row.
Example
For a more thorough example, one can take a look at the following pattern and corresponding conditions:
```sql
PATTERN (A B+)
DEFINE
    A AS A.price >= 10,
    B AS B.price > A.price AND SUM(price) < 100 AND SUM(B.price) < 80
```
The following table describes how those conditions are evaluated for each incoming event.
The table consists of the following columns:
- `#` - the row identifier that uniquely identifies an incoming row in the lists [A.price]/[B.price]/[price].
- price - the price of the incoming row.
- [A.price]/[B.price]/[price] - describe lists of rows which are used in the DEFINE clause to evaluate conditions.
- Classifier - the classifier of the current row which indicates the pattern variable the row is mapped to.
- A.price/B.price/SUM(price)/SUM(B.price) - describes the result after those expressions have been evaluated.
| # | price | Classifier | [A.price] | [B.price] | [price] | A.price | B.price | SUM(price) | SUM(B.price) |
|---|---|---|---|---|---|---|---|---|---|
| #1 | 10 | -> A | #1 | - | - | 10 | - | - | - |
| #2 | 15 | -> B | #1 | #2 | #1, #2 | 10 | 15 | 25 | 15 |
| #3 | 20 | -> B | #1 | #2, #3 | #1, #2, #3 | 10 | 20 | 45 | 35 |
| #4 | 31 | -> B | #1 | #2, #3, #4 | #1, #2, #3, #4 | 10 | 31 | 76 | 66 |
| #5 | 35 | | #1 | #2, #3, #4, #5 | #1, #2, #3, #4, #5 | 10 | 35 | 111 | 101 |
As can be seen in the table, the first row is mapped to pattern variable A and subsequent rows are mapped to pattern variable B. However, the last row does not fulfill the B condition because the sum over all mapped rows SUM(price) and the sum over all rows in B exceed the specified thresholds.
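The B column values of the table can be recomputed with a short plain-Python sketch. It takes row #1 as mapped to A, as the table does, and evaluates only B's condition; the function evaluate_b is hypothetical and models the list semantics described above (the candidate lists include the current row), not Flink's internals.

```python
# Prices of rows #1..#5 from the table above, in arrival order.
PRICES = [10, 15, 20, 31, 35]


def evaluate_b(prices):
    """Evaluate B AS B.price > A.price AND SUM(price) < 100 AND SUM(B.price) < 80
    for each row after row #1, which the table maps to A."""
    a_price = prices[0]
    mapped = [a_price]        # [price]: rows mapped to any variable so far
    b_prices = []             # [B.price]: rows mapped to B so far
    trace = []
    for price in prices[1:]:
        # The candidate lists include the current row, because it is being tested against B.
        all_rows, b_rows = mapped + [price], b_prices + [price]
        fits_b = price > a_price and sum(all_rows) < 100 and sum(b_rows) < 80
        trace.append((price, sum(all_rows), sum(b_rows), fits_b))
        if not fits_b:
            break             # the row cannot extend B+
        mapped, b_prices = all_rows, b_rows
    return trace


for row in evaluate_b(PRICES):
    print(row)  # (price, SUM(price), SUM(B.price), mapped to B?)
```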
Logical Offsets
Logical offsets enable navigation within the events that were mapped to a particular pattern variable. This can be expressed with two corresponding functions:
| Offset functions | Description |
|---|---|
| LAST(variable.field, n) | Returns the value of the field from the event that was mapped to the n-th last element of the variable. The counting starts at the last element mapped. |
| FIRST(variable.field, n) | Returns the value of the field from the event that was mapped to the n-th element of the variable. The counting starts at the first element mapped. |
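The counting rules can be illustrated with two small Python functions over a list of the values mapped to a variable, oldest first. This is a sketch: None stands in for SQL NULL, n = 0 refers to the last (or first) element, and the default n = 0 is an assumption that matches how LAST(A.price) is used without an offset elsewhere on this page. FIRST is assumed to count from 0 like LAST.

```python
def last(values, n=0):
    """LAST(var.field, n): the value n positions back from the last one (n = 0 is the last)."""
    return values[-1 - n] if 0 <= n < len(values) else None


def first(values, n=0):
    """FIRST(var.field, n): the value n positions after the first one (n = 0 is the first)."""
    return values[n] if 0 <= n < len(values) else None


b_prices = [15, 20, 31]                                        # rows mapped to B so far
print(last(b_prices), last(b_prices, 1), last(b_prices, 3))   # 31 20 None
print(first(b_prices), first(b_prices, 1))                    # 15 20
```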
Examples
For a more thorough example, one can take a look at the following pattern and corresponding conditions:
```sql
PATTERN (A B+)
DEFINE
    A AS A.price >= 10,
    B AS (LAST(B.price, 1) IS NULL OR B.price > LAST(B.price, 1)) AND
         (LAST(B.price, 2) IS NULL OR B.price > 2 * LAST(B.price, 2))
```
The following table describes how those conditions are evaluated for each incoming event.
The table consists of the following columns:
- price - the price of the incoming row.
- Classifier - the classifier of the current row which indicates the pattern variable the row is mapped to.
- LAST(B.price, 1)/LAST(B.price, 2) - describes the result after those expressions have been evaluated.
| price | Classifier | LAST(B.price, 1) | LAST(B.price, 2) | Comment |
|---|---|---|---|---|
| 10 | -> A | | | |
| 15 | -> B | null | null | Notice that LAST(B.price, 1) is null because there is still nothing mapped to B. |
| 20 | -> B | 15 | null | |
| 31 | -> B | 20 | 15 | |
| 35 | | 31 | 20 | Not mapped because 35 < 2 * 20. |
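The B condition can be replayed over the rows that follow the A row with a short plain-Python sketch. The helpers last, b_condition, and map_to_b are hypothetical; None stands in for SQL NULL, and each candidate list includes the current row as its last element, so LAST(B.price, 1) is the previously mapped B row.

```python
def last(values, n=0):
    """Value n positions back from the last element (n = 0 is the last), or None (SQL NULL)."""
    return values[-1 - n] if 0 <= n < len(values) else None


def b_condition(b_prices):
    """DEFINE B for the candidate list of B prices, with the current row as the last element."""
    current, last1, last2 = b_prices[-1], last(b_prices, 1), last(b_prices, 2)
    return (last1 is None or current > last1) and (last2 is None or current > 2 * last2)


def map_to_b(candidates):
    """Map consecutive rows to B+ until the condition fails."""
    b = []
    for price in candidates:
        if not b_condition(b + [price]):
            break
        b.append(price)
    return b


print(map_to_b([15, 20, 31, 35]))  # [15, 20, 31]
```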
It might also make sense to use the default pattern variable with logical offsets.
In this case, an offset considers all the rows mapped so far:
```sql
PATTERN (A B? C)
DEFINE
    B AS B.price < 20,
    C AS LAST(price, 1) < C.price
```
| price | Classifier | LAST(price, 1) | Comment |
|---|---|---|---|
| 10 | -> A | ||
| 15 | -> B | ||
| 20 | -> C | 15 | LAST(price, 1) is evaluated as the price of the row mapped to the B variable. |
If the second row did not map to the B variable, we would have the following results:
| price | Classifier | LAST(price, 1) | Comment |
|---|---|---|---|
| 10 | -> A | ||
| 20 | -> C | 10 | LAST(price, 1) is evaluated as the price of the row mapped to the A variable. |
It is also possible to use multiple pattern variable references in the first argument of the FIRST/LAST functions. This way, one can write an expression that accesses multiple columns. However, all of them must use the same pattern variable. In other words, the value of the LAST/FIRST function must be computed in a single row.
Thus, it is possible to use LAST(A.price * A.tax), but an expression like LAST(A.price * B.tax) is not allowed.
After Match Strategy
The AFTER MATCH SKIP clause specifies where to start a new matching procedure after a complete match was found.
There are four different strategies:
- SKIP PAST LAST ROW - resumes the pattern matching at the next row after the last row of the current match.
- SKIP TO NEXT ROW - continues searching for a new match starting at the next row after the starting row of the match.
- SKIP TO LAST variable - resumes the pattern matching at the last row that is mapped to the specified pattern variable.
- SKIP TO FIRST variable - resumes the pattern matching at the first row that is mapped to the specified pattern variable.
This is also a way to specify how many matches a single event can belong to. For example, with the SKIP PAST LAST ROW strategy every event can belong to at most one match.
Examples
In order to better understand the differences between those strategies one can take a look at the following example.
For the following input rows:
```text
 symbol   tax   price          rowtime
======== ===== ======= =====================
 XYZ      1     7       2018-09-17 10:00:01
 XYZ      2     9       2018-09-17 10:00:02
 XYZ      1     10      2018-09-17 10:00:03
 XYZ      2     5       2018-09-17 10:00:04
 XYZ      2     17      2018-09-17 10:00:05
 XYZ      2     14      2018-09-17 10:00:06
```
We evaluate the following query with different strategies:
```sql
SELECT *
FROM Ticker
    MATCH_RECOGNIZE(
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            SUM(A.price) AS sumPrice,
            FIRST(rowtime) AS startTime,
            LAST(rowtime) AS endTime
        ONE ROW PER MATCH
        [AFTER MATCH STRATEGY]
        PATTERN (A+ C)
        DEFINE
            A AS SUM(A.price) < 30
    )
```
The query returns the sum of the prices of all rows mapped to A and the first and last timestamp of the overall match.
The query will produce different results based on which AFTER MATCH strategy was used:
AFTER MATCH SKIP PAST LAST ROW
```text
 symbol   sumPrice        startTime              endTime
======== ========== ===================== =====================
 XYZ      26         2018-09-17 10:00:01   2018-09-17 10:00:04
 XYZ      17         2018-09-17 10:00:05   2018-09-17 10:00:06
```
The first result matched against the rows #1, #2, #3, #4.
The second result matched against the rows #5, #6.
AFTER MATCH SKIP TO NEXT ROW
```text
 symbol   sumPrice        startTime              endTime
======== ========== ===================== =====================
 XYZ      26         2018-09-17 10:00:01   2018-09-17 10:00:04
 XYZ      24         2018-09-17 10:00:02   2018-09-17 10:00:05
 XYZ      15         2018-09-17 10:00:03   2018-09-17 10:00:05
 XYZ      22         2018-09-17 10:00:04   2018-09-17 10:00:06
 XYZ      17         2018-09-17 10:00:05   2018-09-17 10:00:06
```
Again, the first result matched against the rows #1, #2, #3, #4.
Compared to the previous strategy, the next match includes row #2 again for the next matching.Therefore, the second result matched against the rows #2, #3, #4, #5.
The third result matched against the rows #3, #4, #5.
The fourth result matched against the rows #4, #5, #6.
The last result matched against the rows #5, #6.
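The first two strategies can be replayed with a plain-Python model of the query above. It covers only SKIP PAST LAST ROW and SKIP TO NEXT ROW, treats A+ as greedy and C as unconditional, and uses the hypothetical helpers match_a_plus_c and run; it illustrates the documented results and is not Flink's implementation.

```python
# Rows #1..#6 from the input above.
ROW_PRICES = [7, 9, 10, 5, 17, 14]
ROW_TIMES = ["10:00:01", "10:00:02", "10:00:03", "10:00:04", "10:00:05", "10:00:06"]


def match_a_plus_c(prices, start):
    """Greedy A+ C with A AS SUM(A.price) < 30 and an unconditional C; returns the C index or None."""
    total = 0
    for i in range(start, len(prices)):
        if total + prices[i] < 30:
            total += prices[i]       # the row stays in A
        elif i > start:
            return i                 # the first row that breaks A becomes C
        else:
            return None              # the start row alone violates A
    return None                      # rows ran out before a C row arrived


def run(prices, times, strategy):
    results, start = [], 0
    while start < len(prices):
        end = match_a_plus_c(prices, start)
        if end is None:
            start += 1
            continue
        # MEASURES sumPrice, startTime, endTime
        results.append((sum(prices[start:end]), times[start], times[end]))
        if strategy == "SKIP PAST LAST ROW":
            start = end + 1
        elif strategy == "SKIP TO NEXT ROW":
            start = start + 1
        else:
            raise ValueError(f"strategy not modelled in this sketch: {strategy}")
    return results


for row in run(ROW_PRICES, ROW_TIMES, "SKIP TO NEXT ROW"):
    print(row)
```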
AFTER MATCH SKIP TO LAST A
```text
 symbol   sumPrice        startTime              endTime
======== ========== ===================== =====================
 XYZ      26         2018-09-17 10:00:01   2018-09-17 10:00:04
 XYZ      15         2018-09-17 10:00:03   2018-09-17 10:00:05
 XYZ      22         2018-09-17 10:00:04   2018-09-17 10:00:06
 XYZ      17         2018-09-17 10:00:05   2018-09-17 10:00:06
```
Again, the first result matched against the rows #1, #2, #3, #4.
Compared to the previous strategy, the next match includes only row #3 (mapped to A) again for the next matching. Therefore, the second result matched against the rows #3, #4, #5.
The third result matched against the rows #4, #5, #6.
The last result matched against the rows #5, #6.
AFTER MATCH SKIP TO FIRST A
This combination will produce a runtime exception because one would always try to start a new match where the last one started. This would produce an infinite loop and, thus, is prohibited.
One has to keep in mind that in case of the SKIP TO FIRST/LAST variable strategy it might be possible that there are no rows mapped to that variable (e.g. for pattern A*). In such cases, a runtime exception will be thrown as the standard requires a valid row to continue the matching.
Time attributes
In order to apply subsequent queries on top of the MATCH_RECOGNIZE clause, it might be required to use time attributes. Two functions are available for selecting them:
| Function | Description |
|---|---|
| MATCH_ROWTIME() | Returns the timestamp of the last row that was mapped to the given pattern. The resulting attribute is a rowtime attribute that can be used in subsequent time-based operations such as time-windowed joins and group window or over window aggregations. |
| MATCH_PROCTIME() | Returns a proctime attribute that can be used in subsequent time-based operations such as time-windowed joins and group window or over window aggregations. |
Controlling Memory Consumption
Memory consumption is an important consideration when writing MATCH_RECOGNIZE queries, as the space of potential matches is built in a breadth-first-like manner. Having that in mind, one must make sure that the pattern can finish, preferably with a reasonable number of rows mapped to the match, as they have to fit into memory.
For example, the pattern must not have a quantifier without an upper limit that accepts every single row. Such a pattern could look like this:
```sql
PATTERN (A B+ C)
DEFINE
    A as A.price > 10,
    C as C.price > 20
```
The query will map every incoming row to the B variable and thus will never finish. This query could be fixed, e.g., by negating the condition for C:
```sql
PATTERN (A B+ C)
DEFINE
    A as A.price > 10,
    B as B.price <= 20,
    C as C.price > 20
```
Or by using the reluctant quantifier:
```sql
PATTERN (A B+? C)
DEFINE
    A as A.price > 10,
    C as C.price > 20
```
Attention Please note that the MATCH_RECOGNIZE clause does not use a configured state retention time. One may want to use the WITHIN clause for this purpose.
Known Limitations
Flink’s implementation of the MATCH_RECOGNIZE clause is an ongoing effort, and some features of the SQL standard are not yet supported.
Unsupported features include:
- Pattern expressions:
  - Pattern groups - this means that e.g. quantifiers can not be applied to a subsequence of the pattern. Thus, (A (B C)+) is not a valid pattern.
  - Alternations - patterns like PATTERN((A B | C D) E), which means that either a subsequence A B or C D has to be found before looking for the E row.
  - PERMUTE operator - which is equivalent to all permutations of variables that it was applied to, e.g. PATTERN (PERMUTE (A, B, C)) = PATTERN (A B C | A C B | B A C | B C A | C A B | C B A).
  - Anchors - ^, $, which denote beginning/end of a partition; those do not make sense in the streaming context and will not be supported.
  - Exclusion - PATTERN ({- A -} B) meaning that A will be looked for but will not participate in the output. This works only for the ALL ROWS PER MATCH mode.
  - Reluctant optional quantifier - PATTERN A??; only the greedy optional quantifier is supported.
- ALL ROWS PER MATCH output mode - which produces an output row for every row that participated in the creation of a found match. This also means:
  - that the only supported semantic for the MEASURES clause is FINAL
  - CLASSIFIER function, which returns the pattern variable that a row was mapped to, is not yet supported.
- SUBSET - which allows creating logical groups of pattern variables and using those groups in the DEFINE and MEASURES clauses.
- Physical offsets - PREV/NEXT, which indexes all events seen rather than only those that were mapped to a pattern variable (as in logical offsets case).
- Extracting time attributes - there is currently no possibility to get a time attribute for subsequent time-based operations.
- MATCH_RECOGNIZE is supported only for SQL. There is no equivalent in the Table API.
- Aggregations:
  - distinct aggregations are not supported.
