Skip to content

Commit

Permalink
ope-log-collector: Add pvc event streaming
Browse files: browse the repository at this point in the history
Signed-off-by: Isaiah Stapleton <[email protected]>
  • Loading branch information
IsaiahStapleton committed Oct 21, 2024
1 parent 2e67bd1 commit d89a78a
Showing 1 changed file with 71 additions and 7 deletions.
78 changes: 71 additions & 7 deletions container-images/ope-log-collector/collect-logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ async def gather_logs(namespace: str):
w = watch.Watch()
log_tasks = {}
event_tasks = {}
pvc_tasks = {}
try:
async for event in w.stream(
v1.list_namespaced_pod, namespace=namespace, timeout_seconds=0
Expand All @@ -38,6 +39,8 @@ async def gather_logs(namespace: str):
if not pod_name.startswith('jupyter-nb'):
continue

pvc_name = pod.spec.volumes[0].name

# Wait until pods start up
if event_type in ['ADDED', 'MODIFIED']:
pod_status = pod.status.phase
Expand All @@ -52,7 +55,10 @@ async def gather_logs(namespace: str):
stream_pod_logs(v1, namespace, pod_name, container_name)
)
event_tasks[pod_name] = asyncio.create_task(
stream_pod_events(v1, namespace, pod_name, container_name)
stream_pod_events(v1, namespace, pod_name)
)
pvc_tasks[pod_name] = asyncio.create_task(
stream_pvc_events(v1, namespace, pod_name, pvc_name)
)
elif event_type == 'DELETED':
LOG.info(f'Pod deleted: {pod_name}. Cancelling log streaming.')
Expand All @@ -62,6 +68,9 @@ async def gather_logs(namespace: str):
if pod_name in event_tasks:
event_tasks[pod_name].cancel()
del event_tasks[pod_name]
if pod_name in pvc_tasks:
pvc_tasks[pod_name].cancel()
del pvc_tasks[pod_name]

except Exception as e:
LOG.info(f'Server side Timeout: {e}. Re-establishing connection.')
Expand All @@ -74,10 +83,15 @@ async def gather_logs(namespace: str):
for event in event_tasks.values():
event.cancel()

for event in pvc_tasks.values():
event.cancel()

await asyncio.gather(*log_tasks.values(), return_exceptions=True)

await asyncio.gather(*event_tasks.values(), return_exceptions=True)

await asyncio.gather(*pvc_tasks.values(), return_exceptions=True)

await asyncio.sleep(5)


Expand Down Expand Up @@ -128,9 +142,7 @@ async def stream_pod_logs(
await asyncio.sleep(5)


async def stream_pod_events(
v1: client.CoreV1Api, namespace: str, pod_name: str, container_name: str
):
async def stream_pod_events(v1: client.CoreV1Api, namespace: str, pod_name: str):
LOG_DIR = './log'

if not os.path.exists(LOG_DIR):
Expand All @@ -140,8 +152,8 @@ async def stream_pod_events(
LOG.error(f"Could not create log directory '{LOG_DIR}': {e}")
exit(1)

log_file_path = os.path.join(LOG_DIR, f'{pod_name}-events.log')
LOG.info(f"Streaming events for pod '{pod_name}' to '{log_file_path}'.")
log_file_path = os.path.join(LOG_DIR, f'{pod_name}-pod-events.log')
LOG.info(f"Streaming pod events for pod '{pod_name}' to '{log_file_path}'.")

field_selector = f'involvedObject.kind=Pod,involvedObject.name={pod_name},involvedObject.namespace={namespace}'

Expand All @@ -168,7 +180,59 @@ async def stream_pod_events(
if response:
await response.release()
except Exception as e:
LOG.error(f"Unexpected error while streaming logs for pod '{pod_name}': {e}")
LOG.error(f"Unexpected error while streaming events for pod '{pod_name}': {e}")
LOG.error(traceback.format_exc())
raise
finally:
if response:
await response.release()

await asyncio.sleep(5)


async def stream_pvc_events(
v1: client.CoreV1Api, namespace: str, pod_name: str, pvc_name: str
):
LOG_DIR = './log'

if not os.path.exists(LOG_DIR):
try:
os.makedirs(LOG_DIR)
except Exception as e:
LOG.error(f"Could not create log directory '{LOG_DIR}': {e}")
exit(1)

log_file_path = os.path.join(LOG_DIR, f'{pod_name}-pvc-events.log')
LOG.info(f"Streaming pvc events for pod '{pod_name}' to '{log_file_path}'.")

field_selector = f'involvedObject.kind=PersistentVolumeClaim,involvedObject.name=ope-logs,involvedObject.namespace={namespace}'

response = None
try:
events = await v1.list_namespaced_event(
namespace=namespace, field_selector=field_selector
)

# Read the PVC events and write them to the log file
async with aiofiles.open(log_file_path, 'w') as log_file:
for event in events.items:
if event:
event_time = event.last_timestamp
event_type = event.type
event_message = event.message

event_entry = f'{event_time} | Type: {event_type} | Message: {event_message}\n'

await log_file.write(event_entry)
await log_file.flush()

except asyncio.CancelledError:
if response:
await response.release()
except Exception as e:
LOG.error(
f"Unexpected error while streaming pvc events for pod '{pod_name}': {e}"
)
LOG.error(traceback.format_exc())
raise
finally:
Expand Down

0 comments on commit d89a78a

Please sign in to comment.