Skip to content

Commit

Permalink
Merge pull request #385 from LACMTA/dev
Browse files Browse the repository at this point in the history
Add connection pooling with PgBouncer and enable non-root user
  • Loading branch information
albertkun authored Nov 16, 2023
2 parents bb09ba1 + e068d58 commit c5e3dca
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 35 deletions.
31 changes: 20 additions & 11 deletions data-loading-service/app/update_canceled_trips.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,26 @@ def run_update():
except Exception as e:
# logger.exception('FTP transfer failed: ' + str(e))
print('FTP transfer failed: ' + str(e))
from sqlalchemy.orm import Session

def load_canceled_service_into_db(path_to_json_file):
    """Load canceled-service records from a JSON export and merge them into the
    ``canceled_service`` table.

    Reads ``CanceledService`` records from *path_to_json_file*, normalizes a few
    columns, merges them with the rows already in the database (existing rows
    win on duplicate keys), and replaces the table with the combined result.

    :param path_to_json_file: path to a JSON file with a top-level
        ``CanceledService`` array.
    :raises: re-raises any exception after rolling back the session.
    """
    session = Session(bind=engine)
    try:
        with open(path_to_json_file) as json_file:
            opened_json_file = json.load(json_file)
        canceled_data_frame = pd.json_normalize(data=opened_json_file['CanceledService'])
        # Strip stray spaces from identifier columns; keep only the first
        # ';'-separated segment of LastUpdateDate and restore its spaces.
        canceled_data_frame['trp_route'] = canceled_data_frame['trp_route'].str.replace(' ','')
        canceled_data_frame['dty_number'] = canceled_data_frame['dty_number'].str.replace(' ','')
        canceled_data_frame['LastUpdateDate'] = canceled_data_frame['LastUpdateDate'].str.split(';').str[0].str.replace('_',' ')

        canceled_data_frame_from_database = pd.read_sql_query('select * from "canceled_service"', con=session.bind)
        canceled_data_frame_from_database = canceled_data_frame_from_database.drop_duplicates(subset=['dpce_date','m_gtfs_trip_id'], keep='first')
        combined_df = pd.concat([canceled_data_frame_from_database, canceled_data_frame], ignore_index=True)
        # BUG FIX: drop_duplicates returns a new DataFrame; the original call
        # discarded the result, so duplicates were written to the table.
        combined_df = combined_df.drop_duplicates(subset=['dpce_date','m_gtfs_trip_id'], keep='first')
        combined_df.to_sql('canceled_service', session.bind, index=False, if_exists="replace", schema=Config.TARGET_DB_SCHEMA)
        session.commit()
    except Exception:
        # Roll back on any failure, then propagate so the caller sees the error.
        session.rollback()
        raise
    finally:
        session.close()
6 changes: 4 additions & 2 deletions data-loading-service/app/utils/database_connector.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# Using SQLAlchemy to connect to the Database

from sqlalchemy import create_engine,MetaData
from sqlalchemy import create_engine,MetaData, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from config import Config
# from .utils.log_helper import *

engine = create_engine(Config.API_DB_URI, echo=False)
engine = create_engine(Config.API_DB_URI, echo=False,pool_size=20, max_overflow=0)

Session = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Expand All @@ -18,6 +18,8 @@
def get_db():
    """Dependency generator: yield a pooled DB session and always close it.

    A trivial ``SELECT 1`` is issued first so that a stale pooled connection
    fails fast here rather than mid-request.
    """
    session = Session()
    try:
        # Lightweight ping to validate/keep alive the pooled connection.
        session.execute(text("SELECT 1"))
        yield session
    finally:
        session.close()
43 changes: 27 additions & 16 deletions fastapi/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,8 @@ FROM lacmta/geodb-base

WORKDIR /app

#
# WORKDIR /code
COPY requirements.txt .

#
# ARG FTP_SERVER
# COPY use-secret.sh .
# RUN --mount=type=secret,id=ftp_server ./use-secret.sh
# RUN --mount=type=secret,id=ftp_server ./use_secret.sh


RUN apt-get update && apt-get install -y \
gcc \
git \
Expand All @@ -24,16 +15,36 @@ RUN apt-get update && apt-get install -y \
python3-dev \
proj-bin \
libgeos-dev \
pgbouncer \
gosu \
&& rm -rf /var/lib/apt/lists/* \
&& pip install --no-cache-dir -r requirements.txt

COPY . .
# COPY ../appdata ../appdata
# COPY .git /code/.git
# VOLUME /appdata
#

# OLD UVICORN COMMAND 11/17/2022
# CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "80"]
CMD ["gunicorn", "app.main:app", "--workers", "2", "--worker-class", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:80", "--timeout", "120", "--keep-alive", "5"]
# Create a new user and switch to that user
RUN useradd -ms /bin/bash pgbouncer && \
chown -R pgbouncer:pgbouncer /etc/pgbouncer

# At container start: derive connection settings from API_DB_URI, write the
# PgBouncer userlist/config, launch PgBouncer as the non-root `pgbouncer`
# user, then run the API through gunicorn.
#
# BUG FIX: the original chain ended with `... & gunicorn ...`. In POSIX shell
# the `&` backgrounds the ENTIRE preceding `&&` list, so gunicorn started
# immediately and raced the config generation and PgBouncer startup, and it
# was no longer guarded by the `&&` chain. Only PgBouncer is backgrounded now
# (inside a subshell), and gunicorn exec's as PID-chain leader afterwards.
# NOTE(review): the DB password is written in plain text to
# /etc/pgbouncer/pgbouncer.ini — confirm this is acceptable for the deployment.
CMD python -c "\
import os;\
from urllib.parse import urlparse;\
result = urlparse(os.environ['API_DB_URI']);\
print(f'export DB_USER={result.username}');\
print(f'export DB_PASSWORD={result.password}');\
print(f'export DB_HOST={result.hostname}');\
print(f'export DB_PORT={result.port}');\
print(f'export DB_NAME={result.path[1:]}')" > /tmp/env.sh && \
    . /tmp/env.sh && \
    echo "\"$DB_USER\" \"md5`echo -n \"$DB_PASSWORD$DB_USER\" | md5sum | cut -d ' ' -f1`\"" > /etc/pgbouncer/userlist.txt && \
    echo "[databases]" > /etc/pgbouncer/pgbouncer.ini && \
    echo "$DB_NAME = host=$DB_HOST port=$DB_PORT dbname=$DB_NAME user=$DB_USER password='$DB_PASSWORD'" >> /etc/pgbouncer/pgbouncer.ini && \
    echo "[pgbouncer]" >> /etc/pgbouncer/pgbouncer.ini && \
    echo "listen_port = 6432" >> /etc/pgbouncer/pgbouncer.ini && \
    echo "listen_addr = *" >> /etc/pgbouncer/pgbouncer.ini && \
    echo "auth_type = md5" >> /etc/pgbouncer/pgbouncer.ini && \
    echo "auth_file = /etc/pgbouncer/userlist.txt" >> /etc/pgbouncer/pgbouncer.ini && \
    (gosu pgbouncer pgbouncer /etc/pgbouncer/pgbouncer.ini &) && \
    exec gunicorn app.main:app --workers 2 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:80 --timeout 120 --keep-alive 5

EXPOSE 80
20 changes: 18 additions & 2 deletions fastapi/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import versiontag
import logging
import git

from urllib.parse import urlparse, urlunparse
import asyncpg
try:
if os.path.isfile('app/app_secrets.py'):
print('Loading secrets from secrets.py')
Expand Down Expand Up @@ -42,11 +43,26 @@ def get_version_tag_from_online_github_repo():
logging.info('Error getting version tag from github: ' + str(e))
return '0.0.error '+ str(e)

def get_pgbouncer_uri(original_uri):
    """Rewrite a database URI so connections go through the local PgBouncer.

    The host and port are replaced with ``localhost:6432`` (PgBouncer runs in
    the same container — see the Dockerfile); scheme, credentials, database
    path and query string are preserved.

    :param original_uri: e.g. ``postgresql://user:pass@db-host:5432/dbname``
    :return: the same URI pointed at ``localhost:6432``.
    """
    # Parse the original URI
    parsed = urlparse(original_uri)

    pgbouncer_host = 'localhost'  # PgBouncer is running on the same machine
    pgbouncer_port = 6432         # Default PgBouncer port

    # BUG FIX: the original always emitted "{username}:{password}@", which
    # produced a literal "None:None@" netloc for credential-less URIs.
    if parsed.username:
        credentials = parsed.username
        if parsed.password:
            credentials += f":{parsed.password}"
        netloc = f"{credentials}@{pgbouncer_host}:{pgbouncer_port}"
    else:
        netloc = f"{pgbouncer_host}:{pgbouncer_port}"

    return urlunparse(
        (parsed.scheme, netloc, parsed.path, parsed.params, parsed.query, parsed.fragment)
    )

class Config:
BASE_URL = "https://api.metro.net"
REDIS_URL = os.environ.get('REDIS_URL', 'redis://redis:6379')
TARGET_DB_SCHEMA = "metro_api"
API_DB_URI = os.environ.get('API_DB_URI')
API_DB_URI = get_pgbouncer_uri(os.environ.get('API_DB_URI'))
SECRET_KEY = os.environ.get('HASH_KEY')
ALGORITHM = os.environ.get('HASHING_ALGORITHM')
ACCESS_TOKEN_EXPIRE_MINUTES = 30
Expand Down
7 changes: 4 additions & 3 deletions fastapi/app/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,18 @@
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.pool import NullPool

from .config import Config
from .utils.log_helper import *
from sqlalchemy.pool import NullPool

def create_async_uri(uri):
    """Return *uri* rewritten to use the asyncpg driver.

    Only the leading ``postgresql://`` scheme is rewritten. The original used
    ``str.replace``, which rewrote EVERY occurrence of the substring
    "postgresql" — mangling an already-async URI into
    ``postgresql+asyncpg+asyncpg://`` and corrupting any password or path that
    happened to contain the word. Non-PostgreSQL URIs are returned unchanged.
    """
    sync_prefix = 'postgresql://'
    if uri.startswith(sync_prefix):
        return 'postgresql+asyncpg://' + uri[len(sync_prefix):]
    return uri


engine = create_engine(Config.API_DB_URI, echo=False)
async_engine = create_async_engine(create_async_uri(Config.API_DB_URI), echo=False)
engine = create_engine(Config.API_DB_URI, echo=True, poolclass=NullPool)
async_engine = create_async_engine(create_async_uri(Config.API_DB_URI), echo=True, poolclass=NullPool)
async_session = sessionmaker(async_engine, expire_on_commit=False, class_=AsyncSession)
Session = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Expand All @@ -36,7 +38,6 @@ async def get_async_db():
yield db
finally:
await async_engine.dispose()

# async def get_refreshed_db(query):
# async with engine.begin() as conn:

Expand Down
1 change: 0 additions & 1 deletion fastapi/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,6 @@ async def startup_event():
uvicorn_error_logger.addFilter(LogFilter())
logger.addFilter(LogFilter())


app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
Expand Down

0 comments on commit c5e3dca

Please sign in to comment.