Skip to content

Commit

Permalink
Basic error handling for curl multi
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeyRyabinin authored and Sergey Ryabinin committed Jan 24, 2025
1 parent 7e85c62 commit fcfea62
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ namespace Http
/* SDK side */
const CurlMultiHttpClient* m_client = nullptr;
ExecutionPolicy m_execPolicy;
std::function<void()> m_onCurlDoneFn;
std::function<void()> m_onCurlDoneFn; // called in a main multi loop => must be lightweight.
Aws::Utils::DateTime startTransmissionTime;

/* Curl calls the SDK back */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <aws/core/client/ClientConfiguration.h>
#include <aws/core/utils/memory/stl/AWSString.h>
#include <aws/core/utils/memory/stl/AWSQueue.h>
#include <aws/core/utils/memory/stl/AWSSet.h>
#include <atomic>
#include <thread>
#include <queue>
Expand Down Expand Up @@ -86,10 +87,11 @@ class AWS_CORE_API CurlMultiHttpClient: public HttpClient
virtual void OverrideOptionsOnConnectionHandle(CURL*) const {}

private:
void SubmitTask(Curl::CurlEasyHandleContext* pEasyHandleCtx) const;
bool SubmitTask(Curl::CurlEasyHandleContext* pEasyHandleCtx) const;

static std::shared_ptr<HttpResponse> HandleCurlResponse(Curl::CurlEasyHandleContext* pEasyHandleCtx);
static void CurlMultiPerformThread(CurlMultiHttpClient* pClient);
void CurlMultiPerformReset();

std::thread m_multiHandleThread;
std::atomic<bool> m_isRunning;
Expand All @@ -99,7 +101,8 @@ class AWS_CORE_API CurlMultiHttpClient: public HttpClient
// mutable std::mutex m_tasksMutex;
mutable std::atomic<size_t> m_tasksQueued;
mutable std::mutex m_tasksMutex;
mutable Aws::UnorderedMap<CURL*, std::shared_ptr<Curl::CurlEasyHandleContext>> m_tasks;
// used to track tasks sent to multi handle, for handling multi perform errors
mutable Aws::UnorderedSet<Curl::CurlEasyHandleContext*> m_multiTasks;

CurlMultiHttpClientConfig m_config;

Expand Down
70 changes: 52 additions & 18 deletions src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,24 @@ int CurlMultiDebugCallback(CURL *handle, curl_infotype type, char *data, size_t
return 0;
}

void CurlMultiHttpClient::CurlMultiPerformReset()
{
// TODO: refactor
std::unique_lock<std::mutex> lockGuard(m_signalMutex);
std::unique_lock<std::mutex> lock(m_tasksMutex);
AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Removing all easy handles from a multi handle and triggering callbacks.");

for(Curl::CurlEasyHandleContext* handleCtx : m_multiTasks)
{
curl_multi_remove_handle(m_curlMultiHandleContainer.AccessCurlMultiHandle(), handleCtx->m_curlEasyHandle);
handleCtx->curlResultMsg = nullptr;
handleCtx->curlResult = static_cast<CURLcode>(-1);
handleCtx->m_onCurlDoneFn();
m_multiTasks.erase(handleCtx);
}
m_multiTasks.clear();
}

void CurlMultiHttpClient::CurlMultiPerformThread(CurlMultiHttpClient* pClient)
{
assert(pClient && pClient->m_curlMultiHandleContainer.AccessCurlMultiHandle());
Expand All @@ -297,12 +315,18 @@ void CurlMultiHttpClient::CurlMultiPerformThread(CurlMultiHttpClient* pClient)
}
if(!pClient->m_isRunning.load())
{
break;
break;
}

pClient->m_tasksQueued = 0;

CURLMcode mc = curl_multi_perform(multi_handle, &stillRunning);
if(mc != CURLM_OK)
{
AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Curl curl_multi_perform returned error code " << mc
<< " resetting multi handle.");
pClient->CurlMultiPerformReset();
}
int msgQueue = 0;
do {

Expand All @@ -326,12 +350,14 @@ void CurlMultiHttpClient::CurlMultiPerformThread(CurlMultiHttpClient* pClient)
} while(msgQueue > 0);

if(!mc && stillRunning)
/* wait for activity, timeout or "nothing" */
mc = curl_multi_poll(multi_handle, NULL, 0, 1000, NULL);

if(mc) {
fprintf(stderr, "curl_multi_poll() failed, code %d.\n", (int)mc);
break;
{
/* wait for activity, timeout or "nothing" */
mc = curl_multi_poll(multi_handle, NULL, 0, 1000, NULL);
if(mc)
{
AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Curl curl_multi_poll returned error code " << mc);
break;
}
}
};
}
Expand Down Expand Up @@ -705,6 +731,10 @@ std::shared_ptr<HttpResponse> CurlMultiHttpClient::HandleCurlResponse(Curl::Curl
}

curl_multi_remove_handle(client->m_curlMultiHandleContainer.AccessCurlMultiHandle(), connectionHandle);
{
std::unique_lock<std::mutex> lock(client->m_tasksMutex);
client->m_multiTasks.erase(pEasyHandleCtx);
}

std::shared_ptr<HttpResponse> res = std::move(pEasyHandleCtx->writeContext.m_response);
pEasyHandleCtx->writeContext.m_response.reset();
Expand All @@ -721,19 +751,27 @@ std::shared_ptr<HttpResponse> CurlMultiHttpClient::HandleCurlResponse(Curl::Curl
return res;
}

void CurlMultiHttpClient::SubmitTask(Curl::CurlEasyHandleContext* pEasyHandleCtx) const
bool CurlMultiHttpClient::SubmitTask(Curl::CurlEasyHandleContext* pEasyHandleCtx) const
{
assert(pEasyHandleCtx);
AWS_UNREFERENCED_PARAM(pEasyHandleCtx);
CURLMcode curlMultiResponseCode = curl_multi_add_handle(m_curlMultiHandleContainer.AccessCurlMultiHandle(),
pEasyHandleCtx->m_curlEasyHandle);
if (CURLM_OK != curlMultiResponseCode)
{
return false;
}

{
std::unique_lock<std::mutex> lock(m_tasksMutex);
m_multiTasks.insert(pEasyHandleCtx);
m_tasksQueued++;
}
{
std::unique_lock<std::mutex> lockGuard(m_signalMutex);
m_signalRunning.notify_one();
curl_multi_wakeup(m_curlMultiHandleContainer.AccessCurlMultiHandle());
}
return true;
}

// Blocking
Expand Down Expand Up @@ -774,18 +812,14 @@ std::shared_ptr<HttpResponse> CurlMultiHttpClient::MakeRequest(const std::shared

easyHandleContext->startTransmissionTime = Aws::Utils::DateTime::Now();

CURLMcode curlMultiResponseCode = curl_multi_add_handle(m_curlMultiHandleContainer.AccessCurlMultiHandle(),
easyHandleContext->m_curlEasyHandle);
if (CURLM_OK != curlMultiResponseCode)
if(!SubmitTask(easyHandleContext))
{
response->SetClientErrorType(CoreErrors::NETWORK_CONNECTION);
response->SetClientErrorMessage("Failed to add curl_easy_handle to curl_multi_handle.");
AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Failed to add curl_easy_handle to curl_multi_handle.");
return response;
response->SetClientErrorType(CoreErrors::NETWORK_CONNECTION);
response->SetClientErrorMessage("Failed to add curl_easy_handle to curl_multi_handle.");
AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Failed to add curl_easy_handle to curl_multi_handle.");
return response;
}

SubmitTask(easyHandleContext);

// Task submitted, wait for it's completion
std::unique_lock<std::mutex> lockGuard(taskMutex);
signal.wait(lockGuard,
Expand Down

0 comments on commit fcfea62

Please sign in to comment.