Published on 2026-01-06

Building a Full-Stack Stock Portfolio Agent with CrewAI and AG-UI

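This guide walks through integrating CrewAI agents with the AG-UI protocol. It also covers integrating the AG-UI + CrewAI agent with CopilotKit, so that you can chat with the agent and stream its responses to the frontend in real time.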

Before we jump in, here is what we will cover:

  • What is the AG-UI protocol?

  • Integrating CrewAI agents with the AG-UI protocol

  • Integrating a frontend with the AG-UI + CrewAI agent using CopilotKit

Here is a preview of what we will be building:

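What is the AG-UI protocol?

The Agent User Interaction Protocol (AG-UI), developed by CopilotKit, is an open-source, lightweight, event-based protocol that enables rich, real-time interactions between a frontend and AI agents.

The AG-UI protocol supports event-driven communication, state management, tool usage, and streaming of AI agent responses.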


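To pass information between the frontend and your AI agent, AG-UI uses the following events:

  • Lifecycle events: These events mark the start or end of an agent's task execution. Lifecycle events include the RUN_STARTED and RUN_FINISHED events.

  • Text message events: These events handle streaming the agent's responses to the frontend. Text message events include the TEXT_MESSAGE_START, TEXT_MESSAGE_CONTENT, and TEXT_MESSAGE_END events.

  • Tool call events: These events manage the agent's tool executions. Tool call events include the TOOL_CALL_START, TOOL_CALL_ARGS, and TOOL_CALL_END events.

  • State management events: These events keep the state of the frontend and the AI agent in sync. State management events include the STATE_SNAPSHOT and STATE_DELTA events.

You can learn more about the AG-UI protocol and its architecture in the AG-UI docs.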
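To make the ordering of these events concrete, here is a minimal sketch that models one agent run as a list of plain dictionaries and checks that it is well formed. The event type names come from the list above; the payload field names (threadId, messageId, and so on) and the is_well_formed helper are illustrative assumptions, not the AG-UI library's own API.

```python
# Event type names come from the list above; payload fields are illustrative.
run_events = [
    {"type": "RUN_STARTED", "threadId": "t1", "runId": "r1"},
    {"type": "STATE_SNAPSHOT", "snapshot": {"tool_logs": []}},
    {"type": "TEXT_MESSAGE_START", "messageId": "m1", "role": "assistant"},
    {"type": "TEXT_MESSAGE_CONTENT", "messageId": "m1", "delta": "Hello"},
    {"type": "TEXT_MESSAGE_END", "messageId": "m1"},
    {"type": "RUN_FINISHED", "threadId": "t1", "runId": "r1"},
]


def is_well_formed(events):
    """Check that lifecycle events bracket the run and every text message is closed."""
    if not events or events[0]["type"] != "RUN_STARTED":
        return False
    if events[-1]["type"] != "RUN_FINISHED":
        return False
    open_messages = set()
    for event in events:
        if event["type"] == "TEXT_MESSAGE_START":
            open_messages.add(event["messageId"])
        elif event["type"] == "TEXT_MESSAGE_CONTENT":
            if event["messageId"] not in open_messages:
                return False
        elif event["type"] == "TEXT_MESSAGE_END":
            if event["messageId"] not in open_messages:
                return False
            open_messages.remove(event["messageId"])
    # A message that was started but never ended makes the run malformed
    return not open_messages
```

The same bracketing idea applies to tool call events, which open with TOOL_CALL_START and close with TOOL_CALL_END.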


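Now that we know what the AG-UI protocol is, let's see how to integrate it with the CrewAI agent framework.

Let's get started!

Prerequisites

To fully understand this tutorial, you need a basic understanding of React or Next.js.

We will also make use of the following:

  • Python - a popular programming language for building AI agents with frameworks such as CrewAI; make sure it is installed on your computer.

  • CrewAI - a lean, fast Python framework for building teams of AI agents that work together to tackle complex tasks.

  • OpenAI API key - an API key that lets you use GPT models for various tasks; for this tutorial, make sure you have access to the GPT-4 model.

  • CopilotKit - an open-source copilot framework for building custom AI chatbots, in-app AI agents, and text areas.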

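Integrating CrewAI agents with the AG-UI protocol

To get started, clone the Open AG UI Demo repository, which consists of a Python-based backend (agent) and a Next.js frontend (frontend).

Next, navigate to the backend directory: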

cd agent

Then install the dependencies using Poetry:

poetry install

After that, create a .env file containing your OpenAI API key:

OPENAI_API_KEY=<<your-OpenAI-key-here>>

Then run the agent using the following command:

poetry run python main.py

To test the AG-UI + CrewAI integration, run the following curl command on https://reqbin.com/curl:

curl -X POST "http://localhost:8000/crewai-agent" \
  -H "Content-Type: application/json" \
  -d '{
    "thread_id": "test_thread_123",
    "run_id": "test_run_456",
    "messages": [
      {
        "id": "msg_1",
        "role": "user",
        "content": "Analyze AAPL stock with a $10000 investment from 2023-01-01"
      }
    ],
    "tools": [],
    "context": [],
    "forwarded_props": {},
    "state": {}
  }'
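If you would rather test from Python than from curl, the sketch below builds the same request with the standard library and parses the Server-Sent Events lines that come back. The helper names build_run_request and parse_sse_lines are hypothetical, and the parser assumes each event arrives as a single `data: {...}` line.

```python
import json
import urllib.request


def build_run_request(url, prompt):
    """Build the same POST request as the curl command (hypothetical helper)."""
    payload = {
        "thread_id": "test_thread_123",
        "run_id": "test_run_456",
        "messages": [{"id": "msg_1", "role": "user", "content": prompt}],
        "tools": [],
        "context": [],
        "forwarded_props": {},
        "state": {},
    }
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def parse_sse_lines(lines):
    """Yield decoded JSON events from an iterable of SSE text lines."""
    for line in lines:
        line = line.strip()
        # Only "data:" lines carry events; blank lines and comments are skipped
        if line.startswith("data:"):
            yield json.loads(line[len("data:"):].strip())
```

To use it against a running agent, pass the request to urllib.request.urlopen and feed the decoded response lines into parse_sse_lines, printing each event's type as it arrives.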

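Now let's see how to integrate the AG-UI protocol with CrewAI agents.

Step 1: Create your CrewAI agent workflow

Before integrating the AG-UI protocol with a CrewAI agent, create your CrewAI agent workflow, as shown in the agent/stock_analysis.py file.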

# ===============================================================================
# MAIN FLOW CLASS
# ===============================================================================

class StockAnalysisFlow(Flow):
    """
    Main workflow class that orchestrates the stock analysis process.
    This flow consists of multiple stages:
    1. start() - Initialize the system prompt with portfolio data
    2. chat() - Parse user input and extract investment parameters
    3. simulation() - Gather historical stock data
    4. allocation() - Calculate portfolio allocation and performance
    5. insights() - Generate bull/bear insights about the investments
    6. end() - Return final state
    """

    @start()
    def start(self):
        """
        Step 1: Initialize the workflow
        - Replace placeholder in system prompt with actual portfolio data
        - This sets up the AI assistant with context about the current portfolio
        """
        # Inject current portfolio data into the system prompt template
        self.state['state']["messages"][0].content = system_prompt.replace(
            "{PORTFOLIO_DATA_PLACEHOLDER}", json.dumps(self.state["investment_portfolio"])
        )
        return self.state

    # ...

    @listen(or_("chat", "insights"))
    def end(self):
        """
        Step 6: Final step - return the complete state
        - This method is called from either 'chat' (if no investment data) 
          or 'insights' (after successful analysis)
        - Returns the final state with all analysis results
        """
        return self.state
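The start() step above boils down to a string substitution: the portfolio is serialized to JSON and spliced into the system prompt. Here is a minimal sketch of that idea in isolation. The template text and the render_system_prompt helper are hypothetical; the real prompt lives in the demo repository.

```python
import json

# Hypothetical prompt template; the real one lives in the demo repository.
SYSTEM_PROMPT_TEMPLATE = (
    "You are a stock portfolio assistant. "
    "Current portfolio: {PORTFOLIO_DATA_PLACEHOLDER}"
)


def render_system_prompt(template, portfolio):
    """Replace the placeholder with the portfolio serialized as JSON."""
    return template.replace("{PORTFOLIO_DATA_PLACEHOLDER}", json.dumps(portfolio))


# Illustrative portfolio data, not taken from the tutorial
portfolio = [{"ticker": "AAPL", "amount": 10000}]
prompt = render_system_prompt(SYSTEM_PROMPT_TEMPLATE, portfolio)
```

Using str.replace rather than str.format avoids having to escape any other curly braces that the prompt might contain.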

Step 2: Create an endpoint with FastAPI

Once you have defined your CrewAI agent workflow, create a FastAPI endpoint and import the workflow, as shown in the agent/main.py file.

# ===============================================================================
# IMPORTS AND SETUP
# ===============================================================================

# FastAPI framework for building the web API
from fastapi import FastAPI
from fastapi.responses import StreamingResponse  # For streaming real-time responses to the client

# Standard Python libraries
import uuid  # For generating unique identifiers
from typing import Any  # Type hints for flexible data types
import os  # For environment variables
import uvicorn  # ASGI server for running FastAPI
import asyncio  # For asynchronous programming

# AG UI core components for agent communication and event handling
from ag_ui.core import (
    RunAgentInput,           # Input data structure for agent runs
    StateSnapshotEvent,      # Event for sending state snapshots
    EventType,               # Enumeration of event types
    RunStartedEvent,         # Event emitted when the agent run starts
    RunFinishedEvent,        # Event emitted when the agent run completes
    TextMessageStartEvent,   # Event for starting text message streaming
    TextMessageEndEvent,     # Event for ending text message streaming
    TextMessageContentEvent, # Event for streaming text message content
    ToolCallStartEvent,      # Event for starting tool call execution
    ToolCallEndEvent,        # Event for ending tool call execution
    ToolCallArgsEvent,       # Event for streaming tool call arguments
    StateDeltaEvent          # Event for updating specific parts of state
)
from ag_ui.encoder import EventEncoder  # Encoder for converting events to streamable format

# Import our custom stock analysis workflow
from stock_analysis import StockAnalysisFlow

# CopilotKit state management
from copilotkit import CopilotKitState

# ===============================================================================
# APPLICATION SETUP
# ===============================================================================

# Initialize FastAPI application instance
app = FastAPI()

# ===============================================================================
# MAIN API ENDPOINT
# ===============================================================================

@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
    """
    Main API endpoint for processing stock analysis requests.

    This endpoint:
    1. Receives user input and current state from the frontend
    2. Streams real-time events back to the client during processing
    3. Runs the StockAnalysisFlow workflow asynchronously
    4. Returns results via Server-Sent Events (SSE) streaming

    Args:
        input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs

    Returns:
        StreamingResponse: Real-time stream of events during agent execution
    """

    # ...

# ===============================================================================
# SERVER STARTUP AND CONFIGURATION
# ===============================================================================

def main():
    """
    Main function to start the uvicorn server.

    This function:
    - Reads the port from environment variables (defaults to 8000)
    - Configures uvicorn server settings
    - Starts the server with hot reload enabled for development
    """
    # Get port from environment variable or use default
    port = int(os.getenv("PORT", "8000"))

    # Start uvicorn server with configuration
    uvicorn.run(
        "main:app",           # Module and app instance
        host="0.0.0.0",      # Listen on all network interfaces
        port=port,           # Port number
        reload=True,         # Enable hot reload for development
    )

if __name__ == "__main__":
    """
    Entry point when script is run directly.
    Starts the FastAPI server using uvicorn.
    """
    main()

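Step 3: Define an event generator

After creating the FastAPI endpoint, define an event generator that produces the stream of AG-UI protocol events, initializes the event encoder, and returns a streaming response to the client or frontend, as shown in the agent/main.py file.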

@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
    """
    Main API endpoint for processing stock analysis requests.

    This endpoint:
    1. Receives user input and current state from the frontend
    2. Streams real-time events back to the client during processing
    3. Runs the StockAnalysisFlow workflow asynchronously
    4. Returns results via Server-Sent Events (SSE) streaming

    Args:
        input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs

    Returns:
        StreamingResponse: Real-time stream of events during agent execution
    """
    try:

        async def event_generator():
            """
            An asynchronous generator that streams events to the client in real-time.

            This function orchestrates the entire stock analysis workflow:
            1. Sets up event streaming infrastructure
            2. Emits initial state and run started events
            3. Launches the StockAnalysisFlow workflow
            4. Streams progress events as they occur
            5. Handles final results (tool calls or text messages)
            6. Emits run completion events

            Yields:
                Encoded events for Server-Sent Events (SSE) streaming
            """
            # Step 1: Initialize event streaming infrastructure
            encoder = EventEncoder()  # Converts events to SSE format
            event_queue = asyncio.Queue()  # Queue for events from the workflow

            def emit_event(event):
                """Callback function for the workflow to emit events"""
                event_queue.put_nowait(event)

            # Generate a unique identifier for text messages
            message_id = str(uuid.uuid4())

            # ...

    except Exception as e:
        # Step 10: Handle any unexpected errors during processing
        print(e)  # Log error for debugging

    # Step 11: Return streaming response to client
    return StreamingResponse(event_generator(), media_type="text/event-stream")
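The two building blocks of the generator, the queue callback and the SSE encoding, can be tried out without FastAPI or the AG-UI package. In this sketch, encode_sse is a stand-in that assumes the encoder writes each event as a `data:` line holding JSON followed by a blank line; the real EventEncoder may differ in detail.

```python
import asyncio
import json


def encode_sse(event):
    """Serialize an event dict as one Server-Sent Events frame (assumed format)."""
    return f"data: {json.dumps(event)}\n\n"


async def collect_frames():
    """Emit two events through a queue callback and encode them as SSE frames."""
    event_queue = asyncio.Queue()

    def emit_event(event):
        # put_nowait never blocks on an unbounded queue, so synchronous
        # workflow code can call this callback directly
        event_queue.put_nowait(event)

    emit_event({"type": "RUN_STARTED"})
    emit_event({"type": "RUN_FINISHED"})

    frames = []
    while not event_queue.empty():
        frames.append(encode_sse(await event_queue.get()))
    return frames


frames = asyncio.run(collect_frames())
```

The queue decouples the workflow, which only knows about emit_event, from the HTTP response, which only knows how to drain the queue and encode what it finds.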

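Step 4: Configure AG-UI protocol lifecycle events

After defining the event generator, define the AG-UI protocol lifecycle events, which mark the start and end of an AG-UI + CrewAI agent workflow run, as shown in the agent/main.py file.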

@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
    """
    Main API endpoint for processing stock analysis requests.

    This endpoint:
    1. Receives user input and current state from the frontend
    2. Streams real-time events back to the client during processing
    3. Runs the StockAnalysisFlow workflow asynchronously
    4. Returns results via Server-Sent Events (SSE) streaming

    Args:
        input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs

    Returns:
        StreamingResponse: Real-time stream of events during agent execution
    """
    try:

        async def event_generator():
            # ...

            # Step 2: Emit run started event to notify client that processing has begun
            yield encoder.encode(
                RunStartedEvent(
                    type=EventType.RUN_STARTED,
                    thread_id=input_data.thread_id,  # Conversation thread identifier
                    run_id=input_data.run_id,        # Unique run identifier
                )
            )

            # ...

            # Step 9: Emit run finished event to signal completion
            yield encoder.encode(
                RunFinishedEvent(
                    type=EventType.RUN_FINISHED,
                    thread_id=input_data.thread_id,  # Same thread ID from start
                    run_id=input_data.run_id,        # Same run ID from start
                )
            )

    except Exception as e:
        # Step 10: Handle any unexpected errors during processing
        print(e)  # Log error for debugging

    # Step 11: Return streaming response to client
    return StreamingResponse(event_generator(), media_type="text/event-stream")
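One thing to keep in mind is that an exception raised inside the generator happens while the response is already streaming, so the outer try/except around its definition will not see it. A possible way to guarantee that the client always receives a terminal event is to wrap the event source, as sketched below with plain dictionaries. The with_lifecycle helper is hypothetical, and the RUN_ERROR event type, while part of the AG-UI event set, is not used in this tutorial's code.

```python
import asyncio


async def with_lifecycle(thread_id, run_id, body):
    """Wrap an async event source with RUN_STARTED and a terminal event."""
    yield {"type": "RUN_STARTED", "threadId": thread_id, "runId": run_id}
    try:
        async for event in body():
            yield event
    except Exception as exc:
        # Report the failure to the client instead of ending the stream silently
        yield {"type": "RUN_ERROR", "message": str(exc)}
        return
    yield {"type": "RUN_FINISHED", "threadId": thread_id, "runId": run_id}


async def failing_body():
    """Illustrative event source that fails partway through."""
    yield {"type": "STATE_SNAPSHOT", "snapshot": {}}
    raise ValueError("price data unavailable")


async def collect(gen):
    return [event async for event in gen]


events = asyncio.run(collect(with_lifecycle("t1", "r1", failing_body)))
```

With this shape, a run ends with either RUN_FINISHED or RUN_ERROR, never with neither.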

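Step 5: Configure AG-UI protocol state management events

After defining the lifecycle events, integrate the AG-UI protocol state management events into your CrewAI agent workflow, as shown in the stock analysis workflow in the agent/stock_analysis.py file.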

# AG UI types for message handling and state management
from ag_ui.core.types import AssistantMessage, ToolMessage
from ag_ui.core.events import StateDeltaEvent, EventType

# ===============================================================================
# MAIN FLOW CLASS
# ===============================================================================

class StockAnalysisFlow(Flow):
    """
    Main workflow class that orchestrates the stock analysis process.
    This flow consists of multiple stages:
    1. start() - Initialize the system prompt with portfolio data
    2. chat() - Parse user input and extract investment parameters
    3. simulation() - Gather historical stock data
    4. allocation() - Calculate portfolio allocation and performance
    5. insights() - Generate bull/bear insights about the investments
    6. end() - Return final state
    """

    # ...

    @listen("start")
    async def chat(self):
        """
        Step 2: Parse user input and extract investment parameters
        - Create a tool log entry to show progress to the user
        - Use OpenAI to analyze the user's message and extract structured data
        - Return the next step based on whether structured data was extracted
        """
        try:
            # ...

            # Step 2.2: Emit state change event to update UI with new tool log
            self.state.get("emit_event")(
                StateDeltaEvent(
                    type=EventType.STATE_DELTA,
                    delta=[
                        {
                            "op": "add",
                            "path": "/tool_logs/-",
                            "value": {
                                "message": "Analyzing user query",
                                "status": "processing",
                                "id": tool_log_id,
                            },
                        }
                    ],
                )
            )
            await asyncio.sleep(0)  # Allow other tasks to run

            # ...

            # Step 2.4: Update tool log status to completed
            index = len(self.state['state']["tool_logs"]) - 1
            self.state.get("emit_event")(
                StateDeltaEvent(
                    type=EventType.STATE_DELTA,
                    delta=[
                        {
                            "op": "replace",
                            "path": f"/tool_logs/{index}/status",
                            "value": "completed",
                        }
                    ],
                )
            )
            await asyncio.sleep(0)

            # ...

        except Exception as e:
            # Step 2.6: Handle any errors during processing
            print(e)
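            # `response` may be unbound if the error occurred before the model call,
            # so generate a fresh id instead of reading response.id
            a_message = AssistantMessage(id=str(uuid.uuid4()), content="", role="assistant")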
            self.state['state']["messages"].append(a_message)
            return "end"
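The delta payloads above use JSON Patch style operations: "add" with the path /tool_logs/- appends to a list, and "replace" with /tool_logs/{index}/status overwrites one field. To show what a receiver does with them, here is a minimal sketch of a patch applier. The apply_delta helper is hypothetical and only supports the two operations used in this tutorial; it also ignores JSON Pointer escape sequences.

```python
import copy


def apply_delta(state, delta):
    """Apply a small subset of JSON Patch (add/replace) and return a new state."""
    new_state = copy.deepcopy(state)
    for op in delta:
        if op["op"] not in ("add", "replace"):
            raise ValueError(f"unsupported operation: {op['op']}")
        keys = op["path"].lstrip("/").split("/")
        target = new_state
        # Walk down to the container that holds the final key
        for key in keys[:-1]:
            target = target[int(key)] if isinstance(target, list) else target[key]
        last = keys[-1]
        if isinstance(target, list):
            if op["op"] == "add" and last == "-":
                target.append(op["value"])  # "-" means "end of the list"
            elif op["op"] == "add":
                target.insert(int(last), op["value"])
            else:
                target[int(last)] = op["value"]
        else:
            target[last] = op["value"]
    return new_state


state = {"tool_logs": []}
state = apply_delta(state, [{
    "op": "add",
    "path": "/tool_logs/-",
    "value": {"message": "Analyzing user query", "status": "processing", "id": "log-1"},
}])
state = apply_delta(state, [{
    "op": "replace",
    "path": "/tool_logs/0/status",
    "value": "completed",
}])
```

Sending small deltas like these keeps the stream light compared with resending the full state after every step.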

Then, in the FastAPI endpoint, initialize the CrewAI agent workflow state using the STATE_SNAPSHOT state management event, as shown below.

@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
    """
    Main API endpoint for processing stock analysis requests.

    This endpoint:
    1. Receives user input and current state from the frontend
    2. Streams real-time events back to the client during processing
    3. Runs the StockAnalysisFlow workflow asynchronously
    4. Returns results via Server-Sent Events (SSE) streaming

    Args:
        input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs

    Returns:
        StreamingResponse: Real-time stream of events during agent execution
    """
    try:

        async def event_generator():
            # Step 1: Initialize event streaming infrastructure
            encoder = EventEncoder()  # Converts events to SSE format

            # ...

            # Step 3: Send initial state snapshot to client
            # This provides the client with the current financial state before analysis
            yield encoder.encode(
                StateSnapshotEvent(
                    type=EventType.STATE_SNAPSHOT, 
                    snapshot={
                        "available_cash": input_data.state["available_cash"],        # Current cash balance
                        "investment_summary": input_data.state["investment_summary"], # Previous analysis results
                        "investment_portfolio": input_data.state["investment_portfolio"], # Current holdings
                        "tool_logs": []  # Reset tool logs for new analysis
                    }
                )
            )

            # ...
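Note that the snapshot reads its keys with square-bracket lookups, so a request whose state is empty (such as the curl test earlier, which sends "state": {}) would raise a KeyError at this point. A more forgiving variant could fall back to defaults, as in this sketch. The build_snapshot helper and the default values are assumptions, not part of the demo code.

```python
def build_snapshot(client_state):
    """Build the initial snapshot, tolerating keys missing from the client state."""
    client_state = client_state or {}
    return {
        # Default values below are assumptions chosen for illustration
        "available_cash": client_state.get("available_cash", 0),
        "investment_summary": client_state.get("investment_summary", {}),
        "investment_portfolio": client_state.get("investment_portfolio", []),
        "tool_logs": [],  # always reset for a new run
    }
```

The result can be passed as the snapshot argument of StateSnapshotEvent in place of the inline dictionary.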

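Step 6: Configure your CrewAI agent workflow with the AG-UI protocol

After initializing the workflow state, integrate your CrewAI agent workflow with the AG-UI protocol, as shown in the agent/main.py file.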

@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
    """
    Main API endpoint for processing stock analysis requests.

    This endpoint:
    1. Receives user input and current state from the frontend
    2. Streams real-time events back to the client during processing
    3. Runs the StockAnalysisFlow workflow asynchronously
    4. Returns results via Server-Sent Events (SSE) streaming

    Args:
        input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs

    Returns:
        StreamingResponse: Real-time stream of events during agent execution
    """
    try:

        async def event_generator():
            # Step 1: Initialize event streaming infrastructure
            encoder = EventEncoder()  # Converts events to SSE format

            # ...

            # Step 4: Initialize agent state with input data
            state = AgentState(
                tools=input_data.tools,                                      # Available tools
                messages=input_data.messages,                                # Conversation history
                be_stock_data=None,                                         # Will be populated during analysis
                be_arguments=None,                                          # Will be populated during analysis
                available_cash=input_data.state["available_cash"],          # Current cash
                investment_portfolio=input_data.state["investment_portfolio"], # Current portfolio
                tool_logs=[]                                                # Progress tracking
            )

            # Step 5: Launch the stock analysis workflow asynchronously
            # This creates a task that runs the StockAnalysisFlow in the background
            agent_task = asyncio.create_task(
                StockAnalysisFlow().kickoff_async(inputs={
                    "state": state,                                          # Agent state
                    "emit_event": emit_event,                               # Event emission callback
                    "investment_portfolio": input_data.state["investment_portfolio"]  # Portfolio data
                })
            )

            # Step 6: Event streaming loop - relay events from workflow to client
            while True:
                try:
                    # Try to get an event from the queue with a short timeout
                    event = await asyncio.wait_for(event_queue.get(), timeout=0.1)
                    yield encoder.encode(event)  # Stream the event to the client
                except asyncio.TimeoutError:
                    # No events in queue - check if workflow is complete
                    if agent_task.done():
                        break  # Exit loop when workflow finishes

            # Step 7: Clear tool logs after workflow completion
            # This prevents old progress logs from cluttering the UI
            yield encoder.encode(
                StateDeltaEvent(
                    type=EventType.STATE_DELTA,
                    delta=[
                        {
                            "op": "replace",
                            "path": "/tool_logs",
                            "value": []
                        }
                    ]
                )
            )

            # ...

    except Exception as e:
        # Step 10: Handle any unexpected errors during processing
        print(e)  # Log error for debugging

    # Step 11: Return streaming response to client
    return StreamingResponse(event_generator(), media_type="text/event-stream")
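The relay loop in Step 6 can be exercised on its own with a stand-in workflow. The sketch below keeps the same structure (background task, queue, short timeout, exit once the task is done) and adds one assumption of mine: calling task.result() at the end so that an exception raised inside the workflow is not silently dropped. The relay and demo_workflow names are hypothetical.

```python
import asyncio


async def relay(workflow):
    """Run workflow(emit) as a background task and yield its events until it finishes."""
    queue = asyncio.Queue()
    task = asyncio.create_task(workflow(queue.put_nowait))
    while True:
        try:
            event = await asyncio.wait_for(queue.get(), timeout=0.1)
        except asyncio.TimeoutError:
            # Queue is empty: stop only when the workflow has finished
            if task.done():
                break
            continue
        yield event
    task.result()  # re-raise any exception raised inside the workflow


async def demo_workflow(emit):
    """Stand-in for StockAnalysisFlow: emits one event per stage."""
    for step in ("chat", "simulation", "allocation"):
        emit({"type": "STATE_DELTA", "step": step})
        await asyncio.sleep(0)  # hand control back to the event loop


async def collect():
    return [event async for event in relay(demo_workflow)]


events = asyncio.run(collect())
```

Because the loop only exits on a timeout, any events still in the queue when the task finishes are drained before the generator stops.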

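Step 7: Configure AG-UI protocol tool events to handle human-in-the-loop breakpoints

After integrating the workflow with the AG-UI protocol, append a tool call message with a tool call name to the state, as shown in the cash allocation step in the agent/stock_analysis.py file.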

# ===============================================================================
# MAIN FLOW CLASS
# ===============================================================================

class StockAnalysisFlow(Flow):
    """
    Main workflow class that orchestrates the stock analysis process.
    This flow consists of multiple stages:
    1. start() - Initialize the system prompt with portfolio data
    2. chat() - Parse user input and extract investment parameters
    3. simulation() - Gather historical stock data
    4. allocation() - Calculate portfolio allocation and performance
    5. insights() - Generate bull/bear insights about the investments
    6. end() - Return final state
    """

    # ...

    @listen("simulation")
    async def allocation(self):
        """
        Step 4: Calculate portfolio allocation and performance simulation
        - Simulate buying stocks based on investment strategy (single-shot vs DCA)
        - Calculate returns, allocation percentages, and performance metrics
        - Compare portfolio performance against SPY (S&P 500) benchmark
        - Generate performance data for charting
        """
        # Step 4.1: Ensure we have tool calls with investment data
        if self.state['state']['messages'][-1].tool_calls is None:
            return "end"

        # ...

        # Step 4.18: Add a tool message indicating data extraction is complete
        self.state['state']["messages"].append(
            ToolMessage(
                role="tool",
                id=str(uuid.uuid4()),
                content="The relevant details had been extracted",
                tool_call_id=self.state['state']["messages"][-1].tool_calls[0].id,
            )
        )

        # Step 4.19: Add assistant message with chart rendering tool call
        self.state['state']["messages"].append(
            AssistantMessage(
                role="assistant",
                tool_calls=[
                    {
                        "id": str(uuid.uuid4()),
                        "type": "function",
                        "function": {
                            "name": "render_standard_charts_and_table",
                            "arguments": json.dumps(
                                {"investment_summary": self.state['state']["investment_summary"]}
                            ),
                        },
                    }
                ],
                id=str(uuid.uuid4()),
            )
        )

        # ...

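Then define the AG-UI protocol tool call events that the agent uses to trigger a frontend action: it calls the action by its tool name to request user feedback, as shown in the agent/main.py file.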

@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
    """
    Main API endpoint for processing stock analysis requests.

    This endpoint:
    1. Receives user input and current state from the frontend
    2. Streams real-time events back to the client during processing
    3. Runs the StockAnalysisFlow workflow asynchronously
    4. Returns results via Server-Sent Events (SSE) streaming

    Args:
        input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs

    Returns:
        StreamingResponse: Real-time stream of events during agent execution
    """
    try:

        async def event_generator():
            # Step 1: Initialize event streaming infrastructure
            encoder = EventEncoder()  # Converts events to SSE format

            # ...

            # Step 8: Handle workflow results based on the final message type
            # Check if the last message is from the assistant (AI agent)
            if state["messages"][-1].role == "assistant":

                # Step 8.1: Handle tool call results (charts, data visualizations)
                if state["messages"][-1].tool_calls:
                    # The workflow generated a tool call (e.g., render charts)
                    # Stream tool call events to trigger UI rendering

                    yield encoder.encode(
                        ToolCallStartEvent(
                            type=EventType.TOOL_CALL_START,
                            tool_call_id=state["messages"][-1].tool_calls[0].id,
                            toolCallName=state["messages"][-1]
                            .tool_calls[0]
                            .function.name,
                        )
                    )

                    # Stream the tool call arguments (contains analysis results)
                    yield encoder.encode(
                        ToolCallArgsEvent(
                            type=EventType.TOOL_CALL_ARGS,
                            tool_call_id=state["messages"][-1].tool_calls[0].id,
                            delta=state["messages"][-1]
                            .tool_calls[0]
                            .function.arguments,  # Contains investment summary and insights
                        )
                    )

                    # Signal end of tool call
                    yield encoder.encode(
                        ToolCallEndEvent(
                            type=EventType.TOOL_CALL_END,
                            tool_call_id=state["messages"][-1].tool_calls[0].id,
                        )
                    )
                else:
                    # Step 8.2: Handle text message results (when no analysis was performed)
                    yield encoder.encode(
                        TextMessageStartEvent(
                            type=EventType.TEXT_MESSAGE_START,
                            message_id=message_id,
                            role="assistant",
                        )
                    )

                    # Step 8.2.1: Stream text content if available
                    if state["messages"][-1].content:
                        content = state["messages"][-1].content

                        # Split content into chunks for smooth streaming effect
                        n_parts = 100  # Number of chunks for streaming
                        part_length = max(1, len(content) // n_parts)
                        parts = [content[i:i+part_length] for i in range(0, len(content), part_length)]

                        # Ensure we don't exceed the target number of parts
                        if len(parts) > n_parts:
                            parts = parts[:n_parts-1] + [''.join(parts[n_parts-1:])]

                        # Stream each part with a small delay for smooth typing effect
                        for part in parts:
                            yield encoder.encode(
                                TextMessageContentEvent(
                                    type=EventType.TEXT_MESSAGE_CONTENT,
                                    message_id=message_id,
                                    delta=part,  # Chunk of text content
                                )
                            )
                            await asyncio.sleep(0.05)  # Small delay for typing effect
                    else:
                        # Step 8.2.2: Handle case where no content is available (error scenario)
                        yield encoder.encode(
                            TextMessageContentEvent(
                                type=EventType.TEXT_MESSAGE_CONTENT,
                                message_id=message_id,
                                delta="Something went wrong! Please try again.",
                            )
                        )

                    # Step 8.2.3: Signal end of text message
                    yield encoder.encode(
                        TextMessageEndEvent(
                            type=EventType.TEXT_MESSAGE_END,
                            message_id=message_id,
                        )
                    )

    except Exception as e:
        # Step 10: Handle any unexpected errors during processing
        print(e)  # Log error for debugging

    # Step 11: Return streaming response to client
    return StreamingResponse(event_generator(), media_type="text/event-stream")
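On the receiving side, a tool call is reconstructed from its three events: the start event names the tool, the args events carry the JSON arguments as text, and the end event signals that the arguments are complete. The sketch below models that round trip with plain dictionaries. Both helpers are hypothetical, and splitting the arguments into several chunks is my assumption for illustration; the tutorial code sends them in a single delta.

```python
import json


def tool_call_events(tool_call_id, name, arguments, chunk_size=16):
    """Return START, ARGS (chunked), and END events for one tool call."""
    raw = json.dumps(arguments)
    events = [
        {"type": "TOOL_CALL_START", "toolCallId": tool_call_id, "toolCallName": name}
    ]
    for i in range(0, len(raw), chunk_size):
        events.append(
            {"type": "TOOL_CALL_ARGS", "toolCallId": tool_call_id, "delta": raw[i:i + chunk_size]}
        )
    events.append({"type": "TOOL_CALL_END", "toolCallId": tool_call_id})
    return events


def reassemble_arguments(events):
    """Concatenate ARGS deltas and parse them once the call has ended."""
    buffer = ""
    for event in events:
        if event["type"] == "TOOL_CALL_ARGS":
            buffer += event["delta"]
        elif event["type"] == "TOOL_CALL_END":
            return json.loads(buffer)
    return None  # the call never ended, so the arguments are incomplete


# Illustrative arguments, not real analysis output
args = {"investment_summary": {"AAPL": 10000}}
events = tool_call_events("call-1", "render_standard_charts_and_table", args)
```

Parsing only after TOOL_CALL_END matters because a partial delta is usually not valid JSON on its own.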

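Step 8: Configure AG-UI protocol text message events

Once the tool events are configured, define the AG-UI protocol text message events that stream the agent's response to the frontend, as shown in the agent/main.py file.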

@app.post("/crewai-agent")
async def crewai_agent(input_data: RunAgentInput):
    """
    Main API endpoint for processing stock analysis requests.

    This endpoint:
    1. Receives user input and current state from the frontend
    2. Streams real-time events back to the client during processing
    3. Runs the StockAnalysisFlow workflow asynchronously
    4. Returns results via Server-Sent Events (SSE) streaming

    Args:
        input_data (RunAgentInput): Contains user messages, tools, state, thread/run IDs

    Returns:
        StreamingResponse: Real-time stream of events during agent execution
    """
    try:

        async def event_generator():
            # Step 1: Initialize event streaming infrastructure
            encoder = EventEncoder()  # Converts events to SSE format

            # ...

            # Step 8: Handle workflow results based on the final message type
            # Check if the last message is from the assistant (AI agent)
            if state["messages"][-1].role == "assistant":

                # ...

                else:
                    # Step 8.2: Handle text message results (when no analysis was performed)
                    yield encoder.encode(
                        TextMessageStartEvent(
                            type=EventType.TEXT_MESSAGE_START,
                            message_id=message_id,
                            role="assistant",
                        )
                    )

                    # Step 8.2.1: Stream text content if available
                    if state["messages"][-1].content:
                        content = state["messages"][-1].content

                        # Split content into chunks for smooth streaming effect
                        n_parts = 100  # Number of chunks for streaming
                        part_length = max(1, len(content) // n_parts)
                        parts = [content[i:i+part_length] for i in range(0, len(content), part_length)]

                        # Ensure we don't exceed the target number of parts
                        if len(parts) > n_parts:
                            parts = parts[:n_parts-1] + [''.join(parts[n_parts-1:])]

                        # Stream each part with a small delay for smooth typing effect
                        for part in parts:
                            yield encoder.encode(
                                TextMessageContentEvent(
                                    type=EventType.TEXT_MESSAGE_CONTENT,
                                    message_id=message_id,
                                    delta=part,  # Chunk of text content
                                )
                            )
                            await asyncio.sleep(0.05)  # Small delay for typing effect
                    else:
                        # Step 8.2.2: Handle case where no content is available (error scenario)
                        yield encoder.encode(
                            TextMessageContentEvent(
                                type=EventType.TEXT_MESSAGE_CONTENT,
                                message_id=message_id,
                                delta="Something went wrong! Please try again.",
                            )
                        )

                    # Step 8.2.3: Signal end of text message
                    yield encoder.encode(
                        TextMessageEndEvent(
                            type=EventType.TEXT_MESSAGE_END,
                            message_id=message_id,
                        )
                    )

    except Exception as e:
        # Step 10: Handle any unexpected errors during processing
        print(e)  # Log error for debugging

    # Step 11: Return streaming response to client
    return StreamingResponse(event_generator(), media_type="text/event-stream")

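Congratulations! You have integrated a CrewAI agent workflow with the AG-UI protocol. Now let's see how to add a frontend to the AG-UI + CrewAI agent workflow.

Integrating a frontend with the AG-UI + CrewAI agent workflow using CopilotKit

In this section, you will learn how to use CopilotKit to connect the AG-UI + CrewAI agent workflow to a frontend.

Let's get started.

First, navigate to the frontend directory: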

cd frontend

Next, create a .env file containing your OpenAI API key:

OPENAI_API_KEY=<<your-OpenAI-key-here>>

Then install the dependencies:

pnpm install

After that, start the development server:

pnpm run dev

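Visit http://localhost:3000, and you should see the AG-UI + CrewAI agent frontend up and running.

Image from Notion

Now let's see how to build the frontend UI for the AG-UI + CrewAI agent using CopilotKit.

Step 1: Create an HttpAgent instance

Before creating an HttpAgent instance, let's first understand what HttpAgent is.

HttpAgent is a client from the AG-UI library that connects your frontend application to any AG-UI-compatible AI agent server.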
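
To make the role of such a client more concrete, here is a minimal sketch of the kind of work it has to do: split a server-sent events (SSE) stream into frames and fold the TEXT_MESSAGE_CONTENT deltas back into one message. This is not the actual @ag-ui/client implementation. The `data: <json>` frame layout, the camelCase field names (`messageId`, `delta`), and the helper names `parseSseFrames` and `collectText` are assumptions made for illustration.

```typescript
// Hypothetical, trimmed-down event shape; real AG-UI events carry more fields.
interface AgUiEvent {
  type: string;
  messageId?: string;
  delta?: string;
}

// Split a raw SSE payload into JSON events.
// Assumes every frame is a single "data: <json>" line followed by a blank line.
function parseSseFrames(raw: string): AgUiEvent[] {
  const events: AgUiEvent[] = [];
  for (const frame of raw.split("\n\n")) {
    const line = frame.trim();
    if (!line.startsWith("data:")) continue; // skip empty or non-data frames
    events.push(JSON.parse(line.slice("data:".length).trim()) as AgUiEvent);
  }
  return events;
}

// Concatenate the streamed chunks of one message in arrival order.
function collectText(events: AgUiEvent[], messageId: string): string {
  let text = "";
  for (const event of events) {
    if (event.type === "TEXT_MESSAGE_CONTENT" && event.messageId === messageId) {
      text += event.delta ?? "";
    }
  }
  return text;
}

// Sample payload (made up for this sketch) mirroring the start/content/end sequence.
const sample =
  'data: {"type":"TEXT_MESSAGE_START","messageId":"m1","role":"assistant"}\n\n' +
  'data: {"type":"TEXT_MESSAGE_CONTENT","messageId":"m1","delta":"Hello, "}\n\n' +
  'data: {"type":"TEXT_MESSAGE_CONTENT","messageId":"m1","delta":"world"}\n\n' +
  'data: {"type":"TEXT_MESSAGE_END","messageId":"m1"}\n\n';

const streamedText = collectText(parseSseFrames(sample), "m1");
console.log(streamedText);
```

In the real application you would not write this yourself; HttpAgent and the CopilotKit runtime handle the transport, and the sketch only illustrates why the backend emits start, content, and end events separately.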

To create an HttpAgent instance, define it in an API route, as shown in the src/app/api/copilotkit/route.ts file.

// ===============================================================================
// IMPORTS AND DEPENDENCIES
// ===============================================================================

// CopilotKit framework imports for AI assistant integration
import {
  CopilotRuntime, // Core runtime for managing AI agents
  copilotRuntimeNextJSAppRouterEndpoint, // Next.js App Router integration
  OpenAIAdapter, // Adapter for OpenAI API compatibility
} from "@copilotkit/runtime";

// Next.js framework imports
import { NextRequest } from "next/server"; // Type for Next.js request objects

// AG UI client for communicating with external agents
import { HttpAgent } from "@ag-ui/client"; // HTTP client for agent communication

// ===============================================================================
// AGENT CONFIGURATION
// ===============================================================================

/**
 * Step 1: Configure the CrewAI stock analysis agent
 *
 * This HttpAgent connects to our FastAPI backend running the stock analysis workflow.
 * It acts as a bridge between the CopilotKit frontend and the Python-based agent.
 */
const crewaiAgent = new HttpAgent({
  // Development URL (commented out for reference)
  // url: "http://0.0.0.0:8000/crewai-agent",

  // Production-ready URL configuration with environment variable fallback
  // Uses NEXT_PUBLIC_CREWAI_URL if set, otherwise defaults to local development URL
  url: process.env.NEXT_PUBLIC_CREWAI_URL || "http://0.0.0.0:8000/crewai-agent",
});

// ===============================================================================
// COPILOT RUNTIME SETUP
// ===============================================================================

/**
 * Step 2: Configure the OpenAI service adapter
 *
 * This adapter provides OpenAI API compatibility for the CopilotKit runtime.
 * It handles communication with OpenAI's language models for natural language processing.
 */
const serviceAdapter = new OpenAIAdapter();

/**
 * Step 3: Initialize the CopilotKit runtime
 *
 * The runtime orchestrates communication between:
 * - Frontend UI components
 * - Language models (via OpenAI adapter)
 * - External agents (our stock analysis agent)
 */
const runtime = new CopilotRuntime({
  agents: {
    // Register our stock analysis agent with the runtime
    // @ts-ignore - Suppressing TypeScript warning for agent type compatibility
    crewaiAgent: crewaiAgent,
  },
});

// Alternative simple runtime configuration (commented out)
// const runtime = new CopilotRuntime()

// ===============================================================================
// API ROUTE HANDLER
// ===============================================================================

/**
 * Step 4: Define the POST endpoint handler
 *
 * This Next.js API route handles incoming requests from the CopilotKit frontend.
 * It processes user interactions, manages agent communication, and streams responses.
 *
 * @param req - Next.js request object containing user input and session data
 * @returns Promise<Response> - Streaming response with agent results
 */
export const POST = async (req: NextRequest) => {
  /**
   * Step 4.1: Create the request handler using CopilotKit's Next.js integration
   *
   * This function:
   * - Extracts user messages and state from the request
   * - Routes requests to appropriate agents
   * - Manages streaming responses back to the frontend
   * - Handles error scenarios gracefully
   */
  const { handleRequest } = copilotRuntimeNextJSAppRouterEndpoint({
    runtime, // The configured CopilotKit runtime with our agents
    serviceAdapter, // OpenAI adapter for language model integration
    endpoint: "/api/copilotkit", // The API endpoint path for this handler
  });

  /**
   * Step 4.2: Process the incoming request
   *
   * The handleRequest function:
   * - Parses the request body for user input
   * - Determines which agent should handle the request
   * - Forwards the request to the appropriate agent (crewaiAgent)
   * - Streams the agent's response back to the client
   */
  return handleRequest(req);
};

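Step 2: Set up the CopilotKit provider

To set up the CopilotKit provider, the [<CopilotKit>](https://docs.copilotkit.ai/reference/components/CopilotKit) component must wrap the Copilot-aware parts of your application.

For most use cases, it is best to wrap the CopilotKit provider around the entire app, for example in your layout.tsx, as shown in the src/app/layout.tsx file below.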

// Next.js imports for metadata and font handling
import type { Metadata } from "next";
import { Geist, Geist_Mono } from "next/font/google";
// Global styles for the application
import "./globals.css";
// CopilotKit UI styles for AI components
import "@copilotkit/react-ui/styles.css";
// CopilotKit core component for AI functionality
import { CopilotKit } from "@copilotkit/react-core";

// Configure Geist Sans font with CSS variables for consistent typography
const geistSans = Geist({
  variable: "--font-geist-sans",
  subsets: ["latin"],
});

// Configure Geist Mono font for code and monospace text
const geistMono = Geist_Mono({
  variable: "--font-geist-mono",
  subsets: ["latin"],
});

// Metadata configuration for SEO and page information
export const metadata: Metadata = {
  title: "AI Stock Portfolio",
  description: "AI Stock Portfolio",
};

// Root layout component that wraps all pages in the application
export default function RootLayout({
  children,
}: Readonly<{
  children: React.ReactNode;
}>) {
  return (
    <html lang="en">
      <body
        className={`${geistSans.variable} ${geistMono.variable} antialiased`}>
        {/* CopilotKit wrapper that enables AI functionality throughout the app */}
        {/* runtimeUrl points to the API endpoint for AI backend communication */}
        {/* agent specifies which AI agent to use (crewaiAgent, as registered in the API route) */}
        <CopilotKit runtimeUrl="/api/copilotkit" agent="crewaiAgent">
          {children}
        </CopilotKit>
      </body>
    </html>
  );
}

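Step 3: Set up a Copilot chat component

CopilotKit ships with several built-in chat components, including CopilotPopup, CopilotSidebar, and CopilotChat.

To set up a Copilot chat component, define it as shown in the src/app/components/prompt-panel.tsx file.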

// Client-side component directive for Next.js
"use client";

import type React from "react";
// CopilotKit chat component for AI interactions
import { CopilotChat } from "@copilotkit/react-ui";

// Props interface for the PromptPanel component
interface PromptPanelProps {
  // Amount of available cash for investment, displayed in the panel
  availableCash: number;
}

// Main component for the AI chat interface panel
export function PromptPanel({ availableCash }: PromptPanelProps) {
  // Utility function to format numbers as USD currency
  // Removes decimal places for cleaner display of large amounts
  const formatCurrency = (amount: number) => {
    return new Intl.NumberFormat("en-US", {
      style: "currency",
      currency: "USD",
      minimumFractionDigits: 0,
      maximumFractionDigits: 0,
    }).format(amount);
  };

  return (
    // Main container with full height and white background
    <div className="h-full flex flex-col bg-white">
      {/* Header section with title, description, and cash display */}
      <div className="p-4 border-b border-[#D8D8E5] bg-[#FAFCFA]">
        {/* Title section with icon and branding */}
        <div className="flex items-center gap-2 mb-2">
          <span className="text-xl">🪁</span>
          <div>
            <h1 className="text-lg font-semibold text-[#030507] font-['Roobert']">
              Portfolio Chat
            </h1>
            {/* Pro badge indicator */}
            <div className="inline-block px-2 py-0.5 bg-[#BEC9FF] text-[#030507] text-xs font-semibold uppercase rounded">
              PRO
            </div>
          </div>
        </div>
        {/* Description of the AI agent's capabilities */}
        <p className="text-xs text-[#575758]">
          Interact with the CrewAI-powered AI agent for portfolio
          visualization and analysis
        </p>

        {/* Available Cash Display section */}
        <div className="mt-3 p-2 bg-[#86ECE4]/10 rounded-lg">
          <div className="text-xs text-[#575758] font-medium">
            Available Cash
          </div>
          <div className="text-sm font-semibold text-[#030507] font-['Roobert']">
            {formatCurrency(availableCash)}
          </div>
        </div>
      </div>
      {/* CopilotKit chat interface with custom styling and initial message */}
      {/* Takes up majority of the panel height for conversation */}
      <CopilotChat
        className="h-[78vh] p-2"
        labels={{
          // Initial welcome message explaining the AI agent's capabilities and limitations
          initial: `I am a Crew AI agent designed to analyze investment opportunities and track stock performance over time. How can I help you with your investment query? For example, you can ask me to analyze a stock like "Invest in Apple with 10k dollars since Jan 2023". \n\nNote: The AI agent has access to stock data from the past 4 years only`,
        }}
      />
    </div>
  );
}
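
As a quick check of what the formatCurrency helper in the panel does in isolation, the sketch below extracts it into a standalone function. The formatter options are the ones used in the component; the sample amounts are assumptions chosen for illustration.

```typescript
// Same Intl.NumberFormat options as in the PromptPanel component,
// pulled out into a standalone function so it can be tried without React.
function formatCurrency(amount: number): string {
  return new Intl.NumberFormat("en-US", {
    style: "currency",
    currency: "USD",
    minimumFractionDigits: 0, // no cents
    maximumFractionDigits: 0, // round to whole dollars
  }).format(amount);
}

const formattedCash = formatCurrency(1000000); // "$1,000,000"
const formattedRounded = formatCurrency(1234.56); // "$1,235"
console.log(formattedCash, formattedRounded);
```

Dropping the fraction digits keeps large balances readable in the narrow side panel, at the cost of rounding away cents.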

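Step 4: Sync the AG-UI + CrewAI agent state with the frontend using CopilotKit hooks

In CopilotKit, CoAgents maintain a shared state that connects your frontend UI with the agent's execution. This shared state system lets you:

  • Display the agent's current progress and intermediate results

  • Update the agent's state through UI interactions

  • React to state changes across your application in real time

You can learn more about CoAgents' shared state in the CopilotKit docs.

Image from Notion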
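
On the wire, AG-UI keeps this shared state in sync with STATE_SNAPSHOT and STATE_DELTA events. The sketch below illustrates the idea with a deliberately small subset of JSON Patch (only `add` and `replace` on object keys, no pointer escaping). It is not how CopilotKit implements its hooks; `applySnapshot`, `applyDelta`, and the sample state values are hypothetical names and data used for illustration.

```typescript
type Json = string | number | boolean | null | Json[] | { [key: string]: Json };
type JsonObject = { [key: string]: Json };

// Simplified patch operation: a small subset of JSON Patch (RFC 6902).
interface PatchOp {
  op: "add" | "replace";
  path: string; // e.g. "/investment_summary/cash"
  value: Json;
}

function clone<T>(value: T): T {
  return JSON.parse(JSON.stringify(value)) as T;
}

// A snapshot replaces the whole state.
function applySnapshot(snapshot: JsonObject): JsonObject {
  return clone(snapshot);
}

// A delta touches only the listed paths and returns a new state object.
// In this simplified version "add" and "replace" both just set the key.
function applyDelta(state: JsonObject, delta: PatchOp[]): JsonObject {
  const next = clone(state);
  for (const { path, value } of delta) {
    const keys = path.split("/").slice(1); // drop the empty segment before the first "/"
    const last = keys.pop();
    if (last === undefined) throw new Error("Empty path");
    let target: JsonObject = next;
    for (const key of keys) {
      const child = target[key];
      if (typeof child !== "object" || child === null || Array.isArray(child)) {
        throw new Error(`Path ${path} does not point into an object`);
      }
      target = child;
    }
    target[last] = value;
  }
  return next;
}

let agentState = applySnapshot({
  available_cash: 1000000,
  investment_summary: { cash: 1000000 },
});
agentState = applyDelta(agentState, [
  { op: "replace", path: "/available_cash", value: 990000 },
  { op: "add", path: "/investment_summary/total_value", value: 10500 },
]);
console.log(JSON.stringify(agentState));
```

Sending deltas instead of full snapshots keeps updates small when only a field or two changes during a run, which is why the protocol offers both event types.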

To sync the AG-UI + CrewAI agent state with the frontend, use the CopilotKit useCoAgent hook to share the agent state with the frontend, as shown in the src/app/page.tsx file.

"use client";

import { useState } from "react";
import {
  useCoAgent,
} from "@copilotkit/react-core";

// ...

export interface SandBoxPortfolioState {
  performanceData: Array<{
    date: string;
    portfolio: number;
    spy: number;
  }>;
}
export interface InvestmentPortfolio {
  ticker: string;
  amount: number;
}

export default function OpenStocksCanvas() {

  // ...

  const [totalCash, setTotalCash] = useState(1000000);

  const { state, setState } = useCoAgent({
    name: "crewaiAgent",
    initialState: {
      available_cash: totalCash,
      investment_summary: {} as any,
      investment_portfolio: [] as InvestmentPortfolio[],
    },
  });

    // ...

  return (
    <div className="h-screen bg-[#FAFCFA] flex overflow-hidden">
       {/* ... */}
    </div>
  );
}

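Next, render the AG-UI + CrewAI agent's state in the chat UI, which keeps the user informed about what the agent is doing in a more contextual way.

To render the AG-UI + CrewAI agent's state in the chat UI, use the useCoAgentStateRender hook, as shown in the src/app/page.tsx file.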

"use client";

import {
  useCoAgentStateRender,
} from "@copilotkit/react-core";

import { ToolLogs } from "./components/tool-logs";

// ...

export default function OpenStocksCanvas() {

  // ...

  useCoAgentStateRender({
    name: "crewaiAgent",
    render: ({ state }) => <ToolLogs logs={state.tool_logs} />,
  });

  // ...

  return (
    <div className="h-screen bg-[#FAFCFA] flex overflow-hidden">
      {/* ... */}
    </div>
  );
}

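If you run a query in the chat, you should see the AG-UI + CrewAI agent's task execution state rendered in the chat UI, as shown below.

Image from Notion

Step 5: Implement human-in-the-loop (HITL) in the frontend

Human-in-the-loop (HITL) lets an agent request human input or approval during execution, which makes AI systems more reliable and trustworthy. This pattern is essential for AI applications that handle complex decisions or take actions that require human judgment.

You can learn more about human-in-the-loop in the CopilotKit docs.

To implement HITL in the frontend, use the CopilotKit useCopilotAction hook with the renderAndWaitForResponse method, which lets the render function return a value asynchronously, as shown in the src/app/page.tsx file.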

"use client";

import {
  useCopilotAction,
} from "@copilotkit/react-core";

// ...

export default function OpenStocksCanvas() {

  // ...

  useCopilotAction({
    name: "render_standard_charts_and_table",
    description:
      "This is an action to render a standard chart and table. The chart can be a bar chart or a line chart. The table can be a table of data.",
    renderAndWaitForResponse: ({ args, respond, status }) => {
      return (
        <>
          {args?.investment_summary?.percent_allocation_per_stock &&
            args?.investment_summary?.percent_return_per_stock &&
            args?.investment_summary?.performanceData && (
              <>
                <div className="flex flex-col gap-4">
                  <LineChartComponent
                    data={args?.investment_summary?.performanceData}
                    size="small"
                  />
                  <BarChartComponent
                    data={Object.entries(
                      args?.investment_summary?.percent_return_per_stock
                    ).map(([ticker, return1]) => ({
                      ticker,
                      return: return1 as number,
                    }))}
                    size="small"
                  />
                  <AllocationTableComponent
                    allocations={Object.entries(
                      args?.investment_summary?.percent_allocation_per_stock
                    ).map(([ticker, allocation]) => ({
                      ticker,
                      allocation: allocation as number,
                      currentValue:
                        args?.investment_summary.final_prices[ticker] *
                        args?.investment_summary.holdings[ticker],
                      totalReturn:
                        args?.investment_summary.percent_return_per_stock[
                          ticker
                        ],
                    }))}
                    size="small"
                  />
                </div>

                <button
                  hidden={status == "complete"}
                  className="mt-4 rounded-full px-6 py-2 bg-green-50 text-green-700 border border-green-200 shadow-sm hover:bg-green-100 transition-colors font-semibold text-sm"
                  onClick={() => {
                    if (respond) {
                      setTotalCash(args?.investment_summary?.cash);
                      setCurrentState({
                        ...currentState,
                        returnsData: Object.entries(
                          args?.investment_summary?.percent_return_per_stock
                        ).map(([ticker, return1]) => ({
                          ticker,
                          return: return1 as number,
                        })),
                        allocations: Object.entries(
                          args?.investment_summary?.percent_allocation_per_stock
                        ).map(([ticker, allocation]) => ({
                          ticker,
                          allocation: allocation as number,
                          currentValue:
                            args?.investment_summary?.final_prices[ticker] *
                            args?.investment_summary?.holdings[ticker],
                          totalReturn:
                            args?.investment_summary?.percent_return_per_stock[
                              ticker
                            ],
                        })),
                        performanceData:
                          args?.investment_summary?.performanceData,
                        bullInsights: args?.insights?.bullInsights || [],
                        bearInsights: args?.insights?.bearInsights || [],
                        currentPortfolioValue:
                          args?.investment_summary?.total_value,
                        totalReturns: (
                          Object.values(
                            args?.investment_summary?.returns
                          ) as number[]
                        ).reduce((acc, val) => acc + val, 0),
                      });
                      setInvestedAmount(
                        (
                          Object.values(
                            args?.investment_summary?.total_invested_per_stock
                          ) as number[]
                        ).reduce((acc, val) => acc + val, 0)
                      );
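                      setState({
                        ...state,
                        // Use the agent-provided cash; totalCash still holds the pre-update value here
                        available_cash: args?.investment_summary?.cash,
                      });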
                      respond(
                        "Data rendered successfully. Provide a summary of the investments by not making any tool calls."
                      );
                    }
                  }}>
                  Accept
                </button>
                <button
                  hidden={status == "complete"}
                  className="rounded-full px-6 py-2 bg-red-50 text-red-700 border border-red-200 shadow-sm hover:bg-red-100 transition-colors font-semibold text-sm ml-2"
                  onClick={() => {
                    if (respond) {
                      respond(
                        "Data rendering rejected. Just give a summary of the rejected investments by not making any tool calls."
                      );
                    }
                  }}>
                  Reject
                </button>
              </>
            )}
        </>
      );
    },
  });

  // ...

  return (
    <div className="h-screen bg-[#FAFCFA] flex overflow-hidden">
      {/* ... */}
    </div>
  );
}

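When the agent triggers a frontend action by its tool/action name to request human input or feedback during execution, the end user sees a prompt with choices rendered inside the chat UI. The user can then choose by pressing a button in the chat UI, as shown below.

Image from Notion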
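
The essence of this pattern is that the agent's run stays paused until the UI calls respond, and only the first response counts. The following sketch models that contract without React or CopilotKit. `createApprovalGate`, its method names, and the two status values are hypothetical and chosen for illustration; they are not part of the CopilotKit API.

```typescript
type GateStatus = "executing" | "complete";

interface ApprovalGate {
  status(): GateStatus;
  // Returns true if this call settled the gate, false if it was already settled.
  respond(message: string): boolean;
  result(): string | undefined;
}

// onSettled stands in for "hand the human's decision back to the agent".
function createApprovalGate(onSettled: (message: string) => void): ApprovalGate {
  let status: GateStatus = "executing";
  let result: string | undefined;
  return {
    status: () => status,
    result: () => result,
    respond(message: string): boolean {
      if (status === "complete") return false; // ignore repeated clicks
      status = "complete";
      result = message;
      onSettled(message);
      return true;
    },
  };
}

const delivered: string[] = [];
const gate = createApprovalGate((message) => {
  delivered.push(message);
});

// Simulate the user pressing Accept, then pressing Reject afterwards.
const firstClick = gate.respond("Data rendered successfully.");
const secondClick = gate.respond("Data rendering rejected.");
console.log(gate.status(), delivered, firstClick, secondClick);
```

This mirrors why the Accept and Reject buttons in the component are hidden once the status is "complete": after the first response, further clicks should have no effect.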

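Step 6: Stream AG-UI + CrewAI agent responses in the frontend

To stream the AG-UI + CrewAI agent's responses or results in the frontend, pass the agent's state field values to the frontend components, as shown in the src/app/page.tsx file.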

"use client";

import { useEffect, useState } from "react";
import { PromptPanel } from "./components/prompt-panel";
import { GenerativeCanvas } from "./components/generative-canvas";
import { ComponentTree } from "./components/component-tree";
import { CashPanel } from "./components/cash-panel";

// ...

export default function OpenStocksCanvas() {
  const [currentState, setCurrentState] = useState<PortfolioState>({
    id: "",
    trigger: "",
    performanceData: [],
    allocations: [],
    returnsData: [],
    bullInsights: [],
    bearInsights: [],
    currentPortfolioValue: 0,
    totalReturns: 0,
  });
  const [sandBoxPortfolio, setSandBoxPortfolio] = useState<
    SandBoxPortfolioState[]
  >([]);
  const [selectedStock, setSelectedStock] = useState<string | null>(null);


  return (
    <div className="h-screen bg-[#FAFCFA] flex overflow-hidden">
      {/* Left Panel - Prompt Input */}
      <div className="w-85 border-r border-[#D8D8E5] bg-white flex-shrink-0">
        <PromptPanel availableCash={totalCash} />
      </div>

      {/* Center Panel - Generative Canvas */}
      <div className="flex-1 relative min-w-0">
        {/* Top Bar with Cash Info */}
        <div className="absolute top-0 left-0 right-0 bg-white border-b border-[#D8D8E5] p-4 z-10">
          <CashPanel
            totalCash={totalCash}
            investedAmount={investedAmount}
            currentPortfolioValue={
              totalCash + investedAmount + currentState.totalReturns || 0
            }
            onTotalCashChange={setTotalCash}
            onStateCashChange={setState}
          />
        </div>

        <div className="pt-20 h-full">
          <GenerativeCanvas
            setSelectedStock={setSelectedStock}
            portfolioState={currentState}
            sandBoxPortfolio={sandBoxPortfolio}
            setSandBoxPortfolio={setSandBoxPortfolio}
          />
        </div>
      </div>

      {/* Right Panel - Component Tree (Optional) */}
      {showComponentTree && (
        <div className="w-64 border-l border-[#D8D8E5] bg-white flex-shrink-0">
          <ComponentTree portfolioState={currentState} />
        </div>
      )}
    </div>
  );
}
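
The values passed to CashPanel come down to a few sums over the agent's investment_summary. As a sketch of that arithmetic outside React: the field names `cash`, `returns`, and `total_invested_per_stock` follow the code shown in this guide, while `summarizePortfolio`, the `InvestmentSummary` interface, and the sample figures are assumptions for illustration.

```typescript
// Hypothetical subset of the investment summary used in the page component.
interface InvestmentSummary {
  cash: number;
  returns: Record<string, number>;
  total_invested_per_stock: Record<string, number>;
}

const sum = (values: number[]): number =>
  values.reduce((acc, val) => acc + val, 0);

function summarizePortfolio(summary: InvestmentSummary) {
  const investedAmount = sum(Object.values(summary.total_invested_per_stock));
  const totalReturns = sum(Object.values(summary.returns));
  return {
    totalCash: summary.cash,
    investedAmount,
    totalReturns,
    // Same formula as the currentPortfolioValue prop: cash + invested + returns
    currentPortfolioValue: summary.cash + investedAmount + totalReturns,
  };
}

// Made-up sample figures.
const overview = summarizePortfolio({
  cash: 990000,
  returns: { AAPL: 1500, MSFT: -250 },
  total_invested_per_stock: { AAPL: 6000, MSFT: 4000 },
});
console.log(overview);
```

Note that in the JSX expression `totalCash + investedAmount + currentState.totalReturns || 0`, the `+` operators bind tighter than `||`, so the `|| 0` fallback applies to the whole sum (for example when it evaluates to NaN) rather than to totalReturns alone.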

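If you query the agent and approve its feedback request, you should see the agent's response or results in the UI.

Conclusion

In this guide, we walked through the steps of integrating a CrewAI agent with the AG-UI protocol and then adding a frontend to the agent using CopilotKit.

While we've explored a few features, we have barely scratched the surface of the use cases for CopilotKit, which range from building interactive AI chatbots to building agentic solutions. In essence, CopilotKit lets you add a lot of useful AI capabilities to your products in minutes.

We hope this guide makes it easier to integrate AI-powered copilots into your existing applications.

Follow CopilotKit on Twitter and say hi, and if you'd like to build something cool, join the Discord community.

Source: https://dev.to/copilotkit/how-to-integrate-crewai-agents-with-ag-ui-protocol-crewai-ag-ui-copilotkit-3n1d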