
说起 AI Agent(智能体),很多人第一反应是"能聊天的模型"。但当你真正上手搭一个多步骤自动化任务——比如让 AI 帮人写代码、审合同、生成 SQL——就会发现,光靠一个大模型远远不够。你需要技能(Skills)、工具(Tools)、规则(Rules) 和 MCP 工具这几个概念的配合。
本文从概念辨析出发,用 Langchain + AWS Bedrock Nova(AWS 推出的 Nova 系列大模型,海外已大量使用)给出完整的实战代码,手把手展示这四类组件在真实 Agent 系统里是怎么配合工作的。
用一个实际场景来理解这四者的区别:假设你要做一个"研发助手"Agent,目标是帮人审代码、写博客、查数据库。
execute_code() 在沙箱里跑代码片段,就是一个典型工具。下面这张图很直观地展示了它们的关系:
(配图略)
首先创建虚拟环境并安装依赖:
python3 -m venv .venv
source .venv/bin/activate
pip install --upgrade pip
pip install -r requirements.txt
requirements.txt 内容:
langchain>=1.0.0
langchain-aws>=1.2.0
langgraph>=1.0.0
python-dotenv>=1.0.0
boto3>=1.34.0
langfuse>=4.0.0
AWS 侧配置:在 AWS Console 的 Bedrock 控制台申请模型访问(目前 Nova-Pro 在多个区域可用,如 eu-central-1、us-east-1)。拿到访问权限后,在 IAM 里赋予 AmazonBedrockFullAccess 策略。最后把密钥写入 .env 文件:
AWS_ACCESS_KEY_ID=你的AccessKey
AWS_SECRET_ACCESS_KEY=你的SecretKey
我们在项目根目录建一个 skills/ 目录,每个子目录代表一个技能,包含 prompt.md(指令)和 tools.py(工具函数)。以"代码审查"技能为例:
skills/
code_review/
prompt.md
tools.py
prompt.md:
你是一位资深软件工程师,进行专业级代码审查。
审查优先级:正确性 > 安全性 > 性能 > 代码风格
必查项:空输入、None、溢出、边界(off-by-one)等边界情况
标记安全问题:注入、不安全反序列化、硬编码密钥
给出具体重构建议,不只是"这里不好"
也要表扬写得好的地方
输出格式:
- - 总体评价(代码做什么,质量如何)
- - 严重问题(bug、安全漏洞,合并前必修复)
- - 改进建议(性能、可读性、可测试性)
- - 亮点(写得好的地方)
- - 建议重构(关键段落重写示例)
支持语言:Python、JavaScript/TypeScript、SQL、Bash、Go、Java
审查开头请注明语言类型。
tools.py(用 Langchain 的 @tool 装饰器注册工具):
// code_review/tools.py
import ast
import re
import textwrap
from langchain_core.tools import tool
@tool
def detect_secrets(code: str) -> str:
"""扫描源码中的硬编码凭证、API 密钥和连接字符串。"""
patterns = [
("硬编码密码", r"(?i)password\s*=\s*[\"'][^\"']{3,}[\"']"),
("AWS Access Key", r"AKIA[0-9A-Z]{16}"),
("通用 API 密钥", r"(?i)api[_-]?key\s*=\s*[\"'][^\"']{8,}[\"']"),
("含凭证明文连接", r"(?i)(?:postgres|mysql|mongodb)://[^:]+:[^@]+@"),
("私钥块", r"-----BEGIN (?:RSA )?PRIVATE KEY-----"),
]
findings = []
for lineno, line in enumerate(code.splitlines(), 1):
for label, pat in patterns:
if re.search(pat, line):
findings.append(f" [REDACTED] 第 {lineno} 行 — {label}")
return "密钥扫描:\n" + "\n".join(findings) if findings else "未检测到硬编码密钥。"
@tool
def analyze_python_ast(code: str) -> str:
"""通过 Python AST 做静态分析:bare except、eval/exec、可变默认参数、长函数。"""
code = textwrap.dedent(code)
try:
tree = ast.parse(code)
except SyntaxError as exc:
return f"语法错误: {exc}"
issues = []
for node in ast.walk(tree):
if isinstance(node, ast.ExceptHandler) and node.type is None:
issues.append(f" 第 {node.lineno} 行: 裸 except: 请用 except Exception:")
if isinstance(node, ast.Call):
name = getattr(node.func, "id", getattr(node.func, "attr", ""))
if name in ("eval", "exec"):
issues.append(f" 第 {node.lineno} 行: {name}() 任意代码执行风险")
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
for d in node.args.defaults:
if isinstance(d, (ast.List, ast.Dict, ast.Set)):
issues.append(f" 第 {node.lineno} 行: {node.name} 使用了可变默认参数")
length = node.end_lineno - node.lineno + 1
if length > 50:
issues.append(f" 第 {node.lineno} 行: {node.name} 共 {length} 行,建议拆分")
if isinstance(node, ast.Global):
issues.append(f" 第 {node.lineno} 行: global 语句建议显式传参")
return "AST 分析:\n" + "\n".join(issues) if issues else "未发现结构性问题。"
@tool
def check_sql_injection(code: str) -> str:
"""检测 execute() 调用中不安全的字符串拼接导致的 SQL 注入。"""
patterns = [
("execute() 中使用 f-string", r"\.execute\s*\(\s*f[\"']"),
("execute() 中使用百分号格式化", r"\.execute\s*\(\s*[\"'][^\"']*%[^\"']*[\"'\s]*%"),
("execute() 中使用 .format()", r"\.execute\s*\(\s*[\"'][^\"']*\{.*?\}.*?\.format"),
("execute() 中使用字符串拼接", r"\.execute\s*\(\s*[\"'][^\"']*[\"'\s]*\+"),
]
findings = []
for lineno, line in enumerate(code.splitlines(), 1):
for label, pat in patterns:
if re.search(pat, line):
findings.append(f" 第 {lineno} 行 — {label} 修复:使用参数化查询")
return "SQL 注入扫描:\n" + "\n".join(findings) if findings else "未检测到 SQL 注入模式。"
@tool
def measure_complexity(code: str) -> str:
"""估算每个函数的圈复杂度(复杂度 1-5 低,6-10 中,11-20 高,20+ 很高)。"""
code = textwrap.dedent(code)
try:
tree = ast.parse(code)
except SyntaxError as exc:
return f"语法错误: {exc}"
_BRANCH = (ast.If, ast.For, ast.While, ast.ExceptHandler, ast.With, ast.Assert, ast.BoolOp)
results = []
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
score = 1 + sum(1 for n in ast.walk(node) if isinstance(n, _BRANCH))
level = "高" if score > 20 else ("中" if score > 10 else ("较低" if score > 5 else "低"))
results.append(f" {node.name} (第 {node.lineno} 行): 复杂度 {score} ({level})")
return "复杂度报告:\n" + "\n".join(sorted(results)) if results else "未找到函数。"
prompt.md:
你是一位资深商业合同审查专家。
审查规则:
- - 标记所有责任限制条款、赔偿条款和责任上限条款
- - 标红单方面不公平条款
- - 识别缺失的标准保护条款(如知识产权归属、终止权)
- - 注意管辖权和适用法律问题
- - 永远不要给出确定的法律建议——始终建议寻求律师审核
输出格式:
- - 合同摘要(类型、当事人、目的)
- - 高风险条款(必须交律师审查)
- - 中等风险条款(值得谈判)
- - 标准/可接受条款
- - 缺失条款(未出现的保护条款)
- - 整体风险评级:LOW / MEDIUM / HIGH
重点审查领域:
- - 知识产权归属与许可
- - 付款条款与逾期罚款
- - 自动续期与取消通知期
- - 数据隐私与保密义务
- - 不可抗力与争议解决
tools.py:
// review_legal_doc/tools.py
import re
from langchain_core.tools import tool
_CLAUSES = {
"责任上限": [r"(?i)total\s+liability\s+shall\s+not\s+exceed", r"(?i)aggregate\s+liability.{0,40}limited\s+to"],
"间接损失弃权": [r"(?i)not\s+be\s+liable\s+for\s+any\s+indirect", r"(?i)indirect.{0,20}incidental.{0,20}consequential"],
"自动续期": [r"(?i)auto.?renew", r"(?i)renews?\s+annually"],
"取消通知期": [r"(?i)\d+\s+days?\s+(?:written\s+)?notice"],
"知识产权归属": [r"(?i)intellectual\s+property.{0,40}assign", r"(?i)work\s+made\s+for\s+hire"],
"保密义务": [r"(?i)confidential\s+information", r"(?i)non.disclosure"],
"赔偿条款": [r"(?i)indemnif(?:y|ication)", r"(?i)hold\s+harmless"],
"适用法律": [r"(?i)governed\s+by\s+the\s+laws?\s+of"],
"不可抗力": [r"(?i)force\s+majeure", r"(?i)acts?\s+of\s+God"],
"因因由终止": [r"(?i)terminat.{0,40}material\s+breach"],
}
_HIGH_RISK = {"责任上限", "间接损失弃权", "自动续期", "知识产权归属", "赔偿条款"}
_RISK_WEIGHTS = {
"间接损失弃权": 25, "责任上限": 20, "知识产权归属": 20,
"赔偿条款": 15, "自动续期": 15, "取消通知期": 10,
"不可抗力": -5, "保密义务": -5, "因因由终止": -10
}
@tool
def extract_legal_clauses(text: str) -> str:
"""扫描合同文本,返回所有检测到的条款类型及上下文片段。"""
results = []
for clause_type, patterns in _CLAUSES.items():
for pattern in patterns:
m = re.search(pattern, text)
if m:
start, end = max(0, m.start()-30), min(len(text), m.end()+90)
snippet = text[start:end].replace("\n", " ").strip()
icon = "[高风险]" if clause_type in _HIGH_RISK else "[中等]"
results.append(f"{icon} {clause_type}: ...{snippet}...")
break
return "\n".join(results) if results else "未检测到已知条款类型。"
@tool
def score_legal_risk(text: str) -> str:
"""返回启发式风险评分(0-100)及 LOW / MEDIUM / HIGH 评级。"""
found = {ct for ct, patterns in _CLAUSES.items() if any(re.search(p, text) for p in patterns)}
score = max(0, min(100, sum(_RISK_WEIGHTS[c] for c in found)))
rating = "HIGH" if score >= 60 else ("MEDIUM" if score >= 30 else "LOW")
breakdown = "\n".join(
f" {c}: {_RISK_WEIGHTS[c]} 分" for c in found
)
return f"风险评分: {score}/100 -> {rating}\n\n详细:\n{breakdown}"
@tool
def extract_dates_and_deadlines(text: str) -> str:
"""从合同文本中提取所有日期引用和通知期限。"""
patterns = [r"\b\d+[\s-]?days?\b", r"\b\d+[\s-]?months?\b", r"\b\d+[\s-]?years?\b",
r"\bann(?:ual(?:ly)?|um)\b", r"\b\d{4}-\d{2}-\d{2}\b"]
seen, findings = set(), []
for pat in patterns:
for m in re.finditer(pat, text, re.IGNORECASE):
key = m.group(0).lower()
if key not in seen:
seen.add(key)
start, end = max(0, m.start()-40), min(len(text), m.end()+60)
findings.append(f" * {m.group(0)} — ...{text[start:end].strip()}...")
return "日期与截止期限:\n" + "\n".join(findings) if findings else "未找到日期引用。"
prompt.md:
你是一位资深 SQL 工程师。
规则:
- - 复杂查询一律使用 CTE(WITH 子句)
- - 为非显而易见的逻辑添加注释
- - 优先使用窗口函数而非子查询(性能更好)
- - 查询必须带 ORDER BY 保证结果确定性
- - 标记潜在的 N+1 问题或缺失索引风险
- - 默认方言:PostgreSQL(切换时请注明)
输出格式:
- - 简要方案说明
- - SQL 查询(sql 代码块)
- - 性能备注(如有)
- - 备选方案(如有更简单的方案)
示例模式注意事项:
- - 不确定 schema 时主动询问
- - 根据上下文推断列名
- - 对 NULL 和数据类型不匹配发出警告
tools.py:
// write_sql/tools.py
import re
from langchain_core.tools import tool
@tool
def validate_sql_syntax(sql: str, dialect: str = "postgres") -> str:
"""解析 SQL 语句,报告语法错误(不执行)。"""
try:
import sqlglot
sqlglot.parse(sql, dialect=dialect, error_level=sqlglot.ErrorLevel.RAISE)
return "有效 SQL,未检测到语法错误。"
except ImportError:
return "未安装 sqlglot,请运行 pip install sqlglot。"
except Exception as exc:
return f"语法错误: {exc}"
@tool
def format_sql(sql: str, dialect: str = "postgres") -> str:
"""使用规范化格式美化 SQL 查询。"""
try:
import sqlglot
return sqlglot.transpile(sql, read=dialect, write=dialect, pretty=True)[0]
except ImportError:
return "未安装 sqlglot,请运行 pip install sqlglot。"
except Exception as exc:
return f"无法格式化 SQL: {exc}"
_RISKS = [
(r"\bSELECT\s+\*\b", "[中] SELECT * 请显式枚举列名"),
(r"\bIN\s*\(\s*SELECT\b", "[中] IN (SELECT ...) 建议用 EXISTS 或 JOIN"),
(r"(?i)\bDELETE\s+FROM\b(?!.*\bWHERE\b)", "[高] DELETE 无 WHERE 将删除所有行"),
(r"(?i)\bUPDATE\b(?!.*\bWHERE\b)", "[高] UPDATE 无 WHERE 将更新所有行"),
(r"(?i)\bDROP\s+(TABLE|DATABASE)\b", "[高] DROP 语句是破坏性 DDL"),
(r"(?i)\bNOT\s+IN\s*\(\s*SELECT\b", "[中] NOT IN (子查询) 对 NULL 不安全,请用 NOT EXISTS"),
(r"(?i)ORDER\s+BY\s+RAND\(\)", "[中] ORDER BY RAND() 大表很慢"),
]
@tool
def detect_sql_risks(sql: str) -> str:
"""扫描 SQL 查询中的常见反模式和陷阱。"""
findings = [
f"{lvl}: {msg}" for pattern, lvl, msg in _RISKS
if re.search(pattern, sql, re.IGNORECASE)
]
return "\n".join(findings) if findings else "未发现明显风险。"
接下来实现最核心的部分——技能动态加载和 Agent 路由:
import importlib.util
import inspect
import sys
from pathlib import Path
from langchain_core.tools import BaseTool, tool
SKILLS_DIR = Path(__file__).parent / "skills"
def _import_tools(skill_dir: Path) -> list[BaseTool]:
"""从技能目录导入 tools.py,返回所有 tool 装饰的对象。"""
py_file = skill_dir / "tools.py"
if not py_file.exists():
return []
module_id = f"skills.{skill_dir.name}"
if module_id not in sys.modules:
spec = importlib.util.spec_from_file_location(module_id, py_file)
mod = importlib.util.module_from_spec(spec)
sys.modules[module_id] = mod
spec.loader.exec_module(mod)
return [obj for _, obj in inspect.getmembers(sys.modules[module_id]) if isinstance(obj, BaseTool)]
@tool
def list_skills() -> str:
"""列出所有可用技能。"""
lines = []
for d in sorted(SKILLS_DIR.iterdir()):
if d.is_dir():
tag = "有 tools.py" if (d / "tools.py").exists() else "仅 prompt"
lines.append(f" - {d.name} [{tag}]")
return "\n".join(lines) or "No skills found."
def _make_load_skill(session_tools: dict[str, BaseTool]):
"""返回一个 load_skill 工具,注册到给定的 session dict。"""
@tool
def load_skill(skill_name: str) -> str:
"""按目录名加载专业技能。"""
skill_dir = SKILLS_DIR / skill_name
if not skill_dir.is_dir():
available = [d.name for d in SKILLS_DIR.iterdir() if d.is_dir()]
return f"技能 '{skill_name}' 未找到。有效名称: {', '.join(available)}"
prompt = (skill_dir / "prompt.md").read_text(encoding="utf-8")
new_tools = _import_tools(skill_dir)
session_tools.update({t.name: t for t in new_tools})
tool_note = (
"\n\n工具已注册(直接调用,勿通过 load_skill):\n"
+ "\n".join(f" - {t.name}" for t in new_tools)
if new_tools else ""
)
return prompt + tool_note
return load_skill
from langchain_aws import ChatBedrockConverse
from langchain_core.messages import SystemMessage
from langchain.agents import create_agent
LLM = ChatBedrockConverse(model="us.amazon.nova-pro-v1:0", temperature=0.2)
SYSTEM_PROMPT = SystemMessage(content=(
"你是一位多技能专家助手。\n\n"
"技能路由——调用 load_skill() 时精确使用以下名称之一:\n"
" - write_sql 用户想编写或生成 SQL 查询\n"
" - review_legal_doc 用户想审查合同、条款或法律文本\n"
" - code_review 用户想审查源代码(任何语言)\n"
" - 如不确定,请调用 list_skills()\n\n"
"注意:'detect_sql_risks'、'score_legal_risk' 等是工具名称,不是技能名称。\n"
"load_skill() 返回后,使用任何已注册的工具来丰富你的回答。"
))
def ask(query: str) -> None:
print(f"\n{'='*60}\nUSER: {query}\n{'='*60}")
session_tools: dict[str, BaseTool] = {}
load_skill = _make_load_skill(session_tools)
def build_agent():
return create_agent(
model=LLM,
tools=[list_skills, load_skill, *session_tools.values()],
system_prompt=SYSTEM_PROMPT,
)
result = build_agent().invoke({"messages": [{"role": "user", "content": query}]})
for msg in result["messages"]:
for tc in getattr(msg, "tool_calls", []):
print(f" TOOL CALL: {tc['name']}")
final = result["messages"][-1]
print(f"\nRESPONSE:\n{final.content}")
// 示例调用
ask("帮我审查这段 Python 代码:\ndef foo(items=[]):\n for i in range(100):\n eval('pass')\n return items")
这套架构的核心思想是分层解耦:
有了这套体系,你可以像搭乐高一样,给同一个 Agent 配上代码审查、合同审查、SQL 生成等不同技能,而 Agent 会根据用户意图自动加载对应的技能包,真正做到"一专多能"。
如果你对 Agent 设计感兴趣,或正在构建自己的 AI 编程助手,希望这篇文章能给你一个清晰、实用的起点。
参考链接:原文链接