Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Dash live updates example (Live Bitcoin Tracker) #269

Merged
merged 19 commits into from
Aug 18, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions examples/dash/dash-live-updates/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Real-Time Bitcoin Tracker with Dash and WebSocket API

Explore how to display real-time, time-series data using Dash and a PostgreSQL extension, TimescaleDB, to dynamically update Bitcoin prices and aggregate trades fetched via the Binance WebSocket API.

![Dash App](dashboard.png)

## Steps for Testing Locally

### 1. Development Environment Setup
```sh
conda create --name YOUR_ENV_NAME python=3.11
conda activate YOUR_ENV_NAME
pip install -r requirements.txt
```

### 2. Save Database Connection Details as Environment Variables
```
DB_USER=<USERNAME>
neelasha23 marked this conversation as resolved.
Show resolved Hide resolved
DB_PASSWORD=<PASSWORD>
DB_NAME=<DATABASE_NAME>
DB_HOST=<HOST>
DB_PORT=<PORT>
```

### 3. Backend Setup
neelasha23 marked this conversation as resolved.
Show resolved Hide resolved
`websocket_backend.py`: connects to the Binance WebSocket API to receive real-time trade data for Bitcoin (BTC/USDT), and stores it in a PostgreSQL database equipped with the TimescaleDB extension.
neelasha23 marked this conversation as resolved.
Show resolved Hide resolved

For local testing, you need to modify `websocket_backend.py` according to your database type. By default, it uses TimescaleDB (a PostgreSQL extension).

If you prefer to use SQLite, you can modify the script to use `asyncio`, `aiosqlite`, and `sqlite3` libraries, along with SQL statements. If needed, an alternative script with SQLite can be added.
neelasha23 marked this conversation as resolved.
Show resolved Hide resolved

Run the backend script:
```sh
python websocket_backend.py
```

### 4. Frontend Setup
neelasha23 marked this conversation as resolved.
Show resolved Hide resolved
`app.py`: establishes a Dash application to visualize the Bitcoin trade data stored in the database.

Run the frontend script to start the Dash application:
```sh
python app.py
```

89 changes: 89 additions & 0 deletions examples/dash/dash-live-updates/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from dash import html, dcc, Output, Input, Dash
import dash_bootstrap_components as dbc
import os, psycopg2
from datetime import datetime
from dotenv import load_dotenv

# Load environment variables
load_dotenv(".env")

update_frequency = 4000
graph_title1 = "Price Change: 5-Minute Rolling Window (BTC/USDT)"
graph_title2 = "Number of Aggregate Trades: 5-Minute Rolling Window"

default_fig = lambda title: dict(
data=[{'x': [], 'y': [], 'name': title}],
layout=dict(
title=dict(text=title, font=dict(color='white')),
xaxis=dict(autorange=True, tickformat="%H:%M:%S", color='white', nticks=8),
yaxis=dict(autorange=True, color="white"),
paper_bgcolor="#2D2D2D",
plot_bgcolor="#2D2D2D"
))

# Initialize the Dash app
app = Dash(external_stylesheets=[dbc.themes.CYBORG])
server = app.server

app.layout = html.Div([
html.H1("Live Bitcoin Price Tracker",
style={"text-align":"center", "padding-top":"20px", "padding-bottom":"20px"}),
html.Hr(),
html.H2(id="price-ticker",
style={"text-align":"center", "padding-top":"10px", "padding-bottom":"10px"}),
dcc.Graph(id="graph-price-change", figure=default_fig(graph_title1)),
dcc.Graph(id="graph-agg-per-min", figure=default_fig(graph_title2)),
dcc.Interval(id="update", interval=update_frequency),
])

# Function to fetch data from the database
def fetch_data():
# Construct the connection string
db_user = os.getenv('DB_USER')
db_password = os.getenv('DB_PASSWORD')
db_host = os.getenv('DB_HOST')
db_port = os.getenv('DB_PORT')
db_name = os.getenv('DB_NAME')

if not all([db_user, db_password, db_host, db_port, db_name]):
return None, "Database connection details are not fully set in environment variables"

connection_string = f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
try:
with psycopg2.connect(connection_string) as conn:
cursor = conn.cursor()
cursor.execute("SELECT price FROM trades WHERE time > NOW() - INTERVAL '1 minute'")
rows = cursor.fetchall()

if not rows:
print("No data available - Are you running websocket_backend.py?")
return None, "No data available"
return rows, None
except psycopg2.Error as e:
print(f"Database connection error: {e}")
return None, "Database connection error"

# Callback to extend the data in the graphs
@app.callback(
Output("graph-price-change", "extendData"),
Output("graph-agg-per-min", "extendData"),
Output("price-ticker", "children"),
Input("update", "n_intervals"),
)
def update_data(intervals):
rows, msg = fetch_data()
if rows == None:
return None, None, msg

current_price = rows[0][0]
total_trades = len(rows)
new_data_price_change = dict(x=[[datetime.now().strftime("%H:%M:%S")]], y=[[current_price]])
new_data_agg_per_min = dict(x=[[datetime.now().strftime("%H:%M:%S")]], y=[[total_trades]])

# (new data, trace to add data to, number of elements to keep)
return ((new_data_price_change, [0], 75),
(new_data_agg_per_min, [0], 75),
f"Current BTC price: {current_price}")

if __name__ == "__main__":
app.run_server(debug=True)
Binary file added examples/dash/dash-live-updates/dashboard.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 6 additions & 0 deletions examples/dash/dash-live-updates/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
python-dotenv
dash
dash-bootstrap-components
psycopg2-binary
asyncpg
websockets
48 changes: 48 additions & 0 deletions examples/dash/dash-live-updates/rest_api_dash.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@

from dash import html, dcc, Output, Input, Dash
import dash_bootstrap_components as dbc
from datetime import datetime
import requests

default_fig = dict(
data=[{'x': [], 'y': [], 'name': 'BTC/USDT'}],
layout=dict(
title=dict(text='Live Bitcoin Price Tracker: 5-Minute Rolling Window (BTC/USDT)', font=dict(color='white')),
xaxis=dict(autorange=True, tickformat="%H:%M:%S", nticks=8, color='white'),
yaxis=dict(autorange=True, color="white"),
paper_bgcolor="#2D2D2D",
plot_bgcolor="#2D2D2D"
))

app = Dash('Live Bitcoin Price Tracker', external_stylesheets=[dbc.themes.CYBORG])
app.layout = html.Div([
html.H1("Live Bitcoin Price Tracker",
style={"text-align":"center",
"padding-top":"40px",
"padding-bottom":"20px"}),
html.Hr(),
html.H2(id="price-ticker",
style={"text-align":"center",
"padding-top":"20px",
"padding-bottom":"20px"}),
dcc.Graph(id="graph-price-change", figure=default_fig),
dcc.Interval(id="update", interval=4000),
])

@app.callback(
Output("graph-price-change", "extendData"),
Output("price-ticker", "children"),
Input("update", "n_intervals"),
)
def update_data(intervals):
time = datetime.now().strftime("%H:%M:%S")
response = requests.get('https://api.binance.us/api/v3/ticker/price?symbol=BTCUSDT')
if response.status_code != 200:
return default_fig, f"Failed to fetch data: {response}"
data = response.json()
price = float(data['price'])
new_data_price_change = dict(x=[[time]], y=[[price]])
return (new_data_price_change, [0], 75), f"Current BTC/USDT Price: {price:.2f}"

if __name__ == "__main__":
app.run_server(debug=True)
61 changes: 61 additions & 0 deletions examples/dash/dash-live-updates/websocket_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import asyncio, asyncpg, datetime, json, os
from websockets import connect
from dotenv import load_dotenv

# Load environment variables
load_dotenv(".env")

async def setup_db():
db_user = os.getenv('DB_USER')
db_password = os.getenv('DB_PASSWORD')
db_host = os.getenv('DB_HOST')
db_port = os.getenv('DB_PORT')
db_name = os.getenv('DB_NAME')

if not all([db_user, db_password, db_host, db_port, db_name]):
raise ValueError("Database connection details are not fully set in environment variables")

conn = await asyncpg.connect(user=db_user,
password=db_password,
database=db_name,
host=db_host,
port=db_port)
await conn.execute("DROP TABLE IF EXISTS trades")
await conn.execute("""
CREATE TABLE trades(
time TIMESTAMPTZ NOT NULL,
price DOUBLE PRECISION
)
""")
await conn.execute("SELECT create_hypertable('trades', 'time')")
await conn.close()


async def insert_data(url, db_conn):
trades_buffer = []
async with connect(url) as websocket:
while True:
data = await websocket.recv()
data = json.loads(data)
trades_buffer.append((datetime.datetime.fromtimestamp(data['T']/1000.0), float(data['p'])))
print(trades_buffer[-1])
# Write in batches of 10
if len(trades_buffer) > 10:
await db_conn.executemany("""INSERT INTO trades(time, price) VALUES ($1, $2)""", trades_buffer)
trades_buffer = []

async def main():
url_binance = "wss://data-stream.binance.vision/ws/btcusdt@aggTrade"
await setup_db()
db_conn = await asyncpg.connect(user=os.getenv("DB_USER"),
password=os.getenv("DB_PASSWORD"),
database=os.getenv("DB_NAME"),
host=os.getenv("DB_HOST"),
port=os.getenv("DB_PORT"))
try:
await insert_data(url_binance, db_conn)
finally:
await db_conn.close()

if __name__ == "__main__":
asyncio.run(main())
Loading