kafka proto: bench encoding (#1719)
A slight performance improvement.
4eUeP authored Dec 27, 2023
1 parent ed5e27c commit ae85d8a
Showing 6 changed files with 133 additions and 22 deletions.
37 changes: 29 additions & 8 deletions common/base/HStream/Base/Growing.hs
@@ -1,5 +1,5 @@
--- Fork from proto-lens: https://github.com/google/proto-lens with BSD-3-Clause
--- license.
+-- Copy from proto-lens: https://github.com/google/proto-lens (BSD-3-Clause)
+-- and modified by HStream contributors.

-- | A mutable vector that grows in size.
--
@@ -20,6 +20,11 @@ module HStream.Base.Growing
, append
, unsafeFreeze
, RealWorld

-- If you do not know what you are doing, you should not use the following
-- functions.
, unsafeNewLen
, unsafeWrite
) where

import Control.Monad.Primitive (PrimMonad, PrimState, RealWorld)
@@ -43,23 +48,32 @@ data Growing v s a = Growing
new :: (PrimMonad m, V.Vector v a) => m (Growing v (PrimState m) a)
new = Growing 0 <$> MV.new 0

-- Be careful with this function.
--
-- You may want to use 'unsafeWrite' to initialize the vector
unsafeNewLen
:: (PrimMonad m, V.Vector v a)
=> Int -> m (Growing v (PrimState m) a)
unsafeNewLen len = Growing len <$> MV.unsafeNew len
{-# INLINE unsafeNewLen #-}

-- | Unsafely convert a growing vector to an immutable one without
-- copying. After this call, you may not use the growing vector
-- nor any other growing vectors that were used to produce this one.
unsafeFreeze
  :: (PrimMonad m, V.Vector v a)
  => Growing v (PrimState m) a -> m (v a)
unsafeFreeze (Growing len m) = V.unsafeFreeze (MV.take len m)

-- | Returns a new growing vector with a new element at the end.
-- Note that the return value may share storage with the input value.
-- Furthermore, calling @append@ twice on the same input may result
-- in two vectors that share the same storage.
append
  :: (PrimMonad m, V.Vector v a)
  => Growing v (PrimState m) a
  -> a
  -> m (Growing v (PrimState m) a)
append (Growing len v) x
| len < MV.length v = do
MV.unsafeWrite v len x
@@ -70,3 +84,10 @@ append (Growing len v) x
MV.unsafeWrite v' len x
return $ Growing (len + 1) v'
{-# INLINE append #-}

-- | Replace the element at the given position. No bounds checks are performed.
unsafeWrite
:: (PrimMonad m, V.Vector v a)
=> Growing v (PrimState m) a -> Int -> a -> m ()
unsafeWrite (Growing len v) idx ele = MV.unsafeWrite v idx ele
{-# INLINE unsafeWrite #-}
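The new unsafeNewLen/unsafeWrite pair lets callers that already know the final
length skip append's amortized-growth path. A minimal sketch of both styles,
assuming a boxed Data.Vector result in IO (the helper names buildByAppend and
buildByWrite are illustrative, not part of the module):

import Control.Monad (foldM)
import qualified Data.Vector as V
import HStream.Base.Growing

-- Growth path: start empty and append one element at a time.
buildByAppend :: Int -> IO (V.Vector Int)
buildByAppend n = do
  g0 <- new
  g  <- foldM append g0 [0 .. n - 1]
  unsafeFreeze g

-- Preallocation path: allocate the exact length up front, then fill
-- every slot with unsafeWrite. Every index must be written before
-- unsafeFreeze, because unsafeNewLen leaves the buffer uninitialized.
buildByWrite :: Int -> IO (V.Vector Int)
buildByWrite n = do
  g <- unsafeNewLen n
  mapM_ (\i -> unsafeWrite g i i) [0 .. n - 1]
  unsafeFreeze g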
16 changes: 16 additions & 0 deletions hstream-kafka/hstream-kafka.cabal
@@ -61,6 +61,7 @@ library kafka-protocol
build-depends:
, base >=4.11 && <5
, bytestring
, deepseq
, digest ^>=0.0.2.0
, hstream-common-base
, text
@@ -101,6 +102,21 @@ test-suite kafka-protocol-test
build-tool-depends: hspec-discover:hspec-discover >=2 && <3
ghc-options: -threaded -rtsopts -with-rtsopts=-N

benchmark kafka-protocol-bench-encoding
import: shared-properties
type: exitcode-stdio-1.0
main-is: Encoding.hs
hs-source-dirs: protocol/bench
build-depends:
, base
, bytestring
, criterion
, hstream-kafka:kafka-protocol
, vector

default-language: Haskell2010
ghc-options: -threaded -rtsopts -with-rtsopts=-N

library
import: shared-properties
exposed-modules:
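With the new benchmark stanza in place, the suite should be runnable in the
usual way, presumably cabal bench hstream-kafka:kafka-protocol-bench-encoding,
after generating the input files with the gen_data.py script added below.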
32 changes: 20 additions & 12 deletions hstream-kafka/protocol/Kafka/Protocol/Encoding.hs
@@ -1,6 +1,7 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE DefaultSignatures #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ViewPatterns #-}
@@ -64,6 +65,7 @@ module Kafka.Protocol.Encoding
, takeBytes
) where

import Control.DeepSeq (NFData)
import Control.Exception
import Control.Monad
import Data.ByteString (ByteString)
@@ -196,10 +198,10 @@ runPut = BL.toStrict . toLazyByteString . put
-- Extra Primitive Types

newtype VarInt32 = VarInt32 { unVarInt32 :: Int32 }
-  deriving newtype (Show, Num, Integral, Real, Enum, Ord, Eq, Bounded)
+  deriving newtype (Show, Num, Integral, Real, Enum, Ord, Eq, Bounded, NFData)

newtype VarInt64 = VarInt64 { unVarInt64 :: Int64 }
-  deriving newtype (Show, Num, Integral, Real, Enum, Ord, Eq, Bounded)
+  deriving newtype (Show, Num, Integral, Real, Enum, Ord, Eq, Bounded, NFData)

type NullableString = Maybe Text

@@ -225,7 +227,7 @@ data TaggedFields = EmptyTaggedFields

newtype KaArray a = KaArray
{ unKaArray :: Maybe (Vector a) }
-  deriving newtype (Show, Eq, Ord)
+  deriving newtype (Show, Eq, Ord, NFData)

instance Functor KaArray where
fmap f (KaArray xs) = KaArray $ fmap f <$> xs
@@ -238,20 +240,20 @@ instance Functor CompactKaArray where
fmap f (CompactKaArray xs) = CompactKaArray $ fmap f <$> xs

newtype RecordKey = RecordKey { unRecordKey :: Maybe ByteString }
-  deriving newtype (Show, Eq, Ord)
+  deriving newtype (Show, Eq, Ord, NFData)

newtype RecordValue = RecordValue { unRecordValue :: Maybe ByteString }
-  deriving newtype (Show, Eq, Ord)
+  deriving newtype (Show, Eq, Ord, NFData)

newtype RecordArray a = RecordArray { unRecordArray :: Vector a }
-  deriving newtype (Show, Eq, Ord)
+  deriving newtype (Show, Eq, Ord, NFData)

newtype RecordHeaderKey = RecordHeaderKey { unRecordHeaderKey :: Text }
-  deriving newtype (Show, Eq, Ord, IsString, Monoid, Semigroup)
+  deriving newtype (Show, Eq, Ord, IsString, Monoid, Semigroup, NFData)

newtype RecordHeaderValue = RecordHeaderValue
{ unRecordHeaderValue :: Maybe ByteString }
-  deriving newtype (Show, Eq, Ord)
+  deriving newtype (Show, Eq, Ord, NFData)

-------------------------------------------------------------------------------
-- Instances
@@ -353,7 +355,9 @@ data BatchRecord
= BatchRecordV0 RecordV0
| BatchRecordV1 RecordV1
| BatchRecordV2 RecordBatch
-  deriving (Show, Eq)
+  deriving (Show, Eq, Generic)

instance NFData BatchRecord

decodeBatchRecords :: Bool -> ByteString -> IO (Vector BatchRecord)
decodeBatchRecords shouldValidateCrc batchBs =
@@ -535,6 +539,7 @@ data RecordV0 = RecordV0
} deriving (Generic, Show, Eq)

instance Serializable RecordV0
instance NFData RecordV0

minRecordSizeV0 :: Int
minRecordSizeV0 =
@@ -552,6 +557,7 @@ data RecordV1 = RecordV1
} deriving (Generic, Show, Eq)

instance Serializable RecordV1
instance NFData RecordV1

minRecordSizeV1 :: Int
minRecordSizeV1 =
@@ -594,6 +600,7 @@ data RecordV2 = RecordV2
} deriving (Generic, Show, Eq)

instance Serializable RecordV2
instance NFData RecordV2

data RecordBatch = RecordBatch
{ baseOffset :: {-# UNPACK #-} !Int64
@@ -612,6 +619,7 @@ data RecordBatch = RecordBatch
} deriving (Generic, Show, Eq)

instance Serializable RecordBatch
instance NFData RecordBatch

-------------------------------------------------------------------------------
-- Misc
@@ -687,9 +695,9 @@ putCompactArray Nothing = putVarWord32 0
getRecordArray :: Serializable a => Parser (Vector a)
getRecordArray = do
!n <- fromIntegral <$> getVarInt32
-  if n >= 0
-    then V.replicateM n get
-    else fail $! "Length of RecordArray must not be negative " <> show n
+  if | n > 0 -> V.replicateM n get
+     | n == 0 -> pure V.empty
+     | otherwise -> fail $! "Length of RecordArray must not be negative " <> show n

putRecordArray :: Serializable a => Vector a -> Builder
putRecordArray xs =
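Two notes on this file: the NFData derivations let criterion force decoded
records to normal form, and getRecordArray gains an explicit n == 0 branch
because V.replicateM 0 still pays for its monadic setup while pure V.empty
returns a shared constant; the vector bgroup in the benchmark added below
measures exactly this difference. For readers unfamiliar with MultiWayIf, the
new branching is equivalent to ordinary guards in a helper, sketched here with
an illustrative name (replicateOrEmpty is not part of the library):

import qualified Data.Vector as V

-- Illustrative desugaring of the MultiWayIf in getRecordArray: the
-- n == 0 case short-circuits to the shared empty vector instead of
-- going through V.replicateM's buffer machinery.
replicateOrEmpty :: Monad m => Int -> m a -> m (Either String (V.Vector a))
replicateOrEmpty n act
  | n > 0     = Right <$> V.replicateM n act
  | n == 0    = pure (Right V.empty)
  | otherwise = pure (Left ("negative length: " <> show n))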
6 changes: 4 additions & 2 deletions hstream-kafka/protocol/Kafka/Protocol/Encoding/Parser.hs
@@ -32,6 +32,7 @@ module Kafka.Protocol.Encoding.Parser
, getRecordString
-- * Internals
, takeBytes
, fromIO
) where

import Control.Monad
@@ -273,10 +274,11 @@ getCompactNullableBytes = do
-- ref: https://kafka.apache.org/documentation/#record
getRecordNullableBytes :: Parser (Maybe ByteString)
getRecordNullableBytes = do
-  n <- fromIntegral <$> getVarInt32
+  !n <- fromIntegral <$!> getVarInt32
if n >= 0
then Just <$> takeBytes n
else pure Nothing
{-# INLINE getRecordNullableBytes #-}

-- | Record header key
--
@@ -341,7 +343,6 @@ instance (Show r) => Show (Result r) where
show (Fail _ err) = "Fail " <> err
show (More _) = "More"


instance Functor Result where
fmap f (Done bs r) = Done bs (f r)
fmap f (More k) = More (fmap (fmap f) . k)
@@ -365,6 +366,7 @@ anyWord8 = Parser $ \bs next ->

-- | Take the specified number of bytes
takeBytes :: Int -> Parser ByteString
takeBytes 0 = pure ""
takeBytes n = Parser $ \bs next ->
let len = BS.length bs
in if len >= n
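The parsers touched here revolve around Kafka's variable-length integers,
which are ZigZag-encoded LEB128 values (the same scheme protobuf uses for
sint32). A self-contained sketch of the decode step, assuming well-formed
input; this illustrates the wire format and is not the library's actual
getVarInt32:

{-# LANGUAGE BangPatterns #-}
import Data.Bits ((.&.), (.|.), shiftL, shiftR, testBit, xor)
import qualified Data.ByteString as BS
import Data.Int (Int32)
import Data.Word (Word32)

-- Undo ZigZag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
unZigZag32 :: Word32 -> Int32
unZigZag32 w = fromIntegral (w `shiftR` 1) `xor` negate (fromIntegral (w .&. 1))

-- Decode one LEB128 varint: 7 payload bits per byte, high bit set
-- while more bytes follow. Returns the value and the rest of the input.
decodeVarWord32 :: BS.ByteString -> Maybe (Word32, BS.ByteString)
decodeVarWord32 = go 0 0
  where
    go !acc !shift bs = do
      (b, rest) <- BS.uncons bs
      let acc' = acc .|. (fromIntegral (b .&. 0x7f) `shiftL` shift)
      if testBit b 7 then go acc' (shift + 7) rest else Just (acc', rest)

For example, the single byte 0x03 decodes to the unsigned value 3, which
unZigZag32 maps to -2.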
30 changes: 30 additions & 0 deletions hstream-kafka/protocol/bench/Encoding.hs
@@ -0,0 +1,30 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}

module Main where

import Criterion.Main
import qualified Data.ByteString as BS
import Data.Vector (Vector)
import qualified Data.Vector as V
import qualified Kafka.Protocol.Encoding as K

main :: IO ()
main = do
-- Use ./gen_data.py to generate the data file first
!batchBs1K1 <- BS.readFile "/tmp/records_1k_1.data"
!batchBs1K100 <- BS.readFile "/tmp/records_1k_100.data"
!batchBs1K1000 <- BS.readFile "/tmp/records_1k_1000.data"

defaultMain
[ bgroup "vector"
[ bench "pure empty" $ nfIO @(Vector Int) (pure V.empty)
, bench "replicateM 0" $ nfIO @(Vector Int) (V.replicateM 0 (pure 0))
]
, bgroup "decode records"
[ bench "1K*1" $ nfIO $ K.decodeBatchRecords' True batchBs1K1
, bench "1K*100" $ nfIO $ K.decodeBatchRecords' True batchBs1K100
, bench "1K*1000" $ nfIO $ K.decodeBatchRecords' True batchBs1K1000
]
]
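The nfIO calls are the reason the encoding module now derives NFData
throughout: criterion can only time a fully evaluated result, so each decoded
BatchRecord (and everything reachable from it) must be forced to normal form,
otherwise the benchmark would largely measure thunk creation. The first bgroup
isolates the pure V.empty vs V.replicateM 0 difference that motivated the
getRecordArray fast path.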
34 changes: 34 additions & 0 deletions hstream-kafka/protocol/bench/gen_data.py
@@ -0,0 +1,34 @@
from kafka.record import MemoryRecordsBuilder
import random


# magic: [0, 1, 2]
def encode_records(magic, key, value, n):
# compression_type: [0, 1, 2, 3]
builder = MemoryRecordsBuilder(
magic=magic, compression_type=0, batch_size=len(value) * n
)
for offset in range(n):
builder.append(timestamp=10000 + offset, key=key, value=value)
builder.close()
return builder.buffer()


def write_records(file_name, bs):
with open(file_name, "wb") as f:
f.write(bs)


if __name__ == "__main__":
write_records(
"/tmp/records_1k_1.data",
encode_records(2, None, random.randbytes(1024), 1),
)
write_records(
"/tmp/records_1k_100.data",
encode_records(2, None, random.randbytes(1024), 100),
)
write_records(
"/tmp/records_1k_1000.data",
encode_records(2, None, random.randbytes(1024), 1000),
)
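MemoryRecordsBuilder here presumably comes from the kafka-python package
(pip install kafka-python); running the script once writes the three
v2-format batch files under /tmp that the Haskell benchmark reads. Note that
random.randbytes requires Python 3.9 or later.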
