Skip to content

Commit

Permalink
Merge pull request #6 from feast-dev/client-pool
Browse files Browse the repository at this point in the history
Feast client pool
  • Loading branch information
pyalex authored Jan 27, 2021
2 parents 0e2df06 + bb88e2c commit a609646
Showing 1 changed file with 40 additions and 19 deletions.
59 changes: 40 additions & 19 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,32 @@ import (
)

type Config struct {
FeastServingHost string `default:"localhost" split_words:"true"`
FeastServingPort int `default:"6566" split_words:"true"`
ListenPort string `default:"8080" split_words:"true"`
ProjectName string `default:"default" split_words:"true"`
SpecificationPath string `default:"loadSpec.yml" split_words:"true"`
FeastServingHost string `default:"localhost" split_words:"true"`
FeastServingPort int `default:"6566" split_words:"true"`
ListenPort string `default:"8080" split_words:"true"`
ProjectName string `default:"default" split_words:"true"`
SpecificationPath string `default:"loadSpec.yml" split_words:"true"`
ClientPoolSize int `default:"4" split_words:"true"`
}

type FeastClientPool struct {
clients []feast.Client
}

func newFeastClientPool(host string, port int, poolSize int) (*FeastClientPool, error) {
var clients []feast.Client
for i := 0; i < poolSize; i++ {
client, err := feast.NewGrpcClient(host, port)
if err != nil {
return nil, err
}
clients = append(clients, client)
}
return &FeastClientPool{clients}, nil
}

func (p *FeastClientPool) GetClient() feast.Client {
return p.clients[rand.Intn(len(p.clients))]
}

func main() {
Expand All @@ -34,21 +55,21 @@ func main() {
}

log.Printf("Creating client to connect to Feast Serving at %s:%d", c.FeastServingHost, c.FeastServingPort)
client, err := feast.NewGrpcClient(c.FeastServingHost, c.FeastServingPort)
pool, err := newFeastClientPool(c.FeastServingHost, c.FeastServingPort, c.ClientPoolSize)
if err != nil {
log.Fatalf("Could not connect to: %v", err)
}

log.Printf("Loading specification at %s", c.SpecificationPath)
yamlSpec, err := ioutil.ReadFile(c.SpecificationPath)
if err != nil {
log.Fatalf("Error reading specification file at %s", err)
}
loadSpec := generator.LoadSpec{}
err = yaml.Unmarshal(yamlSpec, &loadSpec)
if err != nil {
log.Fatalf("Unmarshal: %v", err)
}
if err != nil {
log.Fatalf("Error reading specification file at %s", err)
}
loadSpec := generator.LoadSpec{}
err = yaml.Unmarshal(yamlSpec, &loadSpec)
if err != nil {
log.Fatalf("Unmarshal: %v", err)
}
requestGenerator, err := generator.NewRequestGenerator(loadSpec, c.ProjectName)
if err != nil {
log.Fatalf("Unable to instantiate request requestGenerator: %v", err)
Expand All @@ -59,11 +80,11 @@ func main() {
}

http.HandleFunc("/send", func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(context.Background(), 60 * time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
requests := requestsPool[rand.Intn(len(requestsPool))]
if len(requests) == 1 {
resp, err := client.GetOnlineFeatures(ctx, &requests[0])
resp, err := pool.GetClient().GetOnlineFeatures(ctx, &requests[0])
if err != nil {
w.WriteHeader(500)
} else {
Expand All @@ -82,7 +103,7 @@ func main() {
request := request
go func() {
defer wg.Done()
resp, err := client.GetOnlineFeatures(ctx, &request)
resp, err := pool.GetClient().GetOnlineFeatures(ctx, &request)
if err != nil {
fatalErrors <- err
}
Expand Down Expand Up @@ -113,10 +134,10 @@ func main() {
})

http.HandleFunc("/echo", func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(context.Background(), 60 * time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
var req serving.GetFeastServingInfoRequest
_, err := client.GetFeastServingInfo(ctx, &req)
_, err := pool.GetClient().GetFeastServingInfo(ctx, &req)
if err != nil {
log.Fatalf("%v", err)
}
Expand Down

0 comments on commit a609646

Please sign in to comment.