-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathKafkaConsumer.hs
82 lines (73 loc) · 4.59 KB
/
KafkaConsumer.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
module KafkaConsumer where
{-| Author: Pablo Perez Garcia
This code example is build in top of hw-kafka-client library https://hackage.haskell.org/package/hw-kafka-client
Before start codgin remember that you need install in your system [librdkafka]
git clone https://github.com/edenhill/librdkafka
cd librdkafka
./configure
make
sudo make install
-}
import Control.Arrow ((&&&))
import Control.Exception (bracket)
import Data.Monoid ((<>))
import Kafka.Consumer
import qualified Data.Text as TEXT
import Data.ByteString
import qualified Data.ByteString.Char8 as CH
import qualified System.IO as SIO
import ConfigurationUtils
import Text.Read (lift)
import Control.Concurrent (threadDelay)
{-| IO monad to run the consumer, using the function [newConsumer] where we pass the ConsumerProperties and the Subscription
it return a Either of [KafkaError] or [KafkaConsumer] -}
startConsumer :: IO ()
startConsumer = do consumerSubscription <- createConsumerSubscription
consumerProps <- createConsumerProp
kafkaConsumerEither <- newConsumer consumerProps consumerSubscription
response <- case kafkaConsumerEither of
Left err -> do return $ Left err
Right kafkaConsumer -> consumeMessages kafkaConsumer
print response
{-| We use [ConsumerProperties] adding information of the broker host, consumer group id, commit strategy and log level-}
createConsumerProp :: IO ConsumerProperties
createConsumerProp = do groupIdName <- getGroupId
bootstrapServer <- getBootstrapServer
return $ brokersList [BrokerAddress bootstrapServer]
<> groupId (ConsumerGroupId groupIdName)
<> noAutoCommit
<> logLevel KafkaLogInfo
{-| Create a subscription using [createTopic] functions and is merged using [<>] operator with another subscription created
when we run [OffsetReset] function to define the Offset strategy -}
createConsumerSubscription :: IO Subscription
createConsumerSubscription = do topic <- getTopic
return $ createTopic topic <> offsetReset Earliest
{-| Create a subscription using [topics] functions which expect a [TopicName] type created passing a topic namex -}
createTopic:: String -> Subscription
createTopic topic = topics [TopicName topic]
{-| Having a Kafka consumer We use [mapM_] function to start iterating 15 times, polling for messages.
We then commit offset using [commitAllOffsets] which return a maybe of Kafka error which in case
is filled contains the error description
In Haskell as a declarative paradigm language has not loops operators, so in order to iterate to
get more message from the topic we do a recursive call of the function.-}
consumeMessages :: KafkaConsumer -> IO (Either KafkaError String)
consumeMessages kafkaConsumer = do message <- consumeMessage kafkaConsumer
SIO.putStrLn $ "Message: " <> show message
maybeError <- commitAllOffsets OffsetCommit kafkaConsumer
SIO.putStrLn $ "Offsets: " <> maybe "Committed." show maybeError
SIO.putStrLn $ "---------------------------------------"
_ <- consumeMessages kafkaConsumer -- Recursive call.
_ <- closeConsumer kafkaConsumer
return $ Right "All events processed"
{-| Having a Kafka consumer We use [pollMessage] function passing the consumer and a timeout.
It return an either of KafkaError or ConsumerRecord..-}
consumeMessage :: KafkaConsumer -> IO (Either KafkaError String)
consumeMessage kafkaConsumer = do eitherConsumerRecord <- pollMessage kafkaConsumer (Timeout 5000)
eitherResponse <- case eitherConsumerRecord of
Right consumerRecord -> getConsumerRecordValue consumerRecord
return eitherResponse
{-| Function to extract the value from the ConsumerRecord -}
getConsumerRecordValue :: ConsumerRecord (Maybe ByteString) (Maybe ByteString) -> IO (Either KafkaError String)
getConsumerRecordValue(ConsumerRecord _ _ _ _ _ value) = case value of
Just value -> return $ Right $ CH.unpack value
Nothing -> return $ Left $ KafkaError "No data find in Customer record"