diff --git a/task_processing/plugins/kubernetes/kubernetes_pod_executor.py b/task_processing/plugins/kubernetes/kubernetes_pod_executor.py index e51f9981..bab2be36 100644 --- a/task_processing/plugins/kubernetes/kubernetes_pod_executor.py +++ b/task_processing/plugins/kubernetes/kubernetes_pod_executor.py @@ -35,13 +35,20 @@ from task_processing.plugins.kubernetes.utils import get_kubernetes_empty_volume_mounts from task_processing.plugins.kubernetes.utils import get_kubernetes_env_vars from task_processing.plugins.kubernetes.utils import get_kubernetes_secret_volume_mounts +from task_processing.plugins.kubernetes.utils import ( + get_kubernetes_service_account_token_volume_mounts, +) from task_processing.plugins.kubernetes.utils import get_kubernetes_volume_mounts from task_processing.plugins.kubernetes.utils import get_node_affinity from task_processing.plugins.kubernetes.utils import get_pod_empty_volumes from task_processing.plugins.kubernetes.utils import get_pod_secret_volumes +from task_processing.plugins.kubernetes.utils import ( + get_pod_service_account_token_volumes, +) from task_processing.plugins.kubernetes.utils import get_pod_volumes from task_processing.plugins.kubernetes.utils import get_sanitised_kubernetes_name + logger = logging.getLogger(__name__) POD_WATCH_THREAD_JOIN_TIMEOUT_S = 1.0 @@ -441,6 +448,9 @@ def _create_container_definition( get_kubernetes_volume_mounts(task_config.volumes) + get_kubernetes_empty_volume_mounts(task_config.empty_volumes) + get_kubernetes_secret_volume_mounts(task_config.secret_volumes) + + get_kubernetes_service_account_token_volume_mounts( + task_config.projected_sa_volumes + ) ) capabilities = get_capabilities_for_capability_changes( @@ -514,6 +524,9 @@ def run(self, task_config: KubernetesTaskConfig) -> Optional[str]: get_pod_volumes(task_config.volumes) + get_pod_empty_volumes(task_config.empty_volumes) + get_pod_secret_volumes(task_config.secret_volumes) + + get_pod_service_account_token_volumes( + task_config.projected_sa_volumes + ) ) pod = V1Pod( diff --git a/task_processing/plugins/kubernetes/task_config.py b/task_processing/plugins/kubernetes/task_config.py index 9d3397a2..4eceadab 100644 --- a/task_processing/plugins/kubernetes/task_config.py +++ b/task_processing/plugins/kubernetes/task_config.py @@ -22,8 +22,12 @@ from task_processing.plugins.kubernetes.types import NodeAffinity from task_processing.plugins.kubernetes.types import NodeAffinityOperator from task_processing.plugins.kubernetes.types import ObjectFieldSelectorSource +from task_processing.plugins.kubernetes.types import ProjectedSAVolume from task_processing.plugins.kubernetes.types import SecretVolume from task_processing.plugins.kubernetes.types import SecretVolumeItem +from task_processing.plugins.kubernetes.utils import ( + DEFAULT_PROJECTED_SA_TOKEN_EXPIRATION_SECONDS, +) from task_processing.plugins.kubernetes.utils import get_sanitised_kubernetes_name from task_processing.plugins.kubernetes.utils import mode_to_int @@ -190,6 +194,34 @@ def _valid_secret_volumes( return (True, None) +def _valid_projected_sa_volumes( + sa_volumes: Sequence[ProjectedSAVolume], +) -> Tuple[bool, Optional[str]]: + min_expiration = 600 + for volume in sa_volumes: + if not volume.get("audience"): + return ( + False, + "No token audience set for projected service account volume", + ) + if not volume.get("container_path"): + return ( + False, + "No token container_path set for projected service account volume", + ) + if ( + volume.get( + "expiration_seconds", DEFAULT_PROJECTED_SA_TOKEN_EXPIRATION_SECONDS + ) + < min_expiration + ): + return ( + False, + f"Expiration for service account projected token must be at least {min_expiration} seconds", + ) + return (True, None) + + def _valid_secret_envs( secret_envs: Mapping[str, "SecretEnvSource"] ) -> Tuple[bool, Optional[str]]: @@ -353,6 +385,12 @@ def __invariant__(self) -> Tuple[Tuple[bool, str], ...]: factory=pvector, invariant=_valid_secret_volumes, ) + projected_sa_volumes = field( + type=PVector if not TYPE_CHECKING else PVector["ProjectedSAVolume"], + initial=v(), + factory=pvector, + invariant=_valid_projected_sa_volumes, + ) extra_containers = field( type=PMap if not TYPE_CHECKING else PMap[str, "KubernetesTaskConfig"], diff --git a/task_processing/plugins/kubernetes/types.py b/task_processing/plugins/kubernetes/types.py index c44e6fbe..8437a7e3 100644 --- a/task_processing/plugins/kubernetes/types.py +++ b/task_processing/plugins/kubernetes/types.py @@ -34,6 +34,12 @@ class SecretVolume(TypedDict): items: List[SecretVolumeItem] +class ProjectedSAVolume(TypedDict, total=False): + container_path: str + audience: str + expiration_seconds: int + + class SecretEnvSource(TypedDict): secret_name: str # full name of k8s secret resource key: str diff --git a/task_processing/plugins/kubernetes/utils.py b/task_processing/plugins/kubernetes/utils.py index 98daecac..89a61174 100644 --- a/task_processing/plugins/kubernetes/utils.py +++ b/task_processing/plugins/kubernetes/utils.py @@ -16,10 +16,13 @@ from kubernetes.client import V1NodeSelectorRequirement from kubernetes.client import V1NodeSelectorTerm from kubernetes.client import V1ObjectFieldSelector +from kubernetes.client import V1ProjectedVolumeSource from kubernetes.client import V1SecretKeySelector from kubernetes.client import V1SecretVolumeSource +from kubernetes.client import V1ServiceAccountTokenProjection from kubernetes.client import V1Volume from kubernetes.client import V1VolumeMount +from kubernetes.client import V1VolumeProjection from pyrsistent.typing import PMap from pyrsistent.typing import PVector @@ -32,9 +35,13 @@ from task_processing.plugins.kubernetes.types import NodeAffinity from task_processing.plugins.kubernetes.types import SecretEnvSource from task_processing.plugins.kubernetes.types import ObjectFieldSelectorSource + from task_processing.plugins.kubernetes.types import ProjectedSAVolume + logger = logging.getLogger(__name__) +DEFAULT_PROJECTED_SA_TOKEN_EXPIRATION_SECONDS = 1800 + def get_capabilities_for_capability_changes( cap_add: PVector[str], @@ -350,3 +357,63 @@ def get_node_affinity(affinities: PVector["NodeAffinity"]) -> Optional[V1NodeAff ], ), ) + + +def _get_service_account_token_volume_name(audience: str) -> str: + """Generate name for service account projected volume + + :param str audience: audience of the authentication token + :return: volume name + """ + return get_sanitised_volume_name( + f"projected-sa--{audience}", + length_limit=63, + ) + + +def get_pod_service_account_token_volumes( + sa_volumes: PVector["ProjectedSAVolume"], +) -> List[V1Volume]: + """Build projected service account volumes for pod + + :param PVector["ProjectedSAVolume"] sa_volumes: list of projected service account volume configs + :return: list of kubernetes pod volume objects + """ + return [ + V1Volume( + name=_get_service_account_token_volume_name(volume["audience"]), + projected=V1ProjectedVolumeSource( + sources=[ + V1VolumeProjection( + service_account_token=V1ServiceAccountTokenProjection( + audience=volume["audience"], + expiration_seconds=volume.get( + "expiration_seconds", + DEFAULT_PROJECTED_SA_TOKEN_EXPIRATION_SECONDS, + ), + path="token", + ), + ), + ], + ), + ) + for volume in sa_volumes + ] + + +def get_kubernetes_service_account_token_volume_mounts( + sa_volumes: PVector["ProjectedSAVolume"], +) -> List[V1VolumeMount]: + """Build container mounts for projected service account volumes + + :param PVector["ProjectedSAVolume"] sa_volumes: list of projected service account volume configs + :return: list of kubernetes volume mount objects + """ + return [ + V1VolumeMount( + mount_path=volume["container_path"], + name=_get_service_account_token_volume_name(volume["audience"]), + read_only=True, + ) + for volume in sa_volumes + ] diff --git a/tests/unit/plugins/kubernetes/kubernetes_pod_executor_test.py b/tests/unit/plugins/kubernetes/kubernetes_pod_executor_test.py index 558a73f5..7a6568d9 100644 --- a/tests/unit/plugins/kubernetes/kubernetes_pod_executor_test.py +++ b/tests/unit/plugins/kubernetes/kubernetes_pod_executor_test.py @@ -12,10 +12,13 @@ from kubernetes.client import V1Pod from kubernetes.client import V1PodSecurityContext from kubernetes.client import V1PodSpec +from kubernetes.client import V1ProjectedVolumeSource from kubernetes.client import V1ResourceRequirements from kubernetes.client import V1SecurityContext +from kubernetes.client import V1ServiceAccountTokenProjection from kubernetes.client import V1Volume from kubernetes.client import V1VolumeMount +from kubernetes.client import V1VolumeProjection from pyrsistent import InvariantException from pyrsistent import pmap from pyrsistent import pvector @@ -547,6 +550,118 @@ def test_run_failed_exception(k8s_executor): assert k8s_executor.run(task_config) is None +@mock.patch( + "task_processing.plugins.kubernetes.kubernetes_pod_executor.get_node_affinity", + autospec=True, +) +def test_run_authentication_token(mock_get_node_affinity, k8s_executor): + task_config = KubernetesTaskConfig( + name="fake_task_name", + uuid="fake_id", + image="fake_docker_image", + command="fake_command", + cpus=1, + cpus_request=0.5, + memory=1024, + disk=1024, + volumes=[], + projected_sa_volumes=[ + {"audience": "foo.bar.com", "container_path": "/var/secret/whatever"} + ], + node_selectors={"hello": "world"}, + node_affinities=[dict(key="a_label", operator="In", value=[])], + labels={ + "some_label": "some_label_value", + }, + annotations={ + "paasta.yelp.com/some_annotation": "some_value", + }, + service_account_name="testsa", + ports=[8888], + stdin=True, + stdin_once=True, + tty=True, + ) + expected_container = V1Container( + image=task_config.image, + name="main", + command=["/bin/sh", "-c"], + args=[task_config.command], + security_context=V1SecurityContext( + capabilities=V1Capabilities(drop=list(task_config.cap_drop)), + ), + resources=V1ResourceRequirements( + limits={ + "cpu": 1.0, + "memory": "1024.0Mi", + "ephemeral-storage": "1024.0Mi", + }, + requests={"cpu": 0.5}, + ), + env=[], + volume_mounts=[ + V1VolumeMount( + mount_path="/var/secret/whatever", + name="projected-sa--foodot-bardot-com", + read_only=True, + ), + ], + ports=[V1ContainerPort(container_port=8888)], + stdin=True, + stdin_once=True, + tty=True, + ) + expected_pod = V1Pod( + metadata=V1ObjectMeta( + name=task_config.pod_name, + namespace="task_processing_tests", + labels={ + "some_label": "some_label_value", + }, + annotations={ + "paasta.yelp.com/some_annotation": "some_value", + }, + ), + spec=V1PodSpec( + restart_policy=task_config.restart_policy, + containers=[expected_container], + volumes=[ + V1Volume( + name="projected-sa--foodot-bardot-com", + projected=V1ProjectedVolumeSource( + sources=[ + V1VolumeProjection( + service_account_token=V1ServiceAccountTokenProjection( + audience="foo.bar.com", + expiration_seconds=1800, + path="token", + ), + ), + ], + ), + ), + ], + share_process_namespace=True, + security_context=V1PodSecurityContext( + fs_group=task_config.fs_group, + ), + node_selector={"hello": "world"}, + affinity=V1Affinity(node_affinity=mock_get_node_affinity.return_value), + dns_policy="Default", + service_account_name=task_config.service_account_name, + ), + ) + + assert all(v is not None for v in expected_container.resources.requests.values()) + assert k8s_executor.run(task_config) == task_config.pod_name + assert k8s_executor.kube_client.core.create_namespaced_pod.call_args_list == [ + mock.call(body=expected_pod, namespace="task_processing_tests") + ] + assert mock_get_node_affinity.call_args_list == [ + mock.call(pvector([dict(key="a_label", operator="In", value=[])])), + ] + + def test_process_event_enqueues_task_processing_events_pending_to_running(k8s_executor): mock_pod = mock.Mock(spec=V1Pod) mock_pod.metadata.name = "test.1234" diff --git a/tests/unit/plugins/kubernetes/kubernetes_task_config_test.py b/tests/unit/plugins/kubernetes/kubernetes_task_config_test.py index 2ba5dd87..9ca65b82 100644 --- a/tests/unit/plugins/kubernetes/kubernetes_task_config_test.py +++ b/tests/unit/plugins/kubernetes/kubernetes_task_config_test.py @@ -569,3 +569,20 @@ def test_valid_ports_invariant(ports): ) assert task_config.ports == ports + + +@pytest.mark.parametrize( + "sa_volume", + ( + {"container_path": "foo"}, + {"audience": "bar"}, + {"container_path": "foo", "audience": "bar", "expiration_seconds": 42}, + ), +) +def test_projected_sa_volumes_invariant_failure(sa_volume): + with pytest.raises(InvariantException): + KubernetesTaskConfig( + image="fake_docker_image", + command="fake_command", + projected_sa_volumes=[sa_volume], + ) diff --git a/tests/unit/plugins/kubernetes/kubernetes_utils_test.py b/tests/unit/plugins/kubernetes/kubernetes_utils_test.py index 63787893..76ef77db 100644 --- a/tests/unit/plugins/kubernetes/kubernetes_utils_test.py +++ b/tests/unit/plugins/kubernetes/kubernetes_utils_test.py @@ -10,10 +10,13 @@ from kubernetes.client import V1NodeSelectorRequirement from kubernetes.client import V1NodeSelectorTerm from kubernetes.client import V1ObjectFieldSelector +from kubernetes.client import V1ProjectedVolumeSource from kubernetes.client import V1SecretKeySelector from kubernetes.client import V1SecretVolumeSource +from kubernetes.client import V1ServiceAccountTokenProjection from kubernetes.client import V1Volume from kubernetes.client import V1VolumeMount +from kubernetes.client import V1VolumeProjection from pyrsistent import pmap from pyrsistent import pvector from pyrsistent import v @@ -25,10 +28,16 @@ from task_processing.plugins.kubernetes.utils import get_kubernetes_empty_volume_mounts from task_processing.plugins.kubernetes.utils import get_kubernetes_env_vars from task_processing.plugins.kubernetes.utils import get_kubernetes_secret_volume_mounts +from task_processing.plugins.kubernetes.utils import ( + get_kubernetes_service_account_token_volume_mounts, +) from task_processing.plugins.kubernetes.utils import get_kubernetes_volume_mounts from task_processing.plugins.kubernetes.utils import get_node_affinity from task_processing.plugins.kubernetes.utils import get_pod_empty_volumes from task_processing.plugins.kubernetes.utils import get_pod_secret_volumes +from task_processing.plugins.kubernetes.utils import ( + get_pod_service_account_token_volumes, +) from task_processing.plugins.kubernetes.utils import get_pod_volumes from task_processing.plugins.kubernetes.utils import get_sanitised_kubernetes_name from task_processing.plugins.kubernetes.utils import get_sanitised_volume_name @@ -490,3 +499,42 @@ def test_get_node_affinity_ok(): def test_get_node_affinity_empty(): assert get_node_affinity([]) is None + + +def test_get_pod_service_account_token_volume(): + assert get_pod_service_account_token_volumes( + [{"audience": "foo.bar.com", "expiration_seconds": 1234}] + ) == [ + V1Volume( + name="projected-sa--foodot-bardot-com", + projected=V1ProjectedVolumeSource( + sources=[ + V1VolumeProjection( + service_account_token=V1ServiceAccountTokenProjection( + audience="foo.bar.com", + expiration_seconds=1234, + path="token", + ), + ), + ], + ), + ) + ] + + +def test_get_kubernetes_service_account_token_volume_mount(): + assert get_kubernetes_service_account_token_volume_mounts( + [ + { + "audience": "foo.bar.com", + "expiration_seconds": 1234, + "container_path": "/var/run/secrets/foo.bar.com/serviceaccount", + } + ], + ) == [ + V1VolumeMount( + mount_path="/var/run/secrets/foo.bar.com/serviceaccount", + name="projected-sa--foodot-bardot-com", + read_only=True, + ) + ]