Skip to content

Commit

Permalink
Merge pull request #547 from martinhesko/subject_access_review
Browse files Browse the repository at this point in the history
Add test for the subject access review non resource attributes
  • Loading branch information
averevki authored Jan 28, 2025
2 parents 9606102 + da73c71 commit 76366ce
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 15 deletions.
2 changes: 1 addition & 1 deletion testsuite/kuadrant/policy/authorization/sections.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def add_external_opa_policy(self, name, endpoint, ttl=0, **common_features):
self.add_item(name, {"opa": {"externalPolicy": {"url": endpoint, "ttl": ttl}}}, **common_features)

@modify
def add_kubernetes(self, name: str, user: ABCValue, resource_attributes: dict, **common_features):
def add_kubernetes(self, name: str, user: ABCValue, resource_attributes: dict = None, **common_features):
"""Adds Kubernetes authorization
:param name: name of kubernetes authorization
Expand Down
59 changes: 59 additions & 0 deletions testsuite/kubernetes/cluster_role.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""ClusterRole and ClusterRoleBinding objects for Kubernetes"""

from typing import Any
from testsuite.kubernetes import KubernetesObject


class ClusterRole(KubernetesObject):
"""Kubernetes ClusterRole"""

@classmethod
def create_instance(
cls,
cluster,
name,
rules: list[dict[str, Any]] = None,
labels: dict[str, str] = None,
):
"""Creates a new ClusterRole instance"""
model: dict = {
"kind": "ClusterRole",
"apiVersion": "rbac.authorization.k8s.io/v1",
"metadata": {
"name": name,
"labels": labels,
},
"rules": rules,
}
return cls(model, context=cluster.context)


class ClusterRoleBinding(KubernetesObject):
"""Kubernetes ClusterRoleBinding"""

@classmethod
def create_instance(
cls,
cluster,
name,
cluster_role: str,
serviceaccounts: list[str],
labels: dict[str, str] = None,
):
"""Creates a new ClusterRoleBinding object"""
model: dict = {
"kind": "ClusterRoleBinding",
"apiVersion": "rbac.authorization.k8s.io/v1",
"metadata": {
"name": name,
"labels": labels,
},
"roleRef": {
"kind": "ClusterRole",
"name": cluster_role,
},
"subjects": [
{"kind": "ServiceAccount", "name": name, "namespace": cluster.project} for name in serviceaccounts
],
}
return cls(model, context=cluster.context)
15 changes: 15 additions & 0 deletions testsuite/tests/singlecluster/authorino/identity/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,21 @@

import pytest

from testsuite.kubernetes.service_account import ServiceAccount


@pytest.fixture(scope="module")
def create_service_account(request, cluster, blame, module_label):
"""Creates and returns service account"""

def _create_service_account(name):
service_account = ServiceAccount.create_instance(cluster, blame(name), labels={"app": module_label})
request.addfinalizer(service_account.delete)
service_account.commit()
return service_account

return _create_service_account


@pytest.fixture(scope="module")
def authorization(authorization):
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Conftest for SubjectAccessReview related tests."""

import pytest

from testsuite.httpx.auth import HeaderApiKeyAuth
from testsuite.kuadrant.policy.authorization import ValueFrom
from testsuite.kubernetes.cluster_role import ClusterRole, ClusterRoleBinding


@pytest.fixture(scope="module")
def audience(hostname):
"""Return hostname as only audience for the service account bound token"""
return [hostname.hostname]


@pytest.fixture(scope="module")
def authorization(authorization):
"""Add kubernetes token-review and subject-access-review identity"""
authorization.identity.add_kubernetes("token-review-host")
user = ValueFrom("auth.identity.user.username")
authorization.authorization.add_kubernetes("subject-access-review-host", user)
return authorization


@pytest.fixture(scope="module")
def create_cluster_role_binding(request, cluster, blame, module_label):
"""Creates and returns a ClusterRoleBinding"""

def _create_cluster_role_binding(cluster_role, service_accounts):
cluster_role_binding = ClusterRoleBinding.create_instance(
cluster, blame("crb"), cluster_role, service_accounts, labels={"app": module_label}
)
request.addfinalizer(cluster_role_binding.delete)
cluster_role_binding.commit()
return cluster_role_binding

return _create_cluster_role_binding


@pytest.fixture(scope="module")
def cluster_role(request, cluster, blame, module_label):
"""Creates and returns a ClusterRole"""
rules = [{"nonResourceURLs": ["/get"], "verbs": ["get"]}]
cluster_role = ClusterRole.create_instance(cluster, blame("cr"), rules, labels={"app": module_label})
request.addfinalizer(cluster_role.delete)
cluster_role.commit()
return cluster_role


@pytest.fixture(scope="module")
def bound_service_account_token(cluster_role, create_service_account, create_cluster_role_binding, audience):
"""Create a ServiceAccount, bind it to a ClusterRole and return its token with a given audience"""
service_account = create_service_account("tkn-auth")
create_cluster_role_binding(cluster_role.model.metadata.name, [service_account.model.metadata.name])
return service_account.get_auth_token(audience)


@pytest.fixture(scope="module")
def auth(bound_service_account_token):
"""Create request auth with service account token as API key"""
return HeaderApiKeyAuth(bound_service_account_token, "Bearer")


@pytest.fixture(scope="module")
def service_account_token(create_service_account, audience):
"""Create a non-authorized service account and request its bound token with the hostname as audience"""
service_account = create_service_account("tkn-non-auth")
return service_account.get_auth_token(audience)


@pytest.fixture(scope="module")
def auth2(service_account_token):
"""Create request auth with service account token as API key"""
return HeaderApiKeyAuth(service_account_token, "Bearer")
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Test kubernetes SubjectAccessReview authorization by verifying only a
ServiceAccount bound to a ClusterRole is authorized to access a resource"""

import pytest

pytestmark = [pytest.mark.authorino]


def test_subject_access_review_non_resource_attributes(client, auth, auth2):
"""Test Kubernetes SubjectAccessReview functionality by setting up authentication and authorization for an endpoint
and querying it with authorized and non-authorized ServiceAccount."""
response = client.get("/get", auth=auth)
assert response.status_code == 200

response = client.get("/get", auth=auth2)
assert response.status_code == 403
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,6 @@
import pytest

from testsuite.httpx.auth import HeaderApiKeyAuth
from testsuite.kubernetes.service_account import ServiceAccount


@pytest.fixture(scope="module")
def create_service_account(request, cluster, blame, module_label):
"""Creates and returns service account"""

def _create_service_account(name):
service_account = ServiceAccount.create_instance(cluster, blame(name), labels={"app": module_label})
request.addfinalizer(service_account.delete)
service_account.commit()
return service_account

return _create_service_account


@pytest.fixture(scope="module")
Expand Down

0 comments on commit 76366ce

Please sign in to comment.