-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkinesis_read.py
42 lines (31 loc) · 977 Bytes
/
kinesis_read.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# Databricks notebook source
"""
Author: Muhammad Umar Amanat
Description: Reads data from a Kinesis data stream and stores it in a bronze table
Requirements: None
Version: V1
"""
# COMMAND ----------
import dlt
# COMMAND ----------
# Pipeline configuration constants.
# Secret scope passed to dbutils.secrets.get() when kinesis_bronze looks up
# the AWS credentials.
SCOPE_NAME = "dlt_aws_scope"
# NOTE(review): not referenced in the visible part of this notebook --
# presumably a DBFS location for CSV data; confirm whether it is still needed.
CSV_PATH = "dbfs:/FileStore/dlt_aws"
# Value supplied as the "region" option of the Kinesis stream reader.
KINESIS_REGION = "us-west-2"
# Value supplied as the "streamName" option of the Kinesis stream reader.
KINESIS_DATA_STREAM = "data-stream"
# COMMAND ----------
# COMMAND ----------
# MAGIC %md
# MAGIC ### Data Reader
# COMMAND ----------
@dlt.table(table_properties={"pipelines.reset.allowed": "false"})
def kinesis_bronze():
    """Define the bronze table as a streaming read of the Kinesis data stream.

    The reader targets the stream ``KINESIS_DATA_STREAM`` in
    ``KINESIS_REGION`` with the initial position set to ``earliest``. The AWS
    access key and secret key are looked up in the ``SCOPE_NAME`` secret scope
    on every call, so no credential is stored in the notebook.

    The table is registered with ``pipelines.reset.allowed`` set to
    ``"false"``.
    NOTE(review): per the Databricks docs this should block a full refresh
    from clearing the table -- confirm this is the intended retention policy.

    Returns:
        The object produced by ``load()`` on the configured stream reader.
    """
    # Stream identity and starting offset.
    reader = spark.readStream.format("kinesis")
    reader = reader.option("streamName", KINESIS_DATA_STREAM)
    reader = reader.option("region", KINESIS_REGION)
    reader = reader.option("initialPosition", "earliest")

    # Credentials are fetched only after the reader is built, one at a time,
    # matching the order in which the options are applied.
    access_key = dbutils.secrets.get(SCOPE_NAME, "aws_access_key_id")
    reader = reader.option("awsAccessKey", access_key)
    secret_key = dbutils.secrets.get(SCOPE_NAME, "aws_secret_access_key")
    reader = reader.option("awsSecretKey", secret_key)

    return reader.load()