Skip to content

Commit

Permalink
Merge pull request #450 from crypto-chassis/local_address
Browse files Browse the repository at this point in the history
dev: draft for specifying a local ip address when sending a request
  • Loading branch information
cryptochassis authored Oct 28, 2023
2 parents 6d152be + f059ed3 commit 2eb0ab2
Show file tree
Hide file tree
Showing 103 changed files with 1,126 additions and 981 deletions.
3 changes: 0 additions & 3 deletions app/src/single_order_execution/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,6 @@ int main(int argc, char** argv) {
eventHandler.promisePtr = promisePtr;
#ifndef CCAPI_APP_IS_BACKTEST
SessionOptions sessionOptions;
sessionOptions.httpConnectionPoolIdleTimeoutMilliseconds = 1;
sessionOptions.httpMaxNumRetry = 0;
sessionOptions.httpMaxNumRedirect = 0;
SessionConfigs sessionConfigs;
Session session(sessionOptions, sessionConfigs, &eventHandler);
eventHandler.onInit(&session);
Expand Down
3 changes: 0 additions & 3 deletions app/src/spot_market_making/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,6 @@ int main(int argc, char** argv) {
eventHandler.promisePtr = promisePtr;
#ifndef CCAPI_APP_IS_BACKTEST
SessionOptions sessionOptions;
sessionOptions.httpConnectionPoolIdleTimeoutMilliseconds = 1 + eventHandler.accountBalanceRefreshWaitSeconds;
sessionOptions.httpMaxNumRetry = 0;
sessionOptions.httpMaxNumRedirect = 0;
SessionConfigs sessionConfigs;
Session session(sessionOptions, sessionConfigs, &eventHandler);
eventHandler.onInit(&session);
Expand Down
2 changes: 1 addition & 1 deletion example/src/market_data_simple_subscription/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
set(NAME market_data_simple_subscription)
project(${NAME})
add_compile_definitions(CCAPI_ENABLE_SERVICE_MARKET_DATA)
add_compile_definitions(CCAPI_ENABLE_EXCHANGE_COINBASE)
add_compile_definitions(CCAPI_ENABLE_EXCHANGE_OKX)
add_executable(${NAME} main.cpp)
add_dependencies(${NAME} boost rapidjson)
2 changes: 1 addition & 1 deletion example/src/market_data_simple_subscription/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ int main(int argc, char** argv) {
SessionConfigs sessionConfigs;
MyEventHandler eventHandler;
Session session(sessionOptions, sessionConfigs, &eventHandler);
Subscription subscription("coinbase", "BTC-USD", "MARKET_DEPTH");
Subscription subscription("okx", "BTC-USDT", "MARKET_DEPTH");
session.subscribe(subscription);
std::this_thread::sleep_for(std::chrono::seconds(10));
session.stop();
Expand Down
4 changes: 3 additions & 1 deletion include/ccapi_cpp/ccapi_http_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ class HttpConnection CCAPI_FINAL {
std::string toString() const {
std::ostringstream oss;
oss << streamPtr;
std::string output = "HttpConnection [host = " + host + ", port = " + port + ", streamPtr = " + oss.str() + "]";
std::string output = "HttpConnection [host = " + host + ", port = " + port + ", streamPtr = " + oss.str() +
", lastReceiveDataTp = " + UtilTime::getISOTimestamp(lastReceiveDataTp) + "]";
return output;
}
std::string host;
std::string port;
std::shared_ptr<beast::ssl_stream<beast::tcp_stream> > streamPtr;
TimePoint lastReceiveDataTp{std::chrono::seconds{0}};
};
} /* namespace ccapi */
#endif // INCLUDE_CCAPI_CPP_CCAPI_HTTP_CONNECTION_H_
9 changes: 3 additions & 6 deletions include/ccapi_cpp/ccapi_macro.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@
#ifndef CCAPI_ORDER_PRICE_TIMES_QUANTITY_MIN
#define CCAPI_ORDER_PRICE_TIMES_QUANTITY_MIN "PRICE_TIMES_QUANTITY_MIN"
#endif
#ifndef CCAPI_INSTRUMENT_STATUS
#define CCAPI_INSTRUMENT_STATUS "INSTRUMENT_STATUS"
#endif
#ifndef CCAPI_CONTRACT_SIZE
#define CCAPI_CONTRACT_SIZE "CONTRACT_SIZE"
#endif
Expand Down Expand Up @@ -1004,12 +1007,6 @@
#ifndef CCAPI_BINANCE_API_SECRET
#define CCAPI_BINANCE_API_SECRET "BINANCE_API_SECRET"
#endif
// #ifndef CCAPI_BINANCE_MARGIN_API_KEY
// #define CCAPI_BINANCE_MARGIN_API_KEY "BINANCE_MARGIN_API_KEY"
// #endif
// #ifndef CCAPI_BINANCE_MARGIN_API_SECRET
// #define CCAPI_BINANCE_MARGIN_API_SECRET "BINANCE_MARGIN_API_SECRET"
// #endif
#ifndef CCAPI_BINANCE_USDS_FUTURES_API_KEY
#define CCAPI_BINANCE_USDS_FUTURES_API_KEY "BINANCE_USDS_FUTURES_API_KEY"
#endif
Expand Down
32 changes: 31 additions & 1 deletion include/ccapi_cpp/ccapi_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ class Request CCAPI_FINAL {
", correlationId = " + correlationId + ", secondaryCorrelationId = " + secondaryCorrelationId +
(this->serviceName == CCAPI_FIX ? ", paramListFix = " + ccapi::toString(paramListFix) : ", paramList = " + ccapi::toString(paramList)) +
", credential = " + ccapi::toString(shortCredential) + ", operation = " + operationToString(operation) +
", timeSent = " + UtilTime::getISOTimestamp(timeSent) + "]";
", timeSent = " + UtilTime::getISOTimestamp(timeSent) + ", index = " + ccapi::toString(index) + ", localIpAddress = " + localIpAddress +
", baseUrl = " + baseUrl + "]";
return output;
}
const std::string& getCorrelationId() const { return correlationId; }
Expand Down Expand Up @@ -181,11 +182,36 @@ class Request CCAPI_FINAL {
std::pair<long long, long long> getTimeSentPair() const { return UtilTime::divide(timeSent); }
void setTimeSent(TimePoint timeSent) { this->timeSent = timeSent; }
int getIndex() const { return index; }
const std::string& getLocalIpAddress() const { return localIpAddress; }
const std::string& getBaseUrl() const { return baseUrl; }
const std::string& getHost() const { return host; }
const std::string& getPort() const { return port; }
void setIndex(int index) { this->index = index; }
void setCredential(const std::map<std::string, std::string>& credential) { this->credential = credential; }
void setCorrelationId(const std::string& correlationId) { this->correlationId = correlationId; }
void setSecondaryCorrelationId(const std::string& secondaryCorrelationId) { this->secondaryCorrelationId = secondaryCorrelationId; }
void setMarginType(const std::string& marginType) { this->marginType = marginType; }
void setLocalIpAddress(const std::string& localIpAddress) { this->localIpAddress = localIpAddress; }
void setBaseUrl(const std::string& baseUrl) {
this->baseUrl = baseUrl;
this->setBaseUrlParts();
}
void setBaseUrlParts() {
auto splitted1 = UtilString::split(this->baseUrl, "://");
if (splitted1.size() >= 2) {
auto splitted2 = UtilString::split(UtilString::split(splitted1.at(1), "/").at(0), ":");
this->host = splitted2.at(0);
if (splitted2.size() == 2) {
this->port = splitted2.at(1);
} else {
if (splitted1.at(0) == "https" || splitted1.at(0) == "wss") {
this->port = CCAPI_HTTPS_PORT_DEFAULT;
} else {
this->port = CCAPI_HTTP_PORT_DEFAULT;
}
}
}
}
#ifndef CCAPI_EXPOSE_INTERNAL

private:
Expand All @@ -202,6 +228,10 @@ class Request CCAPI_FINAL {
std::vector<std::vector<std::pair<int, std::string> > > paramListFix;
TimePoint timeSent{std::chrono::seconds{0}};
int index{};
std::string localIpAddress;
std::string baseUrl;
std::string host;
std::string port;
};
} /* namespace ccapi */
#endif // INCLUDE_CCAPI_CPP_CCAPI_REQUEST_H_
54 changes: 34 additions & 20 deletions include/ccapi_cpp/ccapi_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,27 @@ class Session {
Session(const Session&) = delete;
Session& operator=(const Session&) = delete;
Session(const SessionOptions& sessionOptions = SessionOptions(), const SessionConfigs& sessionConfigs = SessionConfigs(),
EventHandler* eventHandler = nullptr, EventDispatcher* eventDispatcher = nullptr)
EventHandler* eventHandler = nullptr, EventDispatcher* eventDispatcher = nullptr
#ifndef SWIG
,
ServiceContext* serviceContextPtr = nullptr
#endif
)
: sessionOptions(sessionOptions),
sessionConfigs(sessionConfigs),
eventHandler(eventHandler),
#ifndef CCAPI_USE_SINGLE_THREAD
eventDispatcher(eventDispatcher),
#endif
eventQueue(sessionOptions.maxEventQueueSize),
serviceContextPtr(new ServiceContext()) {
eventQueue(sessionOptions.maxEventQueueSize)
#ifndef SWIG
,
serviceContextPtr(serviceContextPtr)
#endif
{
if (!this->serviceContextPtr) {
this->serviceContextPtr = new ServiceContext();
}
CCAPI_LOGGER_FUNCTION_ENTER;
#ifndef CCAPI_USE_SINGLE_THREAD
if (this->eventHandler) {
Expand All @@ -291,6 +303,7 @@ class Session {
delete this->eventDispatcher;
}
#endif
delete this->serviceContextPtr;
CCAPI_LOGGER_FUNCTION_EXIT;
}
virtual void start() {
Expand Down Expand Up @@ -637,19 +650,19 @@ class Session {
return;
}
if (serviceName == CCAPI_MARKET_DATA) {
std::set<std::string> correlationIdSet;
std::set<std::string> duplicateCorrelationIdSet;
// std::set<std::string> correlationIdSet;
// std::set<std::string> duplicateCorrelationIdSet;
std::unordered_set<std::string> unsupportedExchangeFieldSet;
std::map<std::string, std::vector<Subscription> > subscriptionListByExchangeMap;
auto exchangeFieldMap = this->sessionConfigs.getExchangeFieldMap();
CCAPI_LOGGER_DEBUG("exchangeFieldMap = " + toString(exchangeFieldMap));
for (const auto& subscription : subscriptionList) {
auto correlationId = subscription.getCorrelationId();
if (correlationIdSet.find(correlationId) != correlationIdSet.end()) {
duplicateCorrelationIdSet.insert(correlationId);
} else {
correlationIdSet.insert(correlationId);
}
// auto correlationId = subscription.getCorrelationId();
// if (correlationIdSet.find(correlationId) != correlationIdSet.end()) {
// duplicateCorrelationIdSet.insert(correlationId);
// } else {
// correlationIdSet.insert(correlationId);
// }
auto exchange = subscription.getExchange();
CCAPI_LOGGER_DEBUG("exchange = " + exchange);
auto field = subscription.getField();
Expand All @@ -663,11 +676,11 @@ class Session {
}
subscriptionListByExchangeMap[exchange].push_back(subscription);
}
if (!duplicateCorrelationIdSet.empty()) {
this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE,
"duplicated correlation ids: " + toString(duplicateCorrelationIdSet));
return;
}
// if (!duplicateCorrelationIdSet.empty()) {
// this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE,
// "duplicated correlation ids: " + toString(duplicateCorrelationIdSet));
// return;
// }
if (!unsupportedExchangeFieldSet.empty()) {
this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE,
"unsupported exchange fields: " + toString(unsupportedExchangeFieldSet));
Expand Down Expand Up @@ -902,7 +915,8 @@ class Session {
virtual void setTimer(const std::string& id, long delayMilliseconds, std::function<void(const boost::system::error_code&)> errorHandler,
std::function<void()> successHandler) {
boost::asio::post(*this->serviceContextPtr->ioContextPtr, [this, id, delayMilliseconds, errorHandler, successHandler]() {
std::shared_ptr<steady_timer> timerPtr(new steady_timer(*this->serviceContextPtr->ioContextPtr, boost::asio::chrono::milliseconds(delayMilliseconds)));
std::shared_ptr<boost::asio::steady_timer> timerPtr(
new boost::asio::steady_timer(*this->serviceContextPtr->ioContextPtr, boost::asio::chrono::milliseconds(delayMilliseconds)));
timerPtr->async_wait([this, id, errorHandler, successHandler](const boost::system::error_code& ec) {
if (this->eventHandler) {
#ifdef CCAPI_USE_SINGLE_THREAD
Expand Down Expand Up @@ -963,12 +977,12 @@ class Session {
#endif
SessionOptions sessionOptions;
SessionConfigs sessionConfigs;
EventHandler* eventHandler;
EventHandler* eventHandler{nullptr};
#ifndef CCAPI_USE_SINGLE_THREAD
EventDispatcher* eventDispatcher;
EventDispatcher* eventDispatcher{nullptr};
bool useInternalEventDispatcher{};
#endif
std::shared_ptr<ServiceContext> serviceContextPtr;
ServiceContext* serviceContextPtr{nullptr};
std::map<std::string, std::map<std::string, std::shared_ptr<Service> > > serviceByServiceNameExchangeMap;
std::thread t;
Queue<Event> eventQueue;
Expand Down
2 changes: 0 additions & 2 deletions include/ccapi_cpp/ccapi_session_configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ class SessionConfigs CCAPI_FINAL {
{CCAPI_EXCHANGE_NAME_BITMEX, CCAPI_BITMEX_URL_WS_BASE},
{CCAPI_EXCHANGE_NAME_BINANCE_US, CCAPI_BINANCE_US_URL_WS_BASE},
{CCAPI_EXCHANGE_NAME_BINANCE, CCAPI_BINANCE_URL_WS_BASE},
// {CCAPI_EXCHANGE_NAME_BINANCE_MARGIN, CCAPI_BINANCE_URL_WS_BASE},
{CCAPI_EXCHANGE_NAME_BINANCE_USDS_FUTURES, CCAPI_BINANCE_USDS_FUTURES_URL_WS_BASE},
{CCAPI_EXCHANGE_NAME_BINANCE_COIN_FUTURES, CCAPI_BINANCE_COIN_FUTURES_URL_WS_BASE},
{CCAPI_EXCHANGE_NAME_HUOBI, CCAPI_HUOBI_URL_WS_BASE},
Expand Down Expand Up @@ -374,7 +373,6 @@ class SessionConfigs CCAPI_FINAL {
{CCAPI_EXCHANGE_NAME_BITMEX, CCAPI_BITMEX_URL_REST_BASE},
{CCAPI_EXCHANGE_NAME_BINANCE_US, CCAPI_BINANCE_US_URL_REST_BASE},
{CCAPI_EXCHANGE_NAME_BINANCE, CCAPI_BINANCE_URL_REST_BASE},
// {CCAPI_EXCHANGE_NAME_BINANCE_MARGIN, CCAPI_BINANCE_URL_REST_BASE},
{CCAPI_EXCHANGE_NAME_BINANCE_USDS_FUTURES, CCAPI_BINANCE_USDS_FUTURES_URL_REST_BASE},
{CCAPI_EXCHANGE_NAME_BINANCE_COIN_FUTURES, CCAPI_BINANCE_COIN_FUTURES_URL_REST_BASE},
{CCAPI_EXCHANGE_NAME_HUOBI, CCAPI_HUOBI_URL_REST_BASE},
Expand Down
8 changes: 4 additions & 4 deletions include/ccapi_cpp/ccapi_session_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class SessionOptions CCAPI_FINAL {
", httpMaxNumRedirect = " + ccapi::toString(httpMaxNumRedirect) +
", httpRequestTimeoutMilliseconds = " + ccapi::toString(httpRequestTimeoutMilliseconds) +
", httpConnectionPoolMaxSize = " + ccapi::toString(httpConnectionPoolMaxSize) +
", httpConnectionPoolIdleTimeoutMilliseconds = " + ccapi::toString(httpConnectionPoolIdleTimeoutMilliseconds) +
", httpConnectionKeepAliveTimeoutSeconds = " + ccapi::toString(httpConnectionKeepAliveTimeoutSeconds) +
", enableOneHttpConnectionPerRequest = " + ccapi::toString(enableOneHttpConnectionPerRequest) + "]";
return output;
}
Expand All @@ -50,9 +50,9 @@ class SessionOptions CCAPI_FINAL {
int httpMaxNumRedirect{1};
long httpRequestTimeoutMilliseconds{10000};
int httpConnectionPoolMaxSize{1}; // used to set the maximal number of http connections to be kept in the pool (connections in the pool are idle)
long httpConnectionPoolIdleTimeoutMilliseconds{0}; // used to purge the http connection pool if all connections in the
// pool have stayed idle for at least this amount of time
bool enableOneHttpConnectionPerRequest{}; // create a new http connection for each request
long httpConnectionKeepAliveTimeoutSeconds{
10}; // used to remove a http connection from the http connection pool if it has stayed idle for at least this amount of time
bool enableOneHttpConnectionPerRequest{}; // create a new http connection for each request
#ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
#else
long websocketConnectTimeoutMilliseconds{10000};
Expand Down
2 changes: 1 addition & 1 deletion include/ccapi_cpp/ccapi_subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Subscription CCAPI_FINAL {
}
return output;
}
const std::string getSerializedCredential() const { return ::ccapi::toString(this->credential); }
const std::string getSerializedCredential() const { return ccapi::toString(this->credential); }
// 'getTimeSent' only works in C++. For other languages, please use 'getTimeSentISO'.
TimePoint getTimeSent() const { return timeSent; }
std::string getTimeSentISO() const { return UtilTime::getISOTimestamp(timeSent); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ class ExecutionManagementServiceAscendex : public ExecutionManagementService {
this->baseUrlRest = sessionConfigs.getUrlRestBase().at(this->exchangeName);
this->setHostRestFromUrlRest(this->baseUrlRest);
this->setHostWsFromUrlWs(this->baseUrlWs);
try {
this->tcpResolverResultsRest = this->resolver.resolve(this->hostRest, this->portRest);
} catch (const std::exception& e) {
CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
}
#ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
#else
try {
this->tcpResolverResultsWs = this->resolverWs.resolve(this->hostWs, this->portWs);
} catch (const std::exception& e) {
CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
}
#endif
// try {
// this->tcpResolverResultsRest = this->resolver.resolve(this->hostRest, this->portRest);
// } catch (const std::exception& e) {
// CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
// }
// #ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
// #else
// try {
// this->tcpResolverResultsWs = this->resolverWs.resolve(this->hostWs, this->portWs);
// } catch (const std::exception& e) {
// CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
// }
// #endif
this->apiKeyName = CCAPI_ASCENDEX_API_KEY;
this->apiSecretName = CCAPI_ASCENDEX_API_SECRET;
this->apiAccountGroupName = CCAPI_ASCENDEX_API_ACCOUNT_GROUP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ class ExecutionManagementServiceBinance : public ExecutionManagementServiceBinan
this->baseUrlRest = sessionConfigs.getUrlRestBase().at(this->exchangeName);
this->setHostRestFromUrlRest(this->baseUrlRest);
this->setHostWsFromUrlWs(this->baseUrlWs);
try {
this->tcpResolverResultsRest = this->resolver.resolve(this->hostRest, this->portRest);
} catch (const std::exception& e) {
CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
}
#ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
#else
try {
this->tcpResolverResultsWs = this->resolverWs.resolve(this->hostWs, this->portWs);
} catch (const std::exception& e) {
CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
}
#endif
// try {
// this->tcpResolverResultsRest = this->resolver.resolve(this->hostRest, this->portRest);
// } catch (const std::exception& e) {
// CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
// }
// #ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
// #else
// try {
// this->tcpResolverResultsWs = this->resolverWs.resolve(this->hostWs, this->portWs);
// } catch (const std::exception& e) {
// CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
// }
// #endif
this->apiKeyName = CCAPI_BINANCE_API_KEY;
this->apiSecretName = CCAPI_BINANCE_API_SECRET;
this->setupCredential({this->apiKeyName, this->apiSecretName});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ class ExecutionManagementServiceBinanceCoinFutures : public ExecutionManagementS
this->baseUrlRest = sessionConfigs.getUrlRestBase().at(this->exchangeName);
this->setHostRestFromUrlRest(this->baseUrlRest);
this->setHostWsFromUrlWs(this->baseUrlWs);
try {
this->tcpResolverResultsRest = this->resolver.resolve(this->hostRest, this->portRest);
} catch (const std::exception& e) {
CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
}
#ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
#else
try {
this->tcpResolverResultsWs = this->resolverWs.resolve(this->hostWs, this->portWs);
} catch (const std::exception& e) {
CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
}
#endif
// try {
// this->tcpResolverResultsRest = this->resolver.resolve(this->hostRest, this->portRest);
// } catch (const std::exception& e) {
// CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
// }
// #ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
// #else
// try {
// this->tcpResolverResultsWs = this->resolverWs.resolve(this->hostWs, this->portWs);
// } catch (const std::exception& e) {
// CCAPI_LOGGER_FATAL(std::string("e.what() = ") + e.what());
// }
// #endif
this->apiKeyName = CCAPI_BINANCE_COIN_FUTURES_API_KEY;
this->apiSecretName = CCAPI_BINANCE_COIN_FUTURES_API_SECRET;
this->setupCredential({this->apiKeyName, this->apiSecretName});
Expand Down
Loading

0 comments on commit 2eb0ab2

Please sign in to comment.