Skip to content

Commit

Permalink
add cacheStore for hstream (#1828)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian authored Jul 5, 2024
1 parent d8e7c63 commit 5c59fc2
Show file tree
Hide file tree
Showing 32 changed files with 1,314 additions and 223 deletions.
16 changes: 12 additions & 4 deletions common/hstream/HStream/MetaStore/ZookeeperUtils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ScopedTypeVariables #-}

module HStream.MetaStore.ZookeeperUtils
where
module HStream.MetaStore.ZookeeperUtils where

import Control.Exception (catch, try)
import Control.Monad (void)
import Control.Monad (void, when)
import Data.Aeson (FromJSON, ToJSON)
import qualified Data.Aeson as Aeson
import qualified Data.ByteString as BS
Expand All @@ -16,10 +16,11 @@ import Z.Data.CBytes (CBytes)
import Z.Data.Vector (Bytes)
import qualified Z.Foreign as ZF
import ZooKeeper (zooCreate, zooDelete, zooGet,
zooGetChildren, zooSet)
zooGetChildren, zooSet, zooState)
import ZooKeeper.Exception
import ZooKeeper.Types (DataCompletion (..), StringVector (..),
StringsCompletion (..), ZHandle,
pattern ZooConnectedState,
pattern ZooPersistent, zooOpenAclUnsafe)

import qualified HStream.Logger as Log
Expand Down Expand Up @@ -81,3 +82,10 @@ createPath :: HasCallStack => ZHandle -> CBytes -> IO ()
createPath zk path = do
Log.debug . Log.buildString $ "create path " <> show path
void $ zooCreate zk path Nothing zooOpenAclUnsafe ZooPersistent

checkRecoverable :: ZHandle -> IO Bool
checkRecoverable zk = do
st <- zooState zk
when (st /= ZooConnectedState) $ do
Log.fatal $ "zk connect is unhealty, current state: " <> Log.build (show st)
return $ st == ZooConnectedState
4 changes: 3 additions & 1 deletion common/server/HStream/Common/Server/Lookup.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ module HStream.Common.Server.Lookup
) where

import Control.Concurrent.STM
import Control.Exception (SomeException (..), try)
import Control.Exception (SomeException (..), throwIO,
try)
import Data.List (find)
import Data.Text (Text)
import qualified Data.Vector as V
Expand All @@ -21,6 +22,7 @@ import HStream.Common.Server.HashRing (LoadBalanceHashRing,
readLoadBalanceHashRing)
import HStream.Common.Server.MetaData (TaskAllocation (..))
import HStream.Common.Types (fromInternalServerNodeWithKey)
import qualified HStream.Exception as HE
import HStream.Gossip (GossipContext, getMemberList)
import qualified HStream.Logger as Log
import qualified HStream.MetaStore.Types as M
Expand Down
48 changes: 42 additions & 6 deletions common/stats/HStream/Stats.hs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ module HStream.Stats
, CounterExports(connector, delivered_in_records)
, CounterExports(connector, delivered_in_bytes)

-- * PerCacheStoreStats
, cache_store_stat_erase
-- ** Counters
, cache_store_stat_getall
, CounterExports(cache_store, cs_append_total)
, CounterExports(cache_store, cs_append_failed)
, CounterExports(cache_store, cs_append_in_bytes)
, CounterExports(cache_store, cs_append_in_records)
, CounterExports(cache_store, cs_read_in_bytes)
, CounterExports(cache_store, cs_read_in_records)
, CounterExports(cache_store, cs_delivered_in_records)
, CounterExports(cache_store, cs_delivered_total)
, CounterExports(cache_store, cs_delivered_failed)

-- * PerQueryStats
, query_stat_erase
-- ** Counters
Expand Down Expand Up @@ -201,6 +215,8 @@ PER_X_STAT(stream_)
PER_X_STAT(subscription_)
-- connector_stat_getall, connector_stat_erase
PER_X_STAT(connector_)
-- cache_store_stat_getall, cache_store_stat_erase
PER_X_STAT(cache_store_)
-- query_stat_getall, query_stat_erase
PER_X_STAT(query_)
-- view_stat_getall, view_stat_erase
Expand Down Expand Up @@ -295,6 +311,14 @@ PER_X_STAT_GET(connector_stat_, name) \
PER_X_STAT_GETALL_SEP(connector_stat_, name)
#include "../include/per_connector_stats.inc"

-- cache_store
#define STAT_DEFINE(name, _) \
PER_X_STAT_ADD(cache_store_stat_, name) \
PER_X_STAT_SET(cache_store_stat_, name) \
PER_X_STAT_GET(cache_store_stat_, name) \
PER_X_STAT_GETALL_SEP(cache_store_stat_, name)
#include "../include/per_cache_store_stats.inc"

-- Query
#define STAT_DEFINE(name, _) \
PER_X_STAT_ADD(query_stat_, name) \
Expand Down Expand Up @@ -428,20 +452,32 @@ data ServerHistogramLabel
= SHL_AppendRequestLatency
| SHL_AppendLatency
| SHL_ReadLatency
| SHL_AppendCacheStoreLatency
| SHL_ReadCacheStoreLatency
| SHL_CheckStoreClusterLatency
| SHL_CheckMetaClusterLatency

packServerHistogramLabel :: ServerHistogramLabel -> CBytes
packServerHistogramLabel SHL_AppendRequestLatency = "append_request_latency"
packServerHistogramLabel SHL_AppendLatency = "append_latency"
packServerHistogramLabel SHL_ReadLatency = "read_latency"
packServerHistogramLabel SHL_AppendRequestLatency = "append_request_latency"
packServerHistogramLabel SHL_AppendLatency = "append_latency"
packServerHistogramLabel SHL_ReadLatency = "read_latency"
packServerHistogramLabel SHL_AppendCacheStoreLatency = "append_cache_store_latency"
packServerHistogramLabel SHL_ReadCacheStoreLatency = "read_cache_store_latency"
packServerHistogramLabel SHL_CheckStoreClusterLatency = "check_store_cluster_healthy_latency"
packServerHistogramLabel SHL_CheckMetaClusterLatency = "check_meta_cluster_healthy_latency"

instance Read ServerHistogramLabel where
readPrec = do
l <- Read.lexP
return $
case l of
Read.Ident "append_request_latency" -> SHL_AppendRequestLatency
Read.Ident "append_latency" -> SHL_AppendLatency
Read.Ident "read_latency" -> SHL_ReadLatency
Read.Ident "append_request_latency" -> SHL_AppendRequestLatency
Read.Ident "append_latency" -> SHL_AppendLatency
Read.Ident "read_latency" -> SHL_ReadLatency
Read.Ident "append_cache_store_latency" -> SHL_AppendCacheStoreLatency
Read.Ident "read_cache_store_latency" -> SHL_ReadCacheStoreLatency
Read.Ident "check_store_cluster_healthy_latency" -> SHL_CheckStoreClusterLatency
Read.Ident "check_meta_cluster_healthy_latency" -> SHL_CheckMetaClusterLatency
x -> errorWithoutStackTrace $ "cannot parse ServerHistogramLabel: " <> show x

serverHistogramAdd :: StatsHolder -> ServerHistogramLabel -> Int64 -> IO ()
Expand Down
5 changes: 5 additions & 0 deletions common/stats/HStream/Stats/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ PER_X_STAT(stream_)
PER_X_STAT(subscription_)
-- connector__stat_getall, connector_stat_erase
PER_X_STAT(connector_)
-- cache_store__stat_getall, cache_store_stat_erase
PER_X_STAT(cache_store_)
-- query_stat_getall, query_stat_erase
PER_X_STAT(query_)
-- view_stat_getall, view_stat_erase
Expand Down Expand Up @@ -105,6 +107,9 @@ foreign import ccall unsafe "hs_stats.h prefix##getall_##name" \
#define STAT_DEFINE(name, _) PER_X_STAT_DEFINE(subscription_stat_, name)
#include "../include/per_subscription_stats.inc"

#define STAT_DEFINE(name, _) PER_X_STAT_DEFINE(cache_store_stat_, name)
#include "../include/per_cache_store_stats.inc"

#define TIME_SERIES_DEFINE(name, _, __, ___) \
foreign import ccall unsafe "hs_stats.h stream_time_series_add_##name" \
stream_time_series_add_##name \
Expand Down
26 changes: 24 additions & 2 deletions common/stats/cbits/hs_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ void setPerConnectorStatsMember(const char* stat_name,
#include "per_connector_stats.inc"
}

void setPerCacheStoreStatsMember(
const char* stat_name, StatsCounter PerCacheStoreStats::*& member_ptr) {
#define STAT_DEFINE(name, _) \
if (#name == std::string(stat_name)) { \
member_ptr = &PerCacheStoreStats::name##_counter; \
}
#include "per_cache_store_stats.inc"
}

void setPerQueryStatsMember(const char* stat_name,
StatsCounter PerQueryStats::*& member_ptr) {
#define STAT_DEFINE(name, _) \
Expand Down Expand Up @@ -164,7 +173,19 @@ int stream_time_series_getall_by_name(
#include "per_connector_stats.inc"

// connector_stat_getall, connector_stat_erase
PER_X_STAT(connector_, PerConnectorStats, per_connector_stats, setPerConnectorStatsMember)
PER_X_STAT(connector_, PerConnectorStats, per_connector_stats,
setPerConnectorStatsMember)

// ----------------------------------------------------------------------------
// PerCacheStoreStats
#define STAT_DEFINE(name, _) \
PER_X_STAT_DEFINE(cache_store_stat_, per_cache_store_stats, \
PerCacheStoreStats, name)
#include "per_cache_store_stats.inc"

// cache_store_stat_getall, cache_store_stat_erase
PER_X_STAT(cache_store_, PerCacheStoreStats, per_cache_store_stats,
setPerCacheStoreStatsMember)

// ----------------------------------------------------------------------------
// PerQueryStats
Expand Down Expand Up @@ -199,7 +220,8 @@ PER_X_STAT(view_, PerViewStats, per_view_stats, setPerViewStatsMember)
#include "per_subscription_time_series.inc"

// subscription_stat_getall, subscription_stat_erase
PER_X_STAT(subscription_, PerSubscriptionStats, per_subscription_stats, setPerSubscriptionStatsMember)
PER_X_STAT(subscription_, PerSubscriptionStats, per_subscription_stats,
setPerSubscriptionStatsMember)

int subscription_time_series_get(StatsHolder* stats_holder,
const char* stat_name, const char* subs_name,
Expand Down
14 changes: 14 additions & 0 deletions common/stats/cbits/stats/ServerHistograms.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ struct ServerHistograms : public HistogramBundle {
{"append_request_latency", &append_request_latency},
{"append_latency", &append_latency},
{"read_latency", &read_latency},
{"append_cache_store_latency", &append_cache_store_latency},
{"read_cache_store_latency", &read_cache_store_latency},
{"check_store_cluster_healthy_latency",
&check_store_cluster_healthy_latency},
{"check_meta_cluster_healthy_latency",
&check_meta_cluster_healthy_latency},
};
}

Expand All @@ -23,6 +29,14 @@ struct ServerHistograms : public HistogramBundle {
LatencyHistogram append_latency;
// Latency of logdevice read
LatencyHistogram read_latency;
// Latency of cache store writes
LatencyHistogram append_cache_store_latency;
// Latency of cache store read
LatencyHistogram read_cache_store_latency;
// Latency of check store cluster healthy
LatencyHistogram check_store_cluster_healthy_latency;
// Latency of check meta cluster healthy
LatencyHistogram check_meta_cluster_healthy_latency;
};

}} // namespace hstream::common
88 changes: 54 additions & 34 deletions common/stats/cbits/stats/Stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ namespace hstream { namespace common {

static void aggregateStat(StatsAgg agg, StatsCounter& out, int64_t in) {
switch (agg) {
case StatsAgg::SUM:
out += in;
break;
case StatsAgg::MAX:
if (in > out) {
case StatsAgg::SUM:
out += in;
break;
case StatsAgg::MAX:
if (in > out) {
out = in;
}
break;
case StatsAgg::SUBTRACT:
out -= in;
break;
case StatsAgg::ASSIGN:
out = in;
}
break;
case StatsAgg::SUBTRACT:
out -= in;
break;
case StatsAgg::ASSIGN:
out = in;
break;
break;
}
}

Expand All @@ -37,19 +37,19 @@ static void aggregateHistogram(StatsAggOptional agg, H& out, const H& in) {
return;
}
switch (agg.value()) {
case StatsAgg::SUM:
out.merge(in);
break;
case StatsAgg::MAX:
// MAX doesn't make much sense for histograms. Let's just merge them.
out.merge(in);
break;
case StatsAgg::SUBTRACT:
out.subtract(in);
break;
case StatsAgg::ASSIGN:
out = in;
break;
case StatsAgg::SUM:
out.merge(in);
break;
case StatsAgg::MAX:
// MAX doesn't make much sense for histograms. Let's just merge them.
out.merge(in);
break;
case StatsAgg::SUBTRACT:
out.subtract(in);
break;
case StatsAgg::ASSIGN:
out = in;
break;
}
}

Expand Down Expand Up @@ -77,7 +77,7 @@ std::string PerStreamStats::toJson() {
}

void PerConnectorStats::aggregate(PerConnectorStats const& other,
StatsAggOptional agg_override) {
StatsAggOptional agg_override) {
#define STAT_DEFINE(name, agg) \
aggregateStat(StatsAgg::agg, agg_override, name##_counter, \
other.name##_counter);
Expand All @@ -97,6 +97,27 @@ std::string PerConnectorStats::toJson() {
return folly::toJson(this->toJsonObj());
}

void PerCacheStoreStats::aggregate(PerCacheStoreStats const& other,
StatsAggOptional agg_override) {
#define STAT_DEFINE(name, agg) \
aggregateStat(StatsAgg::agg, agg_override, name##_counter, \
other.name##_counter);
#include "per_cache_store_stats.inc"
}

folly::dynamic PerCacheStoreStats::toJsonObj() {
folly::dynamic map = folly::dynamic::object;
#define STAT_DEFINE(name, _) \
/* we know that all names are unique */ \
map[#name] = name##_counter.load();
#include "per_cache_store_stats.inc"
return map;
}

std::string PerCacheStoreStats::toJson() {
return folly::toJson(this->toJsonObj());
}

void PerQueryStats::aggregate(PerQueryStats const& other,
StatsAggOptional agg_override) {
#define STAT_DEFINE(name, agg) \
Expand All @@ -114,12 +135,10 @@ folly::dynamic PerQueryStats::toJsonObj() {
return map;
}

std::string PerQueryStats::toJson() {
return folly::toJson(this->toJsonObj());
}
std::string PerQueryStats::toJson() { return folly::toJson(this->toJsonObj()); }

void PerViewStats::aggregate(PerViewStats const& other,
StatsAggOptional agg_override) {
StatsAggOptional agg_override) {
#define STAT_DEFINE(name, agg) \
aggregateStat(StatsAgg::agg, agg_override, name##_counter, \
other.name##_counter);
Expand All @@ -135,9 +154,7 @@ folly::dynamic PerViewStats::toJsonObj() {
return map;
}

std::string PerViewStats::toJson() {
return folly::toJson(this->toJsonObj());
}
std::string PerViewStats::toJson() { return folly::toJson(this->toJsonObj()); }

void PerSubscriptionStats::aggregate(PerSubscriptionStats const& other,
StatsAggOptional agg_override) {
Expand Down Expand Up @@ -255,6 +272,7 @@ void Stats::aggregateCompoundStats(Stats const& other,
// while we aggregate.
_PER_STATS(per_stream_stats, PerStreamStats)
_PER_STATS(per_connector_stats, PerConnectorStats)
_PER_STATS(per_cache_store_stats, PerCacheStoreStats)
_PER_STATS(per_query_stats, PerQueryStats)
_PER_STATS(per_view_stats, PerViewStats)
_PER_STATS(per_subscription_stats, PerSubscriptionStats)
Expand All @@ -272,6 +290,7 @@ void Stats::deriveStats() {}
void Stats::reset() {
per_stream_stats.wlock()->clear();
per_connector_stats.wlock()->clear();
per_cache_store_stats.wlock()->clear();
per_query_stats.wlock()->clear();
per_view_stats.wlock()->clear();
per_subscription_stats.wlock()->clear();
Expand All @@ -298,6 +317,7 @@ folly::dynamic Stats::toJsonObj() {

_PER_TO_JSON(per_stream_stats)
_PER_TO_JSON(per_connector_stats)
_PER_TO_JSON(per_cache_store_stats)
_PER_TO_JSON(per_query_stats)
_PER_TO_JSON(per_view_stats)
_PER_TO_JSON(per_subscription_stats)
Expand Down
Loading

0 comments on commit 5c59fc2

Please sign in to comment.