-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconsumer.py
26 lines (20 loc) · 810 Bytes
/
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import json
from kafka import KafkaConsumer
# Kafka broker address
bootstrap_servers = 'localhost:9092'
# Kafka topic to consume music recommendations
topic = 'tashi'
# Create a Kafka consumer
consumer = KafkaConsumer(topic,
bootstrap_servers=bootstrap_servers,
value_deserializer=lambda v: json.loads(v.decode('utf-8')))
# Function to apply recommendations based on features
def apply_recommendations(features):
# For demonstration, let's just print the features
print("Received features:", features)
# Continuously consume messages and apply recommendations
for message in consumer:
# Get the features from the message value
features = message.value
# Apply recommendations based on the features
apply_recommendations(features)