From 9131db54c7c6c041556f6fff4f5b73a4b6d87f1c Mon Sep 17 00:00:00 2001
From: YangKian <45479280+YangKian@users.noreply.github.com>
Date: Wed, 27 Dec 2023 17:16:28 +0800
Subject: [PATCH] kafka: fix exception handling for delete topic and read
 latency tracking (#1720)

* kafka: catch S.NOTFOUND exception for delete topic handler

* kafka: fix tracking read latency
---
 .../Kafka/Common/Metrics/ConsumeStats.hs      |  8 ++---
 .../Kafka/Common/Metrics/ProduceStats.hs      |  8 ++---
 .../HStream/Kafka/Server/Handler/Consume.hs   |  9 ++---
 .../HStream/Kafka/Server/Handler/Produce.hs   |  2 +-
 .../HStream/Kafka/Server/Handler/Topic.hs     | 36 +++++++++----------
 5 files changed, 31 insertions(+), 32 deletions(-)

diff --git a/hstream-kafka/HStream/Kafka/Common/Metrics/ConsumeStats.hs b/hstream-kafka/HStream/Kafka/Common/Metrics/ConsumeStats.hs
index 9c5d103ca..1fc51d645 100644
--- a/hstream-kafka/HStream/Kafka/Common/Metrics/ConsumeStats.hs
+++ b/hstream-kafka/HStream/Kafka/Common/Metrics/ConsumeStats.hs
@@ -14,11 +14,11 @@ topicTotalSendMessages =
     P.Info "topic_messages_out" "Successfully read messages from a topic"
 {-# NOINLINE topicTotalSendMessages #-}
 
-readLatencySnd :: P.Histogram
-readLatencySnd =
-  P.unsafeRegister . P.histogram (P.Info "topic_read_latency" "topic read latency in second") $
+topicReadStoreLatency :: P.Histogram
+topicReadStoreLatency =
+  P.unsafeRegister . P.histogram (P.Info "topic_read_store_latency" "topic read store latency in second") $
     [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
-{-# NOINLINE readLatencySnd #-}
+{-# NOINLINE topicReadStoreLatency #-}
 
 totalConsumeRequest :: P.Vector P.Label2 P.Counter
 totalConsumeRequest =
diff --git a/hstream-kafka/HStream/Kafka/Common/Metrics/ProduceStats.hs b/hstream-kafka/HStream/Kafka/Common/Metrics/ProduceStats.hs
index b431ca6b8..187bdd2e4 100644
--- a/hstream-kafka/HStream/Kafka/Common/Metrics/ProduceStats.hs
+++ b/hstream-kafka/HStream/Kafka/Common/Metrics/ProduceStats.hs
@@ -14,11 +14,11 @@ topicTotalAppendMessages =
     P.Info "topic_messages_in" "Successfully appended messages for a topic"
 {-# NOINLINE topicTotalAppendMessages #-}
 
-appendLatencySnd :: P.Vector P.Label1 P.Histogram
-appendLatencySnd =
-  P.unsafeRegister . P.vector "topicName" . P.histogram (P.Info "topic_append_latency" "topic append latency in second") $
+topicWriteStoreLatency :: P.Vector P.Label1 P.Histogram
+topicWriteStoreLatency =
+  P.unsafeRegister . P.vector "topicName" . P.histogram (P.Info "topic_write_store_latency" "topic write store latency in second") $
     [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
-{-# NOINLINE appendLatencySnd #-}
+{-# NOINLINE topicWriteStoreLatency #-}
 
 totalProduceRequest :: P.Vector P.Label2 P.Counter
 totalProduceRequest =
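Note: both renamed histograms reuse the registration idiom visible above:
P.unsafeRegister performs the side-effecting registration behind
unsafePerformIO, and the NOINLINE pragma stops GHC from inlining the CAF,
which could otherwise register (and observe against) a fresh histogram at
each use site. A minimal, self-contained sketch of the idiom, assuming P
aliases the Prometheus module of the prometheus-client package, which matches
the API used here; the main driver is illustration only:

{-# LANGUAGE OverloadedStrings #-}
import qualified Prometheus as P

topicReadStoreLatency :: P.Histogram
topicReadStoreLatency =
  P.unsafeRegister $
    P.histogram
      (P.Info "topic_read_store_latency" "topic read store latency in second")
      [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10]
{-# NOINLINE topicReadStoreLatency #-}

main :: IO ()
main = do
  -- Time an IO action; the elapsed seconds land in the histogram buckets.
  -- The patch presumably re-exports this helper as M.observeDuration.
  P.observeDuration topicReadStoreLatency (pure ())
  P.exportMetricsAsText >>= print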
diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
index 8532690a4..7dc0f603d 100644
--- a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
@@ -184,7 +184,7 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
           else S.readerSetTimeout reader r.maxWaitMs
         S.readerSetWaitOnlyWhenNoData reader
         (_, records) <- foldWhileM (0, []) $ \(size, acc) -> do
-          rs <- M.observeDuration M.readLatencySnd $ S.readerRead reader 100
+          rs <- M.observeDuration M.topicReadStoreLatency $ S.readerRead reader 100
           if null rs
             then pure ((size, acc), False)
             else do let size' = size + sum (map (K.recordBytesSize . (.recordPayload)) rs)
@@ -206,17 +206,18 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
     if r.maxWaitMs > defTimeout
       then do
         S.readerSetTimeout reader defTimeout
-        rs1 <- M.observeDuration M.readLatencySnd $
+        rs1 <- M.observeDuration M.topicReadStoreLatency $
                  S.readerRead reader storageOpts.fetchMaxLen
         let size = sum (map (K.recordBytesSize . (.recordPayload)) rs1)
         if size >= fromIntegral r.minBytes
           then pure rs1
           else do S.readerSetTimeout reader (r.maxWaitMs - defTimeout)
-                  rs2 <- S.readerRead reader storageOpts.fetchMaxLen
+                  rs2 <- M.observeDuration M.topicReadStoreLatency $
+                           S.readerRead reader storageOpts.fetchMaxLen
                   pure $ rs1 <> rs2
       else do
         S.readerSetTimeout reader r.maxWaitMs
-        S.readerRead reader storageOpts.fetchMaxLen
+        M.observeDuration M.topicReadStoreLatency $ S.readerRead reader storageOpts.fetchMaxLen
 
 -------------------------------------------------------------------------------
 
diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
index 35599951e..d5751fcae 100644
--- a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
@@ -127,7 +127,7 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = d
                   o (fromIntegral batchLength) (K.CompactBytes storedBs)
 
   Log.debug1 $ "Append key " <> Log.buildString' appendKey
-  r <- M.observeWithLabel M.appendLatencySnd streamName $
+  r <- M.observeWithLabel M.topicWriteStoreLatency streamName $
         S.appendCompressedBS ldclient logid storedRecord S.CompressionNone appendAttrs
   let !partLabel = (streamName, T.pack . show $ partition)
 
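Note: before this change only the probe read inside the foldWhileM loop was
timed; the two readerRead calls on the minBytes wait path completed without
an observation, so the read-latency histogram under-reported the fetch path.
The patch wraps all three store reads in M.observeDuration. The labeled
variant used on the produce side, M.observeWithLabel, is HStream's own
helper; below is a hypothetical reimplementation assuming prometheus-client
primitives (the real definition lives in HStream.Kafka.Common.Metrics and
may differ):

import Data.Time.Clock.POSIX (getPOSIXTime)
import qualified Prometheus as P

observeWithLabel :: P.Label label
                 => P.Vector label P.Histogram -> label -> IO a -> IO a
observeWithLabel histVec label action = do
  start  <- getPOSIXTime
  result <- action  -- note: nothing is recorded if the action throws
  stop   <- getPOSIXTime
  -- Select the histogram for this label and record the elapsed seconds.
  P.withLabel histVec label $ \h ->
    P.observe h (realToFrac (stop - start))
  pure result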
diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs
index b5965d1d4..d304d1fb6 100644
--- a/hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs
@@ -74,12 +74,16 @@ handleDeleteTopicsV0 ServerContext{..} _ K.DeleteTopicsRequestV0{..} =
       return $ K.DeleteTopicsResponseV0 (K.KaArray $ Just V.empty)
     K.KaArray (Just topicNames_)
       | V.null topicNames_ -> return $ K.DeleteTopicsResponseV0 (K.KaArray $ Just V.empty)
-      | otherwise -> do
+      | otherwise -> do
           respTopics <- forM topicNames_ $ \topicName -> do
             try (deleteTopic topicName) >>= \case
-              Left (e :: SomeException) -> do
-                Log.warning $ "Exception occurs when deleting topic " <> Log.build topicName <> ": " <> Log.build (show e)
-                return $ K.DeletableTopicResultV0 topicName K.UNKNOWN_SERVER_ERROR
+              Left (e :: SomeException)
+                | Just _ <- fromException @S.NOTFOUND e -> do
+                    Log.warning $ "Delete topic failed, topic " <> Log.build topicName <> " does not exist"
+                    return $ K.DeletableTopicResultV0 topicName K.UNKNOWN_TOPIC_OR_PARTITION
+                | otherwise -> do
+                    Log.warning $ "Exception occurs when deleting topic " <> Log.build topicName <> ": " <> Log.build (show e)
+                    return $ K.DeletableTopicResultV0 topicName K.UNKNOWN_SERVER_ERROR
               Right res -> return res
           return $ K.DeleteTopicsResponseV0 (K.KaArray $ Just respTopics)
   where
@@ -92,18 +96,12 @@ handleDeleteTopicsV0 ServerContext{..} _ K.DeleteTopicsRequestV0{..} =
     deleteTopic :: T.Text -> IO K.DeletableTopicResultV0
     deleteTopic topicName = do
       let streamId = S.transToTopicStreamName topicName
-      S.doesStreamExist scLDClient streamId >>= \case
-        True -> do
-          -- delete offset caches.
-          --
-          -- XXX: Normally we do not need to delete this because the logid is a
-          -- random number and will unlikely be reused.
-          partitions <- S.listStreamPartitionsOrdered scLDClient streamId
-          V.forM_ partitions $ \(_, logid) ->
-            cleanOffsetCache scOffsetManager logid
-          S.removeStream scLDClient streamId
-          Stats.stream_stat_erase scStatsHolder (Utils.textToCBytes topicName)
-          return $ K.DeletableTopicResultV0 topicName K.NONE
-        False -> do
-          Log.warning $ "Stream " <> Log.build (show streamId) <> " does not exist"
-          return $ K.DeletableTopicResultV0 topicName K.UNKNOWN_TOPIC_OR_PARTITION
+      -- delete offset caches.
+      --
+      -- XXX: Normally we do not need to delete this because the logid is a
+      -- random number and will unlikely be reused.
+      partitions <- S.listStreamPartitionsOrdered scLDClient streamId
+      V.forM_ partitions $ \(_, logid) ->
+        cleanOffsetCache scOffsetManager logid
+      S.removeStream scLDClient streamId
+      return $ K.DeletableTopicResultV0 topicName K.NONE
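Note: the rewritten deleteTopic also removes a check-then-act race: rather
than calling doesStreamExist and then removeStream (the stream could vanish
in between, turning a missing topic into UNKNOWN_SERVER_ERROR), it deletes
unconditionally and lets the handler translate the store's S.NOTFOUND into
UNKNOWN_TOPIC_OR_PARTITION, leaving every other exception on the
UNKNOWN_SERVER_ERROR path. A self-contained sketch of that dispatch pattern
(NOTFOUND here is a stand-in for the store's exception type, and
deleteWithMappedErrors is a hypothetical name):

{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications    #-}
import Control.Exception (Exception, SomeException, fromException, throwIO, try)

-- Stand-in for the store's S.NOTFOUND exception type.
data NOTFOUND = NOTFOUND deriving Show
instance Exception NOTFOUND

-- Map NOTFOUND to one error code and anything else to a catch-all,
-- mirroring the guards in the handler above.
deleteWithMappedErrors :: IO () -> IO String
deleteWithMappedErrors doDelete =
  try doDelete >>= \case
    Left (e :: SomeException)
      -- fromException succeeds only when the opaque SomeException
      -- actually wraps the requested concrete type.
      | Just _ <- fromException @NOTFOUND e -> pure "UNKNOWN_TOPIC_OR_PARTITION"
      | otherwise                           -> pure "UNKNOWN_SERVER_ERROR"
    Right () -> pure "NONE"

main :: IO ()
main = do
  deleteWithMappedErrors (throwIO NOTFOUND) >>= putStrLn
  deleteWithMappedErrors (pure ())          >>= putStrLn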