From 1c84fd18404d64ad735fe6ae8832ff432f20bdfd Mon Sep 17 00:00:00 2001 From: Eman Elsabban Date: Fri, 19 Jan 2024 13:33:32 -0800 Subject: [PATCH] Handling Added events and adding timers and logs in signalfx and scribereader --- .../plugins/kubernetes/kubernetes_pod_executor.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/task_processing/plugins/kubernetes/kubernetes_pod_executor.py b/task_processing/plugins/kubernetes/kubernetes_pod_executor.py index 1fd23e04..e3cd7776 100644 --- a/task_processing/plugins/kubernetes/kubernetes_pod_executor.py +++ b/task_processing/plugins/kubernetes/kubernetes_pod_executor.py @@ -210,7 +210,7 @@ def __update_modified_pod(self, pod: V1Pod, event: Optional[PodEvent]) -> None: if pod.status.phase not in SUPPORTED_POD_MODIFIED_EVENT_PHASES: logger.debug( - f"Got a MODIFIED event for {pod_name} for unhandled phase: " + f"Got a {event['type']} event for {pod_name} for unhandled phase: " f"{pod.status.phase} - ignoring." ) return @@ -319,7 +319,7 @@ def __update_modified_pod(self, pod: V1Pod, event: Optional[PodEvent]) -> None: and task_metadata.task_state is not KubernetesTaskState.TASK_LOST ): logger.info( - f"Got a MODIFIED event for {pod_name} with unknown phase, host likely " + f"Got a {event['type']} event for {pod_name} with unknown phase, host likely " "unexpectedly died" ) self.task_metadata = self.task_metadata.set( @@ -357,7 +357,7 @@ def __update_modified_pod(self, pod: V1Pod, event: Optional[PodEvent]) -> None: ) else: logger.info( - f"Ignoring MODIFIED event for {pod_name} as it did not result " + f"Ignoring {event['type']} event for {pod_name} as it did not result " "in a state transition", ) @@ -388,7 +388,7 @@ def _process_pod_event(self, event: PodEvent) -> None: elif event["type"] == "DELETED": self.__handle_deleted_pod_event(event) - elif event["type"] == "MODIFIED": + elif event["type"] in {"MODIFIED", "ADDED"}: self.__handle_modified_pod_event(event) else: @@ -404,6 +404,10 @@ def _pending_event_processing_loop(self) -> None: event = None while not self.stopping or not self.pending_events.empty(): try: + # we might see that their are gaps 0.5s because thats how long it will take it to see if there are stuff in the queue + # will give you whats in the queue or wait 0.5 sec to receive events, if no events are received then it will throw the empty exception + # and start again + # I think below might be taking some time to get from a queue an event, should time these two separately event = self.pending_events.get(timeout=QUEUE_GET_TIMEOUT_S) self._process_pod_event(event) except queue.Empty: