Skip to content

Commit

Permalink
WIP: Add IJob::OnFinish callback called on main thread
Browse files Browse the repository at this point in the history
Add `IJob::OnFinish` which is automatically called on the main thread when a job is done. We currently check if a specific job is done in various places and then perform some final actions.
  • Loading branch information
Robyt3 committed Oct 20, 2023
1 parent 09978f3 commit 0b5dc71
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 10 deletions.
5 changes: 5 additions & 0 deletions src/engine/client/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2639,6 +2639,7 @@ void CClient::Update()
// pump the network
PumpNetwork();

// TODO: use job completion callback
if(m_pMapdownloadTask)
{
if(m_pMapdownloadTask->State() == HTTP_DONE)
Expand All @@ -2651,6 +2652,7 @@ void CClient::Update()
}
}

// TODO: use job completion callback
if(m_pDDNetInfoTask)
{
if(m_pDDNetInfoTask->State() == HTTP_DONE)
Expand All @@ -2667,6 +2669,7 @@ void CClient::Update()
}
}

// TODO: use job completion callback
if(State() == IClient::STATE_ONLINE)
{
if(!m_EditJobs.empty())
Expand All @@ -2685,6 +2688,8 @@ void CClient::Update()
}
}

Engine()->UpdateJobs();

// update the server browser
m_ServerBrowser.Update();

Expand Down
1 change: 1 addition & 0 deletions src/engine/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class IEngine : public IInterface

virtual void Init() = 0;
virtual void AddJob(std::shared_ptr<IJob> pJob) = 0;
virtual void UpdateJobs() = 0;
virtual void SetAdditionalLogger(std::shared_ptr<ILogger> &&pLogger) = 0;
static void RunJobBlocking(IJob *pJob);
};
Expand Down
3 changes: 3 additions & 0 deletions src/engine/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2744,6 +2744,8 @@ int CServer::Run()
int64_t t = time_get();
int NewTicks = 0;

pEngine->UpdateJobs();

// load new map
if(m_MapReload || m_CurrentGameTick >= MAX_TICK) // force reload to make sure the ticks stay within a valid range
{
Expand Down Expand Up @@ -2820,6 +2822,7 @@ int CServer::Run()
else if(m_aClients[ClientID].m_DnsblState == CClient::DNSBL_STATE_PENDING &&
m_aClients[ClientID].m_pDnsblLookup->Status() == IJob::STATE_DONE)
{
// TODO: use job completion callback
if(m_aClients[ClientID].m_pDnsblLookup->m_Result != 0)
{
// entry not found -> whitelisted
Expand Down
5 changes: 5 additions & 0 deletions src/engine/shared/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ class CEngine : public IEngine
m_JobPool.Add(std::move(pJob));
}

void UpdateJobs() override
{
m_JobPool.Update();
}

void SetAdditionalLogger(std::shared_ptr<ILogger> &&pLogger) override
{
m_pFutureLogger->Set(pLogger);
Expand Down
56 changes: 47 additions & 9 deletions src/engine/shared/jobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ CJobPool::CJobPool()
// empty the pool
m_Shutdown = false;
m_Lock = lock_create();
m_FinishedLock = lock_create();
sphore_init(&m_Semaphore);
m_pFirstJob = 0;
m_pLastJob = 0;
m_pFirstJob = nullptr;
m_pLastJob = nullptr;
m_pFirstFinishedJob = nullptr;
m_pLastFinishedJob = nullptr;
}

CJobPool::~CJobPool()
Expand All @@ -40,27 +43,38 @@ void CJobPool::WorkerThread(void *pUser)

while(!pPool->m_Shutdown)
{
std::shared_ptr<IJob> pJob = 0;

// fetch job from queue
sphore_wait(&pPool->m_Semaphore);
std::shared_ptr<IJob> pJob = nullptr;
{
CLockScope ls(pPool->m_Lock);
CLockScope LockScope(pPool->m_Lock);
if(pPool->m_pFirstJob)
{
pJob = pPool->m_pFirstJob;
pPool->m_pFirstJob = pPool->m_pFirstJob->m_pNext;
// allow remaining objects in list to destruct, even when current object stays alive
pJob->m_pNext = nullptr;
if(!pPool->m_pFirstJob)
pPool->m_pLastJob = 0;
pPool->m_pLastJob = nullptr;
}
}

// do the job if we have one
if(pJob)
{
RunBlocking(pJob.get());
pJob->m_Status = IJob::STATE_RUNNING;
pJob->Run();
pJob->m_Status = IJob::STATE_DONE;

// add job to queue of finished jobs
{
CLockScope LockScope(pPool->m_FinishedLock);
if(pPool->m_pLastFinishedJob)
pPool->m_pLastFinishedJob->m_pNext = pJob;
pPool->m_pLastFinishedJob = std::move(pJob);
if(!pPool->m_pFirstFinishedJob)
pPool->m_pFirstFinishedJob = pPool->m_pLastFinishedJob;
}
}
}
}
Expand All @@ -86,14 +100,37 @@ void CJobPool::Destroy()
thread_wait(pThread);
m_vpThreads.clear();
lock_destroy(m_Lock);
lock_destroy(m_FinishedLock);
sphore_destroy(&m_Semaphore);
}

void CJobPool::Update()
{
while(true)
{
std::shared_ptr<IJob> pJob = nullptr;
{
CLockScope LockScope(m_FinishedLock);
if(m_pFirstFinishedJob)
{
pJob = m_pFirstFinishedJob;
m_pFirstFinishedJob = m_pFirstFinishedJob->m_pNext;
pJob->m_pNext = nullptr;
if(!m_pFirstFinishedJob)
m_pLastFinishedJob = nullptr;
}
}
if(!pJob)
break;
pJob->OnFinish();
}
}

void CJobPool::Add(std::shared_ptr<IJob> pJob)
{
// add job to queue
{
CLockScope ls(m_Lock);
// add job to queue
CLockScope LockScope(m_Lock);
if(m_pLastJob)
m_pLastJob->m_pNext = pJob;
m_pLastJob = std::move(pJob);
Expand All @@ -109,4 +146,5 @@ void CJobPool::RunBlocking(IJob *pJob)
pJob->m_Status = IJob::STATE_RUNNING;
pJob->Run();
pJob->m_Status = IJob::STATE_DONE;
pJob->OnFinish();
}
9 changes: 9 additions & 0 deletions src/engine/shared/jobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ class IJob
std::atomic<int> m_Status;
virtual void Run() = 0;

/**
* Called on the main thread after the job is done.
*/
virtual void OnFinish() {};

public:
IJob();
IJob(const IJob &Other) = delete;
Expand All @@ -45,6 +50,9 @@ class CJobPool
SEMAPHORE m_Semaphore;
std::shared_ptr<IJob> m_pFirstJob GUARDED_BY(m_Lock);
std::shared_ptr<IJob> m_pLastJob GUARDED_BY(m_Lock);
LOCK m_FinishedLock;
std::shared_ptr<IJob> m_pFirstFinishedJob GUARDED_BY(m_FinishedLock);
std::shared_ptr<IJob> m_pLastFinishedJob GUARDED_BY(m_FinishedLock);

static void WorkerThread(void *pUser) NO_THREAD_SAFETY_ANALYSIS;

Expand All @@ -54,6 +62,7 @@ class CJobPool

void Init(int NumThreads);
void Destroy();
void Update();
void Add(std::shared_ptr<IJob> pJob) REQUIRES(!m_Lock);
static void RunBlocking(IJob *pJob);
};
Expand Down
1 change: 1 addition & 0 deletions src/game/client/components/skins.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ const CSkin *CSkins::FindImpl(const char *pName)
const auto SkinDownloadIt = m_DownloadSkins.find(pName);
if(SkinDownloadIt != m_DownloadSkins.end())
{
// TODO: use job completion callback
if(SkinDownloadIt->second->m_pTask && SkinDownloadIt->second->m_pTask->State() == HTTP_DONE)
{
char aPath[IO_MAX_PATH_LENGTH];
Expand Down
1 change: 1 addition & 0 deletions src/game/client/components/sounds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ void CSounds::OnRender()
// check for sound initialisation
if(m_WaitForSoundJob)
{
// TODO: use job completion callback
if(m_pSoundJob->Status() == IJob::STATE_DONE)
m_WaitForSoundJob = false;
else
Expand Down
1 change: 1 addition & 0 deletions src/game/editor/editor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7781,6 +7781,7 @@ bool CEditor::PerformAutosave()

void CEditor::HandleWriterFinishJobs()
{
// TODO: use job completion callback
if(m_WriterFinishJobs.empty())
return;

Expand Down
37 changes: 36 additions & 1 deletion src/test/jobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class Jobs : public ::testing::Test
{
m_Pool.Add(std::move(pJob));
}
void Update()
{
m_Pool.Update();
}
void RunBlocking(IJob *pJob)
{
CJobPool::RunBlocking(pJob);
Expand All @@ -32,11 +36,15 @@ class Jobs : public ::testing::Test
class CJob : public IJob
{
std::function<void()> m_JobFunction;
std::function<void()> m_FinishFunction;
void Run() override { m_JobFunction(); }
void OnFinish() override { m_FinishFunction(); }

public:
CJob(std::function<void()> &&JobFunction) :
m_JobFunction(JobFunction) {}
m_JobFunction(JobFunction), m_FinishFunction([] {}) {}
CJob(std::function<void()> &&JobFunction, std::function<void()> &&FinishFunction) :
m_JobFunction(JobFunction), m_FinishFunction(FinishFunction) {}
};

TEST_F(Jobs, Constructor)
Expand All @@ -57,6 +65,15 @@ TEST_F(Jobs, RunBlocking)
EXPECT_EQ(Result, 1);
}

TEST_F(Jobs, RunBlockingFinish)
{
int Result = 0;
CJob Job([&] { Result = 1; }, [&] { Result = 2; });
EXPECT_EQ(Result, 0);
RunBlocking(&Job);
EXPECT_EQ(Result, 2);
}

TEST_F(Jobs, Wait)
{
SEMAPHORE sphore;
Expand Down Expand Up @@ -145,3 +162,21 @@ TEST_F(Jobs, Many)
}
new(&m_Pool) CJobPool();
}

TEST_F(Jobs, FinishFunction)
{
int Result = 0;
EXPECT_EQ(Result, 0);
auto pJob = std::make_shared<CJob>([&] { Result = 1; }, [&] { Result = 2; });

Add(pJob);
while(pJob->Status() != IJob::STATE_DONE)
{
// yay, busy loop...
thread_yield();
}
EXPECT_EQ(Result, 1);

Update();
EXPECT_EQ(Result, 2);
}

0 comments on commit 0b5dc71

Please sign in to comment.