本文会带你从零搭建一个完整的概念验证项目(POC),技术栈涵盖 Adaptive RAG、LangGraph、FastAPI 和 Streamlit 四个核心组件。Adaptive RAG 负责根据查询复杂度自动调整检索策略;LangGraph 把多步 LLM 推理组织成有状态的可靠工作流;FastAPI 作为高性能后端暴露整条 AI 管道;Streamlit 则提供一个可以直接交互的前端界面。
读完这篇文章,你拿到的不只是理论——而是一个跑得起来的端到端 AI 系统。

要构建的是一个技术支持智能助手。它能理解用户查询,根据问题复杂度动态选择检索深度(Adaptive RAG),通过 LangGraph 执行推理工作流,经由 FastAPI 返回结果,最后在 Streamlit UI 上呈现响应。
这个场景针对的是一个真实痛点:团队面对大规模文档集时,传统 RAG 在模糊查询或多步骤问题上经常答非所问。
技术概览Adaptive RAG
可以把 Adaptive RAG 理解为"搜索之前先思考"的 RAG。简单查询走轻量级检索就够了,遇到复杂问题则自动切换到多跳深度搜索、重排序或查询扩展,用更低的延迟换更高的准确率。
LangGraph
LangGraph 是用来构建有状态、多步骤 AI 工作流的框架。和传统链式调用不同它把 LLM 工作流建模成一张图——每个节点对应一个步骤(检索 → 推理 → 验证 → 响应),原生支持重试、记忆、循环和故障转移。对于需要在生产环境中保证可预测行为的场景,这种抽象比线性 chain 灵活得多。
FastAPI
FastAPI 把 Adaptive RAG + LangGraph 包装成 API 接口对外暴露,处理请求分发,天然适配异步 I/O。
Streamlit
前端用 Streamlit 搭建,聊天风格的界面,不需要写 HTML/CSS做 POC 演示足够了。
系统架构
数据流走向:
User → Query → Streamlit UI Streamlit → Sends request → FastAPI FastAPI → Passes query → LangGraph LangGraph → Runs Adaptive RAG → Retriever Retriever → Gets chunks → Vector DB Vector DB → Returns results → LangGraph LangGraph → Generates final response FastAPI → Sends to UI → User
文件夹结构项目结构尽量精简:
ai-poc/ │ ├── backend/ # 后端逻辑 │ ├── app.py # FastAPI API 服务器 │ ├── rag_pipeline.py # Adaptive RAG 检索 │ ├── graph_workflow.py # LangGraph 工作流 │ ├── config.py # 配置和环境设置 │ ├── data/ # 源文档 │ └── __init__.py # 包初始化器 │ ├── frontend/ # UI 层 │ ├── ui.py # Streamlit 界面 │ └── __init__.py # 包初始化器 │ ├── .env # API 密钥和机密信息 ├── requirements.txt # 项目依赖 └── README.md # 设置说明
requirements.txt 文件
fastapi uvicorn[standard] streamlit requests pydantic langchain langchain-community langgraph faiss-cpu sentence-transformers openai python-dotenv
代码实现(关键代码片段)Adaptive RAG 管道(rag_pipeline.py)
# backend/rag_pipeline.py from typing import List from langchain_community.vectorstores import FAISS from langchain_community.embeddings import HuggingFaceEmbeddings from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.schema import Document class AdaptiveRAG: """ Adaptive Retrieval Pipeline """ def __init__(self, vector_db: FAISS): self.db = vector_db def retrieve(self, query: str) -> List[Document]: if not query.strip(): return [] # Adaptive heuristic token_count = len(query.split()) k = 3 if token_count < 6 else 8 return self.db.similarity_search(query, k=k) def build_vector_store(texts: List[str]) -> FAISS: """ Build FAISS index from raw texts (POC only). In production load persisted DB instead. """ embeddings = HuggingFaceEmbeddings( model_name="sentence-transformers/all-MiniLM-L6-v2" ) splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=100 ) docs = [] for text in texts: chunks = splitter.split_text(text) for chunk in chunks: docs.append(chunk) return FAISS.from_texts(docs, embeddings)
自适应的核心逻辑其实很简单:根据查询的 Token 数决定检索深度。查询短于 6 个词就取 3 条结果,否则拉 8 条。这是一个粗粒度的启发式方法,在 POC 阶段够用,生产环境可以替换成更精细的分类器。
build_vector_store 函数从原始文本构建 FAISS 索引。注意这里每次启动都重建索引,生产上应该加载持久化的数据库。
LangGraph 工作流(graph_workflow.py)
# backend/graph_workflow.py from typing import TypedDict, List from langgraph.graph import StateGraph, END from langchain.schema import Document from langchain_openai import ChatOpenAI class GraphState(TypedDict): question: str docs: List[Document] answer: str def create_workflow(rag): llm = ChatOpenAI(model="gpt-4o-mini", temperature=0) workflow = StateGraph(GraphState) # Retrieval Node async def retrieve_node(state: GraphState): docs = rag.retrieve(state["question"]) return {"docs": docs} # Reasoning Node async def reasoning_node(state: GraphState): question = state["question"] docs = state.get("docs", []) context = "\n\n".join([d.page_content for d in docs]) prompt = f""" You are a technical assistant. Use ONLY the context below to answer the question. If the answer is not in the context, say you don't know. Context: {context} Question: {question} """ response = await llm.ainvoke(prompt) return {"answer": response.content} # Add nodes workflow.add_node("retrieve", retrieve_node) workflow.add_node("reason", reasoning_node) # Connect nodes workflow.set_entry_point("retrieve") workflow.add_edge("retrieve", "reason") workflow.add_edge("reason", END) return workflow.compile()
整个工作流只有两个节点:retrieve 负责检索,reason 负责推理生成答案。GraphState 作为 TypedDict 在节点间传递状态。流程很线性——先检索再推理,然后结束。实际项目中可以在这个图上加验证节点、循环重试等分支,LangGraph 的图结构天然支持这种扩展。
FastAPI 后端(app.py)
# backend/app.py import os from fastapi import FastAPI, HTTPException from pydantic import BaseModel from dotenv import load_dotenv from rag_pipeline import AdaptiveRAG, build_vector_store from graph_workflow import create_workflow load_dotenv() app = FastAPI(title="Adaptive RAG API") # --------------------------- # Startup Initialization # --------------------------- class AskRequest(BaseModel): query: str @app.on_event("startup") async def startup_event(): global workflow # Sample knowledge base (replace with real docs) sample_docs = [ "LangGraph supports stateful workflows and retry logic.", "Adaptive RAG dynamically changes retrieval depth based on query complexity.", "FastAPI is a high-performance async Python framework.", ] vector_db = build_vector_store(sample_docs) rag = AdaptiveRAG(vector_db) workflow = create_workflow(rag) # --------------------------- # API Endpoint # --------------------------- @app.post("/ask") async def ask(payload: AskRequest): if not payload.query.strip(): raise HTTPException(status_code=400, detail="Query cannot be empty") try: result = await workflow.ainvoke( {"question": payload.query} ) return {"response": result["answer"]} except Exception as e: raise HTTPException( status_code=500, detail="Internal RAG processing error" )
后端在启动时完成向量库构建和工作流初始化,之后通过 /ask 端点接收查询请求。这里用了 global 变量来持有 workflow 实例——POC 阶段这样做没问题,上生产建议用依赖注入替代。
Streamlit UI(ui.py)
# frontend/ui.py import streamlit as st import requests API_URL = "http://localhost:8000/ask" st.set_page_config(page_title="Adaptive RAG Assistant") st.title("Adaptive RAG Support Assistant") query = st.text_input("Enter your question") if st.button("Ask"): if not query.strip(): st.warning("Please enter a question.") else: try: with st.spinner("Thinking..."): response = requests.post( API_URL, json={"query": query}, timeout=60 ) response.raise_for_status() answer = response.json()["response"] st.markdown("### Answer:") st.write(answer) except Exception as e: st.error(f"Error: {e}")
前端就这么几行:输入框接收问题,按钮触发请求,拿到结果直接渲染。Streamlit 的好处就是不用折腾前端那套东西,做 POC 验证概念足够。
运行项目安装依赖:
pip install -r requirements.txt
设置 OpenAI Key:
export OPENAI_API_KEY="your_key_here" Or(For Windows) setx OPENAI_API_KEY "your_key_here"
启动后端:
uvicorn backend.app:app --reload
启动前端:
streamlit run frontend/ui.py
内部执行流程在 UI 中输入这样一条查询
How does retry logic work in LangGraph workflows?
请求先到达 FastAPI 后端。LangGraph 工作流从 retrieve 节点启动,Adaptive RAG 根据查询长度动态选定检索深度——短查询取 k=3长查询取 k=8。从向量数据库拉到相关文档块后,reasoning 节点把这些上下文拼装成 prompt,交给 LLM 生成答案。LLM 的回答被限定在检索到的上下文范围内,最终结果沿原路返回到 UI。
一切正常的话,你现在手上就有了一条完整的端到端 RAG 管道:UI → API → Graph → Retriever → LLM → Response。
下一步:生产部署POC 跑通了但离生产还有距离。下面按模块列一下需要补强的方向。
检索层
向量相似度搜索可以跟 BM25 关键词搜索做混合,在一些 edge case 上召回率会好很多。检索完 top-k 文档后,再套一层 Cross-Encoder 做重排序,排序精度能上一个台阶。如果系统要支持多团队或多租户,还得引入基于命名空间的文档隔离,防止跨域信息泄漏。
工作流
当前工作流里没有验证环节。生产环境建议加一个验证节点,检查生成的答案是否真的有检索上下文支撑——这对控制幻觉至关重要。另外,如果要做多轮对话,就需要往图里加记忆节点来持久化对话状态。
重试与回退
LLM 调用失败后要有重试机制。主模型不可用时能自动降级到更小或更便宜的备选模型。超时控制和优雅降级也不能少。
成本控制也值得考虑:简单查询走轻量模型,只在必要时才升级到大模型。
可观测性与评估
日志要记全——检索分数、命中的文档、响应延迟、Token 消耗,这些都得有。定期做离线评估,准备好测试数据集,跑检索质量和回答质量的指标。幻觉监控单独拎出来盯,追踪那些答案脱离检索上下文的 case。
UI 改进
聊天界面得支持历史记录和多轮对话。回答来源要做高亮——用户应该能看到答案是从哪几段文档生成的。再加上反馈按钮,让用户对回答打分,收集回来的数据可以用于后续评估和微调。
部署与基础设施
前后端都做 Docker 容器化,保证部署的可复现性。上云的话 AWS、GCP、Azure 都行,记得配自动扩缩容。端点要上 HTTPS 和 Token 认证(JWT 或 OAuth)。生产环境的 API Key 不要再靠环境变量了,换托管的密钥存储服务。
总结一个模块化、可扩展的 RAG 架构就搭建完成了,它完全可以从当前的 POC 状态逐步演化成生产级系统。自适应检索、有状态编排、可扩展 API、简洁的交互界面——这几个构建块拼在一起,基本覆盖了现代 LLM 应用的核心架构需求。
https://avoid.overfit.cn/post/770176403fa04ac49a55a145c36266b8
by Robi Kumar Tomar