-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathStreamDataProducer.py
103 lines (68 loc) · 2.31 KB
/
StreamDataProducer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# Databricks notebook source
"""
Author: Muhammad Umar Amanat
Description: Notebook for creating a Kinesis stream using the Python boto3 API
Requirements: Data must be present at dbfs:/FileStore/dlt_aws
Version: V1
"""
# COMMAND ----------
import boto3
import random as rnd
import json
# COMMAND ----------
# Databricks secret scope that stores the AWS credentials read below.
SCOPE_NAME = "dlt_aws_scope"
# DBFS folder that must contain the "{cc}videos.csv" source files.
CSV_PATH = "dbfs:/FileStore/dlt_aws"
# AWS region in which the Kinesis stream is created and written.
KINESIS_REGION = "us-west-2"
# Name of the Kinesis data stream this notebook creates and produces to.
KINESIS_DATA_STREAM = "data-stream"
# COMMAND ----------
# COMMAND ----------
# MAGIC %md
# MAGIC ### Stream Creation
# COMMAND ----------
# Kinesis client authenticated with access keys pulled from the Databricks
# secret scope (dbutils is provided by the Databricks notebook runtime).
kinesis_client = boto3.client('kinesis',
aws_access_key_id=dbutils.secrets.get(SCOPE_NAME, "aws_access_key_id"),
aws_secret_access_key=dbutils.secrets.get(SCOPE_NAME, "aws_secret_access_key"),
region_name=KINESIS_REGION)
# COMMAND ----------
# Create each Kinesis stream in the list. A list is used so additional
# streams can be added later without changing the loop.
stream_list = [KINESIS_DATA_STREAM]
for s_name in stream_list:
    try:
        # ON_DEMAND mode lets Kinesis scale shard capacity automatically,
        # so no ShardCount needs to be specified.
        kinesis_client.create_stream(StreamName=s_name,
                                     StreamModeDetails={'StreamMode': 'ON_DEMAND'})
        print(f"[INFO] {s_name} created successfully")
    except Exception as e:
        # create_stream raises (e.g. ResourceInUseException) when the stream
        # already exists; report and continue so the notebook is re-runnable.
        print(f"[ERROR] While creating stream={s_name}, following error occurred. {e}")
# COMMAND ----------
# COMMAND ----------
# MAGIC %md
# MAGIC ### Data Generator
# COMMAND ----------
# Pick a country code at random; it selects which CSV file to read below.
cc = rnd.choice(["MX", "RU"]) ## country code
## select 10 records to push to the stream
# NOTE(review): sample(fraction=1.0) keeps (approximately) every row in
# order, so limit(10) takes the first 10 rows — this is not a random
# 10-record sample. If true randomness is intended, an orderBy(rand())
# before limit() would be needed — confirm intent.
df = (spark
.read
.option("header", True)
.option("multiline", True)
.csv(path=f"{CSV_PATH}/{cc}videos.csv")
.select("video_id", "trending_date", "title", "channel_title", "category_id",
"publish_time", "views", "likes", "dislikes", "comment_count")
.sample(withReplacement=False, fraction=1.0)
.limit(10)
).toPandas()
# COMMAND ----------
# Push each sampled record to the Kinesis stream as a JSON document.
for _, row in df.iterrows():
    try:
        # BUG FIX: the original passed the literal string "category_id" as
        # the partition key, routing every record to the same shard. Use the
        # record's category_id VALUE so records distribute across shards.
        response = (kinesis_client
                    .put_record(StreamName=KINESIS_DATA_STREAM,
                                Data=json.dumps(row.to_dict()),
                                PartitionKey=str(row["category_id"]))
                    )
    except Exception as e:
        # Best-effort producer: log the failure and keep sending the rest.
        print(f"error occurred {e}")
# COMMAND ----------
# COMMAND ----------
# MAGIC %md
# MAGIC #### Create Destination Database if not exist
# COMMAND ----------
# MAGIC %sql
# MAGIC CREATE DATABASE IF NOT EXISTS dlt_aws
# COMMAND ----------