Skip to content

Commit

Permalink
Add A.I. Attendant app (wip).
Browse files Browse the repository at this point in the history
  • Loading branch information
sobomax committed Dec 27, 2024
1 parent 7ee13bd commit 3d19cdc
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 5 deletions.
80 changes: 80 additions & 0 deletions Apps/AIAttendant/AIAActor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from typing import Dict, Optional, List
from uuid import UUID
from functools import partial

from ray import ray
import nltk
from tensorboardX import SummaryWriter

from config.InfernGlobals import InfernGlobals as IG
from Cluster.InfernSIPActor import InfernSIPActor
from Cluster.InfernTTSActor import InfernTTSActor
from Cluster.InfernSTTActor import InfernSTTActor
from Cluster.InfernLLMActor import InfernLLMActor
from Cluster.STTSession import STTResult
from SIP.RemoteSession import RemoteSessionOffer
from Core.T2T.NumbersToWords import NumbersToWords
from Core.Exceptions.InfernSessNotFoundErr import InfernSessNotFoundErr

from .AIASession import AIASession
from ..LiveTranslator.LTActor import ntw_filter
from ..LiveTranslator.LTSession import VADSignals

class AIASessNotFoundErr(InfernSessNotFoundErr): pass

@ray.remote(resources={"ai_attendant": 1})
class AIAActor():
sessions: Dict[UUID, AIASession]
vds: Optional[VADSignals]=None
translator: callable
nstts: int = 0
def __init__(self):
self.stt_out_lang = 'en'

def start(self, aia_prof: 'AIAProfile', sip_actr:InfernSIPActor):
self.aia_prof = aia_prof
self.tts_lang = aia_prof.tts_lang
self.stt_lang = aia_prof.stt_lang
nltk.download('punkt')
self.aia_actr = ray.get_runtime_context().current_actor
self.sip_actr = sip_actr
self.tts_actr = InfernTTSActor.remote()
self.stt_actr = InfernSTTActor.remote()
self.llm_actr = InfernLLMActor.remote()
futs = [self.stt_actr.start.remote(), self.tts_actr.start.remote(lang=self.tts_lang, output_sr=8000),
self.llm_actr.start.remote()]
if self.stt_out_lang == self.tts_lang:
self.translator = ntw_filter
else:
flt = partial(ntw_filter, obj=NumbersToWords(self.tts_lang))
self.translator = IG.get_translator(self.stt_out_lang, self.tts_lang, filter=flt).translate
self.swriter = SummaryWriter()
ray.get(futs)
self.sessions = {}

def new_sip_session_received(self, new_sess:RemoteSessionOffer):
if self.vds is None:
self.vds = VADSignals()
aia_sess = AIASession(self, new_sess)
print(f'{aia_sess=}')
self.sessions[aia_sess.id] = aia_sess

def sess_term(self, sess_id:UUID, sip_sess_id:UUID, relaxed:bool=False):
try:
self._get_session(sess_id).sess_term(sip_sess_id)
except AIASessNotFoundErr:
if not relaxed: raise
return
del self.sessions[sess_id]

def text_in(self, sess_id:UUID, result:STTResult):
self.swriter.add_scalar(f'stt/inf_time', result.inf_time, self.nstts)
self.nstts += 1
self._get_session(sess_id).text_in(result)

def tts_say_done(self, sess_id:UUID):
self._get_session(sess_id).tts_say_done()

def _get_session(self, sess_id:UUID):
try: return self.sessions[sess_id]
except KeyError: raise AIASessNotFoundErr(f'No LT session with id {sess_id}')
11 changes: 11 additions & 0 deletions Apps/AIAttendant/AIAAppConfig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from .AIAProfile import AIAProfile

class AIAAppConfig():
schema: dict = {
'ai_attendant': {
'type': 'dict',
'schema': {
**AIAProfile.schema,
}
},
}
39 changes: 39 additions & 0 deletions Apps/AIAttendant/AIAProfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import ray
from typing import Optional

from Cluster.InfernSIPActor import InfernSIPActor

from .AIAActor import AIAActor


class AIAProfile():
schema: dict = {
'profiles': {
'type': 'dict',
'keysrules': {'type': 'string'},
'valuesrules': {
'type': 'dict',
'schema': {
'tts_lang': {'type': 'string'},
'stt_lang': {'type': 'string'},
}
}
}
}
stt_lang: str = 'en'
tts_lang: str = 'en'
actor: Optional[AIAActor] = None

def __init__(self, name, conf):
self.name = name
self.tts_lang = conf['tts_lang']
self.stt_lang = conf['stt_lang']

def finalize(self, iconf:'InfernConfig'):
pass

def getActor(self, iconf:'InfernConfig', sip_act:InfernSIPActor):
if self.actor is None:
self.actor = AIAActor.remote()
ray.get(self.actor.start.remote(self, sip_act))
return self.actor
116 changes: 116 additions & 0 deletions Apps/AIAttendant/AIASession.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from typing import Tuple, List, Optional, Dict
from uuid import UUID, uuid4
from functools import partial
import ray

from Cluster.TTSSession import TTSRequest
from Cluster.STTSession import STTRequest, STTResult
from Cluster.RemoteTTSSession import RemoteTTSSession
from Cluster.InfernRTPActor import InfernRTPActor
from RTP.AudioInput import AudioInput
from SIP.RemoteSession import RemoteSessionOffer, RemoteSessionAccept
from Core.T2T.Translator import Translator
from Core.AudioChunk import AudioChunk
from ..LiveTranslator.LTSession import _sess_term, TTSProxy

class STTProxy():
debug = True
stt_do: callable
stt_done: callable
def __init__(self, stt_actr, stt_lang, stt_sess_id, stt_done):
self.stt_do = partial(stt_actr.stt_session_soundin.remote, sess_id=stt_sess_id)
self.lang, self.stt_done = stt_lang, stt_done

# This method runs in the context of the inbound RTP Actor
def __call__(self, chunk:AudioChunk):
if self.debug:
print(f'STTProxy: VAD: {len(chunk.audio)=} {chunk.track_id=}')
def stt_done(result:STTResult):
print(f'STTProxy: {result=}')
self.stt_done(result=result)
sreq = STTRequest(chunk, stt_done, self.lang)
sreq.mode = 'translate'
self.stt_do(req=sreq)

class AIASession():
debug = False
id: UUID
stt_sess_id: UUID
rtp_sess_id: UUID
llm_sess_id: UUID
rtp_actr: InfernRTPActor
tts_sess: RemoteTTSSession
say_buffer: List[TTSRequest]
translator: Optional[Translator]
stt_sess_term: callable

def __init__(self, aiaa:'AIAActor', new_sess:RemoteSessionOffer):
self.id = uuid4()
self.say_buffer = []
sess_term_alice = partial(_sess_term, sterm=aiaa.aia_actr.sess_term.remote, sess_id=self.id, sip_sess_id=new_sess.sip_sess_id)
self.tts_say_done_cb = partial(aiaa.aia_actr.tts_say_done.remote, sess_id=self.id)
amsg = RemoteSessionAccept(disc_cb=sess_term_alice, auto_answer=True)
try:
rtp_alice = ray.get(new_sess.accept(msg=amsg))
except KeyError:
print(f'Failed to accept {new_sess.sip_sess_id=}')
return
self.rtp_actr, self.rtp_sess_id = rtp_alice
stt_sess = aiaa.stt_actr.new_stt_session.remote(keep_context=True)
llm_sess = aiaa.llm_actr.new_llm_session.remote()
self.tts_sess = RemoteTTSSession(aiaa.tts_actr)
self.stt_sess_id, self.llm_sess_id = ray.get([stt_sess, llm_sess])
self.stt_sess_term = partial(aiaa.stt_actr.stt_session_end.remote, self.stt_sess_id)
self.translator = aiaa.translator
text_cb = partial(aiaa.aia_actr.text_in.remote, sess_id=self.id)
vad_handler = STTProxy(aiaa.stt_actr, aiaa.stt_lang, self.stt_sess_id, text_cb)
self.rtp_actr.rtp_session_connect.remote(self.rtp_sess_id, AudioInput(self.rtp_sess_id, vad_handler))
soundout = partial(self.rtp_actr.rtp_session_soundout.remote, self.rtp_sess_id)
tts_soundout = TTSProxy(soundout)
self.tts_sess.start(tts_soundout)
self.tts_say("Hello, how can I help you?")

def text_in(self, result:STTResult):
print(f'STT: "{result.text=}" {result.no_speech_prob=}')
nsp = result.no_speech_prob
if nsp > 0.5: return
# sinfo = self.fabric.info[result.direction]
# text = sinfo.translator(result.text)
# speaker_id = sinfo.get_speaker()
# #sinfo.rsess_pause()
# print(f'TTS: {sdir} "{text=}" {speaker_id=}')
# text = sent_tokenize(text)
# out_sents = [text.pop(0),]
# for t in text:
# if len(out_sents[-1]) + len(t) < 128 or out_sents[-1].endswith(' i.e.'):
# out_sents[-1] += ' ' + t
# else:
# out_sents.append(t)
#
# print(f'TTS split: "{out_sents=}" {[len(t) for t in out_sents]=}')
# tts_req = ray.put(TTSRequest(out_sents, speaker_id=speaker_id, done_cb=sinfo.tts_say_done))
# self.say_buffer[result.direction].append(tts_req)
# if len(self.say_buffer[result.direction]) > 1:
# return
# sinfo.tts_say(tts_req)
return

def tts_say(self, text):
print(f'tts_say({text=})')
tts_req = TTSRequest([text,], done_cb=self.tts_say_done_cb)
self.say_buffer.append(tts_req)
if len(self.say_buffer) > 1:
return
self.tts_sess.say(tts_req)

def tts_say_done(self):
if self.debug: print(f'tts_say_done()')
tbuf = self.say_buffer
tbuf.pop(0)
if len(tbuf) > 0:
self.tts_sess.say(tbuf[0])
return

def sess_term(self, _):
self.stt_sess_term()
self.tts_sess.end()
17 changes: 13 additions & 4 deletions Core/InfernConfig.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Dict, Optional
from typing import Dict, Optional, Union
from functools import partial

from Cluster.InfernSIPActor import InfernSIPActor
from SIP.InfernSIPConf import InfernSIPConf
Expand Down Expand Up @@ -35,11 +36,14 @@ class InfernConfig():
sip_conf: Optional[InfernSIPConf]
rtp_conf: Optional[InfernRTPConf]
connectors: Dict[str, InfernSIPProfile]
apps: Dict[str, 'LTProfile']
apps: Dict[str, Union['LTProfile', 'AIAProfile']]
def __init__(self, filename: str):
from Apps.LiveTranslator.LTProfile import LTProfile
from Apps.LiveTranslator.LTAppConfig import LTAppConfig
schema['apps']['schema'].update(LTAppConfig.schema)
from Apps.AIAttendant.AIAProfile import AIAProfile
from Apps.AIAttendant.AIAAppConfig import AIAAppConfig
schema['apps']['schema'].update(AIAAppConfig.schema)
d = validate_yaml(schema, filename)
self.sip_conf = InfernSIPConf(d['sip'].get('settings', None)) if 'sip' in d else None
self.rtp_conf = InfernRTPConf(d['rtp'].get('settings', None)) if 'rtp' in d else None
Expand All @@ -49,8 +53,13 @@ def __init__(self, filename: str):
except KeyError:
self.connectors = {}
precache = 'live_translator_precache' in d['apps'] and d['apps']['live_translator_precache']
self.apps = dict((f'apps/live_translator/{name}', LTProfile(name, conf, precache))
for name, conf in d['apps']['live_translator']['profiles'].items())
_LTProfile = partial(LTProfile, precache=precache)
self.apps = {}
for aname, AProf in (('live_translator', _LTProfile), ('ai_attendant', AIAProfile)):
if aname not in d['apps']: continue
app_confs = dict((f'apps/{aname}/{name}', AProf(name, conf))
for name, conf in d['apps'][aname]['profiles'].items())
self.apps.update(app_confs)
for app in self.apps.values():
app.finalize(self)
if 'sip' in d:
Expand Down
4 changes: 3 additions & 1 deletion Infernos.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ def usage():

default_resources = InfernSIPActor.default_resources
default_resources['live_translator'] = 1
default_resources['ai_attendant'] = 1
default_resources['tts'] = 2
default_resources['stt'] = 1
default_resources['llm'] = 1
try:
ray.init(num_gpus=1, resources = default_resources)
ray.init(num_gpus=2, resources = default_resources)
except ValueError as ex:
if str(ex).index('connecting to an existing cluster') < 0: raise ex
ray.init()
Expand Down
20 changes: 20 additions & 0 deletions examples/ai_attendant.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
sip:
settings:
bind: 192.168.24.29:5060
profiles:
foo:
sip_server: 192.168.23.109:5070
sink: apps/ai_attendant/configuration1
username: 'incoming'
password: 'user'
register: False
rtp:
settings:
min_port: 1024
max_port: 2048
apps:
ai_attendant:
profiles:
configuration1:
stt_lang: 'en'
tts_lang: 'en'

0 comments on commit 3d19cdc

Please sign in to comment.