• 文档
  • 简单的 Agentic RAG 系统

agentic-rag-crewai-zoom

使用 CrewAI 和 Qdrant 向量数据库的 Agentic RAG

时间:45 分钟等级:初级输出:GitHub

通过结合 Qdrant 进行向量搜索的能力以及 CrewAI 用于编排模块化代理的能力,您可以构建不仅回答问题,还能分析、解释和行动的系统。

传统的 RAG 系统侧重于获取数据并生成响应,但它们缺乏深度推理或处理多步骤过程的能力。

在本教程中,我们将逐步引导您构建一个 Agentic RAG 系统。完成本教程后,您将获得一个可行的框架,用于将数据存储在 Qdrant 向量数据库中,并结合向量搜索,使用 CrewAI 代理从数据中提取洞察。

我们已经为您构建了此应用。克隆此仓库并按照教程进行操作。

您将构建什么

在这个动手教程中,我们将创建一个系统,该系统:

  1. 使用 Qdrant 将会议记录存储和检索为向量嵌入
  2. 利用 CrewAI 代理分析和总结会议数据
  3. 在简单的 Streamlit 界面中呈现洞察,方便交互

本项目演示了如何构建一个由向量搜索驱动的 Agentic 工作流,用于从会议记录中提取洞察。通过结合 Qdrant 的向量搜索能力和 CrewAI 代理,用户可以搜索和分析自己的会议内容。

该应用首先将会议记录转换为向量嵌入并存储在 Qdrant 向量数据库中。然后,它使用 CrewAI 代理查询向量数据库,并从会议内容中提取洞察。最后,它使用 Anthropic Claude 根据从向量数据库中提取的洞察,生成对用户查询的自然语言响应。

工作原理

当您与系统交互时,后台会发生以下事情:

首先,用户向系统提交查询。在此示例中,我们想找出营销会议的平均时长。由于会议数据点之一是会议时长,代理可以通过平均主题或内容中包含关键词“营销”的所有会议的时长来计算会议的平均时长。

User Query Interface

接下来,代理使用 search_meetings 工具在 Qdrant 向量数据库中搜索语义最相似的会议点。我们询问了营销会议,因此代理使用搜索会议工具搜索数据库中所有主题或内容包含关键词“营销”的会议。

Vector Search Results

接下来,代理使用 calculator 工具查找会议的平均时长。

Duration Calculation

最后,代理使用 Information Synthesizer 工具合成分析并以自然语言格式呈现。

Synthesized Analysis

用户在类似聊天的界面中看到最终输出。

Chat Interface

用户可以继续通过提出更多问题与系统交互。

架构

系统基于三个主要组件构建:

  • Qdrant 向量数据库:将会议记录和摘要存储为向量嵌入,支持语义搜索
  • CrewAI 框架:协调处理会议分析不同方面的 AI 代理
  • Anthropic Claude:提供自然语言理解和响应生成能力
  1. 数据处理管道

    • 处理会议记录和元数据
    • 使用 SentenceTransformer 创建嵌入
    • 管理 Qdrant 集合和数据上传
  2. AI 代理系统

    • 实现 CrewAI 代理逻辑
    • 处理向量搜索集成
    • 使用 Claude 处理查询
  3. 用户界面

    • 提供类似聊天的 Web 界面
    • 显示实时处理反馈
    • 维护对话历史记录

入门指南

agentic-rag-crewai-zoom

  1. 获取 Qdrant API 凭据:

  2. 获取 AI 服务的 API 凭据:


设置

  1. 克隆仓库:
git clone https://github.com/qdrant/examples.git
cd agentic_rag_zoom_crewai
  1. 创建并激活一个 Python 虚拟环境,使用 Python 3.10 以确保兼容性:
python3.10 -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate
  1. 安装依赖项:
pip install -r requirements.txt
  1. 配置环境变量:创建一个 .env.local 文件,包含
openai_api_key=your_openai_key_here
anthropic_api_key=your_anthropic_key_here
qdrant_url=your_qdrant_url_here
qdrant_api_key=your_qdrant_api_key_here

使用方法

1. 处理会议数据

data_loader.py 脚本处理会议数据并将其存储在 Qdrant 中

python vector/data_loader.py

脚本运行后,您应该在 Qdrant Cloud 账户中看到一个名为 zoom_recordings 的新集合。此集合包含会议记录的向量嵌入。集合中的点包含原始会议数据,包括主题、内容和摘要。

2. 启动界面

streamlit_app.py 位于 vector 文件夹中。要启动它,请运行

streamlit run vector/streamlit_app.py

运行此脚本后,您将能够通过类似聊天的界面与系统交互。询问有关会议内容的问题,系统将使用 AI 代理找到最相关的信息并以自然语言格式呈现。

数据管道

我们系统的核心是数据处理管道

class MeetingData:
    def _initialize(self):
        self.data_dir = Path(__file__).parent.parent / 'data'
        self.meetings = self._load_meetings()
        
        self.qdrant_client = QdrantClient(
            url=os.getenv('qdrant_url'),
            api_key=os.getenv('qdrant_api_key')
        )
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

data_loader.py 中的单例模式通过使用 Python 的 newinit 方法的 MeetingData 类实现。该类维护一个私有的 _instance 变量来跟踪实例是否存在,以及一个 _initialized 标志来确保初始化代码只运行一次。当使用 MeetingData() 创建新实例时,new 首先检查 _instance 是否存在 - 如果不存在,则创建一个并设置初始化标志为 False。然后 init 方法检查此标志,如果为 False,则运行初始化代码并将标志设置为 True。这确保了所有后续调用 MeetingData() 都返回具有相同初始化资源的相同实例。

处理会议时,我们需要考虑内容和上下文。每个会议在转换为向量之前,会被转换为富文本表示形式。

text_to_embed = f"""
    Topic: {meeting.get('topic', '')}
    Content: {meeting.get('vtt_content', '')}
    Summary: {json.dumps(meeting.get('summary', {}))}
"""

这种结构化格式确保我们的向量嵌入捕获每个会议的完整上下文。但逐个处理会议效率低下。因此,我们批量处理数据。

batch_size = 100
for i in range(0, len(points), batch_size):
    batch = points[i:i + batch_size]
    self.qdrant_client.upsert(
        collection_name='zoom_recordings',
        points=batch
    )

构建 AI 代理系统

我们的 AI 系统采用基于工具的方法。让我们从最简单的工具开始 - 一个用于会议统计的计算器。

class CalculatorTool(BaseTool):
    name: str = "calculator"
    description: str = "Perform basic mathematical calculations"
    
    def _run(self, a: int, b: int) -> dict:
        return {
            "addition": a + b,
            "multiplication": a * b
        }

但真正的力量来自我们的向量搜索集成。此工具将自然语言查询转换为向量表示,并搜索我们的会议数据库。

class SearchMeetingsTool(BaseTool):
    def _run(self, query: str) -> List[Dict]:
        response = openai_client.embeddings.create(
            model="text-embedding-ada-002",
            input=query
        )
        query_vector = response.data[0].embedding
        
        return self.qdrant_client.search(
            collection_name='zoom_recordings',
            query_vector=query_vector,
            limit=10
        )

搜索结果随后输入到我们的分析工具中,该工具使用 Claude 提供更深入的洞察。

class MeetingAnalysisTool(BaseTool):
    def _run(self, meeting_data: dict) -> Dict:
        meetings_text = self._format_meetings(meeting_data)
        
        message = client.messages.create(
            model="claude-3-sonnet-20240229",
            messages=[{
                "role": "user", 
                "content": f"Analyze these meetings:\n\n{meetings_text}"
            }]
        )

编排工作流

当我们在代理框架下将这些工具整合在一起时,奇迹就发生了。我们创建了两个专门的代理:

researcher = Agent(
    role='Research Assistant',
    goal='Find and analyze relevant information',
    tools=[calculator, searcher, analyzer]
)

synthesizer = Agent(
    role='Information Synthesizer',
    goal='Create comprehensive and clear responses'
)

这些代理在协调的工作流中协同工作。研究员收集和分析信息,而合成器创建清晰、可操作的响应。这种关注点分离允许每个代理专注于其优势。

构建用户界面

Streamlit 界面提供了一个简洁、类似聊天的体验,用于与我们的 AI 系统交互。让我们从基本设置开始:

st.set_page_config(
    page_title="Meeting Assistant",
    page_icon="🤖",
    layout="wide"
)

为了使界面更具吸引力,我们添加了自定义样式,使输出更易于阅读。

st.markdown("""
    <style>
    .stApp {
        max-width: 1200px;
        margin: 0 auto;
    }
    .output-container {
        background-color: #f0f2f6;
        padding: 20px;
        border-radius: 10px;
        margin: 10px 0;
    }
    </style>
""", unsafe_allow_html=True)

其中一个关键特性是在处理过程中提供实时反馈。我们通过自定义输出处理程序实现这一点。

class ConsoleOutput:
    def __init__(self, placeholder):
        self.placeholder = placeholder
        self.buffer = []
        self.update_interval = 0.5  # seconds
        self.last_update = time.time()

    def write(self, text):
        self.buffer.append(text)
        if time.time() - self.last_update > self.update_interval:
            self._update_display()

此处理程序缓冲输出并定期更新显示,创建流畅的用户体验。当用户发送查询时,我们通过视觉反馈进行处理。

with st.chat_message("assistant"):
    message_placeholder = st.empty()
    progress_bar = st.progress(0)
    console_placeholder = st.empty()
    
    try:
        console_output = ConsoleOutput(console_placeholder)
        with contextlib.redirect_stdout(console_output):
            progress_bar.progress(0.3)
            full_response = get_crew_response(prompt)
            progress_bar.progress(1.0)

界面维护对话历史记录,使其感觉像自然的对话。

if "messages" not in st.session_state:
    st.session_state.messages = []

for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

我们还在侧边栏中包含了有用的示例和设置。

with st.sidebar:
    st.header("Settings")
    search_limit = st.slider("Number of results", 1, 10, 5)
    
    analysis_depth = st.select_slider(
        "Analysis Depth",
        options=["Basic", "Standard", "Detailed"],
        value="Standard"
    )

这些功能的组合创建了一个既强大又易于使用的界面。用户可以实时查看其查询的处理过程,根据自己的需求调整设置,并通过聊天历史记录保持上下文。


结论

agentic-rag-crewai-zoom

本教程演示了如何构建一个结合向量搜索和 AI 代理的复杂会议分析系统。让我们回顾一下我们涵盖的关键组件:

  1. 向量搜索集成

    • 使用 Qdrant 高效存储和检索会议内容
    • 通过向量嵌入实现语义搜索能力
    • 批量处理以实现最佳性能
  2. AI 代理框架

    • 基于工具的方法实现模块化功能
    • 用于研究和分析的专门代理
    • 集成 Claude 以提供智能洞察
  3. 交互式界面

    • 实时反馈和进度跟踪
    • 持久化聊天历史记录
    • 可配置的搜索和分析设置

生成的系统展示了结合向量搜索和 AI 代理来创建智能会议助手的强大能力。通过本教程,您学习了如何:

  • 高效处理和存储会议数据
  • 实现语义搜索功能
  • 创建用于分析的专门 AI 代理
  • 构建直观的用户界面

这个基础可以通过多种方式进行扩展,例如:

  • 添加更多专门的代理
  • 实现更多分析工具
  • 增强用户界面
  • 集成其他数据源

代码可在仓库中获取,我们鼓励您尝试进行自己的修改和改进。


本页面有用吗?

感谢您的反馈!🙏

很抱歉听到这个消息。😔 您可以在 GitHub 上编辑本页面,或创建一个 GitHub issue。