Skip to content

Commit

Permalink
Handling Added events and adding timers and logs in signalfx and scri…
Browse files Browse the repository at this point in the history
…bereader
  • Loading branch information
EmanElsaban committed Jan 19, 2024
1 parent 0f0cbe6 commit 1ee611d
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions task_processing/plugins/kubernetes/kubernetes_pod_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def _initialize_existing_task(self, task_config: KubernetesTaskConfig) -> None:
),
),
)

def _pod_event_watch_loop(self) -> None:
logger.debug(f"Starting watching Pod events for namespace={self.namespace}.")
# TODO(TASKPROC-243): we'll need to correctly handle resourceVersion expiration for the case
Expand Down Expand Up @@ -209,8 +209,8 @@ def __update_modified_pod(self, pod: V1Pod, event: Optional[PodEvent]) -> None:
raw_event = event["raw_object"] if event else None

if pod.status.phase not in SUPPORTED_POD_MODIFIED_EVENT_PHASES:
logger.debug(
f"Got a MODIFIED event for {pod_name} for unhandled phase: "
logger.info(
f"Got a {event['type']} event for {pod_name} for unhandled phase: "
f"{pod.status.phase} - ignoring."
)
return
Expand Down Expand Up @@ -319,7 +319,7 @@ def __update_modified_pod(self, pod: V1Pod, event: Optional[PodEvent]) -> None:
and task_metadata.task_state is not KubernetesTaskState.TASK_LOST
):
logger.info(
f"Got a MODIFIED event for {pod_name} with unknown phase, host likely "
f"Got a {event['type']} event for {pod_name} with unknown phase, host likely "
"unexpectedly died"
)
self.task_metadata = self.task_metadata.set(
Expand Down Expand Up @@ -357,7 +357,7 @@ def __update_modified_pod(self, pod: V1Pod, event: Optional[PodEvent]) -> None:
)
else:
logger.info(
f"Ignoring MODIFIED event for {pod_name} as it did not result "
f"Ignoring {event['type']} event for {pod_name} as it did not result "
"in a state transition",
)

Expand Down Expand Up @@ -388,7 +388,7 @@ def _process_pod_event(self, event: PodEvent) -> None:
elif event["type"] == "DELETED":
self.__handle_deleted_pod_event(event)

elif event["type"] == "MODIFIED":
elif event["type"] in {"MODIFIED", "ADDED"}:
self.__handle_modified_pod_event(event)

else:
Expand All @@ -404,6 +404,10 @@ def _pending_event_processing_loop(self) -> None:
event = None
while not self.stopping or not self.pending_events.empty():
try:
# we might see that their are gaps 0.5s because thats how long it will take it to see if there are stuff in the queue
# will give you whats in the queue or wait 0.5 sec to receive events, if no events are received then it will throw the empty exception
# and start again
# I think below might be taking some time to get from a queue an event, should time these two separately
event = self.pending_events.get(timeout=QUEUE_GET_TIMEOUT_S)
self._process_pod_event(event)
except queue.Empty:
Expand Down

0 comments on commit 1ee611d

Please sign in to comment.