From 0b5dc713cc4948cf8ce948c6dd2a0d7f3cf55240 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20M=C3=BCller?= Date: Fri, 20 Oct 2023 22:16:35 +0200 Subject: [PATCH] WIP: Add `IJob::OnFinish` callback called on main thread Add `IJob::OnFinish` which is automatically called on the main thread when a job is done. We currently check if a specific job is done in various places and then perform some final actions. --- src/engine/client/client.cpp | 5 +++ src/engine/engine.h | 1 + src/engine/server/server.cpp | 3 ++ src/engine/shared/engine.cpp | 5 +++ src/engine/shared/jobs.cpp | 56 ++++++++++++++++++++++----- src/engine/shared/jobs.h | 9 +++++ src/game/client/components/skins.cpp | 1 + src/game/client/components/sounds.cpp | 1 + src/game/editor/editor.cpp | 1 + src/test/jobs.cpp | 37 +++++++++++++++++- 10 files changed, 109 insertions(+), 10 deletions(-) diff --git a/src/engine/client/client.cpp b/src/engine/client/client.cpp index 90862b31398..9e994678a20 100644 --- a/src/engine/client/client.cpp +++ b/src/engine/client/client.cpp @@ -2639,6 +2639,7 @@ void CClient::Update() // pump the network PumpNetwork(); + // TODO: use job completion callback if(m_pMapdownloadTask) { if(m_pMapdownloadTask->State() == HTTP_DONE) @@ -2651,6 +2652,7 @@ void CClient::Update() } } + // TODO: use job completion callback if(m_pDDNetInfoTask) { if(m_pDDNetInfoTask->State() == HTTP_DONE) @@ -2667,6 +2669,7 @@ void CClient::Update() } } + // TODO: use job completion callback if(State() == IClient::STATE_ONLINE) { if(!m_EditJobs.empty()) @@ -2685,6 +2688,8 @@ void CClient::Update() } } + Engine()->UpdateJobs(); + // update the server browser m_ServerBrowser.Update(); diff --git a/src/engine/engine.h b/src/engine/engine.h index 1f1c959571f..755d372c7fe 100644 --- a/src/engine/engine.h +++ b/src/engine/engine.h @@ -36,6 +36,7 @@ class IEngine : public IInterface virtual void Init() = 0; virtual void AddJob(std::shared_ptr pJob) = 0; + virtual void UpdateJobs() = 0; virtual void SetAdditionalLogger(std::shared_ptr &&pLogger) = 0; static void RunJobBlocking(IJob *pJob); }; diff --git a/src/engine/server/server.cpp b/src/engine/server/server.cpp index f0d43b38a90..412c0b5350f 100644 --- a/src/engine/server/server.cpp +++ b/src/engine/server/server.cpp @@ -2744,6 +2744,8 @@ int CServer::Run() int64_t t = time_get(); int NewTicks = 0; + pEngine->UpdateJobs(); + // load new map if(m_MapReload || m_CurrentGameTick >= MAX_TICK) // force reload to make sure the ticks stay within a valid range { @@ -2820,6 +2822,7 @@ int CServer::Run() else if(m_aClients[ClientID].m_DnsblState == CClient::DNSBL_STATE_PENDING && m_aClients[ClientID].m_pDnsblLookup->Status() == IJob::STATE_DONE) { + // TODO: use job completion callback if(m_aClients[ClientID].m_pDnsblLookup->m_Result != 0) { // entry not found -> whitelisted diff --git a/src/engine/shared/engine.cpp b/src/engine/shared/engine.cpp index 395cf78531e..5fefe21880c 100644 --- a/src/engine/shared/engine.cpp +++ b/src/engine/shared/engine.cpp @@ -113,6 +113,11 @@ class CEngine : public IEngine m_JobPool.Add(std::move(pJob)); } + void UpdateJobs() override + { + m_JobPool.Update(); + } + void SetAdditionalLogger(std::shared_ptr &&pLogger) override { m_pFutureLogger->Set(pLogger); diff --git a/src/engine/shared/jobs.cpp b/src/engine/shared/jobs.cpp index 36ee17d8d11..144aeb9ffb7 100644 --- a/src/engine/shared/jobs.cpp +++ b/src/engine/shared/jobs.cpp @@ -21,9 +21,12 @@ CJobPool::CJobPool() // empty the pool m_Shutdown = false; m_Lock = lock_create(); + m_FinishedLock = lock_create(); sphore_init(&m_Semaphore); - m_pFirstJob = 0; - m_pLastJob = 0; + m_pFirstJob = nullptr; + m_pLastJob = nullptr; + m_pFirstFinishedJob = nullptr; + m_pLastFinishedJob = nullptr; } CJobPool::~CJobPool() @@ -40,12 +43,11 @@ void CJobPool::WorkerThread(void *pUser) while(!pPool->m_Shutdown) { - std::shared_ptr pJob = 0; - // fetch job from queue sphore_wait(&pPool->m_Semaphore); + std::shared_ptr pJob = nullptr; { - CLockScope ls(pPool->m_Lock); + CLockScope LockScope(pPool->m_Lock); if(pPool->m_pFirstJob) { pJob = pPool->m_pFirstJob; @@ -53,14 +55,26 @@ void CJobPool::WorkerThread(void *pUser) // allow remaining objects in list to destruct, even when current object stays alive pJob->m_pNext = nullptr; if(!pPool->m_pFirstJob) - pPool->m_pLastJob = 0; + pPool->m_pLastJob = nullptr; } } // do the job if we have one if(pJob) { - RunBlocking(pJob.get()); + pJob->m_Status = IJob::STATE_RUNNING; + pJob->Run(); + pJob->m_Status = IJob::STATE_DONE; + + // add job to queue of finished jobs + { + CLockScope LockScope(pPool->m_FinishedLock); + if(pPool->m_pLastFinishedJob) + pPool->m_pLastFinishedJob->m_pNext = pJob; + pPool->m_pLastFinishedJob = std::move(pJob); + if(!pPool->m_pFirstFinishedJob) + pPool->m_pFirstFinishedJob = pPool->m_pLastFinishedJob; + } } } } @@ -86,14 +100,37 @@ void CJobPool::Destroy() thread_wait(pThread); m_vpThreads.clear(); lock_destroy(m_Lock); + lock_destroy(m_FinishedLock); sphore_destroy(&m_Semaphore); } +void CJobPool::Update() +{ + while(true) + { + std::shared_ptr pJob = nullptr; + { + CLockScope LockScope(m_FinishedLock); + if(m_pFirstFinishedJob) + { + pJob = m_pFirstFinishedJob; + m_pFirstFinishedJob = m_pFirstFinishedJob->m_pNext; + pJob->m_pNext = nullptr; + if(!m_pFirstFinishedJob) + m_pLastFinishedJob = nullptr; + } + } + if(!pJob) + break; + pJob->OnFinish(); + } +} + void CJobPool::Add(std::shared_ptr pJob) { + // add job to queue { - CLockScope ls(m_Lock); - // add job to queue + CLockScope LockScope(m_Lock); if(m_pLastJob) m_pLastJob->m_pNext = pJob; m_pLastJob = std::move(pJob); @@ -109,4 +146,5 @@ void CJobPool::RunBlocking(IJob *pJob) pJob->m_Status = IJob::STATE_RUNNING; pJob->Run(); pJob->m_Status = IJob::STATE_DONE; + pJob->OnFinish(); } diff --git a/src/engine/shared/jobs.h b/src/engine/shared/jobs.h index 2a24efbdcc0..b0172469219 100644 --- a/src/engine/shared/jobs.h +++ b/src/engine/shared/jobs.h @@ -21,6 +21,11 @@ class IJob std::atomic m_Status; virtual void Run() = 0; + /** + * Called on the main thread after the job is done. + */ + virtual void OnFinish() {}; + public: IJob(); IJob(const IJob &Other) = delete; @@ -45,6 +50,9 @@ class CJobPool SEMAPHORE m_Semaphore; std::shared_ptr m_pFirstJob GUARDED_BY(m_Lock); std::shared_ptr m_pLastJob GUARDED_BY(m_Lock); + LOCK m_FinishedLock; + std::shared_ptr m_pFirstFinishedJob GUARDED_BY(m_FinishedLock); + std::shared_ptr m_pLastFinishedJob GUARDED_BY(m_FinishedLock); static void WorkerThread(void *pUser) NO_THREAD_SAFETY_ANALYSIS; @@ -54,6 +62,7 @@ class CJobPool void Init(int NumThreads); void Destroy(); + void Update(); void Add(std::shared_ptr pJob) REQUIRES(!m_Lock); static void RunBlocking(IJob *pJob); }; diff --git a/src/game/client/components/skins.cpp b/src/game/client/components/skins.cpp index ce4812224c1..6ae9db9d4d2 100644 --- a/src/game/client/components/skins.cpp +++ b/src/game/client/components/skins.cpp @@ -417,6 +417,7 @@ const CSkin *CSkins::FindImpl(const char *pName) const auto SkinDownloadIt = m_DownloadSkins.find(pName); if(SkinDownloadIt != m_DownloadSkins.end()) { + // TODO: use job completion callback if(SkinDownloadIt->second->m_pTask && SkinDownloadIt->second->m_pTask->State() == HTTP_DONE) { char aPath[IO_MAX_PATH_LENGTH]; diff --git a/src/game/client/components/sounds.cpp b/src/game/client/components/sounds.cpp index 46578cd55d7..2a263420cda 100644 --- a/src/game/client/components/sounds.cpp +++ b/src/game/client/components/sounds.cpp @@ -114,6 +114,7 @@ void CSounds::OnRender() // check for sound initialisation if(m_WaitForSoundJob) { + // TODO: use job completion callback if(m_pSoundJob->Status() == IJob::STATE_DONE) m_WaitForSoundJob = false; else diff --git a/src/game/editor/editor.cpp b/src/game/editor/editor.cpp index ff93b072dce..52ad59bcc50 100644 --- a/src/game/editor/editor.cpp +++ b/src/game/editor/editor.cpp @@ -7781,6 +7781,7 @@ bool CEditor::PerformAutosave() void CEditor::HandleWriterFinishJobs() { + // TODO: use job completion callback if(m_WriterFinishJobs.empty()) return; diff --git a/src/test/jobs.cpp b/src/test/jobs.cpp index 9d7b4c32248..496fb05bf6b 100644 --- a/src/test/jobs.cpp +++ b/src/test/jobs.cpp @@ -23,6 +23,10 @@ class Jobs : public ::testing::Test { m_Pool.Add(std::move(pJob)); } + void Update() + { + m_Pool.Update(); + } void RunBlocking(IJob *pJob) { CJobPool::RunBlocking(pJob); @@ -32,11 +36,15 @@ class Jobs : public ::testing::Test class CJob : public IJob { std::function m_JobFunction; + std::function m_FinishFunction; void Run() override { m_JobFunction(); } + void OnFinish() override { m_FinishFunction(); } public: CJob(std::function &&JobFunction) : - m_JobFunction(JobFunction) {} + m_JobFunction(JobFunction), m_FinishFunction([] {}) {} + CJob(std::function &&JobFunction, std::function &&FinishFunction) : + m_JobFunction(JobFunction), m_FinishFunction(FinishFunction) {} }; TEST_F(Jobs, Constructor) @@ -57,6 +65,15 @@ TEST_F(Jobs, RunBlocking) EXPECT_EQ(Result, 1); } +TEST_F(Jobs, RunBlockingFinish) +{ + int Result = 0; + CJob Job([&] { Result = 1; }, [&] { Result = 2; }); + EXPECT_EQ(Result, 0); + RunBlocking(&Job); + EXPECT_EQ(Result, 2); +} + TEST_F(Jobs, Wait) { SEMAPHORE sphore; @@ -145,3 +162,21 @@ TEST_F(Jobs, Many) } new(&m_Pool) CJobPool(); } + +TEST_F(Jobs, FinishFunction) +{ + int Result = 0; + EXPECT_EQ(Result, 0); + auto pJob = std::make_shared([&] { Result = 1; }, [&] { Result = 2; }); + + Add(pJob); + while(pJob->Status() != IJob::STATE_DONE) + { + // yay, busy loop... + thread_yield(); + } + EXPECT_EQ(Result, 1); + + Update(); + EXPECT_EQ(Result, 2); +}