AI Agent数据分析:自动生成报表和洞察
一、开场:数据分析师的日常
大家好,我是老金。
数据分析师的日常是什么?
- 早上:老板问”上周销售数据怎么样?”
- 中午:产品问”用户留存率趋势如何?”
- 下午:运营问”哪个渠道效果最好?”
- 晚上:写SQL、做图表、写报告…
一天下来,80%时间在跑数、做表,只有20%在思考。
如果AI能帮忙跑数、做表、写报告呢?
今天分享AI Agent在数据分析中的应用。
二、AI数据分析Agent架构
系统架构
┌─────────────────────────────────────────────────────────┐
│ AI数据分析Agent架构 │
├─────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 自然语言查询层 │ │
│ │ "上周销售趋势如何?" │ │
│ └────────────────────┬────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 意图理解层 │ │
│ │ • 问题类型识别(查询/对比/预测) │ │
│ │ • 时间范围解析 │ │
│ │ • 指标和维度提取 │ │
│ └────────────────────┬────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ SQL生成层 │ │
│ │ • 自然语言转SQL │ │
│ │ • SQL优化 │ │
│ │ • 安全检查 │ │
│ └────────────────────┬────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 数据执行层 │ │
│ │ • 执行查询 │ │
│ │ • 数据处理 │ │
│ │ • 缓存管理 │ │
│ └────────────────────┬────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 分析洞察层 │ │
│ │ • 趋势分析 │ │
│ │ • 异常检测 │ │
│ │ • 归因分析 │ │
│ └────────────────────┬────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 可视化呈现层 │ │
│ │ • 图表推荐 │ │
│ │ • 自动生成 │ │
│ │ • 报告输出 │ │
│ └─────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────┘
三、自然语言转SQL
SQL生成Agent
class TextToSQLAgent:
"""自然语言转SQL Agent"""
def __init__(self, llm, schema_info: dict):
self.llm = llm
self.schema_info = schema_info
async def text_to_sql(self, question: str) -> dict:
"""自然语言转SQL"""
# 构建Prompt
prompt = self._build_prompt(question)
# 生成SQL
response = await self.llm.generate(prompt)
# 解析SQL
sql = self._extract_sql(response)
# 安全检查
if not self._is_safe(sql):
raise SecurityError("SQL不安全,可能包含危险操作")
# 优化SQL
optimized_sql = self._optimize_sql(sql)
return {
"original_question": question,
"sql": optimized_sql,
"explanation": self._explain_sql(optimized_sql)
}
def _build_prompt(self, question: str) -> str:
"""构建SQL生成Prompt"""
schema_description = self._format_schema()
return f"""
你是一个SQL专家。根据用户问题生成SQL查询。
## 数据库Schema
{schema_description}
## 用户问题
{question}
## 要求
1. 只返回SELECT语句
2. 使用正确的表名和字段名
3. 添加适当的WHERE条件
4. 添加必要的JOIN
5. 结果按时间或数量排序
## 输出格式
SQL:
```sql
你的SQL语句
解释:简要说明SQL逻辑
“””
def _format_schema(self) -> str:
"""格式化Schema信息"""
lines = []
for table, info in self.schema_info.items():
columns = ", ".join(
f"{col['name']}({col['type']})"
for col in info['columns']
)
lines.append(f"表 {table}: {columns}")
return "n".join(lines)
def _is_safe(self, sql: str) -> bool:
"""检查SQL安全性"""
dangerous_keywords = [
"DROP", "DELETE", "TRUNCATE", "ALTER",
"CREATE", "INSERT", "UPDATE", "GRANT"
]
sql_upper = sql.upper()
return not any(kw in sql_upper for kw in dangerous_keywords)
### 示例:查询转换
```python
# 用户问题
question = "上周每个渠道的销售额是多少?"
# 生成的SQL
sql = """
SELECT
channel,
SUM(amount) as total_sales,
COUNT(DISTINCT order_id) as order_count
FROM orders
WHERE order_date >= DATE_SUB(CURRENT_DATE, INTERVAL 7 DAY)
AND order_date dict:
"""分析数据并生成洞察"""
# 计算基础统计
stats = self._calculate_statistics(data)
# 趋势分析
trends = self._analyze_trends(data)
# 异常检测
anomalies = self._detect_anomalies(data)
# 生成洞察
insights = await self._generate_insights(
data, stats, trends, anomalies, context
)
return {
"statistics": stats,
"trends": trends,
"anomalies": anomalies,
"insights": insights
}
def _calculate_statistics(self, data: list) -> dict:
"""计算统计指标"""
if not data:
return {}
values = [d.get('value', 0) for d in data]
return {
"count": len(values),
"sum": sum(values),
"mean": statistics.mean(values),
"median": statistics.median(values),
"std": statistics.stdev(values) if len(values) > 1 else 0,
"min": min(values),
"max": max(values)
}
def _analyze_trends(self, data: list) -> dict:
"""分析趋势"""
if len(data) 0:
trend = "上升"
change_rate = (y[-1] - y[0]) / y[0] * 100 if y[0] != 0 else 0
elif slope list:
"""检测异常值"""
values = [d.get('value', 0) for d in data]
if len(values) 0 else 0
if z_score > 2: # 超过2个标准差
anomalies.append({
"index": i,
"data": d,
"z_score": round(z_score, 2),
"type": "高于均值" if d.get('value', 0) > mean else "低于均值"
})
return anomalies
async def _generate_insights(self, data, stats, trends,
anomalies, context) -> list:
"""生成数据洞察"""
prompt = f"""
分析以下数据并生成洞察:
## 数据概况
- 数据量:{stats['count']}条
- 总和:{stats['sum']}
- 平均值:{stats['mean']}
- 最大值:{stats['max']}
- 最小值:{stats['min']}
## 趋势分析
- 趋势方向:{trends['trend']}
- 变化率:{trends['change_rate']}%
## 异常检测
发现{len(anomalies)}个异常值
## 上下文
{context.get('question', '无具体问题')}
请生成3-5条关键洞察:
1. 主要发现
2. 异常原因推测
3. 行动建议
"""
response = await self.llm.generate(prompt)
return self._parse_insights(response)
五、图表推荐与生成
图表类型推荐
class ChartRecommender:
"""图表推荐器"""
CHART_RULES = [
{
"condition": lambda data: len(data) 5 and data[0].get('date'),
"chart_type": "line",
"reason": "时序数据适合折线图"
},
{
"condition": lambda data: len(data) > 5 and data[0].get('category'),
"chart_type": "bar",
"reason": "分类比较适合柱状图"
},
{
"condition": lambda data: all(k in data[0] for k in ['x', 'y', 'size']),
"chart_type": "bubble",
"reason": "三维数据适合气泡图"
}
]
def recommend(self, data: list, question: str = "") -> dict:
"""推荐最佳图表类型"""
for rule in self.CHART_RULES:
if rule["condition"](data):
return {
"chart_type": rule["chart_type"],
"reason": rule["reason"]
}
return {
"chart_type": "table",
"reason": "数据特征不明确,建议用表格"
}
自动生成图表配置
class ChartGenerator:
"""图表生成器"""
def generate_chart_config(self, data: list,
chart_type: str,
title: str = "") -> dict:
"""生成图表配置"""
if chart_type == "line":
return self._generate_line_chart(data, title)
elif chart_type == "bar":
return self._generate_bar_chart(data, title)
elif chart_type == "pie":
return self._generate_pie_chart(data, title)
else:
return self._generate_table(data, title)
def _generate_line_chart(self, data: list, title: str) -> dict:
"""生成折线图配置"""
return {
"title": {"text": title},
"tooltip": {"trigger": "axis"},
"xAxis": {
"type": "category",
"data": [d.get('date', d.get('category', '')) for d in data]
},
"yAxis": {"type": "value"},
"series": [{
"name": "数值",
"type": "line",
"data": [d.get('value', 0) for d in data],
"smooth": True,
"itemStyle": {"color": "#5470c6"}
}]
}
def _generate_bar_chart(self, data: list, title: str) -> dict:
"""生成柱状图配置"""
return {
"title": {"text": title},
"tooltip": {"trigger": "axis"},
"xAxis": {
"type": "category",
"data": [d.get('category', d.get('channel', '')) for d in data]
},
"yAxis": {"type": "value"},
"series": [{
"name": "数值",
"type": "bar",
"data": [d.get('value', d.get('total_sales', 0)) for d in data],
"itemStyle": {"color": "#5470c6"}
}]
}
六、报告自动生成
报告生成Agent
class ReportGeneratorAgent:
"""报告生成Agent"""
async def generate_report(self, analysis_result: dict,
template: str = "daily") -> str:
"""生成分析报告"""
if template == "daily":
return await self._generate_daily_report(analysis_result)
elif template == "weekly":
return await self._generate_weekly_report(analysis_result)
elif template == "adhoc":
return await self._generate_adhoc_report(analysis_result)
async def _generate_daily_report(self, result: dict) -> str:
"""生成日报"""
stats = result['statistics']
trends = result['trends']
insights = result['insights']
report = f"""
# 数据日报 - {datetime.now().strftime('%Y-%m-%d')}
## 一、数据概览
| 指标 | 数值 |
|------|------|
| 数据量 | {stats['count']} |
| 总计 | {stats['sum']:,.0f} |
| 平均值 | {stats['mean']:,.2f} |
| 最大值 | {stats['max']:,.0f} |
| 最小值 | {stats['min']:,.0f} |
## 二、趋势分析
**趋势方向**:{trends['trend']}
**变化率**:{trends['change_rate']}%
## 三、关键洞察
{self._format_insights(insights)}
## 四、建议行动
{await self._generate_actions(insights)}
---
报告生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
return report
def _format_insights(self, insights: list) -> str:
"""格式化洞察"""
return "n".join([
f"{i+1}. {insight}"
for i, insight in enumerate(insights)
])
async def _generate_actions(self, insights: list) -> str:
"""生成行动建议"""
prompt = f"""
基于以下洞察,生成3-5条具体的行动建议:
洞察:
{chr(10).join(insights)}
要求:
1. 建议具体可执行
2. 优先级明确
3. 预期效果说明
"""
return await self.llm.generate(prompt)
七、完整示例
使用示例
# 初始化Agent
data_agent = DataAnalysisAgent(
llm=GPT4Client(),
db_connector=MySQLConnector(config),
schema_info=load_schema()
)
# 用户提问
question = "上周各渠道销售情况如何?哪个渠道增长最快?"
# 执行分析
result = await data_agent.query(question)
# 输出结果
print(result['report'])
生成报告示例
# 销售数据分析报告
## 数据概览
| 渠道 | 销售额 | 订单数 | 占比 |
|------|--------|--------|------|
| 微信小程序 | ¥150,000 | 1,200 | 37.5% |
| APP | ¥120,000 | 800 | 30.0% |
| 官网 | ¥80,000 | 500 | 20.0% |
| 线下门店 | ¥50,000 | 300 | 12.5% |
## 趋势分析
- 微信小程序:增长15.2%,增长最快
- APP:增长8.3%
- 官网:下降2.1%
- 线下门店:下降5.5%
## 关键洞察
1. **微信小程序成为主力渠道**:占总销售额37.5%,且保持高速增长
2. **线下渠道持续萎缩**:需要关注线下门店运营问题
3. **官网流量下降**:可能与SEO或用户体验有关
## 建议行动
1. **加大微信小程序投入**:增加营销预算,优化用户体验
2. **调查线下门店问题**:分析客流、转化率等指标
3. **官网SEO优化**:提升自然流量获取能力
八、性能优化
查询缓存
class QueryCache:
"""查询缓存"""
def __init__(self, ttl: int = 3600):
self.cache = {}
self.ttl = ttl
def get(self, question: str) -> Optional[dict]:
"""获取缓存"""
key = self._hash_question(question)
if key in self.cache:
entry = self.cache[key]
if time.time() - entry['timestamp'] str:
"""问题哈希"""
# 标准化问题
normalized = question.lower().strip()
return hashlib.md5(normalized.encode()).hexdigest()
异步执行
class AsyncDataAgent:
"""异步数据Agent"""
async def batch_analyze(self, questions: list) -> list:
"""批量分析"""
tasks = [self.analyze(q) for q in questions]
return await asyncio.gather(*tasks)
async def analyze_stream(self, question: str):
"""流式分析"""
# 生成SQL
sql_result = await self.text_to_sql(question)
yield {"stage": "sql", "result": sql_result}
# 执行查询
data = await self.execute_query(sql_result['sql'])
yield {"stage": "data", "result": data}
# 分析数据
analysis = await self.analyze_data(data)
yield {"stage": "analysis", "result": analysis}
# 生成报告
report = await self.generate_report(analysis)
yield {"stage": "report", "result": report}
九、最佳实践
问题设计
# ✅ 好的问题
"上周各渠道销售额对比"
"最近30天用户留存率趋势"
"哪个产品类目增长最快"
# ❌ 模糊的问题
"数据怎么样"
"给我看看报表"
"分析一下"
数据安全
class DataSecurityManager:
"""数据安全管理"""
def __init__(self):
self.sensitive_tables = ['users', 'payments', 'personal_info']
self.access_control = AccessControl()
async def check_access(self, user: str, table: str) -> bool:
"""检查访问权限"""
if table in self.sensitive_tables:
return await self.access_control.has_permission(
user, 'read_sensitive'
)
return True
def mask_sensitive_data(self, data: list, columns: list) -> list:
"""脱敏处理"""
for row in data:
for col in columns:
if col in row:
row[col] = self._mask(row[col])
return data
十、总结
效率提升
| 任务 | 传统方式 | AI辅助 | 提升 |
|---|---|---|---|
| SQL编写 | 30分钟 | 2分钟 | 93% |
| 数据分析 | 2小时 | 15分钟 | 87% |
| 报告撰写 | 1小时 | 10分钟 | 83% |
| 图表制作 | 30分钟 | 3分钟 | 90% |
下期预告
明天聊聊AI Agent内容创作——自动化写作与营销文案!
往期回顾
正文完