From 5ee1aaba37704d702b676e0a34223ce53f030ab2 Mon Sep 17 00:00:00 2001 From: Adam Wozniak <29418299+adamewozniak@users.noreply.github.com> Date: Tue, 21 Nov 2023 16:31:59 -0800 Subject: [PATCH 1/6] feat: astroport provider --- config/supported_assets.go | 2 + oracle/oracle.go | 4 +- oracle/provider/astroport.go | 290 +++++++++++++++++++++++++++++++++++ 3 files changed, 295 insertions(+), 1 deletion(-) create mode 100644 oracle/provider/astroport.go diff --git a/config/supported_assets.go b/config/supported_assets.go index 5ac650da..2b15db4c 100644 --- a/config/supported_assets.go +++ b/config/supported_assets.go @@ -26,6 +26,7 @@ var ( provider.ProviderPolygon: true, provider.ProviderEthUniswap: false, provider.ProviderKujira: false, + provider.ProviderAstroport: false, provider.ProviderMock: false, } @@ -45,6 +46,7 @@ var ( {Base: "JUNO", Quote: "USDT"}: {}, {Base: "WETH", Quote: "USDC"}: {}, {Base: "WBTC", Quote: "WETH"}: {}, + {Base: "INJ", Quote: "USD"}: {}, } SupportedUniswapCurrencies = map[string]struct{}{ diff --git a/oracle/oracle.go b/oracle/oracle.go index a225a9c9..062c15e8 100644 --- a/oracle/oracle.go +++ b/oracle/oracle.go @@ -331,7 +331,6 @@ func (o *Oracle) GetComputedPrices( providerCandles types.AggregatedProviderCandles, providerPrices types.AggregatedProviderPrices, ) (types.CurrencyPairDec, error) { - conversionRates, err := CalcCurrencyPairRates( providerCandles, providerPrices, @@ -474,6 +473,9 @@ func NewProvider( case provider.ProviderEthUniswap: return provider.NewUniswapProvider(ctx, logger, endpoint, providerPairs...) + + case provider.ProviderAstroport: + return provider.NewAstroportProvider(ctx, logger, endpoint, providerPairs...) } return nil, fmt.Errorf("provider %s not found", providerName) diff --git a/oracle/provider/astroport.go b/oracle/provider/astroport.go new file mode 100644 index 00000000..2c57054e --- /dev/null +++ b/oracle/provider/astroport.go @@ -0,0 +1,290 @@ +package provider + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "sync" + "time" + + "github.com/ojo-network/ojo/util/decmath" + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" +) + +var _ Provider = (*AstroportProvider)(nil) + +const ( + ProviderAstroport = "astroport" + restURL = "https://markets-api.astroport.fi" + tickersURL = "/markets/cg/tickers" + assetsURL = "/markets/cmc/v1/assets" + pollInterval = 3 * time.Second +) + +type ( + AstroportProvider struct { + logger zerolog.Logger + mtx sync.RWMutex + endpoints Endpoint + + client *http.Client + priceStore + } + + AstroportAssetResponse struct { + BaseID string `json:"base_id"` + BaseName string `json:"base_name"` + BaseSymbol string `json:"base_symbol"` + QuoteID string `json:"quote_id"` + QuoteName string `json:"quote_name"` + QuoteSymbol string `json:"quote_symbol"` + LastPrice float64 `json:"last_price"` + BaseVolume float64 `json:"base_volume"` + QuoteVolume float64 `json:"quote_volume"` + USDVolume float64 `json:"USD_volume"` + } + + AstroportTickersResponse struct { + TickerID string `json:"ticker_id"` + BaseCurrency string `json:"base_currency"` + TargetCurrency string `json:"target_currency"` + LastPrice float64 `json:"last_price"` + LiquidityInUSD float64 `json:"liquidity_in_usd"` + BaseVolume float64 `json:"base_volume"` + TargetVolume float64 `json:"target_volume"` + PoolID string `json:"pool_id"` + } + + AstroportTickerPairs struct { + ticker AstroportTickersResponse + pair types.CurrencyPair + } +) + +func (atr AstroportTickersResponse) toTickerPrice() (types.TickerPrice, error) { + lp, err := decmath.NewDecFromFloat(atr.LastPrice) + if err != nil { + return types.TickerPrice{}, err + } + volume, err := decmath.NewDecFromFloat(atr.BaseVolume) + if err != nil { + return types.TickerPrice{}, err + } + return types.TickerPrice{ + Price: lp, + Volume: volume, + }, nil +} + +func NewAstroportProvider( + ctx context.Context, + logger zerolog.Logger, + endpoints Endpoint, + pairs ...types.CurrencyPair, +) (*AstroportProvider, error) { + if (endpoints.Name) != ProviderAstroport { + endpoints = Endpoint{ + Name: ProviderAstroport, + Rest: restURL, + } + } + + astroLogger := logger.With().Str("provider", string(ProviderAstroport)).Logger() + + provider := &AstroportProvider{ + logger: astroLogger, + endpoints: endpoints, + priceStore: newPriceStore(astroLogger), + client: &http.Client{}, + } + + confirmedPairs, err := ConfirmPairAvailability( + provider, + provider.endpoints.Name, + provider.logger, + pairs..., + ) + if err != nil { + return nil, err + } + + go func() { + logger.Debug().Msg("starting ftx polling...") + err := provider.pollCache(ctx, pairs...) + if err != nil { + logger.Err(err).Msg("astroport provider unable to poll new data") + } + }() + + provider.setSubscribedPairs(confirmedPairs...) + + return provider, nil +} + +// GetTickerPrices returns the tickerPrices based on the provided pairs. +func (p AstroportProvider) setTickers() error { + tickers, err := p.queryTickers() + if err != nil { + return err + } + for _, v := range tickers { + p.setTickerPair(v.ticker, v.pair.String()) + } + return nil +} + +func (p AstroportProvider) findTickersForPairs() (map[string]types.CurrencyPair, error) { + queryingPairs := p.subscribedPairs + _, pairToTickerIDMap, err := p.getTickerMaps() + if err != nil { + return nil, err + } + + // map of ticker IDs -> pairs + tickerIDs := make(map[string]types.CurrencyPair, len(queryingPairs)) + for _, pair := range queryingPairs { + if tickerID, ok := pairToTickerIDMap[pair.String()]; ok { + tickerIDs[tickerID] = pair + } + } + return tickerIDs, nil +} + +func (p AstroportProvider) getTickerMaps() (map[string]types.CurrencyPair, map[string]string, error) { + res, err := p.client.Get(p.endpoints.Rest + assetsURL) + if err != nil { + return nil, nil, err + } + defer res.Body.Close() + + bz, err := io.ReadAll(res.Body) + if err != nil { + return nil, nil, fmt.Errorf("failed to read response: %w", err) + } + + astroportAssets := []map[string]AstroportAssetResponse{} + if err := json.Unmarshal(bz, &astroportAssets); err != nil { + return nil, nil, fmt.Errorf("failed to unmarshal response body: %w", err) + } + + availablePairs := map[string]types.CurrencyPair{} + for _, assetMap := range astroportAssets { + for tickerID, asset := range assetMap { + availablePairs[tickerID] = types.CurrencyPair{ + Base: strings.ToUpper(asset.BaseSymbol), + Quote: strings.ToUpper(asset.QuoteSymbol), + } + } + } + + pairToTickerID := map[string]string{} + for tickerID, pair := range availablePairs { + pairToTickerID[pair.String()] = tickerID + } + + return availablePairs, pairToTickerID, nil +} + +func (p AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) { + res, err := p.client.Get(p.endpoints.Rest + tickersURL) + if err != nil { + return nil, err + } + defer res.Body.Close() + + bz, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response: %w", err) + } + + astroportTickers := []AstroportTickersResponse{} + if err := json.Unmarshal(bz, &astroportTickers); err != nil { + return nil, fmt.Errorf("failed to unmarshal response body: %w", err) + } + + tickerMap, err := p.findTickersForPairs() + if err != nil { + return nil, err + } + tickers := []AstroportTickerPairs{} + for tickerID, v := range tickerMap { + for _, ticker := range astroportTickers { + if ticker.TickerID == tickerID { + tickers = append(tickers, AstroportTickerPairs{ + ticker: ticker, + pair: v, + }) + } + } + } + + return tickers, nil +} + +// GetAvailablePairs return all available pairs symbol to subscribe. +func (p AstroportProvider) GetAvailablePairs() (map[string]struct{}, error) { + availablePairs, _, err := p.getTickerMaps() + if err != nil { + return nil, err + } + + availableSymbols := map[string]struct{}{} + for _, pair := range availablePairs { + availableSymbols[pair.String()] = struct{}{} + } + + return availableSymbols, nil +} + +// SubscribeCurrencyPairs sends the new subscription messages to the websocket +// and adds them to the providers subscribedPairs array +func (p *AstroportProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + newPairs := []types.CurrencyPair{} + for _, cp := range cps { + if _, ok := p.subscribedPairs[cp.String()]; !ok { + newPairs = append(newPairs, cp) + } + } + + confirmedPairs, err := ConfirmPairAvailability( + p, + p.endpoints.Name, + p.logger, + newPairs..., + ) + if err != nil { + return + } + + p.setSubscribedPairs(confirmedPairs...) +} + +// This function periodically calls setTickers to update the priceStore. +func (p AstroportProvider) pollCache(ctx context.Context, pairs ...types.CurrencyPair) error { + for { + select { + case <-ctx.Done(): + return nil + + default: + p.logger.Debug().Msg("querying astroport api") + + err := p.setTickers() + if err != nil { + return err + } + + time.Sleep(pollInterval) + } + } +} + +// StartConnections starts the websocket connections. +func (p AstroportProvider) StartConnections() {} From fc441d4ff9c6d9dceb73919bf43e67c15c4d0c3b Mon Sep 17 00:00:00 2001 From: Adam Wozniak <29418299+adamewozniak@users.noreply.github.com> Date: Tue, 21 Nov 2023 18:43:36 -0800 Subject: [PATCH 2/6] cleanup --- oracle/provider/astroport.go | 147 +++++++++++++++++++---------------- 1 file changed, 81 insertions(+), 66 deletions(-) diff --git a/oracle/provider/astroport.go b/oracle/provider/astroport.go index 2c57054e..57e7ec36 100644 --- a/oracle/provider/astroport.go +++ b/oracle/provider/astroport.go @@ -35,6 +35,7 @@ type ( priceStore } + // AstroportAssetResponse is the response from the Astroport assets endpoint. AstroportAssetResponse struct { BaseID string `json:"base_id"` BaseName string `json:"base_name"` @@ -47,7 +48,7 @@ type ( QuoteVolume float64 `json:"quote_volume"` USDVolume float64 `json:"USD_volume"` } - + // AstroportTickersResponse is the response from the Astroport tickers endpoint. AstroportTickersResponse struct { TickerID string `json:"ticker_id"` BaseCurrency string `json:"base_currency"` @@ -58,28 +59,10 @@ type ( TargetVolume float64 `json:"target_volume"` PoolID string `json:"pool_id"` } - - AstroportTickerPairs struct { - ticker AstroportTickersResponse - pair types.CurrencyPair - } ) -func (atr AstroportTickersResponse) toTickerPrice() (types.TickerPrice, error) { - lp, err := decmath.NewDecFromFloat(atr.LastPrice) - if err != nil { - return types.TickerPrice{}, err - } - volume, err := decmath.NewDecFromFloat(atr.BaseVolume) - if err != nil { - return types.TickerPrice{}, err - } - return types.TickerPrice{ - Price: lp, - Volume: volume, - }, nil -} - +// NewAstroportProvider returns a new AstroportProvider. +// It also starts a go routine to poll for new data. func NewAstroportProvider( ctx context.Context, logger zerolog.Logger, @@ -125,7 +108,77 @@ func NewAstroportProvider( return provider, nil } -// GetTickerPrices returns the tickerPrices based on the provided pairs. +// GetAvailablePairs return all available pair symbols. +func (p AstroportProvider) GetAvailablePairs() (map[string]struct{}, error) { + availablePairs, _, err := p.getTickerMaps() + if err != nil { + return nil, err + } + + availableSymbols := map[string]struct{}{} + for _, pair := range availablePairs { + availableSymbols[pair.String()] = struct{}{} + } + + return availableSymbols, nil +} + +// SubscribeCurrencyPairs sends the new subscription messages to the websocket +// and adds them to the providers subscribedPairs array. +func (p *AstroportProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + newPairs := []types.CurrencyPair{} + for _, cp := range cps { + if _, ok := p.subscribedPairs[cp.String()]; !ok { + newPairs = append(newPairs, cp) + } + } + + confirmedPairs, err := ConfirmPairAvailability( + p, + p.endpoints.Name, + p.logger, + newPairs..., + ) + if err != nil { + return + } + + p.setSubscribedPairs(confirmedPairs...) +} + +// StartConnections starts the websocket connections. +// This function is a no-op for the astroport provider. +func (p AstroportProvider) StartConnections() {} + +// AstroportTickerPairs is a struct to hold the AstroportTickersResponse and the +// corresponding pair. It satisfies the TickerPrice interface. +type AstroportTickerPairs struct { + ticker AstroportTickersResponse + pair types.CurrencyPair +} + +// toTickerPrice converts the AstroportTickerPairs to a TickerPrice. +// It satisfies the TickerPrice interface. +func (atr AstroportTickersResponse) toTickerPrice() (types.TickerPrice, error) { + lp, err := decmath.NewDecFromFloat(atr.LastPrice) + if err != nil { + return types.TickerPrice{}, err + } + volume, err := decmath.NewDecFromFloat(atr.BaseVolume) + if err != nil { + return types.TickerPrice{}, err + } + return types.TickerPrice{ + Price: lp, + Volume: volume, + }, nil +} + +// setTickers queries the Astroport API for the latest tickers and updates the +// priceStore. func (p AstroportProvider) setTickers() error { tickers, err := p.queryTickers() if err != nil { @@ -137,6 +190,8 @@ func (p AstroportProvider) setTickers() error { return nil } +// findTickersForPairs returns a map of ticker IDs -> pairs, but filters out +// pairs that we are not subscribed to. func (p AstroportProvider) findTickersForPairs() (map[string]types.CurrencyPair, error) { queryingPairs := p.subscribedPairs _, pairToTickerIDMap, err := p.getTickerMaps() @@ -154,6 +209,8 @@ func (p AstroportProvider) findTickersForPairs() (map[string]types.CurrencyPair, return tickerIDs, nil } +// getTickerMaps returns all available assets from the api. +// It returns a map of ticker IDs -> pairs and a map of pairs -> ticker IDs. func (p AstroportProvider) getTickerMaps() (map[string]types.CurrencyPair, map[string]string, error) { res, err := p.client.Get(p.endpoints.Rest + assetsURL) if err != nil { @@ -189,6 +246,7 @@ func (p AstroportProvider) getTickerMaps() (map[string]types.CurrencyPair, map[s return availablePairs, pairToTickerID, nil } +// queryTickers returns the AstroportTickerPairs available from the API. func (p AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) { res, err := p.client.Get(p.endpoints.Rest + tickersURL) if err != nil { @@ -210,6 +268,7 @@ func (p AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) { if err != nil { return nil, err } + // filter out tickers that we are not subscribed to tickers := []AstroportTickerPairs{} for tickerID, v := range tickerMap { for _, ticker := range astroportTickers { @@ -225,47 +284,6 @@ func (p AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) { return tickers, nil } -// GetAvailablePairs return all available pairs symbol to subscribe. -func (p AstroportProvider) GetAvailablePairs() (map[string]struct{}, error) { - availablePairs, _, err := p.getTickerMaps() - if err != nil { - return nil, err - } - - availableSymbols := map[string]struct{}{} - for _, pair := range availablePairs { - availableSymbols[pair.String()] = struct{}{} - } - - return availableSymbols, nil -} - -// SubscribeCurrencyPairs sends the new subscription messages to the websocket -// and adds them to the providers subscribedPairs array -func (p *AstroportProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { - p.mtx.Lock() - defer p.mtx.Unlock() - - newPairs := []types.CurrencyPair{} - for _, cp := range cps { - if _, ok := p.subscribedPairs[cp.String()]; !ok { - newPairs = append(newPairs, cp) - } - } - - confirmedPairs, err := ConfirmPairAvailability( - p, - p.endpoints.Name, - p.logger, - newPairs..., - ) - if err != nil { - return - } - - p.setSubscribedPairs(confirmedPairs...) -} - // This function periodically calls setTickers to update the priceStore. func (p AstroportProvider) pollCache(ctx context.Context, pairs ...types.CurrencyPair) error { for { @@ -285,6 +303,3 @@ func (p AstroportProvider) pollCache(ctx context.Context, pairs ...types.Currenc } } } - -// StartConnections starts the websocket connections. -func (p AstroportProvider) StartConnections() {} From e3b8ba0408d419821ba8fcda73c6645848010d20 Mon Sep 17 00:00:00 2001 From: Adam Wozniak <29418299+adamewozniak@users.noreply.github.com> Date: Tue, 21 Nov 2023 18:46:50 -0800 Subject: [PATCH 3/6] lint fixes --- oracle/provider/astroport.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/oracle/provider/astroport.go b/oracle/provider/astroport.go index 57e7ec36..d4d2c708 100644 --- a/oracle/provider/astroport.go +++ b/oracle/provider/astroport.go @@ -97,7 +97,7 @@ func NewAstroportProvider( go func() { logger.Debug().Msg("starting ftx polling...") - err := provider.pollCache(ctx, pairs...) + err := provider.poll(ctx) if err != nil { logger.Err(err).Msg("astroport provider unable to poll new data") } @@ -109,7 +109,7 @@ func NewAstroportProvider( } // GetAvailablePairs return all available pair symbols. -func (p AstroportProvider) GetAvailablePairs() (map[string]struct{}, error) { +func (p *AstroportProvider) GetAvailablePairs() (map[string]struct{}, error) { availablePairs, _, err := p.getTickerMaps() if err != nil { return nil, err @@ -151,7 +151,7 @@ func (p *AstroportProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { // StartConnections starts the websocket connections. // This function is a no-op for the astroport provider. -func (p AstroportProvider) StartConnections() {} +func (p *AstroportProvider) StartConnections() {} // AstroportTickerPairs is a struct to hold the AstroportTickersResponse and the // corresponding pair. It satisfies the TickerPrice interface. @@ -179,7 +179,7 @@ func (atr AstroportTickersResponse) toTickerPrice() (types.TickerPrice, error) { // setTickers queries the Astroport API for the latest tickers and updates the // priceStore. -func (p AstroportProvider) setTickers() error { +func (p *AstroportProvider) setTickers() error { tickers, err := p.queryTickers() if err != nil { return err @@ -192,7 +192,7 @@ func (p AstroportProvider) setTickers() error { // findTickersForPairs returns a map of ticker IDs -> pairs, but filters out // pairs that we are not subscribed to. -func (p AstroportProvider) findTickersForPairs() (map[string]types.CurrencyPair, error) { +func (p *AstroportProvider) findTickersForPairs() (map[string]types.CurrencyPair, error) { queryingPairs := p.subscribedPairs _, pairToTickerIDMap, err := p.getTickerMaps() if err != nil { @@ -211,7 +211,7 @@ func (p AstroportProvider) findTickersForPairs() (map[string]types.CurrencyPair, // getTickerMaps returns all available assets from the api. // It returns a map of ticker IDs -> pairs and a map of pairs -> ticker IDs. -func (p AstroportProvider) getTickerMaps() (map[string]types.CurrencyPair, map[string]string, error) { +func (p *AstroportProvider) getTickerMaps() (map[string]types.CurrencyPair, map[string]string, error) { res, err := p.client.Get(p.endpoints.Rest + assetsURL) if err != nil { return nil, nil, err @@ -247,7 +247,7 @@ func (p AstroportProvider) getTickerMaps() (map[string]types.CurrencyPair, map[s } // queryTickers returns the AstroportTickerPairs available from the API. -func (p AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) { +func (p *AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) { res, err := p.client.Get(p.endpoints.Rest + tickersURL) if err != nil { return nil, err @@ -285,7 +285,7 @@ func (p AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) { } // This function periodically calls setTickers to update the priceStore. -func (p AstroportProvider) pollCache(ctx context.Context, pairs ...types.CurrencyPair) error { +func (p *AstroportProvider) poll(ctx context.Context) error { for { select { case <-ctx.Done(): From 75454f3d9c457a615d9f8ed1d28cccf9a4d2c41d Mon Sep 17 00:00:00 2001 From: Adam Wozniak <29418299+adamewozniak@users.noreply.github.com> Date: Mon, 27 Nov 2023 13:32:02 -0800 Subject: [PATCH 4/6] cleanup, delete code --- oracle/provider/astroport.go | 50 ++++++++++-------------------------- 1 file changed, 13 insertions(+), 37 deletions(-) diff --git a/oracle/provider/astroport.go b/oracle/provider/astroport.go index d4d2c708..8b2e3913 100644 --- a/oracle/provider/astroport.go +++ b/oracle/provider/astroport.go @@ -96,7 +96,7 @@ func NewAstroportProvider( } go func() { - logger.Debug().Msg("starting ftx polling...") + logger.Debug().Msg("starting astroport polling...") err := provider.poll(ctx) if err != nil { logger.Err(err).Msg("astroport provider unable to poll new data") @@ -110,7 +110,7 @@ func NewAstroportProvider( // GetAvailablePairs return all available pair symbols. func (p *AstroportProvider) GetAvailablePairs() (map[string]struct{}, error) { - availablePairs, _, err := p.getTickerMaps() + availablePairs, err := p.getAvailableAssets() if err != nil { return nil, err } @@ -190,44 +190,26 @@ func (p *AstroportProvider) setTickers() error { return nil } -// findTickersForPairs returns a map of ticker IDs -> pairs, but filters out -// pairs that we are not subscribed to. -func (p *AstroportProvider) findTickersForPairs() (map[string]types.CurrencyPair, error) { - queryingPairs := p.subscribedPairs - _, pairToTickerIDMap, err := p.getTickerMaps() - if err != nil { - return nil, err - } - - // map of ticker IDs -> pairs - tickerIDs := make(map[string]types.CurrencyPair, len(queryingPairs)) - for _, pair := range queryingPairs { - if tickerID, ok := pairToTickerIDMap[pair.String()]; ok { - tickerIDs[tickerID] = pair - } - } - return tickerIDs, nil -} - -// getTickerMaps returns all available assets from the api. -// It returns a map of ticker IDs -> pairs and a map of pairs -> ticker IDs. -func (p *AstroportProvider) getTickerMaps() (map[string]types.CurrencyPair, map[string]string, error) { +// getAvailableAssets returns all available assets from the api. +// It returns a map of ticker IDs -> pairs. +func (p *AstroportProvider) getAvailableAssets() (map[string]types.CurrencyPair, error) { res, err := p.client.Get(p.endpoints.Rest + assetsURL) if err != nil { - return nil, nil, err + return nil, err } defer res.Body.Close() bz, err := io.ReadAll(res.Body) if err != nil { - return nil, nil, fmt.Errorf("failed to read response: %w", err) + return nil, fmt.Errorf("failed to read response: %w", err) } astroportAssets := []map[string]AstroportAssetResponse{} if err := json.Unmarshal(bz, &astroportAssets); err != nil { - return nil, nil, fmt.Errorf("failed to unmarshal response body: %w", err) + return nil, fmt.Errorf("failed to unmarshal response body: %w", err) } + // convert the astroport assets to a map of ticker IDs -> pairs availablePairs := map[string]types.CurrencyPair{} for _, assetMap := range astroportAssets { for tickerID, asset := range assetMap { @@ -237,13 +219,7 @@ func (p *AstroportProvider) getTickerMaps() (map[string]types.CurrencyPair, map[ } } } - - pairToTickerID := map[string]string{} - for tickerID, pair := range availablePairs { - pairToTickerID[pair.String()] = tickerID - } - - return availablePairs, pairToTickerID, nil + return availablePairs, nil } // queryTickers returns the AstroportTickerPairs available from the API. @@ -264,13 +240,14 @@ func (p *AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) { return nil, fmt.Errorf("failed to unmarshal response body: %w", err) } - tickerMap, err := p.findTickersForPairs() + availableAssets, err := p.getAvailableAssets() if err != nil { return nil, err } + // filter out tickers that we are not subscribed to tickers := []AstroportTickerPairs{} - for tickerID, v := range tickerMap { + for tickerID, v := range availableAssets { for _, ticker := range astroportTickers { if ticker.TickerID == tickerID { tickers = append(tickers, AstroportTickerPairs{ @@ -280,7 +257,6 @@ func (p *AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) { } } } - return tickers, nil } From aa052044b36a626e71b5fc5226e76aac36d54f30 Mon Sep 17 00:00:00 2001 From: Adam Wozniak <29418299+adamewozniak@users.noreply.github.com> Date: Mon, 27 Nov 2023 14:07:53 -0800 Subject: [PATCH 5/6] cleanup + add a basic test --- config/supported_assets.go | 3 ++- oracle/provider/astroport.go | 28 +++++++++++---------- oracle/provider/astroport_test.go | 41 +++++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 14 deletions(-) create mode 100644 oracle/provider/astroport_test.go diff --git a/config/supported_assets.go b/config/supported_assets.go index 2b15db4c..160f83c3 100644 --- a/config/supported_assets.go +++ b/config/supported_assets.go @@ -41,12 +41,13 @@ var ( {Base: "ETH", Quote: "USD"}: {}, {Base: "ATOM", Quote: "USD"}: {}, {Base: "OSMO", Quote: "USD"}: {}, + {Base: "INJ", Quote: "USD"}: {}, {Base: "OSMO", Quote: "USDT"}: {}, {Base: "JUNO", Quote: "USDT"}: {}, {Base: "WETH", Quote: "USDC"}: {}, {Base: "WBTC", Quote: "WETH"}: {}, - {Base: "INJ", Quote: "USD"}: {}, + {Base: "INJ", Quote: "USDT"}: {}, } SupportedUniswapCurrencies = map[string]struct{}{ diff --git a/oracle/provider/astroport.go b/oracle/provider/astroport.go index 8b2e3913..c70138ea 100644 --- a/oracle/provider/astroport.go +++ b/oracle/provider/astroport.go @@ -33,6 +33,7 @@ type ( client *http.Client priceStore + ctx context.Context } // AstroportAssetResponse is the response from the Astroport assets endpoint. @@ -83,6 +84,7 @@ func NewAstroportProvider( endpoints: endpoints, priceStore: newPriceStore(astroLogger), client: &http.Client{}, + ctx: ctx, } confirmedPairs, err := ConfirmPairAvailability( @@ -95,14 +97,6 @@ func NewAstroportProvider( return nil, err } - go func() { - logger.Debug().Msg("starting astroport polling...") - err := provider.poll(ctx) - if err != nil { - logger.Err(err).Msg("astroport provider unable to poll new data") - } - }() - provider.setSubscribedPairs(confirmedPairs...) return provider, nil @@ -149,9 +143,17 @@ func (p *AstroportProvider) SubscribeCurrencyPairs(cps ...types.CurrencyPair) { p.setSubscribedPairs(confirmedPairs...) } -// StartConnections starts the websocket connections. -// This function is a no-op for the astroport provider. -func (p *AstroportProvider) StartConnections() {} +// StartConnections begins the polling process for +// the astroport provider. +func (p *AstroportProvider) StartConnections() { + go func() { + p.logger.Debug().Msg("starting astroport polling...") + err := p.poll() + if err != nil { + p.logger.Err(err).Msg("astroport provider unable to poll new data") + } + }() +} // AstroportTickerPairs is a struct to hold the AstroportTickersResponse and the // corresponding pair. It satisfies the TickerPrice interface. @@ -261,10 +263,10 @@ func (p *AstroportProvider) queryTickers() ([]AstroportTickerPairs, error) { } // This function periodically calls setTickers to update the priceStore. -func (p *AstroportProvider) poll(ctx context.Context) error { +func (p *AstroportProvider) poll() error { for { select { - case <-ctx.Done(): + case <-p.ctx.Done(): return nil default: diff --git a/oracle/provider/astroport_test.go b/oracle/provider/astroport_test.go new file mode 100644 index 00000000..27010067 --- /dev/null +++ b/oracle/provider/astroport_test.go @@ -0,0 +1,41 @@ +package provider + +import ( + "context" + "os" + "testing" + "time" + + "github.com/ojo-network/price-feeder/oracle/types" + "github.com/rs/zerolog" + "github.com/stretchr/testify/require" +) + +// TestAstroportProvider_GetTickers tests the polling process. +// TODO: Make this more comprehensive. +// +// Ref: https://github.com/ojo-network/price-feeder/issues/317 +func TestAstroportProvider_GetTickers(t *testing.T) { + ctx := context.Background() + pairs := []types.CurrencyPair{{ + Base: "STINJ", + Quote: "INJ", + }} + p, err := NewAstroportProvider( + ctx, + zerolog.New(os.Stdout).With().Timestamp().Logger(), + Endpoint{}, + pairs..., + ) + require.NoError(t, err) + availPairs, err := p.GetAvailablePairs() + require.NoError(t, err) + require.NotEmpty(t, availPairs) + + p.StartConnections() + time.Sleep(2 * time.Second) + + res, err := p.GetTickerPrices(pairs...) + require.NoError(t, err) + require.NotEmpty(t, res) +} From d7864a9d046869d11180e0dbe4c6a9bfcf236b60 Mon Sep 17 00:00:00 2001 From: Adam Wozniak <29418299+adamewozniak@users.noreply.github.com> Date: Mon, 27 Nov 2023 14:10:44 -0800 Subject: [PATCH 6/6] add INJ & stINJ to config --- ojo-provider-config/currency-pairs.toml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/ojo-provider-config/currency-pairs.toml b/ojo-provider-config/currency-pairs.toml index ef7a940a..982689e4 100644 --- a/ojo-provider-config/currency-pairs.toml +++ b/ojo-provider-config/currency-pairs.toml @@ -217,3 +217,19 @@ providers = [ "gate" ] quote = "USDT" + +[[currency_pairs]] +base = "INJ" +providers = [ + "gate", + "binance", + "mexc", +] +quote = "USDT" + +[[currency_pairs]] +base = "STINJ" +providers = [ + "astroport", +] +quote = "INJ"