We'll create a simple application in Python using Spark that integrates with the Kafka topic we created earlier. The application will read the messages as they are posted and count the frequency of words in every message.
Before moving on to Spark streaming, make sure your Kafka producer is running. If you are not familiar with Kafka producers and consumers, see my separate repository: https://github.com/majid0110/Kafka-Producer-and-consumers-using-python
Now you will need to add the dependencies listed below:
Use this command: spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 (path to your script, for example C:\Users\soft_\Desktop\Spark.py)
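The corresponding Maven coordinates are groupId org.apache.spark, artifactId spark-sql-kafka-0-10_2.11, version 2.3.0 (the Scala suffix matches the _2.11 package used in the spark-submit command).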
Spark Structured Streaming uses readStream on SparkSession to load a streaming Dataset from Kafka.
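A minimal sketch of this step in PySpark. The helper names are made up for illustration, and the broker address and topic you pass in are your own values, not ones taken from this project:

```python
def kafka_source_options(bootstrap_servers, topic, starting_offsets="latest"):
    """Build the options understood by Spark's Kafka source."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,  # host:port of your broker
        "subscribe": topic,                            # topic to read from
        "startingOffsets": starting_offsets,           # "latest" or "earliest"
    }


def read_kafka_stream(spark, bootstrap_servers, topic):
    """Return a streaming DataFrame with the Kafka columns (key, value, topic, ...)."""
    options = kafka_source_options(bootstrap_servers, topic)
    return spark.readStream.format("kafka").options(**options).load()
```

With an existing SparkSession called spark, something like read_kafka_stream(spark, "localhost:9092", "my_topic") would return the streaming DataFrame, where both the address and the topic name are placeholders.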
Since the value is in binary, we first need to convert it to a string using selectExpr().
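The cast can be sketched as follows, assuming df is the streaming DataFrame loaded from Kafka. The function name is hypothetical, and the key is cast too, which you can drop if you only need the value:

```python
# SQL expressions that turn the binary Kafka columns into strings.
CAST_EXPRESSIONS = (
    "CAST(key AS STRING) AS key",
    "CAST(value AS STRING) AS value",
)


def decode_kafka_columns(df):
    """Return a DataFrame whose key and value columns are strings instead of binary."""
    return df.selectExpr(*CAST_EXPRESSIONS)
```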
Next, parse the value, which is a JSON string, and convert its fields into DataFrame columns using a custom schema.
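One possible way to do this is with from_json, sketched below. It assumes the value column has already been cast to a string, and the field names in the usage note are invented, since your schema depends on your own data. Passing the schema as a DDL string needs Spark 2.3 or later; a StructType works as well.

```python
def schema_ddl(fields):
    """Turn [(name, type), ...] into a DDL schema string such as 'id INT, name STRING'."""
    return ", ".join(f"{name} {sql_type}" for name, sql_type in fields)


def parse_json_value(df, fields):
    """Parse the JSON string in `value` and expose its fields as top-level columns."""
    # Imported inside the function so schema_ddl() can be used without Spark installed.
    from pyspark.sql.functions import col, from_json

    parsed = df.select(from_json(col("value"), schema_ddl(fields)).alias("data"))
    return parsed.select("data.*")
```

For a message with two fields you might call parse_json_value(df, [("id", "INT"), ("name", "STRING")]), replacing the hypothetical id and name with your own columns.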
These are the basics of Spark streaming with Kafka. Now look at the output below:
The output depends on your CSV file. In my case, the CSV file has only two columns.
After this, you will need to store the data in Hive so that you can run SQL queries on it.
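One way to do this, which is not necessarily how this project does it, is to append every micro-batch to an existing Hive table with foreachBatch. This is only a sketch: foreachBatch needs Spark 2.4 or later, which is newer than the 2.3.0 package mentioned above, and the function names, table name and checkpoint directory are placeholders.

```python
def append_batch_to_hive(table_name):
    """Return a foreachBatch callback that appends each micro-batch to a Hive table."""
    def write_batch(batch_df, batch_id):
        # insertInto matches columns by position, so the DataFrame column
        # order must be the same as in the table definition.
        batch_df.write.mode("append").insertInto(table_name)
    return write_batch


def start_hive_sink(stream_df, table_name, checkpoint_dir):
    """Start a streaming query that writes stream_df into an existing Hive table."""
    return (
        stream_df.writeStream
        .foreachBatch(append_batch_to_hive(table_name))
        .option("checkpointLocation", checkpoint_dir)  # needed to recover after a restart
        .start()
    )
```

The SparkSession has to be created with Hive support enabled for the table to be visible, and the query returned by start_hive_sink keeps running until you stop it or call awaitTermination() on it.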
You can run this code in different environments, such as your local machine or Sandbox-HDP.hortonworks, but you will need to change the IP address and host to match your platform.
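To keep the host in one place, the session setup could look roughly like the sketch below. The function names are hypothetical, and 9083 is only the usual default port of the Hive metastore, so check the value used on your platform.

```python
def hive_settings(metastore_host, metastore_port=9083):
    """Spark settings that point the session at a Hive metastore."""
    return {"hive.metastore.uris": f"thrift://{metastore_host}:{metastore_port}"}


def build_session(app_name, metastore_host):
    """Create a SparkSession with Hive support for the given metastore host."""
    # Imported inside the function so hive_settings() can be used without Spark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in hive_settings(metastore_host).items():
        builder = builder.config(key, value)
    return builder.enableHiveSupport().getOrCreate()
```

On a local machine you would pass something like "localhost" as the host, and on the sandbox the hostname or IP address of the sandbox itself.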
To do this, first create a table and specify its columns. You will need to customize it to your needs. In my case it looks like:
When you create a Hive table, you need to define how this table should read/write data from/to the file system, i.e. the “input format” and “output format”. You also need to define how this table should deserialize the data to rows, or serialize rows to data, i.e. the “serde”. The OPTIONS clause can be used to specify the storage format (“serde”, “input format”, “output format”), e.g. CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet'). By default, the table files are read as plain text.
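As a sketch, the statement above can be built and run from PySpark like this. The helper names are hypothetical, the column list is whatever your data needs, and spark is assumed to be a SparkSession created with Hive support enabled:

```python
def create_table_sql(table, columns, file_format="parquet"):
    """Build a CREATE TABLE statement for a Hive table with the given storage format."""
    column_list = ", ".join(f"{name} {sql_type}" for name, sql_type in columns)
    return (
        f"CREATE TABLE IF NOT EXISTS {table} ({column_list}) "
        f"USING hive OPTIONS(fileFormat '{file_format}')"
    )


def create_hive_table(spark, table, columns, file_format="parquet"):
    """Run the CREATE TABLE statement through the given SparkSession."""
    spark.sql(create_table_sql(table, columns, file_format))
```

For the example from the text, create_table_sql("src", [("id", "int")]) produces the same statement with IF NOT EXISTS added, so running it twice does not fail.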
If you have any queries regarding this code, feel free to contact me at [email protected]