-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransport_aio.py
88 lines (73 loc) · 2.73 KB
/
transport_aio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from __future__ import annotations
# python imports:
import asyncio
import contextlib
import logging
import ssl
from typing import Iterator, Type
# email_proto imports:
from transport import AsyncTransport
# Module-level logger, named after this module via __name__.
logger = logging.getLogger ( __name__ )
@contextlib.contextmanager
def asyncio_timeout(self: object, text: str) -> Iterator[None]:
    """Translate ``asyncio.TimeoutError`` into a descriptive ``TimeoutError``.

    Use as ``with asyncio_timeout(obj, 'waiting to ...'):`` around an awaited
    operation. Any other exception raised inside the block propagates
    untouched.

    Args:
        self: The object on whose behalf the operation runs; only its class's
            module and name are used, to label the error message.
        text: Short description of the operation that timed out.

    Raises:
        TimeoutError: If the wrapped block raises ``asyncio.TimeoutError``.
            The message is ``'<module>.<class> timeout <text>'`` and the
            original exception is suppressed from the traceback chain.
    """
    try:
        yield
    except asyncio.TimeoutError:
        owner = self.__class__
        qualified_name = f'{owner.__module__}.{owner.__name__}'
        # `from None` hides the asyncio-internal traceback from callers.
        raise TimeoutError(f'{qualified_name} timeout {text}') from None
class AsyncioTransport(AsyncTransport):
    """Transport implemented on top of an asyncio stream reader/writer pair.

    Attributes:
        rx: Stream reader used by ``read()``.
        tx: Stream writer used by ``write()``, the STARTTLS upgrade and
            ``close()``.
        io_timeout: Seconds a single ``read()`` or ``write()`` may take
            before ``TimeoutError`` is raised.
    """

    rx: asyncio.StreamReader
    tx: asyncio.StreamWriter

    # Per-operation timeout in seconds. The default keeps the value that used
    # to be hard-coded in read()/write(); override it on a subclass or on an
    # instance to tune it.
    io_timeout: float = 1.0

    def __init__(self, rx: asyncio.StreamReader, tx: asyncio.StreamWriter) -> None:
        """Wrap an already-connected reader/writer pair."""
        self.rx, self.tx = rx, tx

    @classmethod
    async def connect(
        cls: Type[AsyncioTransport],
        hostname: str,
        port: int,
        tls: bool,
    ) -> AsyncioTransport:
        """Open a connection to ``hostname:port`` and wrap it in a transport.

        Args:
            hostname: Host to connect to.
            port: TCP port to connect to.
            tls: Passed as ``ssl=`` to ``asyncio.open_connection``; ``True``
                requests TLS from the start, ``False`` a plain connection.

        Returns:
            A new instance of ``cls`` bound to the opened streams.
        """
        rx, tx = await asyncio.open_connection(hostname, port, ssl=tls)
        return cls(rx, tx)

    async def read(self) -> bytes:
        """Read one line from the peer.

        Returns:
            Whatever ``StreamReader.readline()`` yields.

        Raises:
            TimeoutError: If no line arrives within ``io_timeout`` seconds.
        """
        with asyncio_timeout(self, 'waiting to read data'):
            return await asyncio.wait_for(
                # NOTE: there doesn't seem to be a way to tell asyncio to
                # give us everything it has, hence line-at-a-time reads.
                self.rx.readline(),
                timeout=self.io_timeout,
            )

    async def write(self, data: bytes) -> None:
        """Queue ``data`` on the writer and wait for it to drain.

        Raises:
            TimeoutError: If draining takes longer than ``io_timeout``
                seconds.
        """
        self.tx.write(data)
        with asyncio_timeout(self, 'waiting to write data'):
            await asyncio.wait_for(self.tx.drain(), timeout=self.io_timeout)

    async def _upgrade_to_tls(
        self,
        context: ssl.SSLContext,
        *,
        server_side: bool,
        server_hostname: str | None,
    ) -> None:
        """Upgrade the current connection to TLS and rebind both streams."""
        # We are inside a coroutine, so a loop is guaranteed to be running;
        # get_running_loop() is the supported way to obtain it here.
        loop = asyncio.get_running_loop()
        transport = await loop.start_tls(
            self.tx.transport,
            # StreamWriter exposes no public accessor for its protocol.
            getattr(self.tx, '_protocol'),
            sslcontext=context,
            server_side=server_side,
            server_hostname=server_hostname,
        )
        # StreamReader.set_transport() asserts that no transport has been set
        # yet, which is never true for an established connection, so the
        # reader's transport is replaced directly, like the writer's.
        setattr(self.rx, '_transport', transport)
        setattr(self.tx, '_transport', transport)

    async def starttls_client(self, server_hostname: str) -> None:
        """Upgrade the connection to TLS, acting as the TLS client.

        Args:
            server_hostname: Hostname passed to the event loop's
                ``start_tls`` as ``server_hostname``.
        """
        context = self.ssl_context_or_default_client()
        await self._upgrade_to_tls(
            context,
            server_side=False,
            server_hostname=server_hostname,
        )

    async def starttls_server(self) -> None:
        """Upgrade the connection to TLS, acting as the TLS server."""
        context = self.ssl_context_or_default_server()
        await self._upgrade_to_tls(
            context,
            server_side=True,
            server_hostname=None,
        )

    async def close(self) -> None:
        """Close the writer and wait until it reports closed."""
        self.tx.close()
        await self.tx.wait_closed()