Initial revision of the LLMActor.
sobomax committed Dec 25, 2024
1 parent 533b644 commit 3df3142
Showing 5 changed files with 234 additions and 0 deletions.
49 changes: 49 additions & 0 deletions Cluster/InfernLLMActor.py
@@ -0,0 +1,49 @@
from typing import Dict
from uuid import UUID

import ray

from Cluster.InfernLLMWorker import InfernLLMWorker
from Cluster.LLMSession import LLMSession, LLMRequest

@ray.remote(num_gpus=1.0, resources={"llm": 1})
class InfernLLMActor():
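    """Ray actor that owns a single InfernLLMWorker (requires one GPU plus the
    custom "llm" resource) and multiplexes LLMSession objects on top of it."""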
debug = True
sessions: Dict[UUID, LLMSession]
    llm: InfernLLMWorker

def __init__(self):
super().__init__()
self.sessions = {}

def start(self):
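        # Try accelerators in order of preference and fall back to CPU; the
        # for/else only raises if every backend fails to initialize.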
for device in ('xpu', 'cuda', 'cpu'):
try:
self.llm = InfernLLMWorker(device)
except (ValueError, RuntimeError):
continue
break
else:
raise RuntimeError('Failed to initialize LLM')
self.llm.start()

def stop(self):
self.llm.stop()

def new_llm_session(self):
if self.debug: print('InfernLLMActor.new_llm_session')
sess = LLMSession(self.llm)
self.sessions[sess.id] = sess
return sess.id

def llm_session_end(self, sess_id):
if self.debug: print('InfernLLMActor.llm_session_end')
sess = self.sessions[sess_id]
sess.stop()
del self.sessions[sess_id]

def llm_session_textin(self, sess_id, req:LLMRequest):
if self.debug: print('InfernLLMActor.llm_session_textin')
sess = self.sessions[sess_id]
sess.textin(req)
return sess_id
63 changes: 63 additions & 0 deletions Cluster/InfernLLMWorker.py
@@ -0,0 +1,63 @@
from typing import Tuple, List
from os.path import exists as path_exists
import torch

from transformers import AutoTokenizer
from ipex_llm.transformers import AutoModelForCausalLM

from Cluster.InfernBatchedWorker import InfernBatchedWorker
from Cluster.InfernTTSWorker import get_torch_hw
from Cluster.LLMSession import LLMResult, LLMInferRequest

class InfernLLMWorker(InfernBatchedWorker):
model_name = "Qwen/Qwen2.5-Coder-14B-Instruct"
model_cache_dir = "/tmp/saved_model"
max_batch_size: int = 8
debug = True
llm_model: object
llm_tokenizer: object
output_sr: int

def __init__(self, device=None):
super().__init__()
if device is None:
device = get_torch_hw()
def load_model(mn, **kwargs):
m = AutoModelForCausalLM.from_pretrained(mn, torch_dtype="auto",
device_map="auto",
optimize_model=True,
trust_remote_code=True,
use_cache=True, **kwargs
)
if mn != self.model_cache_dir:
m.save_pretrained(self.model_cache_dir)
return m.to(device)
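        # Prefer the low-bit copy saved in model_cache_dir; if loading it fails
        # or the cache is absent, pull the original checkpoint, quantize it to
        # 4-bit and save it for next time.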
if path_exists(self.model_cache_dir):
try:
self.llm_model = load_model(self.model_cache_dir, load_low_bit=True)
except Exception:
self.llm_model = load_model(self.model_name, load_in_4bit=True)
else:
self.llm_model = load_model(self.model_name, load_in_4bit=True)
self.llm_tokenizer = AutoTokenizer.from_pretrained(self.model_name)

    def process_batch(self, wis:List[LLMInferRequest]):
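        # Render each request's chat context through the tokenizer's chat
        # template, run one batched generate() over all of them, then strip
        # the prompt tokens from each output before decoding.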
if self.debug:
print(f'InfernLLMWorker.process_batch: got {len(wis)=}')
messages = [self.llm_tokenizer.apply_chat_template(list(r.context), tokenize=False,
add_generation_prompt=True)
for r in wis]
model_inputs = self.llm_tokenizer(messages, return_tensors="pt", padding=True).to(self.llm_model.device)
generated_ids = self.llm_model.generate(**model_inputs, max_new_tokens=16 * 1024, output_scores=True,
return_dict_in_generate=True)
        # Only XPU needs an explicit sync; guard it so the CUDA/CPU fallbacks keep working.
        if self.llm_model.device.type == 'xpu':
            torch.xpu.synchronize()
generated_ids = [
output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids,
generated_ids.sequences)
]
responses = self.llm_tokenizer.batch_decode(generated_ids, skip_special_tokens=True)
for wi, r in zip(wis, responses):
res = LLMResult(r)
wi.textout_cb(result = res)


55 changes: 55 additions & 0 deletions Cluster/LLMSession.py
@@ -0,0 +1,55 @@
from typing import List, Tuple
from time import monotonic
from functools import partial
from uuid import uuid4, UUID


class LLMRequest():
text: str
textout_cb: callable
def __init__(self, text:str, textout_cb:callable):
self.text, self.textout_cb = text, textout_cb

class LLMResult():
text: str
def __init__(self, text:str):
self.text = text

class LLMInferRequest():
req: LLMRequest
    context: Tuple[dict, ...]
textout_cb: callable

def __init__(self, req:LLMRequest, context:List[dict]):
self.req, self.context = req, tuple(context)

class LLMSession():
id: UUID
context: List[dict]
debug: bool = True
def __init__(self, llm):
self.id = uuid4()
self.context = [{"role": "system", "content": "You are Qwen, created by Alibaba Cloud. " +
"You are a helpful voice auto-attendant for the company Sippy Software. " +
"Start by greeting the caller and asking how you can help. " +
"Try to keep your messages brief and concise to reduce latency."}]
self.llm = llm

def context_add(self, content:str, role:str = "user"):
if self.debug:
print(f'{monotonic():4.3f}: LLMSession.context_add: {self.context=}, {content=}')
self.context.append({"role": role, "content": content})

def textin(self, req:LLMRequest):
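        # Append the user turn to the running context, snapshot that context
        # into an LLMInferRequest and hand it to the worker; the worker calls
        # back into textout() with the result.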
if self.debug:
            print(f'{monotonic():4.3f}: LLMSession.textin: {req.text=}, {req.textout_cb=}')
self.context_add(req.text)
ireq = LLMInferRequest(req, self.context)
ireq.textout_cb = partial(self.textout, req = req)
self.llm.infer(ireq)

def textout(self, req:LLMRequest, result:LLMResult):
if self.debug:
print(f'{monotonic():4.3f}: LLMSession.textout: {result.text=}')
self.context_add(result.text, "assistant")
req.textout_cb(result = result)
33 changes: 33 additions & 0 deletions llm_test.py
@@ -0,0 +1,33 @@
import ray
from queue import Queue
from functools import partial
from time import sleep
from Cluster.InfernLLMActor import InfernLLMActor
from Cluster.LLMSession import LLMRequest

#@ray.remote(resources={"head": 1})
#class text_in(result):

ray.init(num_gpus=2, resources = {'llm':1,'head':1})
llm_actor = InfernLLMActor.remote()
llm_actor.start.remote()
#q = Queue()
#@ray.remote(resources={"head": 1})
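# Callback fired with each LLM reply: print it and immediately send the next
# user turn back into the same session, keeping the conversation going.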
def text_in(result, lms):
print(f'text_in: got {result=}')
tin = partial(text_in, lms=lms)
req = LLMRequest('Hello, can I speak to the CEO?', tin)
llm_actor.llm_session_textin.remote(lms, req)
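# Open 100 sessions up front and kick each one off with a synthetic incoming-call line.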
flms = [llm_actor.new_llm_session.remote() for _ in range(100)]
#lms = ray.get(llm_actor.new_llm_session.remote())
def sess(lms):
tin = partial(text_in, lms=lms)
req = LLMRequest('<Incoming call from "Doe Joe" +11233742223>', tin)
return llm_actor.llm_session_textin.remote(lms, req)
futs = [sess(lms) for lms in flms]
for f in futs:
ray.get(f)
#for _ in futs:
# result = q.get()
# print(f'text_in: got {result=}')
sleep(3600)
34 changes: 34 additions & 0 deletions voice_ass.py
@@ -0,0 +1,34 @@
import torch
from transformers import AutoTokenizer, AutoConfig
from ipex_llm.transformers import AutoModelForCausalLM
from datetime import datetime

model_name = "Qwen/Qwen2.5-Coder-14B-Instruct"
#config = AutoConfig.from_pretrained(model_name, trust_remote_code=True)
#local_cache = f"~/.cache/Infernos/{model_name}"
#config.save_pretrained(local_cache)

model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype="auto",
device_map="auto",
load_in_4bit=True,
optimize_model=True,
trust_remote_code=True,
use_cache=True
)
#model = model.half().to("xpu")
model = model.to("xpu")
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-Coder-14B-Instruct")
messages = [
    {"role": "system", "content": "You are Qwen, created by Alibaba Cloud. "
        "You are a helpful voice auto-attendant for the company Sippy Software. "
        "Start by greeting the caller and asking how you can help. "
        "Try to keep your messages brief and concise to reduce latency."},
    {"role": "system", "content": f'<Now is {datetime.now()}> <Incoming call from "Doe Joe" +11233742223>'},
]
text = tokenizer.apply_chat_template(messages,
tokenize=False,
add_generation_prompt=True
)
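# Repeat generation 10 times on the same prompt and print each decoded response.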
for i in range(10):
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)
generated_ids = model.generate(**model_inputs, max_new_tokens=16 * 1024, output_scores=True, return_dict_in_generate=True)
torch.xpu.synchronize()
generated_ids = [
output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids.sequences)
]
response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
print(messages, response)
