使用 CrewAI 和 AG-UI 构建全栈股票投资组合代理
由 Mux 主办的 DEV 全球展示挑战赛:展示你的项目!
本指南将介绍如何将 CrewAI 代理与 AG-UI 协议集成。此外,我们还将介绍如何将 AG-UI + CrewAI 代理与 CopilotKit 集成,以便与代理进行聊天并在前端实时显示其回复。
在正式开始之前,我们先来看看我们将要涵盖的内容:
-
AG-UI协议是什么?
-
将CrewAI 代理与 AG-UI 协议集成
-
使用 CopilotKit 将前端集成到 AG-UI + CrewAI 代理中
以下是我们将要构建的内容预览:
AG-UI协议是什么?
由 CopilotKit 开发的 Agent User Interaction Protocol (AG-UI) 是一种开源、轻量级、基于事件的协议,可促进前端和 AI 代理之间丰富的实时交互。
AG-UI 协议支持事件驱动通信、状态管理、工具使用和流式 AI 代理响应。
为了在前端和您的 AI 代理之间传递信息,AG-UI 使用以下事件:
-
生命周期事件:这些事件标志着代理任务执行的开始或结束。生命周期事件包括开始
RUN_STARTED和RUN_FINISHED结束事件。 -
短信事件:这些事件处理流式代理对前端的响应。短信事件包括
TEXT_MESSAGE_START、TEXT_MESSAGE_CONTENT和TEXT_MESSAGE_END事件。 -
工具调用事件:这些事件管理代理的工具执行。工具调用事件包括
TOOL_CALL_START、TOOL_CALL_ARGS和TOOL_CALL_END事件。 -
状态管理事件:这些事件使前端和AI代理的状态保持同步。状态管理事件包括
STATE_SNAPSHOT事件STATE_DELTA。
您可以访问 AG-UI文档,了解更多关于 AG-UI协议及其架构的信息。
现在我们已经了解了 AG-UI 协议是什么,让我们看看如何将其与 CrewAI 代理框架集成。
我们开始吧!
先决条件
要完全理解本教程,您需要对 React 或 Next.js 有一定的了解。
我们还将利用以下资源:
-
Python 是一种流行的编程语言,可用于使用 LangGraph 构建 AI 代理;请确保您的计算机上已安装 Python。
-
Crew AI—— 一个精简、速度极快的 Python 框架,使您能够构建协同工作的 AI 代理团队来处理复杂的任务。
-
OpenAI API 密钥 - 用于启用 GPT 模型执行各种任务的 API 密钥;对于本教程,请确保您有权访问 GPT-4 模型。
-
CopilotKit - 一个开源的辅助驾驶框架,用于构建自定义 AI 聊天机器人、应用内 AI 代理和文本区域。
将 CrewAI 代理与 AG-UI 协议集成
首先,克隆Open AG UI Demo 存储库,该存储库包含一个基于 Python 的后端(代理)和一个 Next.js 前端(前端)。
接下来,导航到后端目录:
cd agent
然后使用 Poetry 安装依赖项:
poetry install
之后,创建一个 .env 包含OpenAI API密钥的文件:
OPENAI_API_KEY=<<your-OpenAI-key-here>>
然后使用以下命令运行代理:
poetry run python main.py
要测试 AG-UI + CrewAI 集成,请在https://reqbin.com/curl上运行以下 curl 命令。
curl -X POST "http://localhost:8000/crewai-agent" \
-H "Content-Type: application/json" \
-d '{
"thread_id": "test_thread_123",
"run_id": "test_run_456",
"messages": [
{
"id": "msg_1",
"role": "user",
"content": "Analyze AAPL stock with a $10000 investment from 2023-01-01"
}
],
"tools": [],
"context": [],
"forwarded_props": {},
"state": {}
}'
现在让我们看看如何将 AG-UI 协议与 CrewAI 代理集成。
步骤 1:创建 CrewAI 代理工作流程
在将 AG-UI 协议与 CrewAI 代理集成之前,请按照agent/stock_analysis.py文件中所示创建 CrewAI 代理工作流程。
# ===============================================================================
# MAIN FLOW CLASS
# ===============================================================================
class StockAnalysisFlow(Flow):
"""
Main workflow class that orchestrates the stock analysis process.
This flow consists of multiple stages:
1. start() - Initialize the system prompt with portfolio data
2. chat() - Parse user input and extract investment parameters
3. simulation() - Gather historical stock data
4. allocation() - Calculate portfolio allocation and performance
5. insights() - Generate bull/bear insights about the investments
6. end() - Return final state
"""
@start()
def start(self):
"""
Step 1: Initialize the workflow
- Replace placeholder in system prompt with actual portfolio data
- This sets up the AI assistant with context about the current portfolio
"""
# Inject current portfolio data into the system prompt template
self.state['state']["messages"][0].content = system_prompt.replace(
"{PORTFOLIO_DATA_PLACEHOLDER}", json.dumps(self.state["investment_portfolio"])
)
return self.state
// ...
@listen(or_("chat", "insights"))
def end(self):
"""
Step 6: Final step - return the complete state
- This method is called from either 'chat' (if no investment data)
or 'insights' (after successful analysis)
- Returns the final state with all analysis results
"""
return self.state
步骤 2:使用 FastAPI 创建端点
定义好 CrewAI 代理工作流程后,创建一个 FastAPI 端点,并按照agent/main.py文件中所示导入 CrewAI 代理工作流程。
# ===============================================================================
# IMPORTS AND SETUP
# ===============================================================================
# FastAPI framework for building the web API
from fastapi import FastAPI
from fastapi.responses import StreamingResponse # For streaming real-time responses to the client
# Standard Python libraries
import uuid # For generating unique identifiers
from typing import Any # Type hints for flexible data types
import os # For environment variables
import uvicorn # ASGI server for running FastAPI
import asyncio # For asynchronous programming
# AG UI core components for agent communication and event handling
from ag_ui.core import (
RunAgentInput, # Input data structure for agent runs
StateSnapshotEvent, # Event for sending state snapshots
EventType, # Enumeration of event types
RunStartedEvent, # Event emitted when the agent run starts
RunFinishedEvent, # Event emitted when the agent run completes
TextMessageStartEvent, # Event for starting text message streaming
TextMessageEndEvent, # Event for ending text message streaming
TextMessageContentEvent, # Event for streaming text message content
ToolCallStartEvent, # Event for starting tool call execution
ToolCallEndEvent, # Event for ending tool call execution
ToolCallArgsEvent, # Event for streaming tool call arguments
StateDeltaEvent # Event for updating specific parts of state
)
from ag_ui.encoder import EventEncoder # Encoder for converting events to streamable format
# Import our custom stock analysis workflow
from stock_analysis import StockAnalysisFlow
# CopilotKit state management
from copilotkit import CopilotKitState
# ===============================================================================
# APPLICATION SETUP
# ===============================================================================
# Initialize FastAPI application instance
app = FastAPI()
# ===============================================================================
# MAIN API ENDPOINT
# ===============================================================================
@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
"""
Main API endpoint for processing stock analysis requests.
This endpoint:
1. Receives user input and current state from the frontend
2. Streams real-time events back to the client during processing
3. Runs the StockAnalysisFlow workflow asynchronously
4. Returns results via Server-Sent Events (SSE) streaming
Args:
input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs
Returns:
StreamingResponse: Real-time stream of events during agent execution
"""
// ...
# ===============================================================================
# SERVER STARTUP AND CONFIGURATION
# ===============================================================================
def main():
"""
Main function to start the uvicorn server.
This function:
- Reads the port from environment variables (defaults to 8000)
- Configures uvicorn server settings
- Starts the server with hot reload enabled for development
"""
# Get port from environment variable or use default
port = int(os.getenv("PORT", "8000"))
# Start uvicorn server with configuration
uvicorn.run(
"main:app", # Module and app instance
host="0.0.0.0", # Listen on all network interfaces
port=port, # Port number
reload=True, # Enable hot reload for development
)
if __name__ == "__main__":
"""
Entry point when script is run directly.
Starts the FastAPI server using uvicorn.
"""
main()
步骤 3:定义事件生成器
创建 FastAPI 端点后,定义一个事件生成器,生成 AG-UI 协议事件流,初始化事件编码器,并将流式响应返回给客户端或前端,如文件中所示agent/main.py。
@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
"""
Main API endpoint for processing stock analysis requests.
This endpoint:
1. Receives user input and current state from the frontend
2. Streams real-time events back to the client during processing
3. Runs the StockAnalysisFlow workflow asynchronously
4. Returns results via Server-Sent Events (SSE) streaming
Args:
input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs
Returns:
StreamingResponse: Real-time stream of events during agent execution
"""
try:
async def event_generator():
"""
An asynchronous generator that streams events to the client in real-time.
This function orchestrates the entire stock analysis workflow:
1. Sets up event streaming infrastructure
2. Emits initial state and run started events
3. Launches the StockAnalysisFlow workflow
4. Streams progress events as they occur
5. Handles final results (tool calls or text messages)
6. Emits run completion events
Yields:
Encoded events for Server-Sent Events (SSE) streaming
"""
# Step 1: Initialize event streaming infrastructure
encoder = EventEncoder() # Converts events to SSE format
event_queue = asyncio.Queue() # Queue for events from the workflow
def emit_event(event):
"""Callback function for the workflow to emit events"""
event_queue.put_nowait(event)
# Generate a unique identifier for text messages
# Generate unique identifier for text messages
message_id = str(uuid.uuid4())
// ...
except Exception as e:
# Step 10: Handle any unexpected errors during processing
print(e) # Log error for debugging
# Step 11: Return streaming response to client
# Step 11: Return streaming response to client
return StreamingResponse(event_generator(), media_type="text/event-stream")
步骤 4:配置 AG-UI 协议生命周期事件
定义事件生成器后,定义 AG-UI 协议生命周期事件,这些事件代表 AG-UI + CrewAI 代理工作流运行的生命周期,如agent/main.py文件中所示。
@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
"""
Main API endpoint for processing stock analysis requests.
This endpoint:
1. Receives user input and current state from the frontend
2. Streams real-time events back to the client during processing
3. Runs the StockAnalysisFlow workflow asynchronously
4. Returns results via Server-Sent Events (SSE) streaming
Args:
input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs
Returns:
StreamingResponse: Real-time stream of events during agent execution
"""
try:
async def event_generator():
// ...
# Step 2: Emit run started event to notify client that processing has begun
yield encoder.encode(
RunStartedEvent(
type=EventType.RUN_STARTED,
thread_id=input_data.thread_id, # Conversation thread identifier
run_id=input_data.run_id, # Unique run identifier
)
)
// ...
# Step 9: Emit run finished event to signal completion
yield encoder.encode(
RunFinishedEvent(
type=EventType.RUN_FINISHED,
thread_id=input_data.thread_id, # Same thread ID from start
run_id=input_data.run_id, # Same run ID from start
)
)
except Exception as e:
# Step 10: Handle any unexpected errors during processing
print(e) # Log error for debugging
# Step 11: Return streaming response to client
# Step 11: Return streaming response to client
return StreamingResponse(event_generator(), media_type="text/event-stream")
步骤 5:配置 AG-UI 协议状态管理事件
定义 AG-UI 协议生命周期事件后,将 AG-UI 协议状态管理事件集成到 CrewAI 代理工作流程中,如文件中的股票分析 CrewAI 工作流程所示agent/stock_analysis.py。
# AG UI types for message handling and state management
from ag_ui.core.types import AssistantMessage, ToolMessage
from ag_ui.core.events import StateDeltaEvent, EventType
# ===============================================================================
# MAIN FLOW CLASS
# ===============================================================================
class StockAnalysisFlow(Flow):
"""
Main workflow class that orchestrates the stock analysis process.
This flow consists of multiple stages:
1. start() - Initialize the system prompt with portfolio data
2. chat() - Parse user input and extract investment parameters
3. simulation() - Gather historical stock data
4. allocation() - Calculate portfolio allocation and performance
5. insights() - Generate bull/bear insights about the investments
6. end() - Return final state
"""
// ...
@listen("start")
async def chat(self):
"""
Step 2: Parse user input and extract investment parameters
- Create a tool log entry to show progress to the user
- Use OpenAI to analyze the user's message and extract structured data
- Return the next step based on whether structured data was extracted
"""
try:
// ...
# Step 2.2: Emit state change event to update UI with new tool log
self.state.get("emit_event")(
StateDeltaEvent(
type=EventType.STATE_DELTA,
delta=[
{
"op": "add",
"path": "/tool_logs/-",
"value": {
"message": "Analyzing user query",
"status": "processing",
"id": tool_log_id,
},
}
],
)
)
await asyncio.sleep(0) # Allow other tasks to run
// ...
# Step 2.4: Update tool log status to completed
index = len(self.state['state']["tool_logs"]) - 1
self.state.get("emit_event")(
StateDeltaEvent(
type=EventType.STATE_DELTA,
delta=[
{
"op": "replace",
"path": f"/tool_logs/{index}/status",
"value": "completed",
}
],
)
)
await asyncio.sleep(0)
// ...
except Exception as e:
# Step 2.6: Handle any errors during processing
print(e)
a_message = AssistantMessage(id=response.id, content="", role="assistant")
self.state['state']["messages"].append(a_message)
return "end"
然后,在 FastAPI 端点中,使用STATE_SNAPSHOTAG-UI 协议状态管理事件初始化 CrewAI 代理工作流状态,如下所示。
@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
"""
Main API endpoint for processing stock analysis requests.
This endpoint:
1. Receives user input and current state from the frontend
2. Streams real-time events back to the client during processing
3. Runs the StockAnalysisFlow workflow asynchronously
4. Returns results via Server-Sent Events (SSE) streaming
Args:
input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs
Returns:
StreamingResponse: Real-time stream of events during agent execution
"""
try:
async def event_generator():
# Step 1: Initialize event streaming infrastructure
encoder = EventEncoder() # Converts events to SSE format
// ...
# Step 3: Send initial state snapshot to client
# This provides the client with the current financial state before analysis
yield encoder.encode(
StateSnapshotEvent(
type=EventType.STATE_SNAPSHOT,
snapshot={
"available_cash": input_data.state["available_cash"], # Current cash balance
"investment_summary": input_data.state["investment_summary"], # Previous analysis results
"investment_portfolio": input_data.state["investment_portfolio"], # Current holdings
"tool_logs": [] # Reset tool logs for new analysis
}
)
)
// ...
步骤 6:使用 AG-UI 协议配置 CrewAI 代理工作流程
初始化 CrewAI 代理工作流状态后,按照agent/main.py文件中的说明,将 CrewAI 代理工作流与 AG-UI 协议集成。
@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
"""
Main API endpoint for processing stock analysis requests.
This endpoint:
1. Receives user input and current state from the frontend
2. Streams real-time events back to the client during processing
3. Runs the StockAnalysisFlow workflow asynchronously
4. Returns results via Server-Sent Events (SSE) streaming
Args:
input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs
Returns:
StreamingResponse: Real-time stream of events during agent execution
"""
try:
async def event_generator():
# Step 1: Initialize event streaming infrastructure
encoder = EventEncoder() # Converts events to SSE format
// ...
# Step 4: Initialize agent state with input data
state = AgentState(
tools=input_data.tools, # Available tools
messages=input_data.messages, # Conversation history
be_stock_data=None, # Will be populated during analysis
be_arguments=None, # Will be populated during analysis
available_cash=input_data.state["available_cash"], # Current cash
investment_portfolio=input_data.state["investment_portfolio"], # Current portfolio
tool_logs=[] # Progress tracking
)
# Step 5: Launch the stock analysis workflow asynchronously
# This creates a task that runs the StockAnalysisFlow in the background
agent_task = asyncio.create_task(
StockAnalysisFlow().kickoff_async(inputs={
"state": state, # Agent state
"emit_event": emit_event, # Event emission callback
"investment_portfolio": input_data.state["investment_portfolio"] # Portfolio data
})
)
# Step 6: Event streaming loop - relay events from workflow to client
while True:
try:
# Try to get an event from the queue with a short timeout
event = await asyncio.wait_for(event_queue.get(), timeout=0.1)
yield encoder.encode(event) # Stream the event to the client
except asyncio.TimeoutError:
# No events in queue - check if workflow is complete
if agent_task.done():
break # Exit loop when workflow finishes
# Step 7: Clear tool logs after workflow completion
# This prevents old progress logs from cluttering the UI
yield encoder.encode(
StateDeltaEvent(
type=EventType.STATE_DELTA,
delta=[
{
"op": "replace",
"path": "/tool_logs",
"value": []
}
]
)
)
// ...
except Exception as e:
# Step 10: Handle any unexpected errors during processing
print(e) # Log error for debugging
# Step 11: Return streaming response to client
# Step 11: Return streaming response to client
return StreamingResponse(event_generator(), media_type="text/event-stream")
步骤 7:配置 AG-UI 协议工具事件以处理人机交互断点
将 CrewAI 代理工作流程与 AG-UI 协议集成后,将带有工具调用名称的工具调用消息附加到状态,如文件中的现金分配步骤所示agent/stock_analysis.py。
# ===============================================================================
# MAIN FLOW CLASS
# ===============================================================================
class StockAnalysisFlow(Flow):
"""
Main workflow class that orchestrates the stock analysis process.
This flow consists of multiple stages:
1. start() - Initialize the system prompt with portfolio data
2. chat() - Parse user input and extract investment parameters
3. simulation() - Gather historical stock data
4. allocation() - Calculate portfolio allocation and performance
5. insights() - Generate bull/bear insights about the investments
6. end() - Return final state
"""
// ...
@listen("simulation")
async def allocation(self):
"""
Step 4: Calculate portfolio allocation and performance simulation
- Simulate buying stocks based on investment strategy (single-shot vs DCA)
- Calculate returns, allocation percentages, and performance metrics
- Compare portfolio performance against SPY (S&P 500) benchmark
- Generate performance data for charting
"""
# Step 4.1: Ensure we have tool calls with investment data
if self.state['state']['messages'][-1].tool_calls is None:
return "end"
// ...
# Step 4.18: Add a tool message indicating data extraction is complete
self.state['state']["messages"].append(
ToolMessage(
role="tool",
id=str(uuid.uuid4()),
content="The relevant details had been extracted",
tool_call_id=self.state['state']["messages"][-1].tool_calls[0].id,
)
)
# Step 4.19: Add assistant message with chart rendering tool call
self.state['state']["messages"].append(
AssistantMessage(
role="assistant",
tool_calls=[
{
"id": str(uuid.uuid4()),
"type": "function",
"function": {
"name": "render_standard_charts_and_table",
"arguments": json.dumps(
{"investment_summary": self.state['state']["investment_summary"]}
),
},
}
],
id=str(uuid.uuid4()),
)
)
// ...
然后,定义 AG-UI 协议工具调用事件,代理可以使用这些事件通过调用前端操作并使用工具名称来请求用户反馈,从而触发前端操作,如agent/main.py文件中所示。
@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
"""
Main API endpoint for processing stock analysis requests.
This endpoint:
1. Receives user input and current state from the frontend
2. Streams real-time events back to the client during processing
3. Runs the StockAnalysisFlow workflow asynchronously
4. Returns results via Server-Sent Events (SSE) streaming
Args:
input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs
Returns:
StreamingResponse: Real-time stream of events during agent execution
"""
try:
async def event_generator():
# Step 1: Initialize event streaming infrastructure
encoder = EventEncoder() # Converts events to SSE format
// ...
# Step 8: Handle workflow results based on the final message type
# Check if the last message is from the assistant (AI agent)
if state["messages"][-1].role == "assistant":
# Step 8.1: Handle tool call results (charts, data visualizations)
if state["messages"][-1].tool_calls:
# The workflow generated a tool call (e.g., render charts)
# Stream tool call events to trigger UI rendering
yield encoder.encode(
ToolCallStartEvent(
type=EventType.TOOL_CALL_START,
tool_call_id=state["messages"][-1].tool_calls[0].id,
toolCallName=state["messages"][-1]
.tool_calls[0]
.function.name,
)
)
# Stream the tool call arguments (contains analysis results)
yield encoder.encode(
ToolCallArgsEvent(
type=EventType.TOOL_CALL_ARGS,
tool_call_id=state["messages"][-1].tool_calls[0].id,
delta=state["messages"][-1]
.tool_calls[0]
.function.arguments, # Contains investment summary and insights
)
)
# Signal end of tool call
yield encoder.encode(
ToolCallEndEvent(
type=EventType.TOOL_CALL_END,
tool_call_id=state["messages"][-1].tool_calls[0].id,
)
)
else:
# Step 8.2: Handle text message results (when no analysis was performed)
yield encoder.encode(
TextMessageStartEvent(
type=EventType.TEXT_MESSAGE_START,
message_id=message_id,
role="assistant",
)
)
# Step 8.2.1: Stream text content if available
if state["messages"][-1].content:
content = state["messages"][-1].content
# Split content into chunks for smooth streaming effect
n_parts = 100 # Number of chunks for streaming
part_length = max(1, len(content) // n_parts)
parts = [content[i:i+part_length] for i in range(0, len(content), part_length)]
# Ensure we don't exceed the target number of parts
if len(parts) > n_parts:
parts = parts[:n_parts-1] + [''.join(parts[n_parts-1:])]
# Stream each part with a small delay for smooth typing effect
for part in parts:
yield encoder.encode(
TextMessageContentEvent(
type=EventType.TEXT_MESSAGE_CONTENT,
message_id=message_id,
delta=part, # Chunk of text content
)
)
await asyncio.sleep(0.05) # Small delay for typing effect
else:
# Step 8.2.2: Handle case where no content is available (error scenario)
yield encoder.encode(
TextMessageContentEvent(
type=EventType.TEXT_MESSAGE_CONTENT,
message_id=message_id,
delta="Something went wrong! Please try again.",
)
)
# Step 8.2.3: Signal end of text message
yield encoder.encode(
TextMessageEndEvent(
type=EventType.TEXT_MESSAGE_END,
message_id=message_id,
)
)
except Exception as e:
# Step 10: Handle any unexpected errors during processing
print(e) # Log error for debugging
# Step 11: Return streaming response to client
# Step 11: Return streaming response to client
return StreamingResponse(event_generator(), media_type="text/event-stream")
步骤 8:配置 AG-UI 协议文本消息事件
配置好 AG-UI 协议工具事件后,定义 AG-UI 协议文本消息事件来处理流式代理响应到前端,如agent/main.py文件中所示。
@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
"""
Main API endpoint for processing stock analysis requests.
This endpoint:
1. Receives user input and current state from the frontend
2. Streams real-time events back to the client during processing
3. Runs the StockAnalysisFlow workflow asynchronously
4. Returns results via Server-Sent Events (SSE) streaming
Args:
input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs
Returns:
StreamingResponse: Real-time stream of events during agent execution
"""
try:
async def event_generator():
# Step 1: Initialize event streaming infrastructure
encoder = EventEncoder() # Converts events to SSE format
// ...
# Step 8: Handle workflow results based on the final message type
# Check if the last message is from the assistant (AI agent)
if state["messages"][-1].role == "assistant":
// ...
else:
# Step 8.2: Handle text message results (when no analysis was performed)
yield encoder.encode(
TextMessageStartEvent(
type=EventType.TEXT_MESSAGE_START,
message_id=message_id,
role="assistant",
)
)
# Step 8.2.1: Stream text content if available
if state["messages"][-1].content:
content = state["messages"][-1].content
# Split content into chunks for smooth streaming effect
n_parts = 100 # Number of chunks for streaming
part_length = max(1, len(content) // n_parts)
parts = [content[i:i+part_length] for i in range(0, len(content), part_length)]
# Ensure we don't exceed the target number of parts
if len(parts) > n_parts:
parts = parts[:n_parts-1] + [''.join(parts[n_parts-1:])]
# Stream each part with a small delay for smooth typing effect
for part in parts:
yield encoder.encode(
TextMessageContentEvent(
type=EventType.TEXT_MESSAGE_CONTENT,
message_id=message_id,
delta=part, # Chunk of text content
)
)
await asyncio.sleep(0.05) # Small delay for typing effect
else:
# Step 8.2.2: Handle case where no content is available (error scenario)
yield encoder.encode(
TextMessageContentEvent(
type=EventType.TEXT_MESSAGE_CONTENT,
message_id=message_id,
delta="Something went wrong! Please try again.",
)
)
# Step 8.2.3: Signal end of text message
yield encoder.encode(
TextMessageEndEvent(
type=EventType.TEXT_MESSAGE_END,
message_id=message_id,
)
)
except Exception as e:
# Step 10: Handle any unexpected errors during processing
print(e) # Log error for debugging
# Step 11: Return streaming response to client
# Step 11: Return streaming response to client
return StreamingResponse(event_generator(), media_type="text/event-stream")
恭喜!您已将 CrewAI 代理工作流程与 AG-UI 协议集成。现在让我们看看如何为 AG-UI + CrewAI 代理工作流程添加前端。
使用 CopilotKit 将前端集成到 AG-UI + CrewAI 代理工作流程中
在本节中,您将学习如何使用 CopilotKit 在 AG-UI + CrewAI 代理工作流程和前端之间建立连接。
我们开始吧。
首先,导航到前端目录:
cd frontend
接下来,创建一个 .env 包含OpenAI API密钥的文件:
OPENAI_API_KEY=<<your-OpenAI-key-here>>
然后安装依赖项:
pnpm install
之后,启动开发服务器:
pnpm run dev
访问 http://localhost:3000,您应该可以看到 AG-UI + CrewAI 代理前端已启动并运行。
现在让我们看看如何使用 CopilotKit 为 AG-UI + CrewAI 代理构建前端 UI。
步骤 1:创建 HttpAgent 实例
在创建 HttpAgent 实例之前,我们先来了解一下 HttpAgent 是什么。
HttpAgent 是 AG-UI 库中的一个客户端,它可以将您的前端应用程序与任何 AG-UI 兼容的 AI 代理服务器连接起来。
要创建 HttpAgent 实例,请在 API 路由中定义它,如src/app/api/copilotkit/route.ts文件中所示。
// ===============================================================================
// IMPORTS AND DEPENDENCIES
// ===============================================================================
// CopilotKit framework imports for AI assistant integration
import {
CopilotRuntime, // Core runtime for managing AI agents
copilotRuntimeNextJSAppRouterEndpoint, // Next.js App Router integration
OpenAIAdapter, // Adapter for OpenAI API compatibility
} from "@copilotkit/runtime";
// Next.js framework imports
import { NextRequest } from "next/server"; // Type for Next.js request objects
// AG UI client for communicating with external agents
import { HttpAgent } from "@ag-ui/client"; // HTTP client for agent communication
// ===============================================================================
// AGENT CONFIGURATION
// ===============================================================================
/**
* Step 1: Configure the CrewAI stock analysis agent
*
* This HttpAgent connects to our FastAPI backend running the stock analysis workflow.
* It acts as a bridge between the CopilotKit frontend and the Python-based agent.
*/
const crewaiAgent = new HttpAgent({
// Development URL (commented out for reference)
// url: "http://0.0.0.0:8000/crewai-agent",
// Production-ready URL configuration with environment variable fallback
// Uses NEXT_PUBLIC_CREWAI_URL if set, otherwise defaults to local development URL
url: process.env.NEXT_PUBLIC_CREWAI_URL || "http://0.0.0.0:8000/crewai-agent",
});
// ===============================================================================
// COPILOT RUNTIME SETUP
// ===============================================================================
/**
* Step 2: Configure the OpenAI service adapter
*
* This adapter provides OpenAI API compatibility for the CopilotKit runtime.
* It handles communication with OpenAI's language models for natural language processing.
*/
const serviceAdapter = new OpenAIAdapter();
/**
* Step 3: Initialize the CopilotKit runtime
*
* The runtime orchestrates communication between:
* - Frontend UI components
* - Language models (via OpenAI adapter)
* - External agents (our stock analysis agent)
*/
const runtime = new CopilotRuntime({
agents: {
// Register our stock analysis agent with the runtime
// @ts-ignore - Suppressing TypeScript warning for agent type compatibility
crewaiAgent: crewaiAgent,
},
});
// Alternative simple runtime configuration (commented out)
// const runtime = new CopilotRuntime()
// ===============================================================================
// API ROUTE HANDLER
// ===============================================================================
/**
* Step 4: Define the POST endpoint handler
*
* This Next.js API route handles incoming requests from the CopilotKit frontend.
* It processes user interactions, manages agent communication, and streams responses.
*
* @param req - Next.js request object containing user input and session data
* @returns Promise<Response> - Streaming response with agent results
*/
export const POST = async (req: NextRequest) => {
/**
* Step 4.1: Create the request handler using CopilotKit's Next.js integration
*
* This function:
* - Extracts user messages and state from the request
* - Routes requests to appropriate agents
* - Manages streaming responses back to the frontend
* - Handles error scenarios gracefully
*/
const { handleRequest } = copilotRuntimeNextJSAppRouterEndpoint({
runtime, // The configured CopilotKit runtime with our agents
serviceAdapter, // OpenAI adapter for language model integration
endpoint: "/api/copilotkit", // The API endpoint path for this handler
});
/**
* Step 4.2: Process the incoming request
*
* The handleRequest function:
* - Parses the request body for user input
* - Determines which agent should handle the request
* - Forwards the request to the appropriate agent (crewaiAgent)
* - Streams the agent's response back to the client
*/
return handleRequest(req);
};
步骤 2:设置 CopilotKit 提供程序
要设置 CopilotKit 提供程序, [<CopilotKit>](https://docs.copilotkit.ai/reference/components/CopilotKit) 组件必须包装应用程序中与 Copilot 相关的部分。
对于大多数使用场景,最好将 CopilotKit 提供程序包装在整个应用程序周围,例如,在您的 中 layout.tsx,如下面的文件中所示 src/app/layout.tsx 。
// Next.js imports for metadata and font handling
import type { Metadata } from "next";
import { Geist, Geist_Mono } from "next/font/google";
// Global styles for the application
import "./globals.css";
// CopilotKit UI styles for AI components
import "@copilotkit/react-ui/styles.css";
// CopilotKit core component for AI functionality
import { CopilotKit } from "@copilotkit/react-core";
// Configure Geist Sans font with CSS variables for consistent typography
const geistSans = Geist({
variable: "--font-geist-sans",
subsets: ["latin"],
});
// Configure Geist Mono font for code and monospace text
const geistMono = Geist_Mono({
variable: "--font-geist-mono",
subsets: ["latin"],
});
// Metadata configuration for SEO and page information
export const metadata: Metadata = {
title: "AI Stock Portfolio",
description: "AI Stock Portfolio",
};
// Root layout component that wraps all pages in the application
export default function RootLayout({
children,
}: Readonly<{
children: React.ReactNode;
}>) {
return (
<html lang="en">
<body
className={`${geistSans.variable} ${geistMono.variable} antialiased`}>
{/* CopilotKit wrapper that enables AI functionality throughout the app */}
{/* runtimeUrl points to the API endpoint for AI backend communication */}
{/* agent specifies which AI agent to use (stockAgent for stock analysis) */}
<CopilotKit runtimeUrl="/api/copilotkit" agent="crewaiAgent">
{children}
</CopilotKit>
</body>
</html>
);
}
步骤 3:设置 Copilot 聊天组件
CopilotKit 附带多个内置聊天组件,包括CopilotPopup、 CopilotSidebar和CopilotChat。
要设置 Copilot 聊天组件,请按照src/app/components/prompt-panel.tsx文件中所示进行定义。
// Client-side component directive for Next.js
"use client";
import type React from "react";
// CopilotKit chat component for AI interactions
import { CopilotChat } from "@copilotkit/react-ui";
// Props interface for the PromptPanel component
interface PromptPanelProps {
// Amount of available cash for investment, displayed in the panel
availableCash: number;
}
// Main component for the AI chat interface panel
export function PromptPanel({ availableCash }: PromptPanelProps) {
// Utility function to format numbers as USD currency
// Removes decimal places for cleaner display of large amounts
const formatCurrency = (amount: number) => {
return new Intl.NumberFormat("en-US", {
style: "currency",
currency: "USD",
minimumFractionDigits: 0,
maximumFractionDigits: 0,
}).format(amount);
};
return (
// Main container with full height and white background
<div className="h-full flex flex-col bg-white">
{/* Header section with title, description, and cash display */}
<div className="p-4 border-b border-[#D8D8E5] bg-[#FAFCFA]">
{/* Title section with icon and branding */}
<div className="flex items-center gap-2 mb-2">
<span className="text-xl">🪁</span>
<div>
<h1 className="text-lg font-semibold text-[#030507] font-['Roobert']">
Portfolio Chat
</h1>
{/* Pro badge indicator */}
<div className="inline-block px-2 py-0.5 bg-[#BEC9FF] text-[#030507] text-xs font-semibold uppercase rounded">
PRO
</div>
</div>
</div>
{/* Description of the AI agent's capabilities */}
<p className="text-xs text-[#575758]">
Interact with the LangGraph-powered AI agent for portfolio
visualization and analysis
</p>
{/* Available Cash Display section */}
<div className="mt-3 p-2 bg-[#86ECE4]/10 rounded-lg">
<div className="text-xs text-[#575758] font-medium">
Available Cash
</div>
<div className="text-sm font-semibold text-[#030507] font-['Roobert']">
{formatCurrency(availableCash)}
</div>
</div>
</div>
{/* CopilotKit chat interface with custom styling and initial message */}
{/* Takes up majority of the panel height for conversation */}
<CopilotChat
className="h-[78vh] p-2"
labels={{
// Initial welcome message explaining the AI agent's capabilities and limitations
initial: `I am a Crew AI agent designed to analyze investment opportunities and track stock performance over time. How can I help you with your investment query? For example, you can ask me to analyze a stock like "Invest in Apple with 10k dollars since Jan 2023". \n\nNote: The AI agent has access to stock data from the past 4 years only`,
}}
/>
</div>
);
}
步骤 4:使用 CopilotKit 钩子将 AG-UI + CrewAI 代理状态与前端同步
在 CopilotKit 中,CoAgents 维护着一个共享状态,该状态能够将前端 UI 与代理的执行无缝连接起来。这个共享状态系统使您能够:
-
显示代理的当前进度和中间结果
-
通过用户界面交互更新代理的状态
-
对应用程序中的状态变化做出实时反应
您可以在 CopilotKit 文档中了解更多关于 CoAgents 共享状态的信息 。
要将 AG-UI + CrewAI 代理状态与前端同步,请使用 CopilotKit useCoAgent hook 将 AG-UI + CrewAI 代理状态与前端共享,如src/app/page.tsx文件中所示。
"use client";
import {
useCoAgent,
} from "@copilotkit/react-core";
// ...
export interface SandBoxPortfolioState {
performanceData: Array<{
date: string;
portfolio: number;
spy: number;
}>;
}
export interface InvestmentPortfolio {
ticker: string;
amount: number;
}
export default function OpenStocksCanvas() {
// ...
const [totalCash, setTotalCash] = useState(1000000);
const { state, setState } = useCoAgent({
name: "crewaiAgent",
initialState: {
available_cash: totalCash,
investment_summary: {} as any,
investment_portfolio: [] as InvestmentPortfolio[],
},
});
// ...
return (
<div className="h-screen bg-[#FAFCFA] flex overflow-hidden">
{/* ... */}
</div>
);
}
然后在聊天界面中渲染 AG-UI + CrewAI 代理的状态,这有助于以更贴近上下文的方式告知用户代理的状态。
要在聊天 UI 中渲染 AG-UI + CrewAI 代理的状态,可以使用 useCoAgentStateRender 钩子,如文件中所示src/app/page.tsx。
"use client";
import {
useCoAgentStateRender,
} from "@copilotkit/react-core";
import { ToolLogs } from "./components/tool-logs";
// ...
export default function OpenStocksCanvas() {
// ...
useCoAgentStateRender({
name: "crewaiAgent",
render: ({ state }) => <ToolLogs logs={state.tool_logs} />,
});
// ...
return (
<div className="h-screen bg-[#FAFCFA] flex overflow-hidden">
{/* ... */}
</div>
);
}
如果在聊天中执行查询,您应该会在聊天界面中看到 AG-UI + CrewAI 代理的状态任务执行情况,如下所示。
步骤 5:在前端实现人机交互(HITL)
人机交互(HITL)允许智能体在执行过程中请求人类输入或批准,从而提高人工智能系统的可靠性和可信度。对于需要处理复杂决策或需要人类判断的行动的人工智能应用而言,这种模式至关重要。
您可以在CopilotKit 文档中了解更多关于“人机交互”的信息。
要在前端实现人机交互(HITL),您需要使用 CopilotKit 的 useCopilotKitAction 钩子renderAndWaitForResponse方法,该方法允许从渲染函数异步返回值,如src/app/page.tsx文件中所示。
"use client";
import {
useCopilotAction,
} from "@copilotkit/react-core";
// ...
export default function OpenStocksCanvas() {
// ...
useCopilotAction({
name: "render_standard_charts_and_table",
description:
"This is an action to render a standard chart and table. The chart can be a bar chart or a line chart. The table can be a table of data.",
renderAndWaitForResponse: ({ args, respond, status }) => {
useEffect(() => {
console.log(args, "argsargsargsargsargsaaa");
}, [args]);
return (
<>
{args?.investment_summary?.percent_allocation_per_stock &&
args?.investment_summary?.percent_return_per_stock &&
args?.investment_summary?.performanceData && (
<>
<div className="flex flex-col gap-4">
<LineChartComponent
data={args?.investment_summary?.performanceData}
size="small"
/>
<BarChartComponent
data={Object.entries(
args?.investment_summary?.percent_return_per_stock
).map(([ticker, return1]) => ({
ticker,
return: return1 as number,
}))}
size="small"
/>
<AllocationTableComponent
allocations={Object.entries(
args?.investment_summary?.percent_allocation_per_stock
).map(([ticker, allocation]) => ({
ticker,
allocation: allocation as a number,
currentValue:
args?.investment_summary.final_prices[ticker] *
args?.investment_summary.holdings[ticker],
totalReturn:
args?.investment_summary.percent_return_per_stock[
ticker
],
}))}
size="small"
/>
</div>
<button
hidden={status == "complete"}
className="mt-4 rounded-full px-6 py-2 bg-green-50 text-green-700 border border-green-200 shadow-sm hover:bg-green-100 transition-colors font-semibold text-sm"
onClick={() => {
debugger;
if (respond) {
setTotalCash(args?.investment_summary?.cash);
setCurrentState({
...currentState,
returnsData: Object.entries(
args?.investment_summary?.percent_return_per_stock
).map(([ticker, return1]) => ({
ticker,
return: return1 as number,
})),
allocations: Object.entries(
args?.investment_summary?.percent_allocation_per_stock
).map(([ticker, allocation]) => ({
ticker,
allocation: allocation as a number,
currentValue:
args?.investment_summary?.final_prices[ticker] *
args?.investment_summary?.holdings[ticker],
totalReturn:
args?.investment_summary?.percent_return_per_stock[
ticker
],
})),
performanceData:
args?.investment_summary?.performanceData,
bullInsights: args?.insights?.bullInsights || [],
bearInsights: args?.insights?.bearInsights || [],
currentPortfolioValue:
args?.investment_summary?.total_value,
totalReturns: (
Object.values(
args?.investment_summary?.returns
) as number[]
).reduce((acc, val) => acc + val, 0),
});
setInvestedAmount(
(
Object.values(
args?.investment_summary?.total_invested_per_stock
) as number[]
).reduce((acc, val) => acc + val, 0)
);
setState({
...state,
available_cash: totalCash,
});
respond(
"Data rendered successfully. Provide a summary of the investments by not making any tool calls."
);
}
}}>
Accept
</button>
<button
hidden={status == "complete"}
className="rounded-full px-6 py-2 bg-red-50 text-red-700 border border-red-200 shadow-sm hover:bg-red-100 transition-colors font-semibold text-sm ml-2"
onClick={() => {
debugger;
if (respond) {
respond(
"Data rendering rejected. Just give a summary of the rejected investments by not making any tool calls."
);
}
}}>
Reject
</button>
</>
)}
</>
);
},
});
// ...
return (
<div className="h-screen bg-[#FAFCFA] flex overflow-hidden">
{/* ... */}
</div>
);
}
当代理通过工具/操作名称触发前端操作,以在执行过程中请求人工输入或反馈时,最终用户会看到一个选择提示(显示在聊天界面中)。然后,用户可以通过按下聊天界面中的按钮进行选择,如下图所示。
步骤 6:在前端流式传输 AG-UI + CrewAI 代理响应
要在前端流式传输 AG-UI + CrewAI 代理的响应或结果,请将代理的状态字段值传递给前端组件,如文件中所示src/app/page.tsx。
"use client";
import { useEffect, useState } from "react";
import { PromptPanel } from "./components/prompt-panel";
import { GenerativeCanvas } from "./components/generative-canvas";
import { ComponentTree } from "./components/component-tree";
import { CashPanel } from "./components/cash-panel";
// ...
export default function OpenStocksCanvas() {
const [currentState, setCurrentState] = useState<PortfolioState>({
id: "",
trigger: "",
performanceData: [],
allocations: [],
returnsData: [],
bullInsights: [],
bearInsights: [],
currentPortfolioValue: 0,
totalReturns: 0,
});
const [sandBoxPortfolio, setSandBoxPortfolio] = useState<
SandBoxPortfolioState[]
>([]);
const [selectedStock, setSelectedStock] = useState<string | null>(null);
return (
<div className="h-screen bg-[#FAFCFA] flex overflow-hidden">
{/* Left Panel - Prompt Input */}
<div className="w-85 border-r border-[#D8D8E5] bg-white flex-shrink-0">
<PromptPanel availableCash={totalCash} />
</div>
{/* Center Panel - Generative Canvas */}
<div className="flex-1 relative min-w-0">
{/* Top Bar with Cash Info */}
<div className="absolute top-0 left-0 right-0 bg-white border-b border-[#D8D8E5] p-4 z-10">
<CashPanel
totalCash={totalCash}
investedAmount={investedAmount}
currentPortfolioValue={
totalCash + investedAmount + currentState.totalReturns || 0
}
onTotalCashChange={setTotalCash}
onStateCashChange={setState}
/>
</div>
<div className="pt-20 h-full">
<GenerativeCanvas
setSelectedStock={setSelectedStock}
portfolioState={currentState}
sandBoxPortfolio={sandBoxPortfolio}
setSandBoxPortfolio={setSandBoxPortfolio}
/>
</div>
</div>
{/* Right Panel - Component Tree (Optional) */}
{showComponentTree && (
<div className="w-64 border-l border-[#D8D8E5] bg-white flex-shrink-0">
<ComponentTree portfolioState={currentState} />
</div>
)}
</div>
);
}
如果您向客服人员提出询问并批准其反馈请求,您应该会在用户界面中看到客服人员的回复或结果,如下所示。
结论
在本指南中,我们逐步介绍了如何将 CrewAI 代理与 AG-UI 协议集成,然后使用 CopilotKit 为代理添加前端。
虽然我们已经探索了一些功能,但我们仅仅触及了 CopilotKit 无数用例的冰山一角,从构建交互式 AI 聊天机器人到构建代理解决方案——本质上,CopilotKit 可以让您在几分钟内为您的产品添加大量有用的 AI 功能。
希望本指南能帮助您更轻松地将人工智能驱动的副驾驶功能集成到您现有的应用程序中。
在Twitter上关注 CopilotKit 并打个招呼,如果你想开发一些很酷的东西,请加入 Discord 社区。
文章来源:https://dev.to/copilotkit/how-to-integrate-crewai-agents-with-ag-ui-protocol-crewai-ag-ui-copilotkit-3n1d





