Skip to content

Commit

Permalink
Check App health at startup
Browse files Browse the repository at this point in the history
  • Loading branch information
diogomatoschaves committed Mar 1, 2024
1 parent ab53e1d commit 40109c6
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 10 deletions.
13 changes: 12 additions & 1 deletion data/service/app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
import logging
import os
import sys
import time
from datetime import timedelta

from flask import Flask, send_from_directory
Expand All @@ -10,9 +12,10 @@
import redis
from flask_jwt_extended import JWTManager, create_access_token

from data.service.cron_jobs.app_health import check_matching_remote_position
from data.service.cron_jobs.app_health import check_app_health
from data.service.cron_jobs.main import start_background_scheduler
from shared.utils.config_parser import get_config
from shared.utils.helpers import is_pipeline_loading, LOADING

module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
Expand Down Expand Up @@ -40,6 +43,8 @@

def startup_task(app):

cache.set(LOADING, json.dumps([]))

start_background_scheduler(config_vars)

active_pipelines = Pipeline.objects.filter(active=True)
Expand All @@ -50,11 +55,17 @@ def startup_task(app):
cache.set("bearer_token", bearer_token)

for pipeline in active_pipelines:

response = start_symbol_trading(pipeline)

if not response["success"]:
logging.info(f"Pipeline {pipeline.id} could not be started. {response['message']}")

Check warning on line 62 in data/service/app.py

View check run for this annotation

Codecov / codecov/patch

data/service/app.py#L62

Added line #L62 was not covered by tests

while any(is_pipeline_loading(cache, pipeline.id) for pipeline in active_pipelines):
time.sleep(10)

Check warning on line 65 in data/service/app.py

View check run for this annotation

Codecov / codecov/patch

data/service/app.py#L65

Added line #L65 was not covered by tests

check_app_health()


def create_app():

Expand Down
5 changes: 4 additions & 1 deletion data/service/blueprints/bots_api/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from data.sources._sources import DataHandler
from shared.exchanges.binance import BinanceHandler
from shared.utils.config_parser import get_config
from shared.utils.helpers import get_logging_row_header
from shared.utils.helpers import get_logging_row_header, add_pipeline_loading, is_pipeline_loading

config_vars = get_config()

Expand Down Expand Up @@ -87,6 +87,9 @@ def stop_pipeline(pipeline_id, header='', raise_exception=False, nr_retries=3, f
time.sleep(60 * retries)

def start_symbol_trading(pipeline):

add_pipeline_loading(cache, pipeline.id)

payload = {
"pipeline_id": pipeline.id,
"binance_trader_type": "futures",
Expand Down
3 changes: 3 additions & 0 deletions data/service/cron_jobs/app_health/_app_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import django
import pytz
import redis

from data.service.blueprints.bots_api import stop_pipeline, start_symbol_trading
from data.service.external_requests import get_open_positions, start_stop_symbol_trading
Expand All @@ -19,6 +20,8 @@

config = get_config()

cache = redis.from_url(os.getenv('REDIS_URL', config.redis_url))


def find_position(positions, symbol):
"""
Expand Down
4 changes: 3 additions & 1 deletion data/sources/binance/_binance.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
)
from data.sources.binance.load import load_data
from data.sources.binance.transform import resample_data, transform_data
from shared.utils.helpers import get_minimum_lookback_date, get_pipeline_max_window
from shared.utils.helpers import get_minimum_lookback_date, get_pipeline_max_window, remove_pipeline_loading
from shared.exchanges.binance import BinanceHandler
from shared.utils.config_parser import get_config
import shared.exchanges.binance.constants as const
Expand Down Expand Up @@ -157,6 +157,8 @@ def start_data_ingestion(self, header=''):
if start_pipeline:
self._start_kline_websockets(self.symbol, self._websocket_callback, header=header)

remove_pipeline_loading(cache, self.pipeline_id)

def stop_data_ingestion(self, header='', raise_exception=False, force=False):
"""
Public method which stops the data pipeline for the symbol.
Expand Down
17 changes: 13 additions & 4 deletions data/tests/setup/fixtures/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
mock_get_strategies,
mock_redis_connection_bots_api,
mock_redis_connection_bots_api_helpers,
spy_start_symbol_trading
mock_redis_connection_external_requests,
mock_start_stop_symbol_trading_success_true,
spy_start_symbol_trading,
mock_get_open_positions,
)
from shared.utils.tests.fixtures.external_modules import mock_jwt_required
from shared.utils.tests.fixtures.external_modules import mock_jwt_required, mock_requests_post
from shared.utils.tests.fixtures.models import *


Expand All @@ -32,15 +35,19 @@ def app(
mock_redis_connection,
mock_redis_connection_bots_api,
mock_redis_connection_bots_api_helpers,
mock_redis_connection_external_requests,
mock_start_stop_symbol_trading_success_true,
mock_jwt_required,
mock_settings_env_vars,
mock_get_strategies,
mock_binance_client_exchange_info,
mock_requests_post,
create_exchange,
create_assets,
create_symbol,
spy_start_symbol_trading,
monkeypatch
monkeypatch,
mock_get_open_positions
):

# monkeypatch.setenv("TEST", )
Expand All @@ -58,16 +65,18 @@ def client(app):
@pytest.fixture
def app_with_open_position(
db,
# mock_client_env_vars,
mock_create_access_token,
mock_redis_connection,
mock_redis_connection_bots_api,
mock_redis_connection_bots_api_helpers,
mock_redis_connection_external_requests,
mock_jwt_required,
mock_settings_env_vars,
mock_get_strategies,
mock_binance_client_exchange_info,
mock_get_open_positions,
mock_start_stop_symbol_trading_success_true,
mock_requests_post,
spy_start_stop_symbol_trading,
fake_executor_submit,
create_exchange,
Expand Down
7 changes: 4 additions & 3 deletions data/tests/setup/fixtures/internal_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,12 @@ def mock_get_open_positions_unsuccessful(mocker):
[
'check_inconsistencies',
'restart_failed_pipelines',
'restart_retries'
'restart_retries',
'redis_url'
]
)
fake_config_no_restart = FakeConfig('false', 'false', '2')
fake_config_no_retries = FakeConfig('false', 'true', '0')
fake_config_no_restart = FakeConfig('false', 'false', '2', '')
fake_config_no_retries = FakeConfig('false', 'true', '0', '')


@pytest.fixture
Expand Down
4 changes: 4 additions & 0 deletions shared/utils/helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from shared.utils.helpers._helpers import (
get_logging_row_header,
get_item_from_cache,
add_pipeline_loading,
remove_pipeline_loading,
is_pipeline_loading,
get_pipeline_data,
get_input_dimensions,
convert_trade,
get_pipeline_max_window,
get_minimum_lookback_date,
get_root_dir,
LOADING
)
37 changes: 37 additions & 0 deletions shared/utils/helpers/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
translator = str.maketrans('', '', escapes)


LOADING = "Loading"


PIPELINE = namedtuple(
'Pipeline',
[
Expand Down Expand Up @@ -65,6 +68,40 @@ def get_item_from_cache(cache, key):
return item if item else '""'


def add_pipeline_loading(cache, pipeline_id):

loading = set(json.loads(cache.get(LOADING))) if cache.get(LOADING) is not None else set()

loading.add(pipeline_id)

cache.set(
LOADING,
json.dumps(list(set(loading)))
)


def remove_pipeline_loading(cache, pipeline_id):

loading = set(json.loads(cache.get(LOADING))) if cache.get(LOADING) is not None else set()

try:
loading.remove(pipeline_id)
except KeyError:
pass

cache.set(
LOADING,
json.dumps(list(loading))
)


def is_pipeline_loading(cache, pipeline_id):

loading = set(json.loads(cache.get(LOADING)))

return pipeline_id in loading


def get_pipeline_data(pipeline_id, return_obj=False, ignore_exception=False):

if pipeline_id is None and ignore_exception:
Expand Down

0 comments on commit 40109c6

Please sign in to comment.