From 73aaecaae60952cdcdfcc2341be64608daa90b38 Mon Sep 17 00:00:00 2001 From: Matteo Piano Date: Fri, 7 Jun 2024 06:43:26 -0700 Subject: [PATCH 1/2] automatic service account token mounting --- requirements-minimal.txt | 2 +- requirements.txt | 2 +- tests/core/actionrun_test.py | 11 +++- tests/kubernetes_test.py | 7 +++ tests/utils/authentication_test.py | 25 +++++++++ tron/core/actionrun.py | 7 +++ tron/kubernetes.py | 3 ++ tron/utils/authentication.py | 87 ++++++++++++++++++++++++++++++ 8 files changed, 140 insertions(+), 4 deletions(-) create mode 100644 tests/utils/authentication_test.py create mode 100644 tron/utils/authentication.py diff --git a/requirements-minimal.txt b/requirements-minimal.txt index dc9bfb9f1..f4711d882 100644 --- a/requirements-minimal.txt +++ b/requirements-minimal.txt @@ -21,7 +21,7 @@ pytimeparse pytz PyYAML>=5.1 requests -task_processing[mesos_executor,k8s] +task_processing[mesos_executor,k8s]>=1.1.0 Twisted>=19.7.0 urllib3>=1.24.2 Werkzeug>=0.15.3 diff --git a/requirements.txt b/requirements.txt index 28f140eff..5e0bcfc70 100644 --- a/requirements.txt +++ b/requirements.txt @@ -80,7 +80,7 @@ setuptools==65.5.1 six==1.15.0 sshpubkeys==3.1.0 stack-data==0.6.2 -task-processing==0.14.0 +task-processing==1.1.0 traitlets==5.0.0 Twisted==22.10.0 typing-extensions==4.5.0 diff --git a/tests/core/actionrun_test.py b/tests/core/actionrun_test.py index 63170f85d..fcd9d9d41 100644 --- a/tests/core/actionrun_test.py +++ b/tests/core/actionrun_test.py @@ -1861,9 +1861,10 @@ def test_handler_exiting_failstart_failed(self, mock_k8s_action_run): ) assert mock_k8s_action_run.is_failed + @mock.patch("tron.core.actionrun.get_projected_sa_volumes", autospec=True) @mock.patch("tron.core.actionrun.filehandler", autospec=True) @mock.patch("tron.core.actionrun.KubernetesClusterRepository", autospec=True) - def test_recover(self, mock_cluster_repo, mock_filehandler, mock_k8s_action_run): + def test_recover(self, mock_cluster_repo, mock_filehandler, mock_get_projected_sa_volumes, mock_k8s_action_run): mock_k8s_action_run.machine.state = ActionRun.UNKNOWN mock_k8s_action_run.end_time = 1000 mock_k8s_action_run.exit_status = 0 @@ -1894,6 +1895,7 @@ def test_recover(self, mock_cluster_repo, mock_filehandler, mock_k8s_action_run) serializer=serializer, volumes=mock_k8s_action_run.command_config.extra_volumes, secret_volumes=mock_k8s_action_run.command_config.secret_volumes, + projected_sa_volumes=mock_get_projected_sa_volumes.return_value, cap_add=mock_k8s_action_run.command_config.cap_add, cap_drop=mock_k8s_action_run.command_config.cap_drop, task_id=last_attempt.kubernetes_task_id, @@ -1916,6 +1918,7 @@ def test_recover(self, mock_cluster_repo, mock_filehandler, mock_k8s_action_run) mock_filehandler.OutputStreamSerializer.assert_called_with( mock_k8s_action_run.output_path, ) + mock_get_projected_sa_volumes.assert_called_once_with("mock_namespace") @mock.patch("tron.core.actionrun.filehandler", autospec=True) @mock.patch("tron.core.actionrun.MesosClusterRepository", autospec=True) @@ -1950,9 +1953,12 @@ def test_recover_no_k8s_task_id( assert mock_k8s_action_run.is_unknown assert mock_k8s_action_run.end_time is not None + @mock.patch("tron.core.actionrun.get_projected_sa_volumes", autospec=True) @mock.patch("tron.core.actionrun.filehandler", autospec=True) @mock.patch("tron.core.actionrun.KubernetesClusterRepository", autospec=True) - def test_recover_task_none(self, mock_cluster_repo, mock_filehandler, mock_k8s_action_run): + def test_recover_task_none( + self, mock_cluster_repo, mock_filehandler, mock_get_projected_sa_volumes, mock_k8s_action_run + ): mock_k8s_action_run.machine.state = ActionRun.UNKNOWN last_attempt = mock_k8s_action_run.create_attempt() last_attempt.kubernetes_task_id = "test-kubernetes-task-id" @@ -1965,6 +1971,7 @@ def test_recover_task_none(self, mock_cluster_repo, mock_filehandler, mock_k8s_a assert mock_k8s_action_run.is_unknown assert mock_get_cluster.return_value.recover.call_count == 0 assert mock_k8s_action_run.end_time is not None + mock_get_projected_sa_volumes.assert_called_once_with("mock_namespace") @mock.patch("tron.core.actionrun.KubernetesClusterRepository", autospec=True) def test_kill_task_k8s(self, mock_cluster_repo, mock_k8s_action_run): diff --git a/tests/kubernetes_test.py b/tests/kubernetes_test.py index 0b7084d4a..66b25c80b 100644 --- a/tests/kubernetes_test.py +++ b/tests/kubernetes_test.py @@ -464,6 +464,7 @@ def test_create_task_disabled(): env={}, secret_env={}, secret_volumes=[], + projected_sa_volumes=[], field_selector_env={}, volumes=[], cap_add=[], @@ -493,6 +494,7 @@ def test_create_task(mock_kubernetes_cluster): env={}, secret_env={}, secret_volumes=[], + projected_sa_volumes=[], field_selector_env={}, volumes=[], cap_add=[], @@ -523,6 +525,7 @@ def test_create_task_with_task_id(mock_kubernetes_cluster): env={}, secret_env={}, secret_volumes=[], + projected_sa_volumes=[], field_selector_env={}, volumes=[], cap_add=[], @@ -556,6 +559,7 @@ def test_create_task_with_invalid_task_id(mock_kubernetes_cluster): env={}, secret_env={}, secret_volumes=[], + projected_sa_volumes=[], field_selector_env={}, volumes=[], cap_add=[], @@ -590,6 +594,7 @@ def test_create_task_with_config(mock_kubernetes_cluster): ] config_secrets = {"TEST_SECRET": ConfigSecretSource(secret_name="tron-secret-test-secret--A", key="secret_A")} config_field_selector = {"POD_IP": ConfigFieldSelectorSource(field_path="status.podIP")} + arg_sa_volumes = [{"audience": "for.bar.com", "container_path": "/var/run/secrets/whatever"}] expected_args = { "name": mock.ANY, @@ -601,6 +606,7 @@ def test_create_task_with_config(mock_kubernetes_cluster): "environment": {"TEST_ENV": "foo"}, "secret_environment": {k: v._asdict() for k, v in config_secrets.items()}, "secret_volumes": [v._asdict() for v in config_secret_volumes], + "projected_sa_volumes": arg_sa_volumes, "field_selector_environment": {k: v._asdict() for k, v in config_field_selector.items()}, "volumes": [v._asdict() for v in default_volumes + config_volumes], "cap_add": ["KILL"], @@ -625,6 +631,7 @@ def test_create_task_with_config(mock_kubernetes_cluster): env=expected_args["environment"], secret_env=config_secrets, secret_volumes=config_secret_volumes, + projected_sa_volumes=arg_sa_volumes, field_selector_env=config_field_selector, volumes=config_volumes, cap_add=["KILL"], diff --git a/tests/utils/authentication_test.py b/tests/utils/authentication_test.py new file mode 100644 index 000000000..b9cec0208 --- /dev/null +++ b/tests/utils/authentication_test.py @@ -0,0 +1,25 @@ +import pytest +import staticconf.testing + +from tron.config.static_config import NAMESPACE +from tron.utils.authentication import get_projected_sa_volumes + + +@pytest.mark.parametrize( + "service,expected", + ( + ("service_a", []), + ("service_b", [{"foo": "bar"}]), + ), +) +def test_get_projected_sa_volumes(service, expected, tmpdir): + with (tmpdir / "authenticating.yaml").open("w") as f: + f.write("services:\n- service_b\n- service_c\n") + with (tmpdir / "jwt_service_auth.json").open("w") as f: + f.write(r'{"service_auth_token_settings": {"foo": "bar"}}') + paths_config = { + "soa_path": str(tmpdir), + "paasta_config_path": str(tmpdir), + } + with staticconf.testing.MockConfiguration(paths_config, namespace=NAMESPACE): + assert get_projected_sa_volumes(service) == expected diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index db66d3b0d..fe8732a3a 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -33,6 +33,7 @@ from tron.utils import maybe_decode from tron.utils import proxy from tron.utils import timeutils +from tron.utils.authentication import get_projected_sa_volumes from tron.utils.observer import Observable from tron.utils.observer import Observer from tron.utils.state import Machine @@ -347,6 +348,10 @@ def id(self): def name(self): return self.action_name + @property + def service_name(self): + return self.job_run_id.split(".", 1)[0] + @property def last_attempt(self): if self.attempts: @@ -1164,6 +1169,7 @@ def submit_command(self, attempt: ActionRunAttempt) -> Optional[KubernetesTask]: env=build_environment(original_env=attempt.command_config.env, run_id=self.id), secret_env=attempt.command_config.secret_env, secret_volumes=attempt.command_config.secret_volumes, + projected_sa_volumes=get_projected_sa_volumes(self.service_name), field_selector_env=attempt.command_config.field_selector_env, serializer=filehandler.OutputStreamSerializer(self.output_path), volumes=attempt.command_config.extra_volumes, @@ -1239,6 +1245,7 @@ def recover(self) -> Optional[KubernetesTask]: field_selector_env=last_attempt.command_config.field_selector_env, serializer=filehandler.OutputStreamSerializer(self.output_path), secret_volumes=last_attempt.command_config.secret_volumes, + projected_sa_volumes=get_projected_sa_volumes(self.service_name), volumes=last_attempt.command_config.extra_volumes, cap_add=last_attempt.command_config.cap_add, cap_drop=last_attempt.command_config.cap_drop, diff --git a/tron/kubernetes.py b/tron/kubernetes.py index baa92dbcd..2648c6402 100644 --- a/tron/kubernetes.py +++ b/tron/kubernetes.py @@ -9,6 +9,7 @@ from task_processing.interfaces.event import Event from task_processing.plugins.kubernetes.task_config import KubernetesTaskConfig +from task_processing.plugins.kubernetes.types import ProjectedSAVolume from task_processing.runners.subscription import Subscription from task_processing.task_processor import TaskProcessor from twisted.internet.defer import Deferred @@ -477,6 +478,7 @@ def create_task( env: Dict[str, str], secret_env: Dict[str, ConfigSecretSource], secret_volumes: Collection[ConfigSecretVolume], + projected_sa_volumes: List[ProjectedSAVolume], field_selector_env: Dict[str, ConfigFieldSelectorSource], volumes: Collection[ConfigVolume], cap_add: Collection[str], @@ -512,6 +514,7 @@ def create_task( environment=env, secret_environment={k: v._asdict() for k, v in secret_env.items()}, secret_volumes=[volume._asdict() for volume in secret_volumes], + projected_sa_volumes=projected_sa_volumes, field_selector_environment={k: v._asdict() for k, v in field_selector_env.items()}, cap_add=cap_add, cap_drop=cap_drop, diff --git a/tron/utils/authentication.py b/tron/utils/authentication.py new file mode 100644 index 000000000..111e349a8 --- /dev/null +++ b/tron/utils/authentication.py @@ -0,0 +1,87 @@ +import logging +import os +from functools import lru_cache +from functools import partial +from typing import cast +from typing import List +from typing import Optional +from typing import Set + +import staticconf # type: ignore +from task_processing.plugins.kubernetes.types import ProjectedSAVolume + +from tron.config.static_config import NAMESPACE as TRON_NAMESPACE + + +log = logging.getLogger(__name__) +TOKEN_VOLUME_CONF_NAMESPACE = "service_account_token" +AUTH_SERVICES_CONF_NAMESPACE = "authenticating_services" +DEFAULT_SOA_PATH = "/nail/etc/services" +DEFAULT_PAASTA_CONFIG_PATH = "/etc/paasta" +CACHED_AUTHENTICATING_SERVICES = None + + +@lru_cache +def _get_config_watcher(filepath: str, namespace: str) -> Optional[staticconf.ConfigurationWatcher]: + """Load configuration file and return a watcher for it + + :param str filepath: path to JSON/YAML configuration + """ + watcher = None + loader_class = staticconf.JSONConfiguration if filepath.endswith(".json") else staticconf.YamlConfiguration + loader = partial(loader_class, filepath, namespace=namespace, flatten=False) + reloader = (staticconf.config.ReloadCallbackChain(namespace),) + try: + loader() + watcher = staticconf.ConfigurationWatcher(loader, filepath, min_interval=10, reloader=reloader) + except Exception as e: + # soft failing, as authz features are currently optional + log.warning(f"Failed loading {filepath}: {e}") + return watcher + + +def get_authenticating_services() -> Set[str]: + """Load list of services participating in authenticated traffic + + :param str soa_dir: SOA configuration directory + :return: set of service names + """ + global CACHED_AUTHENTICATING_SERVICES + soa_path = staticconf.read_string("soa_path", namespace=TRON_NAMESPACE, default=DEFAULT_SOA_PATH) + authenticating_services_conf_path = os.path.join(soa_path, "authenticating.yaml") + watcher = _get_config_watcher(authenticating_services_conf_path, AUTH_SERVICES_CONF_NAMESPACE) + if (watcher and watcher.reload_if_changed()) or CACHED_AUTHENTICATING_SERVICES is None: + CACHED_AUTHENTICATING_SERVICES = set( + staticconf.read_list("services", namespace=AUTH_SERVICES_CONF_NAMESPACE, default=[]) + ) + return CACHED_AUTHENTICATING_SERVICES + + +def get_service_auth_token_volume_config() -> ProjectedSAVolume: + """Get service authentication token mount configuration + + :param str paasta_config_dir: PaaSTA configuration directory + :return: configuration as dictionary, if present + """ + paasta_config_path = staticconf.read_string( + "paasta_config_path", namespace=TRON_NAMESPACE, default=DEFAULT_PAASTA_CONFIG_PATH + ) + config_path = os.path.join(paasta_config_path, "jwt_service_auth.json") + watcher = _get_config_watcher(config_path, TOKEN_VOLUME_CONF_NAMESPACE) + if watcher: + watcher.reload_if_changed() + return cast( + ProjectedSAVolume, + staticconf.read("service_auth_token_settings", namespace=TOKEN_VOLUME_CONF_NAMESPACE, default={}), + ) + + +def get_projected_sa_volumes(service: str) -> List[ProjectedSAVolume]: + """Return projected service account volume, as a single elemenet list, + if service participates in authenticated communications. + + :param str service: name of the service + :return: list of volume config, empty if not needed + """ + volume_config = get_service_auth_token_volume_config() + return [volume_config] if volume_config and service in get_authenticating_services() else [] From b11c0155137fefef94a8cc4eaca01553ed49357b Mon Sep 17 00:00:00 2001 From: Matteo Piano Date: Mon, 10 Jun 2024 03:36:54 -0700 Subject: [PATCH 2/2] just support project SA volumes from config --- tests/core/actionrun_test.py | 12 ++--- tests/kubernetes_test.py | 7 +-- tests/utils/authentication_test.py | 25 --------- tron/config/schema.py | 9 +++- tron/core/action.py | 2 + tron/core/actionrun.py | 9 +--- tron/kubernetes.py | 6 +-- tron/utils/authentication.py | 87 ------------------------------ 8 files changed, 22 insertions(+), 135 deletions(-) delete mode 100644 tests/utils/authentication_test.py delete mode 100644 tron/utils/authentication.py diff --git a/tests/core/actionrun_test.py b/tests/core/actionrun_test.py index fcd9d9d41..8362a879a 100644 --- a/tests/core/actionrun_test.py +++ b/tests/core/actionrun_test.py @@ -1861,10 +1861,9 @@ def test_handler_exiting_failstart_failed(self, mock_k8s_action_run): ) assert mock_k8s_action_run.is_failed - @mock.patch("tron.core.actionrun.get_projected_sa_volumes", autospec=True) @mock.patch("tron.core.actionrun.filehandler", autospec=True) @mock.patch("tron.core.actionrun.KubernetesClusterRepository", autospec=True) - def test_recover(self, mock_cluster_repo, mock_filehandler, mock_get_projected_sa_volumes, mock_k8s_action_run): + def test_recover(self, mock_cluster_repo, mock_filehandler, mock_k8s_action_run): mock_k8s_action_run.machine.state = ActionRun.UNKNOWN mock_k8s_action_run.end_time = 1000 mock_k8s_action_run.exit_status = 0 @@ -1895,7 +1894,7 @@ def test_recover(self, mock_cluster_repo, mock_filehandler, mock_get_projected_s serializer=serializer, volumes=mock_k8s_action_run.command_config.extra_volumes, secret_volumes=mock_k8s_action_run.command_config.secret_volumes, - projected_sa_volumes=mock_get_projected_sa_volumes.return_value, + projected_sa_volumes=mock_k8s_action_run.command_config.projected_sa_volumes, cap_add=mock_k8s_action_run.command_config.cap_add, cap_drop=mock_k8s_action_run.command_config.cap_drop, task_id=last_attempt.kubernetes_task_id, @@ -1918,7 +1917,6 @@ def test_recover(self, mock_cluster_repo, mock_filehandler, mock_get_projected_s mock_filehandler.OutputStreamSerializer.assert_called_with( mock_k8s_action_run.output_path, ) - mock_get_projected_sa_volumes.assert_called_once_with("mock_namespace") @mock.patch("tron.core.actionrun.filehandler", autospec=True) @mock.patch("tron.core.actionrun.MesosClusterRepository", autospec=True) @@ -1953,12 +1951,9 @@ def test_recover_no_k8s_task_id( assert mock_k8s_action_run.is_unknown assert mock_k8s_action_run.end_time is not None - @mock.patch("tron.core.actionrun.get_projected_sa_volumes", autospec=True) @mock.patch("tron.core.actionrun.filehandler", autospec=True) @mock.patch("tron.core.actionrun.KubernetesClusterRepository", autospec=True) - def test_recover_task_none( - self, mock_cluster_repo, mock_filehandler, mock_get_projected_sa_volumes, mock_k8s_action_run - ): + def test_recover_task_none(self, mock_cluster_repo, mock_filehandler, mock_k8s_action_run): mock_k8s_action_run.machine.state = ActionRun.UNKNOWN last_attempt = mock_k8s_action_run.create_attempt() last_attempt.kubernetes_task_id = "test-kubernetes-task-id" @@ -1971,7 +1966,6 @@ def test_recover_task_none( assert mock_k8s_action_run.is_unknown assert mock_get_cluster.return_value.recover.call_count == 0 assert mock_k8s_action_run.end_time is not None - mock_get_projected_sa_volumes.assert_called_once_with("mock_namespace") @mock.patch("tron.core.actionrun.KubernetesClusterRepository", autospec=True) def test_kill_task_k8s(self, mock_cluster_repo, mock_k8s_action_run): diff --git a/tests/kubernetes_test.py b/tests/kubernetes_test.py index 66b25c80b..62be2b254 100644 --- a/tests/kubernetes_test.py +++ b/tests/kubernetes_test.py @@ -7,6 +7,7 @@ from task_processing.plugins.kubernetes.task_config import KubernetesTaskConfig from tron.config.schema import ConfigFieldSelectorSource +from tron.config.schema import ConfigProjectedSAVolume from tron.config.schema import ConfigSecretSource from tron.config.schema import ConfigSecretVolume from tron.config.schema import ConfigSecretVolumeItem @@ -594,7 +595,7 @@ def test_create_task_with_config(mock_kubernetes_cluster): ] config_secrets = {"TEST_SECRET": ConfigSecretSource(secret_name="tron-secret-test-secret--A", key="secret_A")} config_field_selector = {"POD_IP": ConfigFieldSelectorSource(field_path="status.podIP")} - arg_sa_volumes = [{"audience": "for.bar.com", "container_path": "/var/run/secrets/whatever"}] + config_sa_volumes = [ConfigProjectedSAVolume(audience="for.bar.com", container_path="/var/run/secrets/whatever")] expected_args = { "name": mock.ANY, @@ -606,7 +607,7 @@ def test_create_task_with_config(mock_kubernetes_cluster): "environment": {"TEST_ENV": "foo"}, "secret_environment": {k: v._asdict() for k, v in config_secrets.items()}, "secret_volumes": [v._asdict() for v in config_secret_volumes], - "projected_sa_volumes": arg_sa_volumes, + "projected_sa_volumes": [v._asdict() for v in config_sa_volumes], "field_selector_environment": {k: v._asdict() for k, v in config_field_selector.items()}, "volumes": [v._asdict() for v in default_volumes + config_volumes], "cap_add": ["KILL"], @@ -631,7 +632,7 @@ def test_create_task_with_config(mock_kubernetes_cluster): env=expected_args["environment"], secret_env=config_secrets, secret_volumes=config_secret_volumes, - projected_sa_volumes=arg_sa_volumes, + projected_sa_volumes=config_sa_volumes, field_selector_env=config_field_selector, volumes=config_volumes, cap_add=["KILL"], diff --git a/tests/utils/authentication_test.py b/tests/utils/authentication_test.py deleted file mode 100644 index b9cec0208..000000000 --- a/tests/utils/authentication_test.py +++ /dev/null @@ -1,25 +0,0 @@ -import pytest -import staticconf.testing - -from tron.config.static_config import NAMESPACE -from tron.utils.authentication import get_projected_sa_volumes - - -@pytest.mark.parametrize( - "service,expected", - ( - ("service_a", []), - ("service_b", [{"foo": "bar"}]), - ), -) -def test_get_projected_sa_volumes(service, expected, tmpdir): - with (tmpdir / "authenticating.yaml").open("w") as f: - f.write("services:\n- service_b\n- service_c\n") - with (tmpdir / "jwt_service_auth.json").open("w") as f: - f.write(r'{"service_auth_token_settings": {"foo": "bar"}}') - paths_config = { - "soa_path": str(tmpdir), - "paasta_config_path": str(tmpdir), - } - with staticconf.testing.MockConfiguration(paths_config, namespace=NAMESPACE): - assert get_projected_sa_volumes(service) == expected diff --git a/tron/config/schema.py b/tron/config/schema.py index 6cdd78098..605f27f8a 100644 --- a/tron/config/schema.py +++ b/tron/config/schema.py @@ -169,6 +169,7 @@ def config_object_factory(name, required=None, optional=None): "env", # dict "secret_env", # dict of str, ConfigSecretSource "secret_volumes", # List of ConfigSecretVolume + "projected_sa_volumes", # List of ConfigProjectedSAVolume "field_selector_env", # dict of str, ConfigFieldSelectorSource "extra_volumes", # List of ConfigVolume "expected_runtime", # datetime.Timedelta @@ -208,6 +209,7 @@ def config_object_factory(name, required=None, optional=None): "env", # dict "secret_env", # dict of str, ConfigSecretSource "secret_volumes", # List of ConfigSecretVolume + "projected_sa_volumes", # List of ConfigProjectedSAVolume "field_selector_env", # dict of str, ConfigFieldSelectorSource "extra_volumes", # List of ConfigVolume "trigger_downstreams", # None, bool or dict @@ -252,7 +254,6 @@ def config_object_factory(name, required=None, optional=None): optional=["mode"], ) - _ConfigSecretVolume = config_object_factory( name="ConfigSecretVolume", required=["secret_volume_name", "secret_name", "container_path"], @@ -283,6 +284,12 @@ def _asdict(self) -> dict: optional=[], ) +ConfigProjectedSAVolume = config_object_factory( + name="ConfigProjectedSAVolume", + required=["container_path", "audience"], + optional=["expiration_seconds"], +) + ConfigFieldSelectorSource = config_object_factory( name="ConfigFieldSelectorSource", required=["field_path"], diff --git a/tron/core/action.py b/tron/core/action.py index ad8a72b8f..2f65d0900 100644 --- a/tron/core/action.py +++ b/tron/core/action.py @@ -11,6 +11,7 @@ from tron.config.schema import CLEANUP_ACTION_NAME from tron.config.schema import ConfigAction from tron.config.schema import ConfigNodeAffinity +from tron.config.schema import ConfigProjectedSAVolume from tron.config.schema import ConfigSecretVolume log = logging.getLogger(__name__) @@ -33,6 +34,7 @@ class ActionCommandConfig: env: dict = field(default_factory=dict) secret_env: dict = field(default_factory=dict) secret_volumes: List[ConfigSecretVolume] = field(default_factory=list) + projected_sa_volumes: List[ConfigProjectedSAVolume] = field(default_factory=list) field_selector_env: dict = field(default_factory=dict) extra_volumes: set = field(default_factory=set) node_selectors: dict = field(default_factory=dict) diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index fe8732a3a..580cf263c 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -33,7 +33,6 @@ from tron.utils import maybe_decode from tron.utils import proxy from tron.utils import timeutils -from tron.utils.authentication import get_projected_sa_volumes from tron.utils.observer import Observable from tron.utils.observer import Observer from tron.utils.state import Machine @@ -348,10 +347,6 @@ def id(self): def name(self): return self.action_name - @property - def service_name(self): - return self.job_run_id.split(".", 1)[0] - @property def last_attempt(self): if self.attempts: @@ -1169,7 +1164,7 @@ def submit_command(self, attempt: ActionRunAttempt) -> Optional[KubernetesTask]: env=build_environment(original_env=attempt.command_config.env, run_id=self.id), secret_env=attempt.command_config.secret_env, secret_volumes=attempt.command_config.secret_volumes, - projected_sa_volumes=get_projected_sa_volumes(self.service_name), + projected_sa_volumes=attempt.command_config.projected_sa_volumes, field_selector_env=attempt.command_config.field_selector_env, serializer=filehandler.OutputStreamSerializer(self.output_path), volumes=attempt.command_config.extra_volumes, @@ -1245,7 +1240,7 @@ def recover(self) -> Optional[KubernetesTask]: field_selector_env=last_attempt.command_config.field_selector_env, serializer=filehandler.OutputStreamSerializer(self.output_path), secret_volumes=last_attempt.command_config.secret_volumes, - projected_sa_volumes=get_projected_sa_volumes(self.service_name), + projected_sa_volumes=last_attempt.command_config.projected_sa_volumes, volumes=last_attempt.command_config.extra_volumes, cap_add=last_attempt.command_config.cap_add, cap_drop=last_attempt.command_config.cap_drop, diff --git a/tron/kubernetes.py b/tron/kubernetes.py index 2648c6402..4c06f31bb 100644 --- a/tron/kubernetes.py +++ b/tron/kubernetes.py @@ -9,7 +9,6 @@ from task_processing.interfaces.event import Event from task_processing.plugins.kubernetes.task_config import KubernetesTaskConfig -from task_processing.plugins.kubernetes.types import ProjectedSAVolume from task_processing.runners.subscription import Subscription from task_processing.task_processor import TaskProcessor from twisted.internet.defer import Deferred @@ -22,6 +21,7 @@ from tron.config.schema import ConfigFieldSelectorSource from tron.config.schema import ConfigKubernetes from tron.config.schema import ConfigNodeAffinity +from tron.config.schema import ConfigProjectedSAVolume from tron.config.schema import ConfigSecretSource from tron.config.schema import ConfigSecretVolume from tron.config.schema import ConfigVolume @@ -478,7 +478,7 @@ def create_task( env: Dict[str, str], secret_env: Dict[str, ConfigSecretSource], secret_volumes: Collection[ConfigSecretVolume], - projected_sa_volumes: List[ProjectedSAVolume], + projected_sa_volumes: Collection[ConfigProjectedSAVolume], field_selector_env: Dict[str, ConfigFieldSelectorSource], volumes: Collection[ConfigVolume], cap_add: Collection[str], @@ -514,7 +514,7 @@ def create_task( environment=env, secret_environment={k: v._asdict() for k, v in secret_env.items()}, secret_volumes=[volume._asdict() for volume in secret_volumes], - projected_sa_volumes=projected_sa_volumes, + projected_sa_volumes=[volume._asdict() for volume in projected_sa_volumes], field_selector_environment={k: v._asdict() for k, v in field_selector_env.items()}, cap_add=cap_add, cap_drop=cap_drop, diff --git a/tron/utils/authentication.py b/tron/utils/authentication.py deleted file mode 100644 index 111e349a8..000000000 --- a/tron/utils/authentication.py +++ /dev/null @@ -1,87 +0,0 @@ -import logging -import os -from functools import lru_cache -from functools import partial -from typing import cast -from typing import List -from typing import Optional -from typing import Set - -import staticconf # type: ignore -from task_processing.plugins.kubernetes.types import ProjectedSAVolume - -from tron.config.static_config import NAMESPACE as TRON_NAMESPACE - - -log = logging.getLogger(__name__) -TOKEN_VOLUME_CONF_NAMESPACE = "service_account_token" -AUTH_SERVICES_CONF_NAMESPACE = "authenticating_services" -DEFAULT_SOA_PATH = "/nail/etc/services" -DEFAULT_PAASTA_CONFIG_PATH = "/etc/paasta" -CACHED_AUTHENTICATING_SERVICES = None - - -@lru_cache -def _get_config_watcher(filepath: str, namespace: str) -> Optional[staticconf.ConfigurationWatcher]: - """Load configuration file and return a watcher for it - - :param str filepath: path to JSON/YAML configuration - """ - watcher = None - loader_class = staticconf.JSONConfiguration if filepath.endswith(".json") else staticconf.YamlConfiguration - loader = partial(loader_class, filepath, namespace=namespace, flatten=False) - reloader = (staticconf.config.ReloadCallbackChain(namespace),) - try: - loader() - watcher = staticconf.ConfigurationWatcher(loader, filepath, min_interval=10, reloader=reloader) - except Exception as e: - # soft failing, as authz features are currently optional - log.warning(f"Failed loading {filepath}: {e}") - return watcher - - -def get_authenticating_services() -> Set[str]: - """Load list of services participating in authenticated traffic - - :param str soa_dir: SOA configuration directory - :return: set of service names - """ - global CACHED_AUTHENTICATING_SERVICES - soa_path = staticconf.read_string("soa_path", namespace=TRON_NAMESPACE, default=DEFAULT_SOA_PATH) - authenticating_services_conf_path = os.path.join(soa_path, "authenticating.yaml") - watcher = _get_config_watcher(authenticating_services_conf_path, AUTH_SERVICES_CONF_NAMESPACE) - if (watcher and watcher.reload_if_changed()) or CACHED_AUTHENTICATING_SERVICES is None: - CACHED_AUTHENTICATING_SERVICES = set( - staticconf.read_list("services", namespace=AUTH_SERVICES_CONF_NAMESPACE, default=[]) - ) - return CACHED_AUTHENTICATING_SERVICES - - -def get_service_auth_token_volume_config() -> ProjectedSAVolume: - """Get service authentication token mount configuration - - :param str paasta_config_dir: PaaSTA configuration directory - :return: configuration as dictionary, if present - """ - paasta_config_path = staticconf.read_string( - "paasta_config_path", namespace=TRON_NAMESPACE, default=DEFAULT_PAASTA_CONFIG_PATH - ) - config_path = os.path.join(paasta_config_path, "jwt_service_auth.json") - watcher = _get_config_watcher(config_path, TOKEN_VOLUME_CONF_NAMESPACE) - if watcher: - watcher.reload_if_changed() - return cast( - ProjectedSAVolume, - staticconf.read("service_auth_token_settings", namespace=TOKEN_VOLUME_CONF_NAMESPACE, default={}), - ) - - -def get_projected_sa_volumes(service: str) -> List[ProjectedSAVolume]: - """Return projected service account volume, as a single elemenet list, - if service participates in authenticated communications. - - :param str service: name of the service - :return: list of volume config, empty if not needed - """ - volume_config = get_service_auth_token_volume_config() - return [volume_config] if volume_config and service in get_authenticating_services() else []