AI Agent数据分析:自动生成报表和洞察

7次阅读
没有评论

AI Agent数据分析:自动生成报表和洞察

一、开场:数据分析师的日常

大家好,我是老金。

数据分析师的日常是什么?

  • 早上:老板问”上周销售数据怎么样?”
  • 中午:产品问”用户留存率趋势如何?”
  • 下午:运营问”哪个渠道效果最好?”
  • 晚上:写SQL、做图表、写报告…

一天下来,80%时间在跑数、做表,只有20%在思考。

如果AI能帮忙跑数、做表、写报告呢?

今天分享AI Agent在数据分析中的应用。

二、AI数据分析Agent架构

系统架构

┌─────────────────────────────────────────────────────────┐
│                  AI数据分析Agent架构                     │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌─────────────────────────────────────────────────┐   │
│  │              自然语言查询层                      │   │
│  │  "上周销售趋势如何?"                            │   │
│  └────────────────────┬────────────────────────────┘   │
│                       ↓                                 │
│  ┌─────────────────────────────────────────────────┐   │
│  │              意图理解层                          │   │
│  │  • 问题类型识别(查询/对比/预测)                │   │
│  │  • 时间范围解析                                  │   │
│  │  • 指标和维度提取                                │   │
│  └────────────────────┬────────────────────────────┘   │
│                       ↓                                 │
│  ┌─────────────────────────────────────────────────┐   │
│  │              SQL生成层                           │   │
│  │  • 自然语言转SQL                                │   │
│  │  • SQL优化                                      │   │
│  │  • 安全检查                                     │   │
│  └────────────────────┬────────────────────────────┘   │
│                       ↓                                 │
│  ┌─────────────────────────────────────────────────┐   │
│  │              数据执行层                          │   │
│  │  • 执行查询                                      │   │
│  │  • 数据处理                                      │   │
│  │  • 缓存管理                                      │   │
│  └────────────────────┬────────────────────────────┘   │
│                       ↓                                 │
│  ┌─────────────────────────────────────────────────┐   │
│  │              分析洞察层                          │   │
│  │  • 趋势分析                                      │   │
│  │  • 异常检测                                      │   │
│  │  • 归因分析                                      │   │
│  └────────────────────┬────────────────────────────┘   │
│                       ↓                                 │
│  ┌─────────────────────────────────────────────────┐   │
│  │              可视化呈现层                        │   │
│  │  • 图表推荐                                      │   │
│  │  • 自动生成                                      │   │
│  │  • 报告输出                                      │   │
│  └─────────────────────────────────────────────────┘   │
│                                                         │
└─────────────────────────────────────────────────────────┘

三、自然语言转SQL

SQL生成Agent

class TextToSQLAgent:
    """自然语言转SQL Agent"""

    def __init__(self, llm, schema_info: dict):
        self.llm = llm
        self.schema_info = schema_info

    async def text_to_sql(self, question: str) -> dict:
        """自然语言转SQL"""
        # 构建Prompt
        prompt = self._build_prompt(question)

        # 生成SQL
        response = await self.llm.generate(prompt)

        # 解析SQL
        sql = self._extract_sql(response)

        # 安全检查
        if not self._is_safe(sql):
            raise SecurityError("SQL不安全,可能包含危险操作")

        # 优化SQL
        optimized_sql = self._optimize_sql(sql)

        return {
            "original_question": question,
            "sql": optimized_sql,
            "explanation": self._explain_sql(optimized_sql)
        }

    def _build_prompt(self, question: str) -> str:
        """构建SQL生成Prompt"""
        schema_description = self._format_schema()

        return f"""
你是一个SQL专家。根据用户问题生成SQL查询。

## 数据库Schema
{schema_description}

## 用户问题
{question}

## 要求
1. 只返回SELECT语句
2. 使用正确的表名和字段名
3. 添加适当的WHERE条件
4. 添加必要的JOIN
5. 结果按时间或数量排序

## 输出格式
SQL:
```sql
你的SQL语句

解释:简要说明SQL逻辑
“””

def _format_schema(self) -> str:
    """格式化Schema信息"""
    lines = []
    for table, info in self.schema_info.items():
        columns = ", ".join(
            f"{col['name']}({col['type']})"
            for col in info['columns']
        )
        lines.append(f"表 {table}: {columns}")
    return "n".join(lines)

def _is_safe(self, sql: str) -> bool:
    """检查SQL安全性"""
    dangerous_keywords = [
        "DROP", "DELETE", "TRUNCATE", "ALTER", 
        "CREATE", "INSERT", "UPDATE", "GRANT"
    ]
    sql_upper = sql.upper()
    return not any(kw in sql_upper for kw in dangerous_keywords)

### 示例:查询转换

```python
# 用户问题
question = "上周每个渠道的销售额是多少?"

# 生成的SQL
sql = """
SELECT 
    channel,
    SUM(amount) as total_sales,
    COUNT(DISTINCT order_id) as order_count
FROM orders
WHERE order_date >= DATE_SUB(CURRENT_DATE, INTERVAL 7 DAY)
  AND order_date  dict:
        """分析数据并生成洞察"""
        # 计算基础统计
        stats = self._calculate_statistics(data)

        # 趋势分析
        trends = self._analyze_trends(data)

        # 异常检测
        anomalies = self._detect_anomalies(data)

        # 生成洞察
        insights = await self._generate_insights(
            data, stats, trends, anomalies, context
        )

        return {
            "statistics": stats,
            "trends": trends,
            "anomalies": anomalies,
            "insights": insights
        }

    def _calculate_statistics(self, data: list) -> dict:
        """计算统计指标"""
        if not data:
            return {}

        values = [d.get('value', 0) for d in data]

        return {
            "count": len(values),
            "sum": sum(values),
            "mean": statistics.mean(values),
            "median": statistics.median(values),
            "std": statistics.stdev(values) if len(values) > 1 else 0,
            "min": min(values),
            "max": max(values)
        }

    def _analyze_trends(self, data: list) -> dict:
        """分析趋势"""
        if len(data)  0:
            trend = "上升"
            change_rate = (y[-1] - y[0]) / y[0] * 100 if y[0] != 0 else 0
        elif slope  list:
        """检测异常值"""
        values = [d.get('value', 0) for d in data]

        if len(values)  0 else 0
            if z_score > 2:  # 超过2个标准差
                anomalies.append({
                    "index": i,
                    "data": d,
                    "z_score": round(z_score, 2),
                    "type": "高于均值" if d.get('value', 0) > mean else "低于均值"
                })

        return anomalies

    async def _generate_insights(self, data, stats, trends, 
                                 anomalies, context) -> list:
        """生成数据洞察"""
        prompt = f"""
分析以下数据并生成洞察:

## 数据概况
- 数据量:{stats['count']}条
- 总和:{stats['sum']}
- 平均值:{stats['mean']}
- 最大值:{stats['max']}
- 最小值:{stats['min']}

## 趋势分析
- 趋势方向:{trends['trend']}
- 变化率:{trends['change_rate']}%

## 异常检测
发现{len(anomalies)}个异常值

## 上下文
{context.get('question', '无具体问题')}

请生成3-5条关键洞察:
1. 主要发现
2. 异常原因推测
3. 行动建议
        """

        response = await self.llm.generate(prompt)
        return self._parse_insights(response)

五、图表推荐与生成

图表类型推荐

class ChartRecommender:
    """图表推荐器"""

    CHART_RULES = [
        {
            "condition": lambda data: len(data)  5 and data[0].get('date'),
            "chart_type": "line",
            "reason": "时序数据适合折线图"
        },
        {
            "condition": lambda data: len(data) > 5 and data[0].get('category'),
            "chart_type": "bar",
            "reason": "分类比较适合柱状图"
        },
        {
            "condition": lambda data: all(k in data[0] for k in ['x', 'y', 'size']),
            "chart_type": "bubble",
            "reason": "三维数据适合气泡图"
        }
    ]

    def recommend(self, data: list, question: str = "") -> dict:
        """推荐最佳图表类型"""
        for rule in self.CHART_RULES:
            if rule["condition"](data):
                return {
                    "chart_type": rule["chart_type"],
                    "reason": rule["reason"]
                }

        return {
            "chart_type": "table",
            "reason": "数据特征不明确,建议用表格"
        }

自动生成图表配置

class ChartGenerator:
    """图表生成器"""

    def generate_chart_config(self, data: list, 
                             chart_type: str,
                             title: str = "") -> dict:
        """生成图表配置"""
        if chart_type == "line":
            return self._generate_line_chart(data, title)
        elif chart_type == "bar":
            return self._generate_bar_chart(data, title)
        elif chart_type == "pie":
            return self._generate_pie_chart(data, title)
        else:
            return self._generate_table(data, title)

    def _generate_line_chart(self, data: list, title: str) -> dict:
        """生成折线图配置"""
        return {
            "title": {"text": title},
            "tooltip": {"trigger": "axis"},
            "xAxis": {
                "type": "category",
                "data": [d.get('date', d.get('category', '')) for d in data]
            },
            "yAxis": {"type": "value"},
            "series": [{
                "name": "数值",
                "type": "line",
                "data": [d.get('value', 0) for d in data],
                "smooth": True,
                "itemStyle": {"color": "#5470c6"}
            }]
        }

    def _generate_bar_chart(self, data: list, title: str) -> dict:
        """生成柱状图配置"""
        return {
            "title": {"text": title},
            "tooltip": {"trigger": "axis"},
            "xAxis": {
                "type": "category",
                "data": [d.get('category', d.get('channel', '')) for d in data]
            },
            "yAxis": {"type": "value"},
            "series": [{
                "name": "数值",
                "type": "bar",
                "data": [d.get('value', d.get('total_sales', 0)) for d in data],
                "itemStyle": {"color": "#5470c6"}
            }]
        }

六、报告自动生成

报告生成Agent

class ReportGeneratorAgent:
    """报告生成Agent"""

    async def generate_report(self, analysis_result: dict, 
                             template: str = "daily") -> str:
        """生成分析报告"""
        if template == "daily":
            return await self._generate_daily_report(analysis_result)
        elif template == "weekly":
            return await self._generate_weekly_report(analysis_result)
        elif template == "adhoc":
            return await self._generate_adhoc_report(analysis_result)

    async def _generate_daily_report(self, result: dict) -> str:
        """生成日报"""
        stats = result['statistics']
        trends = result['trends']
        insights = result['insights']

        report = f"""
# 数据日报 - {datetime.now().strftime('%Y-%m-%d')}

## 一、数据概览

| 指标 | 数值 |
|------|------|
| 数据量 | {stats['count']} |
| 总计 | {stats['sum']:,.0f} |
| 平均值 | {stats['mean']:,.2f} |
| 最大值 | {stats['max']:,.0f} |
| 最小值 | {stats['min']:,.0f} |

## 二、趋势分析

**趋势方向**:{trends['trend']}
**变化率**:{trends['change_rate']}%

## 三、关键洞察

{self._format_insights(insights)}

## 四、建议行动

{await self._generate_actions(insights)}

---
报告生成时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        """

        return report

    def _format_insights(self, insights: list) -> str:
        """格式化洞察"""
        return "n".join([
            f"{i+1}. {insight}"
            for i, insight in enumerate(insights)
        ])

    async def _generate_actions(self, insights: list) -> str:
        """生成行动建议"""
        prompt = f"""
基于以下洞察,生成3-5条具体的行动建议:

洞察:
{chr(10).join(insights)}

要求:
1. 建议具体可执行
2. 优先级明确
3. 预期效果说明
        """

        return await self.llm.generate(prompt)

七、完整示例

使用示例

# 初始化Agent
data_agent = DataAnalysisAgent(
    llm=GPT4Client(),
    db_connector=MySQLConnector(config),
    schema_info=load_schema()
)

# 用户提问
question = "上周各渠道销售情况如何?哪个渠道增长最快?"

# 执行分析
result = await data_agent.query(question)

# 输出结果
print(result['report'])

生成报告示例

# 销售数据分析报告

## 数据概览

| 渠道 | 销售额 | 订单数 | 占比 |
|------|--------|--------|------|
| 微信小程序 | ¥150,000 | 1,200 | 37.5% |
| APP | ¥120,000 | 800 | 30.0% |
| 官网 | ¥80,000 | 500 | 20.0% |
| 线下门店 | ¥50,000 | 300 | 12.5% |

## 趋势分析

- 微信小程序:增长15.2%,增长最快
- APP:增长8.3%
- 官网:下降2.1%
- 线下门店:下降5.5%

## 关键洞察

1. **微信小程序成为主力渠道**:占总销售额37.5%,且保持高速增长
2. **线下渠道持续萎缩**:需要关注线下门店运营问题
3. **官网流量下降**:可能与SEO或用户体验有关

## 建议行动

1. **加大微信小程序投入**:增加营销预算,优化用户体验
2. **调查线下门店问题**:分析客流、转化率等指标
3. **官网SEO优化**:提升自然流量获取能力

八、性能优化

查询缓存

class QueryCache:
    """查询缓存"""

    def __init__(self, ttl: int = 3600):
        self.cache = {}
        self.ttl = ttl

    def get(self, question: str) -> Optional[dict]:
        """获取缓存"""
        key = self._hash_question(question)
        if key in self.cache:
            entry = self.cache[key]
            if time.time() - entry['timestamp']  str:
        """问题哈希"""
        # 标准化问题
        normalized = question.lower().strip()
        return hashlib.md5(normalized.encode()).hexdigest()

异步执行

class AsyncDataAgent:
    """异步数据Agent"""

    async def batch_analyze(self, questions: list) -> list:
        """批量分析"""
        tasks = [self.analyze(q) for q in questions]
        return await asyncio.gather(*tasks)

    async def analyze_stream(self, question: str):
        """流式分析"""
        # 生成SQL
        sql_result = await self.text_to_sql(question)
        yield {"stage": "sql", "result": sql_result}

        # 执行查询
        data = await self.execute_query(sql_result['sql'])
        yield {"stage": "data", "result": data}

        # 分析数据
        analysis = await self.analyze_data(data)
        yield {"stage": "analysis", "result": analysis}

        # 生成报告
        report = await self.generate_report(analysis)
        yield {"stage": "report", "result": report}

九、最佳实践

问题设计

# ✅ 好的问题
"上周各渠道销售额对比"
"最近30天用户留存率趋势"
"哪个产品类目增长最快"

# ❌ 模糊的问题
"数据怎么样"
"给我看看报表"
"分析一下"

数据安全

class DataSecurityManager:
    """数据安全管理"""

    def __init__(self):
        self.sensitive_tables = ['users', 'payments', 'personal_info']
        self.access_control = AccessControl()

    async def check_access(self, user: str, table: str) -> bool:
        """检查访问权限"""
        if table in self.sensitive_tables:
            return await self.access_control.has_permission(
                user, 'read_sensitive'
            )
        return True

    def mask_sensitive_data(self, data: list, columns: list) -> list:
        """脱敏处理"""
        for row in data:
            for col in columns:
                if col in row:
                    row[col] = self._mask(row[col])
        return data

十、总结

效率提升

任务 传统方式 AI辅助 提升
SQL编写 30分钟 2分钟 93%
数据分析 2小时 15分钟 87%
报告撰写 1小时 10分钟 83%
图表制作 30分钟 3分钟 90%

下期预告

明天聊聊AI Agent内容创作——自动化写作与营销文案!


往期回顾

正文完
 0
技术老金
版权声明:本站原创文章,由 技术老金 于2026-03-31发表,共计9480字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)