Event-Driven AI Workflows
Workflows (called Flows in CrewAI) let us coordinate tasks and manage crews.
• We can link multiple crews and tasks together to build AI workflows.
• Workflows share state between different tasks.
• Workflows are event-driven: a task can trigger subsequent tasks based on specific events.
• We can implement conditional logic, loops, and branches in workflows.
To get started, we create a new flow project with the CrewAI CLI:
crewai create flow flow-example
This creates a new template project. Its <span>main.py</span> file contains the following code.
#!/usr/bin/env python
from random import randint

from pydantic import BaseModel

from crewai.flow.flow import Flow, listen, start

from .crews.poem_crew.poem_crew import PoemCrew


class PoemState(BaseModel):
    sentence_count: int = 1
    poem: str = ""


class PoemFlow(Flow[PoemState]):

    @start()
    def generate_sentence_count(self):
        print("Generating sentence count")
        self.state.sentence_count = randint(1, 5)

    @listen(generate_sentence_count)
    def generate_poem(self):
        print("Generating poem")
        result = (
            PoemCrew()
            .crew()
            .kickoff(inputs={"sentence_count": self.state.sentence_count})
        )

        print("Poem generated", result.raw)
        self.state.poem = result.raw

    @listen(generate_poem)
    def save_poem(self):
        print("Saving poem")
        with open("poem.txt", "w") as f:
            f.write(self.state.poem)


def kickoff():
    poem_flow = PoemFlow()
    poem_flow.kickoff()


def plot():
    poem_flow = PoemFlow()
    poem_flow.plot()


if __name__ == "__main__":
    kickoff()
The <span>PoemState</span> class inherits from pydantic's <span>BaseModel</span> and is used to manage the state of the workflow. It contains:
• <span>sentence_count</span>: Tracks the number of sentences to generate.
• <span>poem</span>: Saves the generated poem as a string.
class PoemState(BaseModel):
    sentence_count: int = 1
    poem: str = ""
The <span>PoemFlow</span> class inherits from <span>Flow</span> and implements the workflow logic. Each method represents a step in the workflow.
class PoemFlow(Flow[PoemState]):

    @start()
    def generate_sentence_count(self):
        print("Generating sentence count")
        self.state.sentence_count = randint(1, 5)
<span>generate_sentence_count</span> is the entry point of the workflow, marked with the <span>@start</span> decorator. It simply sets <span>self.state.sentence_count</span> to a random integer between 1 and 5.
<span>generate_poem</span> is triggered after <span>generate_sentence_count</span> completes, through the <span>@listen(generate_sentence_count)</span> decorator. Its job is to call <span>PoemCrew</span> to generate a poem with the specified number of sentences. The result (<span>result.raw</span>) is stored in the state (<span>self.state.poem</span>).
class PoemFlow(Flow[PoemState]):
    ...

    @listen(generate_sentence_count)
    def generate_poem(self):
        print("Generating poem")
        result = (
            PoemCrew()
            .crew()
            .kickoff(inputs={"sentence_count": self.state.sentence_count})
        )

        print("Poem generated", result.raw)
        self.state.poem = result.raw
I won’t go into detail about <span>PoemCrew</span>, as we discussed crews in the previous chapter. It is just a crew, and <span>generate_poem</span> is used to start it.
<span>save_poem</span> is triggered after the poem is generated, through <span>@listen(generate_poem)</span>, and writes the poem stored in the state to <span>poem.txt</span>.
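To make the chaining of the three steps more concrete, here is a minimal plain-Python sketch of how decorators like <span>@start</span> and <span>@listen</span> could drive such a chain. It is not CrewAI's implementation: <span>MiniFlow</span>, <span>PoemSketch</span>, and the two decorators below are hypothetical stand-ins written only for illustration, and the crew call is replaced by a fake result.

```python
# Hypothetical stand-ins for the decorators: they only tag functions with
# metadata that MiniFlow reads later. This is NOT how CrewAI is implemented.
def start():
    def mark(fn):
        fn.is_start = True
        return fn
    return mark


def listen(trigger):
    def mark(fn):
        fn.listens_to = trigger.__name__
        return fn
    return mark


class MiniFlow:
    """Runs @start methods, then every method listening to a finished one."""

    def kickoff(self):
        methods = [m for m in vars(type(self)).values() if callable(m)]
        queue = [m for m in methods if getattr(m, "is_start", False)]
        while queue:
            step = queue.pop(0)
            step(self)  # the "event": this step has completed
            queue += [m for m in methods
                      if getattr(m, "listens_to", None) == step.__name__]


class PoemSketch(MiniFlow):
    def __init__(self):
        # A plain dict stands in for the pydantic state model.
        self.state = {"sentence_count": 1, "poem": ""}
        self.log = []

    @start()
    def generate_sentence_count(self):
        self.state["sentence_count"] = 3
        self.log.append("generate_sentence_count")

    @listen(generate_sentence_count)
    def generate_poem(self):
        # A real flow would kick off a crew here; we fake the result.
        self.state["poem"] = "line. " * self.state["sentence_count"]
        self.log.append("generate_poem")

    @listen(generate_poem)
    def save_poem(self):
        self.log.append("save_poem")


flow = PoemSketch()
flow.kickoff()
print(flow.log)  # ['generate_sentence_count', 'generate_poem', 'save_poem']
```

The point of the sketch is that no step calls the next one directly. Each step only declares what it listens to, and the shared state carries data from one step to the next.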
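Let’s run it.
First, add these lines to flow_example/src/flow_example/crews/poem_crew/poem_crew.py so that environment variables are loaded from a <span>.env</span> file:
from dotenv import load_dotenv
load_dotenv()
Then run the flow from the project root. (If the relative import in <span>main.py</span> fails with "attempted relative import with no known parent package" when the file is run as a script, running <span>python -m flow_example.main</span> from the <span>src</span> directory is one way around it.)
python src/flow_example/main.py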
Generating sentence count
Generating poem
## Agent: CrewAI Poem Writer
### Task: Write a poem about how great CrewAI is. Ensure the poem is engaging and follows the specified sentence count of 1.
## Agent: CrewAI Poem Writer
### Final Answer:
In the whirlwind of bytes and brilliance, CrewAI dances through data, weaving solutions with digital grace, leaving us all in awe.
Poem generated In the whirlwind of bytes and brilliance, CrewAI dances through data, weaving solutions with digital grace, leaving us all in awe.
Saving poem
This creates a <span>poem.txt</span> file and writes the poem into it:
In the whirlwind of bytes and brilliance, CrewAI dances through data, weaving solutions with digital grace, leaving us all in awe.
Running <span>main.py</span> as shown above calls the <span>kickoff</span> function. We can also call the <span>plot</span> function, which generates an HTML file that displays the workflow defined by the code.
def kickoff():
    poem_flow = PoemFlow()
    poem_flow.kickoff()


def plot():
    poem_flow = PoemFlow()
    poem_flow.plot()


if __name__ == "__main__":
    # kickoff()
    plot()
When we run this code (calling only the <span>plot</span> function), we get a diagram of the flow:
[Image: flow diagram generated by <span>plot</span>]
Essentially, we are controlling the flow and process of a single crew, <span>PoemCrew</span>. We created a class (<span>PoemState</span>) to manage state data, established an entry point for the workflow, let the crew execute its tasks, and finally set up an end stage.
We can do even more: <span>or_</span>, <span>and_</span>, and <span>@router</span> give us finer control over the flow.
The <span>or_</span> function lets a listener method be triggered when any one of the specified methods produces output.
from crewai.flow.flow import Flow, listen, or_, start


class ExampleFlow(Flow):

    @start()
    def method_a(self):
        return "Output A"

    @start()
    def method_b(self):
        return "Output B"

    @listen(or_(method_a, method_b))
    def listener_method(self, output):
        print(f"Triggered: {output}")
If either <span>method_a</span> or <span>method_b</span> completes, <span>listener_method</span> is triggered.
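The "any of these" rule can be illustrated without CrewAI at all. The sketch below is a plain-Python illustration, not the library's code: <span>make_or_listener</span> and <span>notify</span> are hypothetical names. In this sketch the listener runs for every matching completion; that is a choice of the sketch, not a statement about how CrewAI treats a second trigger.

```python
def make_or_listener(trigger_names, callback):
    """Return a notify() function that runs callback for any watched name.

    Hypothetical helper for illustration only; not part of CrewAI.
    """
    watched = set(trigger_names)

    def notify(method_name, output):
        if method_name in watched:
            callback(output)
            return True
        return False

    return notify


seen = []
notify = make_or_listener(["method_a", "method_b"], seen.append)

notify("method_a", "Output A")  # watched: listener runs
notify("method_c", "Output C")  # not watched: ignored
notify("method_b", "Output B")  # watched: listener runs again

print(seen)  # ['Output A', 'Output B']
```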
The <span>and_</span> function makes a listener method trigger only after all of the specified methods have produced output.
from crewai.flow.flow import Flow, listen, and_, start


class ExampleFlow(Flow):

    @start()
    def method_x(self):
        return "Output X"

    @start()
    def method_y(self):
        return "Output Y"

    @listen(and_(method_x, method_y))
    def listener_method(self, outputs):
        print(f"Triggered after: {outputs}")
<span>listener_method</span> is triggered only after both <span>method_x</span> and <span>method_y</span> have produced output.
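The "wait for all of them" rule amounts to tracking which methods are still pending. Here is a minimal plain-Python sketch of that idea. <span>AndGate</span> is a hypothetical class written for this illustration and is not how CrewAI implements <span>and_</span>.

```python
class AndGate:
    """Fires a callback once every expected method name has reported in.

    Hypothetical helper for illustration only; not part of CrewAI.
    """

    def __init__(self, expected, callback):
        self.pending = set(expected)
        self.callback = callback
        self.fired = False

    def notify(self, method_name):
        self.pending.discard(method_name)
        if not self.pending and not self.fired:
            self.fired = True
            self.callback()


events = []
gate = AndGate({"method_x", "method_y"},
               lambda: events.append("listener_method"))

gate.notify("method_x")
after_first = list(events)  # snapshot: nothing has fired yet
gate.notify("method_y")     # the last pending method: the listener fires

print(after_first, events)  # [] ['listener_method']
```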
The <span>@router()</span> decorator enables conditional routing in the workflow based on a method's output.
import random

from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel


class ExampleState(BaseModel):
    success_flag: bool = False


class RouterFlow(Flow[ExampleState]):

    @start()
    def start_method(self):
        print("Starting structured flow")
        random_boolean = random.choice([True, False])
        self.state.success_flag = random_boolean

    @router(start_method)
    def second_method(self):
        if self.state.success_flag:
            return "Success"
        else:
            return "Failure"

    @listen("Success")
    def third_method(self):
        print("Third method running")

    @listen("Failure")
    def fourth_method(self):
        print("Fourth method running")


flow = RouterFlow()
flow.kickoff()
<span>second_method</span> routes the flow to the <span>"Success"</span> or <span>"Failure"</span> path based on the <span>success_flag</span> state. Depending on the route, the flow executes <span>third_method</span> (success) or <span>fourth_method</span> (failure), so the workflow adapts dynamically to the state.
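Stripped of the framework, routing by label is a lookup from the router's return value to the method registered for that label. The sketch below shows this in plain Python. The <span>LISTENERS</span> table and the <span>run</span> helper are hypothetical, and the flag is passed in explicitly instead of being chosen at random so that both paths can be exercised.

```python
def second_method(state):
    # The router: turns the current state into a route label.
    return "Success" if state["success_flag"] else "Failure"


def third_method(state):
    return "Third method running"


def fourth_method(state):
    return "Fourth method running"


# Hypothetical dispatch table: route label -> listener for that label.
LISTENERS = {
    "Success": third_method,
    "Failure": fourth_method,
}


def run(success_flag):
    state = {"success_flag": success_flag}
    label = second_method(state)
    return LISTENERS[label](state)


print(run(True))   # Third method running
print(run(False))  # Fourth method running
```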
Learn More
https://docs.crewai.com/concepts/flows