Skip to content

Commit

Permalink
WIP: Add IJob::Done callback called on main thread
Browse files Browse the repository at this point in the history
We currently check if specific jobs are done in various places and then perform some final actions, which is simplified by adding the `IJob::Done` function to perform these final actions which is automatically called on the main thread when a job is done.
  • Loading branch information
Robyt3 committed Oct 21, 2023
1 parent 9a966fa commit a4fd4ea
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 11 deletions.
5 changes: 5 additions & 0 deletions src/engine/client/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2639,6 +2639,7 @@ void CClient::Update()
// pump the network
PumpNetwork();

// TODO: use job completion callback
if(m_pMapdownloadTask)
{
if(m_pMapdownloadTask->State() == HTTP_DONE)
Expand All @@ -2651,6 +2652,7 @@ void CClient::Update()
}
}

// TODO: use job completion callback
if(m_pDDNetInfoTask)
{
if(m_pDDNetInfoTask->State() == HTTP_DONE)
Expand All @@ -2667,6 +2669,7 @@ void CClient::Update()
}
}

// TODO: use job completion callback
if(State() == IClient::STATE_ONLINE)
{
if(!m_EditJobs.empty())
Expand All @@ -2685,6 +2688,8 @@ void CClient::Update()
}
}

Engine()->UpdateJobs();

// update the server browser
m_ServerBrowser.Update();

Expand Down
1 change: 1 addition & 0 deletions src/engine/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class IEngine : public IInterface

virtual void Init() = 0;
virtual void AddJob(std::shared_ptr<IJob> pJob) = 0;
virtual void UpdateJobs() = 0;
virtual void SetAdditionalLogger(std::shared_ptr<ILogger> &&pLogger) = 0;
static void RunJobBlocking(IJob *pJob);
};
Expand Down
3 changes: 3 additions & 0 deletions src/engine/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2744,6 +2744,8 @@ int CServer::Run()
int64_t t = time_get();
int NewTicks = 0;

pEngine->UpdateJobs();

// load new map
if(m_MapReload || m_CurrentGameTick >= MAX_TICK) // force reload to make sure the ticks stay within a valid range
{
Expand Down Expand Up @@ -2820,6 +2822,7 @@ int CServer::Run()
else if(m_aClients[ClientID].m_DnsblState == CClient::DNSBL_STATE_PENDING &&
m_aClients[ClientID].m_pDnsblLookup->Status() == IJob::STATE_DONE)
{
// TODO: use job completion callback
if(m_aClients[ClientID].m_pDnsblLookup->m_Result != 0)
{
// entry not found -> whitelisted
Expand Down
5 changes: 5 additions & 0 deletions src/engine/shared/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ class CEngine : public IEngine
m_JobPool.Add(std::move(pJob));
}

void UpdateJobs() override
{
m_JobPool.Update();
}

void SetAdditionalLogger(std::shared_ptr<ILogger> &&pLogger) override
{
m_pFutureLogger->Set(pLogger);
Expand Down
59 changes: 49 additions & 10 deletions src/engine/shared/jobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ CJobPool::CJobPool()
m_Shutdown = false;
m_Lock = lock_create();
sphore_init(&m_Semaphore);
m_pFirstJob = 0;
m_pLastJob = 0;
m_pFirstJob = nullptr;
m_pLastJob = nullptr;
m_LockDone = lock_create();
m_pFirstJobDone = nullptr;
m_pLastJobDone = nullptr;
}

CJobPool::~CJobPool()
Expand All @@ -40,27 +43,38 @@ void CJobPool::WorkerThread(void *pUser)

while(!pPool->m_Shutdown)
{
std::shared_ptr<IJob> pJob = 0;

// fetch job from queue
sphore_wait(&pPool->m_Semaphore);
std::shared_ptr<IJob> pJob = nullptr;
{
CLockScope ls(pPool->m_Lock);
CLockScope LockScope(pPool->m_Lock);
if(pPool->m_pFirstJob)
{
pJob = pPool->m_pFirstJob;
pPool->m_pFirstJob = pPool->m_pFirstJob->m_pNext;
// allow remaining objects in list to destruct, even when current object stays alive
pJob->m_pNext = nullptr;
if(!pPool->m_pFirstJob)
pPool->m_pLastJob = 0;
pPool->m_pLastJob = nullptr;
}
}

// do the job if we have one
// run the job if we have one
if(pJob)
{
RunBlocking(pJob.get());
pJob->m_Status = IJob::STATE_RUNNING;
pJob->Run();
pJob->m_Status = IJob::STATE_DONE;

// add job to queue of done jobs
{
CLockScope LockScope(pPool->m_LockDone);
if(pPool->m_pLastJobDone)
pPool->m_pLastJobDone->m_pNext = pJob;
pPool->m_pLastJobDone = std::move(pJob);
if(!pPool->m_pFirstJobDone)
pPool->m_pFirstJobDone = pPool->m_pLastJobDone;
}
}
}
}
Expand All @@ -86,14 +100,38 @@ void CJobPool::Destroy()
thread_wait(pThread);
m_vpThreads.clear();
lock_destroy(m_Lock);
lock_destroy(m_LockDone);
sphore_destroy(&m_Semaphore);
}

void CJobPool::Update()
{
while(true)
{
// fetch job from queue of done jobs
std::shared_ptr<IJob> pJob = nullptr;
{
CLockScope LockScope(m_LockDone);
if(m_pFirstJobDone)
{
pJob = m_pFirstJobDone;
m_pFirstJobDone = m_pFirstJobDone->m_pNext;
pJob->m_pNext = nullptr;
if(!m_pFirstJobDone)
m_pLastJobDone = nullptr;
}
}
if(!pJob)
break;
pJob->Done();
}
}

void CJobPool::Add(std::shared_ptr<IJob> pJob)
{
// add job to queue
{
CLockScope ls(m_Lock);
// add job to queue
CLockScope LockScope(m_Lock);
if(m_pLastJob)
m_pLastJob->m_pNext = pJob;
m_pLastJob = std::move(pJob);
Expand All @@ -109,4 +147,5 @@ void CJobPool::RunBlocking(IJob *pJob)
pJob->m_Status = IJob::STATE_RUNNING;
pJob->Run();
pJob->m_Status = IJob::STATE_DONE;
pJob->Done();
}
49 changes: 49 additions & 0 deletions src/engine/shared/jobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,18 @@ class IJob
std::shared_ptr<IJob> m_pNext;
std::atomic<EJobState> m_Status;

/**
* Performs tasks in a worker thread.
*/
virtual void Run() = 0;

/**
* Performs final tasks on the main thread after job is done.
*/
virtual void Done()
{
}

public:
IJob();
IJob(const IJob &Other) = delete;
Expand All @@ -47,15 +57,54 @@ class CJobPool
std::shared_ptr<IJob> m_pFirstJob GUARDED_BY(m_Lock);
std::shared_ptr<IJob> m_pLastJob GUARDED_BY(m_Lock);

LOCK m_LockDone;
std::shared_ptr<IJob> m_pFirstJobDone GUARDED_BY(m_LockDone);
std::shared_ptr<IJob> m_pLastJobDone GUARDED_BY(m_LockDone);

static void WorkerThread(void *pUser) NO_THREAD_SAFETY_ANALYSIS;

public:
CJobPool();
~CJobPool();

/**
* Initializes the job pool with the given number of worker threads.
*
* @param NumTheads The number of worker threads.
*
* @remark Must be called on the main thread.
*/
void Init(int NumThreads);

/**
* Destroys the job pool. Waits for worker threads to complete their current jobs.
*
* @remark Must be called on the main thread.
*/
void Destroy();

/**
* Updates the job pool, calling the `Done` function for completed jobs.
*
* @remark Must be called on the main thread.
*/
void Update() REQUIRES(!m_LockDone);

/**
* Adds a job to the queue of the job pool.
*
* @param pJob The job to enqueue.
*/
void Add(std::shared_ptr<IJob> pJob) REQUIRES(!m_Lock);

/**
* Runs a job's lifecycle functions in the current thread.
*
* @param pJob The job to run.
*
* @remark Must be called on the main thread, unless the job has not
* implemented a `Done` function that must run on the main thread.
*/
static void RunBlocking(IJob *pJob);
};
#endif
1 change: 1 addition & 0 deletions src/game/client/components/skins.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ const CSkin *CSkins::FindImpl(const char *pName)
const auto SkinDownloadIt = m_DownloadSkins.find(pName);
if(SkinDownloadIt != m_DownloadSkins.end())
{
// TODO: use job completion callback
if(SkinDownloadIt->second->m_pTask && SkinDownloadIt->second->m_pTask->State() == HTTP_DONE)
{
char aPath[IO_MAX_PATH_LENGTH];
Expand Down
1 change: 1 addition & 0 deletions src/game/client/components/sounds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ void CSounds::OnRender()
// check for sound initialisation
if(m_WaitForSoundJob)
{
// TODO: use job completion callback
if(m_pSoundJob->Status() == IJob::STATE_DONE)
m_WaitForSoundJob = false;
else
Expand Down
1 change: 1 addition & 0 deletions src/game/editor/editor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7781,6 +7781,7 @@ bool CEditor::PerformAutosave()

void CEditor::HandleWriterFinishJobs()
{
// TODO: use job completion callback
if(m_WriterFinishJobs.empty())
return;

Expand Down
37 changes: 36 additions & 1 deletion src/test/jobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class Jobs : public ::testing::Test
{
m_Pool.Add(std::move(pJob));
}
void Update()
{
m_Pool.Update();
}
void RunBlocking(IJob *pJob)
{
CJobPool::RunBlocking(pJob);
Expand All @@ -32,11 +36,15 @@ class Jobs : public ::testing::Test
class CJob : public IJob
{
std::function<void()> m_JobFunction;
std::function<void()> m_DoneFunction;
void Run() override { m_JobFunction(); }
void Done() override { m_DoneFunction(); }

public:
CJob(std::function<void()> &&JobFunction) :
m_JobFunction(JobFunction) {}
m_JobFunction(JobFunction), m_DoneFunction([] {}) {}
CJob(std::function<void()> &&JobFunction, std::function<void()> &&DoneFunction) :
m_JobFunction(JobFunction), m_DoneFunction(DoneFunction) {}
};

TEST_F(Jobs, Constructor)
Expand All @@ -57,6 +65,15 @@ TEST_F(Jobs, RunBlocking)
EXPECT_EQ(Result, 1);
}

TEST_F(Jobs, RunBlockingDone)
{
int Result = 0;
CJob Job([&] { Result = 1; }, [&] { Result = 2; });
EXPECT_EQ(Result, 0);
RunBlocking(&Job);
EXPECT_EQ(Result, 2);
}

TEST_F(Jobs, Wait)
{
SEMAPHORE sphore;
Expand Down Expand Up @@ -145,3 +162,21 @@ TEST_F(Jobs, Many)
}
new(&m_Pool) CJobPool();
}

TEST_F(Jobs, DoneFunction)
{
int Result = 0;
EXPECT_EQ(Result, 0);
auto pJob = std::make_shared<CJob>([&] { Result = 1; }, [&] { Result = 2; });

Add(pJob);
while(pJob->Status() != IJob::STATE_DONE)
{
// yay, busy loop...
thread_yield();
}
EXPECT_EQ(Result, 1);

Update();
EXPECT_EQ(Result, 2);
}

0 comments on commit a4fd4ea

Please sign in to comment.