-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathteams_interface.py
72 lines (63 loc) · 2.12 KB
/
teams_interface.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#!/usr/bin/env python3
from botbuilder.schema import Activity
from botbuilder.schema import ActivityTypes
from botbuilder.schema import ChannelAccount
from botframework.connector.aio import ConnectorClient
from opentelemetry import trace
from config import DefaultConfig
# Connector base URL and channel identifier used by TeamsInterface below.
SERVICE_URL = "https://smba.trafficmanager.net/amer/"
CHANNEL_ID = "msteams"

# Module-wide configuration instance and the tracer that names this module's spans.
config = DefaultConfig()
tracer = trace.get_tracer(__name__)
class TeamsInterface:
    """Async helper around the connector client's ``conversations`` operations.

    Holds one ``ConnectorClient`` and exposes send / update / delete calls for
    activities, each wrapped in its own tracing span.
    """

    def __init__(
        self,
        config: DefaultConfig,
        *,
        service_url: str = SERVICE_URL,
    ) -> None:
        """Create the connector client and the channel account used as sender.

        Args:
            config: Provides ``get_credentials()`` for the connector and
                ``APP_ID`` for the sender's channel account.
            service_url: Base URL handed to ``ConnectorClient``. Defaults to
                the module-level ``SERVICE_URL`` so existing callers are
                unaffected.
        """
        self._connector = ConnectorClient(
            config.get_credentials(),
            base_url=service_url,
        )
        self._conv = self._connector.conversations
        self._chanacc = ChannelAccount(id=config.APP_ID)
        # Public alias of the same channel account object.
        self.me = self._chanacc

    def str_to_activity(self, activity: Activity | str) -> Activity:
        """Return an ``Activity`` for the given value.

        Args:
            activity: Either an existing ``Activity`` (returned as-is) or the
                text for a new message activity.

        Returns:
            The original ``Activity``, or a new message activity on
            ``CHANNEL_ID`` sent from ``self.me`` whose text is ``activity``.
        """
        if isinstance(activity, Activity):
            return activity
        return Activity(
            type=ActivityTypes.message,
            channel_id=CHANNEL_ID,
            from_property=self.me,
            text=activity,
        )

    async def send_to_conversation(
        self,
        conversation_teams_id: str,
        activity: Activity,
    ) -> str:
        """Send ``activity`` to a conversation and return the resulting id.

        Args:
            conversation_teams_id: Passed to the connector as ``conversation_id``.
            activity: The activity to send.

        Returns:
            The ``id`` attribute of the connector's response.
        """
        # The span is opened inside the coroutine instead of through the
        # decorator form: on opentelemetry-api releases whose decorator is not
        # coroutine-aware, decorating an ``async def`` ends the span before the
        # awaited call runs. The ``with`` form covers the await on any version.
        with tracer.start_as_current_span("send_to_conversation"):
            result = await self._conv.send_to_conversation(
                conversation_id=conversation_teams_id,
                activity=activity,
            )
            return result.id  # type: ignore

    async def update_activity(
        self,
        conversation_teams_id: str,
        activity_id: str,
        activity: Activity,
    ) -> str:
        """Replace an existing activity and return the resulting id.

        Args:
            conversation_teams_id: Passed to the connector as ``conversation_id``.
            activity_id: Identifier of the activity to update.
            activity: The replacement activity.

        Returns:
            The ``id`` attribute of the connector's response.
        """
        # Span opened in the body so it spans the awaited call (see
        # the same pattern in the other async methods of this class).
        with tracer.start_as_current_span("update_activity"):
            res = await self._conv.update_activity(
                conversation_id=conversation_teams_id,
                activity_id=activity_id,
                activity=activity,
            )
            return res.id  # type: ignore

    async def delete_activity(
        self,
        conversation_teams_id: str,
        activity_id: str,
    ) -> None:
        """Delete an activity from a conversation.

        Args:
            conversation_teams_id: Passed to the connector as ``conversation_id``.
            activity_id: Identifier of the activity to delete.
        """
        # Span opened in the body so it spans the awaited call.
        with tracer.start_as_current_span("delete_activity"):
            await self._conv.delete_activity(
                conversation_id=conversation_teams_id,
                activity_id=activity_id,
            )