Skip to content

Commit

Permalink
some tweaks to prices.go (GetZECPrice, GetCurrentZECPrice)
Browse files Browse the repository at this point in the history
Simplified locking; there is no need for a read-write mutex, this code
is not performance-critical. Removed some of the go routines because
they're not needed and the locking and concurrency are easier to reason
about without them.

NOTE: there is some test code left in here, search for "XXX testing" and
remove before committing! This test code makes things happen faster:
Instead of fetching prices every 15 minutes, do it every minute. Also,
write historical data every minute, instead of once per day.

Another change needed is to remove some of the logging. It's good for
now during testing, but it's too much for production.
  • Loading branch information
Larry Ruane committed Jul 15, 2022
1 parent f25ab2e commit 466b862
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 151 deletions.
257 changes: 110 additions & 147 deletions common/prices.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ import (
)

var (
// Map of all historical prices. Date as "yyyy-mm-dd" to price in cents
// Map of all historical prices. Date as "yyyy-mm-dd" to price in USD
historicalPrices map[string]float64 = make(map[string]float64)

// The latest price
latestPrice float64 = -1

// Latest price was fetched at
latestPriceAt time.Time
latestPriceTime time.Time

// Mutex to control both historical and latest price
pricesRwMutex sync.RWMutex
// Mutex to guard both historical and latest price
mutex sync.Mutex

// Full path of the persistence file
pricesFileName string
Expand Down Expand Up @@ -78,7 +78,6 @@ func fetchAPIPrice(url string, resultPath []string) (float64, error) {

func fetchCoinbasePrice() (float64, error) {
return fetchAPIPrice("https://api.coinbase.com/v2/exchange-rates?currency=ZEC", []string{"data", "rates", "USD"})

}

func fetchCoinCapPrice() (float64, error) {
Expand All @@ -89,98 +88,86 @@ func fetchBinancePrice() (float64, error) {
return fetchAPIPrice("https://api.binance.com/api/v3/avgPrice?symbol=ZECUSDC", []string{"price"})
}

func fetchHistoricalCoingeckoPrice(ts *time.Time) (float64, error) {
func fetchHistoricalCoingeckoPrice(ts time.Time) (float64, error) {
dt := ts.Format("02-01-2006") // dd-mm-yyyy
url := fmt.Sprintf("https://api.coingecko.com/api/v3/coins/zcash/history?date=%s?id=zcash", dt)
url := fmt.Sprintf("https://api.coingecko.com/api/v3/coins/zcash/history?date=%s", dt)

return fetchAPIPrice(url, []string{"market_data", "current_price", "usd"})
}

// Median gets the median number in a slice of numbers
func median(inp []float64) (median float64) {

// Start by sorting a copy of the slice
sort.Float64s(inp)

// No math is needed if there are no numbers
// calcMedian calculates the median of a sorted slice of numbers
func calcMedian(inp []float64) (median float64) {
// For even numbers we add the two middle numbers
// and divide by two using the mean function above
// For odd numbers we just use the middle number
l := len(inp)
if l == 0 {
return -1
} else if l%2 == 0 {
return (inp[l/2-1] + inp[l/2]) / 2
n := len(inp)
if n%2 == 0 {
return (inp[n/2-1] + inp[n/2]) / 2
} else {
return inp[l/2]
return inp[n/2]
}
}

// fetchPriceFromWebAPI will fetch prices from multiple places, discard outliers and return the
// concensus price
// concensus price. This function doesn't need the mutex.
func fetchPriceFromWebAPI() (float64, error) {
// We'll fetch prices from all our endpoints, and use the median price from that
priceProviders := []func() (float64, error){fetchBinancePrice, fetchCoinCapPrice, fetchCoinbasePrice}

ch := make(chan float64)

// Get all prices
for _, provider := range priceProviders {
go func(provider func() (float64, error)) {
price, err := provider()
if err != nil {
Log.WithFields(logrus.Fields{
"method": "CurrentPrice",
"provider": runtime.FuncForPC(reflect.ValueOf(provider).Pointer()).Name(),
"error": err,
}).Error("Service")

ch <- -1
} else {
Log.WithFields(logrus.Fields{
"method": "CurrentPrice",
"provider": runtime.FuncForPC(reflect.ValueOf(provider).Pointer()).Name(),
"price": price,
}).Info("Service")

ch <- price
}
}(provider)
}

prices := make([]float64, 0)
for range priceProviders {
price := <-ch
if price > 0 {
for _, provider := range priceProviders {
price, err := provider()
if err == nil {
Log.WithFields(logrus.Fields{
"method": "CurrentPrice",
"provider": runtime.FuncForPC(reflect.ValueOf(provider).Pointer()).Name(),
"price": price,
}).Info("Service")
prices = append(prices, price)
} else {
Log.WithFields(logrus.Fields{
"method": "CurrentPrice",
"provider": runtime.FuncForPC(reflect.ValueOf(provider).Pointer()).Name(),
"error": err,
}).Error("Service")
}
}

// sort
if len(prices) == 0 {
return -1, errors.New("no price providers are available")
}
sort.Float64s(prices)

// Get the median price
median1 := median(prices)
median := calcMedian(prices)

// Discard all values that are more than 20% outside the median
validPrices := make([]float64, 0)
for _, price := range prices {
if (math.Abs(price-median1) / median1) > 0.2 {
if (math.Abs(price-median) / median) > 0.2 {
Log.WithFields(logrus.Fields{
"method": "CurrentPrice",
"error": fmt.Sprintf("Discarding price (%.2f) because too far away from median (%.2f", price, median1),
"error": fmt.Sprintf("Discarding price (%.2f) because too far away from median (%.2f", price, median),
}).Error("Service")
} else {
validPrices = append(validPrices, price)
}
}

// At least 2 (valid) providers are required; we don't want to depend on just one
if len(validPrices) < 2 {
return -1, errors.New("insufficient price providers are available")
}

// If we discarded too many, return an error
if len(validPrices) < (len(prices)/2 + 1) {
return -1, errors.New("not enough valid prices")
} else {
return median(validPrices), nil
}
median = calcMedian(validPrices)
if median <= 0 {
return -1, errors.New("median price is <= 0")
}
return median, nil
}

func readHistoricalPricesFile() (map[string]float64, error) {
Expand Down Expand Up @@ -217,132 +204,98 @@ func writeHistoricalPricesMap() {
defer f.Close()

j := gob.NewEncoder(f)

{
// Read lock
pricesRwMutex.RLock()
err = j.Encode(historicalPrices)
pricesRwMutex.RUnlock()

if err != nil {
Log.Errorf("Couldn't encode historical prices: %v", err)
return
}
if err = j.Encode(historicalPrices); err != nil {
Log.Errorf("Couldn't encode historical prices: %v", err)
return
}

Log.WithFields(logrus.Fields{
"method": "HistoricalPrice",
"action": "Wrote historical prices file",
}).Info("Service")
}

// GetCurrentPrice is a top-level API, returns the latest price that we
// have fetched if no more than 3 hours old, else an error. An error
// should not occur unless we can't reach enough price oracles.
func GetCurrentPrice() (float64, error) {
// Read lock
pricesRwMutex.RLock()
defer pricesRwMutex.RUnlock()
mutex.Lock()
defer mutex.Unlock()

if latestPriceTime.IsZero() {
return -1, errors.New("starting up, prices not available yet")
}

// If the current price is too old, don't return it.
if time.Since(latestPriceAt).Hours() > 3 {
if time.Since(latestPriceTime).Hours() > 3 {
return -1, errors.New("price too old")
}

return latestPrice, nil
}

func writeLatestPrice(price float64) {
{
// Read lock
pricesRwMutex.RLock()

// Check if the time has "rolled over", if yes then preserve the last price
// as the previous day's historical price
if latestPrice > 0 && latestPriceAt.Format("2006-01-02") != time.Now().Format("2006-01-02") {
// update the historical price.
// First, make a copy of the time
t := time.Unix(latestPriceAt.Unix(), 0)

go addHistoricalPrice(latestPrice, &t)
}
pricesRwMutex.RUnlock()
}

// Write lock
pricesRwMutex.Lock()

latestPrice = price
latestPriceAt = time.Now()

pricesRwMutex.Unlock()
// return the time in YYYY-MM-DD string format
func day(t time.Time) string {
return t.Format("2006-01-02")
}

func GetHistoricalPrice(ts *time.Time) (float64, *time.Time, error) {
dt := ts.Format("2006-01-02")
// GetHistoricalPrice returns the price for the given day, but only
// accurate to day granularity.
func GetHistoricalPrice(ts time.Time) (float64, *time.Time, error) {
dt := day(ts)
canonicalTime, err := time.Parse("2006-01-02", dt)
if err != nil {
return -1, nil, err
}

{
// Read lock
pricesRwMutex.RLock()
val, ok := historicalPrices[dt]
pricesRwMutex.RUnlock()

if ok {
return val, &canonicalTime, nil
}
mutex.Lock()
defer mutex.Unlock()
if val, ok := historicalPrices[dt]; ok {
return val, &canonicalTime, nil
}

{
// Check if this is the same as the current latest price

// Read lock
pricesRwMutex.RLock()
var price = latestPrice
var returnPrice = price > 0 && latestPriceAt.Format("2006-01-02") == dt
pricesRwMutex.RUnlock()

if returnPrice {
return price, &canonicalTime, nil
}
// Check if this is the same as the current latest price
if latestPrice > 0 && day(latestPriceTime) == dt {
return latestPrice, &canonicalTime, nil
}

// Fetch price from web API
mutex.Unlock()
price, err := fetchHistoricalCoingeckoPrice(ts)
mutex.Lock()
if err != nil {
Log.Errorf("Couldn't read historical prices from Coingecko: %v", err)
return -1, nil, err
}

go addHistoricalPrice(price, ts)

return price, &canonicalTime, err
}

func addHistoricalPrice(price float64, ts *time.Time) {
if price <= 0 {
return
Log.Errorf("historical prices from Coingecko <= 0")
return -1, nil, errors.New("bad Coingecko result")
}
dt := ts.Format("2006-01-02")
// add to our cache so we don't have to hit Coingecko again
// for the same date
addHistoricalPrice(price, ts)

// Read Lock
pricesRwMutex.RLock()
_, ok := historicalPrices[dt]
pricesRwMutex.RUnlock()
return price, &canonicalTime, nil
}

if !ok {
// Write lock
pricesRwMutex.Lock()
// Add a price entry for the given day both to our map
// and to the file (so we'll have it after a restart).
// This caching allows us to hit coingecko less often,
// and provides resilience when that site is down.
//
// There are two ways a historical price can be added:
// - When a client calls GetZECPrice to get a past price
// - When a new day begins, we'll save the previous day's price
//
func addHistoricalPrice(price float64, ts time.Time) {
dt := day(ts)
if _, ok := historicalPrices[dt]; !ok {
// an entry for this day doesn't exist, add it
historicalPrices[dt] = price
defer pricesRwMutex.Unlock()

go Log.WithFields(logrus.Fields{
Log.WithFields(logrus.Fields{
"method": "HistoricalPrice",
"action": "Add",
"date": dt,
"price": price,
}).Info("Service")
go writeHistoricalPricesMap()
writeHistoricalPricesMap()
}
}

Expand All @@ -352,14 +305,14 @@ func StartPriceFetcher(dbPath string, chainName string) {
pricesFileName = filepath.Join(dbPath, chainName, "prices")

// Read the historical prices if available
mutex.Lock()
if prices, err := readHistoricalPricesFile(); err != nil {
Log.Errorf("Couldn't read historical prices, starting with empty map")
} else {
// Write lock
pricesRwMutex.Lock()
historicalPrices = prices
pricesRwMutex.Unlock()
Log.Infof("prices at start: %v", prices)
}
mutex.Unlock()

// Fetch the current price every 15 mins
go func() {
Expand All @@ -373,10 +326,20 @@ func StartPriceFetcher(dbPath string, chainName string) {
"price": price,
}).Info("Service")

writeLatestPrice(price)
mutex.Lock()
// If the day has changed, save the previous day's
// historical price. Historical prices are per-day.
if day(latestPriceTime) != day(time.Now()) {
if latestPrice > 0 {
t := time.Unix(latestPriceTime.Unix(), 0)
addHistoricalPrice(latestPrice, t)
}
}
latestPrice = price
latestPriceTime = time.Now()
mutex.Unlock()
}

// Refresh every
// price data 15 minutes out of date is probably okay
time.Sleep(15 * time.Minute)
}
}()
Expand Down
Loading

0 comments on commit 466b862

Please sign in to comment.