kafka: improvements (#1805)
- a more informative type for Record
- fetch: drop trySeek
- append: make appendRecords more cohesive
4eUeP authored May 6, 2024
1 parent: b563ed1 · commit: ca965b3
Showing 4 changed files with 61 additions and 42 deletions.
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Common/FetchManager.hs
@@ -24,7 +24,7 @@ import qualified HStream.Store as S
data FetchLogContext = FetchLogContext
{ nextOffset :: Int64
-- ^ Expect next offset to be fetched
- , remRecords :: Vector K.RecordFormat
+ , remRecords :: Vector K.Record
-- ^ Remaining records of the batch
} deriving (Show)
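For context, here is a minimal sketch (stand-in types, not code from this commit) of how a fetch handler might maintain this bookkeeping after serving part of a batch: remember the next offset to fetch and carry the leftover records so the next round can skip a store read.

import Data.Int (Int64)
import Data.Vector (Vector)
import qualified Data.Vector as V

data Record = Record deriving (Show)  -- stand-in for K.Record

data FetchLogContext = FetchLogContext
  { nextOffset :: Int64         -- expect next offset to be fetched
  , remRecords :: Vector Record -- remaining records of the batch
  } deriving (Show)

-- After n records ending at lastOffset were sent to the client, resume
-- from lastOffset + 1 and keep whatever is left of the batch.
advance :: Int64 -> Int -> FetchLogContext -> FetchLogContext
advance lastOffset n ctx = ctx
  { nextOffset = lastOffset + 1
  , remRecords = V.drop n (remRecords ctx)
  }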

35 changes: 34 additions & 1 deletion hstream-kafka/HStream/Kafka/Common/RecordFormat.hs
@@ -1,8 +1,10 @@
module HStream.Kafka.Common.RecordFormat
- ( RecordFormat (..)
+ ( Record (..)
+ , RecordFormat (..)
, recordBytesSize
-- * Helpers
, seekMessageSet
, trySeekMessageSet
) where

import Control.Monad
@@ -11,8 +13,18 @@ import qualified Data.ByteString as BS
import Data.Int
import GHC.Generics (Generic)

import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified Kafka.Protocol.Encoding as K

-- | Record is the smallest unit of data in HStream Kafka.
--
-- For Fetch handler
data Record = Record
{ recordFormat :: !RecordFormat
, recordLsn :: !S.LSN
} deriving (Show)

-- on-disk format
data RecordFormat = RecordFormat
{ version :: {-# UNPACK #-} !Int8
@@ -45,3 +57,24 @@ seekMessageSet i bs{- MessageSet data -} =
void $ K.takeBytes (fromIntegral len)
in snd <$> K.runParser' parser bs
{-# INLINE seekMessageSet #-}

-- | Try to bypass the records if the fetch offset is not the first record
-- in the batch.
trySeekMessageSet
:: Record -- ^ The first record in the batch
-> Int64 -- ^ The fetch offset
-> IO (ByteString, S.LSN)
trySeekMessageSet r fetchOffset = do
let bytesOnDisk = K.unCompactBytes r.recordFormat.recordBytes
magic <- K.decodeRecordMagic bytesOnDisk
fstRecordBytes <-
if magic >= 2
then pure bytesOnDisk
else do
let absStartOffset = r.recordFormat.offset + 1 - fromIntegral r.recordFormat.batchLength
offset = fetchOffset - absStartOffset
if offset > 0
then do Log.debug1 $ "Seek MessageSet " <> Log.build offset
seekMessageSet (fromIntegral offset) bytesOnDisk
else pure bytesOnDisk
pure (fstRecordBytes, r.recordLsn)
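A hypothetical call site for the new helper (not part of this diff), assuming the module is imported qualified as K, as elsewhere in this commit:

import Data.ByteString (ByteString)
import Data.Int (Int64)

import qualified HStream.Kafka.Common.RecordFormat as K
import qualified HStream.Store as S

-- For the first stored record of a partition's batch, get the bytes to send
-- and the LSN to resume reading from; old-format (magic < 2) MessageSets are
-- seeked to the client's fetch offset, newer batches pass through untouched.
firstChunk :: K.Record -> Int64 -> IO (ByteString, S.LSN)
firstChunk record fetchOffset = K.trySeekMessageSet record fetchOffset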
54 changes: 20 additions & 34 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
@@ -47,14 +47,14 @@ type RecordTable =
VS.MVector
S.C_LogID
V.MVector
- (Vector K.RecordFormat, GV.Growing Vector GV.RealWorld K.RecordFormat)
+ (Vector K.Record, GV.Growing Vector GV.RealWorld K.Record)

data LsnData
= LsnData S.LSN S.LSN Int64
-- ^ (startLsn, tailLsn, highwaterOffset)
--
-- NOTE: tailLsn is LSN_INVALID if the partition is empty
- | ContReading (Vector K.RecordFormat) Int64
+ | ContReading (Vector K.Record) Int64
-- ^ (remRecords, highwaterOffset)
--
-- Continue reading, do not need to start reading
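Roughly, the fetch path can branch on this type to decide whether a fresh store read is needed at all. A simplified sketch with stand-in types (not code from this commit):

import Data.Int (Int64)
import Data.Word (Word64)
import Data.Vector (Vector)

type LSN = Word64    -- stand-in for S.LSN
data Record = Record -- stand-in for K.Record

data LsnData
  = LsnData LSN LSN Int64             -- (startLsn, tailLsn, highwaterOffset)
  | ContReading (Vector Record) Int64 -- (remRecords, highwaterOffset)

planRead :: LsnData -> String
planRead (ContReading rs _)   = "reuse " <> show (length rs) <> " leftover records"
planRead (LsnData start tl _) = "read from LSN " <> show start <> " up to " <> show tl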
@@ -202,7 +202,7 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do
M.withLabel M.topicTotalSendBytes partLabel $ \counter -> void $
M.addCounter counter (fromIntegral $ BS.length bs)
M.withLabel M.topicTotalSendMessages partLabel $ \counter -> void $ do
- let totalRecords = V.sum $ V.map (\K.RecordFormat{..} -> batchLength) v
+ let totalRecords = V.sum $ V.map (.recordFormat.batchLength) v
M.addCounter counter (fromIntegral totalRecords)
-- PartitionData
pure $ K.PartitionData
@@ -389,7 +389,7 @@ readMode1 r storageOpts reader = do
case p.elsn of
ContReading remRecords _ -> do
void $ atomicFetchAddFastMut mutRemSize $ V.sum $
- V.map (BS.length . K.unCompactBytes . (.recordBytes)) remRecords
+ V.map (BS.length . K.unCompactBytes . (.recordFormat.recordBytes)) remRecords
-- [TAG_NEV]: Make sure do not insert empty vector to the table,
-- since we will assume the vector is non-empty in `encodePartition`
unless (V.null remRecords) $
@@ -435,7 +435,7 @@ readMode1 r storageOpts reader = do
insertRecords recordTable rs
pure recordTable

insertRemRecords :: RecordTable -> S.C_LogID -> Vector K.RecordFormat -> IO ()
insertRemRecords :: RecordTable -> S.C_LogID -> Vector K.Record -> IO ()
insertRemRecords table logid records = do
(rv, v) <- maybe ((V.empty, ) <$> GV.new) pure =<< (HT.lookup table logid)
HT.insert table logid (rv <> records, v)
Expand All @@ -446,7 +446,7 @@ readMode1 r storageOpts reader = do
recordFormat <- K.runGet @K.RecordFormat record.recordPayload
let logid = record.recordAttr.recordAttrLogID
(rv, v) <- maybe ((V.empty, ) <$> GV.new) pure =<< (HT.lookup table logid)
v' <- GV.append v recordFormat
v' <- GV.append v (K.Record recordFormat (record.recordAttr.recordAttrLSN))
HT.insert table logid (rv, v')

-- In kafka broker, regarding the format on disk, the broker will return
@@ -466,7 +466,7 @@ encodePartition
:: FastMutInt
-> FastMutInt
-> K.FetchPartition
- -> Vector K.RecordFormat
+ -> Vector K.Record
-> IO (ByteString, Maybe Int64, Int)
-- ^ (encoded bytes, next offset, taken vector index)
--
@@ -478,8 +478,18 @@ encodePartition mutMaxBytes mutIsFirstPartition p v = do
where
doEncode maxBytes = do
isFristPartition <- readFastMutInt mutIsFirstPartition
- (fstRecordBytes, vs) <- trySeek
- let fstLen = BS.length fstRecordBytes
+ let (fstRecord :: K.Record, vs) =
+ -- [TAG_NEV]: This should not be Nothing, because if we found the
+ -- key in `readRecords`, it means we have at least one record in
+ -- this.
+ fromMaybe (error "LogicError: got empty vector value")
+ (V.uncons v)
+ -- NOTE: since we don't support RecordBatch version < 2, we don't need to
+ -- seek the MessageSet.
+ --
+ -- Also see 'HStream.Kafka.Common.RecordFormat.trySeekMessageSet'
+ let fstRecordBytes = K.unCompactBytes fstRecord.recordFormat.recordBytes
+ fstLen = BS.length fstRecordBytes
if isFristPartition == 1
-- First partition
then do
@@ -508,7 +518,7 @@ encodePartition mutMaxBytes mutIsFirstPartition p v = do
(bb, lastOffset', takenVecIdx) <-
vecFoldWhileM vs (BB.byteString fstBs, Left fstBs, 0) $ \(b, lb, i) r -> do
-- FIXME: Does this possible be multiple BatchRecords?
let rbs = K.unCompactBytes r.recordBytes
let rbs = K.unCompactBytes r.recordFormat.recordBytes
rlen = BS.length rbs
curMaxBytes <- atomicFetchAddFastMut mutMaxBytes (-rlen)
curPartMaxBytes <- atomicFetchAddFastMut mutPartitionMaxBytes (-rlen)
Expand All @@ -529,30 +539,6 @@ encodePartition mutMaxBytes mutIsFirstPartition p v = do

pure (BS.toStrict $ BB.toLazyByteString bb, lastOffset, takenVecIdx)
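The fold above spends two byte budgets at once: the request-wide mutMaxBytes and the per-partition limit. A pure, simplified sketch of that policy (the real code also tracks the last offset and how many vector elements were taken, and the first record is kept even when it overflows the budget, so a client always makes progress):

import Data.ByteString (ByteString)
import qualified Data.ByteString as BS

-- Always take the first record, then keep appending records while both the
-- request-level and the partition-level byte budgets stay non-negative.
takeWithinBudget :: Int -> Int -> [ByteString] -> [ByteString]
takeWithinBudget _ _ [] = []
takeWithinBudget maxBytes partMaxBytes (r : rs) =
  r : go (maxBytes - BS.length r) (partMaxBytes - BS.length r) rs
  where
    go mb pmb (x : xs)
      | mb >= BS.length x && pmb >= BS.length x =
          x : go (mb - BS.length x) (pmb - BS.length x) xs
    go _ _ _ = []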

- -- Try to bypass the records if the fetch offset is not the first record
- -- in the batch.
- trySeek = do
- let (fstRecord :: K.RecordFormat, vs) =
- -- [TAG_NEV]: This should not be Nothing, because if we found the
- -- key in `readRecords`, it means we have at least one record in
- -- this.
- fromMaybe (error "LogicError: got empty vector value")
- (V.uncons v)
- bytesOnDisk = K.unCompactBytes fstRecord.recordBytes
- -- only the first MessageSet need to to this seeking
- magic <- K.decodeRecordMagic bytesOnDisk
- fstRecordBytes <-
- if | magic >= 2 -> pure bytesOnDisk
- | otherwise -> do
- let absStartOffset = fstRecord.offset + 1 - fromIntegral fstRecord.batchLength
- offset = p.fetchOffset - absStartOffset
- if offset > 0
- then do
- Log.debug1 $ "Seek MessageSet " <> Log.build offset
- K.seekMessageSet (fromIntegral offset) bytesOnDisk
- else pure bytesOnDisk
- pure (fstRecordBytes, vs)

errorPartitionResponse :: Int32 -> K.ErrorCode -> K.PartitionData
errorPartitionResponse partitionIndex ec = K.PartitionData
{ partitionIndex = partitionIndex
12 changes: 6 additions & 6 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
@@ -95,12 +95,9 @@ handleProduce ServerContext{..} _reqCtx req = do
--
-- Currently, only support LogAppendTime
catches (do
- (S.AppendCompletion{..}, offset) <-
+ (appendCompTimestamp, offset) <-
appendRecords True scLDClient scOffsetManager
(topic.name, partition.index) logid recordBytes
- Log.debug1 $ "Append done " <> Log.build appendCompLogID
- <> ", lsn: " <> Log.build appendCompLSN
- <> ", start offset: " <> Log.build offset
pure $ K.PartitionProduceResponse
{ index = partition.index
, errorCode = K.NONE
@@ -148,7 +145,7 @@ appendRecords
-> (Text, Int32)
-> Word64
-> ByteString
- -> IO (S.AppendCompletion, Int64)
+ -> IO (Int64, Int64) -- ^ Return (logAppendTimeMs, baseOffset)
appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = do
batch <- K.decodeRecordBatch shouldValidateCrc bs
let batchLength = batch.recordsCount
@@ -196,7 +193,7 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = do
void $ M.addCounter counter (fromIntegral $ BS.length storedRecord)
M.withLabel M.topicTotalAppendMessages partLabel $ \counter ->
void $ M.addCounter counter (fromIntegral batchLength)
- pure (r, startOffset)
+ Log.debug1 $ "Append done " <> Log.build r.appendCompLogID
+ <> ", lsn: " <> Log.build r.appendCompLSN
+ <> ", start offset: " <> Log.build startOffset
+ pure (r.appendCompTimestamp, startOffset)
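With the new return type, the handler only needs the two numbers the produce response carries. For reference, a tiny stand-in (an assumption about offset bookkeeping, not HStream's API) for how a base offset relates to the offset assigned to a batch's last record, assuming offsets are dense within the batch:

import Data.Int (Int64)

-- If the store assigns `lastOffset` to the final record of an appended batch
-- of `batchLength` records, the producer-visible base offset would be:
baseOffsetOf :: Int64 -> Int64 -> Int64
baseOffsetOf lastOffset batchLength = lastOffset - batchLength + 1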

-- TODO: performance improvements
--
