From 68c04ac6d52b96d475cbedcd3e19434238ab4706 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Cortenraede?= Date: Tue, 25 Jun 2024 12:27:06 +0200 Subject: [PATCH 1/2] Started implementation of current shown stream being saved in Redis --- src/api/api/routers/v1/internal_streams.py | 25 +++++++++++++++++++ src/api/api/routers/v1/streams.py | 6 +++-- .../video_streamer/modules/file_streamer.py | 9 +++++-- 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/src/api/api/routers/v1/internal_streams.py b/src/api/api/routers/v1/internal_streams.py index 1e101e9..54dc8e7 100644 --- a/src/api/api/routers/v1/internal_streams.py +++ b/src/api/api/routers/v1/internal_streams.py @@ -5,6 +5,7 @@ from litestar import Controller, Response, get, post from litestar.background_tasks import BackgroundTask from litestar.contrib.sqlalchemy.repository import SQLAlchemyAsyncRepository +from litestar.datastructures import State from litestar.di import Provide from litestar.params import Body from models.animal import Animal @@ -60,6 +61,12 @@ async def provide_stream_animals_repository( ) +async def save_current_stream(redis, stream_id: UUID, delay: int = 300) -> None: + # TODO: Set current stream in redis. + + pass + + async def store_animals( animals_repository: AnimalRepository, stream_animals_repository: StreamAnimalRepository, @@ -142,6 +149,24 @@ async def get_stream( return stream + @post("/streams/{stream_id:uuid}/current") + async def set_current_stream( + self, state: State, streams_repository: StreamRepository, stream_id: UUID + ) -> Response: + stream = await streams_repository.get( + item_id=stream_id, + ) + + return Response( + f"Set current shown stream to {stream.id}.", + status_code=200, + background=BackgroundTask( + save_current_stream, + redis=state.r, + stream_id=stream.id, + ), + ) + @post( "/streams/{stream_id:uuid}/animals", dependencies={ diff --git a/src/api/api/routers/v1/streams.py b/src/api/api/routers/v1/streams.py index fc2a1c1..392c5b8 100644 --- a/src/api/api/routers/v1/streams.py +++ b/src/api/api/routers/v1/streams.py @@ -68,8 +68,10 @@ class StreamsController(Controller): dependencies = {"streams_repository": Provide(provide_streams_repository)} @get("/current") - async def get_current_stream(self, db_session: AsyncSession) -> Redirect: - # TODO: Get current stream id, instead of first. + async def get_current_stream( + self, state: State, db_session: AsyncSession + ) -> Redirect: + # TODO: Get current stream id from Redis, instead of first. current_stream_id = ( (await db_session.execute(select(Stream.id))).scalars().first() ) diff --git a/src/video_streamer/video_streamer/modules/file_streamer.py b/src/video_streamer/video_streamer/modules/file_streamer.py index 8e1f4ab..4efd264 100644 --- a/src/video_streamer/video_streamer/modules/file_streamer.py +++ b/src/video_streamer/video_streamer/modules/file_streamer.py @@ -2,10 +2,10 @@ import subprocess import time +import requests import structlog -from db.redis_connection import RedisConnection - from core.config import settings +from db.redis_connection import RedisConnection # Global variables. VIDEO_ITERATION_DELAY = 8.5 # TODO: Test with delay make configurable. @@ -146,6 +146,11 @@ def start_stream_file(event): logger.debug(f"Time needed to add new file: {time.time() - start_time}") + # Send request to internal API of current stream being shown. + requests.post( + f"http://localhost:8003/v1/internal-streams/streams/{stream_id}/current" # TODO: Make URL configurable, and stream_id correct. + ) + # Remove intermediate files. pathlib.Path(intermediate_file_path).unlink() pathlib.Path() From 77856921258916395d58ff20c00b8b42ea770f2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Cortenraede?= Date: Tue, 25 Jun 2024 17:58:05 +0200 Subject: [PATCH 2/2] Saved current stream to Redis --- src/api/api/routers/v1/internal_streams.py | 9 +++++---- src/api/api/routers/v1/streams.py | 10 +++++----- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/api/api/routers/v1/internal_streams.py b/src/api/api/routers/v1/internal_streams.py index 54dc8e7..362f819 100644 --- a/src/api/api/routers/v1/internal_streams.py +++ b/src/api/api/routers/v1/internal_streams.py @@ -1,3 +1,4 @@ +import asyncio from dataclasses import dataclass from typing import Annotated from uuid import UUID @@ -61,10 +62,10 @@ async def provide_stream_animals_repository( ) -async def save_current_stream(redis, stream_id: UUID, delay: int = 300) -> None: - # TODO: Set current stream in redis. +async def save_current_stream(r, stream_id: UUID, delay: int = 300) -> None: + await asyncio.sleep(delay) - pass + await r.set("current_stream_id", str(stream_id)) async def store_animals( @@ -162,7 +163,7 @@ async def set_current_stream( status_code=200, background=BackgroundTask( save_current_stream, - redis=state.r, + r=state.r, stream_id=stream.id, ), ) diff --git a/src/api/api/routers/v1/streams.py b/src/api/api/routers/v1/streams.py index 392c5b8..4b4d11b 100644 --- a/src/api/api/routers/v1/streams.py +++ b/src/api/api/routers/v1/streams.py @@ -69,14 +69,14 @@ class StreamsController(Controller): @get("/current") async def get_current_stream( - self, state: State, db_session: AsyncSession + self, state: State, streams_repository: StreamRepository ) -> Redirect: - # TODO: Get current stream id from Redis, instead of first. - current_stream_id = ( - (await db_session.execute(select(Stream.id))).scalars().first() + current_stream_id = await state.r.get("current_stream_id") + current_stream = await streams_repository.get( + item_id=current_stream_id, ) - return Redirect(path=f"/v1/streams/{current_stream_id}") + return Redirect(path=f"/v1/streams/{current_stream.id}") @get("/{stream_id:uuid}") async def get_stream(