diff --git a/connector/client/client.go b/connector/client/client.go index c7464b8..2ccfceb 100644 --- a/connector/client/client.go +++ b/connector/client/client.go @@ -21,6 +21,7 @@ package client import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -38,9 +39,9 @@ import ( // Client provides interface to send requests to the connector service. type Client interface { - Post(url url.URL, activity schema.Activity) error - Delete(url url.URL, activity schema.Activity) error - Put(url url.URL, activity schema.Activity) error + Post(ctx context.Context, url url.URL, activity schema.Activity) error + Delete(ctx context.Context, url url.URL, activity schema.Activity) error + Put(ctx context.Context, url url.URL, activity schema.Activity) error } // ConnectorClient implements Client to send HTTP requests to the connector service. @@ -56,6 +57,14 @@ func NewClient(config *Config) (Client, error) { return nil, errors.New("Invalid client configuration") } + if config.AuthClient == nil { + config.AuthClient = &http.Client{} + } + + if config.ReplyClient == nil { + config.ReplyClient = &http.Client{} + } + return &ConnectorClient{*config, cache.AuthCache{}}, nil } @@ -63,12 +72,12 @@ func NewClient(config *Config) (Client, error) { // // Creates a HTTP POST request with the provided activity as the body and a Bearer token in the header. // Returns any error as received from the call to connector service. -func (client *ConnectorClient) Post(target url.URL, activity schema.Activity) error { +func (client *ConnectorClient) Post(ctx context.Context, target url.URL, activity schema.Activity) error { jsonStr, err := json.Marshal(activity) if err != nil { return err } - req, err := http.NewRequest(http.MethodPost, target.String(), bytes.NewBuffer(jsonStr)) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, target.String(), bytes.NewBuffer(jsonStr)) if err != nil { return err } @@ -79,8 +88,8 @@ func (client *ConnectorClient) Post(target url.URL, activity schema.Activity) er // // Creates a HTTP DELETE request with the provided activity ID and a Bearer token in the header. // Returns any error as received from the call to connector service. -func (client *ConnectorClient) Delete(target url.URL, activity schema.Activity) error { - req, err := http.NewRequest(http.MethodDelete, target.String(), nil) +func (client *ConnectorClient) Delete(ctx context.Context, target url.URL, activity schema.Activity) error { + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, target.String(), nil) if err != nil { return err } @@ -91,12 +100,12 @@ func (client *ConnectorClient) Delete(target url.URL, activity schema.Activity) // // Creates a HTTP PUT request with the provided activity payload and a Bearer token in the header. // Returns any error as received from the call to connector service. -func (client *ConnectorClient) Put(target url.URL, activity schema.Activity) error { +func (client *ConnectorClient) Put(ctx context.Context, target url.URL, activity schema.Activity) error { jsonStr, err := json.Marshal(activity) if err != nil { return err } - req, err := http.NewRequest(http.MethodPut, target.String(), bytes.NewBuffer(jsonStr)) + req, err := http.NewRequestWithContext(ctx, http.MethodPut, target.String(), bytes.NewBuffer(jsonStr)) if err != nil { return err } @@ -104,7 +113,7 @@ func (client *ConnectorClient) Put(target url.URL, activity schema.Activity) err } func (client *ConnectorClient) sendRequest(req *http.Request, activity schema.Activity) error { - token, err := client.getToken() + token, err := client.getToken(req.Context()) if err != nil { return err } @@ -112,9 +121,7 @@ func (client *ConnectorClient) sendRequest(req *http.Request, activity schema.Ac req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "Bearer "+token) - replyClient := &http.Client{} - - return client.checkRespError(replyClient.Do(req)) + return client.checkRespError(client.ReplyClient.Do(req)) } func (client *ConnectorClient) checkRespError(resp *http.Response, err error) error { @@ -138,7 +145,7 @@ func (client *ConnectorClient) checkRespError(resp *http.Response, err error) er } } -func (client *ConnectorClient) getToken() (string, error) { +func (client *ConnectorClient) getToken(ctx context.Context) (string, error) { // Return cached JWT if !client.AuthCache.IsExpired() { @@ -157,8 +164,7 @@ func (client *ConnectorClient) getToken() (string, error) { return "", err } - authClient := &http.Client{} - r, err := http.NewRequest("POST", u.String(), strings.NewReader(data.Encode())) + r, err := http.NewRequestWithContext(ctx, "POST", u.String(), strings.NewReader(data.Encode())) if err != nil { return "", err } @@ -166,7 +172,7 @@ func (client *ConnectorClient) getToken() (string, error) { r.Header.Add("Content-Type", "application/x-www-form-urlencoded") r.Header.Add("Content-Length", strconv.Itoa(len(data.Encode()))) - resp, err := authClient.Do(r) + resp, err := client.AuthClient.Do(r) if err != nil { return "", customerror.HTTPError{ StatusCode: resp.StatusCode, diff --git a/connector/client/client_config.go b/connector/client/client_config.go index b807037..89e7b76 100644 --- a/connector/client/client_config.go +++ b/connector/client/client_config.go @@ -22,6 +22,7 @@ package client import ( "errors" "github.com/infracloudio/msbotbuilder-go/connector/auth" + "net/http" "net/url" ) @@ -29,6 +30,8 @@ import ( type Config struct { Credentials auth.CredentialProvider AuthURL url.URL + AuthClient *http.Client + ReplyClient *http.Client } // NewClientConfig creates configuration for ConnectorClient. diff --git a/core/activity/response.go b/core/activity/response.go index cd8e99a..ebc6078 100644 --- a/core/activity/response.go +++ b/core/activity/response.go @@ -20,6 +20,7 @@ package activity import ( + "context" "fmt" "net/url" "path" @@ -31,9 +32,9 @@ import ( // Response provides functionalities to send activity to the connector service. type Response interface { - SendActivity(activity schema.Activity) error - DeleteActivity(activity schema.Activity) error - UpdateActivity(activity schema.Activity) error + SendActivity(ctx context.Context, activity schema.Activity) error + DeleteActivity(ctx context.Context, activity schema.Activity) error + UpdateActivity(ctx context.Context, activity schema.Activity) error } const ( @@ -50,7 +51,7 @@ type DefaultResponse struct { } // DeleteActivity sends a Delete activity method to the BOT connector service. -func (response *DefaultResponse) DeleteActivity(activity schema.Activity) error { +func (response *DefaultResponse) DeleteActivity(ctx context.Context, activity schema.Activity) error { u, err := url.Parse(activity.ServiceURL) if err != nil { return errors.Wrapf(err, "Failed to parse ServiceURL %s.", activity.ServiceURL) @@ -60,12 +61,12 @@ func (response *DefaultResponse) DeleteActivity(activity schema.Activity) error // Send activity to client u.Path = path.Join(u.Path, respPath) - err = response.Client.Delete(*u, activity) + err = response.Client.Delete(ctx, *u, activity) return errors.Wrap(err, "Failed to delete response.") } // SendActivity sends an activity to the BOT connector service. -func (response *DefaultResponse) SendActivity(activity schema.Activity) error { +func (response *DefaultResponse) SendActivity(ctx context.Context, activity schema.Activity) error { u, err := url.Parse(activity.ServiceURL) if err != nil { return errors.Wrapf(err, "Failed to parse ServiceURL %s.", activity.ServiceURL) @@ -80,12 +81,12 @@ func (response *DefaultResponse) SendActivity(activity schema.Activity) error { // Send activity to client u.Path = path.Join(u.Path, respPath) - err = response.Client.Post(*u, activity) + err = response.Client.Post(ctx, *u, activity) return errors.Wrap(err, "Failed to send response.") } // UpdateActivity sends a Put activity method to the BOT connector service. -func (response *DefaultResponse) UpdateActivity(activity schema.Activity) error { +func (response *DefaultResponse) UpdateActivity(ctx context.Context, activity schema.Activity) error { u, err := url.Parse(activity.ServiceURL) if err != nil { return errors.Wrapf(err, "Failed to parse ServiceURL %s.", activity.ServiceURL) @@ -95,7 +96,7 @@ func (response *DefaultResponse) UpdateActivity(activity schema.Activity) error // Send activity to client u.Path = path.Join(u.Path, respPath) - err = response.Client.Put(*u, activity) + err = response.Client.Put(ctx, *u, activity) return errors.Wrap(err, "Failed to update response.") } diff --git a/core/bot_framework_adapter.go b/core/bot_framework_adapter.go index a079dac..8d8a77c 100644 --- a/core/bot_framework_adapter.go +++ b/core/bot_framework_adapter.go @@ -50,6 +50,8 @@ type AdapterSetting struct { OpenIDMetadata string ChannelService string CredentialProvider auth.CredentialProvider + AuthClient *http.Client + ReplyClient *http.Client } // BotFrameworkAdapter implements Adapter and is currently the only implementation returned to the user program. @@ -77,6 +79,14 @@ func NewBotAdapter(settings AdapterSetting) (Adapter, error) { return nil, err } + if settings.AuthClient != nil { + clientConfig.AuthClient = settings.AuthClient + } + + if settings.ReplyClient != nil { + clientConfig.ReplyClient = settings.ReplyClient + } + connectorClient, err := client.NewClient(clientConfig) if err != nil { return nil, errors.Wrap(err, "Failed to create Connector Client.") @@ -102,7 +112,7 @@ func (bf *BotFrameworkAdapter) ProcessActivity(ctx context.Context, req schema.A return errors.Wrap(err, "Failed to create response object.") } - return response.SendActivity(replyActivity) + return response.SendActivity(ctx, replyActivity) } // ProactiveMessage sends activity to a conversation. @@ -124,7 +134,7 @@ func (bf *BotFrameworkAdapter) DeleteActivity(ctx context.Context, activityID st return errors.Wrap(err, "Failed to create response object.") } - return response.DeleteActivity(req) + return response.DeleteActivity(ctx, req) } // ParseRequest parses the received activity in a HTTP reuqest to: @@ -164,5 +174,5 @@ func (bf *BotFrameworkAdapter) UpdateActivity(ctx context.Context, req schema.Ac if err != nil { return errors.Wrap(err, "Failed to create response object.") } - return response.UpdateActivity(req) + return response.UpdateActivity(ctx, req) }