Skip to content

Commit

Permalink
feat(Kafka): Find coordinator v1 (#1725)
Browse files Browse the repository at this point in the history
  • Loading branch information
s12f authored Jan 2, 2024
1 parent 84d1319 commit 2ce2e5a
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 24 deletions.
4 changes: 2 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ import qualified Kafka.Protocol.Service as K

#cv_handler SaslHandshake, 0, 1

#cv_handler FindCoordinator, 0, 0
#cv_handler FindCoordinator, 0, 1

#cv_handler JoinGroup, 0, 2
#cv_handler SyncGroup, 0, 1
Expand All @@ -90,7 +90,7 @@ handlers sc =
-- Read
, #mk_handler Fetch, 0, 4

, #mk_handler FindCoordinator, 0, 0
, #mk_handler FindCoordinator, 0, 1

-- Group
, #mk_handler JoinGroup, 0, 2
Expand Down
34 changes: 34 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ module HStream.Kafka.Server.Handler.Basic
, handleMetadata
-- 32: DescribeConfigs
, handleDescribeConfigs

, handleFindCoordinator
) where

import Control.Exception
Expand Down Expand Up @@ -239,3 +241,35 @@ handleDescribeConfigs serverCtx _ req = do
else return $ KCM.getErrorResponse KC.BROKER resource.resourceName ("invalid broker id:" <> resource.resourceName)
rt -> return $ KCM.getErrorResponse rt resource.resourceName ("unsupported resource type:" <> T.pack (show rt))
return $ K.DescribeConfigsResponse {results=K.NonNullKaArray results, throttleTimeMs=0}

---------------------------------------------------------------------------
-- 32: FindCoordinator
---------------------------------------------------------------------------
data CoordinatorType
= GROUP
| TRANSACTION
deriving (Enum, Eq)

handleFindCoordinator :: ServerContext -> K.RequestContext -> K.FindCoordinatorRequest -> IO K.FindCoordinatorResponse
handleFindCoordinator ServerContext{..} _ req = do
case toEnum (fromIntegral req.keyType) of
GROUP -> do
A.ServerNode{..} <- lookupKafkaPersist metaHandle gossipContext loadBalanceHashRing scAdvertisedListenersKey (KafkaResGroup req.key)
Log.info $ "findCoordinator for group:" <> Log.buildString' req.key <> ", result:" <> Log.buildString' serverNodeId
return $ K.FindCoordinatorResponse {
errorMessage=Nothing
, nodeId=fromIntegral serverNodeId
, errorCode=0
, throttleTimeMs=0
, port=fromIntegral serverNodePort
, host=serverNodeHost
}
_ -> do
return $ K.FindCoordinatorResponse {
errorMessage=Just "KeyType Must be 0(GROUP)"
, nodeId=0
, errorCode=K.COORDINATOR_NOT_AVAILABLE
, throttleTimeMs=0
, port=0
, host=""
}
16 changes: 1 addition & 15 deletions hstream-kafka/HStream/Kafka/Server/Handler/Group.hs
Original file line number Diff line number Diff line change
@@ -1,34 +1,20 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE OverloadedRecordDot #-}

module HStream.Kafka.Server.Handler.Group
( -- 19: CreateTopics
handleFindCoordinator
, handleJoinGroup
( handleJoinGroup
, handleSyncGroup
, handleHeartbeat
, handleLeaveGroup
, handleListGroups
, handleDescribeGroups
) where

import HStream.Common.Server.Lookup (KafkaResource (..),
lookupKafkaPersist)
import qualified HStream.Kafka.Group.GroupCoordinator as GC
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Server.HStreamApi as A
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

-- FIXME: move to a separated Coordinator module
handleFindCoordinator :: ServerContext -> K.RequestContext -> K.FindCoordinatorRequest -> IO K.FindCoordinatorResponse
handleFindCoordinator ServerContext{..} _ req = do
A.ServerNode{..} <- lookupKafkaPersist metaHandle gossipContext loadBalanceHashRing scAdvertisedListenersKey (KafkaResGroup req.key)
Log.info $ "findCoordinator for group:" <> Log.buildString' req.key <> ", result:" <> Log.buildString' serverNodeId
return $ K.FindCoordinatorResponse 0 (fromIntegral serverNodeId) serverNodeHost (fromIntegral serverNodePort)

handleJoinGroup :: ServerContext -> K.RequestContext -> K.JoinGroupRequest -> IO K.JoinGroupResponse
handleJoinGroup ServerContext{..} = GC.joinGroup scGroupCoordinator

Expand Down
36 changes: 35 additions & 1 deletion hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,14 @@ newtype FindCoordinatorRequestV0 = FindCoordinatorRequestV0
} deriving (Show, Eq, Generic)
instance Serializable FindCoordinatorRequestV0

data FindCoordinatorRequestV1 = FindCoordinatorRequestV1
{ key :: !Text
-- ^ The coordinator key.
, keyType :: {-# UNPACK #-} !Int8
-- ^ The coordinator key type. (Group, transaction, etc.)
} deriving (Show, Eq, Generic)
instance Serializable FindCoordinatorRequestV1

data FindCoordinatorResponseV0 = FindCoordinatorResponseV0
{ errorCode :: {-# UNPACK #-} !ErrorCode
-- ^ The error code, or 0 if there was no error.
Expand All @@ -948,6 +956,23 @@ data FindCoordinatorResponseV0 = FindCoordinatorResponseV0
} deriving (Show, Eq, Generic)
instance Serializable FindCoordinatorResponseV0

data FindCoordinatorResponseV1 = FindCoordinatorResponseV1
{ throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
, errorCode :: {-# UNPACK #-} !ErrorCode
-- ^ The error code, or 0 if there was no error.
, errorMessage :: !NullableString
-- ^ The error message, or null if there was no error.
, nodeId :: {-# UNPACK #-} !Int32
-- ^ The node id.
, host :: !Text
-- ^ The host name.
, port :: {-# UNPACK #-} !Int32
-- ^ The port.
} deriving (Show, Eq, Generic)
instance Serializable FindCoordinatorResponseV1

data HeartbeatRequestV0 = HeartbeatRequestV0
{ groupId :: !Text
-- ^ The group id.
Expand Down Expand Up @@ -1647,6 +1672,7 @@ instance Service HStreamKafkaV1 where
, "metadata"
, "offsetCommit"
, "offsetFetch"
, "findCoordinator"
, "joinGroup"
, "heartbeat"
, "leaveGroup"
Expand Down Expand Up @@ -1699,6 +1725,13 @@ instance HasMethodImpl HStreamKafkaV1 "offsetFetch" where
type MethodInput HStreamKafkaV1 "offsetFetch" = OffsetFetchRequestV1
type MethodOutput HStreamKafkaV1 "offsetFetch" = OffsetFetchResponseV1

instance HasMethodImpl HStreamKafkaV1 "findCoordinator" where
type MethodName HStreamKafkaV1 "findCoordinator" = "findCoordinator"
type MethodKey HStreamKafkaV1 "findCoordinator" = 10
type MethodVersion HStreamKafkaV1 "findCoordinator" = 1
type MethodInput HStreamKafkaV1 "findCoordinator" = FindCoordinatorRequestV1
type MethodOutput HStreamKafkaV1 "findCoordinator" = FindCoordinatorResponseV1

instance HasMethodImpl HStreamKafkaV1 "joinGroup" where
type MethodName HStreamKafkaV1 "joinGroup" = "joinGroup"
type MethodKey HStreamKafkaV1 "joinGroup" = 11
Expand Down Expand Up @@ -1948,7 +1981,7 @@ supportedApiVersions =
, ApiVersionV0 (ApiKey 3) 0 5
, ApiVersionV0 (ApiKey 8) 0 3
, ApiVersionV0 (ApiKey 9) 0 3
, ApiVersionV0 (ApiKey 10) 0 0
, ApiVersionV0 (ApiKey 10) 0 1
, ApiVersionV0 (ApiKey 11) 0 2
, ApiVersionV0 (ApiKey 12) 0 1
, ApiVersionV0 (ApiKey 13) 0 1
Expand Down Expand Up @@ -1992,6 +2025,7 @@ getHeaderVersion (ApiKey (9)) 1 = (1, 0)
getHeaderVersion (ApiKey (9)) 2 = (1, 0)
getHeaderVersion (ApiKey (9)) 3 = (1, 0)
getHeaderVersion (ApiKey (10)) 0 = (1, 0)
getHeaderVersion (ApiKey (10)) 1 = (1, 0)
getHeaderVersion (ApiKey (11)) 0 = (1, 0)
getHeaderVersion (ApiKey (11)) 1 = (1, 0)
getHeaderVersion (ApiKey (11)) 2 = (1, 0)
Expand Down
51 changes: 45 additions & 6 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1883,30 +1883,49 @@ fetchResponseFromV4 x = FetchResponse
, throttleTimeMs = x.throttleTimeMs
}

newtype FindCoordinatorRequest = FindCoordinatorRequest
{ key :: Text
data FindCoordinatorRequest = FindCoordinatorRequest
{ key :: !Text
-- ^ The coordinator key.
, keyType :: {-# UNPACK #-} !Int8
-- ^ The coordinator key type. (Group, transaction, etc.)
} deriving (Show, Eq, Generic)
instance Serializable FindCoordinatorRequest

findCoordinatorRequestToV0 :: FindCoordinatorRequest -> FindCoordinatorRequestV0
findCoordinatorRequestToV0 x = FindCoordinatorRequestV0
{ key = x.key
}
findCoordinatorRequestToV1 :: FindCoordinatorRequest -> FindCoordinatorRequestV1
findCoordinatorRequestToV1 x = FindCoordinatorRequestV1
{ key = x.key
, keyType = x.keyType
}

findCoordinatorRequestFromV0 :: FindCoordinatorRequestV0 -> FindCoordinatorRequest
findCoordinatorRequestFromV0 x = FindCoordinatorRequest
{ key = x.key
, keyType = 0
}
findCoordinatorRequestFromV1 :: FindCoordinatorRequestV1 -> FindCoordinatorRequest
findCoordinatorRequestFromV1 x = FindCoordinatorRequest
{ key = x.key
, keyType = x.keyType
}

data FindCoordinatorResponse = FindCoordinatorResponse
{ errorCode :: {-# UNPACK #-} !ErrorCode
{ errorCode :: {-# UNPACK #-} !ErrorCode
-- ^ The error code, or 0 if there was no error.
, nodeId :: {-# UNPACK #-} !Int32
, nodeId :: {-# UNPACK #-} !Int32
-- ^ The node id.
, host :: !Text
, host :: !Text
-- ^ The host name.
, port :: {-# UNPACK #-} !Int32
, port :: {-# UNPACK #-} !Int32
-- ^ The port.
, throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
, errorMessage :: !NullableString
-- ^ The error message, or null if there was no error.
} deriving (Show, Eq, Generic)
instance Serializable FindCoordinatorResponse

Expand All @@ -1917,13 +1936,33 @@ findCoordinatorResponseToV0 x = FindCoordinatorResponseV0
, host = x.host
, port = x.port
}
findCoordinatorResponseToV1 :: FindCoordinatorResponse -> FindCoordinatorResponseV1
findCoordinatorResponseToV1 x = FindCoordinatorResponseV1
{ throttleTimeMs = x.throttleTimeMs
, errorCode = x.errorCode
, errorMessage = x.errorMessage
, nodeId = x.nodeId
, host = x.host
, port = x.port
}

findCoordinatorResponseFromV0 :: FindCoordinatorResponseV0 -> FindCoordinatorResponse
findCoordinatorResponseFromV0 x = FindCoordinatorResponse
{ errorCode = x.errorCode
, nodeId = x.nodeId
, host = x.host
, port = x.port
, throttleTimeMs = 0
, errorMessage = Nothing
}
findCoordinatorResponseFromV1 :: FindCoordinatorResponseV1 -> FindCoordinatorResponse
findCoordinatorResponseFromV1 x = FindCoordinatorResponse
{ errorCode = x.errorCode
, nodeId = x.nodeId
, host = x.host
, port = x.port
, throttleTimeMs = x.throttleTimeMs
, errorMessage = x.errorMessage
}

data HadminCommandRequest = HadminCommandRequest
Expand Down
5 changes: 5 additions & 0 deletions script/kafka_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def get_field_default(field_type, default=None):
"Heartbeat": (0, 1),
"ListGroups": (0, 1),
"DescribeGroups": (0, 1),
"FindCoordinator": (0, 1),
}

# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -456,6 +457,10 @@ def parse_field(field, api_version=0, flexible=False):
api_version, min_tagged_version, max_tagged_version
)

# if field is NullableString, the default value should be Nothing(null)
if type_type == "string" and default is None and in_null_version:
default = "null"

if (in_api_version, in_tagged_version) == (False, False):
return
elif (in_api_version, in_tagged_version) == (True, False):
Expand Down

0 comments on commit 2ce2e5a

Please sign in to comment.