Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/u/mpiano/SEC-18955'
Browse files Browse the repository at this point in the history
  • Loading branch information
piax93 committed Jun 6, 2024
2 parents 48e48ac + 6ffd941 commit c5883b8
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 0 deletions.
13 changes: 13 additions & 0 deletions task_processing/plugins/kubernetes/kubernetes_pod_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,20 @@
from task_processing.plugins.kubernetes.utils import get_kubernetes_empty_volume_mounts
from task_processing.plugins.kubernetes.utils import get_kubernetes_env_vars
from task_processing.plugins.kubernetes.utils import get_kubernetes_secret_volume_mounts
from task_processing.plugins.kubernetes.utils import (
get_kubernetes_service_account_token_volume_mounts,
)
from task_processing.plugins.kubernetes.utils import get_kubernetes_volume_mounts
from task_processing.plugins.kubernetes.utils import get_node_affinity
from task_processing.plugins.kubernetes.utils import get_pod_empty_volumes
from task_processing.plugins.kubernetes.utils import get_pod_secret_volumes
from task_processing.plugins.kubernetes.utils import (
get_pod_service_account_token_volumes,
)
from task_processing.plugins.kubernetes.utils import get_pod_volumes
from task_processing.plugins.kubernetes.utils import get_sanitised_kubernetes_name


logger = logging.getLogger(__name__)

POD_WATCH_THREAD_JOIN_TIMEOUT_S = 1.0
Expand Down Expand Up @@ -441,6 +448,9 @@ def _create_container_definition(
get_kubernetes_volume_mounts(task_config.volumes)
+ get_kubernetes_empty_volume_mounts(task_config.empty_volumes)
+ get_kubernetes_secret_volume_mounts(task_config.secret_volumes)
+ get_kubernetes_service_account_token_volume_mounts(
task_config.projected_sa_volumes
)
)

capabilities = get_capabilities_for_capability_changes(
Expand Down Expand Up @@ -514,6 +524,9 @@ def run(self, task_config: KubernetesTaskConfig) -> Optional[str]:
get_pod_volumes(task_config.volumes)
+ get_pod_empty_volumes(task_config.empty_volumes)
+ get_pod_secret_volumes(task_config.secret_volumes)
+ get_pod_service_account_token_volumes(
task_config.projected_sa_volumes
)
)

pod = V1Pod(
Expand Down
38 changes: 38 additions & 0 deletions task_processing/plugins/kubernetes/task_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@
from task_processing.plugins.kubernetes.types import NodeAffinity
from task_processing.plugins.kubernetes.types import NodeAffinityOperator
from task_processing.plugins.kubernetes.types import ObjectFieldSelectorSource
from task_processing.plugins.kubernetes.types import ProjectedSAVolume
from task_processing.plugins.kubernetes.types import SecretVolume
from task_processing.plugins.kubernetes.types import SecretVolumeItem
from task_processing.plugins.kubernetes.utils import (
DEFAULT_PROJECTED_SA_TOKEN_EXPIRATION_SECONDS,
)
from task_processing.plugins.kubernetes.utils import get_sanitised_kubernetes_name
from task_processing.plugins.kubernetes.utils import mode_to_int

Expand Down Expand Up @@ -190,6 +194,34 @@ def _valid_secret_volumes(
return (True, None)


def _valid_projected_sa_volumes(
sa_volumes: Sequence[ProjectedSAVolume],
) -> Tuple[bool, Optional[str]]:
min_expiration = 600
for volume in sa_volumes:
if not volume.get("audience"):
return (
False,
"No token audience set for projected service account volume",
)
if not volume.get("container_path"):
return (
False,
"No token container_path set for projected service account volume",
)
if (
volume.get(
"expiration_seconds", DEFAULT_PROJECTED_SA_TOKEN_EXPIRATION_SECONDS
)
< min_expiration
):
return (
False,
f"Expiration for service account projected token must be at least {min_expiration} seconds",
)
return (True, None)


def _valid_secret_envs(
secret_envs: Mapping[str, "SecretEnvSource"]
) -> Tuple[bool, Optional[str]]:
Expand Down Expand Up @@ -353,6 +385,12 @@ def __invariant__(self) -> Tuple[Tuple[bool, str], ...]:
factory=pvector,
invariant=_valid_secret_volumes,
)
projected_sa_volumes = field(
type=PVector if not TYPE_CHECKING else PVector["ProjectedSAVolume"],
initial=v(),
factory=pvector,
invariant=_valid_projected_sa_volumes,
)

extra_containers = field(
type=PMap if not TYPE_CHECKING else PMap[str, "KubernetesTaskConfig"],
Expand Down
6 changes: 6 additions & 0 deletions task_processing/plugins/kubernetes/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ class SecretVolume(TypedDict):
items: List[SecretVolumeItem]


class ProjectedSAVolume(TypedDict, total=False):
container_path: str
audience: str
expiration_seconds: int


class SecretEnvSource(TypedDict):
secret_name: str # full name of k8s secret resource
key: str
Expand Down
67 changes: 67 additions & 0 deletions task_processing/plugins/kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
from kubernetes.client import V1NodeSelectorRequirement
from kubernetes.client import V1NodeSelectorTerm
from kubernetes.client import V1ObjectFieldSelector
from kubernetes.client import V1ProjectedVolumeSource
from kubernetes.client import V1SecretKeySelector
from kubernetes.client import V1SecretVolumeSource
from kubernetes.client import V1ServiceAccountTokenProjection
from kubernetes.client import V1Volume
from kubernetes.client import V1VolumeMount
from kubernetes.client import V1VolumeProjection
from pyrsistent.typing import PMap
from pyrsistent.typing import PVector

Expand All @@ -32,9 +35,13 @@
from task_processing.plugins.kubernetes.types import NodeAffinity
from task_processing.plugins.kubernetes.types import SecretEnvSource
from task_processing.plugins.kubernetes.types import ObjectFieldSelectorSource
from task_processing.plugins.kubernetes.types import ProjectedSAVolume


logger = logging.getLogger(__name__)

DEFAULT_PROJECTED_SA_TOKEN_EXPIRATION_SECONDS = 1800


def get_capabilities_for_capability_changes(
cap_add: PVector[str],
Expand Down Expand Up @@ -350,3 +357,63 @@ def get_node_affinity(affinities: PVector["NodeAffinity"]) -> Optional[V1NodeAff
],
),
)


def _get_service_account_token_volume_name(audience: str) -> str:
"""Generate name for service account projected volume
:param str audience: audience of the authentication token
:return: volume name
"""
return get_sanitised_volume_name(
f"projected-sa--{audience}",
length_limit=63,
)


def get_pod_service_account_token_volumes(
sa_volumes: PVector["ProjectedSAVolume"],
) -> List[V1Volume]:
"""Build projected service account volumes for pod
:param PVector["ProjectedSAVolume"] sa_volumes: list of projected service account volume configs
:return: list of kubernetes pod volume objects
"""
return [
V1Volume(
name=_get_service_account_token_volume_name(volume["audience"]),
projected=V1ProjectedVolumeSource(
sources=[
V1VolumeProjection(
service_account_token=V1ServiceAccountTokenProjection(
audience=volume["audience"],
expiration_seconds=volume.get(
"expiration_seconds",
DEFAULT_PROJECTED_SA_TOKEN_EXPIRATION_SECONDS,
),
path="token",
),
),
],
),
)
for volume in sa_volumes
]


def get_kubernetes_service_account_token_volume_mounts(
sa_volumes: PVector["ProjectedSAVolume"],
) -> List[V1VolumeMount]:
"""Build container mounts for projected service account volumes
:param PVector["ProjectedSAVolume"] sa_volumes: list of projected service account volume configs
:return: list of kubernetes volume mount objects
"""
return [
V1VolumeMount(
mount_path=volume["container_path"],
name=_get_service_account_token_volume_name(volume["audience"]),
read_only=True,
)
for volume in sa_volumes
]
115 changes: 115 additions & 0 deletions tests/unit/plugins/kubernetes/kubernetes_pod_executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
from kubernetes.client import V1Pod
from kubernetes.client import V1PodSecurityContext
from kubernetes.client import V1PodSpec
from kubernetes.client import V1ProjectedVolumeSource
from kubernetes.client import V1ResourceRequirements
from kubernetes.client import V1SecurityContext
from kubernetes.client import V1ServiceAccountTokenProjection
from kubernetes.client import V1Volume
from kubernetes.client import V1VolumeMount
from kubernetes.client import V1VolumeProjection
from pyrsistent import InvariantException
from pyrsistent import pmap
from pyrsistent import pvector
Expand Down Expand Up @@ -547,6 +550,118 @@ def test_run_failed_exception(k8s_executor):
assert k8s_executor.run(task_config) is None


@mock.patch(
"task_processing.plugins.kubernetes.kubernetes_pod_executor.get_node_affinity",
autospec=True,
)
def test_run_authentication_token(mock_get_node_affinity, k8s_executor):
task_config = KubernetesTaskConfig(
name="fake_task_name",
uuid="fake_id",
image="fake_docker_image",
command="fake_command",
cpus=1,
cpus_request=0.5,
memory=1024,
disk=1024,
volumes=[],
projected_sa_volumes=[
{"audience": "foo.bar.com", "container_path": "/var/secret/whatever"}
],
node_selectors={"hello": "world"},
node_affinities=[dict(key="a_label", operator="In", value=[])],
labels={
"some_label": "some_label_value",
},
annotations={
"paasta.yelp.com/some_annotation": "some_value",
},
service_account_name="testsa",
ports=[8888],
stdin=True,
stdin_once=True,
tty=True,
)
expected_container = V1Container(
image=task_config.image,
name="main",
command=["/bin/sh", "-c"],
args=[task_config.command],
security_context=V1SecurityContext(
capabilities=V1Capabilities(drop=list(task_config.cap_drop)),
),
resources=V1ResourceRequirements(
limits={
"cpu": 1.0,
"memory": "1024.0Mi",
"ephemeral-storage": "1024.0Mi",
},
requests={"cpu": 0.5},
),
env=[],
volume_mounts=[
V1VolumeMount(
mount_path="/var/secret/whatever",
name="projected-sa--foodot-bardot-com",
read_only=True,
),
],
ports=[V1ContainerPort(container_port=8888)],
stdin=True,
stdin_once=True,
tty=True,
)
expected_pod = V1Pod(
metadata=V1ObjectMeta(
name=task_config.pod_name,
namespace="task_processing_tests",
labels={
"some_label": "some_label_value",
},
annotations={
"paasta.yelp.com/some_annotation": "some_value",
},
),
spec=V1PodSpec(
restart_policy=task_config.restart_policy,
containers=[expected_container],
volumes=[
V1Volume(
name="projected-sa--foodot-bardot-com",
projected=V1ProjectedVolumeSource(
sources=[
V1VolumeProjection(
service_account_token=V1ServiceAccountTokenProjection(
audience="foo.bar.com",
expiration_seconds=1800,
path="token",
),
),
],
),
),
],
share_process_namespace=True,
security_context=V1PodSecurityContext(
fs_group=task_config.fs_group,
),
node_selector={"hello": "world"},
affinity=V1Affinity(node_affinity=mock_get_node_affinity.return_value),
dns_policy="Default",
service_account_name=task_config.service_account_name,
),
)

assert all(v is not None for v in expected_container.resources.requests.values())
assert k8s_executor.run(task_config) == task_config.pod_name
assert k8s_executor.kube_client.core.create_namespaced_pod.call_args_list == [
mock.call(body=expected_pod, namespace="task_processing_tests")
]
assert mock_get_node_affinity.call_args_list == [
mock.call(pvector([dict(key="a_label", operator="In", value=[])])),
]


def test_process_event_enqueues_task_processing_events_pending_to_running(k8s_executor):
mock_pod = mock.Mock(spec=V1Pod)
mock_pod.metadata.name = "test.1234"
Expand Down
17 changes: 17 additions & 0 deletions tests/unit/plugins/kubernetes/kubernetes_task_config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,3 +569,20 @@ def test_valid_ports_invariant(ports):
)

assert task_config.ports == ports


@pytest.mark.parametrize(
"sa_volume",
(
{"container_path": "foo"},
{"audience": "bar"},
{"container_path": "foo", "audience": "bar", "expiration_seconds": 42},
),
)
def test_projected_sa_volumes_invariant_failure(sa_volume):
with pytest.raises(InvariantException):
KubernetesTaskConfig(
image="fake_docker_image",
command="fake_command",
projected_sa_volumes=[sa_volume],
)
Loading

0 comments on commit c5883b8

Please sign in to comment.