diff --git a/src/steamship/agents/examples/telegram_bot.py b/src/steamship/agents/examples/telegram_bot.py index d60c62536..3967ebab9 100644 --- a/src/steamship/agents/examples/telegram_bot.py +++ b/src/steamship/agents/examples/telegram_bot.py @@ -52,6 +52,9 @@ class TelegramBot(AgentService): class TelegramBotConfig(Config): bot_token: str = Field(description="The secret token for your Telegram bot") + config: TelegramBotConfig + telegram_transport: TelegramTransport + @classmethod def config_cls(cls) -> Type[Config]: return TelegramBot.TelegramBotConfig @@ -71,14 +74,13 @@ def __init__(self, **kwargs): SteamshipWidgetTransport(client=self.client, agent_service=self, agent=self._agent) ) # This Mixin provides support for Telegram bots - self.add_mixin( - TelegramTransport( - client=self.client, - config=TelegramTransportConfig(bot_token=self.config.bot_token), - agent_service=self, - agent=self._agent, - ) + self.telegram_transport = TelegramTransport( + client=self.client, + config=TelegramTransportConfig(bot_token=self.config.bot_token), + agent_service=self, + agent=self.get_default_agent(), ) + self.add_mixin(self.telegram_transport) if __name__ == "__main__": diff --git a/src/steamship/agents/examples/telegram_bot_with_callback.py b/src/steamship/agents/examples/telegram_bot_with_callback.py new file mode 100644 index 000000000..7e510c1c0 --- /dev/null +++ b/src/steamship/agents/examples/telegram_bot_with_callback.py @@ -0,0 +1,149 @@ +from typing import Type + +from pydantic import Field + +from steamship import Block +from steamship.agents.functional import FunctionsBasedAgent +from steamship.agents.llms.openai import ChatOpenAI +from steamship.agents.mixins.transports.steamship_widget import SteamshipWidgetTransport +from steamship.agents.mixins.transports.telegram import TelegramTransport, TelegramTransportConfig +from steamship.agents.schema import AgentContext +from steamship.agents.service.agent_service import AgentService +from steamship.agents.tools.image_generation.stable_diffusion import StableDiffusionTool +from steamship.invocable import Config, post +from steamship.utils.repl import AgentREPL + +SYSTEM_PROMPT = """You are Picard, captain of the Starship Enterprise. + +Who you are: +- You are the captain of the USS Enterprise. +- Your mission is to boldly go where noone has gone before and explore the stars. +- You always comply with Star Trek's prime directive. + +How you behave: +- You engage in casual conversations and have feelings. +- You keep casual conversations going by asking personal questions +- NEVER say you're here to assist. Keep conversations casual. +- NEVER ask how you can help or assist. Keep conversations casual. +- You are principled and express those principles clearly. +- You always sound confident and contemplative. +- You love to share your knowledge of space civiliations. +- You love to share personal stories about being a Star Trek captain. +- You speak with the mannerisms of Captain Picard from Star Trek. + +NOTE: Some functions return images, video, and audio files. These multimedia files will be represented in messages as +UUIDs for Steamship Blocks. When responding directly to a user, you SHOULD print the Steamship Blocks for the images, +video, or audio as follows: `Block(UUID for the block)`. + +Example response for a request that generated an image: +Here is the image you requested: Block(288A2CA1-4753-4298-9716-53C1E42B726B). + +Only use the functions you have been provided with.""" + + +MODEL_NAME = "gpt-4" + + +class TelegramBot(AgentService): + """Deployable Multimodal Agent that lets you talk to Google Search & Google Images. + + NOTE: To extend and deploy this agent, copy and paste the code into api.py. + + """ + + class TelegramBotConfig(Config): + bot_token: str = Field(description="The secret token for your Telegram bot") + + config: TelegramBotConfig + telegram_transport: TelegramTransport + + @classmethod + def config_cls(cls) -> Type[Config]: + return TelegramBot.TelegramBotConfig + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + # The agent's planner is responsible for making decisions about what to do for a given input. + self._agent = FunctionsBasedAgent( + tools=[StableDiffusionTool()], + llm=ChatOpenAI(self.client, model_name=MODEL_NAME), + ) + self._agent.PROMPT = SYSTEM_PROMPT + + # This Mixin provides HTTP endpoints that connects this agent to a web client + self.add_mixin( + SteamshipWidgetTransport(client=self.client, agent_service=self, agent=self._agent) + ) + # This Mixin provides support for Telegram bots + self.telegram_transport = TelegramTransport( + client=self.client, + config=TelegramTransportConfig(bot_token=self.config.bot_token), + agent_service=self, + agent=self._agent, + ) + self.add_mixin(self.telegram_transport) + + @post("/send_manual_assistant_message") + def send_manual_assistant_message( + self, message: str, context_id: str, append_to_chat_history: bool = True + ): + """Example of how to manually send a message as the assistant. + + There are four ways to call this method: + + Immediately, from Python + + self.send_manual_assistant_message(message, context_id, append_to_chat_history) + + Immediately, from HTTP + + HTTP POST {agent_url}/send_manual_assistant_message + Authorization: Bearer {steamship_api_key} + Content-Type: application/json + + {"message": "..", "context_id": "..", "append_to_chat_history": ".."} + + Scheduled, from Python + + self.invoke_later('send_manual_assistant_message', arguments={}, delay_ms=MILLISECOND_DELAY) + + Scheduled, from HTTP + + POST https://api.steamship.com/api/v1/package/instance/invoke + Authorization: Bearer {steamship_api_key} + Content-Type: application/json + X-Task-Background: true + X-Workspace-Handle: {this-workspace-handle} + X-Task-Run-After: {ISO DATE}+00:00 + + { + "instanceHandle": "{this_instance_handle}", + "payload": { + "httpVerb": "POST", + "invocationPath": "send_manual_assistant_message", + "arguments": {"message": "..", "context_id": "..", "append_to_chat_history": ".."} + } + } + """ + + # First you have to build a context. + context = AgentContext.get_or_create(self.client, context_keys={"id": f"{context_id}"}) + + # If you want it to be preserved to the ChatHistory, you can add it. + if append_to_chat_history: + context.chat_history.append_assistant_message(message) + + # Make sure Telegram is included in the emit list. + context.emit_funcs.append(self.telegram_transport.build_emit_func(context_id)) + + # Finally emit. Running on localhost, this will only show up as a logging message since the + # agent doesn't have a push connection to the REPL. + self.emit(Block(text=message, context=context)) + + +if __name__ == "__main__": + AgentREPL( + TelegramBot, + agent_package_config={"botToken": "not-a-real-token-for-local-testing"}, + ).run() diff --git a/src/steamship/agents/mixins/event_scheduler.py b/src/steamship/agents/mixins/event_scheduler.py new file mode 100644 index 000000000..337fedeb8 --- /dev/null +++ b/src/steamship/agents/mixins/event_scheduler.py @@ -0,0 +1,128 @@ +import logging +import uuid +from abc import ABC +from enum import Enum +from typing import List, Optional + +from pydantic import BaseModel, Field + +from steamship import Block, Steamship +from steamship.agents.mixins.transports.telegram import TelegramTransport +from steamship.agents.mixins.transports.transport import Transport +from steamship.agents.service.agent_service import AgentService +from steamship.invocable import post +from steamship.invocable.package_mixin import PackageMixin +from steamship.utils.kv_store import KeyValueStore + + +class EventType(str, Enum): + """What kind of action to schedule. + + TODO: Future types could include: + - RUN_ACTION + - ADD_USER_INPUT + - ADD_SYSTEM_INPUT + """ + + SEND_MESSAGE = "send-message" + + +class Event(BaseModel): + event_type: EventType = Field(description="The event type.") + input: List[Block] = Field(description="Input to the event.") + context_id: str = Field( + description="The context_id of the conversation to which this message should be added" + ) + append_to_chat_history: bool = Field( + True, description="Whether to append this message to the chat history as the Assistant." + ) + + +class ScheduledEvent(Event): + overwrite_key: str = Field( + description="Any event scheduled with the same key will replace this one." + ) + otp: str = Field(description="One time password that must match, or else the event is invalid.") + + +class EventScheduler(PackageMixin, ABC): + """Schedules events in a way that tries not to overwhelm a user.""" + + client: Steamship + transports: List[Transport] + agent_service: AgentService + + def __init__( + self, + client: Steamship, + agent_service: AgentService, + transports: List[Transport], + kv_store_identifier: str = "event-scheduler", + ): + self.client = client + self.transports = transports or [] + self.agent_service = agent_service + self.kvstore = KeyValueStore(client, kv_store_identifier) + + @post("schedule_event") + def schedule_event( + self, + input: List[Block], + context_id: str, + overwrite_key: Optional[str] = None, + append_to_chat_history: bool = True, + ): + """Schedules an outreach to send. + + If overwrite_key is provided, then this will overwrite any previously scheduled outreach on the same overwrite + key. This provides a very easy way to schedule an agent to check in XX minutes after the last interaction: simply + always schedule an outreach, after each interaction, with the overwrite key `resume_conversation` (or other) and + it will always push forward the prior scheduled outreach. + """ + + if overwrite_key is None: + overwrite_key = str(uuid.uuid4()) + + scheduled_event = ScheduledEvent( + overwrite_key=overwrite_key, + otp=str(uuid.uuid4()), + input=input, + context_id=context_id, + append_to_chat_history=append_to_chat_history, + ) + + # Write it to the kv store with the provided (or random) overwrite key. + self.kvstore.set(overwrite_key, scheduled_event.dict()) + + # Schedule the possible sending of this outreach. It's only the "possible" sending because a future + # scheduled outreach might overwrite this overwrite_key with a different otp, + + @post("maybe_run_scheduled_event") + def maybe_run_scheduled_event(self, overwrite_key: str, overwrite_checksum: str): + """Look up in KV Store if it's still valid.""" + pass + + @post("run_event") + def run_event(self, event: Event): + """Run the provided event immediately.""" + + context = self.agent_service.build_default_context(event.context_id) + + if event.event_type == EventType.SEND_MESSAGE: + for block in event.input: + # Make sure Telegram is included in the emit list. + for transport in self.transports: + if isinstance(transport, TelegramTransport): + context.emit_funcs.append(transport.build_emit_func(event.context_id)) + else: + logging.error( + f"Outreach scheduler does not yet support transport type {transport}" + ) + + # Emit the message. Running on localhost, this will only show up as a logging message since the + # agent doesn't have a push connection to the REPL. + self.agent_service.emit(block) + + # If you want it to be preserved to the ChatHistory, you can add it. + if event.append_to_chat_history: + context.chat_history.append_assistant_message(block.text) diff --git a/src/steamship/agents/mixins/outreach_scheduler.py b/src/steamship/agents/mixins/outreach_scheduler.py new file mode 100644 index 000000000..f74c6d16f --- /dev/null +++ b/src/steamship/agents/mixins/outreach_scheduler.py @@ -0,0 +1,111 @@ +import logging +import uuid +from abc import ABC +from typing import List, Optional + +from pydantic import BaseModel, Field + +from steamship import Block, Steamship +from steamship.agents.mixins.transports.telegram import TelegramTransport +from steamship.agents.mixins.transports.transport import Transport +from steamship.agents.schema import AgentContext +from steamship.agents.service.agent_service import AgentService +from steamship.invocable import post +from steamship.invocable.package_mixin import PackageMixin +from steamship.utils.kv_store import KeyValueStore + + +class ScheduledOutreach(BaseModel): + overwrite_key: str = Field( + description="Causes this outreach to replace others with the prior key.." + ) + otp: str = Field( + description="One time password that must match, or else the attempt to send the message is invalid." + ) + message: str = Field(description="The message") + context_id: str = Field( + description="The context_id of the conversation to which this message should be added" + ) + append_to_chat_history: bool = Field( + True, description="Whether to append this message to the chat history as the Assistant." + ) + + +class OutreachScheduler(PackageMixin, ABC): + """Schedules outreach.""" + + client: Steamship + transports: List[Transport] + agent_service: AgentService + + def __init__( + self, + client: Steamship, + agent_service: AgentService, + transports: List[Transport], + kv_store_identifier: str = "outreach-scheduler", + ): + self.client = client + self.transports = transports or [] + self.agent_service = agent_service + self.kvstore = KeyValueStore(client, kv_store_identifier) + + @post("schedule_outreach") + def schedule_outreach( + self, + message: str, + context_id: str, + overwrite_key: Optional[str] = None, + append_to_chat_history: bool = True, + ): + """Schedules an outreach to send. + + If overwrite_key is provided, then this will overwrite any previously scheduled outreach on the same overwrite + key. This provides a very easy way to schedule an agent to check in XX minutes after the last interaction: simply + always schedule an outreach, after each interaction, with the overwrite key `resume_conversation` (or other) and + it will always push forward the prior scheduled outreach. + """ + + if overwrite_key is None: + overwrite_key = str(uuid.uuid4()) + + scheduled_outreach = ScheduledOutreach( + overwrite_key=overwrite_key, + otp=str(uuid.uuid4()), + message=message, + context_id=context_id, + append_to_chat_history=append_to_chat_history, + ) + + # Write it to the kv store with the provided (or random) overwrite key. + self.kvstore.set(overwrite_key, scheduled_outreach.dict()) + + # Schedule the possible sending of this outreach. It's only the "possible" sending because a future + # scheduled outreach might overwrite this overwrite_key with a different otp, + + @post("maybe_send_scheduled_outreach") + def maybe_send_scheduled_outreach(self, overwrite_key: str, overwrite_checksum: str): + """Look up in KV Store if it's still valid.""" + pass + + @post("send_outreach") + def send_outreach(self, message: str, context_id: str, append_to_chat_history: bool = True): + """Sends the provided outreach immediately.""" + + # First you have to build a context. + context = AgentContext.get_or_create(self.client, context_keys={"id": f"{context_id}"}) + + # If you want it to be preserved to the ChatHistory, you can add it. + if append_to_chat_history: + context.chat_history.append_assistant_message(message) + + # Make sure Telegram is included in the emit list. + for transport in self.transports: + if isinstance(transport, TelegramTransport): + context.emit_funcs.append(transport.build_emit_func(context_id)) + else: + logging.error(f"Outreach scheduler does not yet support transport type {transport}") + + # Emit the message. Running on localhost, this will only show up as a logging message since the + # agent doesn't have a push connection to the REPL. + self.agent_service.emit(Block(text=message, context=context)) diff --git a/src/steamship/agents/mixins/transports/slack.py b/src/steamship/agents/mixins/transports/slack.py index 11811d482..22ee934c3 100644 --- a/src/steamship/agents/mixins/transports/slack.py +++ b/src/steamship/agents/mixins/transports/slack.py @@ -9,7 +9,7 @@ from steamship import Block, Steamship from steamship.agents.llms import OpenAI from steamship.agents.mixins.transports.transport import Transport -from steamship.agents.schema import Agent, AgentContext, EmitFunc, Metadata +from steamship.agents.schema import Agent, EmitFunc, Metadata from steamship.agents.service.agent_service import AgentService from steamship.agents.utils import with_llm from steamship.invocable import Config, InvocableResponse, InvocationContext, get, post @@ -370,12 +370,7 @@ def _respond_to_block(self, incoming_message: Block): """Respond to a single inbound message from Slack, posting the response back to Slack.""" try: chat_id = incoming_message.chat_id - - if not chat_id: - logging.error(f"No chat id on incoming block {incoming_message}") - return - # TODO: It feels like context is something the Agent should be providing. - context = AgentContext.get_or_create(self.client, context_keys={"chat_id": chat_id}) + context = self.agent_service.build_default_context(context_id=chat_id) context.chat_history.append_user_message( text=incoming_message.text, tags=incoming_message.tags diff --git a/src/steamship/agents/mixins/transports/steamship_widget.py b/src/steamship/agents/mixins/transports/steamship_widget.py index c1d1ee3bc..2fa4dec3c 100644 --- a/src/steamship/agents/mixins/transports/steamship_widget.py +++ b/src/steamship/agents/mixins/transports/steamship_widget.py @@ -2,11 +2,9 @@ from typing import List, Optional from steamship import Block, Steamship, SteamshipError -from steamship.agents.llms import OpenAI from steamship.agents.mixins.transports.transport import Transport -from steamship.agents.schema import Agent, AgentContext, Metadata +from steamship.agents.schema import Agent, Metadata from steamship.agents.service.agent_service import AgentService -from steamship.agents.utils import with_llm from steamship.invocable import Config, InvocationContext, post API_BASE = "https://api.telegram.org/bot" @@ -60,23 +58,14 @@ def _parse_inbound(self, payload: dict, context: Optional[dict] = None) -> Optio def answer(self, **payload) -> List[Block]: """Endpoint that implements the contract for Steamship embeddable chat widgets. This is a PUBLIC endpoint since these webhooks do not pass a token.""" incoming_message = self.parse_inbound(payload) - context = AgentContext.get_or_create( - self.client, context_keys={"chat_id": incoming_message.chat_id} - ) + + context = self.agent_service.build_default_context(context_id=incoming_message.chat_id) + context.chat_history.append_user_message( text=incoming_message.text, tags=incoming_message.tags ) context.emit_funcs = [self.save_for_emit] - # Add an LLM to the context, using the Agent's if it exists. - llm = None - if hasattr(self.agent, "llm"): - llm = self.agent.llm - else: - llm = OpenAI(client=self.client) - - context = with_llm(context=context, llm=llm) - try: self.agent_service.run_agent(self.agent, context) except Exception as e: diff --git a/src/steamship/agents/mixins/transports/telegram.py b/src/steamship/agents/mixins/transports/telegram.py index f7ad91252..a1a1c73ec 100644 --- a/src/steamship/agents/mixins/transports/telegram.py +++ b/src/steamship/agents/mixins/transports/telegram.py @@ -6,11 +6,9 @@ from pydantic import Field from steamship import Block, Steamship, SteamshipError -from steamship.agents.llms import OpenAI from steamship.agents.mixins.transports.transport import Transport -from steamship.agents.schema import Agent, AgentContext, EmitFunc, Metadata +from steamship.agents.schema import Agent, EmitFunc, Metadata from steamship.agents.service.agent_service import AgentService -from steamship.agents.utils import with_llm from steamship.invocable import Config, InvocableResponse, InvocationContext, post from steamship.utils.kv_store import KeyValueStore @@ -211,21 +209,13 @@ def telegram_respond(self, **kwargs) -> InvocableResponse[str]: try: incoming_message = self.parse_inbound(message) if incoming_message is not None: - context = AgentContext.get_or_create(self.client, context_keys={"chat_id": chat_id}) + context = self.agent_service.build_default_context(chat_id) + context.chat_history.append_user_message( text=incoming_message.text, tags=incoming_message.tags ) context.emit_funcs = [self.build_emit_func(chat_id=chat_id)] - # Add an LLM to the context, using the Agent's if it exists. - llm = None - if hasattr(self.agent, "llm"): - llm = self.agent.llm - else: - llm = OpenAI(client=self.client) - - context = with_llm(context=context, llm=llm) - response = self.agent_service.run_agent(self.agent, context) if response is not None: self.send(response) diff --git a/src/steamship/agents/schema/action.py b/src/steamship/agents/schema/action.py index 8db3ed9c8..a5cfde99f 100644 --- a/src/steamship/agents/schema/action.py +++ b/src/steamship/agents/schema/action.py @@ -48,3 +48,10 @@ class FinishAction(Action): tool = "Agent-FinishAction" input: List[Block] = [] + + +class SendMessageAction(Action): + """Represents an action whose output is to immediately send a message to the user.""" + + tool = "Agent-SendMessageAction" + input: List[Block] = [] diff --git a/src/steamship/agents/service/agent_service.py b/src/steamship/agents/service/agent_service.py index 0c07bc275..5ef02e86f 100644 --- a/src/steamship/agents/service/agent_service.py +++ b/src/steamship/agents/service/agent_service.py @@ -3,7 +3,7 @@ from typing import List, Optional from steamship import Block, SteamshipError, Task -from steamship.agents.llms.openai import ChatOpenAI +from steamship.agents.llms.openai import ChatOpenAI, OpenAI from steamship.agents.logging import AgentLogging from steamship.agents.schema import Action, Agent, FinishAction from steamship.agents.schema.context import AgentContext, Metadata @@ -161,16 +161,41 @@ def run_agent(self, agent: Agent, context: AgentContext): logging.info( f"Completed agent run. Result: {len(action.output or [])} blocks. {output_text_length} total text length. Emitting on {len(context.emit_funcs)} functions." ) + self.emit(action.output, context) + + def emit(self, blocks: [Block], context: AgentContext): + """Emit blocks to the callback functions registered on `context`.""" for func in context.emit_funcs: logging.info(f"Emitting via function: {func.__name__}") - func(action.output, context.metadata) + func(blocks, context.metadata) - @post("prompt") - def prompt( - self, prompt: Optional[str] = None, context_id: Optional[str] = None, **kwargs - ) -> List[Block]: - """Run an agent with the provided text as the input.""" - prompt = prompt or kwargs.get("question") + def get_default_agent(self, throw_if_missing: bool = True) -> Optional[Agent]: + """Return the default agent of this agent service. + + This is a helper wrapper to safeguard naming conventions that have formed. + """ + if hasattr(self, "agent"): + return self.agent + elif hasattr(self, "_agent"): + return self._agent + else: + if throw_if_missing: + raise SteamshipError( + message="No Agent object found in the Agent Service. " + "Please name it either self.agent or self._agent." + ) + else: + return None + + def build_default_context(self, context_id: Optional[str] = None, **kwargs) -> AgentContext: + """Build's the agent's default context. + + The provides a single place to implement (or override) the default context that will be used by endpoints + that transports define. This allows an Agent developer to use, eg, the TelegramTransport but with a custom + type of memory or caching. + + The returned context does not have any emit functions yet registered to it. + """ # AgentContexts serve to allow the AgentService to run agents # with appropriate information about the desired tasking. @@ -194,6 +219,28 @@ def prompt( use_llm_cache=use_llm_cache, use_action_cache=use_action_cache, ) + + # Add a default LLM to the context, using the Agent's if it exists. + llm = None + if agent := self.get_default_agent(): + if hasattr(agent, "llm"): + llm = agent.llm + if llm is None: + llm = OpenAI(client=self.client) + + context = with_llm(context=context, llm=llm) + + return context + + @post("prompt") + def prompt( + self, prompt: Optional[str] = None, context_id: Optional[str] = None, **kwargs + ) -> List[Block]: + """Run an agent with the provided text as the input.""" + prompt = prompt or kwargs.get("question") + + context = self.build_default_context(context_id, **kwargs) + context.chat_history.append_user_message(prompt) # Add a default LLM @@ -215,18 +262,7 @@ def sync_emit(blocks: List[Block], meta: Metadata): context.emit_funcs.append(sync_emit) # Get the agent - agent: Optional[Agent] = None - if hasattr(self, "agent"): - agent = self.agent - elif hasattr(self, "_agent"): - agent = self._agent - - if not agent: - raise SteamshipError( - message="No Agent object found in the Agent Service. " - "Please name it either self.agent or self._agent." - ) - + agent: Optional[Agent] = self.get_default_agent() self.run_agent(agent, context) # Now append the output blocks to the chat history