diff --git a/cmd/exchange-scrapers/collector/go.mod b/cmd/exchange-scrapers/collector/go.mod index e5d50420f..0647803aa 100644 --- a/cmd/exchange-scrapers/collector/go.mod +++ b/cmd/exchange-scrapers/collector/go.mod @@ -3,7 +3,7 @@ module github.com/diadata-org/diadata/exchange-scrapers/collector go 1.19 require ( - github.com/diadata-org/diadata v1.4.524 + github.com/diadata-org/diadata v1.4.529 github.com/segmentio/kafka-go v0.4.35 github.com/sirupsen/logrus v1.9.3 ) diff --git a/config/Bitget.json b/config/Bitget.json new file mode 100644 index 000000000..dd85700da --- /dev/null +++ b/config/Bitget.json @@ -0,0 +1,70 @@ +{ + "Coins": [ + { + "Symbol": "ALPH", + "ForeignName": "ALPHUSDT", + "Exchange": "Bitget", + "Ignore": false + }, + { + "Symbol": "BTC", + "ForeignName": "BTCUSDT", + "Exchange": "Bitget", + "Ignore": false + }, + { + "Symbol": "DAI", + "ForeignName": "DAIUSDT", + "Exchange": "Bitget", + "Ignore": false + }, + { + "Symbol": "DOT", + "ForeignName": "DOTUSDT", + "Exchange": "Bitget", + "Ignore": false + }, + { + "Symbol": "DOT", + "ForeignName": "DOTUSDC", + "Exchange": "Bitget", + "Ignore": false + }, + { + "Symbol": "ETH", + "ForeignName": "ETHUSDT", + "Exchange": "Bitget", + "Ignore": false + }, + { + "Symbol": "SOL", + "ForeignName": "SOLUSDT", + "Exchange": "Bitget", + "Ignore": false + }, + { + "Symbol": "SYK", + "ForeignName": "SYKUSDT", + "Exchange": "Bitget", + "Ignore": false + }, + { + "Symbol": "USDC", + "ForeignName": "USDCEUR", + "Exchange": "Bitget", + "Ignore": false + }, + { + "Symbol": "USDT", + "ForeignName": "USDTEUR", + "Exchange": "Bitget", + "Ignore": false + }, + { + "Symbol": "XRP", + "ForeignName": "XRPUSDT", + "Exchange": "Bitget", + "Ignore": false + } + ] +} \ No newline at end of file diff --git a/config/gitcoinverified/Bitget.json b/config/gitcoinverified/Bitget.json new file mode 100644 index 000000000..e8c0ef3d1 --- /dev/null +++ b/config/gitcoinverified/Bitget.json @@ -0,0 +1,70 @@ +{ + "Tokens": [ + { + "Symbol": "ALPH", + "Exchange": "Bitget", + "Blockchain": "Alephium", + "Address": "tgx7VNFoP9DJiFMFgXXtafQZkUvyEdDHT9ryamHJYrjq" + }, + { + "Symbol": "BTC", + "Exchange": "Bitget", + "Blockchain": "Bitcoin", + "Address": "0x0000000000000000000000000000000000000000" + }, + { + "Symbol": "DAI", + "Exchange": "Bitget", + "Blockchain": "Ethereum", + "Address": "0x6B175474E89094C44Da98b954EedeAC495271d0F" + }, + { + "Symbol": "DOT", + "Exchange": "Bitget", + "Blockchain": "Polkadot", + "Address": "0x0000000000000000000000000000000000000000" + }, + { + "Symbol": "ETH", + "Exchange": "Bitget", + "Blockchain": "Ethereum", + "Address": "0x0000000000000000000000000000000000000000" + }, + { + "Symbol": "EUR", + "Exchange": "Bitget", + "Blockchain": "Fiat", + "Address": "978" + }, + { + "Symbol": "SOL", + "Exchange": "Bitget", + "Blockchain": "Solana", + "Address": "0x0000000000000000000000000000000000000000" + }, + { + "Symbol": "SYK", + "Exchange": "Bitget", + "Blockchain": "Arbitrum", + "Address": "0xACC51FFDeF63fB0c014c882267C3A17261A5eD50" + }, + { + "Symbol": "USDC", + "Exchange": "Bitget", + "Blockchain": "Ethereum", + "Address": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48" + }, + { + "Symbol": "USDT", + "Exchange": "Bitget", + "Blockchain": "Ethereum", + "Address": "0xdAC17F958D2ee523a2206206994597C13D831ec7" + }, + { + "Symbol": "XRP", + "Exchange": "Bitget", + "Blockchain": "Ripple", + "Address": "0x0000000000000000000000000000000000000000" + } + ] +} \ No newline at end of file diff --git a/pkg/dia/Config.go b/pkg/dia/Config.go index 5ed39ce5a..5ea761247 100644 --- a/pkg/dia/Config.go +++ b/pkg/dia/Config.go @@ -119,6 +119,7 @@ const ( ThenaExchange = "Thena" ThenaV3Exchange = "ThenaV3" AyinExchange = "Ayin" + BitgetExchange = "Bitget" // FinageForex = "FinageForex" ) diff --git a/pkg/dia/scraper/exchange-scrapers/APIScraper.go b/pkg/dia/scraper/exchange-scrapers/APIScraper.go index 08364bfe4..b4ab26f90 100644 --- a/pkg/dia/scraper/exchange-scrapers/APIScraper.go +++ b/pkg/dia/scraper/exchange-scrapers/APIScraper.go @@ -279,8 +279,10 @@ func NewAPIScraper(exchange string, scrape bool, key string, secret string, relD return NewUniswapScraper(Exchanges[dia.ThenaExchange], scrape, relDB) case dia.ThenaV3Exchange: return NewUniswapV3Scraper(Exchanges[dia.ThenaV3Exchange], scrape, relDB) - // case dia.FinageForex: - // return NewFinageForexScraper(Exchanges[dia.FinageForex], scrape, relDB, key, secret) + // case dia.FinageForex: + // return NewFinageForexScraper(Exchanges[dia.FinageForex], scrape, relDB, key, secret) + case dia.BitgetExchange: + return NewBitgetScraper(Exchanges[dia.BitgetExchange], scrape, relDB) case dia.MultiChain: return NewBridgeSwapScraper(Exchanges[dia.MultiChain], scrape, relDB) diff --git a/pkg/dia/scraper/exchange-scrapers/BitgetScraper.go b/pkg/dia/scraper/exchange-scrapers/BitgetScraper.go new file mode 100644 index 000000000..9a2456cd3 --- /dev/null +++ b/pkg/dia/scraper/exchange-scrapers/BitgetScraper.go @@ -0,0 +1,326 @@ +package scrapers + +import ( + "errors" + "strconv" + "strings" + "sync" + "time" + + "github.com/diadata-org/diadata/pkg/dia" + models "github.com/diadata-org/diadata/pkg/model" + ws "github.com/gorilla/websocket" + "github.com/zekroTJA/timedmap" +) + +const ( + bitgetWsAPI = "wss://ws.bitget.com/v2/ws/public" + bitgetPingInterval = 30 +) + +type bitgetSubscribeMessage struct { + Operation string `json:"op"` + Arguments []bitgetSubscribeArguments `json:"args"` +} + +type bitgetSubscribeArguments struct { + InstrumentType string `json:"instType"` + Channel string `json:"channel"` + InstrumentID string `json:"instId"` +} + +type bitgetWsResponse struct { + Action string `json:"action"` + Argument bitgetArgument `json:"arg"` + Data []bitgetData `json:"data"` + Timestamp int64 `json:"ts"` +} + +type bitgetArgument struct { + InstrumentType string `json:"instType"` + Channel string `json:"channel"` + InstrumentID string `json:"instId"` +} + +type bitgetData struct { + Timestamp string `json:"ts"` + Price string `json:"price"` + Volume string `json:"size"` + Side string `json:"side"` + ForeignTradeID string `json:"tradeId"` +} + +type BitgetScraper struct { + // signaling channels + shutdown chan nothing + shutdownDone chan nothing + // error handling; to read error or closed, first acquire read lock + // only cleanup method should hold write lock + errorLock sync.RWMutex + error error + closed bool + pairScrapers map[string]*BitgetPairScraper // pc.ExchangePair -> pairScraperSet + wsConn *ws.Conn + exchangeName string + chanTrades chan *dia.Trade + db *models.RelDB +} + +// NewBitgetScraper returns a new BitgetScraper initialized with default values. +// The instance is asynchronously scraping as soon as it is created. +func NewBitgetScraper(exchange dia.Exchange, scrape bool, relDB *models.RelDB) *BitgetScraper { + s := &BitgetScraper{ + shutdown: make(chan nothing), + shutdownDone: make(chan nothing), + pairScrapers: make(map[string]*BitgetPairScraper), + exchangeName: exchange.Name, + error: nil, + chanTrades: make(chan *dia.Trade), + db: relDB, + } + var wsDialer ws.Dialer + SwConn, _, err := wsDialer.Dial(bitgetWsAPI, nil) + if err != nil { + log.Errorf("Dial websocket api: %s", err.Error()) + } + + go s.pingRoutine(time.Duration(bitgetPingInterval * time.Second)) + + s.wsConn = SwConn + if scrape { + go s.mainLoop() + } + return s +} + +// mainLoop runs in a goroutine until channel s is closed. +func (s *BitgetScraper) mainLoop() { + + tmFalseDuplicateTrades := timedmap.New(duplicateTradesScanFrequency) + tmDuplicateTrades := timedmap.New(duplicateTradesScanFrequency) + time.Sleep(5 * time.Second) + + for { + + // Check if we get a pong message back. + _, p, err := s.wsConn.ReadMessage() + if err != nil { + log.Error("ReadMessage: ", err) + } else { + if strings.Contains(string(p), "pong") || strings.Contains(string(p), "ng") { + log.Infof("got %s", string(p)) + } + } + + var message bitgetWsResponse + if err = s.wsConn.ReadJSON(&message); err != nil { + log.Errorf("ReadJSON: %s", err.Error()) + log.Info("instead of pong got ", string(p)) + if strings.Contains(err.Error(), "invalid character") { + continue + } + return + } + + ps, ok := s.pairScrapers[message.Argument.InstrumentID] + if ok && message.Action != "snapshot" { + for _, data := range message.Data { + var f64Price float64 + var f64Volume float64 + var exchangepair dia.ExchangePair + f64Price, err = strconv.ParseFloat(data.Price, 64) + if err != nil { + log.Error("error parsing price " + data.Price) + } + f64Volume, err = strconv.ParseFloat(data.Volume, 64) + if err != nil { + log.Error("error parsing volume " + data.Volume) + } + + if data.Side != "buy" { + f64Volume = -f64Volume + } + + timestamp, err := strconv.ParseInt(data.Timestamp, 10, 64) + if err != nil { + log.Error("Parse timestamp: ", err) + } + + exchangepair, err = s.db.GetExchangePairCache(s.exchangeName, message.Argument.InstrumentID) + if err != nil { + // log.Error("get exchangepair from cache: ", err) + } + t := dia.Trade{ + Symbol: ps.pair.Symbol, + Pair: message.Argument.InstrumentID, + Price: f64Price, + Volume: f64Volume, + Time: time.Unix(0, timestamp*1e6), + ForeignTradeID: data.ForeignTradeID, + Source: s.exchangeName, + VerifiedPair: exchangepair.Verified, + BaseToken: exchangepair.UnderlyingPair.BaseToken, + QuoteToken: exchangepair.UnderlyingPair.QuoteToken, + } + if t.VerifiedPair { + log.Info("got verified trade: ", t) + } else { + log.Infof("got trade at %v : %s -- %v -- %v", t.Time, t.Pair, t.Price, t.Volume) + } + // Handle duplicate trades. + discardTrade := t.IdentifyDuplicateFull(tmFalseDuplicateTrades, duplicateTradesMemory) + if !discardTrade { + t.IdentifyDuplicateTagset(tmDuplicateTrades, duplicateTradesMemory) + ps.parent.chanTrades <- &t + } + } + + } + } + +} + +func (s *BitgetScraper) cleanup(err error) { + s.errorLock.Lock() + defer s.errorLock.Unlock() + if err != nil { + s.error = err + } + s.closed = true + close(s.shutdownDone) // signal that shutdown is complete +} + +// Close closes any existing API connections, as well as channels of +// PairScrapers from calls to ScrapePair +func (s *BitgetScraper) Close() error { + if s.closed { + return errors.New("BitgetScraper: Already closed") + } + err := s.wsConn.Close() + if err != nil { + log.Error(err) + } + close(s.shutdown) + <-s.shutdownDone + s.errorLock.RLock() + defer s.errorLock.RUnlock() + return s.error +} + +func (s *BitgetScraper) NormalizePair(pair dia.ExchangePair) (dia.ExchangePair, error) { + return pair, nil +} + +func (s *BitgetScraper) pingRoutine(d time.Duration) { + ticker := time.NewTicker(d) + for range ticker.C { + if err := s.wsConn.WriteMessage(ws.TextMessage, []byte("ping")); err != nil { + log.Errorf("send ping: %s.", err.Error()) + } else { + log.Info("sent ping.") + } + } +} + +// FetchAvailablePairs returns a list with all available trade pairs +func (s *BitgetScraper) FetchAvailablePairs() (pairs []dia.ExchangePair, err error) { + + // data, _, err := utils.GetRequest("https://api.pro.coinbase.com/products") + // if err != nil { + // return + // } + + // err = json.Unmarshal(data, &ar) + // if err == nil { + // for _, p := range ar { + // pairToNormalise := dia.ExchangePair{ + // Symbol: p.BaseCurrency, + // ForeignName: p.ID, + // Exchange: s.exchangeName, + // } + // pair, serr := s.NormalizePair(pairToNormalise) + // if serr == nil { + // pairs = append(pairs, pair) + // } else { + // log.Error(serr) + // } + // } + // } + return +} + +// FillSymbolData collects all available information on an asset traded on Bitget +func (s *BitgetScraper) FillSymbolData(symbol string) (asset dia.Asset, err error) { + asset.Symbol = symbol + return asset, nil +} + +// BitgetPairScraper implements PairScraper +type BitgetPairScraper struct { + parent *BitgetScraper + pair dia.ExchangePair + closed bool + lastRecord int64 +} + +// ScrapePair returns a PairScraper that can be used to get trades for a single pair from +// this APIScraper +func (s *BitgetScraper) ScrapePair(pair dia.ExchangePair) (PairScraper, error) { + + s.errorLock.RLock() + defer s.errorLock.RUnlock() + if s.error != nil { + return nil, s.error + } + if s.closed { + return nil, errors.New("BitgetScraper: Call ScrapePair on closed scraper") + } + ps := &BitgetPairScraper{ + parent: s, + pair: pair, + lastRecord: 0, + } + + s.pairScrapers[pair.ForeignName] = ps + + subscribeMessage := bitgetSubscribeMessage{ + Operation: "subscribe", + Arguments: []bitgetSubscribeArguments{ + { + InstrumentType: "SPOT", + Channel: "trade", + InstrumentID: pair.ForeignName, + }, + }, + } + if err := s.wsConn.WriteJSON(subscribeMessage); err != nil { + println(err.Error()) + } + log.Info("subscribed to: ", pair.ForeignName) + + return ps, nil +} + +// Channel returns a channel that can be used to receive trades/pricing information +func (ps *BitgetScraper) Channel() chan *dia.Trade { + return ps.chanTrades +} + +func (ps *BitgetPairScraper) Close() error { + ps.closed = true + return nil +} + +// Error returns an error when the channel Channel() is closed +// and nil otherwise +func (ps *BitgetPairScraper) Error() error { + s := ps.parent + s.errorLock.RLock() + defer s.errorLock.RUnlock() + return s.error +} + +// Pair returns the pair this scraper is subscribed to +func (ps *BitgetPairScraper) Pair() dia.ExchangePair { + return ps.pair +}