Skip to content

Commit

Permalink
Handle namespace recreated in resource syncer
Browse files Browse the repository at this point in the history
If a namespace is deleted and the namespace was previously seen,
ie there was at least one resource previously synced in that namespace,
then retrieve all the cached resource keys in the namespace and add them
to the missingNamespaces set. If the namespace is later recreated,
then those resources will be re-queued and re-synced.

Signed-off-by: Tom Pantelis <[email protected]>
  • Loading branch information
tpantelis committed Aug 30, 2024
1 parent b8dc57b commit 31ab910
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 42 deletions.
33 changes: 27 additions & 6 deletions pkg/federate/fake/federator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

. "github.com/onsi/gomega"
"github.com/submariner-io/admiral/pkg/federate"
"github.com/submariner-io/admiral/pkg/resource"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -35,6 +36,7 @@ type Federator struct {
lock sync.Mutex
distribute chan *unstructured.Unstructured
delete chan *unstructured.Unstructured
delegator federate.Federator
failOnDistribute error
failOnDelete error
ResetOnFailure atomic.Bool
Expand All @@ -50,6 +52,13 @@ func New() *Federator {
return f
}

func (f *Federator) SetDelegator(d federate.Federator) {
f.lock.Lock()
defer f.lock.Unlock()

f.delegator = d
}

func (f *Federator) FailOnDistribute(err error) {
f.lock.Lock()
defer f.lock.Unlock()
Expand All @@ -64,7 +73,7 @@ func (f *Federator) FailOnDelete(err error) {
f.failOnDelete = err
}

func (f *Federator) Distribute(_ context.Context, obj runtime.Object) error {
func (f *Federator) Distribute(ctx context.Context, obj runtime.Object) error {
f.lock.Lock()
defer f.lock.Unlock()

Expand All @@ -77,12 +86,18 @@ func (f *Federator) Distribute(_ context.Context, obj runtime.Object) error {
return err
}

f.distribute <- resource.MustToUnstructured(obj)
if f.delegator != nil {
err = f.delegator.Distribute(ctx, obj)
}

return nil
if err == nil {
f.distribute <- resource.MustToUnstructured(obj)
}

return err
}

func (f *Federator) Delete(_ context.Context, obj runtime.Object) error {
func (f *Federator) Delete(ctx context.Context, obj runtime.Object) error {
f.lock.Lock()
defer f.lock.Unlock()

Expand All @@ -95,9 +110,15 @@ func (f *Federator) Delete(_ context.Context, obj runtime.Object) error {
return err
}

f.delete <- resource.MustToUnstructured(obj)
if f.delegator != nil {
err = f.delegator.Delete(ctx, obj)
}

if err == nil {
f.delete <- resource.MustToUnstructured(obj)
}

return nil
return err
}

func (f *Federator) VerifyDistribute(expected runtime.Object) {
Expand Down
63 changes: 53 additions & 10 deletions pkg/syncer/resource_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ import (

const (
OrigNamespaceLabelKey = "submariner-io/originatingNamespace"
namespaceKey = "$namespace$"
namespaceAddedKey = "$namespace-added$"
namespaceDeletedKey = "$namespace-deleted$"
)

type SyncDirection int
Expand Down Expand Up @@ -316,7 +317,13 @@ func newResourceSyncer(config *ResourceSyncerConfig) (*resourceSyncer, error) {
if config.NamespaceInformer != nil {
_, err := config.NamespaceInformer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, _ bool) {
syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceKey, resourceUtil.MustToMeta(obj).GetName()).String()))
syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceAddedKey, resourceUtil.MustToMeta(obj).GetName()).String()))
},
DeleteFunc: func(obj interface{}) {
objName, err := cache.DeletionHandlingObjectToName(obj)
utilruntime.Must(err)

syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceDeletedKey, objName.Name).String()))
},
})
if err != nil {
Expand Down Expand Up @@ -520,11 +527,16 @@ func (r *resourceSyncer) runIfCacheSynced(defaultReturn any, run func() any) any
}

func (r *resourceSyncer) processNextWorkItem(key, name, ns string) (bool, error) {
if ns == namespaceKey {
if ns == namespaceAddedKey {
r.handleNamespaceAdded(name)
return false, nil
}

if ns == namespaceDeletedKey {
r.handleNamespaceDeleted(name)
return false, nil
}

var (
requeue bool
err error
Expand Down Expand Up @@ -600,6 +612,8 @@ func (r *resourceSyncer) handleCreatedOrUpdated(key string, created *unstructure
return true, errors.Wrapf(err, "error distributing resource %q", key)
}

r.recordNamespaceSeen(resource.GetNamespace())

if r.syncCounter != nil {
r.syncCounter.With(prometheus.Labels{
DirectionLabel: r.config.Direction.String(),
Expand Down Expand Up @@ -796,20 +810,22 @@ func (r *resourceSyncer) assertUnstructured(obj interface{}) *unstructured.Unstr
return u
}

func (r *resourceSyncer) recordNamespaceSeen(namespace string) {
_, ok := r.missingNamespaces[namespace]
if !ok {
r.missingNamespaces[namespace] = set.New[string]()
}
}

func (r *resourceSyncer) handleMissingNamespace(key, namespace string) {
r.log.Warningf("Syncer %q: Unable to distribute resource %q due to missing namespace %q", r.config.Name, key, namespace)

if r.config.NamespaceInformer == nil {
return
}

keys, ok := r.missingNamespaces[namespace]
if !ok {
keys = set.New[string]()
r.missingNamespaces[namespace] = keys
}

keys.Insert(key)
r.recordNamespaceSeen(namespace)
r.missingNamespaces[namespace].Insert(key)
}

func (r *resourceSyncer) handleNamespaceAdded(namespace string) {
Expand All @@ -826,6 +842,33 @@ func (r *resourceSyncer) handleNamespaceAdded(namespace string) {
}
}

func (r *resourceSyncer) handleNamespaceDeleted(namespace string) {
keys, ok := r.missingNamespaces[namespace]
if !ok {
return
}

for _, key := range r.store.ListKeys() {
obj, exists, _ := r.store.GetByKey(key)
if !exists {
continue
}

resource, _, _ := r.transform(r.assertUnstructured(obj), key, Create)
if resource == nil {
continue
}

if resource.GetNamespace() == namespace {
keys.Insert(key)
}
}

if keys.Len() > 0 {
r.log.Infof("Syncer %q: namespace %q deleted - recorded %d missing resources", r.config.Name, namespace, keys.Len())
}
}

func getClusterIDLabel(resource runtime.Object) (string, bool) {
clusterID, found := resourceUtil.MustToMeta(resource).GetLabels()[federate.ClusterIDLabelKey]
return clusterID, found
Expand Down
102 changes: 76 additions & 26 deletions pkg/syncer/resource_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/prometheus/client_golang/prometheus"
fakereactor "github.com/submariner-io/admiral/pkg/fake"
"github.com/submariner-io/admiral/pkg/federate"
"github.com/submariner-io/admiral/pkg/federate/fake"
. "github.com/submariner-io/admiral/pkg/gomega"
resourceutils "github.com/submariner-io/admiral/pkg/resource"
Expand All @@ -43,11 +45,9 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
fakeClient "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
fakeK8s "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/ptr"
)
Expand Down Expand Up @@ -1236,50 +1236,66 @@ func testWithSharedInformer() {
}

func testWithMissingNamespace() {
const transformedNamespace = "transformed-ns"
const (
transformedNamespace = "transformed-ns"
noTransform = "no-transform"
)

d := newTestDriver(test.LocalNamespace, "", syncer.LocalToRemote)

var (
k8sClient kubernetes.Interface
nsInformerFactory informers.SharedInformerFactory
)
namespaceClient := func() dynamic.ResourceInterface {
return d.config.SourceClient.Resource(corev1.SchemeGroupVersion.WithResource("namespaces")).Namespace(metav1.NamespaceNone)
}

createNamespace := func(name string) {
test.CreateResource(namespaceClient(), &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
})
}

BeforeEach(func() {
d.config.Transform = func(obj runtime.Object, _ int, _ syncer.Operation) (runtime.Object, bool) {
obj = obj.DeepCopyObject()
resourceutils.MustToMeta(obj).SetNamespace(transformedNamespace)
if resourceutils.MustToMeta(obj).GetName() == noTransform {
return nil, false
} else if resourceutils.MustToMeta(obj).GetNamespace() == test.LocalNamespace {
obj = obj.DeepCopyObject()
resourceutils.MustToMeta(obj).SetNamespace(transformedNamespace)
}

return obj, false
}

d.federator.FailOnDistribute(apierrors.NewNotFound(schema.GroupResource{
Resource: "namespaces",
}, transformedNamespace))

k8sClient = fakeK8s.NewClientset()
nsInformerFactory = informers.NewSharedInformerFactory(k8sClient, 0)
d.config.NamespaceInformer = nsInformerFactory.Core().V1().Namespaces().Informer()
d.config.NamespaceInformer = cache.NewSharedInformer(&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return namespaceClient().List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return namespaceClient().Watch(context.TODO(), options)
},
}, resourceutils.MustToUnstructured(&corev1.Namespace{}), 0)
})

JustBeforeEach(func() {
nsInformerFactory.Start(d.stopCh)
d.federator.SetDelegator(federate.NewCreateFederator(d.config.SourceClient, d.config.RestMapper, transformedNamespace))

createNamespace(test.LocalNamespace)

fakereactor.AddVerifyNamespaceReactor(&d.config.SourceClient.(*fakeClient.FakeDynamicClient).Fake, "pods")

if d.config.NamespaceInformer != nil {
go d.config.NamespaceInformer.Run(d.stopCh)
}
})

Specify("distribute should eventually succeed when the namespace is created", func() {
resource := test.CreateResource(d.sourceClient, d.resource)
d.federator.VerifyNoDistribute()

d.federator.FailOnDistribute(nil)

By("Creating namespace")

_, err := k8sClient.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: transformedNamespace,
},
}, metav1.CreateOptions{})
Expect(err).To(Succeed())
createNamespace(transformedNamespace)

resource.SetNamespace(transformedNamespace)
d.federator.VerifyDistribute(resource)
Expand All @@ -1295,6 +1311,40 @@ func testWithMissingNamespace() {
d.federator.VerifyNoDistribute()
})
})

Context("after a namespace is created and distribute succeeds", func() {
const otherNS = "other-ns"

JustBeforeEach(func() {
createNamespace(transformedNamespace)
createNamespace(otherNS)
})

It("should eventually redistribute when the namespace is recreated", func() {
resource := test.CreateResource(d.sourceClient, d.resource)
resource.SetNamespace(transformedNamespace)
d.federator.VerifyDistribute(resource)

other := d.resource.DeepCopy()
other.Name = noTransform
test.CreateResource(d.sourceClient, other)

other = d.resource.DeepCopy()
other.Namespace = otherNS
test.CreateResource(d.config.SourceClient.Resource(
*test.GetGroupVersionResourceFor(d.config.RestMapper, other)).Namespace(otherNS), other)

By("Deleting namespace")

err := namespaceClient().Delete(context.TODO(), transformedNamespace, metav1.DeleteOptions{})
Expect(err).To(Succeed())

By("Recreating namespace")

createNamespace(transformedNamespace)
d.federator.VerifyDistribute(resource)
})
})
}

func testEventOrdering() {
Expand Down

0 comments on commit 31ab910

Please sign in to comment.