Skip to content

Commit

Permalink
Address the lag when getting an event from the queue when processing it
Browse files Browse the repository at this point in the history
  • Loading branch information
EmanElsaban committed Sep 5, 2024
1 parent f8bc05a commit 6093ed2
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 27 deletions.
35 changes: 30 additions & 5 deletions task_processing/plugins/kubernetes/kubernetes_pod_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,18 @@ def _pending_event_processing_loop(self) -> None:
"""
logger.debug("Starting Pod event processing.")
event = None
while not self.stopping or not self.pending_events.empty():
while True:
try:
event = self.pending_events.get(timeout=QUEUE_GET_TIMEOUT_S)
if event["type"] == "STOP":
logger.debug("Received a STOP event - stopping processing.")
try:
self.pending_events.task_done()
except ValueError:
logger.error(
"task_done() called on pending events queue too many times!"
)
break
self._process_pod_event(event)
except queue.Empty:
logger.debug(
Expand Down Expand Up @@ -700,17 +709,22 @@ def kill(self, task_id: str) -> bool:
return terminated

def stop(self) -> None:
logger.debug("Preparing to stop all KubernetesPodExecutor threads.")
logger.debug("Preparing to stop all KubernetesPodExecutor processes.")
self.stopping = True

logger.debug("Signaling Pod event Watch to stop streaming events...")
# make sure that we've stopped watching for events before calling join() - otherwise,
# join() will block until we hit the configured timeout (or forever with no timeout).
for watch in self.watches:
watch.stop()

# Add a STOP event to the queue below after stopping the watch to ensure
# no events will be added after the STOP event
stop_event = PodEvent(type="STOP", object=None, raw_object={})
self.pending_events.put(stop_event)

# timeout arbitrarily chosen - we mostly just want to make sure that we have a small
# grace period to flush the current event to the pending_events queue as well as
# any other clean-up - it's possible that after this join() the thread is still alive
# any other clean-up - it's possible that after this join() the process is still alive
# but in that case we can be reasonably sure that we're not dropping any data.
for pod_event_watch_process in self.pod_event_watch_processes:
pod_event_watch_process.join(timeout=POD_WATCH_PROCESS_JOIN_TIMEOUT_S)
Expand All @@ -719,13 +733,24 @@ def stop(self) -> None:
# once we've stopped updating the pending events queue, we then wait until we're done
# processing any events we've received - this will wait until task_done() has been
# called for every item placed in this queue
# since we stopped the watch above, we don't expect any more events to be added to the queue
# this ensure that we're not stuck due to the stop event, if it wasn't processed by the _pending_event_processing_loop loop
if (
self.pending_events.qsize() == 1
and self.pending_events.get(timeout=QUEUE_GET_TIMEOUT_S)["type"] == "STOP"
):
try:
self.pending_events.task_done()
except ValueError:
logger.error(
"task_done() called on pending events queue too many times!"
)
self.pending_events.join()
logger.debug("All pending PodEvents have been processed.")
# and then give ourselves time to do any post-stop cleanup
self.pending_event_processing_process.join(
timeout=POD_EVENT_PROCESS_JOIN_TIMEOUT_S
)

logger.debug("Done stopping KubernetesPodExecutor!")

def get_event_queue(self) -> "JoinableQueue[Event]":
Expand Down
3 changes: 2 additions & 1 deletion task_processing/plugins/kubernetes/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class NodeAffinity(TypedDict):


class PodEvent(TypedDict):
# there are only 3 possible types for Pod events: ADDED, DELETED, MODIFIED
# there are only 4 possible types for Pod events: ADDED, DELETED, MODIFIED or STOP
# STOP is a custom type that we use to signal STOP to all KubernetesPodExecutor processes
# XXX: this should be typed as Literal["ADDED", "DELETED", "MODIFIED"] once we drop support
# for older Python versions
type: str
Expand Down
85 changes: 64 additions & 21 deletions tests/unit/plugins/kubernetes/kubernetes_pod_executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from kubernetes.client import V1Pod
from kubernetes.client import V1PodSecurityContext
from kubernetes.client import V1PodSpec
from kubernetes.client import V1PodStatus
from kubernetes.client import V1ProjectedVolumeSource
from kubernetes.client import V1ResourceRequirements
from kubernetes.client import V1SecurityContext
Expand Down Expand Up @@ -697,15 +698,18 @@ def test_process_event_enqueues_task_processing_events_pending_to_running(k8s_ex
mock_pod.metadata.name = "test.1234"
mock_pod.status.phase = "Running"
mock_pod.spec.node_name = "node-1-2-3-4"
task_config = KubernetesTaskConfig(
image="test", command="test", uuid="uuid", name="pod--name"
)
mock_event = PodEvent(
type="MODIFIED",
object=mock_pod,
raw_object=mock.Mock(),
raw_object={},
)
k8s_executor.task_metadata = pmap(
{
mock_pod.metadata.name: KubernetesTaskMetadata(
task_config=mock.Mock(spec=KubernetesTaskConfig),
task_config=task_config,
task_state=KubernetesTaskState.TASK_PENDING,
task_state_history=v(),
)
Expand Down Expand Up @@ -736,15 +740,18 @@ def test_process_event_enqueues_task_processing_events_running_to_terminal(
mock_pod.metadata.name = "test.1234"
mock_pod.status.phase = phase
mock_pod.spec.node_name = "node-1-2-3-4"
task_config = KubernetesTaskConfig(
image="test", command="test", uuid="uuid", name="pod--name"
)
mock_event = PodEvent(
type="MODIFIED",
object=mock_pod,
raw_object=mock.Mock(),
raw_object={},
)
k8s_executor.task_metadata = pmap(
{
mock_pod.metadata.name: KubernetesTaskMetadata(
task_config=mock.Mock(spec=KubernetesTaskConfig),
task_config=task_config,
task_state=KubernetesTaskState.TASK_RUNNING,
task_state_history=v(),
)
Expand Down Expand Up @@ -779,7 +786,7 @@ def test_process_event_enqueues_task_processing_events_no_state_transition(
mock_event = PodEvent(
type="MODIFIED",
object=mock_pod,
raw_object=mock.Mock(),
raw_object={},
)
k8s_executor.task_metadata = pmap(
{
Expand All @@ -804,6 +811,42 @@ def test_process_event_enqueues_task_processing_events_no_state_transition(
)


def test_pending_event_processing_loop_processes_remaining_events_after_stop(
k8s_executor,
):
# Create a V1Pod object to use for testing multiprocess instead of mock.Mock() as
# it is not pickleable
test_pod = V1Pod(
metadata=V1ObjectMeta(
name="test-pod",
namespace="task_processing_tests",
)
)
k8s_executor.pending_events.put(
PodEvent(
type="ADDED",
object=test_pod,
raw_object={},
)
)
k8s_executor.pending_events.put(
PodEvent(
type="STOP",
object=None,
raw_object={},
)
)
with mock.patch.object(
k8s_executor,
"_process_pod_event",
autospec=True,
) as mock_process_event:
k8s_executor._pending_event_processing_loop()

mock_process_event.assert_called_once()
assert k8s_executor.pending_events.qsize() == 0


def test_process_event_enqueues_task_processing_events_deleted(
k8s_executor,
):
Expand All @@ -812,15 +855,18 @@ def test_process_event_enqueues_task_processing_events_deleted(
mock_pod.status.phase = "Running"
mock_pod.status.host_ip = "1.2.3.4"
mock_pod.spec.node_name = "kubenode"
task_config = KubernetesTaskConfig(
image="test", command="test", uuid="uuid", name="pod--name"
)
mock_event = PodEvent(
type="DELETED",
object=mock_pod,
raw_object=mock.Mock(),
raw_object={},
)
k8s_executor.task_metadata = pmap(
{
mock_pod.metadata.name: KubernetesTaskMetadata(
task_config=mock.Mock(spec=KubernetesTaskConfig),
task_config=task_config,
task_state=KubernetesTaskState.TASK_RUNNING,
task_state_history=v(),
)
Expand All @@ -847,14 +893,13 @@ def test_initial_task_metadata(k8s_executor_with_tasks):
def test_reconcile_missing_pod(
k8s_executor,
):
task_config = mock.Mock(spec=KubernetesTaskConfig)
task_config.pod_name = "pod--name.uuid"
task_config.name = "job-name"

task_config = KubernetesTaskConfig(
image="test", command="test", uuid="uuid", name="pod--name"
)
k8s_executor.task_metadata = pmap(
{
task_config.pod_name: KubernetesTaskMetadata(
task_config=mock.Mock(spec=KubernetesTaskConfig),
task_config=task_config,
task_state=KubernetesTaskState.TASK_UNKNOWN,
task_state_history=v(),
)
Expand All @@ -876,14 +921,13 @@ def test_reconcile_missing_pod(
def test_reconcile_multicluster(
k8s_executor_with_watcher_clusters,
):
task_config = mock.Mock(spec=KubernetesTaskConfig)
task_config.pod_name = "pod--name.uuid"
task_config.name = "job-name"

task_config = KubernetesTaskConfig(
image="test", command="test", uuid="uuid", name="pod--name"
)
k8s_executor_with_watcher_clusters.task_metadata = pmap(
{
task_config.pod_name: KubernetesTaskMetadata(
task_config=mock.Mock(spec=KubernetesTaskConfig),
task_config=task_config,
task_state=KubernetesTaskState.TASK_UNKNOWN,
task_state_history=v(),
)
Expand Down Expand Up @@ -945,10 +989,9 @@ def test_reconcile_existing_pods(k8s_executor, mock_task_configs):
def test_reconcile_api_error(
k8s_executor,
):
task_config = mock.Mock(spec=KubernetesTaskConfig)
task_config.pod_name = "pod--name.uuid"
task_config.name = "job-name"

task_config = KubernetesTaskConfig(
image="test", command="test", uuid="uuid", name="pod--name"
)
with mock.patch.object(
k8s_executor, "kube_client", autospec=True
) as mock_kube_client:
Expand Down

0 comments on commit 6093ed2

Please sign in to comment.