From bb88e2c1cb89b66c2440eeda721ca719d1365a1d Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Wed, 27 Jan 2021 12:07:07 +0800 Subject: [PATCH] feast client pool Signed-off-by: Oleksii Moskalenko --- main.go | 59 ++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 40 insertions(+), 19 deletions(-) diff --git a/main.go b/main.go index 86eed9a..8f95976 100644 --- a/main.go +++ b/main.go @@ -18,11 +18,32 @@ import ( ) type Config struct { - FeastServingHost string `default:"localhost" split_words:"true"` - FeastServingPort int `default:"6566" split_words:"true"` - ListenPort string `default:"8080" split_words:"true"` - ProjectName string `default:"default" split_words:"true"` - SpecificationPath string `default:"loadSpec.yml" split_words:"true"` + FeastServingHost string `default:"localhost" split_words:"true"` + FeastServingPort int `default:"6566" split_words:"true"` + ListenPort string `default:"8080" split_words:"true"` + ProjectName string `default:"default" split_words:"true"` + SpecificationPath string `default:"loadSpec.yml" split_words:"true"` + ClientPoolSize int `default:"4" split_words:"true"` +} + +type FeastClientPool struct { + clients []feast.Client +} + +func newFeastClientPool(host string, port int, poolSize int) (*FeastClientPool, error) { + var clients []feast.Client + for i := 0; i < poolSize; i++ { + client, err := feast.NewGrpcClient(host, port) + if err != nil { + return nil, err + } + clients = append(clients, client) + } + return &FeastClientPool{clients}, nil +} + +func (p *FeastClientPool) GetClient() feast.Client { + return p.clients[rand.Intn(len(p.clients))] } func main() { @@ -34,21 +55,21 @@ func main() { } log.Printf("Creating client to connect to Feast Serving at %s:%d", c.FeastServingHost, c.FeastServingPort) - client, err := feast.NewGrpcClient(c.FeastServingHost, c.FeastServingPort) + pool, err := newFeastClientPool(c.FeastServingHost, c.FeastServingPort, c.ClientPoolSize) if err != nil { log.Fatalf("Could not connect to: %v", err) } log.Printf("Loading specification at %s", c.SpecificationPath) yamlSpec, err := ioutil.ReadFile(c.SpecificationPath) - if err != nil { - log.Fatalf("Error reading specification file at %s", err) - } - loadSpec := generator.LoadSpec{} - err = yaml.Unmarshal(yamlSpec, &loadSpec) - if err != nil { - log.Fatalf("Unmarshal: %v", err) - } + if err != nil { + log.Fatalf("Error reading specification file at %s", err) + } + loadSpec := generator.LoadSpec{} + err = yaml.Unmarshal(yamlSpec, &loadSpec) + if err != nil { + log.Fatalf("Unmarshal: %v", err) + } requestGenerator, err := generator.NewRequestGenerator(loadSpec, c.ProjectName) if err != nil { log.Fatalf("Unable to instantiate request requestGenerator: %v", err) @@ -59,11 +80,11 @@ func main() { } http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) { - ctx, cancel := context.WithTimeout(context.Background(), 60 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() requests := requestsPool[rand.Intn(len(requestsPool))] if len(requests) == 1 { - resp, err := client.GetOnlineFeatures(ctx, &requests[0]) + resp, err := pool.GetClient().GetOnlineFeatures(ctx, &requests[0]) if err != nil { w.WriteHeader(500) } else { @@ -82,7 +103,7 @@ func main() { request := request go func() { defer wg.Done() - resp, err := client.GetOnlineFeatures(ctx, &request) + resp, err := pool.GetClient().GetOnlineFeatures(ctx, &request) if err != nil { fatalErrors <- err } @@ -113,10 +134,10 @@ func main() { }) http.HandleFunc("/echo", func(w http.ResponseWriter, r *http.Request) { - ctx, cancel := context.WithTimeout(context.Background(), 60 * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() var req serving.GetFeastServingInfoRequest - _, err := client.GetFeastServingInfo(ctx, &req) + _, err := pool.GetClient().GetFeastServingInfo(ctx, &req) if err != nil { log.Fatalf("%v", err) }