Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: fix response for OffsetFetch #1812

Merged
merged 1 commit into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions hstream-kafka/HStream/Kafka/Common/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ listToKaArray = K.KaArray . Just . V.fromList
kaArrayToVector :: K.KaArray a -> V.Vector a
kaArrayToVector kaArray = fromMaybe V.empty (K.unKaArray kaArray)

vectorToKaArray :: V.Vector a -> K.KaArray a
vectorToKaArray vec = K.KaArray (Just vec)

mapKaArray :: (a -> b) -> K.KaArray a -> K.KaArray b
mapKaArray f arr = K.KaArray (fmap (V.map f) (K.unKaArray arr))

Expand Down
3 changes: 2 additions & 1 deletion hstream-kafka/HStream/Kafka/Group/Group.hs
Original file line number Diff line number Diff line change
Expand Up @@ -916,11 +916,12 @@ fetchOffsets Group{..} reqTopic validateReqTopic = validateReqTopic reqTopic >>=
, partitions = K.KaArray (Just $ (makeErrorPartition code) <$> partitions')
}
where
-- FIXME: hardcoded constants
makeErrorPartition code idx =
K.OffsetFetchResponsePartition
{ partitionIndex = idx
, committedOffset = -1
, metadata = Nothing
, metadata = Just ""
, errorCode = code
}

Expand Down
8 changes: 5 additions & 3 deletions hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,19 @@ fetchOffsets GroupOffsetManager{..} topicName partitions = do
, partitions = KaArray {unKaArray = Just res}
}
where
-- FIXME: hardcoded constants
getOffset cache partitionIdx = do
let key = mkTopicPartition topicName partitionIdx
in case Map.lookup key cache of
Just offset -> return $ OffsetFetchResponsePartition
{ committedOffset = offset
, metadata = Nothing
, metadata = Just ""
, partitionIndex= partitionIdx
, errorCode = K.NONE
}
Nothing -> return $ OffsetFetchResponsePartition
{ committedOffset = -1
, metadata = Nothing
, metadata = Just ""
, partitionIndex= partitionIdx
, errorCode = K.NONE
-- TODO: check the error code here
Expand All @@ -214,9 +215,10 @@ fetchAllOffsets GroupOffsetManager{..} = do
-- group offsets by TopicName
cachedOffset <- Map.foldrWithKey foldF Map.empty <$> readIORef offsetsCache
return . KaArray . Just . V.map makeTopic . V.fromList . Map.toList $ cachedOffset
-- FIXME: hardcoded constants
where makePartition partition offset = OffsetFetchResponsePartition
{ committedOffset = offset
, metadata = Nothing
, metadata = Just ""
, partitionIndex=partition
, errorCode = K.NONE
}
Expand Down
30 changes: 18 additions & 12 deletions hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP req.groupId AclOp_DESCRIBE >>= \case
False -> return $ makeErrorResponse K.TOPIC_AUTHORIZATION_FAILED
True -> do
group <- GC.getGroup scGroupCoordinator req.groupId
group_m <- GC.getGroupM scGroupCoordinator req.groupId
case K.unKaArray req.topics of
-- 'Nothing' means fetch offsets of ALL topics.
-- WARNING: Offsets of unauthzed topics should not be leaked
Expand All @@ -201,11 +201,13 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio
-- FIXME: Better method than passing a "validate" function?
Nothing -> do
Log.debug $ "fetching all offsets in group:" <> Log.build req.groupId
topicResps <- G.fetchAllOffsets group $ \reqTopic -> do
-- [ACL] check [DESCRIBE TOPIC] for each topic
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic.name AclOp_DESCRIBE >>= \case
False -> return K.TOPIC_AUTHORIZATION_FAILED
True -> return K.NONE
topicResps <- case group_m of
Nothing -> return (K.NonNullKaArray V.empty)
Just group -> G.fetchAllOffsets group $ \reqTopic -> do
-- [ACL] check [DESCRIBE TOPIC] for each topic
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic.name AclOp_DESCRIBE >>= \case
False -> return K.TOPIC_AUTHORIZATION_FAILED
True -> return K.NONE
return $ K.OffsetFetchResponse
{ throttleTimeMs = 0
, errorCode = K.NONE
Expand All @@ -217,11 +219,15 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio
-- on each topic. That is why we pass a "validate"
-- function to it.
-- FIXME: Better method than passing a "validate" function?
G.fetchOffsets group reqTopic $ \reqTopic_ -> do
-- [ACL] check [DESCRIBE TOPIC] for each topic
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic_.name AclOp_DESCRIBE >>= \case
False -> return K.TOPIC_AUTHORIZATION_FAILED
True -> return K.NONE
case group_m of
-- FIXME: what error code should it return? 'K.UNKNOWN_TOPIC_OR_PARTITION'
-- crashes some tests...
Nothing -> return (makeErrorTopicResponse K.NONE reqTopic)
Just group -> G.fetchOffsets group reqTopic $ \reqTopic_ -> do
-- [ACL] check [DESCRIBE TOPIC] for each topic
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic_.name AclOp_DESCRIBE >>= \case
False -> return K.TOPIC_AUTHORIZATION_FAILED
True -> return K.NONE
return $ K.OffsetFetchResponse
{ throttleTimeMs = 0
, errorCode = K.NONE
Expand All @@ -240,7 +246,7 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio
K.OffsetFetchResponsePartition
{ partitionIndex = idx
, errorCode = code
, metadata = Nothing
, metadata = Just ""
, committedOffset = -1
}
}
Expand Down
Loading