From 5cdc50eaed8922d98432f6558273dcbe2c57d83f Mon Sep 17 00:00:00 2001 From: Lawrence Angrave Date: Wed, 16 Oct 2024 16:52:35 -0500 Subject: [PATCH] Updates and Testing --- ClassTranscribeDatabase/CommonUtils.cs | 6 ++- .../Controllers/PlaylistsController.cs | 4 +- ClassTranscribeServer/Utils/WakeDownloader.cs | 2 +- PythonRpcServer/server.py | 2 +- TaskEngine/Program.cs | 37 ++++++++++--------- TaskEngine/Tasks/AzureTranscriptionTask.cs | 2 +- TaskEngine/Tasks/LocalTranscriptionTask.cs | 11 +++--- TaskEngine/Tasks/QueueAwakerTask.cs | 2 +- pythonrpcserver.Dockerfile | 2 +- 9 files changed, 36 insertions(+), 32 deletions(-) diff --git a/ClassTranscribeDatabase/CommonUtils.cs b/ClassTranscribeDatabase/CommonUtils.cs index 1e178e23..4713fd64 100644 --- a/ClassTranscribeDatabase/CommonUtils.cs +++ b/ClassTranscribeDatabase/CommonUtils.cs @@ -23,7 +23,7 @@ public enum TaskType DownloadPlaylistInfo = 3, DownloadMedia = 4, ConvertMedia = 5, - TranscribeVideo = 6, + // TranscribeVideo = 6, ProcessVideo = 7, Aggregator = 8, GenerateVTTFile = 9, @@ -39,7 +39,9 @@ public enum TaskType PythonCrawler = 19, DescribeVideo = 20, - DescribeImage = 21 + DescribeImage = 21, + AzureTranscribeVideo = 22, + LocalTranscribeVideo = 23 } diff --git a/ClassTranscribeServer/Controllers/PlaylistsController.cs b/ClassTranscribeServer/Controllers/PlaylistsController.cs index 118ad68c..e8083935 100644 --- a/ClassTranscribeServer/Controllers/PlaylistsController.cs +++ b/ClassTranscribeServer/Controllers/PlaylistsController.cs @@ -170,7 +170,7 @@ public async Task>> GetPlaylists2(string o JsonMetadata = m.JsonMetadata, CreatedAt = m.CreatedAt, SceneDetectReady = m.Video.HasSceneObjectData(), - Ready = m.Video != null && "NoError" == m.Video.TranscriptionStatus , + Ready = m.Video != null && Video.TranscriptionStatusMessages.NOERROR == m.Video.TranscriptionStatus , SourceType = m.SourceType, Duration = m.Video?.Duration, PublishStatus = m.PublishStatus, @@ -265,7 +265,7 @@ public async Task> GetPlaylist(string id) PublishStatus = m.PublishStatus, Options = m.GetOptionsAsJson(), SceneDetectReady = m.Video != null && m.Video.HasSceneObjectData(), - Ready = m.Video != null && "NoError" == m.Video.TranscriptionStatus , + Ready = m.Video != null && Video.TranscriptionStatusMessages.NOERROR == m.Video.TranscriptionStatus , Video = m.Video == null ? null : new VideoDTO { Id = m.Video.Id, diff --git a/ClassTranscribeServer/Utils/WakeDownloader.cs b/ClassTranscribeServer/Utils/WakeDownloader.cs index b2c939a7..22f6479a 100644 --- a/ClassTranscribeServer/Utils/WakeDownloader.cs +++ b/ClassTranscribeServer/Utils/WakeDownloader.cs @@ -104,7 +104,7 @@ public void TranscribeVideo(string videoOrMediaId, bool deleteExisting) { JObject msg = new JObject { - { "Type", TaskType.TranscribeVideo.ToString() }, + { "Type", TaskType.LocalTranscribeVideo.ToString() }, { "videoOrMediaId", videoOrMediaId }, { "DeleteExisting", deleteExisting } }; diff --git a/PythonRpcServer/server.py b/PythonRpcServer/server.py index 9934151e..c568d32c 100644 --- a/PythonRpcServer/server.py +++ b/PythonRpcServer/server.py @@ -152,7 +152,7 @@ def serve(): # Until we can ensure no timeouts on remote services, the default here is set to a conservative low number # This is to ensure we can still make progress even if every python tasks tries to use all cpu cores. max_workers=int(os.getenv('NUM_PYTHON_WORKERS', 3)) - print(f"max_workers={max_workers}") + print(f"max_workers={max_workers}. Starting up grpc server...") server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) diff --git a/TaskEngine/Program.cs b/TaskEngine/Program.cs index 1aa1977e..d8210111 100644 --- a/TaskEngine/Program.cs +++ b/TaskEngine/Program.cs @@ -139,30 +139,31 @@ static void runQueueAwakerForever() { // TODO/REVIEW: However some tasks also publish the next items while (true) { - // try { - // _logger.LogInformation("Periodic Check"); - // queueAwakerTask.Publish(new JObject - // { - // { "Type", TaskType.PeriodicCheck.ToString() } - // }); - // } catch (Exception e) { - // _logger.LogError(e, "Error in Periodic Check"); - // } - try { - var videoId = "ddceb720-a9d6-417d-b5ea-e94c6c0a86c6"; - _logger.LogInformation("Transcription Task Initiated"); + _logger.LogInformation("Periodic Check"); queueAwakerTask.Publish(new JObject { - { "Type", TaskType.TranscribeVideo.ToString() }, - { "videoOrMediaId", videoId } + { "Type", TaskType.PeriodicCheck.ToString() } }); - - _logger.LogInformation("Transcription Task Published Successfully"); } catch (Exception e) { - _logger.LogError(e, "Error in Transcription Task"); + _logger.LogError(e, "Error in Periodic Check"); } + // Hacky testing... + // try { + // var videoId = "ddceb720-a9d6-417d-b5ea-e94c6c0a86c6"; + // _logger.LogInformation("Transcription Task Initiated"); + // queueAwakerTask.Publish(new JObject + // { + // { "Type", TaskType.LocalTranscribeVideo.ToString() }, + // { "videoOrMediaId", videoId } + // }); + + // _logger.LogInformation("Transcription Task Published Successfully"); + // } catch (Exception e) { + // _logger.LogError(e, "Error in Transcription Task"); + // } + // Thread.Sleep(timeInterval); Task.Delay(timeInterval).Wait(); @@ -208,7 +209,7 @@ static void createTaskQueues() { _serviceProvider.GetService().Consume(DISABLED_TASK); // We dont want concurrency for these tasks - _logger.LogInformation("Creating QueueAwakerTask and Box token tasks consumers."); + _logger.LogInformation("Creating QueueAwakerTask and Box token tasks consumers!"); _serviceProvider.GetService().Consume(NO_CONCURRENCY); //TODO TOREVIEW: NO_CONCURRENCY? // does nothing at the moment _serviceProvider.GetService().Consume(NO_CONCURRENCY); _serviceProvider.GetService().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode); diff --git a/TaskEngine/Tasks/AzureTranscriptionTask.cs b/TaskEngine/Tasks/AzureTranscriptionTask.cs index 6bf250ad..420cfc94 100644 --- a/TaskEngine/Tasks/AzureTranscriptionTask.cs +++ b/TaskEngine/Tasks/AzureTranscriptionTask.cs @@ -32,7 +32,7 @@ class AzureTranscriptionTask : RabbitMQTask public AzureTranscriptionTask(RabbitMQConnection rabbitMQ, MSTranscriptionService msTranscriptionService, // GenerateVTTFileTask generateVTTFileTask, ILogger logger, CaptionQueries captionQueries) - : base(rabbitMQ, TaskType.TranscribeVideo, logger) + : base(rabbitMQ, TaskType.AzureTranscribeVideo, logger) { _msTranscriptionService = msTranscriptionService; // nope _generateVTTFileTask = generateVTTFileTask; diff --git a/TaskEngine/Tasks/LocalTranscriptionTask.cs b/TaskEngine/Tasks/LocalTranscriptionTask.cs index 4e5a948b..b20ee82b 100644 --- a/TaskEngine/Tasks/LocalTranscriptionTask.cs +++ b/TaskEngine/Tasks/LocalTranscriptionTask.cs @@ -36,7 +36,7 @@ public LocalTranscriptionTask(RabbitMQConnection rabbitMQ, RpcClient rpcClient, // GenerateVTTFileTask generateVTTFileTask, ILogger logger, CaptionQueries captionQueries) - : base(rabbitMQ, TaskType.TranscribeVideo, logger) + : base(rabbitMQ, TaskType.LocalTranscribeVideo, logger) { _rpcClient = rpcClient; _captionQueries = captionQueries; @@ -90,8 +90,9 @@ protected async override Task OnConsume(string videoId, TaskParameters taskParam GetLogger().LogInformation($"{videoId}: Updated TranscribingAttempts = {video.TranscribingAttempts}"); try { + var mockWhisperResult = Globals.appSettings.MOCK_RECOGNITION == "MOCK"; - GetLogger().LogInformation($"{videoId}: Calling RecognitionWithVideoStreamAsync"); + GetLogger().LogInformation($"{videoId}: Calling RecognitionWithVideoStreamAsync( mock={mockWhisperResult})"); var request = new CTGrpc.TranscriptionRequest { @@ -99,7 +100,7 @@ protected async override Task OnConsume(string videoId, TaskParameters taskParam FilePath = video.Video1.VMPath, Model = "en", Language = "en", - Testing = true + Testing = mockWhisperResult // PhraseHints = phraseHints, // CourseHints = "", // OutputLanguages = "en" @@ -161,7 +162,7 @@ protected async override Task OnConsume(string videoId, TaskParameters taskParam { TranscriptionType = TranscriptionType.Caption, Captions = theCaptions, - Language = theLanguage, + Language = "en-US" , /* Must be en-US for FrontEnd; Cant be just "en" */ VideoId = video.Id, Label = $"{theLanguage} (ClassTranscribe)", SourceInternalRef = SOURCEINTERNALREF, // @@ -177,7 +178,7 @@ protected async override Task OnConsume(string videoId, TaskParameters taskParam } - video.TranscriptionStatus = "NoError"; + video.TranscriptionStatus = Video.TranscriptionStatusMessages.NOERROR; // video.JsonMetadata["LastSuccessfulTime"] = result.LastSuccessTime.ToString(); GetLogger().LogInformation($"{videoId}: Saving captions"); diff --git a/TaskEngine/Tasks/QueueAwakerTask.cs b/TaskEngine/Tasks/QueueAwakerTask.cs index 0fb5fe45..78c21704 100644 --- a/TaskEngine/Tasks/QueueAwakerTask.cs +++ b/TaskEngine/Tasks/QueueAwakerTask.cs @@ -401,7 +401,7 @@ protected async override Task OnConsume(JObject jObject, TaskParameters taskPara var sourceId = jObject["SourceId"].ToString(); _pythonCrawlerTask.Publish(sourceId); } - else if (type == TaskType.TranscribeVideo.ToString()) + else if (type == TaskType.LocalTranscribeVideo.ToString()) { var id = jObject["videoOrMediaId"].ToString(); diff --git a/pythonrpcserver.Dockerfile b/pythonrpcserver.Dockerfile index a2905617..dc6061fe 100644 --- a/pythonrpcserver.Dockerfile +++ b/pythonrpcserver.Dockerfile @@ -31,7 +31,7 @@ RUN python -m grpc_tools.protoc -I . --python_out=./ --grpc_python_out=./ ct.proto COPY ./PythonRpcServer . - +# The output of this file is used when we set MOCK_RECOGNITION=MOCK for quick testing RUN whisper -ojf -f transcribe_hellohellohello.wav CMD [ "nice", "-n", "18", "ionice", "-c", "2", "-n", "6", "python3", "-u", "/PythonRpcServer/server.py" ]