OpenAI最强开源Swarm AI智能体框架!Swarm框架实战教程,从函数调用到多表Text to SQL,再到自动化编程AI智能体,轻松打造专属AI Agent!#aigc #swarm #ai
OpenAI Swarm框架实战:3个案例带你掌握AI智能体开发,从天气查询到多表SQL,再到自动化编程工作流!
本篇笔记所对应的视频 https://youtu.be/ZzWt3wenMaE
充值openai api key可以使用wildcard虚拟卡充值。wildcard官方链接:
wildcard注册教程和充值API教程(国内打开速度快):
wildcard注册教程和充值API教程(海外打开速度快):
https://stoeng.medium.com/保姆级教程-通过wildcard虚拟信用卡订阅chatgpt-claude会员和api-百分百可用-a2865a18df01
👉👉👉如有问题或者想进入会员交流群请联系我的徽信 stoeng
🔥🔥🔥本项目代码由AI超元域频道制作,观看更多大模型微调视频请访问我的频道⬇
👉👉👉我的哔哩哔哩频道
👉👉👉我的YouTube频道
👉👉👉我的开源项目 https://github.com/win4r/AISuperDomain
视频中所用到的代码
!pip install git+https://github.com/openai/swarm.git
import os
from google.colab import userdata
os.environ["OPENAI_API_KEY"]=userdata.get('OPENAI_API_KEY')import requests
from swarm import Swarm, Agentdef get_weather(latitude: float, longitude: float) -> str:
"""
使用Open-Meteo API获取给定坐标的当前天气。 参数:
latitude (float): 纬度
longitude (float): 经度 返回:
str: 包含当前温度和风速的天气信息JSON字符串
"""
url = f"https://api.open-meteo.com/v1/forecast?latitude={latitude}&longitude={longitude}¤t=temperature_2m,wind_speed_10m"
response = requests.get(url)
if response.status_code == 200:
data = response.json()
current = data['current']
return str(current) # 返回JSON字符串
else:
return '{"error": "无法获取天气数据"}'def get_coordinates(location: str) -> str:
"""
获取给定位置名称的坐标。 参数:
location (str): 位置名称 返回:
str: 包含纬度和经度的JSON字符串
"""
coordinates = {
"纽约": (40.7128, -74.0060),
"伦敦": (51.5074, -0.1278),
"东京": (35.6762, 139.6503),
"巴黎": (48.8566, 2.3522),
"柏林": (52.5200, 13.4050)
}
lat, lon = coordinates.get(location.lower(), (0, 0))
return f'{{"latitude": {lat}, "longitude": {lon}}}'client = Swarm()weather_agent = Agent(
name="天气助手",
instructions="""
你是一个有帮助的天气助手。当被问到特定位置的天气时:
1. 使用get_coordinates函数获取该位置的坐标。
2. 使用get_weather函数获取天气数据。
3. 解析返回的JSON数据,提供一个友好的回复,包含天气信息。
如果无法识别该位置,礼貌地通知用户并建议他们尝试一个主要城市。
""",
functions=[get_coordinates, get_weather]
)def run_weather_query(query: str) -> str:
"""
运行天气查询并返回结果。 参数:
query (str): 用户的天气查询 返回:
str: Agent的响应
"""
messages = [{"role": "user", "content": query}]
response = client.run(agent=weather_agent, messages=messages)
return response.messages[-1]["content"]# 使用示例
print(run_weather_query("纽约的天气怎么样?"))
print(run_weather_query("柏林的天气如何?"))
print(run_weather_query("巴黎的天气如何?"))
print(run_weather_query("伦敦的天气如何?"))
print(run_weather_query("东京的天气怎么样?"))
print(run_weather_query("告诉我小村庄的天气"))from swarm import Swarm, Agentclient = Swarm()# 模拟数据库
accounts = {
123: {"name": "James", "balance": 1000},
456: {"name": "Emma", "balance": 1500},
}def instructions(context_variables):
name = context_variables.get("name", "User")
return f"You are a helpful banking assistant. Greet {name} and assist with their banking needs."def check_balance(context_variables: dict):
user_id = context_variables.get("user_id")
if user_id in accounts:
return f"Your current balance is ${accounts[user_id]['balance']:.2f}"
return "Account not found."def deposit(context_variables: dict):
user_id = context_variables.get("user_id")
amount = context_variables.get("amount", 0)
if user_id in accounts and amount > 0:
accounts[user_id]['balance'] += amount
return f"Deposit of ${amount:.2f} successful. New balance is ${accounts[user_id]['balance']:.2f}"
return "Deposit failed. Please check your account and amount."def withdraw(context_variables: dict):
user_id = context_variables.get("user_id")
amount = context_variables.get("amount", 0)
if user_id in accounts and 0 < amount <= accounts[user_id]['balance']:
accounts[user_id]['balance'] -= amount
return f"Withdrawal of ${amount:.2f} successful. New balance is ${accounts[user_id]['balance']:.2f}"
return "Withdrawal failed. Please check your account and amount."agent = Agent(
name="BankingAgent",
instructions=instructions,
functions=[check_balance, deposit, withdraw],
)def process_request(user_id, message):
context_variables = {"user_id": user_id, "name": accounts[user_id]["name"]}
response = client.run(
messages=[{"role": "user", "content": message}],
agent=agent,
context_variables=context_variables,
)
return response.messages[-1]["content"]# 示例使用
user_id = 123
print(process_request(user_id, "Hi! What's my current balance?"))
print(process_request(user_id, "I'd like to deposit $500"))
context_variables = {"user_id": user_id, "amount": 500}
print(deposit(context_variables))
print(process_request(user_id, "What's my new balance?"))
print(process_request(user_id, "I want to withdraw $200"))
context_variables = {"user_id": user_id, "amount": 200}
print(withdraw(context_variables))
print(process_request(user_id, "What's my final balance?"))
import sqlite3
import re
from swarm import Swarm, Agent
from tabulate import tabulate# 初始化 Swarm 客户端
client = Swarm()# 创建内存数据库连接
conn = sqlite3.connect(':memory:')
cursor = conn.cursor()# 创建员工表
cursor.execute('''
CREATE TABLE employees (
id INTEGER PRIMARY KEY,
name TEXT,
department TEXT,
salary REAL
)
''')# 插入20条示例数据
employees = [
(1, '张伟', 'IT', 75000),
(2, '王芳', 'HR', 65000),
(3, '李斯', '销售', 80000),
(4, '赵辉', '市场', 70000),
(5, '陈明', 'IT', 72000),
(6, '杨丽', 'HR', 68000),
(7, '周浩', '销售', 82000),
(8, '吴娜', '市场', 71000),
(9, '刘洋', 'IT', 76000),
(10, '孙琳', 'HR', 67000),
(11, '朱峰', '销售', 81000),
(12, '徐婷', '市场', 72000),
(13, '郭震', 'IT', 74000),
(14, '马梅', 'HR', 66000),
(15, '胡勇', '销售', 83000),
(16, '林梅', '市场', 73000),
(17, '韩磊', 'IT', 77000),
(18, '董芳', 'HR', 69000),
(19, '萧峰', '销售', 84000),
(20, '沈丹', '市场', 74000)
]
cursor.executemany('INSERT INTO employees VALUES (?,?,?,?)', employees)
conn.commit()def instructions(context_variables):
return """你是一个能够将中文自然语言查询转换为SQL查询的AI助手。
数据库有一个名为'employees'的表,包含以下列:id, name, department, salary。
只返回SQL查询,不要包含任何其他文本或解释。支持复杂查询,如比较、排序和聚合函数。"""def clean_sql_query(sql_query):
"""清理SQL查询,移除可能的Markdown格式和多余空白"""
cleaned = re.sub(r'```sql\s*|\s*```', '', sql_query).strip()
return cleaneddef execute_sql(sql_query):
"""执行SQL查询并返回结果"""
try:
cursor.execute(sql_query)
results = cursor.fetchall()
return results
except sqlite3.Error as e:
return f"SQL错误: {e}"def explain_query(sql_query):
"""提供更具体的SQL查询解释"""
parts = sql_query.upper().split()
explanation = "这个查询"
if 'SELECT' in parts:
select_index = parts.index('SELECT')
from_index = parts.index('FROM')
fields = ', '.join(parts[select_index+1:from_index]).lower()
table = parts[from_index+1].lower()
explanation += f"从{table}表中获取{fields}"
if 'WHERE' in parts:
where_index = parts.index('WHERE')
condition = ' '.join(parts[where_index+1:]).lower()
explanation += f",条件是{condition}"
if 'ORDER BY' in parts:
order_index = parts.index('ORDER')
order = ' '.join(parts[order_index+2:]).lower()
explanation += f",结果按{order}排序"
if 'LIMIT' in parts:
limit_index = parts.index('LIMIT')
limit = parts[limit_index+1]
explanation += f",只显示前{limit}条结果"
return explanation + "。"def format_results(results, sql_query):
"""格式化查询结果,添加上下文和单位"""
if not results or len(results) == 0:
return "没有找到匹配的结果。"
if isinstance(results, str): # 错误消息
return results
headers = [description[0] for description in cursor.description]
# 为薪水添加单位
if 'salary' in headers:
salary_index = headers.index('salary')
results = [list(row) for row in results]
for row in results:
row[salary_index] = f"{row[salary_index]}元"
formatted_results = tabulate(results, headers=headers, tablefmt="grid")
# 添加结果上下文
if len(results) == 1 and 'name' in headers and 'salary' in headers:
name_index = headers.index('name')
salary_index = headers.index('salary')
context = f"{results[0][name_index]}的薪水是{results[0][salary_index]}"
return f"{context}\n\n{formatted_results}"
return formatted_resultsagent = Agent(
name="SQLAgent",
instructions=instructions,
)def process_query(natural_language_query):
"""处理自然语言查询,转换为SQL,执行并返回结果"""
# 使用 Swarm 将自然语言转换为 SQL
response = client.run(
messages=[{"role": "user", "content": natural_language_query}],
agent=agent,
)
sql_query = clean_sql_query(response.messages[-1]["content"])
# 执行 SQL 查询
results = execute_sql(sql_query)
# 获取查询解释
explanation = explain_query(sql_query)
# 格式化结果
formatted_results = format_results(results, sql_query)
return f"SQL查询: {sql_query}\n解释: {explanation}\n结果:\n{formatted_results}"# 主程序循环
if __name__ == "__main__":
print("欢迎使用text to SQL系统!代码由AI超元域原创")
print("输入 'exit' 或 'quit' 退出程序。")
print("本系统支持复杂查询,如比较、排序和聚合函数。")
while True:
user_input = input("\n请输入您的查询 (或 'exit' 退出): ")
if user_input.lower() in ['exit', 'quit']:
print("谢谢使用,再见!")
break
print(process_query(user_input))# 关闭数据库连接
conn.close()import sqlite3
import re
from swarm import Swarm, Agent
from tabulate import tabulate# 初始化 Swarm 客户端
client = Swarm()# 创建内存数据库连接
conn = sqlite3.connect(':memory:')
cursor = conn.cursor()# 创建员工表
cursor.execute('''
CREATE TABLE employees (
id INTEGER PRIMARY KEY,
name TEXT,
department_id INTEGER,
salary REAL
)
''')# 创建部门表
cursor.execute('''
CREATE TABLE departments (
id INTEGER PRIMARY KEY,
name TEXT,
location TEXT
)
''')# 插入员工数据
employees = [
(1, '张伟', 1, 75000),
(2, '王芳', 2, 65000),
(3, '李斯', 3, 80000),
(4, '赵静', 4, 70000),
(5, '陈明', 1, 72000),
(6, '杨丽', 2, 68000),
(7, '周浩', 3, 82000),
(8, '吴娜', 4, 71000),
(9, '刘洋', 1, 76000),
(10, '孙琳', 2, 67000)
]
cursor.executemany('INSERT INTO employees VALUES (?,?,?,?)', employees)# 插入部门数据
departments = [
(1, 'IT', '北京'),
(2, 'HR', '上海'),
(3, '销售', '广州'),
(4, '市场', '深圳')
]
cursor.executemany('INSERT INTO departments VALUES (?,?,?)', departments)conn.commit()def instructions(context_variables):
return """你是一个能够将中文自然语言查询转换为SQL查询的AI助手。
数据库有两个表:
1. 'employees'表,包含以下列:id, name, department_id, salary
2. 'departments'表,包含以下列:id, name, location
只返回SQL查询,不要包含任何其他文本或解释。支持复杂查询,包括多表连接、比较、排序和聚合函数。"""def clean_sql_query(sql_query):
"""清理SQL查询,移除可能的Markdown格式和多余空白"""
cleaned = re.sub(r'```sql\s*|\s*```', '', sql_query).strip()
return cleaneddef execute_sql(sql_query):
"""执行SQL查询并返回结果"""
try:
cursor.execute(sql_query)
results = cursor.fetchall()
return results
except sqlite3.Error as e:
return f"SQL错误: {e}"def explain_query(sql_query):
"""提供更具体的SQL查询解释,包括多表查询"""
parts = sql_query.upper().split()
explanation = "这个查询"
if 'SELECT' in parts:
select_index = parts.index('SELECT')
from_index = parts.index('FROM')
fields = ', '.join(parts[select_index+1:from_index]).lower()
tables = []
for i in range(from_index+1, len(parts)):
if parts[i] in ['WHERE', 'GROUP', 'ORDER', 'LIMIT']:
break
if parts[i] not in ['JOIN', 'INNER', 'LEFT', 'RIGHT', 'FULL', 'ON', 'AND']:
tables.append(parts[i].lower())
tables = ', '.join(tables)
explanation += f"从{tables}表中获取{fields}"
if 'JOIN' in parts:
explanation += ",进行了表连接"
if 'WHERE' in parts:
where_index = parts.index('WHERE')
condition = ' '.join(parts[where_index+1:]).lower()
explanation += f",条件是{condition}"
if 'GROUP BY' in parts:
group_index = parts.index('GROUP')
group = ' '.join(parts[group_index+2:]).lower()
explanation += f",按{group}进行分组"
if 'ORDER BY' in parts:
order_index = parts.index('ORDER')
order = ' '.join(parts[order_index+2:]).lower()
explanation += f",结果按{order}排序"
if 'LIMIT' in parts:
limit_index = parts.index('LIMIT')
limit = parts[limit_index+1]
explanation += f",只显示前{limit}条结果"
return explanation + "。"def format_results(results, sql_query):
"""格式化查询结果,添加上下文和单位"""
if not results or len(results) == 0:
return "没有找到匹配的结果。"
if isinstance(results, str): # 错误消息
return results
headers = [description[0] for description in cursor.description]
# 为薪水添加单位
if 'salary' in headers:
salary_index = headers.index('salary')
results = [list(row) for row in results]
for row in results:
row[salary_index] = f"{row[salary_index]}元"
formatted_results = tabulate(results, headers=headers, tablefmt="grid")
return formatted_resultsagent = Agent(
name="SQLAgent",
instructions=instructions,
)def process_query(natural_language_query):
"""处理自然语言查询,转换为SQL,执行并返回结果"""
# 使用 Swarm 将自然语言转换为 SQL
response = client.run(
messages=[{"role": "user", "content": natural_language_query}],
agent=agent,
)
sql_query = clean_sql_query(response.messages[-1]["content"])
# 执行 SQL 查询
results = execute_sql(sql_query)
# 获取查询解释
explanation = explain_query(sql_query)
# 格式化结果
formatted_results = format_results(results, sql_query)
return f"SQL查询: {sql_query}\n解释: {explanation}\n结果:\n{formatted_results}"# 主程序循环
if __name__ == "__main__":
print("欢迎使用支持多表查询的中文自然语言到SQL转换系统!")
print("输入 'exit' 或 'quit' 退出程序。")
print("本系统支持复杂查询,包括多表连接、比较、排序和聚合函数。")
while True:
user_input = input("\n请输入您的查询 (或 'exit' 退出): ")
if user_input.lower() in ['exit', 'quit']:
print("谢谢使用,再见!")
break
print(process_query(user_input))# 关闭数据库连接
conn.close()from swarm import Swarm, Agent# 创建Swarm客户端实例
client = Swarm()# 定义专门的agents# 需求分析agent
requirements_agent = Agent(
name="Requirements Analysis Agent",
instructions="You are a requirements analysis specialist. Analyze user requirements and create a detailed specification."
)# 设计agent
design_agent = Agent(
name="Design Agent",
instructions="You are a software design specialist. Create a high-level design based on the requirements specification."
)# 编码agent
coding_agent = Agent(
name="Coding Agent",
instructions="You are a coding specialist. Implement the design in clean, efficient code."
)# 测试agent
testing_agent = Agent(
name="Testing Agent",
instructions="You are a software testing specialist. Review the code, identify potential bugs, and suggest improvements."
)# 文档agent
documentation_agent = Agent(
name="Documentation Agent",
instructions="You are a technical documentation specialist. Create clear and comprehensive documentation for the code."
)def generate_code(user_requirement):
"""
多阶段代码生成函数
参数:
user_requirement (str): 用户提供的功能需求
返回:
dict: 包含需求规格、设计文档、代码和文档的字典
""" # 阶段1:需求分析
req_response = client.run(
agent=requirements_agent,
messages=[{"role": "user", "content": f"Analyze these requirements: {user_requirement}"}]
)
requirements_spec = req_response.messages[-1]["content"] # 阶段2:设计
design_response = client.run(
agent=design_agent,
messages=[{"role": "user", "content": f"Create a design based on this specification: {requirements_spec}"}]
)
design_doc = design_response.messages[-1]["content"] # 阶段3:编码
coding_response = client.run(
agent=coding_agent,
messages=[{"role": "user", "content": f"Implement this design in code: {design_doc}"}]
)
initial_code = coding_response.messages[-1]["content"] # 阶段4:测试和优化
testing_response = client.run(
agent=testing_agent,
messages=[{"role": "user", "content": f"Review and suggest improvements for this code: {initial_code}"}]
)
test_feedback = testing_response.messages[-1]["content"] # 根据测试反馈进行代码优化
refinement_response = client.run(
agent=coding_agent,
messages=[
{"role": "user", "content": f"Refine the code based on this feedback: {test_feedback}"},
{"role": "user", "content": f"Original code: {initial_code}"}
]
)
refined_code = refinement_response.messages[-1]["content"] # 阶段5:文档编写
doc_response = client.run(
agent=documentation_agent,
messages=[{"role": "user", "content": f"Create documentation for this code: {refined_code}"}]
)
documentation = doc_response.messages[-1]["content"]
# 返回包含所有生成内容的字典
return {
"requirements": requirements_spec,
"design": design_doc,
"code": refined_code,
"documentation": documentation
}# Example usage
user_requirement = "Create a Python function that calculates the Fibonacci sequence up to a given number, optimized for performance."
result = generate_code(user_requirement)print("Generated Code:")
print(result["code"])
print("\nDocumentation:")
print(result["documentation"])
# 更新示例使用 - 贪吃蛇游戏
user_requirement = """
创建一个简单的贪吃蛇游戏,具有以下功能:
1. 使用Python的pygame库实现
2. 游戏在一个固定大小的窗口中运行
3. 蛇可以使用方向键控制移动
4. 随机生成食物,蛇吃到食物后长度增加
5. 当蛇撞到自己或边界时游戏结束
6. 显示当前得分(蛇的长度)
7. 实现基本的开始和结束界面
请提供游戏的核心逻辑代码和必要的注释。
"""result = generate_code(user_requirement)print("生成的贪吃蛇游戏代码:")
print(result["code"])
print("\n游戏文档:")
print(result["documentation"])