Skip to content

Commit

Permalink
WIP: Add IJob::Done callback called on main thread
Browse files Browse the repository at this point in the history
We currently check if specific jobs are done in various places and then perform some final actions, which is simplified by adding the `IJob::Done` function to perform these final actions which is automatically called on the main thread when a job is done.

TODO: Ensure that Done is called for all jobs that have started before the client quits?

TODO: Ensure that all jobs that have been queueed are completed before quitting the client? There are important jobs like saving editor maps, which should not be ignored when the client quits but has too many other jobs queue as well.
  • Loading branch information
Robyt3 committed Oct 22, 2023
1 parent 8bd9871 commit e826d8e
Show file tree
Hide file tree
Showing 19 changed files with 342 additions and 181 deletions.
125 changes: 57 additions & 68 deletions src/engine/client/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1534,6 +1534,18 @@ void CClient::ProcessServerPacket(CNetChunk *pPacket, int Conn, bool Dummy)
m_pMapdownloadTask = HttpGetFile(pMapUrl ? pMapUrl : aUrl, Storage(), m_aMapdownloadFilenameTemp, IStorage::TYPE_SAVE);
m_pMapdownloadTask->Timeout(CTimeout{g_Config.m_ClMapDownloadConnectTimeoutMs, 0, g_Config.m_ClMapDownloadLowSpeedLimit, g_Config.m_ClMapDownloadLowSpeedTime});
m_pMapdownloadTask->MaxResponseSize(1024 * 1024 * 1024); // 1 GiB
m_pMapdownloadTask->DoneHandler([this](CHttpRequest *pRequest) {
if(pRequest->State() == HTTP_DONE)
{
FinishMapDownload();
}
else if(pRequest->State() == HTTP_ERROR || pRequest->State() == HTTP_ABORTED)
{
dbg_msg("webdl", "http failed, falling back to gameserver");
ResetMapDownload();
SendMapRequest();
}
});
Engine()->AddJob(m_pMapdownloadTask);
}
else
Expand Down Expand Up @@ -2639,51 +2651,7 @@ void CClient::Update()
// pump the network
PumpNetwork();

if(m_pMapdownloadTask)
{
if(m_pMapdownloadTask->State() == HTTP_DONE)
FinishMapDownload();
else if(m_pMapdownloadTask->State() == HTTP_ERROR || m_pMapdownloadTask->State() == HTTP_ABORTED)
{
dbg_msg("webdl", "http failed, falling back to gameserver");
ResetMapDownload();
SendMapRequest();
}
}

if(m_pDDNetInfoTask)
{
if(m_pDDNetInfoTask->State() == HTTP_DONE)
FinishDDNetInfo();
else if(m_pDDNetInfoTask->State() == HTTP_ERROR)
{
Storage()->RemoveFile(m_aDDNetInfoTmp, IStorage::TYPE_SAVE);
ResetDDNetInfo();
}
else if(m_pDDNetInfoTask->State() == HTTP_ABORTED)
{
Storage()->RemoveFile(m_aDDNetInfoTmp, IStorage::TYPE_SAVE);
m_pDDNetInfoTask = NULL;
}
}

if(State() == IClient::STATE_ONLINE)
{
if(!m_EditJobs.empty())
{
std::shared_ptr<CDemoEdit> pJob = m_EditJobs.front();
if(pJob->Status() == IJob::STATE_DONE)
{
char aBuf[IO_MAX_PATH_LENGTH + 64];
str_format(aBuf, sizeof(aBuf), "Successfully saved the replay to %s!", pJob->Destination());
m_pConsole->Print(IConsole::OUTPUT_LEVEL_STANDARD, "replay", aBuf);

GameClient()->Echo(Localize("Successfully saved the replay!"));

m_EditJobs.pop_front();
}
}
}
Engine()->UpdateJobs();

// update the server browser
m_ServerBrowser.Update();
Expand Down Expand Up @@ -3551,39 +3519,49 @@ void CClient::SaveReplay(const int Length, const char *pFilename)
}

if(!DemoRecorder(RECORDER_REPLAYS)->IsRecording())
{
m_pConsole->Print(IConsole::OUTPUT_LEVEL_STANDARD, "replay", "ERROR: demorecorder isn't recording. Try to rejoin to fix that.");
return;
}
else if(DemoRecorder(RECORDER_REPLAYS)->Length() < 1)
m_pConsole->Print(IConsole::OUTPUT_LEVEL_STANDARD, "replay", "ERROR: demorecorder isn't recording for at least 1 second.");
else
{
// First we stop the recorder to slice correctly the demo after
DemoRecorder_Stop(RECORDER_REPLAYS);
m_pConsole->Print(IConsole::OUTPUT_LEVEL_STANDARD, "replay", "ERROR: demorecorder isn't recording for at least 1 second.");
return;
}

char aDate[64];
str_timestamp(aDate, sizeof(aDate));
char aDate[64];
str_timestamp(aDate, sizeof(aDate));

char aFilename[IO_MAX_PATH_LENGTH];
if(str_comp(pFilename, "") == 0)
str_format(aFilename, sizeof(aFilename), "demos/replays/%s_%s (replay).demo", m_aCurrentMap, aDate);
else
str_format(aFilename, sizeof(aFilename), "demos/replays/%s.demo", pFilename);
char aFilename[IO_MAX_PATH_LENGTH];
if(str_comp(pFilename, "") == 0)
str_format(aFilename, sizeof(aFilename), "demos/replays/%s_%s (replay).demo", m_aCurrentMap, aDate);
else
str_format(aFilename, sizeof(aFilename), "demos/replays/%s.demo", pFilename);

char *pSrc = m_aDemoRecorder[RECORDER_REPLAYS].GetCurrentFilename();
// Check if replay with this name is already being saved at the moment
if(std::any_of(std::begin(m_vpEditJobs), std::end(m_vpEditJobs), [aFilename](const auto &pJob) { return str_comp(aFilename, pJob->Destination()) == 0; }))
{
m_pConsole->Print(IConsole::OUTPUT_LEVEL_STANDARD, "replay", "ERROR: demorecorder is already recording to a file with this name.");
return;
}

// Slice the demo to get only the last cl_replay_length seconds
const int EndTick = GameTick(g_Config.m_ClDummy);
const int StartTick = EndTick - Length * GameTickSpeed();
// First we stop the recorder to slice correctly the demo after
DemoRecorder_Stop(RECORDER_REPLAYS);
char *pSrc = m_aDemoRecorder[RECORDER_REPLAYS].GetCurrentFilename();

m_pConsole->Print(IConsole::OUTPUT_LEVEL_STANDARD, "replay", "Saving replay...");
// Slice the demo to get only the last cl_replay_length seconds
const int EndTick = GameTick(g_Config.m_ClDummy);
const int StartTick = EndTick - Length * GameTickSpeed();

// Create a job to do this slicing in background because it can be a bit long depending on the file size
std::shared_ptr<CDemoEdit> pDemoEditTask = std::make_shared<CDemoEdit>(GameClient()->NetVersion(), &m_SnapshotDelta, m_pStorage, pSrc, aFilename, StartTick, EndTick);
Engine()->AddJob(pDemoEditTask);
m_EditJobs.push_back(pDemoEditTask);
m_pConsole->Print(IConsole::OUTPUT_LEVEL_STANDARD, "replay", "Saving replay...");

// And we restart the recorder
DemoRecorder_StartReplayRecorder();
}
// Create a job to do this slicing in background because it can be a bit long depending on the file size
std::shared_ptr<CDemoEdit> pDemoEditTask = std::make_shared<CDemoEdit>(this, GameClient()->NetVersion(), &m_SnapshotDelta, pSrc, aFilename, StartTick, EndTick);
Engine()->AddJob(pDemoEditTask);
m_vpEditJobs.push_back(pDemoEditTask);

// And we restart the recorder
DemoRecorder_StartReplayRecorder();
}

void CClient::DemoSlice(const char *pDstPath, CLIENTFUNC_FILTER pfnFilter, void *pUser)
Expand Down Expand Up @@ -4660,6 +4638,17 @@ void CClient::RequestDDNetInfo()
m_pDDNetInfoTask = HttpGetFile(aUrl, Storage(), m_aDDNetInfoTmp, IStorage::TYPE_SAVE);
m_pDDNetInfoTask->Timeout(CTimeout{10000, 0, 500, 10});
m_pDDNetInfoTask->IpResolve(IPRESOLVE::V4);
m_pDDNetInfoTask->DoneHandler([this](CHttpRequest *pRequest) {
if(pRequest->State() == HTTP_DONE)
{
FinishDDNetInfo();
}
else if(pRequest->State() == HTTP_ERROR || pRequest->State() == HTTP_ABORTED)
{
Storage()->RemoveFile(m_aDDNetInfoTmp, IStorage::TYPE_SAVE);
ResetDDNetInfo();
}
});
Engine()->AddJob(m_pDDNetInfoTask);
}

Expand Down
6 changes: 4 additions & 2 deletions src/engine/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
#ifndef ENGINE_CLIENT_CLIENT_H
#define ENGINE_CLIENT_CLIENT_H

#include <deque>
#include <memory>
#include <vector>

#include <base/hash.h>
#include <base/logger.h>
Expand All @@ -23,6 +23,7 @@
#include <engine/shared/host_lookup.h>
#include <engine/shared/http.h>
#include <engine/shared/network.h>
#include <engine/textrender.h>
#include <engine/warning.h>

#include "graph.h"
Expand Down Expand Up @@ -196,7 +197,7 @@ class CClient : public IClient, public CDemoPlayer::IListener

CSnapshotDelta m_SnapshotDelta;

std::deque<std::shared_ptr<CDemoEdit>> m_EditJobs;
std::vector<std::shared_ptr<CDemoEdit>> m_vpEditJobs;

//
bool m_CanReceiveServerCapabilities;
Expand Down Expand Up @@ -254,6 +255,7 @@ class CClient : public IClient, public CDemoPlayer::IListener
public:
IConfigManager *ConfigManager() { return m_pConfigManager; }
CConfig *Config() { return m_pConfig; }
IConsole *Console() { return m_pConsole; }
IDiscord *Discord() { return m_pDiscord; }
IEngine *Engine() { return m_pEngine; }
IGameClient *GameClient() { return m_pGameClient; }
Expand Down
26 changes: 21 additions & 5 deletions src/engine/client/demoedit.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
#include "demoedit.h"

#include <engine/client/client.h>
#include <engine/console.h>
#include <engine/shared/demo.h>
#include <engine/storage.h>

CDemoEdit::CDemoEdit(const char *pNetVersion, class CSnapshotDelta *pSnapshotDelta, IStorage *pStorage, const char *pDemo, const char *pDst, int StartTick, int EndTick) :
m_SnapshotDelta(*pSnapshotDelta),
m_pStorage(pStorage)
#include <game/localization.h>

CDemoEdit::CDemoEdit(CClient *pClient, const char *pNetVersion, class CSnapshotDelta *pSnapshotDelta, const char *pDemo, const char *pDst, int StartTick, int EndTick) :
m_pClient(pClient),
m_SnapshotDelta(*pSnapshotDelta)
{
str_copy(m_aDemo, pDemo);
str_copy(m_aDst, pDst);
Expand All @@ -14,13 +18,25 @@ CDemoEdit::CDemoEdit(const char *pNetVersion, class CSnapshotDelta *pSnapshotDel
m_EndTick = EndTick;

// Init the demoeditor
m_DemoEditor.Init(pNetVersion, &m_SnapshotDelta, NULL, pStorage);
m_DemoEditor.Init(pNetVersion, &m_SnapshotDelta, NULL, pClient->Storage());
}

void CDemoEdit::Run()
{
// Slice the current demo
m_DemoEditor.Slice(m_aDemo, m_aDst, m_StartTick, m_EndTick, NULL, 0);
// We remove the temporary demo file
m_pStorage->RemoveFile(m_aDemo, IStorage::TYPE_SAVE);
m_pClient->Storage()->RemoveFile(m_aDemo, IStorage::TYPE_SAVE);
}

void CDemoEdit::Done()
{
if(m_pClient->State() != IClient::STATE_ONLINE)
return;

char aBuf[IO_MAX_PATH_LENGTH + 64];
str_format(aBuf, sizeof(aBuf), "Successfully saved the replay to '%s'!", Destination());
m_pClient->Console()->Print(IConsole::OUTPUT_LEVEL_STANDARD, "replay", aBuf);

m_pClient->GameClient()->Echo(Localize("Successfully saved the replay!"));
}
12 changes: 7 additions & 5 deletions src/engine/client/demoedit.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
#include <engine/shared/jobs.h>
#include <engine/shared/snapshot.h>

class IStorage;
class CClient;

class CDemoEdit : public IJob
{
CClient *m_pClient;
CSnapshotDelta m_SnapshotDelta;
IStorage *m_pStorage;

CDemoEditor m_DemoEditor;

Expand All @@ -19,9 +19,11 @@ class CDemoEdit : public IJob
int m_StartTick;
int m_EndTick;

public:
CDemoEdit(const char *pNetVersion, CSnapshotDelta *pSnapshotDelta, IStorage *pStorage, const char *pDemo, const char *pDst, int StartTick, int EndTick);
void Run() override;
char *Destination() { return m_aDst; }
void Done() override;

public:
CDemoEdit(CClient *pClient, const char *pNetVersion, CSnapshotDelta *pSnapshotDelta, const char *pDemo, const char *pDst, int StartTick, int EndTick);
const char *Destination() const { return m_aDst; }
};
#endif
1 change: 1 addition & 0 deletions src/engine/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class IEngine : public IInterface

virtual void Init() = 0;
virtual void AddJob(std::shared_ptr<IJob> pJob) = 0;
virtual void UpdateJobs() = 0;
virtual void SetAdditionalLogger(std::shared_ptr<ILogger> &&pLogger) = 0;
static void RunJobBlocking(IJob *pJob);
};
Expand Down
3 changes: 3 additions & 0 deletions src/engine/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2745,6 +2745,8 @@ int CServer::Run()
int64_t t = time_get();
int NewTicks = 0;

pEngine->UpdateJobs();

// load new map
if(m_MapReload || m_CurrentGameTick >= MAX_TICK) // force reload to make sure the ticks stay within a valid range
{
Expand Down Expand Up @@ -2821,6 +2823,7 @@ int CServer::Run()
else if(m_aClients[ClientID].m_DnsblState == CClient::DNSBL_STATE_PENDING &&
m_aClients[ClientID].m_pDnsblLookup->Status() == IJob::STATE_DONE)
{
// TODO: use job completion callback
if(m_aClients[ClientID].m_pDnsblLookup->m_Result != 0)
{
// entry not found -> whitelisted
Expand Down
5 changes: 5 additions & 0 deletions src/engine/shared/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ class CEngine : public IEngine
m_JobPool.Add(std::move(pJob));
}

void UpdateJobs() override
{
m_JobPool.Update();
}

void SetAdditionalLogger(std::shared_ptr<ILogger> &&pLogger) override
{
m_pFutureLogger->Set(pLogger);
Expand Down
8 changes: 8 additions & 0 deletions src/engine/shared/http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ void CHttpRequest::Run()
m_State = OnCompletion(FinalState);
}

void CHttpRequest::Done()
{
if(m_DoneHandler)
{
m_DoneHandler(this);
}
}

bool CHttpRequest::BeforeInit()
{
if(m_WriteToFile)
Expand Down
8 changes: 8 additions & 0 deletions src/engine/shared/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#include <algorithm>
#include <atomic>
#include <functional>

#include <engine/shared/jobs.h>

typedef struct _json_value json_value;
Expand Down Expand Up @@ -39,6 +41,8 @@ struct CTimeout
long LowSpeedTime;
};

typedef std::function<void(class CHttpRequest *pRequest)> TRequestDoneHandler;

class CHttpRequest : public IJob
{
enum class REQUEST
Expand Down Expand Up @@ -71,6 +75,8 @@ class CHttpRequest : public IJob
char m_aDestAbsolute[IO_MAX_PATH_LENGTH] = {0};
char m_aDest[IO_MAX_PATH_LENGTH] = {0};

TRequestDoneHandler m_DoneHandler = nullptr;

std::atomic<double> m_Size{0.0};
std::atomic<double> m_Current{0.0};
std::atomic<int> m_Progress{0};
Expand All @@ -81,6 +87,7 @@ class CHttpRequest : public IJob
std::atomic<bool> m_Abort{false};

void Run() override;
void Done() override;
// Abort the request with an error if `BeforeInit()` returns false.
bool BeforeInit();
int RunImpl(void *pUser);
Expand Down Expand Up @@ -133,6 +140,7 @@ class CHttpRequest : public IJob
str_format(aHeader, sizeof(aHeader), "%s: %d", pName, Value);
Header(aHeader);
}
void DoneHandler(TRequestDoneHandler &&DoneHandler) { m_DoneHandler = std::move(DoneHandler); }

const char *Dest()
{
Expand Down
Loading

0 comments on commit e826d8e

Please sign in to comment.