-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add @persist decorator with FlowPersistence interface #1892
base: main
Are you sure you want to change the base?
Conversation
- Add FlowPersistence abstract base class - Implement SQLiteFlowPersistence backend - Add @persist decorator for flow state persistence - Add tests for flow persistence functionality Co-Authored-By: Joe Moura <[email protected]>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
⚙️ Control Options:
|
- Adopt main's FlowState base class while preserving persistence functionality - Keep main's platform-specific resolution markers in uv.lock - Fix VCR decorator in crew_test.py Co-Authored-By: Joe Moura <[email protected]>
- Remove stray merge conflict markers - Keep main's comprehensive platform-specific resolution markers - Preserve all required dependencies for persistence functionality Co-Authored-By: Joe Moura <[email protected]>
- Resolve NVIDIA CUDA solver dependency conflicts - Use main's comprehensive platform checks - Ensure all merge conflict markers are removed - Preserve persistence-related dependencies Co-Authored-By: Joe Moura <[email protected]>
- Resolve NVIDIA CUSPARSE dependency conflicts - Use main's comprehensive platform checks - Complete systematic check of entire uv.lock file - Ensure all merge conflict markers are removed Co-Authored-By: Joe Moura <[email protected]>
- Resolve triton package filelock dependency conflict - Use main's comprehensive platform checks - Complete final systematic check of entire uv.lock file - Ensure TOML file structure is valid Co-Authored-By: Joe Moura <[email protected]>
- Remove duplicate assertion in test_multimodal_agent_live_image_analysis - Clean up conflict markers - Preserve test functionality Co-Authored-By: Joe Moura <[email protected]>
- Remove remaining conflict marker at end of file - Preserve test functionality - Complete conflict resolution Co-Authored-By: Joe Moura <[email protected]>
…onflicts Co-Authored-By: Joe Moura <[email protected]>
Co-Authored-By: Joe Moura <[email protected]>
Disclaimer: This review was made by a crew of AI Agents. Code Review Comments on Flow Persistence ImplementationGeneral FeedbackThe PR introducing flow state persistence in crewAI via SQLite generally shows a sound structure and an understanding of the requirements for flow management. It lays the groundwork for an extensible and efficient persistence system, but there are multiple enhancements needed to boost the overall quality, maintainability, and robustness of the implementation. Key Areas for Improvement1. Type Safety in Flow StatesCurrent Concern:
Recommendation: Example Improvement: from typing import TypeVar, Union, Dict, Any
StateType = TypeVar('StateType', bound=Union[Dict[str, Any], BaseModel])
class Flow(Generic[StateType]):
def _create_initial_state(self) -> StateType:
# Enhanced type safety here 2. Error Handling in PersistenceCurrent Concern:
Recommendation: Example Improvement: class FlowPersistenceError(Exception):
pass
class SQLiteFlowPersistence(FlowPersistence):
def save_state(self, flow_uuid: str, state_data: Union[Dict[str, Any], BaseModel]):
try:
with sqlite3.connect(self.db_path) as conn:
# Transaction handling
except sqlite3.Error as e:
raise FlowPersistenceError(f"Database error: {e}") 3. Handling of DecoratorsCurrent Concern:
Recommendation: Example Improvement: def persist(persistence: FlowPersistence):
def decorator(method: Callable[..., T]) -> Callable[..., T]:
original_attrs = {attr: getattr(method, attr) for attr in expected_attributes}
# Decorator logic preserving attributes 4. State Validation and Schema VersioningCurrent Concern:
Recommendation: Example Improvement: class FlowState(BaseModel):
version: str = "1.0" 5. Comprehensive TestingCurrent Concern:
Recommendation: Example Improvement: @pytest.mark.asyncio
async def test_persistence_error_handling():
# Scope and implementation of edge testing ConclusionThis implementation lays a strong foundation for managing flow state persistence within crewAI but must address the issues identified for long-term maintainability and reliability. Focusing on type safety, enhanced error handling, robust decorator handling, thorough validations, and comprehensive tests will solidify the codebase against future changes and ensure user trust in the functionality. The PR has demonstrated potential but requires additional refinements to meet the required standards for production readiness. Thank you for your hard work, and I look forward to seeing revisions based on this feedback. |
Disclaimer: This review was made by a crew of AI Agents. Code Review Comment: Flow State Persistence ImplementationOverviewThe recent changes implement a new state persistence system for crewAI flows, utilizing SQLite as the backend. This implementation notably consists of abstract interfaces, decorators, and concrete storage solutions, facilitating improved flow management. Key Analysis Insights & Recommendations1. Flow State ArchitectureIssues Found:
Recommendations: class FlowStateManager:
def __init__(self, initial_state: Optional[Union[Type[T], T]] = None):
self.initial_state = initial_state
def create_state(self) -> T:
state = self._initialize_base_state()
state = self._ensure_state_id(state)
return self._validate_state(state) 2. Persistence ImplementationIssues Found:
Recommendations: class SQLiteFlowPersistence(FlowPersistence):
@contextmanager
def get_connection(self):
conn = self._connection_pool.acquire()
try:
yield conn
conn.commit()
except Exception as e:
conn.rollback()
raise PersistenceError(f"Database operation failed: {str(e)}") from e 3. Type Safety & Error HandlingIssues Found:
Recommendations: class InvalidStateError(FlowStateError):
"""Raised when state validation fails"""
pass
def validate_state_model(state_model: Type[BaseModel]) -> None:
if not hasattr(state_model, 'id'):
raise InvalidStateError("State model must have 'id' field") 4. Test CoverageIssues Found:
Recommendations: @pytest.mark.asyncio
async def test_concurrent_state_persistence():
"""Test concurrent state updates for robustness."""
... General Recommendations
ConclusionThese changes underpin a robust foundation for flow state persistence. However, addressing the identified issues and implementing the recommendations will significantly enhance the maintainability, performance, and reliability of the system. The critical focus should be on:
These enhancements will pave the way toward a more resilient and maintainable system. |
Disclaimer: This review was made by a crew of AI Agents. Code Review Comment: Flow State Persistence ImplementationOverviewThis pull request introduces state persistence capabilities to the CrewAI flow system using SQLite as the initial backend. While the implementation adheres to solid design principles, there are several opportunities for improvement regarding type safety, error handling, and overall code organization. Key Findings and Suggestions1. Flow State Type SafetyEnhancing type safety will improve code reliability. The current implementation mixes dictionary and Pydantic model states, which can lead to runtime errors. Improvement Example: class FlowState(BaseModel):
id: str
def _create_initial_state(self) -> T:
if not issubclass(self.initial_state, FlowState):
raise TypeError("Flow state must inherit from FlowState") 2. Persistence Backend InterfaceThe abstraction of the SQLite persistence layer should be improved to enhance decoupling and maintainability. Introducing an abstract base class for persistence will provide clear contracts for future implementations. Improvement Example: @dataclass
class PersistenceEvent:
flow_uuid: str
method_name: str
state: FlowState
timestamp: datetime
class FlowPersistence(abc.ABC):
@abstractmethod
async def save_state(self, event: PersistenceEvent) -> None:
pass 3. Robust Error HandlingCurrent error handling is insufficient, as it does not account for retries on persistence failures. Implementing retry logic can improve the resilience of the persistence layer. Improvement Example: class PersistenceError(Exception):
pass
def _persist_state(flow_instance: Any, method_name: str):
...
for attempt in range(max_retries):
...
if attempt == max_retries - 1:
raise PersistenceError(f"Failed to persist state after {max_retries} attempts") from e 4. Enhanced State ValidationAdding comprehensive validation ensures that the flow state strictly adheres to expected structure, preventing bugs down the line. Improvement Example: class FlowState(BaseModel):
id: str = Field(default_factory=lambda: str(uuid4()))
... 5. Documentation and Type HintsImproving documentation and type hints throughout the codebase will enhance clarity and usability for future developers. This will also help in maintaining the code by making the API clear. Improvement Example: def persist(
persistence: P,
*,
...
) -> Callable[[Callable[..., T]], Callable[..., T]]:
"""Decorator to persist flow state after method execution.
Args:
persistence: Flow persistence backend implementation
...
""" 6. Testing ImprovementsTo ensure functionality is maintained and enhanced, additional tests should be implemented. This includes testing various edge cases associated with state persistence. Testing Example: @pytest.mark.asyncio
async def test_persistence_retry_behavior():
... Historical Context and Related InsightsIn previous pull requests focused on state management, it was noted that maintaining type safety and implementing robust error handling significantly improved code maintainability. It is essential to reference those insights as they reinforce the need for similar patterns in the current changes. Additional Recommendations
ConclusionThis pull request lays a strong foundation for flow state persistence within the CrewAI framework. However, addressing the suggested improvements concerning type safety, error handling, and documentation will create a more robust and maintainable system. Thank you for your efforts on this feature, and I look forward to seeing these enhancements in the code. |
Co-Authored-By: Joe Moura <[email protected]>
Co-Authored-By: Joe Moura <[email protected]>
src/crewai/flow/flow.py
Outdated
# For dict states, preserve existing ID or use provided one | ||
if "id" in inputs: | ||
# Create new state dict with provided ID | ||
new_state = dict(inputs) | ||
self._state.clear() | ||
self._state.update(new_state) | ||
else: | ||
# Preserve existing ID if any | ||
current_id = self._state.get("id") | ||
self._state.update(inputs) | ||
if current_id: | ||
self._state["id"] = current_id | ||
elif "id" not in self._state: | ||
self._state["id"] = str(uuid4()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I follow this, the logic is that if an id is passed the flow should load the state from the persistency layer, and override just new fields being sent, but before overriding it shoudl reload all the state fields based on the id
Co-Authored-By: Joe Moura <[email protected]>
Co-Authored-By: Joe Moura <[email protected]>
- Add class-level @persist decorator support - Set SQLiteFlowPersistence as default backend - Use db_storage_path for consistent database location - Improve async method handling and type safety - Add comprehensive docstrings and examples Co-Authored-By: Joe Moura <[email protected]>
Co-Authored-By: Joe Moura <[email protected]>
Co-Authored-By: Joe Moura <[email protected]>
Co-Authored-By: Joe Moura <[email protected]>
Co-Authored-By: Joe Moura <[email protected]>
Co-Authored-By: Joe Moura <[email protected]>
Class-Level Flow Persistence with SQLite Default
This PR enhances the flow persistence functionality by:
@persist
decorator supportdb_storage_path
for consistent database locationExample Usage
Changes
persist
decorator to support class-level decorationdb_storage_path
to return full database file pathTesting
Link to Devin run: https://app.devin.ai/sessions/22958d99277c4b4087bb1fa591ca41f6