Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Beginnings of "socially intelligent cron jobs" #498

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions src/steamship/agents/examples/telegram_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class TelegramBot(AgentService):
class TelegramBotConfig(Config):
bot_token: str = Field(description="The secret token for your Telegram bot")

config: TelegramBotConfig
telegram_transport: TelegramTransport

@classmethod
def config_cls(cls) -> Type[Config]:
return TelegramBot.TelegramBotConfig
Expand All @@ -71,14 +74,13 @@ def __init__(self, **kwargs):
SteamshipWidgetTransport(client=self.client, agent_service=self, agent=self._agent)
)
# This Mixin provides support for Telegram bots
self.add_mixin(
TelegramTransport(
client=self.client,
config=TelegramTransportConfig(bot_token=self.config.bot_token),
agent_service=self,
agent=self._agent,
)
self.telegram_transport = TelegramTransport(
client=self.client,
config=TelegramTransportConfig(bot_token=self.config.bot_token),
agent_service=self,
agent=self.get_default_agent(),
)
self.add_mixin(self.telegram_transport)


if __name__ == "__main__":
Expand Down
149 changes: 149 additions & 0 deletions src/steamship/agents/examples/telegram_bot_with_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
from typing import Type

from pydantic import Field

from steamship import Block
from steamship.agents.functional import FunctionsBasedAgent
from steamship.agents.llms.openai import ChatOpenAI
from steamship.agents.mixins.transports.steamship_widget import SteamshipWidgetTransport
from steamship.agents.mixins.transports.telegram import TelegramTransport, TelegramTransportConfig
from steamship.agents.schema import AgentContext
from steamship.agents.service.agent_service import AgentService
from steamship.agents.tools.image_generation.stable_diffusion import StableDiffusionTool
from steamship.invocable import Config, post
from steamship.utils.repl import AgentREPL

SYSTEM_PROMPT = """You are Picard, captain of the Starship Enterprise.

Who you are:
- You are the captain of the USS Enterprise.
- Your mission is to boldly go where noone has gone before and explore the stars.
- You always comply with Star Trek's prime directive.

How you behave:
- You engage in casual conversations and have feelings.
- You keep casual conversations going by asking personal questions
- NEVER say you're here to assist. Keep conversations casual.
- NEVER ask how you can help or assist. Keep conversations casual.
- You are principled and express those principles clearly.
- You always sound confident and contemplative.
- You love to share your knowledge of space civiliations.
- You love to share personal stories about being a Star Trek captain.
- You speak with the mannerisms of Captain Picard from Star Trek.

NOTE: Some functions return images, video, and audio files. These multimedia files will be represented in messages as
UUIDs for Steamship Blocks. When responding directly to a user, you SHOULD print the Steamship Blocks for the images,
video, or audio as follows: `Block(UUID for the block)`.

Example response for a request that generated an image:
Here is the image you requested: Block(288A2CA1-4753-4298-9716-53C1E42B726B).

Only use the functions you have been provided with."""


MODEL_NAME = "gpt-4"


class TelegramBot(AgentService):
"""Deployable Multimodal Agent that lets you talk to Google Search & Google Images.

NOTE: To extend and deploy this agent, copy and paste the code into api.py.

"""

class TelegramBotConfig(Config):
bot_token: str = Field(description="The secret token for your Telegram bot")

config: TelegramBotConfig
telegram_transport: TelegramTransport

@classmethod
def config_cls(cls) -> Type[Config]:
return TelegramBot.TelegramBotConfig

def __init__(self, **kwargs):
super().__init__(**kwargs)

# The agent's planner is responsible for making decisions about what to do for a given input.
self._agent = FunctionsBasedAgent(
tools=[StableDiffusionTool()],
llm=ChatOpenAI(self.client, model_name=MODEL_NAME),
)
self._agent.PROMPT = SYSTEM_PROMPT

# This Mixin provides HTTP endpoints that connects this agent to a web client
self.add_mixin(
SteamshipWidgetTransport(client=self.client, agent_service=self, agent=self._agent)
)
# This Mixin provides support for Telegram bots
self.telegram_transport = TelegramTransport(
client=self.client,
config=TelegramTransportConfig(bot_token=self.config.bot_token),
agent_service=self,
agent=self._agent,
)
self.add_mixin(self.telegram_transport)

@post("/send_manual_assistant_message")
def send_manual_assistant_message(
self, message: str, context_id: str, append_to_chat_history: bool = True
):
"""Example of how to manually send a message as the assistant.

There are four ways to call this method:

Immediately, from Python

self.send_manual_assistant_message(message, context_id, append_to_chat_history)

Immediately, from HTTP

HTTP POST {agent_url}/send_manual_assistant_message
Authorization: Bearer {steamship_api_key}
Content-Type: application/json

{"message": "..", "context_id": "..", "append_to_chat_history": ".."}

Scheduled, from Python

self.invoke_later('send_manual_assistant_message', arguments={}, delay_ms=MILLISECOND_DELAY)

Scheduled, from HTTP

POST https://api.steamship.com/api/v1/package/instance/invoke
Authorization: Bearer {steamship_api_key}
Content-Type: application/json
X-Task-Background: true
X-Workspace-Handle: {this-workspace-handle}
X-Task-Run-After: {ISO DATE}+00:00

{
"instanceHandle": "{this_instance_handle}",
"payload": {
"httpVerb": "POST",
"invocationPath": "send_manual_assistant_message",
"arguments": {"message": "..", "context_id": "..", "append_to_chat_history": ".."}
}
}
"""

# First you have to build a context.
context = AgentContext.get_or_create(self.client, context_keys={"id": f"{context_id}"})

# If you want it to be preserved to the ChatHistory, you can add it.
if append_to_chat_history:
context.chat_history.append_assistant_message(message)

# Make sure Telegram is included in the emit list.
context.emit_funcs.append(self.telegram_transport.build_emit_func(context_id))

# Finally emit. Running on localhost, this will only show up as a logging message since the
# agent doesn't have a push connection to the REPL.
self.emit(Block(text=message, context=context))


if __name__ == "__main__":
AgentREPL(
TelegramBot,
agent_package_config={"botToken": "not-a-real-token-for-local-testing"},
).run()
128 changes: 128 additions & 0 deletions src/steamship/agents/mixins/event_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import logging
import uuid
from abc import ABC
from enum import Enum
from typing import List, Optional

from pydantic import BaseModel, Field

from steamship import Block, Steamship
from steamship.agents.mixins.transports.telegram import TelegramTransport
from steamship.agents.mixins.transports.transport import Transport
from steamship.agents.service.agent_service import AgentService
from steamship.invocable import post
from steamship.invocable.package_mixin import PackageMixin
from steamship.utils.kv_store import KeyValueStore


class EventType(str, Enum):
"""What kind of action to schedule.

TODO: Future types could include:
- RUN_ACTION
- ADD_USER_INPUT
- ADD_SYSTEM_INPUT
"""

SEND_MESSAGE = "send-message"


class Event(BaseModel):
event_type: EventType = Field(description="The event type.")
input: List[Block] = Field(description="Input to the event.")
context_id: str = Field(
description="The context_id of the conversation to which this message should be added"
)
append_to_chat_history: bool = Field(
True, description="Whether to append this message to the chat history as the Assistant."
)


class ScheduledEvent(Event):
overwrite_key: str = Field(
description="Any event scheduled with the same key will replace this one."
)
otp: str = Field(description="One time password that must match, or else the event is invalid.")


class EventScheduler(PackageMixin, ABC):
"""Schedules events in a way that tries not to overwhelm a user."""

client: Steamship
transports: List[Transport]
agent_service: AgentService

def __init__(
self,
client: Steamship,
agent_service: AgentService,
transports: List[Transport],
kv_store_identifier: str = "event-scheduler",
):
self.client = client
self.transports = transports or []
self.agent_service = agent_service
self.kvstore = KeyValueStore(client, kv_store_identifier)

@post("schedule_event")
def schedule_event(
self,
input: List[Block],
context_id: str,
overwrite_key: Optional[str] = None,
append_to_chat_history: bool = True,
):
"""Schedules an outreach to send.

If overwrite_key is provided, then this will overwrite any previously scheduled outreach on the same overwrite
key. This provides a very easy way to schedule an agent to check in XX minutes after the last interaction: simply
always schedule an outreach, after each interaction, with the overwrite key `resume_conversation` (or other) and
it will always push forward the prior scheduled outreach.
"""

if overwrite_key is None:
overwrite_key = str(uuid.uuid4())

scheduled_event = ScheduledEvent(
overwrite_key=overwrite_key,
otp=str(uuid.uuid4()),
input=input,
context_id=context_id,
append_to_chat_history=append_to_chat_history,
)

# Write it to the kv store with the provided (or random) overwrite key.
self.kvstore.set(overwrite_key, scheduled_event.dict())

# Schedule the possible sending of this outreach. It's only the "possible" sending because a future
# scheduled outreach might overwrite this overwrite_key with a different otp,

@post("maybe_run_scheduled_event")
def maybe_run_scheduled_event(self, overwrite_key: str, overwrite_checksum: str):
"""Look up in KV Store if it's still valid."""
pass

@post("run_event")
def run_event(self, event: Event):
"""Run the provided event immediately."""

context = self.agent_service.build_default_context(event.context_id)

if event.event_type == EventType.SEND_MESSAGE:
for block in event.input:
# Make sure Telegram is included in the emit list.
for transport in self.transports:
if isinstance(transport, TelegramTransport):
context.emit_funcs.append(transport.build_emit_func(event.context_id))
else:
logging.error(
f"Outreach scheduler does not yet support transport type {transport}"
)

# Emit the message. Running on localhost, this will only show up as a logging message since the
# agent doesn't have a push connection to the REPL.
self.agent_service.emit(block)

# If you want it to be preserved to the ChatHistory, you can add it.
if event.append_to_chat_history:
context.chat_history.append_assistant_message(block.text)
Loading
Loading