Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Grpc endpoint parsing #543

Merged
merged 22 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions src/implementation/Client/DaprClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,24 @@ export default class DaprClient {
private readonly logger: Logger;

constructor(options: Partial<DaprClientOptions> = {}) {
this.options = getClientOptions(options, Settings.getDefaultCommunicationProtocol(), undefined);
this.logger = new Logger("DaprClient", "DaprClient", this.options.logger);

// Validation on port
if (this.options.daprPort && !/^[0-9]+$/.test(this.options.daprPort)) {
options = getClientOptions(options, Settings.getDefaultCommunicationProtocol(), undefined);
this.logger = new Logger("DaprClient", "DaprClient", options.logger);

// Legacy validation on port
// URI validation is done later, when we instantiate the HttpEndpoint or GrpcEndpoint
// object in the HttpClient or GrpcClient constructor, but we need to
// keep this additional check for backward compatibility
// TODO: Remove this validation in the next major version
if (options?.daprPort && !/^[0-9]+$/.test(options?.daprPort)) {
throw new Error("DAPR_INCORRECT_SIDECAR_PORT");
}

// Builder
switch (options.communicationProtocol) {
case CommunicationProtocolEnum.GRPC: {
const client = new GRPCClient(this.options);
const client = new GRPCClient(options);
options.daprHost = client.options.daprHost;
options.daprPort = client.options.daprPort;
this.daprClient = client;

this.state = new GRPCClientState(client);
Expand All @@ -119,7 +125,9 @@ export default class DaprClient {
}
case CommunicationProtocolEnum.HTTP:
default: {
const client = new HTTPClient(this.options);
const client = new HTTPClient(options);
options.daprHost = client.options.daprHost;
options.daprPort = client.options.daprPort;
elena-kolevska marked this conversation as resolved.
Show resolved Hide resolved
this.daprClient = client;

this.actor = new HTTPClientActor(client); // we use an abstractor here since we interface through a builder with the Actor Runtime
Expand All @@ -139,6 +147,17 @@ export default class DaprClient {
break;
}
}

this.options = {
daprHost: options.daprHost,
daprPort: options.daprPort,
elena-kolevska marked this conversation as resolved.
Show resolved Hide resolved
communicationProtocol: options.communicationProtocol ?? Settings.getDefaultCommunicationProtocol(),
elena-kolevska marked this conversation as resolved.
Show resolved Hide resolved
isKeepAlive: options.isKeepAlive,
logger: options.logger,
actor: options.actor,
daprApiToken: options.daprApiToken,
maxBodySizeMb: options.maxBodySizeMb,
};
}

static create(client: IClient): DaprClient {
Expand Down
63 changes: 39 additions & 24 deletions src/implementation/Client/GRPCClient/GRPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import { Logger } from "../../../logger/Logger";
import GRPCClientSidecar from "./sidecar";
import DaprClient from "../DaprClient";
import { SDK_VERSION } from "../../../version";
import communicationProtocolEnum from "../../../enum/CommunicationProtocol.enum";
import { GrpcEndpoint } from "../../../network/GrpcEndpoint";

export default class GRPCClient implements IClient {
readonly options: DaprClientOptions;
Expand All @@ -29,17 +31,34 @@ export default class GRPCClient implements IClient {
private readonly clientCredentials: grpc.ChannelCredentials;
private readonly logger: Logger;
private readonly grpcClientOptions: Partial<grpc.ClientOptions>;
private daprEndpoint: GrpcEndpoint;

constructor(options: Partial<DaprClientOptions>) {
this.daprEndpoint = this.generateEndpoint(options);

this.options = {
daprHost: options?.daprHost || this.daprEndpoint.hostname,
elena-kolevska marked this conversation as resolved.
Show resolved Hide resolved
daprPort: options?.daprPort || this.daprEndpoint.port,
communicationProtocol: communicationProtocolEnum.GRPC,
isKeepAlive: options?.isKeepAlive,
logger: options?.logger,
actor: options?.actor,
daprApiToken: options?.daprApiToken,
maxBodySizeMb: options?.maxBodySizeMb,
};

constructor(options: DaprClientOptions) {
this.options = options;
this.clientCredentials = this.generateCredentials();
this.grpcClientOptions = this.generateChannelOptions();

this.logger = new Logger("GRPCClient", "GRPCClient", options.logger);
this.isInitialized = false;

this.logger.info(`Opening connection to ${this.options.daprHost}:${this.options.daprPort}`);
this.client = this.generateClient(this.options.daprHost, this.options.daprPort);
this.client = new GrpcDaprClient(
this.daprEndpoint.endpoint,
this.getClientCredentials(),
this.getGrpcClientOptions(),
);
}

async getClient(requiresInitialization = true): Promise<GrpcDaprClient> {
Expand All @@ -59,8 +78,24 @@ export default class GRPCClient implements IClient {
return this.grpcClientOptions;
}

private generateEndpoint(options: Partial<DaprClientOptions>): GrpcEndpoint {
let host = Settings.getDefaultHost();
let port = Settings.getDefaultPort(communicationProtocolEnum.GRPC);
let uri = `${host}:${port}`;

if (options?.daprHost || options?.daprPort) {
host = options?.daprHost ?? Settings.getDefaultHost();
port = options?.daprPort ?? Settings.getDefaultPort(communicationProtocolEnum.GRPC);
uri = `${host}:${port}`;
} else if (Settings.getDefaultGrpcEndpoint() != "") {
uri = Settings.getDefaultGrpcEndpoint();
}

return new GrpcEndpoint(uri);
elena-kolevska marked this conversation as resolved.
Show resolved Hide resolved
}

private generateCredentials(): grpc.ChannelCredentials {
if (this.options.daprHost.startsWith("https")) {
if (this.daprEndpoint?.tls) {
return grpc.ChannelCredentials.createSsl();
}
return grpc.ChannelCredentials.createInsecure();
Expand Down Expand Up @@ -93,26 +128,6 @@ export default class GRPCClient implements IClient {
return options;
}

private generateClient(host: string, port: string): GrpcDaprClient {
return new GrpcDaprClient(
GRPCClient.getEndpoint(host, port),
this.getClientCredentials(),
this.getGrpcClientOptions(),
);
}

// The grpc client doesn't allow http:// or https:// for grpc connections,
// so we need to remove it, if it exists
static getEndpoint(host: string, port: string): string {
let endpoint = `${host}:${port}`;
const parts = endpoint.split("://");
if (parts.length > 1 && parts[0].startsWith("http")) {
endpoint = parts[1];
}

return endpoint;
}

private generateInterceptors(): (options: any, nextCall: any) => grpc.InterceptingCall {
return (options: any, nextCall: any) => {
return new grpc.InterceptingCall(nextCall(options), {
Expand Down
41 changes: 34 additions & 7 deletions src/implementation/Client/HTTPClient/HTTPClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import { Logger } from "../../../logger/Logger";
import HTTPClientSidecar from "./sidecar";
import { SDK_VERSION } from "../../../version";
import * as SerializerUtil from "../../../utils/Serializer.util";
import communicationProtocolEnum from "../../../enum/CommunicationProtocol.enum";
import { HttpEndpoint } from "../../../network/HttpEndpoint";

export default class HTTPClient implements IClient {
readonly options: DaprClientOptions;
Expand All @@ -34,16 +36,25 @@ export default class HTTPClient implements IClient {

private static httpAgent: http.Agent;
private static httpsAgent: https.Agent;
private daprEndpoint: HttpEndpoint;

constructor(options: Partial<DaprClientOptions>) {
this.daprEndpoint = this.generateEndpoint(options);

this.options = {
daprHost: options?.daprHost || this.daprEndpoint.hostname,
elena-kolevska marked this conversation as resolved.
Show resolved Hide resolved
daprPort: options?.daprPort || this.daprEndpoint.port,
communicationProtocol: communicationProtocolEnum.HTTP,
isKeepAlive: options?.isKeepAlive,
logger: options?.logger,
actor: options?.actor,
daprApiToken: options?.daprApiToken,
maxBodySizeMb: options?.maxBodySizeMb,
};

constructor(options: DaprClientOptions) {
this.options = options;
this.logger = new Logger("HTTPClient", "HTTPClient", this.options.logger);
this.isInitialized = false;

this.clientUrl = `${this.options.daprHost}:${this.options.daprPort}/v1.0`;
if (!this.clientUrl.startsWith("http://") && !this.clientUrl.startsWith("https://")) {
this.clientUrl = `http://${this.clientUrl}`;
}
this.clientUrl = `${this.daprEndpoint.endpoint}/v1.0`;

if (!HTTPClient.client) {
HTTPClient.client = fetch;
Expand All @@ -63,6 +74,22 @@ export default class HTTPClient implements IClient {
}
}

private generateEndpoint(options: Partial<DaprClientOptions>): HttpEndpoint {
let host = Settings.getDefaultHost();
elena-kolevska marked this conversation as resolved.
Show resolved Hide resolved
let port = Settings.getDefaultPort(communicationProtocolEnum.HTTP);
let uri = `${host}:${port}`;

if (options?.daprHost || options?.daprPort) {
host = options?.daprHost ?? Settings.getDefaultHost();
port = options?.daprPort ?? Settings.getDefaultPort(communicationProtocolEnum.HTTP);
uri = `${host}:${port}`;
} else if (Settings.getDefaultHttpEndpoint() != "") {
uri = Settings.getDefaultHttpEndpoint();
}

return new HttpEndpoint(uri);
}

async getClient(requiresInitialization = true): Promise<typeof fetch> {
// Ensure the sidecar has been started
if (requiresInitialization && !this.isInitialized) {
Expand Down
21 changes: 14 additions & 7 deletions src/implementation/Server/DaprServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,31 +48,38 @@ export default class DaprServer {
constructor(serverOptions: Partial<DaprServerOptions> = {}) {
const communicationProtocol = serverOptions.communicationProtocol ?? Settings.getDefaultCommunicationProtocol();
const clientOptions = getClientOptions(serverOptions.clientOptions, communicationProtocol, serverOptions?.logger);

// Legacy validation on port
// URI validation is done later, when we instantiate the HttpEndpoint or GrpcEndpoint
// object in the HttpClient or GrpcClient constructor, but we need to
// keep this additional check for backward compatibility
// TODO: Remove this validation in the next major version
if (clientOptions?.daprPort && !/^[0-9]+$/.test(clientOptions?.daprPort)) {
throw new Error("DAPR_INCORRECT_SIDECAR_PORT");
}

this.client = new DaprClient(clientOptions);

this.serverOptions = {
serverHost: serverOptions.serverHost ?? Settings.getDefaultHost(),
serverPort: serverOptions.serverPort ?? Settings.getDefaultAppPort(communicationProtocol),
communicationProtocol: communicationProtocol,
maxBodySizeMb: serverOptions.maxBodySizeMb,
serverHttp: serverOptions.serverHttp,
clientOptions: clientOptions,
clientOptions: this.client.options,
logger: serverOptions.logger,
};
// Create a client to interface with the sidecar from the server side
this.client = new DaprClient(clientOptions);

// If DAPR_SERVER_PORT was not set, we set it
process.env.DAPR_SERVER_PORT = this.serverOptions.serverPort;
process.env.DAPR_CLIENT_PORT = clientOptions.daprPort;
process.env.DAPR_CLIENT_PORT = this.client.options.daprPort;

// Validation on port
if (!/^[0-9]+$/.test(this.serverOptions.serverPort)) {
throw new Error("DAPR_INCORRECT_SERVER_PORT");
}

if (!/^[0-9]+$/.test(clientOptions.daprPort)) {
throw new Error("DAPR_INCORRECT_SIDECAR_PORT");
}

// Builder
switch (serverOptions.communicationProtocol) {
case CommunicationProtocolEnum.GRPC: {
Expand Down
49 changes: 49 additions & 0 deletions src/network/AbstractEndpoint.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

import { URL } from "url";
elena-kolevska marked this conversation as resolved.
Show resolved Hide resolved

export abstract class Endpoint {
protected _scheme = "";
protected _hostname = "";
protected _port = 0;
protected _tls = false;
protected _authority = "";
protected _url: string;
protected _endpoint = "";
protected _parsedUrl!: URL;
elena-kolevska marked this conversation as resolved.
Show resolved Hide resolved

protected constructor(url: string) {
this._url = url;
}

get tls(): boolean {
return this._tls;
}

get hostname(): string {
return this._hostname;
}

get scheme(): string {
return this._scheme;
}

get port(): string {
return this._port === 0 ? "" : this._port.toString();
}

get endpoint(): string {
return this._endpoint;
}
}
Loading
Loading