kafka: fix exception handling for delete topic and read latency tracking (#1720)

* kafka: catch S.NOTFOUND exception for delete topic handler

* kafka: fix tracking read latency
YangKian authored Dec 27, 2023
1 parent ae85d8a commit 9131db5
Showing 5 changed files with 31 additions and 32 deletions.
8 changes: 4 additions & 4 deletions hstream-kafka/HStream/Kafka/Common/Metrics/ConsumeStats.hs
@@ -14,11 +14,11 @@ topicTotalSendMessages =
     P.Info "topic_messages_out" "Successfully read messages from a topic"
 {-# NOINLINE topicTotalSendMessages #-}
 
-readLatencySnd :: P.Histogram
-readLatencySnd =
-  P.unsafeRegister . P.histogram (P.Info "topic_read_latency" "topic read latency in second") $
+topicReadStoreLatency :: P.Histogram
+topicReadStoreLatency =
+  P.unsafeRegister . P.histogram (P.Info "topic_read_store_latency" "topic read store latency in second") $
     [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
-{-# NOINLINE readLatencySnd #-}
+{-# NOINLINE topicReadStoreLatency #-}
 
 totalConsumeRequest :: P.Vector P.Label2 P.Counter
 totalConsumeRequest =
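Note: the definition above follows the standard prometheus-client idiom — the histogram is registered once at module load (the NOINLINE pragma keeps GHC from inlining and re-running the unsafe registration), and observeDuration wraps an IO action to record its elapsed seconds. A minimal standalone sketch of the same idiom, with a hypothetical metric name:

{-# LANGUAGE OverloadedStrings #-}
module Main where

import qualified Prometheus as P

-- Registered once at module load; NOINLINE prevents GHC from
-- duplicating the unsafeRegister side effect.
demoReadLatency :: P.Histogram
demoReadLatency =
  P.unsafeRegister . P.histogram (P.Info "demo_read_latency" "demo read latency in second") $
    [0.001, 0.01, 0.1, 1, 10]
{-# NOINLINE demoReadLatency #-}

main :: IO ()
main = do
  -- observeDuration runs the action, measures its wall-clock duration,
  -- and records the sample into the histogram's buckets.
  r <- P.observeDuration demoReadLatency (pure (sum [1 .. 100 :: Int]))
  print r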
8 changes: 4 additions & 4 deletions hstream-kafka/HStream/Kafka/Common/Metrics/ProduceStats.hs
@@ -14,11 +14,11 @@ topicTotalAppendMessages =
     P.Info "topic_messages_in" "Successfully appended messages for a topic"
 {-# NOINLINE topicTotalAppendMessages #-}
 
-appendLatencySnd :: P.Vector P.Label1 P.Histogram
-appendLatencySnd =
-  P.unsafeRegister . P.vector "topicName" . P.histogram (P.Info "topic_append_latency" "topic append latency in second") $
+topicWriteStoreLatency :: P.Vector P.Label1 P.Histogram
+topicWriteStoreLatency =
+  P.unsafeRegister . P.vector "topicName" . P.histogram (P.Info "topic_write_store_latency" "topic write store latency in second") $
     [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
-{-# NOINLINE appendLatencySnd #-}
+{-# NOINLINE topicWriteStoreLatency #-}
 
 totalProduceRequest :: P.Vector P.Label2 P.Counter
 totalProduceRequest =
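Note: unlike the consume-side histogram, this metric is a vector of histograms keyed by a "topicName" label, so each topic gets its own series. A minimal sketch of observing into such a vector (names hypothetical):

{-# LANGUAGE OverloadedStrings #-}
module Main where

import qualified Prometheus as P

-- One histogram child per topic, keyed by the "topicName" label.
demoWriteLatency :: P.Vector P.Label1 P.Histogram
demoWriteLatency =
  P.unsafeRegister . P.vector "topicName" . P.histogram
    (P.Info "demo_write_latency" "demo write latency in second") $
      [0.001, 0.01, 0.1, 1, 10]
{-# NOINLINE demoWriteLatency #-}

main :: IO ()
main =
  -- withLabel creates or looks up the child histogram for the given
  -- label value and passes it to the callback for observation.
  P.withLabel demoWriteLatency "my-topic" $ \h -> P.observe h 0.042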
9 changes: 5 additions & 4 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
@@ -184,7 +184,7 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
         else S.readerSetTimeout reader r.maxWaitMs
       S.readerSetWaitOnlyWhenNoData reader
       (_, records) <- foldWhileM (0, []) $ \(size, acc) -> do
-        rs <- M.observeDuration M.readLatencySnd $ S.readerRead reader 100
+        rs <- M.observeDuration M.topicReadStoreLatency $ S.readerRead reader 100
         if null rs
           then pure ((size, acc), False)
           else do let size' = size + sum (map (K.recordBytesSize . (.recordPayload)) rs)
@@ -206,17 +206,18 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
       if r.maxWaitMs > defTimeout
         then do
           S.readerSetTimeout reader defTimeout
-          rs1 <- M.observeDuration M.readLatencySnd $
+          rs1 <- M.observeDuration M.topicReadStoreLatency $
                    S.readerRead reader storageOpts.fetchMaxLen
           let size = sum (map (K.recordBytesSize . (.recordPayload)) rs1)
           if size >= fromIntegral r.minBytes
             then pure rs1
             else do S.readerSetTimeout reader (r.maxWaitMs - defTimeout)
-                    rs2 <- S.readerRead reader storageOpts.fetchMaxLen
+                    rs2 <- M.observeDuration M.topicReadStoreLatency $
+                             S.readerRead reader storageOpts.fetchMaxLen
                     pure $ rs1 <> rs2
         else do
           S.readerSetTimeout reader r.maxWaitMs
-          S.readerRead reader storageOpts.fetchMaxLen
+          M.observeDuration M.topicReadStoreLatency $ S.readerRead reader storageOpts.fetchMaxLen
 
 -------------------------------------------------------------------------------
 
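Note: the point of this hunk is coverage — after the change every S.readerRead call, including the follow-up read rs2 and the short-maxWaitMs branch, is wrapped in M.observeDuration, so topic_read_store_latency reflects all store reads rather than only the first. The loop above is driven by foldWhileM, which is not a standard combinator; a plausible definition matching its use here:

-- Repeatedly step the accumulator; the Bool in the step's result says
-- whether to continue. Returns the final accumulator.
foldWhileM :: Monad m => a -> (a -> m (a, Bool)) -> m a
foldWhileM acc step = do
  (acc', continue) <- step acc
  if continue then foldWhileM acc' step else pure acc'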
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
@@ -127,7 +127,7 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = do
                 o (fromIntegral batchLength)
                 (K.CompactBytes storedBs)
   Log.debug1 $ "Append key " <> Log.buildString' appendKey
-  r <- M.observeWithLabel M.appendLatencySnd streamName $
+  r <- M.observeWithLabel M.topicWriteStoreLatency streamName $
         S.appendCompressedBS ldclient logid storedRecord S.CompressionNone
           appendAttrs
   let !partLabel = (streamName, T.pack . show $ partition)
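Note: M.observeWithLabel is an HStream helper rather than part of prometheus-client; from this call site it times an action and records the duration under the given topic label. A plausible shape for it (an assumption, not the project's actual code), using GHC.Clock for monotonic timing:

import qualified Data.Text as T
import qualified Prometheus as P

import GHC.Clock (getMonotonicTime)

-- Time an IO action and record its duration in seconds into the
-- histogram child selected by the label; return the action's result.
observeWithLabel :: P.Vector P.Label1 P.Histogram -> T.Text -> IO a -> IO a
observeWithLabel vec label action = do
  start  <- getMonotonicTime
  result <- action
  end    <- getMonotonicTime
  P.withLabel vec label $ \h -> P.observe h (end - start)
  pure result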
36 changes: 17 additions & 19 deletions hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs
@@ -74,12 +74,16 @@ handleDeleteTopicsV0 ServerContext{..} _ K.DeleteTopicsRequestV0{..} =
       return $ K.DeleteTopicsResponseV0 (K.KaArray $ Just V.empty)
     K.KaArray (Just topicNames_)
       | V.null topicNames_ -> return $ K.DeleteTopicsResponseV0 (K.KaArray $ Just V.empty)
-      | otherwise -> do
+      | otherwise -> do
          respTopics <- forM topicNames_ $ \topicName -> do
            try (deleteTopic topicName) >>= \case
-             Left (e :: SomeException) -> do
-               Log.warning $ "Exception occurs when deleting topic " <> Log.build topicName <> ": " <> Log.build (show e)
-               return $ K.DeletableTopicResultV0 topicName K.UNKNOWN_SERVER_ERROR
+             Left (e :: SomeException)
+               | Just _ <- fromException @S.NOTFOUND e -> do
+                   Log.warning $ "Delete topic failed, topic " <> Log.build topicName <> " does not exist"
+                   return $ K.DeletableTopicResultV0 topicName K.UNKNOWN_TOPIC_OR_PARTITION
+               | otherwise -> do
+                   Log.warning $ "Exception occurs when deleting topic " <> Log.build topicName <> ": " <> Log.build (show e)
+                   return $ K.DeletableTopicResultV0 topicName K.UNKNOWN_SERVER_ERROR
              Right res -> return res
          return $ K.DeleteTopicsResponseV0 (K.KaArray $ Just respTopics)
   where
@@ -92,18 +96,12 @@ handleDeleteTopicsV0 ServerContext{..} _ K.DeleteTopicsRequestV0{..} =
     deleteTopic :: T.Text -> IO K.DeletableTopicResultV0
     deleteTopic topicName = do
       let streamId = S.transToTopicStreamName topicName
-      S.doesStreamExist scLDClient streamId >>= \case
-        True -> do
-          -- delete offset caches.
-          --
-          -- XXX: Normally we do not need to delete this because the logid is a
-          -- random number and will unlikely be reused.
-          partitions <- S.listStreamPartitionsOrdered scLDClient streamId
-          V.forM_ partitions $ \(_, logid) ->
-            cleanOffsetCache scOffsetManager logid
-          S.removeStream scLDClient streamId
-          Stats.stream_stat_erase scStatsHolder (Utils.textToCBytes topicName)
-          return $ K.DeletableTopicResultV0 topicName K.NONE
-        False -> do
-          Log.warning $ "Stream " <> Log.build (show streamId) <> " does not exist"
-          return $ K.DeletableTopicResultV0 topicName K.UNKNOWN_TOPIC_OR_PARTITION
+      -- delete offset caches.
+      --
+      -- XXX: Normally we do not need to delete this because the logid is a
+      -- random number and will unlikely be reused.
+      partitions <- S.listStreamPartitionsOrdered scLDClient streamId
+      V.forM_ partitions $ \(_, logid) ->
+        cleanOffsetCache scOffsetManager logid
+      S.removeStream scLDClient streamId
+      return $ K.DeletableTopicResultV0 topicName K.NONE
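Note: the handler now catches everything with try at SomeException and uses a fromException pattern guard to recover the specific store error; this also lets deleteTopic drop its doesStreamExist pre-check, closing the race between checking for the stream and removing it. A standalone sketch of the downcast pattern, with a hypothetical NotFound type standing in for S.NOTFOUND:

{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module Main where

import Control.Exception

-- Hypothetical stand-in for the store's S.NOTFOUND exception.
data NotFound = NotFound deriving Show
instance Exception NotFound

main :: IO ()
main =
  try (throwIO NotFound) >>= \case
    Left (e :: SomeException)
      -- fromException downcasts the opaque SomeException; the guard
      -- matches only when the thrown exception was a NotFound.
      | Just NotFound <- fromException @NotFound e ->
          putStrLn "topic does not exist (UNKNOWN_TOPIC_OR_PARTITION)"
      | otherwise ->
          putStrLn ("unexpected error: " ++ show e)
    Right () -> putStrLn "deleted (NONE)"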
