diff --git a/src/engine/client/client.cpp b/src/engine/client/client.cpp index 90862b31398..9e994678a20 100644 --- a/src/engine/client/client.cpp +++ b/src/engine/client/client.cpp @@ -2639,6 +2639,7 @@ void CClient::Update() // pump the network PumpNetwork(); + // TODO: use job completion callback if(m_pMapdownloadTask) { if(m_pMapdownloadTask->State() == HTTP_DONE) @@ -2651,6 +2652,7 @@ void CClient::Update() } } + // TODO: use job completion callback if(m_pDDNetInfoTask) { if(m_pDDNetInfoTask->State() == HTTP_DONE) @@ -2667,6 +2669,7 @@ void CClient::Update() } } + // TODO: use job completion callback if(State() == IClient::STATE_ONLINE) { if(!m_EditJobs.empty()) @@ -2685,6 +2688,8 @@ void CClient::Update() } } + Engine()->UpdateJobs(); + // update the server browser m_ServerBrowser.Update(); diff --git a/src/engine/engine.h b/src/engine/engine.h index 1f1c959571f..755d372c7fe 100644 --- a/src/engine/engine.h +++ b/src/engine/engine.h @@ -36,6 +36,7 @@ class IEngine : public IInterface virtual void Init() = 0; virtual void AddJob(std::shared_ptr pJob) = 0; + virtual void UpdateJobs() = 0; virtual void SetAdditionalLogger(std::shared_ptr &&pLogger) = 0; static void RunJobBlocking(IJob *pJob); }; diff --git a/src/engine/server/server.cpp b/src/engine/server/server.cpp index f0d43b38a90..412c0b5350f 100644 --- a/src/engine/server/server.cpp +++ b/src/engine/server/server.cpp @@ -2744,6 +2744,8 @@ int CServer::Run() int64_t t = time_get(); int NewTicks = 0; + pEngine->UpdateJobs(); + // load new map if(m_MapReload || m_CurrentGameTick >= MAX_TICK) // force reload to make sure the ticks stay within a valid range { @@ -2820,6 +2822,7 @@ int CServer::Run() else if(m_aClients[ClientID].m_DnsblState == CClient::DNSBL_STATE_PENDING && m_aClients[ClientID].m_pDnsblLookup->Status() == IJob::STATE_DONE) { + // TODO: use job completion callback if(m_aClients[ClientID].m_pDnsblLookup->m_Result != 0) { // entry not found -> whitelisted diff --git a/src/engine/shared/engine.cpp b/src/engine/shared/engine.cpp index 395cf78531e..5fefe21880c 100644 --- a/src/engine/shared/engine.cpp +++ b/src/engine/shared/engine.cpp @@ -113,6 +113,11 @@ class CEngine : public IEngine m_JobPool.Add(std::move(pJob)); } + void UpdateJobs() override + { + m_JobPool.Update(); + } + void SetAdditionalLogger(std::shared_ptr &&pLogger) override { m_pFutureLogger->Set(pLogger); diff --git a/src/engine/shared/jobs.cpp b/src/engine/shared/jobs.cpp index caf6c51caa5..f6c91ea00c9 100644 --- a/src/engine/shared/jobs.cpp +++ b/src/engine/shared/jobs.cpp @@ -22,8 +22,11 @@ CJobPool::CJobPool() m_Shutdown = false; m_Lock = lock_create(); sphore_init(&m_Semaphore); - m_pFirstJob = 0; - m_pLastJob = 0; + m_pFirstJob = nullptr; + m_pLastJob = nullptr; + m_LockDone = lock_create(); + m_pFirstJobDone = nullptr; + m_pLastJobDone = nullptr; } CJobPool::~CJobPool() @@ -40,12 +43,11 @@ void CJobPool::WorkerThread(void *pUser) while(!pPool->m_Shutdown) { - std::shared_ptr pJob = 0; - // fetch job from queue sphore_wait(&pPool->m_Semaphore); + std::shared_ptr pJob = nullptr; { - CLockScope ls(pPool->m_Lock); + CLockScope LockScope(pPool->m_Lock); if(pPool->m_pFirstJob) { pJob = pPool->m_pFirstJob; @@ -53,14 +55,26 @@ void CJobPool::WorkerThread(void *pUser) // allow remaining objects in list to destruct, even when current object stays alive pJob->m_pNext = nullptr; if(!pPool->m_pFirstJob) - pPool->m_pLastJob = 0; + pPool->m_pLastJob = nullptr; } } - // do the job if we have one + // run the job if we have one if(pJob) { - RunBlocking(pJob.get()); + pJob->m_Status = IJob::STATE_RUNNING; + pJob->Run(); + pJob->m_Status = IJob::STATE_DONE; + + // add job to queue of done jobs + { + CLockScope LockScope(pPool->m_LockDone); + if(pPool->m_pLastJobDone) + pPool->m_pLastJobDone->m_pNext = pJob; + pPool->m_pLastJobDone = std::move(pJob); + if(!pPool->m_pFirstJobDone) + pPool->m_pFirstJobDone = pPool->m_pLastJobDone; + } } } } @@ -86,14 +100,38 @@ void CJobPool::Destroy() thread_wait(pThread); m_vpThreads.clear(); lock_destroy(m_Lock); + lock_destroy(m_LockDone); sphore_destroy(&m_Semaphore); } +void CJobPool::Update() +{ + while(true) + { + // fetch job from queue of done jobs + std::shared_ptr pJob = nullptr; + { + CLockScope LockScope(m_LockDone); + if(m_pFirstJobDone) + { + pJob = m_pFirstJobDone; + m_pFirstJobDone = m_pFirstJobDone->m_pNext; + pJob->m_pNext = nullptr; + if(!m_pFirstJobDone) + m_pLastJobDone = nullptr; + } + } + if(!pJob) + break; + pJob->Done(); + } +} + void CJobPool::Add(std::shared_ptr pJob) { + // add job to queue { - CLockScope ls(m_Lock); - // add job to queue + CLockScope LockScope(m_Lock); if(m_pLastJob) m_pLastJob->m_pNext = pJob; m_pLastJob = std::move(pJob); @@ -109,4 +147,5 @@ void CJobPool::RunBlocking(IJob *pJob) pJob->m_Status = IJob::STATE_RUNNING; pJob->Run(); pJob->m_Status = IJob::STATE_DONE; + pJob->Done(); } diff --git a/src/engine/shared/jobs.h b/src/engine/shared/jobs.h index 080f0f8a62c..8955bb5db82 100644 --- a/src/engine/shared/jobs.h +++ b/src/engine/shared/jobs.h @@ -27,8 +27,18 @@ class IJob std::shared_ptr m_pNext; std::atomic m_Status; + /** + * Performs tasks in a worker thread. + */ virtual void Run() = 0; + /** + * Performs final tasks on the main thread after job is done. + */ + virtual void Done() + { + } + public: IJob(); IJob(const IJob &Other) = delete; @@ -47,15 +57,54 @@ class CJobPool std::shared_ptr m_pFirstJob GUARDED_BY(m_Lock); std::shared_ptr m_pLastJob GUARDED_BY(m_Lock); + LOCK m_LockDone; + std::shared_ptr m_pFirstJobDone GUARDED_BY(m_LockDone); + std::shared_ptr m_pLastJobDone GUARDED_BY(m_LockDone); + static void WorkerThread(void *pUser) NO_THREAD_SAFETY_ANALYSIS; public: CJobPool(); ~CJobPool(); + /** + * Initializes the job pool with the given number of worker threads. + * + * @param NumTheads The number of worker threads. + * + * @remark Must be called on the main thread. + */ void Init(int NumThreads); + + /** + * Destroys the job pool. Waits for worker threads to complete their current jobs. + * + * @remark Must be called on the main thread. + */ void Destroy(); + + /** + * Updates the job pool, calling the `Done` function for completed jobs. + * + * @remark Must be called on the main thread. + */ + void Update() REQUIRES(!m_LockDone); + + /** + * Adds a job to the queue of the job pool. + * + * @param pJob The job to enqueue. + */ void Add(std::shared_ptr pJob) REQUIRES(!m_Lock); + + /** + * Runs a job's lifecycle functions in the current thread. + * + * @param pJob The job to run. + * + * @remark Must be called on the main thread, unless the job has not + * implemented a `Done` function that must run on the main thread. + */ static void RunBlocking(IJob *pJob); }; #endif diff --git a/src/game/client/components/skins.cpp b/src/game/client/components/skins.cpp index ce4812224c1..6ae9db9d4d2 100644 --- a/src/game/client/components/skins.cpp +++ b/src/game/client/components/skins.cpp @@ -417,6 +417,7 @@ const CSkin *CSkins::FindImpl(const char *pName) const auto SkinDownloadIt = m_DownloadSkins.find(pName); if(SkinDownloadIt != m_DownloadSkins.end()) { + // TODO: use job completion callback if(SkinDownloadIt->second->m_pTask && SkinDownloadIt->second->m_pTask->State() == HTTP_DONE) { char aPath[IO_MAX_PATH_LENGTH]; diff --git a/src/game/client/components/sounds.cpp b/src/game/client/components/sounds.cpp index 46578cd55d7..2a263420cda 100644 --- a/src/game/client/components/sounds.cpp +++ b/src/game/client/components/sounds.cpp @@ -114,6 +114,7 @@ void CSounds::OnRender() // check for sound initialisation if(m_WaitForSoundJob) { + // TODO: use job completion callback if(m_pSoundJob->Status() == IJob::STATE_DONE) m_WaitForSoundJob = false; else diff --git a/src/game/editor/editor.cpp b/src/game/editor/editor.cpp index ff93b072dce..52ad59bcc50 100644 --- a/src/game/editor/editor.cpp +++ b/src/game/editor/editor.cpp @@ -7781,6 +7781,7 @@ bool CEditor::PerformAutosave() void CEditor::HandleWriterFinishJobs() { + // TODO: use job completion callback if(m_WriterFinishJobs.empty()) return; diff --git a/src/test/jobs.cpp b/src/test/jobs.cpp index 9d7b4c32248..8d5d776704d 100644 --- a/src/test/jobs.cpp +++ b/src/test/jobs.cpp @@ -23,6 +23,10 @@ class Jobs : public ::testing::Test { m_Pool.Add(std::move(pJob)); } + void Update() + { + m_Pool.Update(); + } void RunBlocking(IJob *pJob) { CJobPool::RunBlocking(pJob); @@ -32,11 +36,15 @@ class Jobs : public ::testing::Test class CJob : public IJob { std::function m_JobFunction; + std::function m_DoneFunction; void Run() override { m_JobFunction(); } + void Done() override { m_DoneFunction(); } public: CJob(std::function &&JobFunction) : - m_JobFunction(JobFunction) {} + m_JobFunction(JobFunction), m_DoneFunction([] {}) {} + CJob(std::function &&JobFunction, std::function &&DoneFunction) : + m_JobFunction(JobFunction), m_DoneFunction(DoneFunction) {} }; TEST_F(Jobs, Constructor) @@ -57,6 +65,15 @@ TEST_F(Jobs, RunBlocking) EXPECT_EQ(Result, 1); } +TEST_F(Jobs, RunBlockingDone) +{ + int Result = 0; + CJob Job([&] { Result = 1; }, [&] { Result = 2; }); + EXPECT_EQ(Result, 0); + RunBlocking(&Job); + EXPECT_EQ(Result, 2); +} + TEST_F(Jobs, Wait) { SEMAPHORE sphore; @@ -145,3 +162,21 @@ TEST_F(Jobs, Many) } new(&m_Pool) CJobPool(); } + +TEST_F(Jobs, DoneFunction) +{ + int Result = 0; + EXPECT_EQ(Result, 0); + auto pJob = std::make_shared([&] { Result = 1; }, [&] { Result = 2; }); + + Add(pJob); + while(pJob->Status() != IJob::STATE_DONE) + { + // yay, busy loop... + thread_yield(); + } + EXPECT_EQ(Result, 1); + + Update(); + EXPECT_EQ(Result, 2); +}