-
Notifications
You must be signed in to change notification settings - Fork 16
/
sdk.go
217 lines (184 loc) · 6.83 KB
/
sdk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
/*
Copyright 2023 The Crossplane Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package function is an SDK for building Composition Functions.
package function
import (
"context"
"crypto/tls"
"crypto/x509"
"net"
"os"
"path/filepath"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
ginsecure "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/proto"
"github.com/crossplane/function-sdk-go/logging"
v1 "github.com/crossplane/function-sdk-go/proto/v1"
"github.com/crossplane/function-sdk-go/proto/v1beta1"
)
// Default ServeOptions.
const (
DefaultNetwork = "tcp"
DefaultAddress = ":9443"
DefaultMaxRecvMsgSize = 1024 * 1024 * 4
)
// ServeOptions configure how a Function is served.
type ServeOptions struct {
Network string
Address string
MaxRecvMsgSize int
Credentials credentials.TransportCredentials
}
// A ServeOption configures how a Function is served.
type ServeOption func(o *ServeOptions) error
// Listen configures the network, address and maximum message size on which the Function will
// listen for RunFunctionRequests.
func Listen(network, address string) ServeOption {
return func(o *ServeOptions) error {
o.Network = network
o.Address = address
return nil
}
}
// MTLSCertificates specifies a directory from which to load mTLS certificates.
// The directory must contain the server certificate (tls.key and tls.crt), as
// well as a CA certificate (ca.crt) that will be used to authenticate clients.
func MTLSCertificates(dir string) ServeOption {
return func(o *ServeOptions) error {
if dir == "" {
// We want to support passing both MTLSCertificates and
// Insecure as they were supplied as flags. So we don't
// want this to fail because no dir was supplied.
// If no TLS dir is supplied and insecure is false we'll
// return an error due to having no credentials specified.
return nil
}
crt, err := tls.LoadX509KeyPair(
filepath.Clean(filepath.Join(dir, "tls.crt")),
filepath.Clean(filepath.Join(dir, "tls.key")),
)
if err != nil {
return errors.Wrap(err, "cannot load X509 keypair")
}
ca, err := os.ReadFile(filepath.Clean(filepath.Join(dir, "ca.crt")))
if err != nil {
return errors.Wrap(err, "cannot read CA certificate")
}
pool := x509.NewCertPool()
if !pool.AppendCertsFromPEM(ca) {
return errors.New("invalid CA certificate")
}
o.Credentials = credentials.NewTLS(&tls.Config{
MinVersion: tls.VersionTLS12,
Certificates: []tls.Certificate{crt},
ClientCAs: pool,
ClientAuth: tls.RequireAndVerifyClientCert,
})
return nil
}
}
// Insecure specifies whether this Function should be served insecurely - i.e.
// without mTLS authentication. This is only useful for testing and development.
// Crossplane will always send requests using mTLS.
func Insecure(insecure bool) ServeOption {
return func(o *ServeOptions) error {
if insecure {
o.Credentials = ginsecure.NewCredentials()
}
return nil
}
}
// MaxRecvMessageSize returns a ServeOption to set the max message size in bytes the server can receive.
// If this is not set, gRPC uses the default limit.
func MaxRecvMessageSize(sz int) ServeOption {
return func(o *ServeOptions) error {
o.MaxRecvMsgSize = sz
return nil
}
}
// Serve the supplied Function by creating a gRPC server and listening for
// RunFunctionRequests. Blocks until the server returns an error.
func Serve(fn v1.FunctionRunnerServiceServer, o ...ServeOption) error {
so := &ServeOptions{
Network: DefaultNetwork,
Address: DefaultAddress,
MaxRecvMsgSize: DefaultMaxRecvMsgSize,
}
for _, fn := range o {
if err := fn(so); err != nil {
return errors.Wrap(err, "cannot apply ServeOption")
}
}
if so.Credentials == nil {
return errors.New("no credentials provided - did you specify the Insecure or MTLSCertificates options?")
}
lis, err := net.Listen(so.Network, so.Address)
if err != nil {
return errors.Wrapf(err, "cannot listen for %s connections at address %q", so.Network, so.Address)
}
srv := grpc.NewServer(grpc.MaxRecvMsgSize(so.MaxRecvMsgSize), grpc.Creds(so.Credentials))
reflection.Register(srv)
v1.RegisterFunctionRunnerServiceServer(srv, fn)
v1beta1.RegisterFunctionRunnerServiceServer(srv, ServeBeta(fn))
return errors.Wrap(srv.Serve(lis), "cannot serve mTLS gRPC connections")
}
// NewLogger returns a new logger.
func NewLogger(debug bool) (logging.Logger, error) {
return logging.NewLogger(debug)
}
// A BetaServer is a v1beta1 FunctionRunnerServiceServer that wraps an identical
// v1 FunctionRunnerServiceServer. This requires the v1 and v1beta1 protos to be
// identical.
//
// Functions were promoted from v1beta1 to v1 in Crossplane v1.17. Crossplane
// v1.16 and earlier only sends v1beta1 RunFunctionRequests. Functions should
// use the BetaServer for backward compatibility, to support Crossplane v1.16
// and earlier.
type BetaServer struct {
wrapped v1.FunctionRunnerServiceServer
v1beta1.UnimplementedFunctionRunnerServiceServer
}
// ServeBeta returns a v1beta1.FunctionRunnerServiceServer that wraps the
// suppled v1.FunctionRunnerServiceServer.
func ServeBeta(s v1.FunctionRunnerServiceServer) *BetaServer {
return &BetaServer{wrapped: s}
}
// RunFunction calls the RunFunction method of the wrapped
// v1.FunctionRunnerServiceServer. It converts from v1beta1 to v1 and back by
// round-tripping through protobuf marshaling.
func (s *BetaServer) RunFunction(ctx context.Context, req *v1beta1.RunFunctionRequest) (*v1beta1.RunFunctionResponse, error) {
gareq := &v1.RunFunctionRequest{}
b, err := proto.Marshal(req)
if err != nil {
return nil, errors.Wrap(err, "cannot marshal v1beta1 RunFunctionRequest to protobuf bytes")
}
if err := proto.Unmarshal(b, gareq); err != nil {
return nil, errors.Wrap(err, "cannot unmarshal v1 RunFunctionRequest from v1beta1 protobuf bytes")
}
garsp, err := s.wrapped.RunFunction(ctx, gareq)
if err != nil {
// This error is intentionally not wrapped. This middleware is just
// calling an underlying RunFunction.
return nil, err
}
b, err = proto.Marshal(garsp)
if err != nil {
return nil, errors.Wrap(err, "cannot marshal v1beta1 RunFunctionResponse to protobuf bytes")
}
rsp := &v1beta1.RunFunctionResponse{}
err = proto.Unmarshal(b, rsp)
return rsp, errors.Wrap(err, "cannot unmarshal v1 RunFunctionResponse from v1beta1 protobuf bytes")
}