Skip to content

Commit

Permalink
Added ksqlDB queries
Browse files Browse the repository at this point in the history
  • Loading branch information
ifnesi committed Apr 14, 2023
1 parent 7c7a649 commit 7bedf05
Showing 1 changed file with 88 additions and 1 deletion.
89 changes: 88 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,95 @@ Detailed view of all microservices and to what Kafka topics their produce and ar
Confluent Cloud Stream Lineage view:
![image](static/images/docs/cc-stream-lineage.png)

### ksqlDB queries
#### Collections
Streams and tables are the two primary abstractions, they are referred to as collections. There are two ways of creating collections in ksqlDB:
- directly from Kafka topics (source collections)
- derived from other streams and tables (derived collections)

**Source Collections**: The topics produced/consumed by the microservices need to be ingested by ksqlDB so they can be stream processed:
```
CREATE STREAM IF NOT EXISTS PIZZA_ORDERED (
order_id VARCHAR KEY,
status INT,
timestamp BIGINT,
order STRUCT<
extra_toppings ARRAY<STRING>,
username STRING,
customer_id STRING,
sauce STRING,
cheese STRING,
main_topping STRING
>
) WITH (
KAFKA_TOPIC = 'pizza-ordered',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'timestamp'
);
CREATE STREAM IF NOT EXISTS PIZZA_ASSEMBLED (
order_id VARCHAR KEY,
status INT,
baking_time INT,
timestamp BIGINT
) WITH (
KAFKA_TOPIC = 'pizza-assembled',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'timestamp'
);
CREATE STREAM IF NOT EXISTS PIZZA_BAKED (
order_id VARCHAR KEY,
status INT,
timestamp BIGINT
) WITH (
KAFKA_TOPIC = 'pizza-baked',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'timestamp'
);
STREAM_DELIVERED: f"""CREATE STREAM IF NOT EXISTS PIZZA_DELIVERED (
order_id VARCHAR KEY,
status INT,
timestamp BIGINT
) WITH (
KAFKA_TOPIC = 'pizza-delivered',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'timestamp'
);
CREATE STREAM IF NOT EXISTS PIZZA_PENDING (
order_id VARCHAR KEY,
status INT,
timestamp BIGINT
) WITH (
KAFKA_TOPIC = 'pizza-pending',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'timestamp'
);
CREATE STREAM IF NOT EXISTS PIZZA_STATUS (
order_id VARCHAR KEY,
status INT,
timestamp BIGINT
) WITH (
KAFKA_TOPIC='pizza-status',
VALUE_FORMAT='JSON',
TIMESTAMP='timestamp'
);
```

**Derived Collections**: With the source collections created (streams) we can now extract the status field of each event and have them merged into a single topic/stream by creating persistent queries:
```
INSERT INTO PIZZA_STATUS SELECT order_id, status, timestamp FROM PIZZA_ORDERED EMIT CHANGES;
INSERT INTO PIZZA_STATUS SELECT order_id, status, timestamp FROM PIZZA_ASSEMBLED EMIT CHANGES;
INSERT INTO PIZZA_STATUS SELECT order_id, status, timestamp FROM PIZZA_BAKED EMIT CHANGES;
INSERT INTO PIZZA_STATUS SELECT order_id, status, timestamp FROM PIZZA_DELIVERED EMIT CHANGES;
INSERT INTO PIZZA_STATUS SELECT order_id, status, timestamp FROM PIZZA_PENDING EMIT CHANGES;
```

### Start now!
You can setup your own environment to do the tests, or go straight to the public demo by clicking <a href="http://confluent-pizza-demo-1570877491.eu-west-1.elb.amazonaws.com" title="Start demo" target="_blank">here</a>.
You can setup your own environment to do the tests, or go straight to the <a href="http://confluent-pizza-demo-1570877491.eu-west-1.elb.amazonaws.com" title="Start demo" target="_blank"><b>public demo by clicking here</b></a>.
- Enter your username
- No need for password
- You will only be able to see your own orders, you cannot see someone else's
Expand Down

0 comments on commit 7bedf05

Please sign in to comment.