diff --git a/hstream-kafka/HStream/Kafka/Group/Group.hs b/hstream-kafka/HStream/Kafka/Group/Group.hs index 4fe08c7fb..e39dbf48a 100644 --- a/hstream-kafka/HStream/Kafka/Group/Group.hs +++ b/hstream-kafka/HStream/Kafka/Group/Group.hs @@ -601,8 +601,8 @@ addMember group@Group{..} member delayedResponse = do Utils.whenIORefEq leader Nothing $ do IO.atomicWriteIORef leader (Just member.memberId) - Log.info $ "updated leader, group:" <> Log.build groupId - <> "leader:" <> Log.build member.memberId + Log.info $ "updated leader for group: " <> Log.build groupId + <> ", leader: " <> Log.build member.memberId H.insert members member.memberId member updateSupportedProtocols group memberProtocols diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs index 36f8f9518..3183fe361 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs @@ -27,6 +27,8 @@ import qualified Kafka.Protocol.Encoding as K import qualified Kafka.Protocol.Error as K import qualified Kafka.Protocol.Message as K import qualified Kafka.Protocol.Service as K +import HStream.Common.Server.Lookup (lookupKafkaPersist, KafkaResource (KafkaResGroup)) +import HStream.Server.HStreamApi (ServerNode(..)) handleJoinGroup :: ServerContext -> K.RequestContext @@ -144,10 +146,16 @@ handleDescribeGroups ServerContext{..} reqCtx req = do simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP gid AclOp_DESCRIBE >>= \case False -> return $ makeErrorGroup gid K.GROUP_AUTHORIZATION_FAILED "" True -> case group_m of + Just group -> do + ServerNode{..} <- lookupKafkaPersist metaHandle gossipContext + loadBalanceHashRing scAdvertisedListenersKey + (KafkaResGroup group.groupId) + if serverNodeId /= serverID + then return $ makeErrorGroup gid K.NOT_COORDINATOR "" + else G.describe group -- Note: For non-existed group, return with no error and Dead state. -- See kafka.coordinator.group.GroupCoordinator#handleDescribeGroup. Nothing -> return $ makeErrorGroup gid K.NONE (T.pack (show G.Dead)) - Just group -> G.describe group -- FIXME: hard-coded constants return $ K.DescribeGroupsResponse { groups = Utils.listToKaArray describedGroups