Published on 2026-01-06

Build Your Own AI Stock Portfolio Agent with LlamaIndex + AG-UI

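This guide shows how to integrate a LlamaIndex agent with the AG-UI protocol. It also covers how to connect the AG-UI + LlamaIndex agent to CopilotKit, so you can chat with the agent and stream its responses to the frontend.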

Before we get started, here is what we will cover:

  • What is the AG-UI protocol?

  • Integrating a LlamaIndex agent with the AG-UI protocol

  • Integrating a frontend with the AG-UI + LlamaIndex agent using CopilotKit

Here is a preview of what we will be building:

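What is the AG-UI protocol?

The Agent User Interaction Protocol (AG-UI), developed by CopilotKit, is an open-source, lightweight, event-based protocol that enables rich, real-time interaction between frontends and AI agents.

The AG-UI protocol supports event-driven communication, state management, tool usage, and streaming AI agent responses.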

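To pass information between the frontend and your AI agent, AG-UI uses the following events:

  • Lifecycle events: These mark the start or end of an agent's task execution. They include the RUN_STARTED and RUN_FINISHED events.

  • Text message events: These stream the agent's responses to the frontend. They include the TEXT_MESSAGE_START, TEXT_MESSAGE_CONTENT, and TEXT_MESSAGE_END events.

  • Tool call events: These manage the agent's tool executions. They include the TOOL_CALL_START, TOOL_CALL_ARGS, and TOOL_CALL_END events.

  • State management events: These keep the state of the frontend and the AI agent in sync. They include the STATE_SNAPSHOT and STATE_DELTA events.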
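
To make the ordering of these events concrete, here is a minimal sketch of a checker for one agent run. It rests on assumptions: `RunEvent` is a local stand-in for the event names listed above (not the `ag_ui.core` enum), and the rules in the hypothetical `is_well_formed_run` are a simplification for illustration, not the protocol's full specification.

```python
from enum import Enum


class RunEvent(str, Enum):
    """Local stand-in for the AG-UI event names listed above."""

    RUN_STARTED = "RUN_STARTED"
    RUN_FINISHED = "RUN_FINISHED"
    TEXT_MESSAGE_START = "TEXT_MESSAGE_START"
    TEXT_MESSAGE_CONTENT = "TEXT_MESSAGE_CONTENT"
    TEXT_MESSAGE_END = "TEXT_MESSAGE_END"
    TOOL_CALL_START = "TOOL_CALL_START"
    TOOL_CALL_ARGS = "TOOL_CALL_ARGS"
    TOOL_CALL_END = "TOOL_CALL_END"
    STATE_SNAPSHOT = "STATE_SNAPSHOT"
    STATE_DELTA = "STATE_DELTA"


# START event -> (matching content event, matching END event)
BLOCKS = {
    RunEvent.TEXT_MESSAGE_START: (RunEvent.TEXT_MESSAGE_CONTENT, RunEvent.TEXT_MESSAGE_END),
    RunEvent.TOOL_CALL_START: (RunEvent.TOOL_CALL_ARGS, RunEvent.TOOL_CALL_END),
}
STATE_EVENTS = {RunEvent.STATE_SNAPSHOT, RunEvent.STATE_DELTA}


def is_well_formed_run(events: list[RunEvent]) -> bool:
    """Simplified check: lifecycle events bracket the run and every block is closed."""
    if len(events) < 2:
        return False
    if events[0] is not RunEvent.RUN_STARTED or events[-1] is not RunEvent.RUN_FINISHED:
        return False
    open_start = None  # START event of the block currently being streamed
    for event in events[1:-1]:
        if event in STATE_EVENTS:
            continue  # in this sketch, state updates may arrive at any point
        if event in BLOCKS:
            if open_start is not None:
                return False  # this sketch does not allow nested blocks
            open_start = event
        elif open_start is None:
            return False  # content/end without a start, or a stray lifecycle event
        else:
            content, end = BLOCKS[open_start]
            if event is end:
                open_start = None
            elif event is not content:
                return False
    return open_start is None
```

A run that streams a state snapshot followed by one text message passes the check, while a run that opens a message and never closes it does not.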

You can learn more about the AG-UI protocol and its architecture in the AG-UI docs.

Now that we know what the AG-UI protocol is, let's see how to integrate it with the LlamaIndex agent framework.

Let's get started!

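Prerequisites

To fully understand this tutorial, you need a basic understanding of React or Next.js.

We will also make use of the following:

  • Python - a popular programming language for building AI agents with LlamaIndex; make sure it is installed on your computer.

  • LlamaIndex - a simple, flexible framework for building knowledge assistants using LLMs connected to your enterprise data.

  • OpenAI API key - an API key that lets you perform various tasks using the GPT models; for this tutorial, make sure you have access to the GPT-4 model.

  • CopilotKit - an open-source copilot framework for building custom AI chatbots, in-app AI agents, and text areas.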

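Integrating a LlamaIndex agent with the AG-UI protocol

To get started, clone the Open AG-UI Demo repository, which consists of a Python-based backend (agent) and a Next.js frontend (frontend).

Next, navigate to the backend directory: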

cd agent

Then install the dependencies using Poetry:

poetry install

After that, create a .env file containing your OpenAI API key:

OPENAI_API_KEY=<<your-OpenAI-key-here>>

Then run the agent using the following command:

poetry run python main.py

To test the AG-UI + LlamaIndex integration, run the following curl command on https://reqbin.com/curl. The endpoint reads available_cash, investment_summary, and investment_portfolio from state, so the request supplies placeholder values for them.

curl -X POST "http://localhost:8000/llamaindex-agent" \
  -H "Content-Type: application/json" \
  -d '{
    "thread_id": "test_thread_123",
    "run_id": "test_run_456",
    "messages": [
      {
        "id": "msg_1",
        "role": "user",
        "content": "Analyze AAPL stock with a $10000 investment from 2023-01-01"
      }
    ],
    "tools": [],
    "context": [],
    "forwarded_props": {},
    "state": {
      "available_cash": 10000,
      "investment_summary": {},
      "investment_portfolio": []
    }
  }'
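
The endpoint answers with a text/event-stream body. As a minimal sketch of how a client could decode it, the hypothetical helper `parse_sse` below assumes standard server-sent-events framing, where each frame is a `data: <json>` line followed by a blank line. The exact bytes the server emits are not shown in this guide, so treat the framing as an assumption.

```python
import json


def parse_sse(raw: str) -> list[dict]:
    """Decode a server-sent-events body into a list of JSON event objects."""
    events = []
    for frame in raw.split("\n\n"):  # frames are separated by a blank line
        data_lines = [
            line[len("data:"):].lstrip(" ")
            for line in frame.splitlines()
            if line.startswith("data:")
        ]
        if data_lines:
            # A frame may spread its payload over several data: lines
            events.append(json.loads("\n".join(data_lines)))
    return events
```

Feeding the helper the text returned by the curl command would give one dictionary per AG-UI event, in the order the agent emitted them.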

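Now let's see how to integrate the AG-UI protocol with the LlamaIndex agent framework.

Step 1: Create your LlamaIndex agent workflow

Before integrating the AG-UI protocol with the LlamaIndex agent, create your LlamaIndex agent workflow, as shown in the agent/stock_analysis.py file.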

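from typing import Any, Callable, List

from llama_index.core.llms.function_calling import FunctionCallingLLM
from llama_index.core.tools import BaseTool
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)

# SimulationEvent, CashAllocationEvent, and InsightsEvent are custom workflow
# events, assumed to be defined elsewhere in this file (not shown here).

class FunctionCallingAgent(Workflow):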
    """
    Main workflow class for stock analysis and portfolio simulation.
    This agent processes user queries, fetches stock data, performs portfolio calculations,
    and generates investment insights using AI function calling.
    """
    def __init__(
        self,
        *args: Any,
        llm: FunctionCallingLLM | None = None,
        tools: List[BaseTool] | None = None,
        messages: List[Any],  # Conversation history
        emit_event: Callable,  # Function to emit UI state updates
        available_cash: int,  # Total cash available for investment
        investment_portfolio: List[Any],  # Current portfolio holdings
        **kwargs: Any,
    ) -> None:
        """
        Initialize the stock analysis workflow agent.

        Args:
            messages: Chat message history
            emit_event: Function to emit UI state changes
            available_cash: Available cash for investments
            investment_portfolio: Current portfolio data
        """
        super().__init__(*args, **kwargs)

        # ...

    @step
    async def chat_function(self, ctx: Context, ev: StartEvent) -> SimulationEvent | StopEvent:
        """
        First step: Analyze the user query and extract investment parameters.
        Uses AI function calling to parse ticker symbols, investment amounts, dates, etc.
        """

        # ...

    @step
    async def simulation_function(
        self, ctx: Context, ev: SimulationEvent
    ) -> CashAllocationEvent | StopEvent:
        """
        Second step: Fetch historical stock data for the specified tickers and time period.
        Downloads stock prices using Yahoo Finance API.
        """
        # ...

    @step
    async def cash_allocation_function(
        self, ctx: Context, ev: CashAllocationEvent
    ) -> InsightsEvent:
        """
        Third step: Perform portfolio simulation and cash allocation calculations.
        Simulates investment strategy (single shot or DCA) and calculates returns.
        """

        # ...

    @step
    async def insights_function(self, ctx: Context, ev: InsightsEvent) -> StopEvent:
        """
        Fourth step: Generate AI-powered insights about the investment portfolio.
        Uses GPT to create bull (positive) and bear (negative) insights for each stock.
        """

        # ...
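
The workflow above chains its steps through events: each step returns an event, and the type of that event decides which step runs next. LlamaIndex derives this routing from the type annotations on the `@step` methods. The following plain-Python sketch imitates the idea without using LlamaIndex at all. The dataclasses, the `ROUTES` table, and `run_pipeline` are hypothetical stand-ins, and each step only records its name instead of doing real analysis.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SimulationEvent:
    input: list


@dataclass
class CashAllocationEvent:
    input: list


@dataclass
class InsightsEvent:
    input: list


@dataclass
class StopEvent:
    result: list


async def chat_step(messages: list) -> SimulationEvent:
    return SimulationEvent(input=messages + ["chat"])


async def simulation_step(ev: SimulationEvent) -> CashAllocationEvent:
    return CashAllocationEvent(input=ev.input + ["simulation"])


async def cash_allocation_step(ev: CashAllocationEvent) -> InsightsEvent:
    return InsightsEvent(input=ev.input + ["cash_allocation"])


async def insights_step(ev: InsightsEvent) -> StopEvent:
    return StopEvent(result=ev.input + ["insights"])


# Event type -> step that consumes it (a workflow engine would infer this
# from annotations; here it is written out by hand)
ROUTES = {
    SimulationEvent: simulation_step,
    CashAllocationEvent: cash_allocation_step,
    InsightsEvent: insights_step,
}


async def run_pipeline(messages: list) -> list:
    """Run steps until one of them returns a StopEvent."""
    event = await chat_step(messages)
    while not isinstance(event, StopEvent):
        event = await ROUTES[type(event)](event)
    return event.result
```

Calling `asyncio.run(run_pipeline(["user"]))` walks the four steps in order and returns the list with each step's name appended.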

Step 2: Create an endpoint with FastAPI

Once you have defined your LlamaIndex agent workflow, create a FastAPI endpoint and import the workflow, as shown in the agent/main.py file.

# Import necessary libraries for FastAPI web server and async operations
from fastapi import FastAPI
from fastapi.responses import StreamingResponse  # For streaming real-time responses to the client
import uuid  # For generating unique identifiers
from typing import Any  # Type hints for flexible data types
import os  # Environment variable access
import uvicorn  # ASGI server for running the FastAPI application
import asyncio  # Asynchronous programming support

# Import AG UI core components for event-driven agent communication
from ag_ui.core import (
    RunAgentInput,          # Input schema for agent runs
    StateSnapshotEvent,     # Event for sharing current state
    EventType,              # Enumeration of all event types
    RunStartedEvent,        # Event emitted when the agent run begins
    RunFinishedEvent,       # Event emitted when the agent run completes
    TextMessageStartEvent,  # Event emitted when text message begins
    TextMessageEndEvent,    # Event emitted when text message ends
    TextMessageContentEvent, # Event containing text message content chunks
    ToolCallStartEvent,     # Event emitted when tool call begins
    ToolCallEndEvent,       # Event emitted when tool call completes
    ToolCallArgsEvent,      # Event containing tool call arguments
    StateDeltaEvent,        # Event for incremental state updates
)

# Import encoder for converting events to streamable format
from ag_ui.encoder import EventEncoder

# Import CopilotKit state management
from copilotkit import CopilotKitState
from typing import List

# Import the stock analysis agent workflow
from stock_analysis import FunctionCallingAgent

# Create FastAPI application instance
app = FastAPI()

@app.post("/llamaindex-agent")
async def llama_index_agent(input_data: RunAgentInput):
    """
    Main API endpoint for running the stock analysis agent.

    This endpoint handles incoming requests to analyze stock investments,
    manages the agent workflow execution, and streams real-time events
    back to the client for live progress updates.

    Args:
        input_data: RunAgentInput containing user message, state, tools, and metadata

    Returns:
        StreamingResponse: Real-time event stream of agent execution progress
    """

    # ...

def main():
    """
    Server startup function.

    Initializes and runs the FastAPI server using Uvicorn ASGI server.
    Configures the server to listen on all interfaces with auto-reload for development.
    """
    # Step 1: Get port from environment variable or use default 8000
    port = int(os.getenv("PORT", "8000"))

    # Step 2: Start the Uvicorn server with configuration
    uvicorn.run(
        "main:app",         # Module:app reference for the FastAPI application
        host="0.0.0.0",     # Listen on all network interfaces
        port=port,          # Port number (from environment or default)
        reload=True,        # Enable auto-reload for development (restarts on file changes)
    )

# Application entry point
if __name__ == "__main__":
    # Run the server when the script is executed directly
    main()

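Step 3: Define an event generator

After creating the FastAPI endpoint, define an event generator that produces a stream of AG-UI protocol events, initializes the event encoder, and returns the streaming response to the client or frontend, as shown in the agent/main.py file.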

@app.post("/llamaindex-agent")
async def llama_index_agent(input_data: RunAgentInput):
    """
    Main API endpoint for running the stock analysis agent.

    This endpoint handles incoming requests to analyze stock investments,
    manages the agent workflow execution, and streams real-time events
    back to the client for live progress updates.

    Args:
        input_data: RunAgentInput containing user message, state, tools, and metadata

    Returns:
        StreamingResponse: Real-time event stream of agent execution progress
    """
    try:
        # Step 1: Define the async event generator function
        # This function orchestrates the entire agent execution and event streaming
        async def event_generator():
            # Step 2: Initialize event handling infrastructure
            encoder = EventEncoder()  # Converts events to streamable format
            event_queue = asyncio.Queue()  # Queue for managing events between agent and API

            # Step 3: Define event emission function for agent callbacks
            def emit_event(event):
                """Callback function for agent to emit events to the stream"""
                event_queue.put_nowait(event)

            # Step 4: Generate unique message ID for tracking
            message_id = str(uuid.uuid4())

            # ...

    except Exception as e:
        # Step 14: Handle any errors during agent execution
        print(e)  # Log the error for debugging
        # Note: In production, you might want to emit an error event to the client

    # Step 15: Return the streaming response to the client
    # The event_generator() produces a stream of server-sent events
    return StreamingResponse(event_generator(), media_type="text/event-stream")
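
The pattern in this step, a synchronous `emit_event` callback that feeds an `asyncio.Queue` drained by an async generator, can be tried in isolation. The sketch below uses only the standard library. `encode_sse` is a hypothetical stand-in for `EventEncoder.encode` and assumes `data: <json>` framing, and the two emitted events are placeholders for what a real agent would send.

```python
import asyncio
import json


def encode_sse(event: dict) -> str:
    """Stand-in for the event encoder: one server-sent-events frame."""
    return f"data: {json.dumps(event)}\n\n"


async def event_generator():
    event_queue: asyncio.Queue = asyncio.Queue()

    def emit_event(event: dict) -> None:
        # Synchronous callback: put_nowait never blocks on an unbounded queue
        event_queue.put_nowait(event)

    # Placeholder for agent work that reports progress through the callback
    emit_event({"type": "STATE_DELTA", "delta": []})
    emit_event({"type": "STATE_DELTA", "delta": []})

    # Drain whatever the agent emitted and stream it in order
    while not event_queue.empty():
        yield encode_sse(event_queue.get_nowait())


async def collect_frames() -> list[str]:
    return [frame async for frame in event_generator()]
```

One design point worth noting: an async generator body only runs while the response is being iterated, so errors raised inside it surface during streaming rather than when the generator object is created.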

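Step 4: Configure AG-UI protocol lifecycle events

After defining the event generator, define the AG-UI protocol lifecycle events that mark the start and end of an AG-UI + LlamaIndex agent workflow run, as shown in the agent/main.py file.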

@app.post("/llamaindex-agent")
async def llama_index_agent(input_data: RunAgentInput):
    """
    Main API endpoint for running the stock analysis agent.

    This endpoint handles incoming requests to analyze stock investments,
    manages the agent workflow execution, and streams real-time events
    back to the client for live progress updates.

    Args:
        input_data: RunAgentInput containing user message, state, tools, and metadata

    Returns:
        StreamingResponse: Real-time event stream of agent execution progress
    """
    try:
        # Step 1: Define the async event generator function
        # This function orchestrates the entire agent execution and event streaming
        async def event_generator():

            # ...

            # Step 5: Emit the run started event to notify the client
            yield encoder.encode(
                RunStartedEvent(
                    type=EventType.RUN_STARTED,
                    thread_id=input_data.thread_id,  # Conversation thread identifier
                    run_id=input_data.run_id,        # Unique run identifier
                )
            )

            # ...

            # Step 13: Signal completion of the agent run
            yield encoder.encode(
                RunFinishedEvent(
                    type=EventType.RUN_FINISHED,
                    thread_id=input_data.thread_id,  # Match the original thread ID
                    run_id=input_data.run_id,        # Match the original run ID
                )
            )

    except Exception as e:
        # Step 14: Handle any errors during agent execution
        print(e)  # Log the error for debugging
        # Note: In production, you might want to emit an error event to the client

    # Step 15: Return the streaming response to the client
    # The event_generator() produces a stream of server-sent events
    return StreamingResponse(event_generator(), media_type="text/event-stream")
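
The code comments above note that a production setup might emit an error event to the client. One way to sketch that, assuming events are plain dictionaries rather than `ag_ui.core` models, is a wrapper generator that brackets any inner stream with the lifecycle events and reports a failure instead of a normal finish. `with_lifecycle` and `collect_types` are hypothetical helpers, and the camelCase keys and the RUN_ERROR payload shape are assumptions made for illustration.

```python
import asyncio
from typing import AsyncIterator


async def with_lifecycle(
    body: AsyncIterator[dict], thread_id: str, run_id: str
) -> AsyncIterator[dict]:
    """Bracket an inner event stream with RUN_STARTED / RUN_FINISHED."""
    yield {"type": "RUN_STARTED", "threadId": thread_id, "runId": run_id}
    try:
        async for event in body:
            yield event
    except Exception as exc:
        # The inner stream failed: tell the client instead of ending silently
        yield {"type": "RUN_ERROR", "message": str(exc)}
        return
    yield {"type": "RUN_FINISHED", "threadId": thread_id, "runId": run_id}


async def healthy_body() -> AsyncIterator[dict]:
    yield {"type": "STATE_DELTA", "delta": []}


async def failing_body() -> AsyncIterator[dict]:
    yield {"type": "STATE_DELTA", "delta": []}
    raise RuntimeError("agent crashed")


async def collect_types(stream: AsyncIterator[dict]) -> list[str]:
    return [event["type"] async for event in stream]
```

With `healthy_body()` the stream ends in RUN_FINISHED; with `failing_body()` it ends in RUN_ERROR, so the frontend always receives a terminal event.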

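Step 5: Configure AG-UI protocol state management events

After defining the AG-UI protocol lifecycle events, integrate the AG-UI protocol state management events using the STATE_DELTA event in your LlamaIndex agent workflow, as shown in the agent/stock_analysis.py file.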

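import asyncio  # Used to yield control after emitting events
import uuid  # Used to generate tool log IDs

from ag_ui.core import EventType, StateDeltaEvent  # Event management for UI state updates

class FunctionCallingAgent(Workflow):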
    """
    Main workflow class for stock analysis and portfolio simulation.
    This agent processes user queries, fetches stock data, performs portfolio calculations,
    and generates investment insights using AI function calling.
    """
    def __init__(
        self,
        *args: Any,
        llm: FunctionCallingLLM | None = None,
        tools: List[BaseTool] | None = None,
        messages: List[Any],  # Conversation history
        emit_event: Callable,  # Function to emit UI state updates
        available_cash: int,  # Total cash available for investment
        investment_portfolio: List[Any],  # Current portfolio holdings
        **kwargs: Any,
    ) -> None:
        """
        Initialize the stock analysis workflow agent.

        Args:
            messages: Chat message history
            emit_event: Function to emit UI state changes
            available_cash: Available cash for investments
            investment_portfolio: Current portfolio data
        """

        # ...

    @step
    async def chat_function(self, ctx: Context, ev: StartEvent) -> SimulationEvent | StopEvent:
        """
        First step: Analyze the user query and extract investment parameters.
        Uses AI function calling to parse ticker symbols, investment amounts, dates, etc.
        """
        # Step 1: Initialize error handling and progress tracking
        try:
            # Step 2: Create a unique tool log ID for UI progress tracking
            tool_log_id = str(uuid.uuid4())
            self.tool_logs.append(
                {
                    "id": tool_log_id,
                    "message": "Analyzing user query",
                    "status": "processing",
                }
            )

            # Step 3: Emit UI state update to show progress to the user
            self.emit_event(
                StateDeltaEvent(
                    type=EventType.STATE_DELTA,
                    delta=[
                        {
                            "op": "add",
                            "path": "/tool_logs/-",
                            "value": {
                                "message": "Analyzing user query",
                                "status": "processing",
                                "id": tool_log_id,
                            },
                        }
                    ],
                )
            )
            await asyncio.sleep(0)  # Yield control for async execution

            # Step 4: Call the OpenAI GPT model with function calling to extract investment data
            response = self.llm.chat.completions.create(
                model="gpt-4.1-mini",
                messages=self.messages,
                tools=[extract_relevant_data_from_user_prompt],  # Function schema for data extraction
            )
            print(response)  # Debug: Show AI response

            # Step 5: Handle AI response - check if it made function calls
            if response.choices[0].finish_reason == "tool_calls":
                # Step 5a: AI wants to call the data extraction function
                tool_calls = [
                    convert_tool_call(tc)
                    for tc in response.choices[0].message.tool_calls
                ]

                # Step 5b: Add AI message with tool calls to conversation history
                a_message = AssistantMessage(
                    role="assistant", tool_calls=tool_calls, id=response.id
                )
                self.messages.append(a_message)

                # Step 5c: Update UI to show completion of analysis step
                index = len(self.tool_logs) - 1
                self.emit_event(
                    StateDeltaEvent(
                        type=EventType.STATE_DELTA,
                        delta=[
                            {
                                "op": "replace",
                                "path": f"/tool_logs/{index}/status",
                                "value": "completed",
                            }
                        ],
                    )
                )
                await asyncio.sleep(0)
                # Step 5d: Proceed to next workflow step (simulation)
                return SimulationEvent(input=self.messages)
            else:
                # Step 6: AI provided a direct response without function calls
                a_message = AssistantMessage(
                    id=response.id,
                    content=response.choices[0].message.content,
                    role="assistant",
                )
                self.messages.append(a_message)

                # Step 6a: Update UI status and end workflow
                index = len(self.tool_logs) - 1
                self.emit_event(
                    StateDeltaEvent(
                        type=EventType.STATE_DELTA,
                        delta=[
                            {
                                "op": "replace",
                                "path": f"/tool_logs/{index}/status",
                                "value": "completed",
                            }
                        ],
                    )
                )
                await asyncio.sleep(0)
                return StopEvent(result=self.messages)
        except Exception as e:
            # Step 7: Handle any errors in the chat analysis step
            print(e)
            return StopEvent(
                result={"response": "Error in chat function", "sources": []}
            )
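
The `delta` lists in the code above are JSON Patch operations: an `add` to `/tool_logs/-` appends a log entry, and a `replace` on `/tool_logs/{index}/status` updates it. To show what a receiver does with them, here is a minimal sketch of a patch applier. `apply_delta` is a hypothetical helper that supports only the `add` and `replace` operations used in this guide and ignores JSON Pointer escape sequences; a real client would rely on a full JSON Patch library.

```python
import copy


def apply_delta(state: dict, delta: list[dict]) -> dict:
    """Apply a small subset of JSON Patch (add, replace) and return a new state."""
    new_state = copy.deepcopy(state)  # leave the caller's state untouched
    for op in delta:
        keys = [k for k in op["path"].split("/") if k]  # "/tool_logs/-" -> ["tool_logs", "-"]
        parent = new_state
        for key in keys[:-1]:
            parent = parent[int(key)] if isinstance(parent, list) else parent[key]
        last = keys[-1]
        if op["op"] == "add" and isinstance(parent, list):
            if last == "-":
                parent.append(op["value"])  # "-" means "end of the array"
            else:
                parent.insert(int(last), op["value"])
        elif op["op"] in ("add", "replace"):
            if isinstance(parent, list):
                parent[int(last)] = op["value"]
            else:
                parent[last] = op["value"]
        else:
            raise ValueError(f"unsupported op: {op['op']}")
    return new_state
```

Applying the two deltas from `chat_function` in sequence to `{"tool_logs": []}` first appends the "processing" entry and then flips its status to "completed".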

Then, in the FastAPI endpoint, initialize the LlamaIndex agent workflow state using the STATE_SNAPSHOT state management event, as shown below.

@app.post("/llamaindex-agent")
async def llama_index_agent(input_data: RunAgentInput):
    """
    Main API endpoint for running the stock analysis agent.

    This endpoint handles incoming requests to analyze stock investments,
    manages the agent workflow execution, and streams real-time events
    back to the client for live progress updates.

    Args:
        input_data: RunAgentInput containing user message, state, tools, and metadata

    Returns:
        StreamingResponse: Real-time event stream of agent execution progress
    """
    try:
        # Step 1: Define the async event generator function
        # This function orchestrates the entire agent execution and event streaming
        async def event_generator():

            # ...

            # Step 6: Send initial state snapshot to client
            # This provides the frontend with the current portfolio and cash state
            yield encoder.encode(
                StateSnapshotEvent(
                    type=EventType.STATE_SNAPSHOT,
                    snapshot={
                        "available_cash": input_data.state["available_cash"],
                        "investment_summary": input_data.state["investment_summary"],
                        "investment_portfolio": input_data.state["investment_portfolio"],
                        "tool_logs": [],  # Start with empty tool logs
                    },
                )
            )

            # ...

Step 6: Configure the LlamaIndex agent workflow with the AG-UI protocol

After initializing the LlamaIndex agent workflow state, integrate the workflow with the AG-UI protocol, as shown in the agent/main.py file.

@app.post("/llamaindex-agent")
async def llama_index_agent(input_data: RunAgentInput):
    """
    Main API endpoint for running the stock analysis agent.

    This endpoint handles incoming requests to analyze stock investments,
    manages the agent workflow execution, and streams real-time events
    back to the client for live progress updates.

    Args:
        input_data: RunAgentInput containing user message, state, tools, and metadata

    Returns:
        StreamingResponse: Real-time event stream of agent execution progress
    """
    try:
        # Step 1: Define the async event generator function
        # This function orchestrates the entire agent execution and event streaming
        async def event_generator():

            # ...

            # Step 7: Initialize agent state object
            # Creates a structured state object with all necessary data for the workflow
            state = AgentState(
                tools=input_data.tools,                                    # Available function tools
                messages=input_data.messages,                              # Conversation history
                be_stock_data=None,                                        # Will be populated with stock data
                be_arguments=None,                                         # Will be populated with investment params
                available_cash=input_data.state["available_cash"],         # Cash available for investing
                investment_portfolio=input_data.state["investment_portfolio"], # Current portfolio holdings
                tool_logs=[],                                              # Progress tracking logs
            )

            # Step 8: Create and configure the stock analysis agent
            agent = FunctionCallingAgent(
                tools=input_data.tools,                                    # Function calling tools
                messages=input_data.messages,                              # Message history
                emit_event=emit_event,                                     # Callback for real-time events
                available_cash=input_data.state["available_cash"],         # Investment budget
                investment_portfolio=input_data.state["investment_portfolio"], # Portfolio data
            )

            # Step 9: Start the agent workflow execution asynchronously
            # The agent will run through: chat → simulation → allocation → insights
            agent_task = agent.run(input=input_data.messages[-1].content)

            # Step 10: Event streaming loop - relay agent events to client
            # This loop runs until the agent completes its workflow
            while True:
                try:
                    # Step 10a: Wait for events from agent (with timeout for responsiveness)
                    event = await asyncio.wait_for(event_queue.get(), timeout=0.1)
                    # Step 10b: Encode and stream the event to the client
                    yield encoder.encode(event)
                except asyncio.TimeoutError:
                    # Step 10c: Check if the agent workflow has completed
                    if agent_task.done():
                        break  # Exit the streaming loop

            # Step 11: Clear tool logs after workflow completion
            # Reset progress indicators since the analysis is complete
            yield encoder.encode(
                StateDeltaEvent(
                    type=EventType.STATE_DELTA,
                    delta=[{"op": "replace", "path": "/tool_logs", "value": []}],
                )
            )

            # ...

    except Exception as e:
        # Step 14: Handle any errors during agent execution
        print(e)  # Log the error for debugging
        # Note: In production, you might want to emit an error event to the client

    # Step 15: Return the streaming response to the client
    # The event_generator() produces a stream of server-sent events
    return StreamingResponse(event_generator(), media_type="text/event-stream")
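
The relay loop in Step 10 can be exercised without FastAPI or LlamaIndex. In the following sketch, `fake_agent` is a hypothetical stand-in for the workflow: it emits one event per stage through the callback and yields control after each, much like the `await asyncio.sleep(0)` calls in the workflow steps. The loop waits briefly for each event and stops once a timeout finds the agent task finished.

```python
import asyncio
from typing import AsyncIterator, Callable


async def fake_agent(emit_event: Callable[[dict], None]) -> str:
    """Stand-in for the workflow: report progress, then finish."""
    for stage in ("chat", "simulation", "allocation", "insights"):
        emit_event({"type": "STATE_DELTA", "stage": stage})
        await asyncio.sleep(0)  # give the relay loop a chance to run
    return "done"


async def relay() -> AsyncIterator[dict]:
    event_queue: asyncio.Queue = asyncio.Queue()
    agent_task = asyncio.create_task(fake_agent(event_queue.put_nowait))
    while True:
        try:
            # Short timeout so the loop can notice that the agent has finished
            event = await asyncio.wait_for(event_queue.get(), timeout=0.1)
        except asyncio.TimeoutError:
            if agent_task.done():
                break  # queue stayed empty and the agent is done
            continue
        yield event
    agent_task.result()  # re-raises here if the agent failed


async def collect_stages() -> list[str]:
    return [event["stage"] async for event in relay()]
```

The trade-off of this polling design is a small delay (up to the timeout) between the agent finishing and the stream closing; calling `agent_task.result()` at the end ensures an agent failure is not silently dropped.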

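Step 7: Configure AG-UI protocol tool events to handle human-in-the-loop breakpoints

After integrating the LlamaIndex agent workflow with the AG-UI protocol, append a tool call message with the tool call name to the state, as shown in the cash allocation step in the agent/stock_analysis.py file.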

class FunctionCallingAgent(Workflow):
    """
    Main workflow class for stock analysis and portfolio simulation.
    This agent processes user queries, fetches stock data, performs portfolio calculations,
    and generates investment insights using AI function calling.
    """
    def __init__(
        self,
        *args: Any,
        llm: FunctionCallingLLM | None = None,
        tools: List[BaseTool] | None = None,
        messages: List[Any],  # Conversation history
        emit_event: Callable,  # Function to emit UI state updates
        available_cash: int,  # Total cash available for investment
        investment_portfolio: List[Any],  # Current portfolio holdings
        **kwargs: Any,
    ) -> None:
        """
        Initialize the stock analysis workflow agent.

        Args:
            messages: Chat message history
            emit_event: Function to emit UI state changes
            available_cash: Available cash for investments
            investment_portfolio: Current portfolio data
        """

        # ...

    @step
    async def cash_allocation_function(
        self, ctx: Context, ev: CashAllocationEvent
    ) -> InsightsEvent:
        """
        Third step: Perform portfolio simulation and cash allocation calculations.
        Simulates investment strategy (single shot or DCA) and calculates returns.
        """

        # ...

        # Step 20: Add tool message to conversation history
        # This confirms the data extraction was completed successfully
        self.messages.append(
            ToolMessage(
                role="tool",
                id=str(uuid.uuid4()),
                content="The relevant details had been extracted",
                tool_call_id=self.messages[-1].tool_calls[0].id,
            )
        )

        # Step 21: Add assistant message requesting chart/table rendering
        # This triggers the UI to display charts and allocation tables
        self.messages.append(
            AssistantMessage(
                role="assistant",
                tool_calls=[
                    {
                        "id": str(uuid.uuid4()),
                        "type": "function",
                        "function": {
                            "name": "render_standard_charts_and_table",
                            "arguments": json.dumps(
                                {"investment_summary": self.investment_summary}
                            ),
                        },
                    }
                ],
                id=str(uuid.uuid4()),
            )
        )

        # Step 22: Update UI to show completion of cash allocation step
        index = len(self.tool_logs) - 1
        self.emit_event(
            StateDeltaEvent(
                type=EventType.STATE_DELTA,
                delta=[
                    {
                        "op": "replace",
                        "path": f"/tool_logs/{index}/status",
                        "value": "completed",
                    }
                ],
            )
        )
        await asyncio.sleep(0)

        # Step 23: Proceed to insights generation step
        return InsightsEvent(input=self.messages)

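Then define the AG-UI protocol tool call events, which the agent uses to trigger a frontend action by tool name in order to request user feedback, as shown in the agent/main.py file.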

@app.post("/llamaindex-agent")
async def llama_index_agent(input_data: RunAgentInput):
    """
    Main API endpoint for running the stock analysis agent.

    This endpoint handles incoming requests to analyze stock investments,
    manages the agent workflow execution, and streams real-time events
    back to the client for live progress updates.

    Args:
        input_data: RunAgentInput containing user message, state, tools, and metadata

    Returns:
        StreamingResponse: Real-time event stream of agent execution progress
    """
    try:
        # Step 1: Define the async event generator function
        # This function orchestrates the entire agent execution and event streaming
        async def event_generator():

            # ...

            # Step 12: Process and stream the final agent response
            # Check the type of response (tool call vs text message)
            if agent_task.result()[-1].role == "assistant":
                if agent_task.result()[-1].tool_calls:
                    # Step 12a: Handle tool call response (e.g., chart rendering request)
                    # This occurs when the agent wants to display charts/tables

                    # Step 12a-i: Signal start of tool call
                    yield encoder.encode(
                        ToolCallStartEvent(
                            type=EventType.TOOL_CALL_START,
                            tool_call_id=agent_task.result()[-1].tool_calls[0].id,
                            tool_call_name=agent_task.result()[-1].tool_calls[0].function.name,
                        )
                    )

                    # Step 12a-ii: Stream tool call arguments (investment data + insights)
                    yield encoder.encode(
                        ToolCallArgsEvent(
                            type=EventType.TOOL_CALL_ARGS,
                            tool_call_id=agent_task.result()[-1].tool_calls[0].id,
                            delta=agent_task.result()[-1].tool_calls[0].function.arguments,
                        )
                    )

                    # Step 12a-iii: Signal end of tool call
                    yield encoder.encode(
                        ToolCallEndEvent(
                            type=EventType.TOOL_CALL_END,
                            tool_call_id=agent_task.result()[-1].tool_calls[0].id,
                        )
                    )
                else:

                    # ...

    except Exception as e:
        # Step 14: Handle any errors during agent execution
        print(e)  # Log the error for debugging
        # Note: In production, you might want to emit an error event to the client

    # Step 15: Return the streaming response to the client
    # The event_generator() produces a stream of server-sent events
    return StreamingResponse(event_generator(), media_type="text/event-stream")
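
The three tool call events always travel together and share one tool call ID. As a compact illustration, the sketch below builds that triple as plain dictionaries. `tool_call_events` is a hypothetical helper, and the camelCase key names are an assumption about the wire format rather than something this guide shows.

```python
import json
import uuid


def tool_call_events(name: str, arguments: dict) -> list[dict]:
    """Build the START / ARGS / END triple for a single tool call."""
    tool_call_id = str(uuid.uuid4())  # shared by all three events
    return [
        {"type": "TOOL_CALL_START", "toolCallId": tool_call_id, "toolCallName": name},
        {
            "type": "TOOL_CALL_ARGS",
            "toolCallId": tool_call_id,
            "delta": json.dumps(arguments),  # arguments travel as a JSON string
        },
        {"type": "TOOL_CALL_END", "toolCallId": tool_call_id},
    ]
```

For the chart-rendering call in this guide, the name would be `render_standard_charts_and_table` and the arguments would carry the investment summary; the frontend parses the `delta` string back into an object before rendering.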

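Step 8: Configure AG-UI protocol text message events

Once you have configured the AG-UI protocol tool events, define the AG-UI protocol text message events that stream the agent's response to the frontend, as shown in the agent/main.py file.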

@app.post("/llamaindex-agent")
async def llama_index_agent(input_data: RunAgentInput):
    """
    Main API endpoint for running the stock analysis agent.

    This endpoint handles incoming requests to analyze stock investments,
    manages the agent workflow execution, and streams real-time events
    back to the client for live progress updates.

    Args:
        input_data: RunAgentInput containing user message, state, tools, and metadata

    Returns:
        StreamingResponse: Real-time event stream of agent execution progress
    """
    try:
        # Step 1: Define the async event generator function
        # This function orchestrates the entire agent execution and event streaming
        async def event_generator():

            # ...

            # Step 12: Process and stream the final agent response
            # Check the type of response (tool call vs text message)
            if agent_task.result()[-1].role == "assistant":

                # ...

                else:
                    # Step 12b: Handle text message response
                    # This occurs when the agent provides a direct text response

                    # Step 12b-i: Signal start of text message
                    yield encoder.encode(
                        TextMessageStartEvent(
                            type=EventType.TEXT_MESSAGE_START,
                            message_id=message_id,
                            role="assistant",
                        )
                    )

                    # Step 12b-ii: Stream message content with chunking for smooth display
                    if agent_task.result()[-1].content:
                        content = agent_task.result()[-1].content

                        # Step 12b-ii-1: Split content into chunks for streaming effect
                        n_parts = 100  # Number of chunks for smooth streaming
                        part_length = max(1, len(content) // n_parts)
                        parts = [
                            content[i : i + part_length]
                            for i in range(0, len(content), part_length)
                        ]

                        # Step 12b-ii-2: Handle rounding issues by merging excess parts
                        if len(parts) > n_parts:
                            parts = parts[: n_parts - 1] + [
                                "".join(parts[n_parts - 1 :])
                            ]

                        # Step 12b-ii-3: Stream each content chunk with a delay for typing effect
                        for part in parts:
                            yield encoder.encode(
                                TextMessageContentEvent(
                                    type=EventType.TEXT_MESSAGE_CONTENT,
                                    message_id=message_id,
                                    delta=part,  # Chunk of the message content
                                )
                            )
                            await asyncio.sleep(0.05)  # Small delay for streaming effect
                    else:
                        # Step 12b-ii-4: Handle empty content case with error message
                        yield encoder.encode(
                            TextMessageContentEvent(
                                type=EventType.TEXT_MESSAGE_CONTENT,
                                message_id=message_id,
                                delta="Something went wrong! Please try again.",
                            )
                        )

                    # Step 12b-iii: Signal end of text message
                    yield encoder.encode(
                        TextMessageEndEvent(
                            type=EventType.TEXT_MESSAGE_END,
                            message_id=message_id,
                        )
                    )

            # ...

    except Exception as e:
        # Step 14: Handle any errors during agent execution
        print(e)  # Log the error for debugging
        # Note: In production, you might want to emit an error event to the client

    # Step 15: Return the streaming response to the client
    # The event_generator() produces a stream of server-sent events
    return StreamingResponse(event_generator(), media_type="text/event-stream")
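To make the chunking step easier to reason about, here is a minimal TypeScript sketch that mirrors the Python logic above and then reassembles the chunks the way a client would. The event shapes and the helper names (splitIntoChunks, toTextMessageEvents, reassemble) are assumptions made for illustration. Only the three event type names and the fallback message come from the code above, and the real AG-UI SDK types carry more fields.

```typescript
// Simplified, hypothetical event shapes (not the AG-UI SDK types).
type TextEvent =
  | { type: "TEXT_MESSAGE_START"; messageId: string; role: "assistant" }
  | { type: "TEXT_MESSAGE_CONTENT"; messageId: string; delta: string }
  | { type: "TEXT_MESSAGE_END"; messageId: string };

// Mirrors the Python chunking: aim for at most nParts chunks, never an empty one.
function splitIntoChunks(content: string, nParts = 100): string[] {
  const partLength = Math.max(1, Math.floor(content.length / nParts));
  const parts: string[] = [];
  for (let i = 0; i < content.length; i += partLength) {
    parts.push(content.slice(i, i + partLength));
  }
  // Integer division can leave extra chunks; fold them into the last one.
  if (parts.length > nParts) {
    return [...parts.slice(0, nParts - 1), parts.slice(nParts - 1).join("")];
  }
  return parts;
}

// Wrap the chunks in the start / content / end sequence used in Step 12b.
function toTextMessageEvents(messageId: string, content: string): TextEvent[] {
  const deltas = content
    ? splitIntoChunks(content)
    : ["Something went wrong! Please try again."]; // same fallback as the backend
  return [
    { type: "TEXT_MESSAGE_START", messageId, role: "assistant" },
    ...deltas.map(
      (delta): TextEvent => ({ type: "TEXT_MESSAGE_CONTENT", messageId, delta })
    ),
    { type: "TEXT_MESSAGE_END", messageId },
  ];
}

// What a client does conceptually: concatenate every delta in arrival order.
function reassemble(events: TextEvent[]): string {
  let text = "";
  for (const event of events) {
    if (event.type === "TEXT_MESSAGE_CONTENT") {
      text += event.delta;
    }
  }
  return text;
}

const events = toTextMessageEvents("msg-1", "Hello from the agent");
console.log(events.length, reassemble(events));
```

Concatenating every delta between the start and end events yields the original message, which is what a chat UI does as the events arrive. One difference to keep in mind: JavaScript slices strings by UTF-16 code unit, while Python slices by code point, so the two versions can split some characters differently.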

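Congratulations! You have integrated the LlamaIndex agent workflow with the AG-UI protocol. Now let's see how to add a frontend to the AG-UI + LlamaIndex agent workflow.

Integrating a frontend with the AG-UI + LlamaIndex agent workflow using CopilotKit

In this section, you will learn how to use CopilotKit to connect the AG-UI + LlamaIndex agent workflow to a frontend.

Let's get started.

First, navigate to the frontend directory: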

cd frontend

Next, create a .env file containing your OpenAI API key:

OPENAI_API_KEY=<<your-OpenAI-key-here>>

Then install the dependencies:

pnpm install

After that, start the development server:

pnpm run dev

Visit http://localhost:3000, and you should see the AG-UI + LlamaIndex agent frontend up and running.


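Now let's see how to build the frontend UI for the AG-UI + LlamaIndex agent using CopilotKit.

Step 1: Create an HttpAgent instance

Before creating an HttpAgent instance, let's first understand what HttpAgent is.

HttpAgent is a client from the AG-UI library that connects your frontend application to any AG-UI-compatible AI agent server.

To create an HttpAgent instance, define it in an API route, as shown in the src/app/api/copilotkit/route.ts file.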

// Import necessary CopilotKit components for runtime and AI integration
import {
  CopilotRuntime, // Core runtime for managing AI agents and conversations
  copilotRuntimeNextJSAppRouterEndpoint, // Next.js App Router integration helper
  OpenAIAdapter, // Adapter for OpenAI API integration
} from "@copilotkit/runtime";

// Import Next.js request handling types
import { NextRequest } from "next/server";

// Import AG UI client for external agent communication
import { HttpAgent } from "@ag-ui/client";

// Step 1: Configure the LlamaIndex agent connection
// Create an HTTP agent client to communicate with the Python stock analysis service
const llamaIndexAgent = new HttpAgent({
  // Step 1a: Set agent URL from environment variable or use local development default
  // This points to the FastAPI server running the stock analysis workflow
  url:
    process.env.NEXT_PUBLIC_LLAMAINDEX_URL ||
    "http://0.0.0.0:8000/llamaindex-agent",

  // Alternative production URL (commented out for development)
  // url: "https://open-ag-ui-demo-llamaindex.onrender.com/llamaindex-agent",
});

// Step 2: Initialize OpenAI service adapter
// This adapter handles communication with OpenAI's API for language model interactions
const serviceAdapter = new OpenAIAdapter();

// Step 3: Create CopilotRuntime with agent configuration
// The runtime orchestrates conversations between the frontend and backend agents
const runtime = new CopilotRuntime({
  agents: {
    // Step 3a: Register the LlamaIndex agent for stock analysis
    // @ts-ignore - Suppress TypeScript warnings for agent type compatibility
    llamaIndexAgent: llamaIndexAgent,
  },
});

// Alternative simpler runtime configuration (commented out)
// const runtime = new CopilotRuntime()
// Step 4: Define the POST endpoint handler for CopilotKit API requests
// This endpoint receives requests from the frontend React components and routes them to agents
export const POST = async (req: NextRequest) => {
  // Step 4a: Create request handler using CopilotKit's Next.js integration
  // This configures the endpoint to work with Next.js App Router
  const { handleRequest } = copilotRuntimeNextJSAppRouterEndpoint({
    runtime, // The configured CopilotRuntime with our agents
    serviceAdapter, // OpenAI adapter for language model communication
    endpoint: "/api/copilotkit", // API endpoint path for client connections
  });

  // Step 4b: Delegate request handling to CopilotKit
  // This processes the incoming request and routes it to the appropriate agent
  // Returns streaming responses for real-time communication
  return handleRequest(req);
};
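HttpAgent handles the transport for you, but it can help to see roughly what travels over the wire. The backend endpoint returns a text/event-stream response, so a client has to split the stream into frames and decode each one. The sketch below is a simplified illustration under that assumption, not HttpAgent's actual implementation. parseSseFrames and the AgUiEvent type are hypothetical names, and the sketch assumes each frame carries its JSON payload in data: fields.

```typescript
// Hypothetical, minimal event shape: only `type` is assumed to be present.
interface AgUiEvent {
  type: string;
  [key: string]: unknown;
}

// Split a complete text/event-stream payload into frames and decode each one.
// Assumption: frames are separated by a blank line and carry JSON in `data:` fields.
function parseSseFrames(payload: string): AgUiEvent[] {
  const events: AgUiEvent[] = [];
  for (const frame of payload.split("\n\n")) {
    const data = frame
      .split("\n")
      .filter((line) => line.startsWith("data:"))
      .map((line) => line.slice("data:".length).trim())
      .join("\n");
    if (data) {
      events.push(JSON.parse(data) as AgUiEvent);
    }
  }
  return events;
}

// Made-up payload for illustration only.
const wire =
  'data: {"type":"RUN_STARTED"}\n\n' +
  'data: {"type":"TEXT_MESSAGE_CONTENT","delta":"Hi"}\n\n' +
  'data: {"type":"RUN_FINISHED"}\n\n';
console.log(parseSseFrames(wire).map((event) => event.type));
```

A real stream arrives in pieces, so a production client also has to buffer partial frames until the blank-line separator shows up. That bookkeeping is part of what a ready-made client saves you from writing.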

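Step 2: Set up the CopilotKit provider

To set up the CopilotKit provider, the [<CopilotKit>](https://docs.copilotkit.ai/reference/components/CopilotKit) component must wrap the Copilot-aware parts of your application.

For most use cases, it is appropriate to wrap the CopilotKit provider around the entire app, for example in your layout.tsx, as shown in the src/app/layout.tsx file below.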

// Next.js imports for metadata and font handling
import type { Metadata } from "next";
import { Geist, Geist_Mono } from "next/font/google";
// Global styles for the application
import "./globals.css";
// CopilotKit UI styles for AI components
import "@copilotkit/react-ui/styles.css";
// CopilotKit core component for AI functionality
import { CopilotKit } from "@copilotkit/react-core";

// Configure Geist Sans font with CSS variables for consistent typography
const geistSans = Geist({
  variable: "--font-geist-sans",
  subsets: ["latin"],
});

// Configure Geist Mono font for code and monospace text
const geistMono = Geist_Mono({
  variable: "--font-geist-mono",
  subsets: ["latin"],
});

// Metadata configuration for SEO and page information
export const metadata: Metadata = {
  title: "AI Stock Portfolio",
  description: "AI Stock Portfolio",
};

// Root layout component that wraps all pages in the application
export default function RootLayout({
  children,
}: Readonly<{
  children: React.ReactNode;
}>) {
  return (
    <html lang="en">
      <body
        className={`${geistSans.variable} ${geistMono.variable} antialiased`}>
        {/* CopilotKit wrapper that enables AI functionality throughout the app */}
        {/* runtimeUrl points to the API endpoint for AI backend communication */}
        {/* agent specifies which registered agent to use (llamaIndexAgent, as registered in route.ts) */}
        <CopilotKit runtimeUrl="/api/copilotkit" agent="llamaIndexAgent">
          {children}
        </CopilotKit>
      </body>
    </html>
  );
}

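Step 3: Set up a Copilot chat component

CopilotKit ships with several built-in chat components, including CopilotPopup, CopilotSidebar, and CopilotChat.

To set up a Copilot chat component, define it as shown in the src/app/components/prompt-panel.tsx file.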

// Client-side component directive for Next.js
"use client";

import type React from "react";
// CopilotKit chat component for AI interactions
import { CopilotChat } from "@copilotkit/react-ui";

// Props interface for the PromptPanel component
interface PromptPanelProps {
  // Amount of available cash for investment, displayed in the panel
  availableCash: number;
}

// Main component for the AI chat interface panel
export function PromptPanel({ availableCash }: PromptPanelProps) {
  // Utility function to format numbers as USD currency
  // Removes decimal places for cleaner display of large amounts
  const formatCurrency = (amount: number) => {
    return new Intl.NumberFormat("en-US", {
      style: "currency",
      currency: "USD",
      minimumFractionDigits: 0,
      maximumFractionDigits: 0,
    }).format(amount);
  };

  return (
    // Main container with full height and white background
    <div className="h-full flex flex-col bg-white">
      {/* Header section with title, description, and cash display */}
      <div className="p-4 border-b border-[#D8D8E5] bg-[#FAFCFA]">
        {/* Title section with icon and branding */}
        <div className="flex items-center gap-2 mb-2">
          <span className="text-xl">🪁</span>
          <div>
            <h1 className="text-lg font-semibold text-[#030507] font-['Roobert']">
              Portfolio Chat
            </h1>
            {/* Pro badge indicator */}
            <div className="inline-block px-2 py-0.5 bg-[#BEC9FF] text-[#030507] text-xs font-semibold uppercase rounded">
              PRO
            </div>
          </div>
        </div>
        {/* Description of the AI agent's capabilities */}
        <p className="text-xs text-[#575758]">
          Interact with the LlamaIndex-powered AI agent for portfolio
          visualization and analysis
        </p>

        {/* Available Cash Display section */}
        <div className="mt-3 p-2 bg-[#86ECE4]/10 rounded-lg">
          <div className="text-xs text-[#575758] font-medium">
            Available Cash
          </div>
          <div className="text-sm font-semibold text-[#030507] font-['Roobert']">
            {formatCurrency(availableCash)}
          </div>
        </div>
      </div>
      {/* CopilotKit chat interface with custom styling and initial message */}
      {/* Takes up majority of the panel height for conversation */}
      <CopilotChat
        className="h-[78vh] p-2"
        labels={{
          // Initial welcome message explaining the AI agent's capabilities and limitations
          initial: `I am a LlamaIndex agent designed to analyze investment opportunities and track stock performance over time. How can I help you with your investment query? For example, you can ask me to analyze a stock like "Invest in Apple with 10k dollars since Jan 2023". \n\nNote: The AI agent has access to stock data from the past 4 years only.`,
        }}
      />
    </div>
  );
}
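The formatCurrency helper in this component relies on the standard Intl.NumberFormat API. Pulled out of the component, it is easy to try on its own. Note that setting both fraction-digit options to 0 rounds to the nearest dollar rather than truncating. The usdFormatter name is introduced here only for the sketch.

```typescript
// Same options as the PromptPanel helper, extracted so it can run on its own.
const usdFormatter = new Intl.NumberFormat("en-US", {
  style: "currency",
  currency: "USD",
  minimumFractionDigits: 0,
  maximumFractionDigits: 0,
});

function formatCurrency(amount: number): string {
  return usdFormatter.format(amount);
}

console.log(formatCurrency(1000000)); // "$1,000,000"
console.log(formatCurrency(1234.56)); // "$1,235" (rounded, not truncated)
```

Creating the formatter once, outside the component, also avoids constructing a new Intl.NumberFormat on every render.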

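Step 4: Sync the AG-UI + LlamaIndex agent state with the frontend using CopilotKit hooks

In CopilotKit, CoAgents maintain a shared state that connects your frontend UI with the agent's execution. This shared state system lets you:

  • Display the agent's current progress and intermediate results

  • Update the agent's state through UI interactions

  • React to state changes in real time across your application

You can learn more about CoAgents' shared state in the CopilotKit docs.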


To sync the AG-UI + LlamaIndex agent state with the frontend, use the CopilotKit useCoAgent hook to share the agent's state with the frontend, as shown in the src/app/page.tsx file.

"use client";

import { useState } from "react";
import {
  useCoAgent,
} from "@copilotkit/react-core";

// ...

export interface SandBoxPortfolioState {
  performanceData: Array<{
    date: string;
    portfolio: number;
    spy: number;
  }>;
}
export interface InvestmentPortfolio {
  ticker: string;
  amount: number;
}

export default function OpenStocksCanvas() {

  // ...

  const [totalCash, setTotalCash] = useState(1000000);

  const { state, setState } = useCoAgent({
    name: "llamaIndexAgent",
    initialState: {
      available_cash: totalCash,
      investment_summary: {} as any,
      investment_portfolio: [] as InvestmentPortfolio[],
    },
  });

    // ...

  return (
    <div className="h-screen bg-[#FAFCFA] flex overflow-hidden">
       {/* ... */}
    </div>
  );
}
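The initialState above falls back to any for investment_summary. As a sketch of how the shared state could be typed, the block below declares a hypothetical AgentState interface built from the three fields in initialState, plus a small pure helper for updating the cash balance. The interface name, the helper name, and the Record<string, unknown> placeholder are assumptions for illustration; the demo repository may model this differently.

```typescript
interface InvestmentPortfolio {
  ticker: string;
  amount: number;
}

// Hypothetical shape of the shared agent state, based on initialState above.
interface AgentState {
  available_cash: number;
  investment_summary: Record<string, unknown>; // placeholder instead of `any`
  investment_portfolio: InvestmentPortfolio[];
}

// Return a new state object instead of mutating the current one,
// matching the `setState({ ...state, ... })` style used with useCoAgent.
function withAvailableCash(state: AgentState, cash: number): AgentState {
  return { ...state, available_cash: cash };
}

const initial: AgentState = {
  available_cash: 1000000,
  investment_summary: {},
  investment_portfolio: [],
};
const updated = withAvailableCash(initial, 990000);
console.log(initial.available_cash, updated.available_cash);
```

A type like this should be usable as the generic argument of the hook, as in useCoAgent<AgentState>({ ... }), so that state and setState are checked by the compiler rather than typed loosely.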

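Then render the AG-UI + LlamaIndex agent's state in the chat UI, which keeps the user informed about what the agent is doing in a more contextual way.

To render the AG-UI + LlamaIndex agent's state in the chat UI, use the useCoAgentStateRender hook, as shown in the src/app/page.tsx file.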

"use client";

import {
  useCoAgentStateRender,
} from "@copilotkit/react-core";

import { ToolLogs } from "./components/tool-logs";

// ...

export default function OpenStocksCanvas() {

  // ...

  useCoAgentStateRender({
    name: "llamaIndexAgent",
    render: ({ state }) => <ToolLogs logs={state.tool_logs} />,
  });

  // ...

  return (
    <div className="h-screen bg-[#FAFCFA] flex overflow-hidden">
      {/* ... */}
    </div>
  );
}

If you run a query in the chat, you should see the AG-UI + LlamaIndex agent's task execution state rendered in the chat UI.


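Step 5: Implement Human-in-the-Loop (HITL) in the frontend

Human-in-the-loop (HITL) lets an agent request human input or approval during execution, which makes AI systems more reliable and trustworthy. This pattern is essential for AI applications that handle complex decisions or actions that call for human judgment.

You can learn more about Human-in-the-Loop in the CopilotKit docs.

To implement Human-in-the-Loop (HITL) in the frontend, use the CopilotKit useCopilotAction hook with the renderAndWaitForResponse method, which allows a value to be returned asynchronously from the render function, as shown in the src/app/page.tsx file.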

"use client";

import {
  useCopilotAction,
} from "@copilotkit/react-core";

// ...

export default function OpenStocksCanvas() {

  // ...

  useCopilotAction({
    name: "render_standard_charts_and_table",
    description:
      "This is an action to render a standard chart and table. The chart can be a bar chart or a line chart. The table can be a table of data.",
    renderAndWaitForResponse: ({ args, respond, status }) => {
      return (
        <>
          {args?.investment_summary?.percent_allocation_per_stock &&
            args?.investment_summary?.percent_return_per_stock &&
            args?.investment_summary?.performanceData && (
              <>
                <div className="flex flex-col gap-4">
                  <LineChartComponent
                    data={args?.investment_summary?.performanceData}
                    size="small"
                  />
                  <BarChartComponent
                    data={Object.entries(
                      args?.investment_summary?.percent_return_per_stock
                    ).map(([ticker, return1]) => ({
                      ticker,
                      return: return1 as number,
                    }))}
                    size="small"
                  />
                  <AllocationTableComponent
                    allocations={Object.entries(
                      args?.investment_summary?.percent_allocation_per_stock
                    ).map(([ticker, allocation]) => ({
                      ticker,
                      allocation: allocation as number,
                      currentValue:
                        args?.investment_summary.final_prices[ticker] *
                        args?.investment_summary.holdings[ticker],
                      totalReturn:
                        args?.investment_summary.percent_return_per_stock[
                          ticker
                        ],
                    }))}
                    size="small"
                  />
                </div>

                <button
                  hidden={status == "complete"}
                  className="mt-4 rounded-full px-6 py-2 bg-green-50 text-green-700 border border-green-200 shadow-sm hover:bg-green-100 transition-colors font-semibold text-sm"
                  onClick={() => {
                    if (respond) {
                      setTotalCash(args?.investment_summary?.cash);
                      setCurrentState({
                        ...currentState,
                        returnsData: Object.entries(
                          args?.investment_summary?.percent_return_per_stock
                        ).map(([ticker, return1]) => ({
                          ticker,
                          return: return1 as number,
                        })),
                        allocations: Object.entries(
                          args?.investment_summary?.percent_allocation_per_stock
                        ).map(([ticker, allocation]) => ({
                          ticker,
                          allocation: allocation as number,
                          currentValue:
                            args?.investment_summary?.final_prices[ticker] *
                            args?.investment_summary?.holdings[ticker],
                          totalReturn:
                            args?.investment_summary?.percent_return_per_stock[
                              ticker
                            ],
                        })),
                        performanceData:
                          args?.investment_summary?.performanceData,
                        bullInsights: args?.insights?.bullInsights || [],
                        bearInsights: args?.insights?.bearInsights || [],
                        currentPortfolioValue:
                          args?.investment_summary?.total_value,
                        totalReturns: (
                          Object.values(
                            args?.investment_summary?.returns
                          ) as number[]
                        ).reduce((acc, val) => acc + val, 0),
                      });
                      setInvestedAmount(
                        (
                          Object.values(
                            args?.investment_summary?.total_invested_per_stock
                          ) as number[]
                        ).reduce((acc, val) => acc + val, 0)
                      );
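                      setState({
                        ...state,
                        // totalCash still holds the pre-update value in this closure,
                        // so read the new balance from the tool arguments instead
                        available_cash: args?.investment_summary?.cash,
                      });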
                      respond(
                        "Data rendered successfully. Provide a summary of the investments by not making any tool calls."
                      );
                    }
                  }}>
                  Accept
                </button>
                <button
                  hidden={status == "complete"}
                  className="rounded-full px-6 py-2 bg-red-50 text-red-700 border border-red-200 shadow-sm hover:bg-red-100 transition-colors font-semibold text-sm ml-2"
                  onClick={() => {
                    if (respond) {
                      respond(
                        "Data rendering rejected. Just give a summary of the rejected investments by not making any tool calls."
                      );
                    }
                  }}>
                  Reject
                </button>
              </>
            )}
        </>
      );
    },
  });

  // ...

  return (
    <div className="h-screen bg-[#FAFCFA] flex overflow-hidden">
      {/* ... */}
    </div>
  );
}
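The table props and the Accept handler above build the same allocation rows from investment_summary. One way to avoid the duplication is to move that mapping into pure helpers, sketched below. The InvestmentSummary and AllocationRow types and the helper names are assumptions inferred from the fields used above, not types taken from the demo repository, and the sample numbers are made up.

```typescript
// Hypothetical types inferred from the fields read in the action above.
interface InvestmentSummary {
  percent_allocation_per_stock: Record<string, number>;
  percent_return_per_stock: Record<string, number>;
  final_prices: Record<string, number>;
  holdings: Record<string, number>;
}

interface AllocationRow {
  ticker: string;
  allocation: number;
  currentValue: number;
  totalReturn: number;
}

// Build one table row per ticker. Unlike the inline version, missing
// prices, holdings, or returns default to 0 instead of producing NaN.
function buildAllocations(summary: InvestmentSummary): AllocationRow[] {
  return Object.entries(summary.percent_allocation_per_stock).map(
    ([ticker, allocation]) => ({
      ticker,
      allocation,
      currentValue:
        (summary.final_prices[ticker] ?? 0) * (summary.holdings[ticker] ?? 0),
      totalReturn: summary.percent_return_per_stock[ticker] ?? 0,
    })
  );
}

// Sum a per-ticker map, as done for totalReturns and the invested amount.
function sumValues(values: Record<string, number>): number {
  return Object.values(values).reduce((acc, val) => acc + val, 0);
}

// Made-up sample data for illustration only.
const summary: InvestmentSummary = {
  percent_allocation_per_stock: { AAPL: 60, MSFT: 40 },
  percent_return_per_stock: { AAPL: 12.5, MSFT: -3 },
  final_prices: { AAPL: 200, MSFT: 400 },
  holdings: { AAPL: 3, MSFT: 1 },
};
console.log(buildAllocations(summary));
console.log(sumValues(summary.percent_return_per_stock));
```

With helpers like these, both the AllocationTableComponent props and the Accept handler could call buildAllocations(args.investment_summary), which keeps the two code paths from drifting apart.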

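When the agent triggers a frontend action by its tool/action name to request human input or feedback during execution, the end user is prompted with a choice rendered inside the chat UI. The user can then make a choice by pressing one of the buttons in the chat UI.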


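Step 6: Stream AG-UI + LlamaIndex agent responses in the frontend

To stream the AG-UI + LlamaIndex agent's responses or results in the frontend, pass the agent's state field values to the frontend components, as shown in the src/app/page.tsx file.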

"use client";

import { useEffect, useState } from "react";
import { PromptPanel } from "./components/prompt-panel";
import { GenerativeCanvas } from "./components/generative-canvas";
import { ComponentTree } from "./components/component-tree";
import { CashPanel } from "./components/cash-panel";

// ...

export default function OpenStocksCanvas() {
  const [currentState, setCurrentState] = useState<PortfolioState>({
    id: "",
    trigger: "",
    performanceData: [],
    allocations: [],
    returnsData: [],
    bullInsights: [],
    bearInsights: [],
    currentPortfolioValue: 0,
    totalReturns: 0,
  });
  const [sandBoxPortfolio, setSandBoxPortfolio] = useState<
    SandBoxPortfolioState[]
  >([]);
  const [selectedStock, setSelectedStock] = useState<string | null>(null);


  return (
    <div className="h-screen bg-[#FAFCFA] flex overflow-hidden">
      {/* Left Panel - Prompt Input */}
      <div className="w-85 border-r border-[#D8D8E5] bg-white flex-shrink-0">
        <PromptPanel availableCash={totalCash} />
      </div>

      {/* Center Panel - Generative Canvas */}
      <div className="flex-1 relative min-w-0">
        {/* Top Bar with Cash Info */}
        <div className="absolute top-0 left-0 right-0 bg-white border-b border-[#D8D8E5] p-4 z-10">
          <CashPanel
            totalCash={totalCash}
            investedAmount={investedAmount}
            currentPortfolioValue={
              totalCash + investedAmount + currentState.totalReturns || 0
            }
            onTotalCashChange={setTotalCash}
            onStateCashChange={setState}
          />
        </div>

        <div className="pt-20 h-full">
          <GenerativeCanvas
            setSelectedStock={setSelectedStock}
            portfolioState={currentState}
            sandBoxPortfolio={sandBoxPortfolio}
            setSandBoxPortfolio={setSandBoxPortfolio}
          />
        </div>
      </div>

      {/* Right Panel - Component Tree (Optional) */}
      {showComponentTree && (
        <div className="w-64 border-l border-[#D8D8E5] bg-white flex-shrink-0">
          <ComponentTree portfolioState={currentState} />
        </div>
      )}
    </div>
  );
}

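If you ask the agent a question and approve its feedback request, you should see the agent's response or results streamed into the UI.

Conclusion

In this guide, we walked through the steps of integrating a LlamaIndex agent with the AG-UI protocol and then adding a frontend to the agent using CopilotKit.

While we have explored a couple of features, we have barely scratched the surface of CopilotKit's use cases, which range from building interactive AI chatbots to building agentic solutions. In essence, CopilotKit lets you add a ton of useful AI capabilities to your products in minutes.

Hopefully, this guide makes it easier for you to integrate AI-powered copilots into your existing application.

Want to contribute to AG-UI?
Join the weekly working group!

Follow CopilotKit on Twitter and say hi, and if you'd like to build something cool, join the Discord community.

Source: https://dev.to/copilotkit/build-your-own-ai-stock-portfolio-agent-with-llamaindex-ag-ui-62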