diff --git a/examples/dash/dash-live-updates/README.md b/examples/dash/dash-live-updates/README.md new file mode 100644 index 00000000..26ce6fda --- /dev/null +++ b/examples/dash/dash-live-updates/README.md @@ -0,0 +1,126 @@ +# Real-Time Bitcoin Monitoring App with Dash and WebSocket API + +Explore how to display real-time, time-series data using Dash and a PostgreSQL extension, TimescaleDB, to dynamically update Bitcoin prices and aggregate trades fetched via the Binance WebSocket API. + +![Dash App](images/dashboard.png) + +## Architecture +Our Bitcoin trade monitoring app consists of two main components: + +- **Backend (Data Fetching and Storage)**: This component connects to the [Binance WebSocket API](https://developers.binance.com/docs/binance-spot-api-docs/web-socket-streams) to receive real-time trade data for Bitcoin (BTC/USDT). It processes the incoming data and stores it in a PostgreSQL database equipped with the TimescaleDB extension for efficient time-series data management. The backend functionality is implemented in `websocket_backend.py`. + +- **Frontend (Real-time Monitoring Dash App)**: This component establishes a Dash application to visualize the Bitcoin trade data stored in the database. It utilizes Dash’s `dcc.Interval` component to periodically fetch the latest data and update the graph. The frontend is implemented in `app.py`. + +Note: `rest_api_dash.py` is an example code that uses the Binance REST API to fetch data and display it immediately without the need for a database. However, due to its slower update rate, this approach was not adopted. + +## Steps for Testing Locally + +### Development Environment Setup +```sh +conda create --name YOUR_ENV_NAME python=3.11 +conda activate YOUR_ENV_NAME +pip install -r requirements.txt +``` + +### Backend for Data Fetching and Storage +#### 1. Database Setup +`websocket_backend.py` uses [Timescale Database](https://www.timescale.com/) (a PostgreSQL extension). As of August 2024, TimescaleDB offers a 30-day free trial, so you can follow this tutorial using it without the code modification, or adapt the steps and update `websocket_backend.py` for a DB of your choice. The steps and coding approach will remain similar regardless of the database used. + +To start with Timescale DB, sign up [here](https://console.cloud.timescale.com/signup), and click `New service` in your dashboard. + +![Database Setup 0](images/timescale0.png) + +Configure your service: +![Database Setup 1](images/timescale1.png) + +Now save your connection information. This is crucial as you will need this information to connect both your backend and frontend scripts to the database. + +![Database Setup 2](images/timescale2.png) + +You can either manually set your connection details as environment variables using `export DB_USER=`, or utilize a `.env` file in your working directory and load it via `load_dotenv(".env")` at the beginning of each file. Your `.env` file should be formatted as follows: + +``` +DB_USER= +DB_PASSWORD= +DB_NAME= +DB_HOST= +DB_PORT= +``` + +After setup, your [dashboard](https://console.cloud.timescale.com/dashboard/services) will display your new service. If you haven't saved your connection details, click on your service and scroll to the bottom to find them, and save them via the mentioned method above. +![Database Setup 3](images/timescale3.png) + +#### 2. Update `websocket_backend.py` +After ensuring all environment variables are set, update `websocket_backend.py` accordingly if you are using different database than TimescaleDB. + +#### 3. Run `websocket_backend.py` +Run the backend script and see if it's successfully printing out the real-time data. +```sh +python websocket_backend.py +``` + +### Frontend for Real-time Monitoring Dash App + +Run the frontend script to start the Dash application: +```sh +python app.py +``` + +### Deployment on Ploomber Cloud + +Ploomber Cloud supports two deployment methods: +- Graphical User Interface (GUI) +- Command Line Interface (CLI) + +To deploy your Dash app on Ploomber Cloud, you need: + +- `app.py` +- `requirements.txt` +- `.env` (only for CLI method) + +`requirements.txt` in this directory includes all libraries you need for both backend and Dash app. To deploy your Dash app, delete libraries from requirements.txt that are not needed. Your `requirements.txt` should be: +``` +dash +dash-bootstrap-components +psycopg2-binary +``` + +#### Graphical User Interface (GUI) + +Log into your [Ploomber Cloud account](https://www.platform.ploomber.io/applications). + +Click the NEW button: + +![GUI Deployment](images/gui_deploy1.png) + +Select the Dash option, and upload your code (`app.py` and `requirements.txt`) as a zip file in the source code section: + +![GUI Deployment](images/gui_deploy2.png) + +In the secret section, add your database's connection details as environment variables required for our app. + +![GUI Deployment](images/gui_deploy3.png) + +After optionally customizing some settings, click `CREATE`. + +#### Command Line Interface (CLI) +Make sure you have a `.env` file for your DB connection information. + +If you haven't installed `ploomber-cloud`, run: +```sh +pip install ploomber-cloud +``` + +Set your API key following [this documentation](https://docs.cloud.ploomber.io/en/latest/quickstart/apikey.html). +```sh +ploomber-cloud key YOURKEY +``` + +Run the following commands: +```sh +cd +ploomber-cloud init +ploomber-cloud deploy +``` + +For more details about the CLI method, see this [documentation](https://docs.cloud.ploomber.io/en/latest/user-guide/cli.html). \ No newline at end of file diff --git a/examples/dash/dash-live-updates/app.py b/examples/dash/dash-live-updates/app.py new file mode 100644 index 00000000..8311d2df --- /dev/null +++ b/examples/dash/dash-live-updates/app.py @@ -0,0 +1,98 @@ +from dash import html, dcc, Output, Input, Dash +import dash_bootstrap_components as dbc +import os, psycopg2 + +update_frequency = 4000 +graph_title1 = "Price Change: 5-Minute Rolling Window (BTC/USDT)" +graph_title2 = "Aggregated Trades per Minute: 5-Minute Rolling Window" + +default_fig = lambda title: dict( + data=[{'x': [], 'y': [], 'name': title}], + layout=dict( + title=dict(text=title, font=dict(color='white')), + xaxis=dict(autorange=True, tickformat="%H:%M:%S", color='white', nticks=8), + yaxis=dict(autorange=True, color="white"), + paper_bgcolor="#2D2D2D", + plot_bgcolor="#2D2D2D" + )) + +# Initialize the Dash app +app = Dash(external_stylesheets=[dbc.themes.CYBORG]) +server = app.server + +app.layout = html.Div([ + html.H1("Live Bitcoin Monitoring App", + style={"text-align":"center", "padding-top":"20px", "padding-bottom":"20px"}), + html.Hr(), + html.H2(id="price-ticker", + style={"text-align":"center", "padding-top":"10px", "padding-bottom":"10px"}), + dcc.Graph(id="graph-price-change", figure=default_fig(graph_title1)), + dcc.Graph(id="graph-agg-per-min", figure=default_fig(graph_title2)), + dcc.Interval(id="update", interval=update_frequency), + ]) + +previous_time = None +# Function to fetch data from the database +def fetch_data(): + global previous_time + # Construct the connection string + db_user = os.getenv('DB_USER') + db_password = os.getenv('DB_PASSWORD') + db_host = os.getenv('DB_HOST') + db_port = os.getenv('DB_PORT') + db_name = os.getenv('DB_NAME') + + if not all([db_user, db_password, db_host, db_port, db_name]): + return None, "Database connection details are not fully set in environment variables" + + connection_string = f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}" + try: + with psycopg2.connect(connection_string) as conn: + cursor = conn.cursor() + cursor.execute(""" + SELECT * + FROM trades + WHERE time >= ( + SELECT MAX(time) + FROM trades + ) - INTERVAL '1 minute'; + """) + rows = cursor.fetchall() + + if not rows or (previous_time and rows[0][0] <= previous_time): + print("No data available - Are you running websocket_backend.py?") + return None, "No data available" + + previous_time = rows[0][0] + + return rows, None + + except psycopg2.Error as e: + print(f"Database connection error: {e}") + return None, "Database connection error" + +# Callback to extend the data in the graphs +@app.callback( + Output("graph-price-change", "extendData"), + Output("graph-agg-per-min", "extendData"), + Output("price-ticker", "children"), + Input("update", "n_intervals"), + ) +def update_data(intervals): + rows, msg = fetch_data() + if rows == None: + return None, None, msg + + current_price = rows[0][1] + total_trades = len(rows) + new_data_price_change = dict(x=[[rows[0][0]]], y=[[current_price]]) + new_data_agg_per_min = dict(x=[[rows[0][0]]], y=[[total_trades]]) + print(f"Current X: {rows[0][0]}") + + # (new data, trace to add data to, number of elements to keep) + return ((new_data_price_change, [0], 75), + (new_data_agg_per_min, [0], 75), + f"Current BTC price: {current_price}") + +if __name__ == "__main__": + app.run_server(debug=True) \ No newline at end of file diff --git a/examples/dash/dash-live-updates/images/dashboard.png b/examples/dash/dash-live-updates/images/dashboard.png new file mode 100644 index 00000000..71f7a308 Binary files /dev/null and b/examples/dash/dash-live-updates/images/dashboard.png differ diff --git a/examples/dash/dash-live-updates/images/gui_deploy1.png b/examples/dash/dash-live-updates/images/gui_deploy1.png new file mode 100644 index 00000000..056ee118 Binary files /dev/null and b/examples/dash/dash-live-updates/images/gui_deploy1.png differ diff --git a/examples/dash/dash-live-updates/images/gui_deploy2.png b/examples/dash/dash-live-updates/images/gui_deploy2.png new file mode 100644 index 00000000..33193735 Binary files /dev/null and b/examples/dash/dash-live-updates/images/gui_deploy2.png differ diff --git a/examples/dash/dash-live-updates/images/gui_deploy3.png b/examples/dash/dash-live-updates/images/gui_deploy3.png new file mode 100644 index 00000000..ce13f285 Binary files /dev/null and b/examples/dash/dash-live-updates/images/gui_deploy3.png differ diff --git a/examples/dash/dash-live-updates/images/timescale0.png b/examples/dash/dash-live-updates/images/timescale0.png new file mode 100644 index 00000000..cc98a8a3 Binary files /dev/null and b/examples/dash/dash-live-updates/images/timescale0.png differ diff --git a/examples/dash/dash-live-updates/images/timescale1.png b/examples/dash/dash-live-updates/images/timescale1.png new file mode 100644 index 00000000..ed2987a1 Binary files /dev/null and b/examples/dash/dash-live-updates/images/timescale1.png differ diff --git a/examples/dash/dash-live-updates/images/timescale2.png b/examples/dash/dash-live-updates/images/timescale2.png new file mode 100644 index 00000000..fd2512a9 Binary files /dev/null and b/examples/dash/dash-live-updates/images/timescale2.png differ diff --git a/examples/dash/dash-live-updates/images/timescale3.png b/examples/dash/dash-live-updates/images/timescale3.png new file mode 100644 index 00000000..7adf9572 Binary files /dev/null and b/examples/dash/dash-live-updates/images/timescale3.png differ diff --git a/examples/dash/dash-live-updates/requirements.txt b/examples/dash/dash-live-updates/requirements.txt new file mode 100644 index 00000000..13163c86 --- /dev/null +++ b/examples/dash/dash-live-updates/requirements.txt @@ -0,0 +1,6 @@ +python-dotenv +dash +dash-bootstrap-components +psycopg2-binary +asyncpg +websockets \ No newline at end of file diff --git a/examples/dash/dash-live-updates/rest_api_dash.py b/examples/dash/dash-live-updates/rest_api_dash.py new file mode 100644 index 00000000..c48964cd --- /dev/null +++ b/examples/dash/dash-live-updates/rest_api_dash.py @@ -0,0 +1,48 @@ + +from dash import html, dcc, Output, Input, Dash +import dash_bootstrap_components as dbc +from datetime import datetime +import requests + +default_fig = dict( + data=[{'x': [], 'y': [], 'name': 'BTC/USDT'}], + layout=dict( + title=dict(text='Live Bitcoin Price Tracker: 5-Minute Rolling Window (BTC/USDT)', font=dict(color='white')), + xaxis=dict(autorange=True, tickformat="%H:%M:%S", nticks=8, color='white'), + yaxis=dict(autorange=True, color="white"), + paper_bgcolor="#2D2D2D", + plot_bgcolor="#2D2D2D" + )) + +app = Dash('Live Bitcoin Price Tracker', external_stylesheets=[dbc.themes.CYBORG]) +app.layout = html.Div([ + html.H1("Live Bitcoin Price Tracker", + style={"text-align":"center", + "padding-top":"40px", + "padding-bottom":"20px"}), + html.Hr(), + html.H2(id="price-ticker", + style={"text-align":"center", + "padding-top":"20px", + "padding-bottom":"20px"}), + dcc.Graph(id="graph-price-change", figure=default_fig), + dcc.Interval(id="update", interval=4000), + ]) + +@app.callback( + Output("graph-price-change", "extendData"), + Output("price-ticker", "children"), + Input("update", "n_intervals"), + ) +def update_data(intervals): + time = datetime.now().strftime("%H:%M:%S") + response = requests.get('https://api.binance.us/api/v3/ticker/price?symbol=BTCUSDT') + if response.status_code != 200: + return default_fig, f"Failed to fetch data: {response}" + data = response.json() + price = float(data['price']) + new_data_price_change = dict(x=[[time]], y=[[price]]) + return (new_data_price_change, [0], 75), f"Current BTC/USDT Price: {price:.2f}" + +if __name__ == "__main__": + app.run_server(debug=True) \ No newline at end of file diff --git a/examples/dash/dash-live-updates/websocket_backend.py b/examples/dash/dash-live-updates/websocket_backend.py new file mode 100644 index 00000000..da758150 --- /dev/null +++ b/examples/dash/dash-live-updates/websocket_backend.py @@ -0,0 +1,61 @@ +import asyncio, asyncpg, datetime, json, os +from websockets import connect +from dotenv import load_dotenv + +# Load environment variables +load_dotenv(".env") + +async def setup_db(): + db_user = os.getenv('DB_USER') + db_password = os.getenv('DB_PASSWORD') + db_host = os.getenv('DB_HOST') + db_port = os.getenv('DB_PORT') + db_name = os.getenv('DB_NAME') + + if not all([db_user, db_password, db_host, db_port, db_name]): + raise ValueError("Database connection details are not fully set in environment variables") + + conn = await asyncpg.connect(user=db_user, + password=db_password, + database=db_name, + host=db_host, + port=db_port) + await conn.execute("DROP TABLE IF EXISTS trades") + await conn.execute(""" + CREATE TABLE trades( + time TIMESTAMP NOT NULL, + price DOUBLE PRECISION + ) + """) + await conn.execute("SELECT create_hypertable('trades', 'time')") + await conn.close() + + +async def insert_data(url, db_conn): + trades_buffer = [] + async with connect(url) as websocket: + while True: + data = await websocket.recv() + data = json.loads(data) + trades_buffer.append((datetime.datetime.fromtimestamp(data['T']/1000.0), float(data['p']))) + print(trades_buffer[-1]) + # Write in batches of 5 + if len(trades_buffer) > 5: + await db_conn.executemany("""INSERT INTO trades(time, price) VALUES ($1, $2)""", trades_buffer) + trades_buffer = [] + +async def main(): + url_binance = "wss://data-stream.binance.vision/ws/btcusdt@aggTrade" + await setup_db() + db_conn = await asyncpg.connect(user=os.getenv("DB_USER"), + password=os.getenv("DB_PASSWORD"), + database=os.getenv("DB_NAME"), + host=os.getenv("DB_HOST"), + port=os.getenv("DB_PORT")) + try: + await insert_data(url_binance, db_conn) + finally: + await db_conn.close() + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file