Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add cacheStore for hstream #1828

Merged
merged 36 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
125621c
Basic implementation of CacheStore
YangKian Jun 12, 2024
4677e03
refactor cache store
YangKian Jun 13, 2024
6c37ea2
implement health checker for hstore
YangKian Jun 17, 2024
48d0c20
refactor cacheStore: split read/write mode for cacheStore && add retr…
YangKian Jun 17, 2024
9c33981
fix cacheStore: make sure key is unique
YangKian Jun 19, 2024
448769e
code clean for hstore checker
YangKian Jun 19, 2024
a28aab2
implement health monitor
YangKian Jun 19, 2024
adcc752
handle lookup when meta cluster unavailable
YangKian Jun 19, 2024
1de7b0b
healthMonitor: update cabal file
YangKian Jun 19, 2024
23dc65d
add config
YangKian Jun 19, 2024
41c0df9
integration
YangKian Jun 19, 2024
d0b4009
cache store: fix encode key to make sure record write in order
YangKian Jun 26, 2024
f7c9df9
change ld_checker ffi to safe
YangKian Jun 26, 2024
b90ab56
every time server switch to backup mode, recreate column family itera…
YangKian Jun 26, 2024
73339b8
fix open db
YangKian Jun 26, 2024
71cfb2d
update
YangKian Jun 26, 2024
390b6a2
add more rocksdb options
YangKian Jun 26, 2024
f8f9c9f
bypass reader when server is in backup mode
YangKian Jun 26, 2024
98fbed0
log update
YangKian Jun 28, 2024
7e975b9
add HStreamEnableCacheStore flag
YangKian Jun 28, 2024
2ae14c6
Revert "add HStreamEnableCacheStore flag"
YangKian Jul 1, 2024
e924d36
add cli option to enable server cache
YangKian Jul 1, 2024
350bc1e
add cache store related stats
YangKian Jul 1, 2024
1945152
add healthy monitor related stats
YangKian Jul 2, 2024
35e0cb7
add per_cache_store_stats.inc
YangKian Jul 2, 2024
d9c4318
update stats handler
YangKian Jul 2, 2024
4e6a6a4
change writeRecord method to return an Either result
YangKian Jul 2, 2024
056c4be
fix stats
YangKian Jul 3, 2024
2c5e425
catch meta exception when get ConnectorStatsIsAlive stat
YangKian Jul 3, 2024
3df16e7
fix stream append stats
YangKian Jul 3, 2024
9dd6646
update cache store stats
YangKian Jul 3, 2024
b9e3173
update protocol
YangKian Jul 3, 2024
7f94aaf
disable unsupported rocksdb option
YangKian Jul 3, 2024
7e853c8
fix test
YangKian Jul 4, 2024
f5599c4
update
YangKian Jul 4, 2024
2f10a09
handle rocksdb exception in write path
YangKian Jul 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions common/hstream/HStream/MetaStore/ZookeeperUtils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ScopedTypeVariables #-}

module HStream.MetaStore.ZookeeperUtils
where
module HStream.MetaStore.ZookeeperUtils where

import Control.Exception (catch, try)
import Control.Monad (void)
import Control.Monad (void, when)
import Data.Aeson (FromJSON, ToJSON)
import qualified Data.Aeson as Aeson
import qualified Data.ByteString as BS
Expand All @@ -16,10 +16,11 @@ import Z.Data.CBytes (CBytes)
import Z.Data.Vector (Bytes)
import qualified Z.Foreign as ZF
import ZooKeeper (zooCreate, zooDelete, zooGet,
zooGetChildren, zooSet)
zooGetChildren, zooSet, zooState)
import ZooKeeper.Exception
import ZooKeeper.Types (DataCompletion (..), StringVector (..),
StringsCompletion (..), ZHandle,
pattern ZooConnectedState,
pattern ZooPersistent, zooOpenAclUnsafe)

import qualified HStream.Logger as Log
Expand Down Expand Up @@ -81,3 +82,10 @@ createPath :: HasCallStack => ZHandle -> CBytes -> IO ()
createPath zk path = do
Log.debug . Log.buildString $ "create path " <> show path
void $ zooCreate zk path Nothing zooOpenAclUnsafe ZooPersistent

-- | Report whether the ZooKeeper session behind the given handle is usable.
--
-- The current session state is read with @zooState@. The result is @True@
-- only when that state equals @ZooConnectedState@; any other state is logged
-- at fatal level together with the observed state and yields @False@.
checkRecoverable :: ZHandle -> IO Bool
checkRecoverable zk = do
  st <- zooState zk
  -- Decide once so the logging condition and the returned flag cannot diverge.
  let connected = st == ZooConnectedState
  when (not connected) $
    Log.fatal $ "zk connect is unhealthy, current state: " <> Log.build (show st)
  return connected
4 changes: 3 additions & 1 deletion common/server/HStream/Common/Server/Lookup.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ module HStream.Common.Server.Lookup
) where

import Control.Concurrent.STM
import Control.Exception (SomeException (..), try)
import Control.Exception (SomeException (..), throwIO,
try)
import Data.List (find)
import Data.Text (Text)
import qualified Data.Vector as V
Expand All @@ -21,6 +22,7 @@ import HStream.Common.Server.HashRing (LoadBalanceHashRing,
readLoadBalanceHashRing)
import HStream.Common.Server.MetaData (TaskAllocation (..))
import HStream.Common.Types (fromInternalServerNodeWithKey)
import qualified HStream.Exception as HE
import HStream.Gossip (GossipContext, getMemberList)
import qualified HStream.Logger as Log
import qualified HStream.MetaStore.Types as M
Expand Down
48 changes: 42 additions & 6 deletions common/stats/HStream/Stats.hs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ module HStream.Stats
, CounterExports(connector, delivered_in_records)
, CounterExports(connector, delivered_in_bytes)

-- * PerCacheStoreStats
, cache_store_stat_erase
-- ** Counters
, cache_store_stat_getall
, CounterExports(cache_store, cs_append_total)
, CounterExports(cache_store, cs_append_failed)
, CounterExports(cache_store, cs_append_in_bytes)
, CounterExports(cache_store, cs_append_in_records)
, CounterExports(cache_store, cs_read_in_bytes)
, CounterExports(cache_store, cs_read_in_records)
, CounterExports(cache_store, cs_delivered_in_records)
, CounterExports(cache_store, cs_delivered_total)
, CounterExports(cache_store, cs_delivered_failed)

-- * PerQueryStats
, query_stat_erase
-- ** Counters
Expand Down Expand Up @@ -201,6 +215,8 @@ PER_X_STAT(stream_)
PER_X_STAT(subscription_)
-- connector_stat_getall, connector_stat_erase
PER_X_STAT(connector_)
-- cache_store_stat_getall, cache_store_stat_erase
PER_X_STAT(cache_store_)
-- query_stat_getall, query_stat_erase
PER_X_STAT(query_)
-- view_stat_getall, view_stat_erase
Expand Down Expand Up @@ -295,6 +311,14 @@ PER_X_STAT_GET(connector_stat_, name) \
PER_X_STAT_GETALL_SEP(connector_stat_, name)
#include "../include/per_connector_stats.inc"

-- cache_store
--
-- Each STAT_DEFINE entry pulled in from per_cache_store_stats.inc is expanded
-- through the PER_X_STAT_ADD, PER_X_STAT_SET, PER_X_STAT_GET and
-- PER_X_STAT_GETALL_SEP macros with the cache_store_stat_ prefix.
-- NOTE(review): presumably this produces one add/set/get/getall binding per
-- counter; confirm against the macro definitions, which are not shown here.
#define STAT_DEFINE(name, _) \
PER_X_STAT_ADD(cache_store_stat_, name) \
PER_X_STAT_SET(cache_store_stat_, name) \
PER_X_STAT_GET(cache_store_stat_, name) \
PER_X_STAT_GETALL_SEP(cache_store_stat_, name)
#include "../include/per_cache_store_stats.inc"

-- Query
#define STAT_DEFINE(name, _) \
PER_X_STAT_ADD(query_stat_, name) \
Expand Down Expand Up @@ -428,20 +452,32 @@ data ServerHistogramLabel
= SHL_AppendRequestLatency
| SHL_AppendLatency
| SHL_ReadLatency
| SHL_AppendCacheStoreLatency
| SHL_ReadCacheStoreLatency
| SHL_CheckStoreClusterLatency
| SHL_CheckMetaClusterLatency

-- | Turn a @ServerHistogramLabel@ into its textual histogram name.
--
-- Every constructor has exactly one equation. The strings are the same
-- identifiers that the @Read@ instance of @ServerHistogramLabel@ accepts, so
-- the two must be kept in sync when a label is added.
packServerHistogramLabel :: ServerHistogramLabel -> CBytes
packServerHistogramLabel SHL_AppendRequestLatency     = "append_request_latency"
packServerHistogramLabel SHL_AppendLatency            = "append_latency"
packServerHistogramLabel SHL_ReadLatency              = "read_latency"
packServerHistogramLabel SHL_AppendCacheStoreLatency  = "append_cache_store_latency"
packServerHistogramLabel SHL_ReadCacheStoreLatency    = "read_cache_store_latency"
packServerHistogramLabel SHL_CheckStoreClusterLatency = "check_store_cluster_healthy_latency"
packServerHistogramLabel SHL_CheckMetaClusterLatency  = "check_meta_cluster_healthy_latency"

-- | Parse a histogram label from a single identifier lexeme.
--
-- One lexeme is consumed with @Read.lexP@ and matched against the known
-- histogram names, one alternative per constructor. An unrecognised lexeme
-- does not make the parser fail: the parser still succeeds, and the value it
-- returns raises @errorWithoutStackTrace@ (mentioning the offending lexeme)
-- when it is evaluated.
instance Read ServerHistogramLabel where
  readPrec = do
    l <- Read.lexP
    return $
      case l of
        Read.Ident "append_request_latency"              -> SHL_AppendRequestLatency
        Read.Ident "append_latency"                      -> SHL_AppendLatency
        Read.Ident "read_latency"                        -> SHL_ReadLatency
        Read.Ident "append_cache_store_latency"          -> SHL_AppendCacheStoreLatency
        Read.Ident "read_cache_store_latency"            -> SHL_ReadCacheStoreLatency
        Read.Ident "check_store_cluster_healthy_latency" -> SHL_CheckStoreClusterLatency
        Read.Ident "check_meta_cluster_healthy_latency"  -> SHL_CheckMetaClusterLatency
        x -> errorWithoutStackTrace $ "cannot parse ServerHistogramLabel: " <> show x

serverHistogramAdd :: StatsHolder -> ServerHistogramLabel -> Int64 -> IO ()
Expand Down
5 changes: 5 additions & 0 deletions common/stats/HStream/Stats/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ PER_X_STAT(stream_)
PER_X_STAT(subscription_)
-- connector_stat_getall, connector_stat_erase
PER_X_STAT(connector_)
-- cache_store_stat_getall, cache_store_stat_erase
PER_X_STAT(cache_store_)
-- query_stat_getall, query_stat_erase
PER_X_STAT(query_)
-- view_stat_getall, view_stat_erase
Expand Down Expand Up @@ -105,6 +107,9 @@ foreign import ccall unsafe "hs_stats.h prefix##getall_##name" \
#define STAT_DEFINE(name, _) PER_X_STAT_DEFINE(subscription_stat_, name)
#include "../include/per_subscription_stats.inc"

#define STAT_DEFINE(name, _) PER_X_STAT_DEFINE(cache_store_stat_, name)
#include "../include/per_cache_store_stats.inc"

#define TIME_SERIES_DEFINE(name, _, __, ___) \
foreign import ccall unsafe "hs_stats.h stream_time_series_add_##name" \
stream_time_series_add_##name \
Expand Down
26 changes: 24 additions & 2 deletions common/stats/cbits/hs_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ void setPerConnectorStatsMember(const char* stat_name,
#include "per_connector_stats.inc"
}

// Resolve a per-cache-store counter name to the matching PerCacheStoreStats
// member pointer.
//
// Each STAT_DEFINE(name, _) entry brought in by per_cache_store_stats.inc
// expands to one string comparison: when stat_name equals the entry's name,
// member_ptr is set to point at that entry's `name##_counter` member.
// If no entry matches, member_ptr is left as the caller passed it in.
void setPerCacheStoreStatsMember(
const char* stat_name, StatsCounter PerCacheStoreStats::*& member_ptr) {
#define STAT_DEFINE(name, _) \
if (#name == std::string(stat_name)) { \
member_ptr = &PerCacheStoreStats::name##_counter; \
}
#include "per_cache_store_stats.inc"
}

void setPerQueryStatsMember(const char* stat_name,
StatsCounter PerQueryStats::*& member_ptr) {
#define STAT_DEFINE(name, _) \
Expand Down Expand Up @@ -164,7 +173,19 @@ int stream_time_series_getall_by_name(
#include "per_connector_stats.inc"

// connector_stat_getall, connector_stat_erase
// Expanded exactly once: a second expansion would define the same functions
// again.
PER_X_STAT(connector_, PerConnectorStats, per_connector_stats,
           setPerConnectorStatsMember)

// ----------------------------------------------------------------------------
// PerCacheStoreStats
#define STAT_DEFINE(name, _) \
PER_X_STAT_DEFINE(cache_store_stat_, per_cache_store_stats, \
PerCacheStoreStats, name)
#include "per_cache_store_stats.inc"

// cache_store_stat_getall, cache_store_stat_erase
PER_X_STAT(cache_store_, PerCacheStoreStats, per_cache_store_stats,
setPerCacheStoreStatsMember)

// ----------------------------------------------------------------------------
// PerQueryStats
Expand Down Expand Up @@ -199,7 +220,8 @@ PER_X_STAT(view_, PerViewStats, per_view_stats, setPerViewStatsMember)
#include "per_subscription_time_series.inc"

// subscription_stat_getall, subscription_stat_erase
// Expanded exactly once: a second expansion would define the same functions
// again.
PER_X_STAT(subscription_, PerSubscriptionStats, per_subscription_stats,
           setPerSubscriptionStatsMember)

int subscription_time_series_get(StatsHolder* stats_holder,
const char* stat_name, const char* subs_name,
Expand Down
14 changes: 14 additions & 0 deletions common/stats/cbits/stats/ServerHistograms.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ struct ServerHistograms : public HistogramBundle {
{"append_request_latency", &append_request_latency},
{"append_latency", &append_latency},
{"read_latency", &read_latency},
{"append_cache_store_latency", &append_cache_store_latency},
{"read_cache_store_latency", &read_cache_store_latency},
{"check_store_cluster_healthy_latency",
&check_store_cluster_healthy_latency},
{"check_meta_cluster_healthy_latency",
&check_meta_cluster_healthy_latency},
};
}

Expand All @@ -23,6 +29,14 @@ struct ServerHistograms : public HistogramBundle {
LatencyHistogram append_latency;
// Latency of logdevice read
LatencyHistogram read_latency;
// Latency of cache store writes
LatencyHistogram append_cache_store_latency;
// Latency of cache store read
LatencyHistogram read_cache_store_latency;
// Latency of the store cluster health check
LatencyHistogram check_store_cluster_healthy_latency;
// Latency of the meta cluster health check
LatencyHistogram check_meta_cluster_healthy_latency;
};

}} // namespace hstream::common
88 changes: 54 additions & 34 deletions common/stats/cbits/stats/Stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ namespace hstream { namespace common {

// Fold the value `in` into the counter `out` according to the mode `agg`:
//   SUM      - add `in` to `out`
//   MAX      - replace `out` with `in` only when `in` is larger
//   SUBTRACT - subtract `in` from `out`
//   ASSIGN   - overwrite `out` with `in`
// Each mode is handled by exactly one case label.
static void aggregateStat(StatsAgg agg, StatsCounter& out, int64_t in) {
  switch (agg) {
    case StatsAgg::SUM:
      out += in;
      break;
    case StatsAgg::MAX:
      if (in > out) {
        out = in;
      }
      break;
    case StatsAgg::SUBTRACT:
      out -= in;
      break;
    case StatsAgg::ASSIGN:
      out = in;
      break;
  }
}

Expand All @@ -37,19 +37,19 @@ static void aggregateHistogram(StatsAggOptional agg, H& out, const H& in) {
return;
}
switch (agg.value()) {
case StatsAgg::SUM:
out.merge(in);
break;
case StatsAgg::MAX:
// MAX doesn't make much sense for histograms. Let's just merge them.
out.merge(in);
break;
case StatsAgg::SUBTRACT:
out.subtract(in);
break;
case StatsAgg::ASSIGN:
out = in;
break;
case StatsAgg::SUM:
out.merge(in);
break;
case StatsAgg::MAX:
// MAX doesn't make much sense for histograms. Let's just merge them.
out.merge(in);
break;
case StatsAgg::SUBTRACT:
out.subtract(in);
break;
case StatsAgg::ASSIGN:
out = in;
break;
}
}

Expand Down Expand Up @@ -77,7 +77,7 @@ std::string PerStreamStats::toJson() {
}

void PerConnectorStats::aggregate(PerConnectorStats const& other,
StatsAggOptional agg_override) {
StatsAggOptional agg_override) {
#define STAT_DEFINE(name, agg) \
aggregateStat(StatsAgg::agg, agg_override, name##_counter, \
other.name##_counter);
Expand All @@ -97,6 +97,27 @@ std::string PerConnectorStats::toJson() {
return folly::toJson(this->toJsonObj());
}

// Merge the counters of `other` into this object.
//
// For every STAT_DEFINE(name, agg) entry brought in by
// per_cache_store_stats.inc, aggregateStat is called with that entry's own
// StatsAgg mode, the caller's agg_override, this object's `name##_counter`
// and the corresponding counter of `other`.
// NOTE(review): the four-argument aggregateStat overload is not visible
// here; presumably a set agg_override takes precedence over the per-stat
// mode -- confirm.
void PerCacheStoreStats::aggregate(PerCacheStoreStats const& other,
StatsAggOptional agg_override) {
#define STAT_DEFINE(name, agg) \
aggregateStat(StatsAgg::agg, agg_override, name##_counter, \
other.name##_counter);
#include "per_cache_store_stats.inc"
}

// Build a folly::dynamic object holding the current counter values.
//
// For every STAT_DEFINE(name, _) entry brought in by
// per_cache_store_stats.inc, the stringified stat name becomes a key whose
// value is the result of load() on the matching `name##_counter`.
folly::dynamic PerCacheStoreStats::toJsonObj() {
folly::dynamic map = folly::dynamic::object;
#define STAT_DEFINE(name, _) \
/* we know that all names are unique */ \
map[#name] = name##_counter.load();
#include "per_cache_store_stats.inc"
return map;
}

std::string PerCacheStoreStats::toJson() {
return folly::toJson(this->toJsonObj());
}

void PerQueryStats::aggregate(PerQueryStats const& other,
StatsAggOptional agg_override) {
#define STAT_DEFINE(name, agg) \
Expand All @@ -114,12 +135,10 @@ folly::dynamic PerQueryStats::toJsonObj() {
return map;
}

// Render the counters as a JSON string: toJsonObj() builds the object and
// folly::toJson serialises it. Defined exactly once.
std::string PerQueryStats::toJson() { return folly::toJson(this->toJsonObj()); }

void PerViewStats::aggregate(PerViewStats const& other,
StatsAggOptional agg_override) {
StatsAggOptional agg_override) {
#define STAT_DEFINE(name, agg) \
aggregateStat(StatsAgg::agg, agg_override, name##_counter, \
other.name##_counter);
Expand All @@ -135,9 +154,7 @@ folly::dynamic PerViewStats::toJsonObj() {
return map;
}

// Render the counters as a JSON string: toJsonObj() builds the object and
// folly::toJson serialises it. Defined exactly once.
std::string PerViewStats::toJson() { return folly::toJson(this->toJsonObj()); }

void PerSubscriptionStats::aggregate(PerSubscriptionStats const& other,
StatsAggOptional agg_override) {
Expand Down Expand Up @@ -255,6 +272,7 @@ void Stats::aggregateCompoundStats(Stats const& other,
// while we aggregate.
_PER_STATS(per_stream_stats, PerStreamStats)
_PER_STATS(per_connector_stats, PerConnectorStats)
_PER_STATS(per_cache_store_stats, PerCacheStoreStats)
_PER_STATS(per_query_stats, PerQueryStats)
_PER_STATS(per_view_stats, PerViewStats)
_PER_STATS(per_subscription_stats, PerSubscriptionStats)
Expand All @@ -272,6 +290,7 @@ void Stats::deriveStats() {}
void Stats::reset() {
per_stream_stats.wlock()->clear();
per_connector_stats.wlock()->clear();
per_cache_store_stats.wlock()->clear();
per_query_stats.wlock()->clear();
per_view_stats.wlock()->clear();
per_subscription_stats.wlock()->clear();
Expand All @@ -298,6 +317,7 @@ folly::dynamic Stats::toJsonObj() {

_PER_TO_JSON(per_stream_stats)
_PER_TO_JSON(per_connector_stats)
_PER_TO_JSON(per_cache_store_stats)
_PER_TO_JSON(per_query_stats)
_PER_TO_JSON(per_view_stats)
_PER_TO_JSON(per_subscription_stats)
Expand Down
Loading
Loading