feat:Windowed Top N (#46)
* resolving conflict

* Add 'windowed-top-N/flinksql' module with dependencies

This commit introduces the 'windowed-top-N/flinksql' module, which includes its build.gradle with appropriate Java version and test implementation dependencies. A SQL template for creating movie views is also included. Lastly, the 'windowed-top-N:flinksql' module was added to general settings.gradle.

* Apply suggestions from code review

Co-authored-by: Dave Troiano <[email protected]>

---------

Co-authored-by: Dave Troiano <[email protected]>
bbejeck and davetroiano authored Apr 25, 2024
1 parent dc3de57 commit 2e56b1c
Showing 9 changed files with 396 additions and 0 deletions.
1 change: 1 addition & 0 deletions settings.gradle
@@ -53,3 +53,4 @@ include 'tumbling-windows:kstreams'
include 'udf:ksql'
include 'versioned-ktables:kstreams'
include 'window-final-result:kstreams'
include 'windowed-top-N:flinksql'
261 changes: 261 additions & 0 deletions windowed-top-N/flinksql/README.md
@@ -0,0 +1,261 @@
# Windowed Top-N in Flink SQL

The Top-N functionality in Flink SQL is excellent for tracking the top (or bottom) records in an event stream. But what if you want the top records within distinct time ranges? For example, suppose you work for a video streaming service like Netflix or Hulu, and you need to see the top genre of movies that subscribers watch each hour in order to make more accurate recommendations. To do this ranking by hour, you can use a [Windowed Top-N query](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/window-topn/) together with [windowing table-valued functions](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/window-tvf/).

## Setup

Let's assume the following DDL for our base `movie_views` table:

```sql
CREATE TABLE movie_views (
id INT,
title STRING,
genre STRING,
movie_start TIMESTAMP(3),
WATERMARK FOR movie_start as movie_start
)
```

## Compute the Windowed Top-N

Given the `movie_views` table definition above, we can retrieve the top genre per hour with the following query:

```sql
SELECT *
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY category_count DESC ) as hour_rank
FROM (
SELECT window_start, window_end, genre, COUNT(*) as category_count
FROM TABLE(TUMBLE(TABLE movie_views, DESCRIPTOR(movie_start), INTERVAL '1' HOUR))
GROUP BY window_start, window_end, genre
)
) WHERE hour_rank = 1;
```

There are a few moving parts to this query, so let's break it down starting from the inside and working our way out.

The innermost query is a [TUMBLE windowing TVF](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/window-tvf/#tumble) that selects the window start, window end, genre, and a per-genre count of the movies started in each 1-hour tumbling window.

```sql
SELECT window_start, window_end, genre, COUNT(*) as category_count
FROM TABLE(TUMBLE(TABLE movie_views, DESCRIPTOR(movie_start), INTERVAL '1' HOUR))
GROUP BY window_start, window_end, genre
```
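Conceptually, `TUMBLE` assigns every row to exactly one fixed-size, non-overlapping window based on its event time. A minimal Python sketch of that bucketing and per-genre counting (an illustration of the idea, not Flink's implementation; the helper names are made up):

```python
from collections import Counter
from datetime import datetime, timedelta

def tumble_window(ts: datetime, size: timedelta = timedelta(hours=1)):
    # Floor the event time to the window size: each event lands in
    # exactly one [window_start, window_end) bucket.
    epoch = datetime(1970, 1, 1)
    start = epoch + ((ts - epoch) // size) * size
    return start, start + size

def count_by_window_and_genre(events):
    # events: iterable of (movie_start, genre) pairs, like the
    # movie_views rows after windowing and GROUP BY.
    counts = Counter()
    for ts, genre in events:
        counts[(tumble_window(ts), genre)] += 1
    return counts

events = [
    (datetime(2024, 4, 23, 19, 4), "Action"),
    (datetime(2024, 4, 23, 19, 13), "Crime"),
    (datetime(2024, 4, 23, 19, 28), "Crime"),
]
counts = count_by_window_and_genre(events)
```

With these three events, the 19:00-20:00 bucket holds a count of 2 for `Crime` and 1 for `Action`, mirroring what the TVF plus `GROUP BY` produces.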

Moving out one level, the next query selects all results from the tumbling window query and performs an [over aggregation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/over-agg/), partitioning results by window start and window end and ordering them by the count, descending. This gives us the rank of each genre among the movies started in each hour.

```sql
SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY category_count DESC) as hour_rank
FROM (
....
)
```

The outermost query selects all results from the `OVER` aggregation where the `hour_rank` column equals 1, i.e., the top genre among the movies started in each hour.

```sql
SELECT *
FROM ( .... )
WHERE hour_rank = 1 ;
```

Here are the essential concepts used to calculate the windowed Top-N results:

1. `ROW_NUMBER()` assigns each row a unique, sequential number (starting at one) representing its place in the result set, which we've aliased as `hour_rank`.
2. `PARTITION BY` specifies how to partition the data. Partitioning by window start and end, you rank the movie genres within each 1-hour tumbling window.
3. `ORDER BY` determines the order in which `ROW_NUMBER()` assigns ranks; ordering by `category_count` descending gives the most-watched genre in each window rank 1.
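The ranking step can be sketched in plain Python over the aggregated rows (hypothetical helper names; Flink of course evaluates this incrementally over the stream rather than in batch):

```python
from collections import defaultdict

def windowed_top_n(window_counts, n=1):
    # window_counts: (window, genre, count) rows, i.e. the output of
    # the tumbling-window aggregation.
    # Group rows by window -- the PARTITION BY step.
    by_window = defaultdict(list)
    for window, genre, count in window_counts:
        by_window[window].append((genre, count))
    # Sort each partition by count descending (ORDER BY), number the
    # rows (ROW_NUMBER()), and keep rank <= n (WHERE hour_rank <= n).
    results = []
    for window, rows in sorted(by_window.items()):
        rows.sort(key=lambda r: r[1], reverse=True)
        for rank, (genre, count) in enumerate(rows[:n], start=1):
            results.append((window, genre, count, rank))
    return results

rows = [("19:00", "Crime", 2), ("19:00", "Action", 1),
        ("20:00", "Sci-Fi", 3), ("20:00", "Romance", 2)]
print(windowed_top_n(rows))
```

With `n=1` this keeps exactly one winner per window, just as `WHERE hour_rank = 1` does in the SQL above.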

## Running the example

You can run the example backing this tutorial in one of three ways: a Flink Table API-based JUnit test, locally with the Flink SQL Client
against Flink and Kafka running in Docker, or with Confluent Cloud.

<details>
<summary>Flink Table API-based test</summary>

#### Prerequisites

* Java 17, e.g., follow the OpenJDK installation instructions [here](https://openjdk.org/install/) if you don't have Java.
* Docker running via [Docker Desktop](https://docs.docker.com/desktop/) or [Docker Engine](https://docs.docker.com/engine/install/)

#### Run the test

Run the following command to execute [FlinkSqlWindowedTopNTest#testWindowedTopN](src/test/java/io/confluent/developer/FlinkSqlWindowedTopNTest.java):

```plaintext
./gradlew clean :windowed-top-N:flinksql:test
```

The test starts Kafka and Schema Registry with [Testcontainers](https://testcontainers.com/), runs the Flink SQL commands
above against a local Flink `StreamExecutionEnvironment`, and ensures that the aggregation results are what we expect.
</details>

<details>
<summary>Flink SQL Client CLI</summary>

#### Prerequisites

* Docker running via [Docker Desktop](https://docs.docker.com/desktop/) or [Docker Engine](https://docs.docker.com/engine/install/)
* [Docker Compose](https://docs.docker.com/compose/install/). Ensure that the command `docker compose version` succeeds.

#### Run the commands

First, start Flink and Kafka:

```shell
docker compose -f ./docker/docker-compose-flinksql.yml up -d
```

Next, open the Flink SQL Client CLI:

```shell
docker exec -it flink-sql-client sql-client.sh
```

Finally, run the following SQL statements to create the `movie_views` table backed by Kafka running in Docker, populate it with
test data, and run the Top-N query.

```sql
CREATE TABLE movie_views (
id INT,
title STRING,
genre STRING,
movie_start TIMESTAMP(3),
WATERMARK FOR movie_start as movie_start
) WITH (
'connector' = 'kafka',
'topic' = 'movie_views',
'properties.bootstrap.servers' = 'broker:9092',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'raw',
'key.fields' = 'id',
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY'
);

```

```sql
INSERT INTO movie_views (id, title, genre, movie_start)
VALUES (123, 'The Dark Knight', 'Action', TO_TIMESTAMP('2024-04-23 19:04:00')),
(456, 'Avengers: Endgame', 'Action', TO_TIMESTAMP('2024-04-23 22:01:00')),
(789, 'Inception', 'Sci-Fi', TO_TIMESTAMP('2024-04-23 20:24:00')),
(147, 'Joker', 'Drama', TO_TIMESTAMP('2024-04-23 22:56:00')),
(258, 'The Godfather', 'Crime', TO_TIMESTAMP('2024-04-23 19:13:00')),
(369, 'Casablanca', 'Romance', TO_TIMESTAMP('2024-04-23 20:26:00')),
(321, 'The Shawshank Redemption', 'Drama', TO_TIMESTAMP('2024-04-23 20:20:00')),
(654, 'Forrest Gump', 'Drama', TO_TIMESTAMP('2024-04-23 21:54:00')),
(987, 'Fight Club', 'Drama', TO_TIMESTAMP('2024-04-23 23:24:00')),
(135, 'Pulp Fiction', 'Crime', TO_TIMESTAMP('2024-04-23 22:09:00')),
(246, 'The Godfather: Part II', 'Crime', TO_TIMESTAMP('2024-04-23 19:28:00')),
(357, 'The Departed', 'Crime', TO_TIMESTAMP('2024-04-23 23:11:00')),
(842, 'Toy Story 3', 'Animation', TO_TIMESTAMP('2024-04-23 23:12:00')),
(931, 'Up', 'Animation', TO_TIMESTAMP('2024-04-23 22:17:00')),
(624, 'The Lion King', 'Animation', TO_TIMESTAMP('2024-04-23 22:28:00')),
(512, 'Star Wars: The Force Awakens', 'Sci-Fi', TO_TIMESTAMP('2024-04-23 20:42:00')),
(678, 'The Matrix', 'Sci-Fi', TO_TIMESTAMP('2024-04-23 19:25:00')),
(753, 'Interstellar', 'Sci-Fi', TO_TIMESTAMP('2024-04-23 20:14:00')),
(834, 'Titanic', 'Romance', TO_TIMESTAMP('2024-04-23 20:25:00')),
(675, 'Pride and Prejudice', 'Romance', TO_TIMESTAMP('2024-04-23 23:37:00')),
(333, 'The Pride of Archbishop Carroll', 'History', TO_TIMESTAMP('2024-04-24 03:37:00'));
```

```sql
SELECT *
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY category_count DESC ) as hour_rank
FROM (
SELECT window_start, window_end, genre, COUNT(*) as category_count
FROM TABLE(TUMBLE(TABLE movie_views, DESCRIPTOR(movie_start), INTERVAL '1' HOUR))
GROUP BY window_start, window_end, genre
)
) WHERE hour_rank = 1;
```

The query output should look like this:

```plaintext
window_start         window_end           genre      category_count  hour_rank
2024-04-23 19:00:00  2024-04-23 20:00:00  Crime      2               1
2024-04-23 20:00:00  2024-04-23 21:00:00  Sci-Fi     3               1
2024-04-23 21:00:00  2024-04-23 22:00:00  Drama      1               1
2024-04-23 22:00:00  2024-04-23 23:00:00  Animation  2               1
2024-04-23 23:00:00  2024-04-24 00:00:00  Animation  1               1
```
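As a sanity check, you can reproduce these per-hour winners outside of Flink with a short Python script over the same sample rows (illustrative only; note the 23:00 window has a four-way tie at count 1, so which genre Flink reports there depends on an arbitrary tie-break, and the Apr 24 03:37 row mainly serves to advance the watermark):

```python
from collections import Counter
from datetime import datetime

# (genre, movie_start) pairs from the INSERT statement above.
rows = [
    ("Action", "2024-04-23 19:04"), ("Action", "2024-04-23 22:01"),
    ("Sci-Fi", "2024-04-23 20:24"), ("Drama", "2024-04-23 22:56"),
    ("Crime", "2024-04-23 19:13"), ("Romance", "2024-04-23 20:26"),
    ("Drama", "2024-04-23 20:20"), ("Drama", "2024-04-23 21:54"),
    ("Drama", "2024-04-23 23:24"), ("Crime", "2024-04-23 22:09"),
    ("Crime", "2024-04-23 19:28"), ("Crime", "2024-04-23 23:11"),
    ("Animation", "2024-04-23 23:12"), ("Animation", "2024-04-23 22:17"),
    ("Animation", "2024-04-23 22:28"), ("Sci-Fi", "2024-04-23 20:42"),
    ("Sci-Fi", "2024-04-23 19:25"), ("Sci-Fi", "2024-04-23 20:14"),
    ("Romance", "2024-04-23 20:25"), ("Romance", "2024-04-23 23:37"),
]

# Count views per (hour, genre) ...
per_hour = Counter()
for genre, ts in rows:
    hour = datetime.strptime(ts, "%Y-%m-%d %H:%M").replace(minute=0)
    per_hour[(hour, genre)] += 1

# ... then keep the highest-count genre per hour (first seen wins ties).
tops = {}
for (hour, genre), n in per_hour.items():
    if n > tops.get(hour, ("", 0))[1]:
        tops[hour] = (genre, n)

for hour in sorted(tops):
    print(hour, *tops[hour])
```

The unambiguous windows (19:00 Crime 2, 20:00 Sci-Fi 3, 21:00 Drama 1, 22:00 Animation 2) match the query output above.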

When you are finished, clean up the containers used for this tutorial by running:

```shell
docker compose -f ./docker/docker-compose-flinksql.yml down
```

</details>

<details>
<summary>Confluent Cloud</summary>

#### Prerequisites

* A [Confluent Cloud](https://confluent.cloud/signup) account
* A Flink compute pool created in Confluent Cloud. Follow [this](https://docs.confluent.io/cloud/current/flink/get-started/quick-start-cloud-console.html) quick start to create one.

#### Run the commands

In the Confluent Cloud Console, navigate to your environment and then click the `Open SQL Workspace` button for the compute
pool that you have created.

Select the default catalog (Confluent Cloud environment) and database (Kafka cluster) to use with the dropdowns at the top right.

Finally, run the following SQL statements to create the `movie_views` table, populate it with test data, and run the windowed Top-N query.

```sql
CREATE TABLE movie_views (
id INT,
title STRING,
genre STRING,
movie_start TIMESTAMP(3),
WATERMARK FOR movie_start as movie_start
)
```

```sql
INSERT INTO movie_views (id, title, genre, movie_start)
VALUES (123, 'The Dark Knight', 'Action', TO_TIMESTAMP('2024-04-23 19:04:00')),
(456, 'Avengers: Endgame', 'Action', TO_TIMESTAMP('2024-04-23 22:01:00')),
(789, 'Inception', 'Sci-Fi', TO_TIMESTAMP('2024-04-23 20:24:00')),
(147, 'Joker', 'Drama', TO_TIMESTAMP('2024-04-23 22:56:00')),
(258, 'The Godfather', 'Crime', TO_TIMESTAMP('2024-04-23 19:13:00')),
(369, 'Casablanca', 'Romance', TO_TIMESTAMP('2024-04-23 20:26:00')),
(321, 'The Shawshank Redemption', 'Drama', TO_TIMESTAMP('2024-04-23 20:20:00')),
(654, 'Forrest Gump', 'Drama', TO_TIMESTAMP('2024-04-23 21:54:00')),
(987, 'Fight Club', 'Drama', TO_TIMESTAMP('2024-04-23 23:24:00')),
(135, 'Pulp Fiction', 'Crime', TO_TIMESTAMP('2024-04-23 22:09:00')),
(246, 'The Godfather: Part II', 'Crime', TO_TIMESTAMP('2024-04-23 19:28:00')),
(357, 'The Departed', 'Crime', TO_TIMESTAMP('2024-04-23 23:11:00')),
(842, 'Toy Story 3', 'Animation', TO_TIMESTAMP('2024-04-23 23:12:00')),
(931, 'Up', 'Animation', TO_TIMESTAMP('2024-04-23 22:17:00')),
(624, 'The Lion King', 'Animation', TO_TIMESTAMP('2024-04-23 22:28:00')),
(512, 'Star Wars: The Force Awakens', 'Sci-Fi', TO_TIMESTAMP('2024-04-23 20:42:00')),
(678, 'The Matrix', 'Sci-Fi', TO_TIMESTAMP('2024-04-23 19:25:00')),
(753, 'Interstellar', 'Sci-Fi', TO_TIMESTAMP('2024-04-23 20:14:00')),
(834, 'Titanic', 'Romance', TO_TIMESTAMP('2024-04-23 20:25:00')),
(675, 'Pride and Prejudice', 'Romance', TO_TIMESTAMP('2024-04-23 23:37:00')),
(333, 'The Pride of Archbishop Carroll', 'History', TO_TIMESTAMP('2024-04-24 03:37:00'));
```

```sql
SELECT *
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY category_count DESC ) as hour_rank
FROM (
SELECT window_start, window_end, genre, COUNT(*) as category_count
FROM TABLE(TUMBLE(TABLE movie_views, DESCRIPTOR(movie_start), INTERVAL '1' HOUR))
GROUP BY window_start, window_end, genre
)
) WHERE hour_rank = 1;
```

The query output should look like this:

![](img/query-output.png)
39 changes: 39 additions & 0 deletions windowed-top-N/flinksql/build.gradle
@@ -0,0 +1,39 @@
buildscript {
repositories {
mavenCentral()
}
}

plugins {
id 'java'
id 'idea'
}

java {
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
}
version = "0.0.1"

repositories {
mavenCentral()
}

dependencies {
testImplementation project(path: ':common', configuration: 'testArtifacts')
testImplementation 'com.google.guava:guava:31.1-jre'
testImplementation 'junit:junit:4.13.2'
testImplementation 'org.testcontainers:testcontainers:1.19.3'
testImplementation 'org.testcontainers:kafka:1.19.3'
testImplementation 'org.apache.flink:flink-sql-connector-kafka:3.0.2-1.18'
testImplementation 'org.apache.flink:flink-connector-base:1.18.0'
testImplementation 'org.apache.flink:flink-sql-avro-confluent-registry:1.18.0'
testImplementation 'org.apache.flink:flink-test-utils:1.18.0'
testImplementation 'org.apache.flink:flink-test-utils-junit:1.18.0'
testImplementation 'org.apache.flink:flink-table-api-java-bridge:1.18.0'
testImplementation 'org.apache.flink:flink-table-planner_2.12:1.18.0'
testImplementation 'org.apache.flink:flink-table-planner_2.12:1.18.0:tests'
testImplementation 'org.apache.flink:flink-statebackend-rocksdb:1.18.0'
testImplementation 'org.apache.flink:flink-json:1.19.0'

}
Binary file added windowed-top-N/flinksql/img/query-output.png
3 changes: 3 additions & 0 deletions windowed-top-N/flinksql/settings.gradle
@@ -0,0 +1,3 @@
rootProject.name = 'windowed-top-N'
include ':common'
project(':common').projectDir = file('../../common')
@@ -0,0 +1,45 @@
package io.confluent.developer;


import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static io.confluent.developer.TestUtils.rowObjectsFromTableResult;
import static io.confluent.developer.TestUtils.yyyy_MM_dd;
import static org.junit.Assert.assertEquals;

public class FlinkSqlWindowedTopNTest extends AbstractFlinkKafkaTest {

@Test
public void testWindowedTopN() throws Exception {

streamTableEnv.getConfig().set("table.exec.source.idle-timeout", "5 ms");
streamTableEnv.executeSql(getResourceFileContents("create-movie-views.sql.template",
Optional.of(kafkaPort), Optional.of(schemaRegistryPort))).await();

streamTableEnv.executeSql(getResourceFileContents("populate-movie-starts.sql")).await();

TableResult tableResult = streamTableEnv.executeSql(getResourceFileContents("query-movie-starts-for-top-categories-per-hour-dynamic.sql"));

List<Row> actualResults = rowObjectsFromTableResult(tableResult);
List<Row> expectedResults = getExpectedFinalUpdateRowObjects();
assertEquals(expectedResults, actualResults);
}

private List<Row> getExpectedFinalUpdateRowObjects() {
List<Row> rowList = new ArrayList<>();
rowList.add(Row.ofKind(RowKind.INSERT, yyyy_MM_dd("2024-04-23 19:00:00"), yyyy_MM_dd("2024-04-23 20:00:00"), "Crime", 2L, 1L));
rowList.add(Row.ofKind(RowKind.INSERT, yyyy_MM_dd("2024-04-23 20:00:00"), yyyy_MM_dd("2024-04-23 21:00:00"), "Sci-Fi", 3L, 1L));
rowList.add(Row.ofKind(RowKind.INSERT, yyyy_MM_dd("2024-04-23 21:00:00"), yyyy_MM_dd("2024-04-23 22:00:00"), "Drama", 1L, 1L));
rowList.add(Row.ofKind(RowKind.INSERT, yyyy_MM_dd("2024-04-23 22:00:00"), yyyy_MM_dd("2024-04-23 23:00:00"), "Animation", 2L, 1L));
rowList.add(Row.ofKind(RowKind.INSERT, yyyy_MM_dd("2024-04-23 23:00:00"), yyyy_MM_dd("2024-04-24 00:00:00"), "Animation", 1L, 1L));
return rowList;
}

}
@@ -0,0 +1,17 @@
CREATE TABLE movie_views (
id INT,
title STRING,
genre STRING,
movie_start TIMESTAMP(3),
WATERMARK FOR movie_start as movie_start
) WITH (
'connector' = 'kafka',
'topic' = 'movie_views',
'properties.bootstrap.servers' = 'localhost:KAFKA_PORT',
'scan.startup.mode' = 'earliest-offset',
'scan.bounded.mode' = 'latest-offset',
'key.format' = 'raw',
'key.fields' = 'id',
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY'
);
@@ -0,0 +1,21 @@
INSERT INTO movie_views (id, title, genre, movie_start)
VALUES (123, 'The Dark Knight', 'Action', TO_TIMESTAMP('2024-04-23 19:04:00')),
(456, 'Avengers: Endgame', 'Action', TO_TIMESTAMP('2024-04-23 22:01:00')),
(789, 'Inception', 'Sci-Fi', TO_TIMESTAMP('2024-04-23 20:24:00')),
(147, 'Joker', 'Drama', TO_TIMESTAMP('2024-04-23 22:56:00')),
(258, 'The Godfather', 'Crime', TO_TIMESTAMP('2024-04-23 19:13:00')),
(369, 'Casablanca', 'Romance', TO_TIMESTAMP('2024-04-23 20:26:00')),
(321, 'The Shawshank Redemption', 'Drama', TO_TIMESTAMP('2024-04-23 20:20:00')),
(654, 'Forrest Gump', 'Drama', TO_TIMESTAMP('2024-04-23 21:54:00')),
(987, 'Fight Club', 'Drama', TO_TIMESTAMP('2024-04-23 23:24:00')),
(135, 'Pulp Fiction', 'Crime', TO_TIMESTAMP('2024-04-23 22:09:00')),
(246, 'The Godfather: Part II', 'Crime', TO_TIMESTAMP('2024-04-23 19:28:00')),
(357, 'The Departed', 'Crime', TO_TIMESTAMP('2024-04-23 23:11:00')),
(842, 'Toy Story 3', 'Animation', TO_TIMESTAMP('2024-04-23 23:12:00')),
(931, 'Up', 'Animation', TO_TIMESTAMP('2024-04-23 22:17:00')),
(624, 'The Lion King', 'Animation', TO_TIMESTAMP('2024-04-23 22:28:00')),
(512, 'Star Wars: The Force Awakens', 'Sci-Fi', TO_TIMESTAMP('2024-04-23 20:42:00')),
(678, 'The Matrix', 'Sci-Fi', TO_TIMESTAMP('2024-04-23 19:25:00')),
(753, 'Interstellar', 'Sci-Fi', TO_TIMESTAMP('2024-04-23 20:14:00')),
(834, 'Titanic', 'Romance', TO_TIMESTAMP('2024-04-23 20:25:00')),
(675, 'Pride and Prejudice', 'Romance', TO_TIMESTAMP('2024-04-23 23:37:00'));