Fix namespaces deletion handling #159

Open
wants to merge 2 commits into master
123 changes: 51 additions & 72 deletions src/handlers.py
@@ -111,7 +111,6 @@ def on_field_data(
old: Dict[str, str],
new: Dict[str, str],
body: Dict[str, Any],
meta: kopf.Meta,
name: str,
namespace: Optional[str],
uid: str,
@@ -126,61 +125,21 @@ def on_field_data(
logger.debug(f'Updating Object body == {body}')
syncedns = body.get('status', {}).get('create_fn', {}).get('syncedns', [])

secret_type = body.get('type', 'Opaque')

cached_cluster_secret = csecs_cache.get_cluster_secret(uid)
if cached_cluster_secret is None:
logger.error('Received an event for an unknown ClusterSecret.')

updated_syncedns = syncedns.copy()
for ns in syncedns:
logger.info(f'Re Syncing secret {name} in ns {ns}')
ns_sec_body = client.V1Secret(
api_version='v1',
data={str(key): str(value) for key, value in new.items()},
kind='Secret',
metadata=create_secret_metadata(
name=name,
namespace=ns,
annotations={str(key): str(value) for key, value in meta.annotations.items()},
labels={str(key): str(value) for key, value in meta.labels.items()},
),
type=secret_type,
)
logger.debug(f'body: {ns_sec_body}')
# Ensuring the secret still exist.
if secret_exists(logger=logger, name=name, namespace=ns, v1=v1):
response = v1.replace_namespaced_secret(name=name, namespace=ns, body=ns_sec_body)
else:
try:
v1.read_namespace(name=ns)
except client.exceptions.ApiException as e:
if e.status != 404:
raise
response = f'Namespace {ns} not found'
updated_syncedns.remove(ns)
logger.info(f'Namespace {ns} not found while Syncing secret {name}')
else:
response = v1.create_namespaced_secret(namespace=ns, body=ns_sec_body)
logger.debug(response)

if updated_syncedns != syncedns:
# Patch synced_ns field
logger.debug(f'Patching clustersecret {name} in namespace {namespace}')
body = patch_clustersecret_status(
logger=logger,
name=name,
new_status={'create_fn': {'syncedns': updated_syncedns}},
custom_objects_api=custom_objects_api,
)
sync_secret(logger, ns, body, v1)

# Updating the cache
csecs_cache.set_cluster_secret(BaseClusterSecret(
uid=uid,
name=name,
namespace=namespace or "",
body=body,
synced_namespace=updated_syncedns,
synced_namespace=syncedns,
))


@@ -221,41 +180,61 @@ async def create_fn(


@kopf.on.create('', 'v1', 'namespaces')
async def namespace_watcher(logger: logging.Logger, meta: kopf.Meta, **_):
@kopf.on.delete('', 'v1', 'namespaces')
async def namespace_watcher(logger: logging.Logger, reason: kopf.Reason, meta: kopf.Meta, **_):
"""Watch for namespace events
"""
new_ns = meta.name
logger.debug(f'New namespace created: {new_ns} re-syncing')
ns_new_list = []
for cluster_secret in csecs_cache.all_cluster_secret():
obj_body = cluster_secret.body
name = cluster_secret.name

matcheddns = cluster_secret.synced_namespace

logger.debug(f'Old matched namespace: {matcheddns} - name: {name}')
ns_new_list = get_ns_list(logger, obj_body, v1)
logger.debug(f'new matched list: {ns_new_list}')
if new_ns in ns_new_list:
logger.debug(f'Cloning secret {name} into the new namespace {new_ns}')
if reason not in ["create", "delete"]:
logger.error(f'Function "namespace_watcher" was called with incorrect reason: {reason}')
return

ns_name = meta.name
logger.info(f'Namespace {"created" if reason == "create" else "deleted"}: {ns_name}. Re-syncing')

ns_list_new = []
for cached_cluster_secret in csecs_cache.all_cluster_secret():
body = cached_cluster_secret.body
name = cached_cluster_secret.name
ns_list_synced = cached_cluster_secret.synced_namespace
ns_list_new = get_ns_list(logger, body, v1)
ns_list_changed = False

logger.debug(f'ClusterSecret: {name}. Old matched namespaces: {ns_list_synced}')
logger.debug(f'ClusterSecret: {name}. New matched namespaces: {ns_list_new}')

if reason == "create" and ns_name in ns_list_new:
logger.info(f'Cloning secret {name} into the new namespace: {ns_name}')
sync_secret(
logger=logger,
namespace=new_ns,
body=obj_body,
namespace=ns_name,
body=body,
v1=v1,
)

# if there is a new matching ns, refresh cache
cluster_secret.synced_namespace = ns_new_list
csecs_cache.set_cluster_secret(cluster_secret)

# update ns_new_list on the object so then we also delete from there
patch_clustersecret_status(
logger=logger,
name=cluster_secret.name,
new_status={'create_fn': {'syncedns': ns_new_list}},
custom_objects_api=custom_objects_api,
)
ns_list_changed = True

if reason == "delete" and ns_name in ns_list_synced:
logger.info(f'Secret {name} removed from deleted namespace: {ns_name}')
# Ensure the deleted namespace is not in the new list - when kopf handles this event, the namespace may still exist in kubernetes
if ns_name in ns_list_new:
ns_list_new.remove(ns_name)
ns_list_changed = True

# Update the ClusterSecret only if its list of namespaces has changed
if ns_list_changed:
# Update in-memory cache
cached_cluster_secret.synced_namespace = ns_list_new
csecs_cache.set_cluster_secret(cached_cluster_secret)

# Update the list of synced namespaces on the kubernetes object
logger.debug(f'Patching ClusterSecret: {name}')
patch_clustersecret_status(
logger=logger,
name=name,
new_status={'create_fn': {'syncedns': ns_list_new}},
custom_objects_api=custom_objects_api,
)
else:
logger.debug(f'There are no changes in the list of namespaces for ClusterSecret: {name}')


@kopf.on.startup()
139 changes: 1 addition & 138 deletions src/tests/test_handlers.py
@@ -118,144 +118,6 @@ def test_on_field_data_sync(self):
{"key": "newvalue"},
)

def test_on_field_data_ns_deleted(self):
"""Don't fail the sync if one of the namespaces was deleted.
"""

mock_v1 = Mock()

def read_namespaced_secret(name, namespace, **kwargs):
if namespace == "myns2":
# Old data in the namespaced secret of the myns namespace.
return V1Secret(
api_version='v1',
data={"key": "oldvalue"},
kind='Secret',
metadata=create_secret_metadata(
name="mysecret",
namespace="myns2",
),
type="Opaque",
)
else:
# Deleted namespace.
raise ApiException(status=404, reason="Not Found")

mock_v1.read_namespaced_secret = read_namespaced_secret

create_namespaced_secret_called_count_for_ns2 = 0

def create_namespaced_secret(namespace, body, **kwargs):
if namespace == "myns2":
nonlocal create_namespaced_secret_called_count_for_ns2
create_namespaced_secret_called_count_for_ns2 += 1
else:
# Deleted namespace.
raise ApiException(status=404, reason="Not Found")

mock_v1.create_namespaced_secret = create_namespaced_secret

replace_namespaced_secret_called_count_for_ns2 = 0

def replace_namespaced_secret(name, namespace, body, **kwargs):
if namespace == "myns2":
nonlocal replace_namespaced_secret_called_count_for_ns2
replace_namespaced_secret_called_count_for_ns2 += 1
self.assertEqual(name, csec.name)

# Namespaced secret should be updated with the new data.
self.assertEqual(
body.data,
{"key": "newvalue"},
)

return V1Secret(
api_version='v1',
data=body.data,
kind='Secret',
metadata=create_secret_metadata(
name="mysecret",
namespace="myns2",
),
type="Opaque",
)
else:
# Deleted namespace.
raise ApiException(status=404, reason="Not Found")

mock_v1.replace_namespaced_secret = replace_namespaced_secret

def read_namespace(name, **kwargs):
if name != "myns2":
# Deleted namespace.
raise ApiException(status=404, reason="Not Found")

mock_v1.read_namespace = read_namespace

patch_clustersecret_status = Mock()
patch_clustersecret_status.return_value = {
"metadata": {"name": "mysecret", "uid": "mysecretuid"},
"data": {"key": "newvalue"},
"status": {"create_fn": {"syncedns": ["myns2"]}},
}

# Old data in the cache.
csec = BaseClusterSecret(
uid="mysecretuid",
name="mysecret",
namespace="",
body={
"metadata": {"name": "mysecret", "uid": "mysecretuid"},
"data": {"key": "oldvalue"},
"status": {"create_fn": {"syncedns": ["myns1", "myns2"]}},
},
synced_namespace=["myns1", "myns2"],
)

csecs_cache.set_cluster_secret(csec)

# New data coming into the callback.
new_body = {
"metadata": {"name": "mysecret", "uid": "mysecretuid"},
"data": {"key": "newvalue"},
"status": {"create_fn": {"syncedns": ["myns1", "myns2"]}},
}

with patch("handlers.v1", mock_v1), \
patch("handlers.patch_clustersecret_status", patch_clustersecret_status):
on_field_data(
old={"key": "oldvalue"},
new={"key": "newvalue"},
body=new_body,
meta=kopf.Meta({"metadata": {"name": "mysecret"}}),
name="mysecret",
namespace=None,
uid="mysecretuid",
logger=self.logger,
)

# Namespaced secret should be updated with the new data.
self.assertEqual(replace_namespaced_secret_called_count_for_ns2, 1)
self.assertEqual(create_namespaced_secret_called_count_for_ns2, 0)

# The namespace should be deleted from the syncedns status of the clustersecret.
patch_clustersecret_status.assert_called_once_with(
logger=self.logger,
name=csec.name,
new_status={'create_fn': {'syncedns': ["myns2"]}},
custom_objects_api=custom_objects_api,
)

# Namespace should be deleted from the cache.
self.assertEqual(
csecs_cache.get_cluster_secret("mysecretuid").body.get("status"),
{"create_fn": {"syncedns": ["myns2"]}},
)
self.assertEqual(
csecs_cache.get_cluster_secret("mysecretuid").synced_namespace,
["myns2"],
)

def test_create_fn(self):
"""Namespace name must be correct in the cache.
"""
@@ -324,6 +186,7 @@ def test_ns_create(self):
asyncio.run(
namespace_watcher(
logger=self.logger,
reason=kopf.Reason("create"),
meta=kopf.Meta({"metadata": {"name": "myns"}}),
)
)