From 2ce2e5a2122652cb7ce5aedf9988b83f35ee63cf Mon Sep 17 00:00:00 2001 From: s12f <97083380+s12f@users.noreply.github.com> Date: Tue, 2 Jan 2024 18:06:43 +0800 Subject: [PATCH] feat(Kafka): Find coordinator v1 (#1725) --- .../HStream/Kafka/Server/Handler.hsc | 4 +- .../HStream/Kafka/Server/Handler/Basic.hs | 34 +++++++++++++ .../HStream/Kafka/Server/Handler/Group.hs | 16 +----- .../protocol/Kafka/Protocol/Message/Struct.hs | 36 ++++++++++++- .../protocol/Kafka/Protocol/Message/Total.hs | 51 ++++++++++++++++--- script/kafka_gen.py | 5 ++ 6 files changed, 122 insertions(+), 24 deletions(-) diff --git a/hstream-kafka/HStream/Kafka/Server/Handler.hsc b/hstream-kafka/HStream/Kafka/Server/Handler.hsc index 2430445c4..408c7d377 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler.hsc +++ b/hstream-kafka/HStream/Kafka/Server/Handler.hsc @@ -65,7 +65,7 @@ import qualified Kafka.Protocol.Service as K #cv_handler SaslHandshake, 0, 1 -#cv_handler FindCoordinator, 0, 0 +#cv_handler FindCoordinator, 0, 1 #cv_handler JoinGroup, 0, 2 #cv_handler SyncGroup, 0, 1 @@ -90,7 +90,7 @@ handlers sc = -- Read , #mk_handler Fetch, 0, 4 - , #mk_handler FindCoordinator, 0, 0 + , #mk_handler FindCoordinator, 0, 1 -- Group , #mk_handler JoinGroup, 0, 2 diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs index 644c67fdd..43a9ccbf1 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs @@ -7,6 +7,8 @@ module HStream.Kafka.Server.Handler.Basic , handleMetadata -- 32: DescribeConfigs , handleDescribeConfigs + + , handleFindCoordinator ) where import Control.Exception @@ -239,3 +241,35 @@ handleDescribeConfigs serverCtx _ req = do else return $ KCM.getErrorResponse KC.BROKER resource.resourceName ("invalid broker id:" <> resource.resourceName) rt -> return $ KCM.getErrorResponse rt resource.resourceName ("unsupported resource type:" <> T.pack (show rt)) return $ K.DescribeConfigsResponse {results=K.NonNullKaArray results, throttleTimeMs=0} + +--------------------------------------------------------------------------- +-- 32: FindCoordinator +--------------------------------------------------------------------------- +data CoordinatorType + = GROUP + | TRANSACTION + deriving (Enum, Eq) + +handleFindCoordinator :: ServerContext -> K.RequestContext -> K.FindCoordinatorRequest -> IO K.FindCoordinatorResponse +handleFindCoordinator ServerContext{..} _ req = do + case toEnum (fromIntegral req.keyType) of + GROUP -> do + A.ServerNode{..} <- lookupKafkaPersist metaHandle gossipContext loadBalanceHashRing scAdvertisedListenersKey (KafkaResGroup req.key) + Log.info $ "findCoordinator for group:" <> Log.buildString' req.key <> ", result:" <> Log.buildString' serverNodeId + return $ K.FindCoordinatorResponse { + errorMessage=Nothing + , nodeId=fromIntegral serverNodeId + , errorCode=0 + , throttleTimeMs=0 + , port=fromIntegral serverNodePort + , host=serverNodeHost + } + _ -> do + return $ K.FindCoordinatorResponse { + errorMessage=Just "KeyType Must be 0(GROUP)" + , nodeId=0 + , errorCode=K.COORDINATOR_NOT_AVAILABLE + , throttleTimeMs=0 + , port=0 + , host="" + } diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs index 3f55b9203..930fc971d 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs @@ -1,11 +1,8 @@ {-# LANGUAGE CPP #-} {-# LANGUAGE DuplicateRecordFields #-} -{-# LANGUAGE OverloadedRecordDot #-} module HStream.Kafka.Server.Handler.Group - ( -- 19: CreateTopics - handleFindCoordinator - , handleJoinGroup + ( handleJoinGroup , handleSyncGroup , handleHeartbeat , handleLeaveGroup @@ -13,22 +10,11 @@ module HStream.Kafka.Server.Handler.Group , handleDescribeGroups ) where -import HStream.Common.Server.Lookup (KafkaResource (..), - lookupKafkaPersist) import qualified HStream.Kafka.Group.GroupCoordinator as GC import HStream.Kafka.Server.Types (ServerContext (..)) -import qualified HStream.Logger as Log -import qualified HStream.Server.HStreamApi as A import qualified Kafka.Protocol.Message as K import qualified Kafka.Protocol.Service as K --- FIXME: move to a separated Coordinator module -handleFindCoordinator :: ServerContext -> K.RequestContext -> K.FindCoordinatorRequest -> IO K.FindCoordinatorResponse -handleFindCoordinator ServerContext{..} _ req = do - A.ServerNode{..} <- lookupKafkaPersist metaHandle gossipContext loadBalanceHashRing scAdvertisedListenersKey (KafkaResGroup req.key) - Log.info $ "findCoordinator for group:" <> Log.buildString' req.key <> ", result:" <> Log.buildString' serverNodeId - return $ K.FindCoordinatorResponse 0 (fromIntegral serverNodeId) serverNodeHost (fromIntegral serverNodePort) - handleJoinGroup :: ServerContext -> K.RequestContext -> K.JoinGroupRequest -> IO K.JoinGroupResponse handleJoinGroup ServerContext{..} = GC.joinGroup scGroupCoordinator diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs index 25a270dc9..4c8018b3c 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs @@ -936,6 +936,14 @@ newtype FindCoordinatorRequestV0 = FindCoordinatorRequestV0 } deriving (Show, Eq, Generic) instance Serializable FindCoordinatorRequestV0 +data FindCoordinatorRequestV1 = FindCoordinatorRequestV1 + { key :: !Text + -- ^ The coordinator key. + , keyType :: {-# UNPACK #-} !Int8 + -- ^ The coordinator key type. (Group, transaction, etc.) + } deriving (Show, Eq, Generic) +instance Serializable FindCoordinatorRequestV1 + data FindCoordinatorResponseV0 = FindCoordinatorResponseV0 { errorCode :: {-# UNPACK #-} !ErrorCode -- ^ The error code, or 0 if there was no error. @@ -948,6 +956,23 @@ data FindCoordinatorResponseV0 = FindCoordinatorResponseV0 } deriving (Show, Eq, Generic) instance Serializable FindCoordinatorResponseV0 +data FindCoordinatorResponseV1 = FindCoordinatorResponseV1 + { throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + , errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no error. + , errorMessage :: !NullableString + -- ^ The error message, or null if there was no error. + , nodeId :: {-# UNPACK #-} !Int32 + -- ^ The node id. + , host :: !Text + -- ^ The host name. + , port :: {-# UNPACK #-} !Int32 + -- ^ The port. + } deriving (Show, Eq, Generic) +instance Serializable FindCoordinatorResponseV1 + data HeartbeatRequestV0 = HeartbeatRequestV0 { groupId :: !Text -- ^ The group id. @@ -1647,6 +1672,7 @@ instance Service HStreamKafkaV1 where , "metadata" , "offsetCommit" , "offsetFetch" + , "findCoordinator" , "joinGroup" , "heartbeat" , "leaveGroup" @@ -1699,6 +1725,13 @@ instance HasMethodImpl HStreamKafkaV1 "offsetFetch" where type MethodInput HStreamKafkaV1 "offsetFetch" = OffsetFetchRequestV1 type MethodOutput HStreamKafkaV1 "offsetFetch" = OffsetFetchResponseV1 +instance HasMethodImpl HStreamKafkaV1 "findCoordinator" where + type MethodName HStreamKafkaV1 "findCoordinator" = "findCoordinator" + type MethodKey HStreamKafkaV1 "findCoordinator" = 10 + type MethodVersion HStreamKafkaV1 "findCoordinator" = 1 + type MethodInput HStreamKafkaV1 "findCoordinator" = FindCoordinatorRequestV1 + type MethodOutput HStreamKafkaV1 "findCoordinator" = FindCoordinatorResponseV1 + instance HasMethodImpl HStreamKafkaV1 "joinGroup" where type MethodName HStreamKafkaV1 "joinGroup" = "joinGroup" type MethodKey HStreamKafkaV1 "joinGroup" = 11 @@ -1948,7 +1981,7 @@ supportedApiVersions = , ApiVersionV0 (ApiKey 3) 0 5 , ApiVersionV0 (ApiKey 8) 0 3 , ApiVersionV0 (ApiKey 9) 0 3 - , ApiVersionV0 (ApiKey 10) 0 0 + , ApiVersionV0 (ApiKey 10) 0 1 , ApiVersionV0 (ApiKey 11) 0 2 , ApiVersionV0 (ApiKey 12) 0 1 , ApiVersionV0 (ApiKey 13) 0 1 @@ -1992,6 +2025,7 @@ getHeaderVersion (ApiKey (9)) 1 = (1, 0) getHeaderVersion (ApiKey (9)) 2 = (1, 0) getHeaderVersion (ApiKey (9)) 3 = (1, 0) getHeaderVersion (ApiKey (10)) 0 = (1, 0) +getHeaderVersion (ApiKey (10)) 1 = (1, 0) getHeaderVersion (ApiKey (11)) 0 = (1, 0) getHeaderVersion (ApiKey (11)) 1 = (1, 0) getHeaderVersion (ApiKey (11)) 2 = (1, 0) diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs index 7ef56d80e..6ac8fbc58 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs @@ -1883,8 +1883,11 @@ fetchResponseFromV4 x = FetchResponse , throttleTimeMs = x.throttleTimeMs } -newtype FindCoordinatorRequest = FindCoordinatorRequest - { key :: Text +data FindCoordinatorRequest = FindCoordinatorRequest + { key :: !Text + -- ^ The coordinator key. + , keyType :: {-# UNPACK #-} !Int8 + -- ^ The coordinator key type. (Group, transaction, etc.) } deriving (Show, Eq, Generic) instance Serializable FindCoordinatorRequest @@ -1892,21 +1895,37 @@ findCoordinatorRequestToV0 :: FindCoordinatorRequest -> FindCoordinatorRequestV0 findCoordinatorRequestToV0 x = FindCoordinatorRequestV0 { key = x.key } +findCoordinatorRequestToV1 :: FindCoordinatorRequest -> FindCoordinatorRequestV1 +findCoordinatorRequestToV1 x = FindCoordinatorRequestV1 + { key = x.key + , keyType = x.keyType + } findCoordinatorRequestFromV0 :: FindCoordinatorRequestV0 -> FindCoordinatorRequest findCoordinatorRequestFromV0 x = FindCoordinatorRequest { key = x.key + , keyType = 0 + } +findCoordinatorRequestFromV1 :: FindCoordinatorRequestV1 -> FindCoordinatorRequest +findCoordinatorRequestFromV1 x = FindCoordinatorRequest + { key = x.key + , keyType = x.keyType } data FindCoordinatorResponse = FindCoordinatorResponse - { errorCode :: {-# UNPACK #-} !ErrorCode + { errorCode :: {-# UNPACK #-} !ErrorCode -- ^ The error code, or 0 if there was no error. - , nodeId :: {-# UNPACK #-} !Int32 + , nodeId :: {-# UNPACK #-} !Int32 -- ^ The node id. - , host :: !Text + , host :: !Text -- ^ The host name. - , port :: {-# UNPACK #-} !Int32 + , port :: {-# UNPACK #-} !Int32 -- ^ The port. + , throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + , errorMessage :: !NullableString + -- ^ The error message, or null if there was no error. } deriving (Show, Eq, Generic) instance Serializable FindCoordinatorResponse @@ -1917,6 +1936,15 @@ findCoordinatorResponseToV0 x = FindCoordinatorResponseV0 , host = x.host , port = x.port } +findCoordinatorResponseToV1 :: FindCoordinatorResponse -> FindCoordinatorResponseV1 +findCoordinatorResponseToV1 x = FindCoordinatorResponseV1 + { throttleTimeMs = x.throttleTimeMs + , errorCode = x.errorCode + , errorMessage = x.errorMessage + , nodeId = x.nodeId + , host = x.host + , port = x.port + } findCoordinatorResponseFromV0 :: FindCoordinatorResponseV0 -> FindCoordinatorResponse findCoordinatorResponseFromV0 x = FindCoordinatorResponse @@ -1924,6 +1952,17 @@ findCoordinatorResponseFromV0 x = FindCoordinatorResponse , nodeId = x.nodeId , host = x.host , port = x.port + , throttleTimeMs = 0 + , errorMessage = Nothing + } +findCoordinatorResponseFromV1 :: FindCoordinatorResponseV1 -> FindCoordinatorResponse +findCoordinatorResponseFromV1 x = FindCoordinatorResponse + { errorCode = x.errorCode + , nodeId = x.nodeId + , host = x.host + , port = x.port + , throttleTimeMs = x.throttleTimeMs + , errorMessage = x.errorMessage } data HadminCommandRequest = HadminCommandRequest diff --git a/script/kafka_gen.py b/script/kafka_gen.py index 58b1c9168..330597d87 100755 --- a/script/kafka_gen.py +++ b/script/kafka_gen.py @@ -122,6 +122,7 @@ def get_field_default(field_type, default=None): "Heartbeat": (0, 1), "ListGroups": (0, 1), "DescribeGroups": (0, 1), + "FindCoordinator": (0, 1), } # ----------------------------------------------------------------------------- @@ -456,6 +457,10 @@ def parse_field(field, api_version=0, flexible=False): api_version, min_tagged_version, max_tagged_version ) + # if field is NullableString, the default value should be Nothing(null) + if type_type == "string" and default is None and in_null_version: + default = "null" + if (in_api_version, in_tagged_version) == (False, False): return elif (in_api_version, in_tagged_version) == (True, False):