共计 7180 个字符,预计需要花费 18 分钟才能阅读完成。
目标 → 数据 → 指标 → 方法 → 验证 → 复盘。任何分析从 明确业务问题 开始,并以 可复现的交付物 结束(脚本、SQL、图表、说明文档)。
0. 环境与项目骨架(拎包即用)
project/
├─ data/
│ ├─ raw/ # 原始文件(只读)│ ├─ interim/ # 中间结果(可删)│ └─ processed/ # 清洗后结果
├─ notebooks/ # 探索 / 展示
├─ sql/ # SQL 脚本
├─ src/ # 可复用代码(ETL/ 特征 / 统计)├─ tests/ # 单元测试(pytest)├─ config/ # 参数、口径、阈值(YAML/JSON)├─ requirements.txt # 依赖
└─ README.md # 复现实验步骤
requirements.txt 示例(按需增减)
pandas
numpy
scipy
matplotlib
pyyaml
python-dateutil
duckdb
README 模板要点:
- 数据来源与时间范围;2) 口径定义(如“活跃用户”);3) 运行步骤;4) 已知限制与 TODO。
1. 案例 A:电商用户增长(RFM + 漏斗)
业务问题:识别高价值用户与卡点环节,制定精细化运营策略。
数据表(最小闭环):
users(user_id, signup_dt, channel)主键:user_idorders(order_id, user_id, amount, status, created_at)主键:order_id,外键:user_idevents(user_id, event, ts, sku_id)行为日志(view/add_to_cart/pay)
1.1 数据清洗检查清单(DQ Checks)
- 主键唯一:
orders.order_id不重复。 - 参照完整性:
orders.user_id⊆users.user_id。 - 字段域:
status∈ {paid, canceled, refunded}。 - 异常金额:
amount >= 0,P99 是否离群。 - 时间合理性:
created_at >= signup_dt。
Pandas 断言示例
import pandas as pd
orders = pd.read_csv("data/raw/orders.csv", dtype={"order_id":"int64","user_id":"int64"})
users = pd.read_csv("data/raw/users.csv", parse_dates=["signup_dt"])
orders["created_at"] = pd.to_datetime(orders["created_at"])
# 主键唯一
assert orders["order_id"].is_unique, "order_id 存在重复"
# 参照完整性
assert set(orders["user_id"]).issubset(set(users["user_id"])), "孤儿订单:未知用户"
# 金额非负
assert (orders["amount"] >= 0).all(), "存在负数金额"
# 时间不倒流
merged = orders.merge(users[["user_id","signup_dt"]], on="user_id", how="left")
assert (merged["created_at"] >= merged["signup_dt"]).all(), "下单早于注册"
1.2 构建 RFM(最近一次消费 Recency、频次 Frequency、金额 Monetary)
SQL 版本
WITH paid AS (
SELECT user_id, amount, created_at
FROM orders
WHERE status = 'paid'
), base AS (
SELECT
user_id,
MAX(created_at) AS last_dt,
COUNT(*) AS freq,
SUM(amount) AS monetary
FROM paid
GROUP BY user_id
)
SELECT
b.*,
DATE_DIFF('day', b.last_dt, CURRENT_DATE) AS recency
FROM base b;
Pandas 版本
paid = orders.query("status =='paid'")
rfm = (paid.groupby("user_id")
.agg(last_dt=("created_at","max"),
freq=("order_id","count"),
monetary=("amount","sum"))
.reset_index())
rfm["recency"] = (pd.Timestamp.today().normalize() - rfm["last_dt"]).dt.days
分群(四分位打分)
for col, ascending in [("recency", True), ("freq", False), ("monetary", False)]:
rfm[col+"_q"] = pd.qcut(rfm[col].rank(method="first", ascending=ascending), 4, labels=[1,2,3,4]).astype(int)
rfm["segment"] = rfm[["recency_q","freq_q","monetary_q"]].sum(axis=1)
# 10-12 高价值;7-9 成长;<=6 需激活
1.3 漏斗诊断(view→add_to_cart→pay)
SQL(同一时间窗口 / 同一用户口径):
WITH win AS (
SELECT * FROM events
WHERE ts >= DATE_TRUNC('month', CURRENT_DATE)
), s AS (
SELECT user_id,
MAX(CASE WHEN event='view' THEN 1 ELSE 0 END) AS v,
MAX(CASE WHEN event='add_to_cart' THEN 1 ELSE 0 END) AS a,
MAX(CASE WHEN event='pay' THEN 1 ELSE 0 END) AS p
FROM win
GROUP BY user_id
)
SELECT
SUM(v) AS viewers,
SUM(a) AS adders,
SUM(p) AS payers,
1.0*SUM(a)/NULLIF(SUM(v),0) AS view2cart,
1.0*SUM(p)/NULLIF(SUM(a),0) AS cart2pay,
1.0*SUM(p)/NULLIF(SUM(v),0) AS view2pay
FROM s;
可视化建议:
- 分端(iOS/Android/Web)与渠道分层展示漏斗;
- 同比 / 环比线图观察趋势;
- 结合 RFM 分群看转化差异,定位“该投放给谁、用什么文案”。
2. 案例 B:A/B 测试端到端
问题:新结算页是否提升支付率?
2.1 设计
- 指标(KPI):
支付率 = 支付用户数 / 进入结算页用户数。 - 样本量粗估:
- 设基线 20%,期望提升至 22%,显著性 α=0.05,功效 1-β=0.8。
- 经验公式(两样本比例):每组约需上千级样本(精确计算可用
statsmodels或在线计算器)。
- 随机化:用户级、持久化分桶(避免跨天串组)。
- 冻结期:功能上线后先稳定采集,再评估。
2.2 数据抽取(SQL)
WITH base AS (SELECT user_id, variant, MIN(ts) AS first_seen
FROM ab_assignments
WHERE exp_id = 'checkout_v2'
GROUP BY 1,2
), expo AS (
SELECT e.user_id, b.variant
FROM events e JOIN base b USING(user_id)
WHERE e.event = 'checkout_view' AND e.ts >= b.first_seen
), conv AS (
SELECT e.user_id, b.variant
FROM events e JOIN base b USING(user_id)
WHERE e.event = 'pay' AND e.ts >= b.first_seen
)
SELECT
variant,
COUNT(DISTINCT expo.user_id) AS n_exposed,
COUNT(DISTINCT conv.user_id) AS n_converted
FROM expo
LEFT JOIN conv USING(user_id, variant)
GROUP BY 1;
2.3 统计检验(Python)
import numpy as np
from math import sqrt
# 输入:两组曝光与转化数
n_a, c_a = 12000, 2520
n_b, c_b = 11850, 2750
p_a, p_b = c_a/n_a, c_b/n_b
p_pool = (c_a+c_b)/(n_a+n_b)
se = sqrt(p_pool*(1-p_pool)*(1/n_a + 1/n_b))
z = (p_b - p_a)/se
# 将 z 转为 p 值(双尾)from scipy.stats import norm
p_value = 2*(1-norm.cdf(abs(z)))
print(p_a, p_b, z, p_value)
进阶:
- Welch t 用于均值类指标;
- 分层分析(端 / 新老客);
- CUPED(协变量预减噪);
- 多重假设 校正(Benjamini–Hochberg)。
解读框架:显著 ≠ 重要。关注效应大小、稳定性与业务代价;避免“边跑边改”。
3. 案例 C:时间序列监控与异常检测
目标:为日活、支付额等核心 KPI 搭建“自动预警”。
3.1 去季节化的鲁棒规则
import pandas as pd
# df: columns=[date, kpi]
df = df.sort_values("date").set_index("date")
# 7 日中位移动窗 + IQR 阈值
med = df["kpi"].rolling(7, center=True, min_periods=3).median()
iqr = (df["kpi"].rolling(28, min_periods=7)
.quantile(0.75) - df["kpi"].rolling(28, min_periods=7).quantile(0.25))
upper = med + 3*iqr
lower = med - 3*iqr
alerts = df[(df["kpi"]>upper) | (df["kpi"]<lower)]
实践要点:
- 维护 节假日表 与版本发布表,在报警里标注上下文;
- 新渠道上线、计费口径变更时,先“换挡”(重置基线)。
4. 中等规模数据的高效套路(单机)
- 列式存储 + 向量化:将 CSV 转 Parquet,读取速度与体积显著改善。
- DuckDB:在本地直接对 Parquet/CSV 跑 SQL,适合 10GB 级数据探索。
DuckDB 小例
import duckdb
con = duckdb.connect()
con.execute("""CREATE TABLE orders AS SELECT * FROM read_parquet('data/processed/orders.parquet');
SELECT user_id, SUM(amount) AS rev
FROM orders WHERE status='paid'
GROUP BY 1 ORDER BY rev DESC LIMIT 20;
""")
5. 数据质量自动化 & 回归保护
轻量校验器(不依赖大框架)
# src/checks.py
import pandas as pd
def non_negative(df, col):
bad = df[df[col] < 0]
return len(bad)==0, bad
def unique_key(df, cols):
dups = df.duplicated(subset=cols, keep=False)
return (~dups).all(), df[dups]
# tests/test_pipeline.py
import pandas as pd
from src import checks
def test_orders_amount_non_negative():
df = pd.DataFrame({"order_id":[1,2], "amount":[0,10]})
ok, _ = checks.non_negative(df, "amount")
assert ok
def test_orders_pk_unique():
df = pd.DataFrame({"order_id":[1,1]})
ok, dups = checks.unique_key(df, ["order_id"])
assert not ok and len(dups)==2
运行:pytest -q。把关键口径写成代码与测试,而不是留在口头与记忆里。
6. 指标口径与数据字典(模板)
指标模板
kpi_id: pay_rate
name: 支付率
owner: growth@company.com
period: daily
numerator: 当日支付用户数
denominator: 当日结算页曝光用户数
filters:
- 地区 != "测试"
- 端 in [iOS, Android]
backfill_start: 2025-01-01
notes: |
变更记录:2025-03-10 排除内部账号;2025-05-01 新增反作弊过滤。
数据字典模板
table: orders
primary_key: [order_id]
columns:
- name: order_id
type: int64
desc: 订单唯一 ID
- name: user_id
type: int64
desc: 下单用户
- name: amount
type: float64
desc: 订单金额(含税)- name: status
type: category
values: [paid, canceled, refunded]
- name: created_at
type: datetime64[ns]
tz: Asia/Shanghai
7. 性能与内存技巧(Pandas)
category压缩高基数不高的字符串列;- 分块读取:
pd.read_csv(..., chunksize=1_000_000); - 先筛后 JOIN,减少参照表大小;
- 统一时区再运算,避免“日跨界”误差;
- 使用
merge_asof做近邻时间对齐。
类型压缩示例
import pandas as pd
import numpy as np
def optimize(df: pd.DataFrame) -> pd.DataFrame:
for c in df.select_dtypes(include=['int64']).columns:
df[c] = pd.to_numeric(df[c], downcast='integer')
for c in df.select_dtypes(include=['float64']).columns:
df[c] = pd.to_numeric(df[c], downcast='float')
for c in df.select_dtypes(include=['object']).columns:
if df[c].nunique()/max(len(df),1) < 0.5:
df[c] = df[c].astype('category')
return df
8. 可视化与“先画后判”
- 分布:直方图 + 密度曲线(看偏态 / 长尾);
- 离群:箱线图(IQR)+ 点标注;
- 趋势:按周 / 自然月聚合的折线;
- 相关:散点 + 皮尔逊 / 斯皮尔曼相关系数。
Matplotlib 快速出图
import matplotlib.pyplot as plt
fig = plt.figure()
plt.hist(rfm['monetary'].clip(upper=rfm['monetary'].quantile(0.99)), bins=30)
plt.title('消费金额分布(截尾 P99)')
plt.xlabel('金额'); plt.ylabel('用户数')
plt.show()
9. 交付物清单(避免“只说不做”)
sql/含原始抽取、聚合、指标脚本;src/含清洗与特征工程模块;notebooks/含 EDA 与结果展示;tests/覆盖关键口径与边界;README.md明确运行顺序与依赖;data/processed/metrics_daily.parquet最终指标表;figures/导出的核心图(PNG/SVG)。
10. 练习题(含判题思路)
- 避免重复计数:给出
orders与order_items,计算 SKU 级 GMV。要求:避免 JOIN 后倍增。- 思路:先在
order_items聚合到order_id粒度再 JOIN,或在orders上 SELECT DISTINCT。
- 思路:先在
- 时间泄漏识别:用近 30 天数据训练预测“7 日内复购”。指出潜在泄漏源。
- 思路:训练集的特征窗口必须严格在预测窗口之前;下单后的行为不得进入特征。
- 异常值处理:
amount存在极端长尾,如何构建稳健均值?- 思路:Winsorize、对数变换、或以中位数 / 分位数替代;报告两套口径。
- A/B 多指标:支付率显著提升但退款率略升,给出结论与后续实验设计。
- 思路:联合目标(净收入 / 净 GMV),分层验证,灰度放量 + 留存复测。
11. 常见坑二刷(附避坑句柄)
- 平均被极端绑架 → 报告同时给出中位数与 P90/P95。
- JOIN 放大 → 先聚合再连,或在 ON 条件外加去重键。
- 分母错用 → 指标定义写进
config/kpi.yml并在代码里唯一定义。 - 抽样偏差 → 使用分层抽样(渠道 / 地区 / 新老客)并标注采样率。
- 时区混乱 → 统一到 UTC 或业务时区,再衍生日周月。
12. 拓展练习
- 加入 分群营销实验(RFM × 渠道 × 文案 A/B);
- 引入 特征存储 模式,打通实时 / 离线;
- 采用 作业编排(如 Airflow)的 DAG,把清洗→统计→可视化串起来;
- 数据契约(Data Contract)与回填流程规范化。
正文完

