-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsqlalchemy_dal_async_example.py
117 lines (98 loc) · 3.58 KB
/
sqlalchemy_dal_async_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# sqlachemy-async.py
from contextlib import asynccontextmanager
from quart import Quart
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, Integer, String, Boolean, JSON
from sqlalchemy.orm import Session
from sqlalchemy.future import select
from sqlalchemy import update
from werkzeug.security import generate_password_hash, check_password_hash
# Initialize SQLAlchemy with a test database
DATABASE_URL = "sqlite+aiosqlite:///./test.db"
engine = create_async_engine(DATABASE_URL, future=True, echo=True)
async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
Base = declarative_base()
# Data Model
class User(Base):
__tablename__ = "user"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String)
email = Column(String)
slack_id = Column(String)
password = Column(String)
config = Column(JSON)
is_active = Column(Boolean, default=True)
is_admin = Column(Boolean, default=False)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def set_password(self, password):
self.password = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password, password)
def json(self):
return {
"id": self.id,
"email": self.email,
"slack_id": self.slack_id,
"config": self.config,
"is_active": self.is_active,
"is_admin": self.is_admin,
}
# Data Access Layer
class UserDAL:
def __init__(self, db_session):
self.db_session = db_session
async def create_user(
self,
name,
email,
slack_id,
password=None,
config=None,
is_active=True,
is_admin=False,
):
new_user = User(
name=name,
email=email,
slack_id=slack_id,
password=password,
config=config,
is_active=is_active,
is_admin=is_admin,
)
self.db_session.add(new_user)
await self.db_session.flush()
return new_user.json()
async def get_all_users(self):
query_result = await self.db_session.execute(select(User).order_by(User.id))
return [user.json() for user in query_result.scalars().all()]
async def get_user(self, user_id):
query = select(User).where(User.id == user_id)
query_result = await self.db_session.execute(query)
user = query_result.one()
return user[0].json()
async def set_password(self, user_id, password):
query = select(User).where(User.id == user_id)
query_result = await self.db_session.execute(query)
user = query_result.one()[0]
user.set_password(password)
await self.db_session.flush()
async def authenticate(self, user_id, password):
query = select(User).where(User.id == user_id)
query_result = await self.db_session.execute(query)
user = query_result.one()[0]
return user.check_password(password)
async def initialize_database():
# create db tables
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
async with user_dal() as bd:
await bd.create_user("name", "email", "slack_id")
@asynccontextmanager
async def user_dal():
async with async_session() as session:
async with session.begin():
yield UserDAL(session)