diff --git a/data/service/app.py b/data/service/app.py index dffeedd..ac94a79 100644 --- a/data/service/app.py +++ b/data/service/app.py @@ -1,6 +1,8 @@ +import json import logging import os import sys +import time from datetime import timedelta from flask import Flask, send_from_directory @@ -10,9 +12,10 @@ import redis from flask_jwt_extended import JWTManager, create_access_token -from data.service.cron_jobs.app_health import check_matching_remote_position +from data.service.cron_jobs.app_health import check_app_health from data.service.cron_jobs.main import start_background_scheduler from shared.utils.config_parser import get_config +from shared.utils.helpers import is_pipeline_loading, LOADING module_path = os.path.abspath(os.path.join('..')) if module_path not in sys.path: @@ -40,6 +43,8 @@ def startup_task(app): + cache.set(LOADING, json.dumps([])) + start_background_scheduler(config_vars) active_pipelines = Pipeline.objects.filter(active=True) @@ -50,11 +55,17 @@ def startup_task(app): cache.set("bearer_token", bearer_token) for pipeline in active_pipelines: + response = start_symbol_trading(pipeline) if not response["success"]: logging.info(f"Pipeline {pipeline.id} could not be started. {response['message']}") + while any(is_pipeline_loading(cache, pipeline.id) for pipeline in active_pipelines): + time.sleep(10) + + check_app_health() + def create_app(): diff --git a/data/service/blueprints/bots_api/_helpers.py b/data/service/blueprints/bots_api/_helpers.py index 1f42455..a135f23 100644 --- a/data/service/blueprints/bots_api/_helpers.py +++ b/data/service/blueprints/bots_api/_helpers.py @@ -10,7 +10,7 @@ from data.sources._sources import DataHandler from shared.exchanges.binance import BinanceHandler from shared.utils.config_parser import get_config -from shared.utils.helpers import get_logging_row_header +from shared.utils.helpers import get_logging_row_header, add_pipeline_loading, is_pipeline_loading config_vars = get_config() @@ -87,6 +87,9 @@ def stop_pipeline(pipeline_id, header='', raise_exception=False, nr_retries=3, f time.sleep(60 * retries) def start_symbol_trading(pipeline): + + add_pipeline_loading(cache, pipeline.id) + payload = { "pipeline_id": pipeline.id, "binance_trader_type": "futures", diff --git a/data/service/cron_jobs/app_health/_app_health.py b/data/service/cron_jobs/app_health/_app_health.py index 57cb11a..6257756 100644 --- a/data/service/cron_jobs/app_health/_app_health.py +++ b/data/service/cron_jobs/app_health/_app_health.py @@ -5,6 +5,7 @@ import django import pytz +import redis from data.service.blueprints.bots_api import stop_pipeline, start_symbol_trading from data.service.external_requests import get_open_positions, start_stop_symbol_trading @@ -19,6 +20,8 @@ config = get_config() +cache = redis.from_url(os.getenv('REDIS_URL', config.redis_url)) + def find_position(positions, symbol): """ diff --git a/data/sources/binance/_binance.py b/data/sources/binance/_binance.py index d0d9f84..6d18ee6 100644 --- a/data/sources/binance/_binance.py +++ b/data/sources/binance/_binance.py @@ -20,7 +20,7 @@ ) from data.sources.binance.load import load_data from data.sources.binance.transform import resample_data, transform_data -from shared.utils.helpers import get_minimum_lookback_date, get_pipeline_max_window +from shared.utils.helpers import get_minimum_lookback_date, get_pipeline_max_window, remove_pipeline_loading from shared.exchanges.binance import BinanceHandler from shared.utils.config_parser import get_config import shared.exchanges.binance.constants as const @@ -157,6 +157,8 @@ def start_data_ingestion(self, header=''): if start_pipeline: self._start_kline_websockets(self.symbol, self._websocket_callback, header=header) + remove_pipeline_loading(cache, self.pipeline_id) + def stop_data_ingestion(self, header='', raise_exception=False, force=False): """ Public method which stops the data pipeline for the symbol. diff --git a/data/tests/setup/fixtures/app.py b/data/tests/setup/fixtures/app.py index 808e7c7..c1fafed 100644 --- a/data/tests/setup/fixtures/app.py +++ b/data/tests/setup/fixtures/app.py @@ -10,9 +10,12 @@ mock_get_strategies, mock_redis_connection_bots_api, mock_redis_connection_bots_api_helpers, - spy_start_symbol_trading + mock_redis_connection_external_requests, + mock_start_stop_symbol_trading_success_true, + spy_start_symbol_trading, + mock_get_open_positions, ) -from shared.utils.tests.fixtures.external_modules import mock_jwt_required +from shared.utils.tests.fixtures.external_modules import mock_jwt_required, mock_requests_post from shared.utils.tests.fixtures.models import * @@ -32,15 +35,19 @@ def app( mock_redis_connection, mock_redis_connection_bots_api, mock_redis_connection_bots_api_helpers, + mock_redis_connection_external_requests, + mock_start_stop_symbol_trading_success_true, mock_jwt_required, mock_settings_env_vars, mock_get_strategies, mock_binance_client_exchange_info, + mock_requests_post, create_exchange, create_assets, create_symbol, spy_start_symbol_trading, - monkeypatch + monkeypatch, + mock_get_open_positions ): # monkeypatch.setenv("TEST", ) @@ -58,16 +65,18 @@ def client(app): @pytest.fixture def app_with_open_position( db, - # mock_client_env_vars, mock_create_access_token, mock_redis_connection, mock_redis_connection_bots_api, mock_redis_connection_bots_api_helpers, + mock_redis_connection_external_requests, mock_jwt_required, mock_settings_env_vars, mock_get_strategies, mock_binance_client_exchange_info, + mock_get_open_positions, mock_start_stop_symbol_trading_success_true, + mock_requests_post, spy_start_stop_symbol_trading, fake_executor_submit, create_exchange, diff --git a/data/tests/setup/fixtures/internal_modules.py b/data/tests/setup/fixtures/internal_modules.py index e318c36..37d796f 100644 --- a/data/tests/setup/fixtures/internal_modules.py +++ b/data/tests/setup/fixtures/internal_modules.py @@ -372,11 +372,12 @@ def mock_get_open_positions_unsuccessful(mocker): [ 'check_inconsistencies', 'restart_failed_pipelines', - 'restart_retries' + 'restart_retries', + 'redis_url' ] ) -fake_config_no_restart = FakeConfig('false', 'false', '2') -fake_config_no_retries = FakeConfig('false', 'true', '0') +fake_config_no_restart = FakeConfig('false', 'false', '2', '') +fake_config_no_retries = FakeConfig('false', 'true', '0', '') @pytest.fixture diff --git a/shared/utils/helpers/__init__.py b/shared/utils/helpers/__init__.py index e0ba585..459b76c 100644 --- a/shared/utils/helpers/__init__.py +++ b/shared/utils/helpers/__init__.py @@ -1,10 +1,14 @@ from shared.utils.helpers._helpers import ( get_logging_row_header, get_item_from_cache, + add_pipeline_loading, + remove_pipeline_loading, + is_pipeline_loading, get_pipeline_data, get_input_dimensions, convert_trade, get_pipeline_max_window, get_minimum_lookback_date, get_root_dir, + LOADING ) diff --git a/shared/utils/helpers/_helpers.py b/shared/utils/helpers/_helpers.py index a42d146..f99213c 100644 --- a/shared/utils/helpers/_helpers.py +++ b/shared/utils/helpers/_helpers.py @@ -20,6 +20,9 @@ translator = str.maketrans('', '', escapes) +LOADING = "Loading" + + PIPELINE = namedtuple( 'Pipeline', [ @@ -65,6 +68,40 @@ def get_item_from_cache(cache, key): return item if item else '""' +def add_pipeline_loading(cache, pipeline_id): + + loading = set(json.loads(cache.get(LOADING))) if cache.get(LOADING) is not None else set() + + loading.add(pipeline_id) + + cache.set( + LOADING, + json.dumps(list(set(loading))) + ) + + +def remove_pipeline_loading(cache, pipeline_id): + + loading = set(json.loads(cache.get(LOADING))) if cache.get(LOADING) is not None else set() + + try: + loading.remove(pipeline_id) + except KeyError: + pass + + cache.set( + LOADING, + json.dumps(list(loading)) + ) + + +def is_pipeline_loading(cache, pipeline_id): + + loading = set(json.loads(cache.get(LOADING))) + + return pipeline_id in loading + + def get_pipeline_data(pipeline_id, return_obj=False, ignore_exception=False): if pipeline_id is None and ignore_exception: