Skip to content

Commit

Permalink
refactor xapiskill and add image generation
Browse files Browse the repository at this point in the history
  • Loading branch information
shihao-guo committed Jan 29, 2025
1 parent 8e6effa commit c913f90
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 254 deletions.
97 changes: 56 additions & 41 deletions my_digital_being/activities/activity_post_a_tweet.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import logging
from typing import Dict, Any, List
from typing import Dict, Any, List, Tuple

from framework.activity_decorator import activity, ActivityBase, ActivityResult
from framework.api_management import api_manager
from framework.memory import Memory
from skills.skill_chat import chat_skill
from skills.skill_generate_image import ImageGenerationSkill
from skills.skill_x_api import XAPISkill

logger = logging.getLogger(__name__)


@activity(
name="post_a_tweet",
energy_cost=0.4,
cooldown=10000, # 1 hour
required_skills=["twitter_posting"],
cooldown=3600, # 1 hour
required_skills=["twitter_posting", "image_generation"],
)
class PostTweetActivity(ActivityBase):
"""
Expand All @@ -26,11 +28,13 @@ class PostTweetActivity(ActivityBase):
def __init__(self):
super().__init__()
self.max_length = 280
# The Composio action name from your logs
self.composio_action = "TWITTER_CREATION_OF_A_POST"
# If you know your Twitter username, you can embed it in the link
# or fetch it dynamically. Otherwise, substitute accordingly:
self.twitter_username = "YourUserName"
# set this to True if you want to generate an image for the tweet
self.image_generation_enabled = True
self.default_size = (1024, 1024) # Added for image generation
self.default_format = "png" # Added for image generation

async def execute(self, shared_data) -> ActivityResult:
try:
Expand Down Expand Up @@ -61,8 +65,18 @@ async def execute(self, shared_data) -> ActivityResult:
if len(tweet_text) > self.max_length:
tweet_text = tweet_text[: self.max_length - 3] + "..."

# 4) Post the tweet via Composio
post_result = self._post_tweet_via_composio(tweet_text)
# 4) Generate an image based on the tweet text
if self.image_generation_enabled:
image_prompt, media_urls = await self._generate_image_for_tweet(tweet_text, personality_data)
else:
image_prompt, media_urls = None, []

# 5) Post the tweet via X API
x_api = XAPISkill({
"enabled": True,
"twitter_username": self.twitter_username
})
post_result = await x_api.post_tweet(tweet_text, media_urls)
if not post_result["success"]:
error_msg = post_result.get(
"error", "Unknown error posting tweet via Composio"
Expand All @@ -77,7 +91,7 @@ async def execute(self, shared_data) -> ActivityResult:
else None
)

# 5) Return success, adding link & prompt in metadata
# 6) Return success, adding link & prompt in metadata
logger.info(f"Successfully posted tweet: {tweet_text[:50]}...")
return ActivityResult(
success=True,
Expand All @@ -88,7 +102,9 @@ async def execute(self, shared_data) -> ActivityResult:
"model": chat_response["data"].get("model"),
"finish_reason": chat_response["data"].get("finish_reason"),
"tweet_link": tweet_link,
"prompt_used": prompt_text, # <--- includes the full prompt
"prompt_used": prompt_text,
"image_prompt_used": image_prompt,
"image_count": len(media_urls),
},
)

Expand Down Expand Up @@ -159,38 +175,37 @@ def _build_chat_prompt(
f"but not repeating old tweets. Avoid hashtags or repeated phrases.\n"
)

def _post_tweet_via_composio(self, tweet_text: str) -> Dict[str, Any]:
def _build_image_prompt(self, tweet_text: str, personality: Dict[str, Any]) -> str:
personality_str = "\n".join(f"{t}: {v}" for t, v in personality.items())
return f"Our digital being has these personality traits:\n" \
f"{personality_str}\n\n" \
f"And is creating a tweet with the text: {tweet_text}\n\n" \
f"Generate an image that represents the story of the tweet and reflects the personality traits. Do not include the tweet text in the image."

async def _generate_image_for_tweet(self, tweet_text: str, personality_data: Dict[str, Any]) -> Tuple[str, List[str]]:
"""
Post a tweet using the "Creation of a post" Composio action.
The response returns {'successfull': True, ...}, not 'success'.
We'll check 'successfull' or fallback if needed.
Generate an image for the tweet and upload it to Twitter.
Returns a tuple of (image_prompt, media_urls).
If generation fails, returns (None, []).
"""
try:
from framework.composio_integration import composio_manager

logger.info(
f"Posting tweet via Composio action='{self.composio_action}', text='{tweet_text[:50]}...'"
)

response = composio_manager._toolset.execute_action(
action=self.composio_action,
params={"text": tweet_text},
entity_id="MyDigitalBeing",
logger.info("Decided to generate an image for tweet")
image_skill = ImageGenerationSkill({
"enabled": True,
"max_generations_per_day": 50,
"supported_formats": ["png", "jpg"],
})

if await image_skill.can_generate():
image_prompt = self._build_image_prompt(tweet_text, personality_data)
image_result = await image_skill.generate_image(
prompt=image_prompt,
size=self.default_size,
format=self.default_format
)

# The actual success key is "successfull" (with 2 Ls)
success_val = response.get("success", response.get("successfull"))
if success_val:
data_section = response.get("data", {})
nested_data = data_section.get("data", {})
tweet_id = nested_data.get("id")
return {"success": True, "tweet_id": tweet_id}
else:
return {
"success": False,
"error": response.get("error", "Unknown or missing success key"),
}

except Exception as e:
logger.error(f"Error in Composio tweet post: {e}", exc_info=True)
return {"success": False, "error": str(e)}

if image_result.get("success") and image_result.get("image_data", {}).get("url"):
return image_prompt, [image_result["image_data"]["url"]]
else:
logger.warning("Image generation not available, proceeding with text-only tweet")

return None, []
112 changes: 9 additions & 103 deletions my_digital_being/activities/activity_post_recent_memory_tweet.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from framework.api_management import api_manager
from framework.memory import Memory
from skills.skill_chat import chat_skill
from skills.skill_x_api import XAPISkill

logger = logging.getLogger(__name__)

Expand All @@ -27,7 +28,6 @@ class PostRecentMemoriesTweetActivity(ActivityBase):
def __init__(self, num_activities_to_fetch: int = 10):
super().__init__()
self.max_length = 280
self.composio_action = "TWITTER_CREATION_OF_A_POST"
self.twitter_username = "YourUserName"

# Activity types to ignore in memory results
Expand Down Expand Up @@ -88,9 +88,8 @@ async def execute(self, shared_data) -> ActivityResult:
new_memories=new_memories,
)

# 6) Extract drawing URLs and upload to Twitter media
# 6) Extract drawing URLs from memories
drawing_urls = self._extract_drawing_urls(new_memories)
media_ids = await self._upload_drawings_to_twitter(drawing_urls)

# 7) Use chat skill to generate the tweet text
chat_response = await chat_skill.get_chat_completion(
Expand All @@ -108,8 +107,12 @@ async def execute(self, shared_data) -> ActivityResult:
if len(tweet_text) > self.max_length:
tweet_text = tweet_text[: self.max_length - 3] + "..."

# 8) Post to Twitter via Composio
post_result = self._post_tweet_via_composio(tweet_text, media_ids)
# 8) Post to Twitter via X API with any extracted images
x_api = XAPISkill({
"enabled": True,
"twitter_username": self.twitter_username
})
post_result = await x_api.post_tweet(tweet_text, drawing_urls)
if not post_result["success"]:
error_msg = post_result.get(
"error", "Unknown error posting tweet via Composio"
Expand Down Expand Up @@ -141,6 +144,7 @@ async def execute(self, shared_data) -> ActivityResult:
"prompt_used": prompt_text,
"model": chat_response["data"].get("model"),
"finish_reason": chat_response["data"].get("finish_reason"),
"image_count": len(drawing_urls),
},
)

Expand Down Expand Up @@ -264,42 +268,6 @@ def _build_chat_prompt(
)
return prompt

def _post_tweet_via_composio(self, tweet_text: str, media_ids: List[str]) -> Dict[str, Any]:
"""
Post tweet via Composio with optional media_ids.
"""
try:
from framework.composio_integration import composio_manager

logger.info(
f"Posting tweet via Composio action='{self.composio_action}', text='{tweet_text[:50]}...', media_count={len(media_ids)}"
)

response = composio_manager._toolset.execute_action(
action=self.composio_action,
params={
"text": tweet_text,
"media__media__ids": media_ids if media_ids else None
},
entity_id="MyDigitalBeing",
)

success_val = response.get("success", response.get("successfull"))
if success_val:
data_section = response.get("data", {})
nested_data = data_section.get("data", {})
tweet_id = nested_data.get("id")
return {"success": True, "tweet_id": tweet_id}
else:
return {
"success": False,
"error": response.get("error", "Unknown or missing success key"),
}

except Exception as e:
logger.error(f"Error in Composio tweet post: {e}", exc_info=True)
return {"success": False, "error": str(e)}

def _extract_drawing_urls(self, memories: List[str]) -> List[str]:
"""
Extract URLs from all DrawActivity entries in memories.
Expand Down Expand Up @@ -329,65 +297,3 @@ def _extract_drawing_urls(self, memories: List[str]) -> List[str]:
continue

return drawing_urls

async def _upload_drawings_to_twitter(self, drawing_urls: List[str]) -> List[str]:
"""
Downloads images from URLs and uploads them to Twitter via Composio.
Returns a list of Twitter media IDs.
"""
import aiohttp
import base64
from framework.composio_integration import composio_manager

media_ids = []

if not drawing_urls:
return media_ids

async with aiohttp.ClientSession() as session:
for url in drawing_urls:
try:
# Download image
async with session.get(url) as response:
if response.status != 200:
logger.warning(f"Failed to download image from {url}: {response.status}")
continue

image_data = await response.read()

# Convert to base64
base64_image = base64.b64encode(image_data).decode('utf-8')

# Extract filename from URL or use default
filename = url.split('/')[-1].split('?')[0] or 'image.png'

# Upload to Twitter via Composio
upload_response = composio_manager._toolset.execute_action(
action="TWITTER_MEDIA_UPLOAD_MEDIA",
params={
"media": {
"name": filename,
"content": base64_image
}
},
entity_id="MyDigitalBeing"
)

# Composio returns 'successfull' instead of 'successful'
if upload_response.get("successful") or upload_response.get("successfull"):
media_id = upload_response.get("media_id") or upload_response.get("data", {}).get("media_id")
if media_id:
media_ids.append(media_id)
logger.info(f"Successfully uploaded image to Twitter, media_id: {media_id}")
else:
logger.warning(f"Upload succeeded but no media_id returned. Response: {upload_response}")
else:
error = upload_response.get("error", "Unknown error")
logger.warning(f"Failed to upload image to Twitter: {error}")

except Exception as e:
logger.error(f"Error uploading image to Twitter: {e}", exc_info=True)
continue

logger.debug(f"Uploaded drawing URLs to Twitter media: {media_ids}")
return media_ids
Loading

0 comments on commit c913f90

Please sign in to comment.