-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathobjects.py
42 lines (27 loc) · 814 Bytes
/
objects.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from typing import Any, Optional, TypedDict
# Lookup table from an integer type id to the Python type used for it.
# NOTE(review): the keys look like PostgreSQL type OIDs (23 = int4,
# 25 = text) — confirm against the pg_type catalog and the decoder that
# consumes this table.
PG_TYPE_MAPPING: dict[int, type] = {
    23: int,
    25: bytes,
}
class Field(TypedDict):
    """Typed dictionary describing a single field of a result row.

    All seven keys are required. Six are integer descriptors and the
    last one carries the field's value.

    NOTE(review): the key names appear to follow the per-column entries
    of a PostgreSQL RowDescription message — confirm against the code
    that builds these dictionaries.
    """

    # Integer descriptors of the field and its type.
    table_id: int
    attr_num: int
    type_id: int
    type_size: int
    type_mod: int
    format_code: int

    # The field's value; may be None.
    value: Optional[Any]
class Row(TypedDict):
    """Typed dictionary for one row: an ordered list of its fields.

    Each entry of ``fields`` is a ``(str, Field)`` pair.

    NOTE(review): the string is presumably the column name — confirm
    against the code that fills this list.
    """

    # Ordered (str, Field) pairs making up the row.
    fields: list[tuple[str, Field]]
class Command(TypedDict):
    """Typed dictionary bundling a query string with its rows.

    All three keys are required.

    NOTE(review): ``has_result`` presumably tells whether the query
    produces a row set — confirm with the code that sets it.
    """

    # Query text of the command.
    query: str

    # Rows associated with the command.
    rows: list[Row]

    # Boolean flag stored alongside the rows (see the note above).
    has_result: bool
class PGClient:
    """Plain container for the state tracked for one client.

    The constructor takes no arguments and only initialises attributes:
    every flag starts as ``False``, every optional value as ``None``,
    and both containers start empty. Each instance gets its own
    ``parameters`` dict and ``packet_buffer`` bytearray.

    NOTE(review): the attribute names suggest this tracks a PostgreSQL
    wire-protocol session (parameter statuses, backend key data,
    ready-for-query state) — confirm against the code that mutates it.
    """

    def __init__(self) -> None:
        """Initialise every attribute to its empty or inactive value."""
        # String-to-string parameters, initially empty.
        self.parameters: dict[str, str] = {}

        # State flags, all cleared on construction.
        self.shutting_down: bool = False
        self.authenticating: bool = False
        self.authenticated: bool = False
        self.ready_for_query: bool = False

        # Integer identifiers that are unknown until assigned.
        self.process_id: Optional[int] = None
        self.secret_key: Optional[int] = None

        # Mutable byte buffer, initially empty.
        self.packet_buffer: bytearray = bytearray()

        # Current command, or None when there is none.
        self.command: Optional[Command] = None