Skip to content

Commit

Permalink
Merge pull request #5959 from mysteriumnetwork/add-quality-cache
Browse files Browse the repository at this point in the history
Add cache for quality oracle responses
  • Loading branch information
soffokl authored Feb 2, 2024
2 parents 9fc0299 + fe446f0 commit 7794f7c
Showing 1 changed file with 34 additions and 22 deletions.
56 changes: 34 additions & 22 deletions core/quality/mysterium_morqa.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sync"
"time"

gocache "github.com/patrickmn/go-cache"
"github.com/rs/zerolog/log"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -62,8 +63,7 @@ type MysteriumMORQA struct {
once sync.Once
stop chan struct{}

lastCallToQualityAt time.Time
lastMonitoringStatusResponse MonitoringStatusResponse
cache *gocache.Cache
}

type batchWithTimeout struct {
Expand All @@ -82,6 +82,8 @@ func NewMorqaClient(httpClient *requests.HTTPClient, baseURL string, signer iden
batch: make(map[string]*batchWithTimeout),
metrics: make(chan metric, 1000*maxBatchMetricsToKeep),
stop: make(chan struct{}),

cache: gocache.New(1*time.Minute, 10*time.Minute),
}

return morqa
Expand Down Expand Up @@ -239,10 +241,6 @@ func (m *MysteriumMORQA) sendMetrics(owner string) error {

// MonitoringStatus retrieve monitoring statuses.
func (m *MysteriumMORQA) MonitoringStatus(providerIds []string) MonitoringStatusResponse {
if m.lastCallToQualityAt.After(time.Now().Add(time.Minute * -1)) {
return m.lastMonitoringStatusResponse
}

payload := struct {
Providers []string `json:"providers"`
}{Providers: providerIds}
Expand All @@ -261,13 +259,11 @@ func (m *MysteriumMORQA) MonitoringStatus(providerIds []string) MonitoringStatus
defer response.Body.Close()

var r MonitoringStatusResponse
if err = parseResponseJSON(response, &r); err != nil {
if err = m.parseAndCacheResponse(request, response, time.Minute, &r); err != nil {
log.Warn().Err(err).Msg("Failed parsing response from POST: /providers/monitoring-status")
return nil
}

m.lastCallToQualityAt = time.Now()
m.lastMonitoringStatusResponse = r
return r
}

Expand All @@ -289,7 +285,7 @@ func (m *MysteriumMORQA) ProposalsQuality() []ProposalQuality {
defer response.Body.Close()

var qualityResponse []ProposalQuality
if err = parseResponseJSON(response, &qualityResponse); err != nil {
if err = m.parseAndCacheResponse(request, response, 10*time.Minute, &qualityResponse); err != nil {
log.Warn().Err(err).Msg("Failed to parse proposals quality")

return nil
Expand Down Expand Up @@ -318,7 +314,7 @@ func (m *MysteriumMORQA) ProviderSessions(providerID string) []ProviderSession {
var responseBody struct {
Connects []ProviderSession `json:"connects"`
}
if err = parseResponseJSON(response, &responseBody); err != nil {
if err = m.parseAndCacheResponse(request, response, 10*time.Minute, &responseBody); err != nil {
log.Warn().Err(err).Msg("Failed to parse proposals quality")

return nil
Expand All @@ -344,7 +340,7 @@ func (m *MysteriumMORQA) ProviderStatuses(providerID string) (node.MonitoringAge

var statuses node.MonitoringAgentStatuses

if err = parseResponseJSON(response, &statuses); err != nil {
if err = m.parseAndCacheResponse(request, response, 10*time.Minute, &statuses); err != nil {
log.Err(err).Msg("Failed to parse provider monitoring agent statuses")
return nil, err
}
Expand All @@ -367,7 +363,7 @@ func (m *MysteriumMORQA) ProviderSessionsList(id identity.Identity, rangeTime st

var sessions []node.SessionItem

if err = parseResponseJSON(response, &sessions); err != nil {
if err = m.parseAndCacheResponse(request, response, 10*time.Minute, &sessions); err != nil {
log.Err(err).Msg("Failed to parse provider monitoring sessions list")
return nil, err
}
Expand All @@ -389,7 +385,7 @@ func (m *MysteriumMORQA) ProviderTransferredData(id identity.Identity, rangeTime
}
defer response.Body.Close()

if err = parseResponseJSON(response, &data); err != nil {
if err = m.parseAndCacheResponse(request, response, 10*time.Minute, &data); err != nil {
log.Err(err).Msg("Failed to parse provider transferred data")
return data, err
}
Expand All @@ -411,7 +407,7 @@ func (m *MysteriumMORQA) ProviderSessionsCount(id identity.Identity, rangeTime s
}
defer response.Body.Close()

if err = parseResponseJSON(response, &count); err != nil {
if err = m.parseAndCacheResponse(request, response, 10*time.Minute, &count); err != nil {
log.Err(err).Msg("Failed to parse provider monitoring sessions count")
return count, err
}
Expand All @@ -433,7 +429,7 @@ func (m *MysteriumMORQA) ProviderConsumersCount(id identity.Identity, rangeTime
}
defer response.Body.Close()

if err = parseResponseJSON(response, &count); err != nil {
if err = m.parseAndCacheResponse(request, response, 10*time.Minute, &count); err != nil {
log.Err(err).Msg("Failed to parse provider monitoring consumers count")
return count, err
}
Expand All @@ -455,7 +451,7 @@ func (m *MysteriumMORQA) ProviderEarningsSeries(id identity.Identity, rangeTime
}
defer response.Body.Close()

if err = parseResponseJSON(response, &data); err != nil {
if err = m.parseAndCacheResponse(request, response, 10*time.Minute, &data); err != nil {
log.Err(err).Msg("Failed to parse provider series earnings")
return data, err
}
Expand All @@ -477,7 +473,7 @@ func (m *MysteriumMORQA) ProviderSessionsSeries(id identity.Identity, rangeTime
}
defer response.Body.Close()

if err = parseResponseJSON(response, &data); err != nil {
if err = m.parseAndCacheResponse(request, response, 10*time.Minute, &data); err != nil {
log.Err(err).Msg("Failed to parse provider series sessions")
return data, err
}
Expand All @@ -499,7 +495,7 @@ func (m *MysteriumMORQA) ProviderTransferredDataSeries(id identity.Identity, ran
}
defer response.Body.Close()

if err = parseResponseJSON(response, &data); err != nil {
if err = m.parseAndCacheResponse(request, response, 10*time.Minute, &data); err != nil {
log.Err(err).Msg("Failed to parse provider series data")
return data, err
}
Expand All @@ -521,7 +517,7 @@ func (m *MysteriumMORQA) ProviderServiceEarnings(id identity.Identity) (node.Ear
}
defer response.Body.Close()

if err = parseResponseJSON(response, &data); err != nil {
if err = m.parseAndCacheResponse(request, response, 10*time.Minute, &data); err != nil {
log.Err(err).Msg("Failed to parse service earnings")
return data, err
}
Expand All @@ -543,7 +539,7 @@ func (m *MysteriumMORQA) ProviderActivityStats(id identity.Identity) (node.Activ
}
defer response.Body.Close()

if err = parseResponseJSON(response, &stats); err != nil {
if err = m.parseAndCacheResponse(request, response, 10*time.Minute, &stats); err != nil {
log.Err(err).Msg("Failed to parse provider activity stats")
return stats, err
}
Expand All @@ -565,7 +561,7 @@ func (m *MysteriumMORQA) ProviderQuality(id identity.Identity) (node.QualityInfo
}
defer response.Body.Close()

if err = parseResponseJSON(response, &res); err != nil {
if err = m.parseAndCacheResponse(request, response, 10*time.Minute, &res); err != nil {
log.Err(err).Msg("Failed to parse provider quality")
return res, err
}
Expand Down Expand Up @@ -621,6 +617,22 @@ func (m *MysteriumMORQA) newRequestBinary(method, path string, payload proto.Mes
return request, err
}

func (m *MysteriumMORQA) parseAndCacheResponse(request *http.Request, response *http.Response, ttl time.Duration, dto interface{}) error {
if resp, ok := m.cache.Get(request.URL.RequestURI()); ok {
dto = resp
return nil
}

err := parseResponseJSON(response, dto)
if err != nil {
return err
}

m.cache.Set(request.URL.RequestURI(), dto, ttl)

return nil
}

func parseResponseJSON(response *http.Response, dto interface{}) error {
responseJSON, err := io.ReadAll(response.Body)
if err != nil {
Expand Down

0 comments on commit 7794f7c

Please sign in to comment.