Skip to content

Commit

Permalink
Merge pull request #173 from diogomatoschaves/issue#163
Browse files Browse the repository at this point in the history
Issue#163
  • Loading branch information
diogomatoschaves authored Mar 1, 2024
2 parents 08df27c + 40109c6 commit 5cddd86
Show file tree
Hide file tree
Showing 48 changed files with 1,479 additions and 642 deletions.
2 changes: 1 addition & 1 deletion data/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def futures_exchange_info(self):

@pytest.fixture(autouse=True)
def mock_binance_client(mocker):
return mocker.patch('data.service.blueprints.bots_api.binance_client', FakeBinanceClient())
return mocker.patch('data.service.blueprints.bots_api._helpers.binance_client', FakeBinanceClient())


@pytest.fixture(autouse=True)
Expand Down
8 changes: 1 addition & 7 deletions data/service/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
import os

import data.service.app as application


def create_app(testing=False, env_vars=None):
def create_app(testing=False):
"""Create and configure an instance of the Flask application."""

if env_vars is not None:
for key, value in env_vars.items():
os.environ.setdefault(key, value)

app = application.create_app()

app.config['TESTING'] = testing
Expand Down
28 changes: 21 additions & 7 deletions data/service/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import json
import logging
import os
import sys
import time
from datetime import timedelta

from flask import Flask, send_from_directory
Expand All @@ -9,8 +12,10 @@
import redis
from flask_jwt_extended import JWTManager, create_access_token

from data.service.cron_jobs.app_health import check_app_health
from data.service.cron_jobs.main import start_background_scheduler
from shared.utils.config_parser import get_config
from shared.utils.helpers import is_pipeline_loading, LOADING

module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
Expand All @@ -26,7 +31,7 @@
django.setup()


from database.model.models import Position
from database.model.models import Pipeline
from shared.utils.logger import configure_logger

config_vars = get_config('data')
Expand All @@ -38,19 +43,28 @@

def startup_task(app):

cache.set(LOADING, json.dumps([]))

start_background_scheduler(config_vars)

open_positions = Position.objects.filter(pipeline__active=True)
active_pipelines = Pipeline.objects.filter(active=True)

with app.app_context():
access_token = create_access_token(identity='abc', expires_delta=False)
bearer_token = 'Bearer ' + access_token
cache.set("bearer_token", bearer_token)

for open_position in open_positions:
start_symbol_trading(open_position.pipeline)
open_position.pipeline.active = True
open_position.pipeline.save()
for pipeline in active_pipelines:

response = start_symbol_trading(pipeline)

if not response["success"]:
logging.info(f"Pipeline {pipeline.id} could not be started. {response['message']}")

while any(is_pipeline_loading(cache, pipeline.id) for pipeline in active_pipelines):
time.sleep(10)

check_app_health()


def create_app():
Expand All @@ -66,7 +80,7 @@ def create_app():
app.config["JWT_SECRET_KEY"] = os.getenv('SECRET_KEY')
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = timedelta(days=config_vars.token_expires_days)

jwt = JWTManager(app)
JWTManager(app)

CORS(app)

Expand Down
2 changes: 2 additions & 0 deletions data/service/blueprints/bots_api/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from data.service.blueprints.bots_api._bots_api import bots_api
from data.service.blueprints.bots_api._helpers import stop_pipeline, start_symbol_trading
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
import json
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

import django
import redis
from flask import Blueprint, jsonify, request
from flask_jwt_extended import jwt_required

from data.service.external_requests import get_strategies, start_stop_symbol_trading
from data.service.blueprints.bots_api._helpers import start_symbol_trading, stop_instance
from data.service.external_requests import get_strategies
from data.service.helpers import check_input, get_or_create_pipeline, extract_request_params, convert_client_request
from shared.utils.config_parser import get_config
from shared.utils.decorators import general_app_error
from data.service.helpers.decorators.handle_app_errors import handle_app_errors
from data.service.helpers.exceptions import PipelineStartFail, DataPipelineDoesNotExist
from data.service.helpers.responses import Responses
from data.sources._sources import DataHandler
from shared.exchanges.binance import BinanceHandler
from shared.utils.decorators import handle_db_connection_error
from shared.utils.helpers import get_item_from_cache, get_logging_row_header
from shared.utils.helpers import get_item_from_cache

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "database.settings")
django.setup()
Expand All @@ -32,58 +29,6 @@

bots_api = Blueprint('bots_api', __name__)

executor = ThreadPoolExecutor(16)

binance_instances = []

binance_client = BinanceHandler()


def initialize_data_collection(pipeline, header):

global binance_instances

data_handler = DataHandler(pipeline, header=header)
binance_instances.append(data_handler.binance_handler)
data_handler.binance_handler.start_data_ingestion(header=header)


def reduce_instances(instances, instance, pipeline_id, header):
if pipeline_id == instance.pipeline_id:
instance.stop_data_ingestion(header=header)
return instances
else:
return [*instances, instance]


def stop_instance(pipeline_id, header):

global binance_instances

binance_instances = reduce(
lambda instances, instance: reduce_instances(instances, instance, pipeline_id, header),
binance_instances,
[]
)


def start_symbol_trading(pipeline):

header = get_logging_row_header(pipeline)

cache.set(
f"pipeline {pipeline.id}",
json.dumps(header)
)

logging.info(header + f"Starting data pipeline.")

executor.submit(
initialize_data_collection,
pipeline,
header
)


@bots_api.put('/start_bot')
@general_app_error
Expand Down Expand Up @@ -111,24 +56,10 @@ def start_bot():
data
)

payload = {
"pipeline_id": pipeline.id,
"binance_trader_type": "futures",
}

response = start_stop_symbol_trading(payload, 'start')
response = start_symbol_trading(pipeline)

if not response["success"]:
pipeline.active = False
pipeline.open_time = None
pipeline.save()

raise PipelineStartFail(response["message"])
else:
pipeline.last_entry = None
pipeline.save()

start_symbol_trading(pipeline)
raise PipelineStartFail(pipeline.id)

return jsonify(Responses.DATA_PIPELINE_START_OK(pipeline))

Expand All @@ -148,13 +79,11 @@ def stop_bot():
pipeline_id = data.get("pipelineId", None)

try:
Pipeline.objects.filter(id=pipeline_id).exists()

header = json.loads(get_item_from_cache(cache, pipeline_id))

logging.info(header + f"Stopping pipeline {pipeline_id}.")

stop_instance(pipeline_id, header=header)
stop_instance(pipeline_id, header=header, raise_exception=True)

pipeline = Pipeline.objects.get(id=pipeline_id)
pipeline.active = False
Expand Down
120 changes: 120 additions & 0 deletions data/service/blueprints/bots_api/_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

import redis

from data.service.external_requests import start_stop_symbol_trading
from data.sources._sources import DataHandler
from shared.exchanges.binance import BinanceHandler
from shared.utils.config_parser import get_config
from shared.utils.helpers import get_logging_row_header, add_pipeline_loading, is_pipeline_loading

config_vars = get_config()

cache = redis.from_url(os.getenv('REDIS_URL', config_vars.redis_url))
executor = ThreadPoolExecutor(16)

binance_instances = []

binance_client = BinanceHandler()


def initialize_data_collection(pipeline, header):

global binance_instances

data_handler = DataHandler(pipeline, header=header)
binance_instances.append(data_handler.binance_handler)
data_handler.binance_handler.start_data_ingestion(header=header)


def reduce_instances(accumulator, instance, pipeline_id, header, raise_exception, force):

if pipeline_id == instance.pipeline_id:
return_value = instance.stop_data_ingestion(
header=header,
raise_exception=raise_exception,
force=force,
)
return {
**accumulator,
"return_values": [*accumulator["return_values"], return_value]
}

else:
return {
**accumulator,
"instances": [*accumulator["instances"], instance]
}


def stop_instance(pipeline_id, header, raise_exception=False, force=False):

global binance_instances

reduced_instances = reduce(
lambda accumulator, instance: reduce_instances(
accumulator, instance, pipeline_id, header, raise_exception, force
),
binance_instances,
{"instances": [], "return_values": []}
)

binance_instances = reduced_instances["instances"]

try:
return reduced_instances["return_values"][0]
except IndexError:
return False


def stop_pipeline(pipeline_id, header='', raise_exception=False, nr_retries=3, force=False):
success = stop_instance(pipeline_id, header, raise_exception, force=force)

retries = 0
while not success:
if retries > nr_retries:
break

success = stop_instance(pipeline_id, header, raise_exception, force=True)

retries += 1

if not success:
time.sleep(60 * retries)

def start_symbol_trading(pipeline):

add_pipeline_loading(cache, pipeline.id)

payload = {
"pipeline_id": pipeline.id,
"binance_trader_type": "futures",
}

header = get_logging_row_header(cache, pipeline)

logging.info(header + f"Starting data pipeline.")

response = start_stop_symbol_trading(payload, 'start')

if response["success"]:
pipeline.last_entry = None
pipeline.save()
else:
pipeline.active = False
pipeline.open_time = None
pipeline.save()

return response

executor.submit(
initialize_data_collection,
pipeline,
header
)

return response
10 changes: 7 additions & 3 deletions data/service/blueprints/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,16 @@ def get_trades_metrics():
except NoSuchPipeline:
aggregate_values = convert_trades_to_dict(query_trades_metrics())

symbols_objs = Symbol.objects.annotate(trade_count=Count('trade', filter=~Q(trade__close_time=None)))
symbols_objs = Symbol.objects.all()

symbols = []

for symbol in symbols_objs:
if symbol.trade_count > 0:
symbol_dict = {"name": symbol.name, "value": symbol.trade_count}

trades = Trade.objects.filter(pipeline__symbol=symbol).exclude(close_time=None).count()

if trades > 0:
symbol_dict = {"name": symbol.name, "value": trades}

symbols.append(symbol_dict)

Expand Down
1 change: 1 addition & 0 deletions data/service/cron_jobs/app_health/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from data.service.cron_jobs.app_health._app_health import check_app_health, check_matching_remote_position
Loading

0 comments on commit 5cddd86

Please sign in to comment.