Skip to content

Commit

Permalink
feat: make auth optional for wallets and orchestration
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Jan 12, 2024
1 parent d07c77b commit 71015bd
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
)

// TopicController reconciles a BrokerTopic object
type TopicController struct{}
// BrokerTopicController reconciles a BrokerTopic object
type BrokerTopicController struct{}

//+kubebuilder:rbac:groups=formance.com,resources=brokertopics,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=formance.com,resources=brokertopics/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=formance.com,resources=brokertopics/finalizers,verbs=update

func (r *TopicController) Reconcile(ctx Context, topic *v1beta1.BrokerTopic) error {
func (r *BrokerTopicController) Reconcile(ctx Context, topic *v1beta1.BrokerTopic) error {

if len(topic.GetOwnerReferences()) == 0 {
if topic.Status.Ready && topic.Status.Configuration != nil {
Expand Down Expand Up @@ -89,7 +89,7 @@ func (r *TopicController) Reconcile(ctx Context, topic *v1beta1.BrokerTopic) err
return nil
}

func (r *TopicController) createJob(ctx Context,
func (r *BrokerTopicController) createJob(ctx Context,
topic *v1beta1.BrokerTopic, configuration v1beta1.BrokerConfiguration) (*batchv1.Job, error) {

job, _, err := CreateOrUpdate[*batchv1.Job](ctx, types.NamespacedName{
Expand Down Expand Up @@ -122,7 +122,7 @@ func (r *TopicController) createJob(ctx Context,
return job, err
}

func (r *TopicController) createDeleteJob(ctx Context, topic *v1beta1.BrokerTopic) (*batchv1.Job, error) {
func (r *BrokerTopicController) createDeleteJob(ctx Context, topic *v1beta1.BrokerTopic) (*batchv1.Job, error) {
job, _, err := CreateOrUpdate[*batchv1.Job](ctx, types.NamespacedName{
Namespace: topic.Spec.Stack,
Name: fmt.Sprintf("%s-delete-topic", topic.Spec.Service),
Expand All @@ -144,7 +144,7 @@ func (r *TopicController) createDeleteJob(ctx Context, topic *v1beta1.BrokerTopi
}

// SetupWithManager sets up the controller with the Manager.
func (r *TopicController) SetupWithManager(mgr Manager) (*builder.Builder, error) {
func (r *BrokerTopicController) SetupWithManager(mgr Manager) (*builder.Builder, error) {

indexer := mgr.GetFieldIndexer()
if err := indexer.IndexField(context.Background(), &v1beta1.BrokerTopic{}, ".spec.service", func(rawObj client.Object) []string {
Expand All @@ -162,6 +162,6 @@ func (r *TopicController) SetupWithManager(mgr Manager) (*builder.Builder, error
Owns(&batchv1.Job{}), nil
}

func ForTopic() *TopicController {
return &TopicController{}
func ForBrokerTopic() *BrokerTopicController {
return &BrokerTopicController{}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var _ = Describe("BrokerTopicController", func() {
Context("When creating a BrokerTopic", func() {
var (
stack *v1beta1.Stack
topic *v1beta1.BrokerTopic
brokerTopic *v1beta1.BrokerTopic
brokerConfiguration *v1beta1.BrokerConfiguration
)
BeforeEach(func() {
Expand All @@ -42,7 +42,7 @@ var _ = Describe("BrokerTopicController", func() {
},
}
Expect(Create(brokerConfiguration)).To(Succeed())
topic = &v1beta1.BrokerTopic{
brokerTopic = &v1beta1.BrokerTopic{
ObjectMeta: v1.ObjectMeta{
Name: uuid.NewString(),
},
Expand All @@ -53,41 +53,41 @@ var _ = Describe("BrokerTopicController", func() {
Service: "ledger",
},
}
Expect(controllerutil.SetOwnerReference(stack, topic, GetScheme())).To(Succeed())
Expect(Create(topic)).To(Succeed())
Expect(controllerutil.SetOwnerReference(stack, brokerTopic, GetScheme())).To(Succeed())
Expect(Create(brokerTopic)).To(Succeed())
})
It("Should be set to ready status", func() {
t := &v1beta1.BrokerTopic{}
Eventually(func(g Gomega) bool {
g.Expect(Get(core.GetResourceName(topic.Name), t)).To(Succeed())
g.Expect(Get(core.GetResourceName(brokerTopic.Name), t)).To(Succeed())
return t.Status.Ready
}).Should(BeTrue())
})
It("Should create a topic creation job", func() {
It("Should create a broker topic creation job", func() {
Eventually(func() error {
return LoadResource(stack.Name, fmt.Sprintf("%s-create-topic", topic.Spec.Service), &batchv1.Job{})
return LoadResource(stack.Name, fmt.Sprintf("%s-create-topic", brokerTopic.Spec.Service), &batchv1.Job{})
}).Should(Succeed())
})
Context("Then updating removing all owner references", func() {
BeforeEach(func() {
Eventually(func(g Gomega) bool {
t := &v1beta1.BrokerTopic{}
g.Expect(Get(core.GetResourceName(topic.Name), t)).To(Succeed())
g.Expect(Get(core.GetResourceName(brokerTopic.Name), t)).To(Succeed())
return t.Status.Ready
}).Should(BeTrue())

patch := client.MergeFrom(topic.DeepCopy())
Expect(controllerutil.RemoveOwnerReference(stack, topic, GetScheme())).To(Succeed())
Expect(Patch(topic, patch)).To(Succeed())
patch := client.MergeFrom(brokerTopic.DeepCopy())
Expect(controllerutil.RemoveOwnerReference(stack, brokerTopic, GetScheme())).To(Succeed())
Expect(Patch(brokerTopic, patch)).To(Succeed())
})
It("Should trigger the deletion of the topic object", func() {
It("Should trigger the deletion of the brokerTopic object", func() {
Eventually(func(g Gomega) error {
return LoadResource("", topic.Name, topic)
return LoadResource("", brokerTopic.Name, brokerTopic)
}).Should(BeNotFound())
})
It("Should create a topic deletion job", func() {
It("Should create a brokerTopic deletion job", func() {
Eventually(func() error {
return LoadResource(stack.Name, fmt.Sprintf("%s-delete-topic", topic.Spec.Service), &batchv1.Job{})
return LoadResource(stack.Name, fmt.Sprintf("%s-delete-topic", brokerTopic.Spec.Service), &batchv1.Job{})
}).Should(Succeed())
})
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ const (
gcFinalizer = "gc"
)

// TopicQueryController reconciles a BrokerTopicConsumer object
type TopicQueryController struct{}
// BrokerTopicConsumer reconciles a BrokerTopicConsumer object
type BrokerTopicConsumer struct{}

//+kubebuilder:rbac:groups=formance.com,resources=topicqueries,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=formance.com,resources=topicqueries/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=formance.com,resources=topicqueries/finalizers,verbs=update

func (r *TopicQueryController) Reconcile(ctx Context, topicQuery *v1beta1.BrokerTopicConsumer) error {
func (r *BrokerTopicConsumer) Reconcile(ctx Context, topicQuery *v1beta1.BrokerTopicConsumer) error {

if !topicQuery.DeletionTimestamp.IsZero() {
topic := &v1beta1.BrokerTopic{}
Expand Down Expand Up @@ -121,7 +121,7 @@ func (r *TopicQueryController) Reconcile(ctx Context, topicQuery *v1beta1.Broker
}

// SetupWithManager sets up the controller with the Manager.
func (r *TopicQueryController) SetupWithManager(mgr Manager) (*builder.Builder, error) {
func (r *BrokerTopicConsumer) SetupWithManager(mgr Manager) (*builder.Builder, error) {

indexer := mgr.GetFieldIndexer()
if err := indexer.IndexField(context.Background(), &v1beta1.BrokerTopicConsumer{}, ".spec.service", func(rawObj client.Object) []string {
Expand Down Expand Up @@ -152,6 +152,6 @@ func (r *TopicQueryController) SetupWithManager(mgr Manager) (*builder.Builder,
), nil
}

func ForTopicQuery() *TopicQueryController {
return &TopicQueryController{}
func ForBrokerTopicConsumer() *BrokerTopicConsumer {
return &BrokerTopicConsumer{}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

var _ = Describe("TopicQueryController", func() {
var _ = Describe("BrokerTopicConsumer", func() {
Context("When creating a BrokerTopicConsumer", func() {
var (
topicQuery *v1beta1.BrokerTopicConsumer
brokerConfig *v1beta1.BrokerConfiguration
stack *v1beta1.Stack
brokerTopicConsumer *v1beta1.BrokerTopicConsumer
brokerConfig *v1beta1.BrokerConfiguration
stack *v1beta1.Stack
)
BeforeEach(func() {
stack = &v1beta1.Stack{
Expand All @@ -34,7 +34,7 @@ var _ = Describe("TopicQueryController", func() {
Spec: v1beta1.BrokerConfigurationSpec{},
}
Expect(Create(brokerConfig)).To(Succeed())
topicQuery = &v1beta1.BrokerTopicConsumer{
brokerTopicConsumer = &v1beta1.BrokerTopicConsumer{
ObjectMeta: RandObjectMeta(),
Spec: v1beta1.BrokerTopicConsumerSpec{
Service: "ledger",
Expand All @@ -44,73 +44,73 @@ var _ = Describe("TopicQueryController", func() {
},
},
}
Expect(Create(topicQuery)).To(Succeed())
Expect(Create(brokerTopicConsumer)).To(Succeed())
})
It("Should create a BrokerTopic", func() {
t := &v1beta1.BrokerTopic{}
Eventually(func(g Gomega) *v1beta1.BrokerTopic {
g.Expect(Get(core.GetResourceName(
core.GetObjectName(stack.Name, topicQuery.Spec.Service)), t)).To(Succeed())
core.GetObjectName(stack.Name, brokerTopicConsumer.Spec.Service)), t)).To(Succeed())
return t
}).Should(BeOwnedBy(topicQuery))
}).Should(BeOwnedBy(brokerTopicConsumer))
})
Context("Then when the BrokerTopic is ready", func() {
t := &v1beta1.BrokerTopic{}
BeforeEach(func() {
Eventually(func(g Gomega) bool {
g.Expect(Get(core.GetResourceName(
core.GetObjectName(stack.Name, topicQuery.Spec.Service)), t)).To(Succeed())
core.GetObjectName(stack.Name, brokerTopicConsumer.Spec.Service)), t)).To(Succeed())
return t.Status.Ready
}).Should(BeTrue())
})
It("Should set the BrokerTopicConsumer to ready status", func() {
Eventually(func(g Gomega) bool {
g.Expect(LoadResource("", topicQuery.Name, topicQuery)).To(Succeed())
g.Expect(LoadResource("", brokerTopicConsumer.Name, brokerTopicConsumer)).To(Succeed())

return topicQuery.Status.Ready
return brokerTopicConsumer.Status.Ready
}).Should(BeTrue())
})
Context("Then create a new BrokerTopicConsumer on the same service", func() {
topicQuery2 := &v1beta1.BrokerTopicConsumer{}
brokerTopicConsumer2 := &v1beta1.BrokerTopicConsumer{}
BeforeEach(func() {
topicQuery2 = &v1beta1.BrokerTopicConsumer{
brokerTopicConsumer2 = &v1beta1.BrokerTopicConsumer{
ObjectMeta: RandObjectMeta(),
Spec: v1beta1.BrokerTopicConsumerSpec{
Service: topicQuery.Spec.Service,
Service: brokerTopicConsumer.Spec.Service,
QueriedBy: "webhooks",
StackDependency: v1beta1.StackDependency{
Stack: stack.Name,
},
},
}
Expect(Create(topicQuery2)).To(Succeed())
Expect(Create(brokerTopicConsumer2)).To(Succeed())
})
It("Should be set to ready too", func() {
Eventually(func(g Gomega) bool {
g.Expect(LoadResource("", topicQuery2.Name, topicQuery2)).To(Succeed())
g.Expect(LoadResource("", brokerTopicConsumer2.Name, brokerTopicConsumer2)).To(Succeed())

return topicQuery2.Status.Ready
return brokerTopicConsumer2.Status.Ready
}).Should(BeTrue())
})
Context("Then first BrokerTopicConsumer object", func() {
BeforeEach(func() {
Expect(Delete(topicQuery)).To(Succeed())
Expect(Delete(brokerTopicConsumer)).To(Succeed())
})
It("Should remove the service from the queries of the topic", func() {
Eventually(func(g Gomega) *v1beta1.BrokerTopic {
topic := &v1beta1.BrokerTopic{}
g.Expect(Get(core.GetResourceName(core.GetObjectName(stack.Name, topicQuery.Spec.Service)), topic)).To(Succeed())
g.Expect(Get(core.GetResourceName(core.GetObjectName(stack.Name, brokerTopicConsumer.Spec.Service)), topic)).To(Succeed())
return topic
}).ShouldNot(BeControlledBy(topicQuery))
}).ShouldNot(BeControlledBy(brokerTopicConsumer))
})
Context("Then removing the last BrokerTopicConsumer", func() {
BeforeEach(func() {
Expect(Delete(topicQuery2)).To(Succeed())
Expect(Delete(brokerTopicConsumer2)).To(Succeed())
})
It("Should completely remove the BrokerTopic object", func() {
Eventually(func(g Gomega) bool {
t := &v1beta1.BrokerTopic{}
err := Get(core.GetResourceName(core.GetObjectName(stack.Name, topicQuery.Spec.Service)), t)
err := Get(core.GetResourceName(core.GetObjectName(stack.Name, brokerTopicConsumer.Spec.Service)), t)

return errors.IsNotFound(err)
}).Should(BeTrue())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (r *OrchestrationController) handleAuthClient(ctx Context, stack *v1beta1.S
return nil, err
}
if auth == nil {
return nil, errors.New("requires auth service")
return nil, nil
}

return authclients.Create(ctx, stack, orchestration, "orchestration",
Expand Down Expand Up @@ -144,7 +144,9 @@ func (r *OrchestrationController) createDeployment(ctx Context, stack *v1beta1.S
}
env = append(env, authEnvVars...)

env = append(env, authclients.GetEnvVars(client)...)
if client != nil {
env = append(env, authclients.GetEnvVars(client)...)
}

if orchestration.Spec.Temporal.TLS.SecretName == "" {
env = append(env,
Expand Down Expand Up @@ -204,6 +206,11 @@ func (r *OrchestrationController) SetupWithManager(mgr Manager) (*builder.Builde
handler.EnqueueRequestsFromMapFunc(
stacks.WatchDependents[*v1beta1.Orchestration](mgr)),
).
Watches(
&v1beta1.Auth{},
handler.EnqueueRequestsFromMapFunc(
stacks.WatchDependents[*v1beta1.Orchestration](mgr)),
).
Watches(
&v1beta1.Payments{},
handler.EnqueueRequestsFromMapFunc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
func init() {
reconcilers.Register(
reconcilers.New[*v1beta1.Stack](ForStack()),
reconcilers.New[*v1beta1.BrokerTopic](ForTopic()),
reconcilers.New[*v1beta1.BrokerTopicConsumer](ForTopicQuery()),
reconcilers.New[*v1beta1.BrokerTopic](ForBrokerTopic()),
reconcilers.New[*v1beta1.BrokerTopicConsumer](ForBrokerTopicConsumer()),
reconcilers.New[*v1beta1.Ledger](ForLedger()),
reconcilers.New[*v1beta1.HTTPAPI](ForHTTPAPI()),
reconcilers.New[*v1beta1.Gateway](ForGateway()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,19 @@ func (r *WalletsController) Reconcile(ctx Context, wallets *v1beta1.Wallets) err
return err
}

authClient, err := authclients.Create(ctx, stack, wallets, "wallets", func(spec *v1beta1.AuthClientSpec) {
spec.Scopes = []string{"ledger:read", "ledger:write"}
})
hasAuth, err := stacks.HasDependency[*v1beta1.Auth](ctx, wallets.Spec.Stack)
if err != nil {
return err
}
var authClient *v1beta1.AuthClient
if hasAuth {
authClient, err = authclients.Create(ctx, stack, wallets, "wallets", func(spec *v1beta1.AuthClientSpec) {
spec.Scopes = []string{"ledger:read", "ledger:write"}
})
if err != nil {
return err
}
}

if err := r.createDeployment(ctx, stack, wallets, authClient); err != nil {
return err
Expand All @@ -72,7 +79,9 @@ func (r *WalletsController) createDeployment(ctx Context, stack *v1beta1.Stack,
if err != nil {
return err
}
env = append(env, authclients.GetEnvVars(authClient)...)
if authClient != nil {
env = append(env, authclients.GetEnvVars(authClient)...)
}

authEnvVars, err := auths.EnvVars(ctx, stack, "wallets", wallets.Spec.Auth)
if err != nil {
Expand Down Expand Up @@ -115,6 +124,10 @@ func (r *WalletsController) SetupWithManager(mgr Manager) (*builder.Builder, err
&v1beta1.RegistriesConfiguration{},
handler.EnqueueRequestsFromMapFunc(stacks.WatchUsingLabels[*v1beta1.Wallets](mgr)),
).
Watches(
&v1beta1.Auth{},
handler.EnqueueRequestsFromMapFunc(stacks.WatchDependents[*v1beta1.Wallets](mgr)),
).
Owns(&v1beta1.AuthClient{}).
Owns(&appsv1.Deployment{}).
Owns(&v1beta1.HTTPAPI{}).
Expand Down
Loading

0 comments on commit 71015bd

Please sign in to comment.