diff --git a/go.mod b/go.mod index 0ebe51af3..cc2a6b22f 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/preichenberger/go-coinbasepro/v2 v2.0.5 github.com/segmentio/kafka-go v0.4.35 github.com/shopspring/decimal v1.3.1 - github.com/sirupsen/logrus v1.8.1 + github.com/sirupsen/logrus v1.9.3 github.com/status-im/keycard-go v0.0.0-20200402102358-957c09536969 github.com/streamingfast/solana-go v0.5.1-0.20220502224452-432fbe84aee8 github.com/stretchr/testify v1.8.1 diff --git a/go.sum b/go.sum index 609941795..6f5f807bf 100644 --- a/go.sum +++ b/go.sum @@ -1496,6 +1496,8 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE= github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/snikch/goodman v0.0.0-20171125024755-10e37e294daa/go.mod h1:oJyF+mSPHbB5mVY2iO9KV3pTt/QbIkGaO8gQ2WrDbP4= @@ -2103,6 +2105,7 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/pkg/scraper-writers/text-writer.go b/internal/pkg/scraper-writers/text-writer.go deleted file mode 100644 index 9ef544208..000000000 --- a/internal/pkg/scraper-writers/text-writer.go +++ /dev/null @@ -1,41 +0,0 @@ -package writers - -import ( - "fmt" - "os" - "strings" - "time" - - log "github.com/sirupsen/logrus" -) - -// FileWriter - One implementation of the Writer interface. This one will write to txt files, and generate file names like yyyy-mm-dd-exchange-market.txt. -type FileWriter struct{} - -// GetWriteFileName - Will generate the file name for you of the format yyyy-mm-dd-exchange-market.txt. New files will be created at midnight because the scrapers are calling this method each time before writing to file. -func (f *FileWriter) GetWriteFileName(exchange string, market string) string { - now := time.Now() - return f.clean(fmt.Sprintf("%v-%v-%v-%v-%v.txt", now.Year(), int(now.Month()), now.Day(), exchange, market)) -} - -// Write - Will write to the filename the line. -func (f *FileWriter) Write(line string, filename string) (int, error) { - file, err := os.OpenFile(filename, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0666) //nolint:gosec - if err != nil { - return 0, err - } - - n, err := file.WriteString(line) - if err != nil { - cerr := file.Close() - if cerr != nil { - log.Error(cerr) - } - return n, err - } - return n, file.Close() -} - -func (f *FileWriter) clean(s string) string { - return strings.Replace(s, "/", "_", -1) -} diff --git a/internal/pkg/scraper-writers/types.go b/internal/pkg/scraper-writers/types.go deleted file mode 100644 index 0a66cd595..000000000 --- a/internal/pkg/scraper-writers/types.go +++ /dev/null @@ -1,8 +0,0 @@ -package writers - -// Writer is an interface that is responsible for generating dynamic file names (to create new files at midnight) and writing data to them -type Writer interface { - GetWriteFileName(exchange string, market string) string // will return the name of the file in which it will write - Write(line string, filename string) (int, error) // returns number of bytes written or error - // rationale for making a line a pointer - is because it can be very large. filename will always be small. -} diff --git a/pkg/dia/scraper/exchange-scrapers/APIDerivativesScraper.go b/pkg/dia/scraper/exchange-scrapers/APIDerivativesScraper.go deleted file mode 100644 index 6f13bcfe8..000000000 --- a/pkg/dia/scraper/exchange-scrapers/APIDerivativesScraper.go +++ /dev/null @@ -1,37 +0,0 @@ -package scrapers - -import ( - "sync" - - "github.com/diadata-org/diadata/pkg/model" - "github.com/gorilla/websocket" - zap "go.uber.org/zap" -) - -// DeribitScraperKind - used to distinguish between the futures and options scrapers -type DeribitScraperKind int - -const ( - // DeribitFuture - constant to signal the futures scraper - DeribitFuture DeribitScraperKind = iota + 1 - // DeribitOption - constant to signal the options scraper - DeribitOption -) - -// DeribitScraper - used in conjunction with the DeribitScraperKind in a new struct to define futures and options scrapers -type DeribitScraper struct { - Markets []string - WaitGroup *sync.WaitGroup - Logger *zap.SugaredLogger - DataStore *models.DB - WsConnection *websocket.Conn - - // required for deribit to: - // 1. authenticate (trades is a private channel) - // 2. referesh the token from step 1., so that the channel isn't closed - AccessKey string - AccessSecret string - - RefreshTokenEvery int16 // how often we refresh the token (in seconds) - MarketKind DeribitScraperKind -} diff --git a/pkg/dia/scraper/exchange-scrapers/APIFuturesScraper.go b/pkg/dia/scraper/exchange-scrapers/APIFuturesScraper.go deleted file mode 100644 index 592979473..000000000 --- a/pkg/dia/scraper/exchange-scrapers/APIFuturesScraper.go +++ /dev/null @@ -1,11 +0,0 @@ -package scrapers - -// FuturesScraper is an interface for all of the Futures Contracts scrapers -type FuturesScraper interface { - Scrape(market string) // a self-sustained goroutine that scrapes a single market - ScrapeMarkets() // will scrape the futures markets defined during instantiation of the scraper - ScraperClose(market string, websocketConnection interface{}) error - //Authenticate(market string, websocketConnection interface{}) error -} - -const retryIn uint8 = 5 // how long to wait in seconds before restarting a failed websocket diff --git a/pkg/dia/scraper/exchange-scrapers/APIOptionsScraper.go b/pkg/dia/scraper/exchange-scrapers/APIOptionsScraper.go deleted file mode 100644 index 0f061c7f7..000000000 --- a/pkg/dia/scraper/exchange-scrapers/APIOptionsScraper.go +++ /dev/null @@ -1,31 +0,0 @@ -package scrapers - -import ( - "time" -) - -// OptionsScraper is an interface for all of the Options Contracts scrapers -type OptionsScraper interface { - Scrape(market string) // a self-sustained goroutine that scrapes a single market - // ScrapeMarkets() // will scrape the options markets defined during instantiation of the scraper - ScraperClose(market string, websocketConnection interface{}) error - Authenticate(market string, websocketConnection interface{}) error -} - -// ComputedCVI is a struct representing our CVI value at a point in time -type ComputedCVI struct { - CVI float64 - CalculationTime time.Time -} - -// ComputedCVIs is the channel type that will communicate the cvis -type ComputedCVIs chan ComputedCVI - -// OptionSettlement - is an enum, signalling if the settlement is regular or weekly -type OptionSettlement int - -// OptionSettlement enums -const ( - RegularOptionSettlement OptionSettlement = iota + 1 - WeeklyOptionSettlement -) diff --git a/pkg/dia/scraper/exchange-scrapers/FuturesBitflyerScraper.go b/pkg/dia/scraper/exchange-scrapers/FuturesBitflyerScraper.go deleted file mode 100644 index d66a3e0f1..000000000 --- a/pkg/dia/scraper/exchange-scrapers/FuturesBitflyerScraper.go +++ /dev/null @@ -1,208 +0,0 @@ -package scrapers - -import ( - "fmt" - "github.com/diadata-org/diadata/internal/pkg/scraper-writers" - "net/url" - "os" - "os/signal" - "sync" - "syscall" - "time" - - zap "go.uber.org/zap" - - "github.com/gorilla/websocket" -) - -const scrapeDataSaveLocationBitflyer = "" - -// BitflyerScraper - use the NewBitflyerFuturesScraper function to create an instance -type BitflyerScraper struct { - Markets []string - WaitGroup *sync.WaitGroup - Writer writers.Writer - Logger *zap.SugaredLogger -} - -// NewBitflyerFuturesScraper - returns an instance of an options scraper. -func NewBitflyerFuturesScraper(markets []string) FuturesScraper { - wg := sync.WaitGroup{} - writer := writers.FileWriter{} - logger := zap.NewExample().Sugar() // or NewProduction, or NewDevelopment - defer func() { - err := logger.Sync() - if err != nil { - log.Error(err) - } - }() - - var scraper FuturesScraper = &BitflyerScraper{ - WaitGroup: &wg, - Markets: markets, - Writer: &writer, - Logger: logger, - } - - return scraper -} - -func (s *BitflyerScraper) send(message *map[string]interface{}, market string, websocketConn *websocket.Conn) error { - err := websocketConn.WriteJSON(*message) - if err != nil { - return err - } - s.Logger.Debugf("sent message [%s]: %s", market, message) - return nil -} - -// Authenticate - placeholder here, since we do not need to authneticate the connection. -func (s *BitflyerScraper) Authenticate(market string, connection interface{}) error { - return nil -} - -// ScraperClose - safely closes the scraper; We pass the interface connection as the second argument -// primarily for the reason that Huobi scraper does not use the gorilla websocket; It uses golang's x websocket; -// If we did not define this method in our FuturesScraper interface, we could have easily used the pointer -// to gorilla websocket here; However, to make FuturesScraper more ubiquituous, we need an interface here. -func (s *BitflyerScraper) ScraperClose(market string, connection interface{}) error { - switch c := connection.(type) { - case *websocket.Conn: - // unsubscribe from the channel - err := s.send(&map[string]interface{}{"jsonrpc": "2.0", "method": "unsubscribe", "params": &map[string]interface{}{"channel": "lightning_ticker_" + market}}, market, c) - if err != nil { - s.Logger.Errorf("could not send a channel unsubscription message, err: %s", err) - return err - } - // close the websocket connection - err = s.write(websocket.CloseMessage, []byte{}, c) - if err != nil { - return err - } - err = c.Close() - if err != nil { - return err - } - s.Logger.Infof("gracefully shutdown bitflyer scraper on market: %s", market) - time.Sleep(time.Duration(retryIn) * time.Second) - return nil - default: - return fmt.Errorf("unknown connection type: %T", connection) - } -} - -// Scrape starts a websocket scraper for market -func (s *BitflyerScraper) Scrape(market string) { - // this block is for listening to sigterms and interupts - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - userCancelled := make(chan bool, 1) - go func() { - sig := <-sigs - fmt.Println(sig) - userCancelled <- true - }() - - for { - // immediately invoked function expression for easy clenup with defer - func() { - u := url.URL{Scheme: "wss", Host: "ws.lightstream.bitflyer.com", Path: "/json-rpc"} - s.Logger.Debugf("connecting to [%s], market: [%s]", u.String(), market) - ws, resp, err := websocket.DefaultDialer.Dial(u.String(), nil) - if err != nil { - s.Logger.Errorf("could not dial Bitflyer websocket: %s", err) - time.Sleep(time.Duration(retryIn) * time.Second) - return - } - defer func() { - err = resp.Body.Close() - if err != nil { - log.Error(err) - } - }() - defer func() { - err = ws.Close() - if err != nil { - log.Error(err) - } - }() - defer func() { - err = s.ScraperClose(market, ws) - if err != nil { - log.Error(err) - } - }() - - ws.SetPongHandler(func(appData string) error { - s.Logger.Debugf("received a pong frame") - return nil - }) - err = s.send(&map[string]interface{}{"jsonrpc": "2.0", "method": "subscribe", "params": &map[string]interface{}{"channel": "lightning_ticker_" + market}}, market, ws) - if err != nil { - s.Logger.Errorf("could not send a channel subscription message. retrying, err: %s", err) - return - } - tick := time.NewTicker(15 * time.Second) - defer tick.Stop() - go func() { - for range tick.C { - err := s.write(websocket.PingMessage, []byte{}, ws) - if err != nil { - s.Logger.Errorf("error experienced pinging coinflex, err: %s", err) - return - } - s.Logger.Debugf("pinged the coinflex server. market: [%s]", market) - } - }() - for { - select { - case <-userCancelled: - s.Logger.Infof("received interrupt, gracefully shutting down") - err := s.ScraperClose(market, ws) - if err != nil { - log.Error(err) - } - os.Exit(0) - default: - _, message, err := ws.ReadMessage() - if err != nil { - s.Logger.Errorf("repeated read error, restarting") - return - } - s.Logger.Debugf("received new message: %s, saving new message", message) - _, err = s.Writer.Write(string(message)+"\n", scrapeDataSaveLocationBitflyer+s.Writer.GetWriteFileName("Bitflyer", market)) - if err != nil { - s.Logger.Errorf("could not write to file, err: %s", err) - return - } - } - } - }() - } -} - -// write's primary purpose is to write a ping frame op code to keep the websocket connection alive -func (s *BitflyerScraper) write(mt int, payload []byte, ws *websocket.Conn) error { - err := ws.SetWriteDeadline(time.Now().Add(15 * time.Second)) - if err != nil { - return err - } - return ws.WriteMessage(mt, payload) -} - -// ScrapeMarkets - will scrape the markets specified during instantiation -func (s *BitflyerScraper) ScrapeMarkets() { - for _, market := range s.Markets { - s.WaitGroup.Add(1) - go s.Scrape(market) - } - s.WaitGroup.Wait() -} - -// usage example -// func main() { -// wg := sync.WaitGroup{} -// futuresBitflyer := scrapers.NewBitflyerFuturesScraper([]string{"BTCJPY27DEC2019", "BTCJPY03JAN2020", "BTCJPY27MAR2020"}) -// futuresBitflyer.ScrapeMarkets() -// wg.Wait() -// } diff --git a/pkg/dia/scraper/exchange-scrapers/FuturesBitmexScraper.go b/pkg/dia/scraper/exchange-scrapers/FuturesBitmexScraper.go deleted file mode 100644 index 76fc49917..000000000 --- a/pkg/dia/scraper/exchange-scrapers/FuturesBitmexScraper.go +++ /dev/null @@ -1,208 +0,0 @@ -package scrapers - -import ( - "fmt" - "github.com/diadata-org/diadata/internal/pkg/scraper-writers" - "net/url" - "os" - "os/signal" - "sync" - "syscall" - "time" - - zap "go.uber.org/zap" - - "github.com/gorilla/websocket" -) - -const scrapeDataSaveLocationBitmex = "" - -// BitmexScraper - use the NewBitmexFuturesScraper function to create an instance -type BitmexScraper struct { - Markets []string - WaitGroup *sync.WaitGroup - Writer writers.Writer - Logger *zap.SugaredLogger -} - -// NewBitmexFuturesScraper - returns an instance of an options scraper. -func NewBitmexFuturesScraper(markets []string) FuturesScraper { - wg := sync.WaitGroup{} - writer := writers.FileWriter{} - logger := zap.NewExample().Sugar() // or NewProduction, or NewDevelopment - defer func() { - err := logger.Sync() - if err != nil { - log.Error(err) - } - }() - - var scraper FuturesScraper = &BitmexScraper{ - WaitGroup: &wg, - Markets: markets, - Writer: &writer, - Logger: logger, - } - - return scraper -} - -func (s *BitmexScraper) send(message *map[string]interface{}, market string, websocketConn *websocket.Conn) error { - err := websocketConn.WriteJSON(*message) - if err != nil { - return err - } - s.Logger.Debugf("sent message [%s]: %s", market, message) - return nil -} - -// Authenticate - placeholder here, since we do not need to authneticate the connection. -func (s *BitmexScraper) Authenticate(market string, connection interface{}) error { - return nil -} - -// ScraperClose - safely closes the scraper; We pass the interface connection as the second argument -// primarily for the reason that Huobi scraper does not use the gorilla websocket; It uses golang's x websocket; -// If we did not define this method in our FuturesScraper interface, we could have easily used the pointer -// to gorilla websocket here; However, to make FuturesScraper more ubiquituous, we need an interface here. -func (s *BitmexScraper) ScraperClose(market string, connection interface{}) error { - switch c := connection.(type) { - case *websocket.Conn: - // unsubscribe from the channel - err := s.send(&map[string]interface{}{"op": "unsubscribe", "args": []string{"trade:" + market}}, market, c) - if err != nil { - s.Logger.Errorf("could not send a channel unsubscription message, err: %s", err) - return err - } - // close the websocket connection - err = s.write(websocket.CloseMessage, []byte{}, c) - if err != nil { - return err - } - err = c.Close() - if err != nil { - return err - } - s.Logger.Infof("gracefully shutdown bitmex scraper on market: %s", market) - time.Sleep(time.Duration(retryIn) * time.Second) - return nil - default: - return fmt.Errorf("unknown connection type: %T", connection) - } -} - -// Scrape starts a websocket scraper for market -func (s *BitmexScraper) Scrape(market string) { - // this block is for listening to sigterms and interupts - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - userCancelled := make(chan bool, 1) - go func() { - sig := <-sigs - fmt.Println(sig) - userCancelled <- true - }() - - for { - // immediately invoked function expression for easy clenup with defer - func() { - u := url.URL{Scheme: "wss", Host: "www.bitmex.com", Path: "/realtime"} - s.Logger.Debugf("connecting to [%s], market: [%s]", u.String(), market) - ws, resp, err := websocket.DefaultDialer.Dial(u.String(), nil) - if err != nil { - s.Logger.Errorf("could not dial Bitmex websocket: %s", err) - time.Sleep(time.Duration(retryIn) * time.Second) - return - } - defer func() { - err = ws.Close() - if err != nil { - log.Error(err) - } - }() - defer func() { - err = resp.Body.Close() - if err != nil { - log.Error(err) - } - }() - defer func() { - err = s.ScraperClose(market, ws) - if err != nil { - log.Error(err) - } - }() - - ws.SetPongHandler(func(appData string) error { - s.Logger.Debugf("received a pong frame") - return nil - }) - err = s.send(&map[string]interface{}{"op": "subscribe", "args": []string{"trade:" + market}}, market, ws) - if err != nil { - s.Logger.Errorf("could not send a channel subscription message. retrying, err: %s", err) - return - } - tick := time.NewTicker(15 * time.Second) - defer tick.Stop() - go func() { - for range tick.C { - err := s.write(websocket.PingMessage, []byte{}, ws) - if err != nil { - s.Logger.Errorf("error experienced pinging coinflex, err: %s", err) - return - } - s.Logger.Debugf("pinged the coinflex server. market: [%s]", market) - } - }() - for { - select { - case <-userCancelled: - s.Logger.Infof("received interrupt, gracefully shutting down") - err := s.ScraperClose(market, ws) - if err != nil { - log.Error(err) - } - os.Exit(0) - default: - _, message, err := ws.ReadMessage() - if err != nil { - s.Logger.Errorf("repeated read error, restarting") - return - } - s.Logger.Debugf("received new message: %s, saving new message", message) - _, err = s.Writer.Write(string(message)+"\n", scrapeDataSaveLocationBitmex+s.Writer.GetWriteFileName("Bitmex", market)) - if err != nil { - s.Logger.Errorf("could not write to file, err: %s", err) - return - } - } - } - }() - } -} - -// write's primary purpose is to write a ping frame op code to keep the websocket connection alive -func (s *BitmexScraper) write(mt int, payload []byte, ws *websocket.Conn) error { - err := ws.SetWriteDeadline(time.Now().Add(15 * time.Second)) - if err != nil { - return err - } - return ws.WriteMessage(mt, payload) -} - -// ScrapeMarkets - will scrape the markets specified during instantiation -func (s *BitmexScraper) ScrapeMarkets() { - for _, market := range s.Markets { - s.WaitGroup.Add(1) - go s.Scrape(market) - } - s.WaitGroup.Wait() -} - -// usage example -// func main() { -// wg := sync.WaitGroup{} -// futuresBitmex := scrapers.NewBitmexFuturesScraper([]string{"XBTUSD", "XBTZ19", "XBTH20", "XBTM20", "ETHUSD", "ETHZ19", "ETHH20"}) -// futuresBitmex.ScrapeMarkets() -// wg.Wait() -// } diff --git a/pkg/dia/scraper/exchange-scrapers/FuturesCoinflexScraper.go b/pkg/dia/scraper/exchange-scrapers/FuturesCoinflexScraper.go deleted file mode 100644 index 63cc8a54b..000000000 --- a/pkg/dia/scraper/exchange-scrapers/FuturesCoinflexScraper.go +++ /dev/null @@ -1,357 +0,0 @@ -package scrapers - -import ( - "encoding/json" - "fmt" - "github.com/diadata-org/diadata/internal/pkg/scraper-writers" - "net/url" - "os" - "os/signal" - "strings" - "sync" - "syscall" - "time" - - utils "github.com/diadata-org/diadata/pkg/utils" - "github.com/gorilla/websocket" - zap "go.uber.org/zap" -) - -const scrapeDataSaveLocationCoinflex = "" - -// CoinflexFuturesScraper - scrapes the futures from the Coinflex exchange -type CoinflexFuturesScraper struct { - Markets []string - WaitGroup *sync.WaitGroup - Writer writers.Writer - Logger *zap.SugaredLogger -} - -// the response of https://webapi.coinflex.com/markets/ is a list of marketCoinglex JSON objects -// This is used to validate that the market that you have selected to scrape acutally exists. This is done -// validateMarket function. -type marketCoinflex struct { - Base int64 `json:"base"` - Counter int64 `json:"string"` - Name string `json:"name"` - SpotName string `json:"spot_name"` - Tick int64 `json:"tick"` - Start int64 `json:"start"` - Expires int64 `json:"expires"` - Tenor string `josn:"tenor"` -} - -// the response of https://webapi.coinflex.com/assets/ is a list of [] of assetCoinflex JSON objects -// This is used to find the int id of the assets (base and quote <- coinflex call quote Counter, go figure) -// that you supply via market. So you would pass something like BTCDEC-USDDEC as the market, and the XXX method -// will then (1) check that such a market exists using the validateMarket function, it will then split on "-" and -// will find the int ids of the two assets. It then uses these int ids to make a websocket request to subscribe to -// the trade channel. -type assetCoinflex struct { - ID int64 `json:"id"` - Name string `json:"name"` - SpotID int64 `json:"spot_id"` - SpotName string `json:"spot_name"` - Scale int64 `json:"scale"` -} - -// this is how the message looks like that we receive from the trades channel -// the OrderModified, OrderClosed, OrderOpen have different structures. Also, -// notice that the below is OrdersMatched (Order**s** <- plural, not Order like the others) -type ordersMatchedCoinflex struct { - Notice string `json:"notice"` - Bid int64 `json:"bid"` - BidTonce int64 `json:"bid_tonce"` - Ask int64 `json:"ask"` - AskTonce int64 `json:"ask_tonce"` - Base int64 `json:"base"` - Counter int64 `json:"counter"` - Quantity int64 `json:"quantity"` - Price int64 `json:"price"` - Total int64 `json:"total"` - BidRem int64 `json:"bid_rem"` // if this is 0, then it was a market sell order - AskRem int64 `json:"ask_rem"` // if this is 0, then it was a market buy order - Time int64 `json:"time"` - BidBaseFee int64 `json:"bid_base_fee"` - BidCounterFee int64 `json:"bid_counter_fee"` - AskBaseFee int64 `json:"ask_base_fee"` - AskCounterFee int64 `json:"ask_counter_fee"` -} - -// NewCoinflexFuturesScraper - returns an instance of the coinflex scraper -func NewCoinflexFuturesScraper(markets []string) FuturesScraper { - wg := sync.WaitGroup{} - writer := writers.FileWriter{} - logger := zap.NewExample().Sugar() // or NewProduction, or NewDevelopment - defer func() { - err := logger.Sync() - if err != nil { - log.Error(err) - } - }() - - var scraper FuturesScraper = &CoinflexFuturesScraper{ - WaitGroup: &wg, - Markets: markets, - Writer: &writer, - Logger: logger, - } - - return scraper -} - -func (s *CoinflexFuturesScraper) send(message *map[string]interface{}, market string, websocketConn *websocket.Conn) error { - err := websocketConn.WriteJSON(*message) - if err != nil { - return err - } - s.Logger.Debugf("sent message [%s]: %s", market, message) - return nil -} - -// Authenticate - placeholder here, since we do not need to authneticate the connection. -func (s *CoinflexFuturesScraper) Authenticate(market string, connection interface{}) error { - return nil -} - -// ScraperClose - safely closes the scraper; We pass the interface connection as the second argument -// primarily for the reason that Huobi scraper does not use the gorilla websocket; It uses golang's x websocket; -// If we did not define this method in our FuturesScraper interface, we could have easily used the pointer -// to gorilla websocket here; However, to make FuturesScraper more ubiquituous, we need an interface here. -func (s *CoinflexFuturesScraper) ScraperClose(market string, connection interface{}) error { - switch c := connection.(type) { - case *websocket.Conn: - err := s.write(websocket.CloseMessage, []byte{}, c) - if err != nil { - return err - } - err = c.Close() - if err != nil { - return err - } - s.Logger.Infof("gracefully shutdown coinflex scraper on market: %s", market) - time.Sleep(time.Duration(retryIn) * time.Second) - return nil - default: - return fmt.Errorf("unknown connection type: %T", connection) - } -} - -// Scrape starts a websocket scraper for market -func (s *CoinflexFuturesScraper) Scrape(market string) { - validated, err := s.validateMarket(market) - if !validated || err != nil { - s.Logger.Errorf("could not validate %s market", market) - if err != nil { - s.Logger.Errorf("issue with validating: %s", err) - } - return - } - baseID, quoteID, err := s.getBaseAndCounterID(market) - // splits the string market into the base and the counter and then finds the int id of them. - // coinflex expects that we provide an int for the assets when we make the websocket requests. - if err != nil { - s.Logger.Errorf("issue with getting an id for base and quote: %s", err) - return - } - - // this block is for listening to sigterms and interupts - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - userCancelled := make(chan bool, 1) - go func() { - sig := <-sigs - fmt.Println(sig) - userCancelled <- true - }() - - for { - // immediately invoked function expression for easy clenup with defer - func() { - u := url.URL{Scheme: "wss", Host: "api.coinflex.com", Path: "/v1"} - s.Logger.Debugf("connecting to [%s], market: [%s]", u.String(), market) - ws, resp, err := websocket.DefaultDialer.Dial(u.String(), nil) - if err != nil { - s.Logger.Errorf("dial: %s", err) - time.Sleep(time.Duration(retryIn) * time.Second) - return - } - defer func() { - err = resp.Body.Close() - if err != nil { - log.Error(err) - } - }() - defer func() { - err = ws.Close() - if err != nil { - log.Error(err) - } - }() - defer func() { - err = s.ScraperClose(market, ws) - if err != nil { - log.Error(err) - } - }() - // to let you know that the websocket connection is alive. Coinflex do not have the heartbeat channel - // and they send you frame pong messages. Thus, this handler. - ws.SetPongHandler(func(appData string) error { - s.Logger.Debugf("received a pong frame") - return nil - }) - err = s.send(&map[string]interface{}{"base": baseID, "counter": quoteID, "watch": true, "method": "WatchOrders"}, market, ws) - if err != nil { - s.Logger.Errorf("could not send a channel subscription message. retrying") - return - } - if err != nil { - s.Logger.Errorf("could not send an initial ping message. retrying") - return - } - tick := time.NewTicker(30 * time.Second) // every 45 seconds we have to ping Coinflex. we also have a 15 second write limit of the ping frame (thus, 30 seconds here) - defer tick.Stop() - // we require a separate goroutine for ticker, because ReadMessage is blocking - // and we may fail sending ping before we get any message on a market, thus - // forcing Coinflex to close our websocket out. - go func() { - for range tick.C { - err := s.write(websocket.PingMessage, []byte{}, ws) - if err != nil { - s.Logger.Errorf("error experienced pinging coinflex, err: %s", err) - return - } - s.Logger.Debugf("pinged the coinflex server. market: [%s]", market) - } - }() - for { - select { - case <-userCancelled: - s.Logger.Infof("received interrupt, gracefully shutting down") - err := s.ScraperClose(market, ws) - if err != nil { - log.Error(err) - } - os.Exit(0) - default: - _, message, err := ws.ReadMessage() - msg := ordersMatchedCoinflex{} - if err != nil { - s.Logger.Errorf("problem reading coinflex on [%s], err: %s", market, err) - return - } - err = json.Unmarshal(message, &msg) - if err != nil { - s.Logger.Errorf("could not unmarshal coinflex message on [%s], err: %s", market, err) - return - } - s.Logger.Debugf("received a message: %s", message) - if msg.Notice == "OrdersMatched" { - s.Logger.Debugf("received new match message on [%s]: %s", market, message) - _, err = s.Writer.Write(string(message)+"\n", scrapeDataSaveLocationCoinflex+s.Writer.GetWriteFileName("coinflex", market)) - if err != nil { - s.Logger.Errorf("could not save to file: %s, on market: [%s], err: %s", scrapeDataSaveLocationCoinflex+s.Writer.GetWriteFileName("coinflex", market), market, err) - return - } - } - } - } - }() - } -} - -// write's primary purpose is to write a ping frame op code to keep the websocket connection alive -func (s *CoinflexFuturesScraper) write(mt int, payload []byte, ws *websocket.Conn) error { - err := ws.SetWriteDeadline(time.Now().Add(15 * time.Second)) - if err != nil { - return err - } - return ws.WriteMessage(mt, payload) -} - -// ScrapeMarkets - will scrape the markets specified during instantiation -func (s *CoinflexFuturesScraper) ScrapeMarkets() { - for _, market := range s.Markets { - s.WaitGroup.Add(1) - go s.Scrape(market) - } - s.WaitGroup.Wait() -} - -func (s *CoinflexFuturesScraper) getBaseAndCounterID(market string) (int64, int64, error) { - assets := strings.Split(market, "/") - baseID := int64(0) - quoteID := int64(0) - base := assets[0] - quote := assets[1] // coinflex call this "counter" - baseID, err := s.assetID(base) - if err != nil { - return baseID, quoteID, err - } - quoteID, err = s.assetID(quote) - if err != nil { - return baseID, quoteID, err - } - return baseID, quoteID, nil -} - -// ensures that market available to trade -func (s *CoinflexFuturesScraper) validateMarket(market string) (bool, error) { - // should validate that there is an available market - marketAvailable := false - marketsCoinflex, err := s.availableMarketsCoinflex() - fmt.Printf("[DEBUG] all coinflex's available markets are: %v\n", marketsCoinflex) - if err != nil { - return marketAvailable, err - } - for _, availableMarket := range marketsCoinflex { - if availableMarket.Name == market { - marketAvailable = true - } - } - return marketAvailable, nil -} - -func (s *CoinflexFuturesScraper) availableMarketsCoinflex() ([]marketCoinflex, error) { - body, _, err := utils.GetRequest("https://webapi.coinflex.com/markets/") - if err != nil { - return []marketCoinflex{}, err - } - - markets := []marketCoinflex{} - err = json.Unmarshal(body, &markets) - if err != nil { - return []marketCoinflex{}, err - } - return markets, nil -} - -// uses /assets/ GET endpoint to obtain all the Coinflex's assets -func (s *CoinflexFuturesScraper) getAllAssets() ([]assetCoinflex, error) { - body, _, err := utils.GetRequest("https://webapi.coinflex.com/assets/") - if err != nil { - return nil, err - } - s.Logger.Debugf("retrieved all of the Coinflex assets: %s", string(body)) - assets := []assetCoinflex{} - err = json.Unmarshal(body, &assets) - if err != nil { - return nil, err - } - return assets, nil -} - -// gives you the id of the asset. Asset can be, not limited to, ETH, XBTJUL, BTCDEC, etc. -func (s *CoinflexFuturesScraper) assetID(asset string) (int64, error) { - var assetsID int64 = 0 - assets, err := s.getAllAssets() - if err != nil { - return assetsID, fmt.Errorf("could not retrieve all Coinflex's assets: %w", err) - } - for _, assetObj := range assets { - if assetObj.Name == asset { - assetsID = assetObj.ID - } - } - return assetsID, nil -} diff --git a/pkg/dia/scraper/exchange-scrapers/FuturesDeribitScraper.go b/pkg/dia/scraper/exchange-scrapers/FuturesDeribitScraper.go deleted file mode 100644 index 9844b47df..000000000 --- a/pkg/dia/scraper/exchange-scrapers/FuturesDeribitScraper.go +++ /dev/null @@ -1,189 +0,0 @@ -package scrapers - -import ( - "encoding/json" - "errors" - "fmt" - "time" - - utils "github.com/diadata-org/diadata/pkg/utils" - "github.com/gorilla/websocket" -) - -// const scrapeDataSaveLocationDeribit = "" -type deribitInstrument struct { - InstrumentName string `json:"instrument_name"` - Kind string `json:"kind"` - TickSize float64 `json:"tick_size"` - TakerCommission float64 `json:"taker_commission"` - Strike float64 `json:"strike"` - SettlementPeriod string `json:"settlement_period"` - QuoteCurrency string `json:"quote_currency"` - OptionType string `json:"option_type"` - MinTradeAmount float64 `json:"min_trade_amount"` - MakerCommission float64 `json:"maker_commission"` - IsActive bool `json:"is_active"` - ExpirationTimestamp int64 `json:"expiration_timestamp"` - CreationTimestamp int64 `json:"creation_timestamp"` - ContractSize float64 `json:"contract_size"` - BaseCurrency string `json:"base_currency"` -} - -type deribitInstruments struct { - Result []deribitInstrument `json:"result"` -} - -type ParsedDeribitResponse struct { - Jsonrpc string `json:"jsonrpc"` - Method string `json:"method"` - Params ParsedDeribitResponseParams `json:"params"` -} - -type ParsedDeribitResponseParams struct { - Channel string `json:"channel"` - Data ParsedDeribitOptionOrderbookEntry `json:"data"` -} - -type ParsedDeribitOptionOrderbookEntry struct { - Timestamp int64 `json:"timestamp"` - InstrumentName string `json:"instrument_name"` - ChangeId int64 `json:"change_id"` - Bids [][]float64 `json:"bids"` - Asks [][]float64 `json:"asks"` -} - -func (s *DeribitScraper) send(message *map[string]interface{}, websocketConn *websocket.Conn) error { - err := websocketConn.WriteJSON(*message) - if err != nil { - return err - } - return nil -} - -// ScraperClose - responsible for closing out the scraper for a market -func (s *DeribitScraper) ScraperClose(market string, websocketConnection interface{}) error { - switch c := websocketConnection.(type) { - case *websocket.Conn: - err := c.WriteJSON(map[string]string{"op": "unsubscribe", "channel": "trades", "market": market}) - if err != nil { - return err - } - log.Infof("gracefully shutdown deribit scraper on market: %s", market) - time.Sleep(time.Duration(retryIn) * time.Second) - return nil - default: - return fmt.Errorf("unknown connection type, expected gorilla/websocket, got: %T", c) - } -} - -// Scrape starts a websocket scraper for market -func (s *DeribitScraper) Scrape(market string) { - err := s.validateMarket(market, s.MarketKind) - if err != nil { - return - } - s.validateRefreshEveryToken() - - optionRequest := &map[string]interface{}{ - "method": "public/subscribe", - "params": &map[string]interface{}{ - "channels": []string{"book." + market + ".none.1.100ms"}, // will give us orderbook snapshots every 100 ms - }, - "jsonrpc": "2.0", - "id": 0, - } - futureRequest := &map[string]interface{}{ - "method": "private/subscribe", - "params": &map[string]interface{}{ - "channels": []string{"trades." + market + ".raw"}, - }, - "jsonrpc": "2.0", - "id": 0, - } - - switch s.MarketKind { - case DeribitFuture: - err = s.send(futureRequest, s.WsConnection) - case DeribitOption: - err = s.send(optionRequest, s.WsConnection) - default: - panic("unknown market kind") - } - - if err != nil { - log.Errorf("could not send ws message. restarting the websocket, err: %s", err) - return - } -} - -// ScrapeMarkets - will scrape the markets specified during instantiation -func (s *DeribitScraper) ScrapeMarkets() { - for _, market := range s.Markets { - s.WaitGroup.Add(1) - go s.Scrape(market) - } - s.WaitGroup.Wait() -} - -// marketKind can be "future" or "option" -func (s *DeribitScraper) validateMarket(market string, marketKind DeribitScraperKind) error { - allFuturesMarketsDeribit, err := allDeribitMarketsOfKind(marketKind) - if err != nil { - return err - } - containsMarket := utils.Contains(&allFuturesMarketsDeribit, market) - if !containsMarket { - return errors.New(market + " market is unavailable") - } - return nil -} - -func (s *DeribitScraper) validateRefreshEveryToken() { - if s.RefreshTokenEvery >= 900 { - panic("When you use https://testapp.deribit.com/api_console, you will see that upon a successful authentication, the response will include expiresIn field. Which is set at 900. This means that the token we generated is only valid for 900 seconds, and we have to refresh it before then.") - } -} - -func deribitMarkets(market string, marketKind DeribitScraperKind) ([]string, error) { - if market != "BTC" && market != "ETH" { - panic("unsupported market. only btc & eth are supported") - } - body, _, err := utils.GetRequest("https://www.deribit.com/api/v2/public/get_instruments?currency=" + market) - if err != nil { - return nil, err - } - - decodedMsg := deribitInstruments{} - err = json.Unmarshal(body, &decodedMsg) - if err != nil { - return nil, err - } - var allMarkets []string - for _, market := range decodedMsg.Result { - switch marketKind { - case DeribitFuture: - if market.Kind == "future" { - allMarkets = append(allMarkets, market.InstrumentName) - } - case DeribitOption: - if market.Kind == "option" { - allMarkets = append(allMarkets, market.InstrumentName) - } - default: - panic("unknown market kind") - } - } - return allMarkets, nil -} - -func allDeribitMarketsOfKind(marketKind DeribitScraperKind) ([]string, error) { - BTCMarkets, err := deribitMarkets("BTC", marketKind) - if err != nil { - return nil, fmt.Errorf("could not fetch btc futures markets: %w", err) - } - ETHMarkets, err := deribitMarkets("ETH", marketKind) - if err != nil { - return nil, fmt.Errorf("could not fetch eth futures markets: %w", err) - } - return append(BTCMarkets, ETHMarkets...), nil -} diff --git a/pkg/dia/scraper/exchange-scrapers/FuturesFTXScraper.go b/pkg/dia/scraper/exchange-scrapers/FuturesFTXScraper.go deleted file mode 100644 index 9ac9c5abd..000000000 --- a/pkg/dia/scraper/exchange-scrapers/FuturesFTXScraper.go +++ /dev/null @@ -1,214 +0,0 @@ -package scrapers - -import ( - "encoding/json" - "fmt" - "github.com/diadata-org/diadata/internal/pkg/scraper-writers" - "net/url" - "os" - "os/signal" - "sync" - "syscall" - "time" - - utils "github.com/diadata-org/diadata/pkg/utils" - "github.com/gorilla/websocket" - zap "go.uber.org/zap" -) - -var allFuturesMarketsFTX = []string{"BTC-PERP", "ETH-PERP", "LINK-PERP", - "EOS-PERP", "BNB-PERP", "BCH-PERP", "XRP-PERP", "BSV-PERP", "ALGO-PERP", - "DRGN-PERP", "HT-PERP", "LTC-PERP", "LEO-PERP", "SHIT-PERP", "TRX-PERP", - "USDT-PERP", "EXCH-PERP", "BTMX-PERP", "ALT-PERP", "ADA-PERP", "MID-PERP", - "OKB-PERP", "MATIC-PERP", "ATOM-PERP", "ETC-PERP", "TOMO-PERP", "DOGE-PERP"} - -const scrapeDataSaveLocationFTX = "" - -// FTXFuturesScraper - scrapes the futures from the FTX exchange -type FTXFuturesScraper struct { - Markets []string - WaitGroup *sync.WaitGroup - Writer writers.Writer - Logger *zap.SugaredLogger -} - -type tradeMessageFTX struct { - Type string `json:"type"` -} - -// NewFTXFuturesScraper - returns an instance of the FTX scraper -func NewFTXFuturesScraper(markets []string) FuturesScraper { - wg := sync.WaitGroup{} - writer := writers.FileWriter{} - logger := zap.NewExample().Sugar() // or NewProduction, or NewDevelopment - defer func() { - err := logger.Sync() - if err != nil { - log.Error(err) - } - }() - - var scraper FuturesScraper = &FTXFuturesScraper{ - WaitGroup: &wg, - Markets: markets, // []string{"BNB-PERP", "ETH-PERP", "BTC-PERP", "EOS-PERP"} - Writer: &writer, - Logger: logger, - } - - return scraper -} - -func (s *FTXFuturesScraper) send(message *map[string]string, market string, websocketConn *websocket.Conn) error { - err := websocketConn.WriteJSON(*message) - if err != nil { - return err - } - s.Logger.Debugf("sent message [%s]: %s", market, message) - return nil -} - -// Authenticate - placeholder here, since we do not need to authneticate the connection. -func (s *FTXFuturesScraper) Authenticate(market string, connection interface{}) error { return nil } - -// ScraperClose - safely closes the scraper; We pass the interface connection as the second argument -// primarily for the reason that Huobi scraper does not use the gorilla websocket; It uses golang's x websocket; -// If we did not define this method in our FuturesScraper interface, we could have easily used the pointer -// to gorilla websocket here; However, to make FuturesScraper more ubiquituous, we need an interface here. -func (s *FTXFuturesScraper) ScraperClose(market string, connection interface{}) error { - switch c := connection.(type) { - case *websocket.Conn: - err := c.WriteJSON(map[string]string{"op": "unsubscribe", "channel": "trades", "market": market}) - if err != nil { - return err - } - err = c.Close() - if err != nil { - return err - } - s.Logger.Infof("gracefully shutdown ftx scraper on market: %s", market) - time.Sleep(time.Duration(retryIn) * time.Second) - return nil - default: - return fmt.Errorf("unknown connection type, expected gorilla/websocket, got: %T", connection) - } -} - -// Scrape starts a websocket scraper for market -func (s *FTXFuturesScraper) Scrape(market string) { - s.validateMarket(market) - - // this block is for listening to sigterms and interupts - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - userCancelled := make(chan bool, 1) - go func() { - sig := <-sigs - fmt.Println(sig) - userCancelled <- true - }() - - for { - // immediately invoked function expression for easy clenup with defer - func() { - u := url.URL{Scheme: "wss", Host: "ftx.com", Path: "/ws"} - s.Logger.Debugf("connecting to [%s], market: [%s]", u.String(), market) - ws, resp, err := websocket.DefaultDialer.Dial(u.String(), nil) - if err != nil { - s.Logger.Errorf("could not dial ftx websocket: %s", err) - time.Sleep(time.Duration(retryIn) * time.Second) - return - } - defer func() { - err = resp.Body.Close() - if err != nil { - log.Error(err) - } - }() - defer func() { - err = ws.Close() - if err != nil { - log.Error(err) - } - }() - - defer func() { - err = s.ScraperClose(market, ws) - if err != nil { - log.Error(err) - } - }() - - err = s.send(&map[string]string{"market": market, "channel": "trades", "op": "subscribe"}, market, ws) - if err != nil { - s.Logger.Errorf("could not send a channel subscription message. retrying, err: %s", err) - return - } - err = s.send(&map[string]string{"op": "ping"}, market, ws) - if err != nil { - s.Logger.Errorf("could not send an initial ping message. retrying, err: %s", err) - return - } - tick := time.NewTicker(15 * time.Second) // every 15 seconds we have to ping FTX - defer tick.Stop() - // we require a separate goroutine for ticker, because ReadMessage is blocking - // and we may fail sending ping before we get any message on a market, thus - // forcing FTX to close our websocket out. - go func() { - for range tick.C { - err := s.send(&map[string]string{"op": "ping"}, market, ws) - if err != nil { - log.Error(err) - } - } - }() - for { - select { - case <-userCancelled: - s.Logger.Infof("received interrupt, gracefully shutting down") - err := s.ScraperClose(market, ws) - if err != nil { - log.Error(err) - } - os.Exit(0) - default: - _, message, err := ws.ReadMessage() - decodedMsg := tradeMessageFTX{} - if err != nil { - s.Logger.Errorf("problem reading ftx on [%s], err: %s", market, err) - return - } - err = json.Unmarshal(message, &decodedMsg) - if err != nil { - s.Logger.Errorf("could not unmarshal ftx message on [%s], err: %s", market, err) - return - } - s.Logger.Debugf("received new message: %s", message) - if decodedMsg.Type != "subscribed" && decodedMsg.Type != "pong" && decodedMsg.Type != "unsubscribed" { - s.Logger.Debugf("saving new message on [%s]", market) - _, err = s.Writer.Write(string(message)+"\n", scrapeDataSaveLocationFTX+s.Writer.GetWriteFileName("ftx", market)) - if err != nil { - s.Logger.Errorf("could not write to file, err: %s", err) - return - } - } - } - } - }() - } -} - -// ScrapeMarkets - will scrape the markets specified during instantiation -func (s *FTXFuturesScraper) ScrapeMarkets() { - for _, market := range s.Markets { - s.WaitGroup.Add(1) - go s.Scrape(market) - } - s.WaitGroup.Wait() -} - -func (s *FTXFuturesScraper) validateMarket(market string) { - containsMarket := utils.Contains(&allFuturesMarketsFTX, market) - if !containsMarket { - panic(fmt.Sprintf("Market %s is not available", market)) - } -} diff --git a/pkg/dia/scraper/exchange-scrapers/FuturesHuobiScraper.go b/pkg/dia/scraper/exchange-scrapers/FuturesHuobiScraper.go deleted file mode 100644 index d58a6f16a..000000000 --- a/pkg/dia/scraper/exchange-scrapers/FuturesHuobiScraper.go +++ /dev/null @@ -1,268 +0,0 @@ -package scrapers - -import ( - "bytes" - "compress/gzip" - "encoding/binary" - "fmt" - "github.com/diadata-org/diadata/internal/pkg/scraper-writers" - "io/ioutil" - "os" - "os/signal" - "strings" - "sync" - "syscall" - "time" - - utils "github.com/diadata-org/diadata/pkg/utils" - zap "go.uber.org/zap" - "golang.org/x/net/websocket" -) - -// --------------------------------- Config -------------------------------------------------- -// Huobi API configuration -const ( - scrapeDataSaveLocationHuobi string = "" // location in which to save. ensure that the folder exists. - // files will be created automatically - - // API Endpoints - // marketURLHuobi string = "https://api.hbdm.com" - wsURLHuobi string = "wss://www.hbdm.com/ws" - pingMsgLengthHuobi int = 22 -) - -var ( - allowedMarketsHuobi = []string{"BTC", "ETC", "ETH", "EOS", "LTC", "BCH", "XRP", "TRX", "BSV"} - allowedFrequenciesHuobi = []string{"CW", "NW", "CQ"} - // bufferHuobi bytes.Buffer -) - -// --------------- - -// HuobiFuturesScraper - scrapes huobi's futures markets -type HuobiFuturesScraper struct { - Markets []string // markets to scrape. To scrape all, call AllFuturesMarketsHuobi() - WaitGroup *sync.WaitGroup - Writer writers.Writer // an interface to write the messages - Logger *zap.SugaredLogger -} - -// -------------------------------------------------------------------------------------------- - -// NewHuobiFuturesScraper - returns an instance of the Huobi scraper -func NewHuobiFuturesScraper(markets []string) FuturesScraper { - wg := sync.WaitGroup{} - writer := writers.FileWriter{} - logger := zap.NewExample().Sugar() // or NewProduction, or NewDevelopment - defer func() { - err := logger.Sync() - if err != nil { - log.Error(err) - } - }() - - var scraper FuturesScraper = &HuobiFuturesScraper{ - WaitGroup: &wg, - Markets: markets, // []string{"BNB-PERP", "ETH-PERP", "BTC-PERP", "EOS-PERP"} - Writer: &writer, - Logger: logger, - } - - return scraper -} - -// sends a byte message to huobi websocket -func (s *HuobiFuturesScraper) send(message []byte, market string, websocketConn *websocket.Conn) (int, error) { - n, err := websocketConn.Write(message) - s.Logger.Debugf("[%s] send: %s", market, message) - return n, err -} - -// pongs back the huobi websocket client to keep the connection alive -func (s *HuobiFuturesScraper) pong(time string, market string, websocketConn *websocket.Conn) (int, error) { - n, err := s.send([]byte(fmt.Sprintf("{\"pong\":%s}", time)), market, websocketConn) - return n, err -} - -// Authenticate - not required for Huobi to scrape the futures data. -func (s *HuobiFuturesScraper) Authenticate(market string, connection interface{}) error { return nil } - -// ScraperClose - clean up after the scraper. -func (s *HuobiFuturesScraper) ScraperClose(market string, connection interface{}) error { - switch c := connection.(type) { - case *websocket.Conn: - message := []byte("{\"Unsub\":\"market." + market + ".trade.detail\"}") - _, err := s.send(message, market, c) - if err != nil { - return err - } - err = c.Close() - if err != nil { - return err - } - s.Logger.Infof("gracefully shutdown huobi scraper on market: %s", market) - time.Sleep(time.Duration(retryIn) * time.Second) - return nil - default: - return fmt.Errorf("unknown connection type, expected golang.org/x/net/websocket, got: %T", connection) - } -} - -// Scrape starts a websocket scraper for market -func (s *HuobiFuturesScraper) Scrape(market string) { - s.validateMarket(market) - - // this block is for listening to sigterms and interupts - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - userCancelled := make(chan bool, 1) - go func() { - sig := <-sigs - fmt.Println(sig) - userCancelled <- true - }() - - for { - // IIFE for easy cleanup with defer - func() { - ws, err := websocket.Dial(wsURLHuobi, "", "http://www.google.com") - // defer inside of the function will cleanup before the next run - defer func() { - err = s.ScraperClose(market, ws) - if err != nil { - log.Error(err) - } - }() - - if err != nil { - // an error opening is fatal. let this kill the programme - s.Logger.Errorf("[%s] %s", market, err) - time.Sleep(time.Duration(retryIn) * time.Second) - return - } - // subscribe to the trade detail channel - message := []byte("{\"Sub\":\"market." + market + ".trade.detail\"}") - _, err = s.send(message, market, ws) - if err != nil { - s.Logger.Errorf("problem subscriping to the [%s] trade channel, err: %s", market, err) - return - } - // create the conduit for the received messages - var msg = make([]byte, 512) - for { - select { - case <-userCancelled: - s.Logger.Infof("received interrupt, gracefully shutting down") - err := s.ScraperClose(market, ws) - if err != nil { - log.Error(err) - } - os.Exit(0) - default: - m, err := ws.Read(msg) - if err != nil { - s.Logger.Errorf("[%s] %s", market, err) - // an error reading means we may have lost the connection - // return out and just try again - return - } - newmsg := msg[:m] - unzipmsg, err := parseGzip(newmsg) - if err != nil { - s.Logger.Errorf("[%s] problem saving to %s, err: %s", market, s.Writer.GetWriteFileName("huobi", market), err) - return - } - s.Logger.Debugf("[%s] byteLen:%d, unzipLen:%d %s", market, m, len(unzipmsg), unzipmsg) - if len(unzipmsg) == pingMsgLengthHuobi { - if string(unzipmsg[2:6]) == "ping" { - _, err := s.pong(string(unzipmsg[8:21]), market, ws) - if err != nil { - s.Logger.Errorf("[%s] problem ponging the websocket server, err: %s", market, err) - return - } - } - } else { - // ensure that scrapeDataSaveLocation exists - _, err := s.Writer.Write(string(unzipmsg)+"\n", scrapeDataSaveLocationHuobi+s.Writer.GetWriteFileName("huobi", market)) - if err != nil { - s.Logger.Errorf("[%s] problem saving to %s, err: %s", market, s.Writer.GetWriteFileName("huobi", market), err) - return - } - } - } - } - }() - } - // s.waitGroup.Done() -} - -// ScrapeMarkets - will scrape the markets specified during instantiation -func (s *HuobiFuturesScraper) ScrapeMarkets() { - for _, market := range s.Markets { - s.WaitGroup.Add(1) - go s.Scrape(market) - } - s.WaitGroup.Wait() -} - -// ------------- Huobi util functions ------------------- - -// AllFuturesMarketsHuobi - returns all the futures markets tradable on Huobi. -// Lists all of the Huobi Futures markets. TODO: add a REST HTTP call to obtain the list -// of trdabale markets. -func AllFuturesMarketsHuobi() []string { - allContracts := []string{} - for _, market := range allowedMarketsHuobi { - for _, maturity := range allowedFrequenciesHuobi { - allContracts = append(allContracts, market+"_"+maturity) - } - } - return allContracts -} - -// Ensures that the provided market to scrape is supported by Huobi -// This function is only required if we manually write out the markets. -// This function will be removed when we make AllHuobiFuturesMarkets make -// a rest request -func (s *HuobiFuturesScraper) validateMarket(market string) { - parts := strings.Split(market, "_") - if len(parts) != 2 { - panic("incorrect market provided. should be of the form symbol_frequency") - } - tradeSymbol := parts[0] - tradeFrequency := parts[1] - containsSymbol := utils.Contains(&allowedMarketsHuobi, tradeSymbol) - if !containsSymbol { - panic("provided trade symbol is not supported") - } - containsFreq := utils.Contains(&allowedFrequenciesHuobi, tradeFrequency) - if !containsFreq { - panic("provided frequency is not supported") - } -} - -// Huobi websocket API sends back gzips, this will parse it. -func parseGzip(data []byte) (unzipped []byte, err error) { - b := new(bytes.Buffer) - err = binary.Write(b, binary.LittleEndian, data) - if err != nil { - return - } - - r, err := gzip.NewReader(b) - if err != nil { - return - } - defer func() { - cerr := r.Close() - if err == nil { - err = cerr - } - }() - - unzipped, err = ioutil.ReadAll(r) - if err != nil { - return - } - return -}