Building Real-Time Machine Learning Pipelines with Modern Tools
Learn how to design and build scalable machine learning pipelines for production.
Real-Time ML Pipelines
Deploying machine learning models in production requires attention to scalability, latency, monitoring, and fault tolerance. This article walks through building a modern real-time ML pipeline.
Architecture Design
A typical real-time ML pipeline consists of the following components:
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │────▶│ API Gateway │────▶│ Model Server│
└─────────────┘     └─────────────┘     └─────────────┘
                                               │
                                               ▼
                                        ┌─────────────┐
                                        │Feature Store│
                                        └─────────────┘
Model Serving
A FastAPI Service
FastAPI is an excellent choice for building high-performance ML APIs:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import numpy as np
import joblib
import time

app = FastAPI(title="ML Model API")

# Load the model
model = joblib.load("model.pkl")
# Or, for a PyTorch model:
# import torch
# model = torch.load("model.pth")
# model.eval()

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: float
    probability: float | None = None
    model_version: str
    latency_ms: float

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    start_time = time.time()
    try:
        # Preprocess
        features = np.array(request.features).reshape(1, -1)
        # Inference
        prediction = model.predict(features)[0]
        # Optionally fetch a class probability
        probability = None
        if hasattr(model, "predict_proba"):
            probability = float(model.predict_proba(features)[0].max())
        # Measure latency
        latency = (time.time() - start_time) * 1000
        return PredictionResponse(
            prediction=float(prediction),
            probability=probability,
            model_version="1.0.0",
            latency_ms=latency,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    return {"status": "healthy", "model_loaded": model is not None}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Batch Inference
For high-throughput scenarios, batching requests can significantly improve performance:
import asyncio
from collections import deque

from fastapi import FastAPI

app = FastAPI()
batch_queue = deque()
BATCH_SIZE = 32
BATCH_TIMEOUT = 0.1  # 100 ms

async def process_batch():
    while True:
        if batch_queue:
            # Drain up to BATCH_SIZE pending requests
            batch = [batch_queue.popleft()
                     for _ in range(min(BATCH_SIZE, len(batch_queue)))]
            predictions = model.predict_batch(batch)  # model loaded elsewhere
            # Deliver the results back to the callers...
        else:
            await asyncio.sleep(BATCH_TIMEOUT)

@app.on_event("startup")
async def startup_event():
    asyncio.create_task(process_batch())
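The sketch above leaves out how results get back to waiting callers. A minimal, runnable version of the same micro-batching pattern, using `asyncio` futures to deliver each result and a hypothetical `predict_batch` stand-in for the model, might look like this:

```python
import asyncio

BATCH_SIZE = 4
BATCH_TIMEOUT = 0.05  # seconds to wait before flushing a partial batch

# Hypothetical stand-in for a real model's batch inference.
def predict_batch(inputs):
    return [x * 2 for x in inputs]

async def batch_worker(queue):
    while True:
        # Block until at least one request arrives, then keep collecting
        # until the batch fills or the timeout expires.
        batch = [await queue.get()]
        try:
            while len(batch) < BATCH_SIZE:
                batch.append(await asyncio.wait_for(queue.get(), BATCH_TIMEOUT))
        except asyncio.TimeoutError:
            pass
        results = predict_batch([x for x, _ in batch])
        # Hand each caller its own result via the future it is awaiting.
        for (_, fut), res in zip(batch, results):
            fut.set_result(res)

async def predict(queue, x):
    fut = asyncio.get_running_loop().create_future()
    await queue.put((x, fut))
    return await fut

async def main():
    queue = asyncio.Queue()
    worker = asyncio.create_task(batch_worker(queue))
    results = await asyncio.gather(*(predict(queue, i) for i in range(6)))
    worker.cancel()
    return results

print(asyncio.run(main()))  # → [0, 2, 4, 6, 8, 10]
```

With six requests and a batch size of four, the worker processes one full batch immediately and flushes the remaining two after the timeout, so no request waits indefinitely.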
Feature Stores
Redis as a Feature Store
Redis provides low-latency feature access:
import json
from typing import Any, Dict

import numpy as np
import redis

class FeatureStore:
    def __init__(self, host="localhost", port=6379):
        self.client = redis.Redis(host=host, port=port, decode_responses=True)

    def store_features(self, entity_id: str, features: Dict[str, Any]):
        """Store a set of features for an entity."""
        # Serialize numpy arrays to JSON
        serialized = {}
        for key, value in features.items():
            if isinstance(value, np.ndarray):
                serialized[key] = json.dumps(value.tolist())
            else:
                serialized[key] = json.dumps(value)
        self.client.hset(f"features:{entity_id}", mapping=serialized)

    def get_features(self, entity_id: str, feature_names: list[str]) -> Dict[str, Any]:
        """Fetch the named features for an entity."""
        raw_features = self.client.hmget(f"features:{entity_id}", feature_names)
        features = {}
        for key, value in zip(feature_names, raw_features):
            if value:
                features[key] = json.loads(value)
        return features

    def update_feature(self, entity_id: str, feature_name: str, value: Any):
        """Update a single feature."""
        if isinstance(value, np.ndarray):
            value = json.dumps(value.tolist())
        else:
            value = json.dumps(value)
        self.client.hset(f"features:{entity_id}", feature_name, value)
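A quick way to sanity-check the JSON round trip without a running Redis server is to mimic the hash with a plain dict (`fake_hash` below is a stand-in for illustration, not part of the redis API):

```python
import json

# In-memory stand-in for the Redis hash used by FeatureStore,
# here only to illustrate the JSON serialization round trip.
fake_hash: dict[str, str] = {}

def store_features(features):
    for name, value in features.items():
        fake_hash[name] = json.dumps(value)  # lists/arrays become JSON lists

def get_features(feature_names):
    # Missing features are simply omitted, as in FeatureStore.get_features
    return {name: json.loads(fake_hash[name])
            for name in feature_names if name in fake_hash}

store_features({"age": 31, "embedding": [0.1, 0.2, 0.3]})
print(get_features(["age", "embedding", "missing"]))
# → {'age': 31, 'embedding': [0.1, 0.2, 0.3]}
```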
Feast Integration
Feast is an open-source feature store:
import pandas as pd
from datetime import datetime
from feast import FeatureStore

# Initialize Feast from the local feature repo
store = FeatureStore(repo_path=".")

# Fetch historical (point-in-time correct) features for training
entity_df = pd.DataFrame({
    "user_id": [1, 2, 3],
    "event_timestamp": [
        datetime.now(),
        datetime.now(),
        datetime.now(),
    ],
})

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:age",
        "user_features:country",
        "item_features:price",
    ],
).to_df()

# Fetch online features for serving
online_features = store.get_online_features(
    features=[
        "user_features:age",
        "user_features:last_login",
    ],
    entity_rows=[
        {"user_id": 1},
        {"user_id": 2},
    ],
).to_dict()
Model Monitoring
Prometheus Metrics
from fastapi import Response
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST

# Define metrics
prediction_counter = Counter("predictions_total", "Total predictions")
prediction_latency = Histogram("prediction_latency_seconds", "Prediction latency")
model_accuracy = Gauge("model_accuracy", "Model accuracy")

@app.post("/predict")
@prediction_latency.time()
async def predict(request: PredictionRequest):
    prediction_counter.inc()
    # ... prediction logic ...
    return {"prediction": prediction}

@app.get("/metrics")
async def metrics():
    # Serve the Prometheus exposition format with the correct content type
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
Data Drift Detection
import numpy as np
from alibi_detect.cd import KSDrift
from scipy.stats import entropy

class DriftMonitor:
    def __init__(self, reference_data):
        self.reference_data = reference_data
        # Kolmogorov-Smirnov drift detector from alibi-detect
        self.drift_detector = KSDrift(
            x_ref=reference_data,
            p_val=0.05,
        )

    def check_drift(self, new_data):
        """Run a statistical drift test against the reference data."""
        drift_result = self.drift_detector.predict(new_data)
        return {
            "is_drift": drift_result["data"]["is_drift"],
            "p_value": drift_result["data"]["p_val"],
            "distance": drift_result["data"]["distance"],
        }

    def distribution_similarity(self, new_data):
        """Compare distributions via KL divergence between histograms."""
        ref_hist, _ = np.histogram(self.reference_data, bins=50)
        new_hist, _ = np.histogram(new_data, bins=50)
        # Smooth to avoid division by zero, then normalize
        ref_hist = ref_hist + 1e-10
        new_hist = new_hist + 1e-10
        ref_hist = ref_hist / ref_hist.sum()
        new_hist = new_hist / new_hist.sum()
        kl_div = entropy(new_hist, ref_hist)
        return {"kl_divergence": kl_div}
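The KL computation above depends only on histogram counts, so it can be sanity-checked with a stdlib-only sketch on synthetic data: a fresh sample from the reference distribution should score near zero, while a shifted Gaussian should score much higher.

```python
import math
import random

def histogram(data, bins, lo, hi):
    """Fixed-range histogram; outliers are clamped into the edge bins."""
    counts = [0] * bins
    width = (hi - lo) / bins
    for x in data:
        idx = int((x - lo) / width)
        counts[min(max(idx, 0), bins - 1)] += 1
    return counts

def kl_divergence(p_counts, q_counts, eps=1e-10):
    """KL(P || Q) between two histograms given as raw counts."""
    p = [c + eps for c in p_counts]  # smooth to avoid log(0)
    q = [c + eps for c in q_counts]
    p_sum, q_sum = sum(p), sum(q)
    return sum((pi / p_sum) * math.log((pi / p_sum) / (qi / q_sum))
               for pi, qi in zip(p, q))

random.seed(0)
reference = [random.gauss(0, 1) for _ in range(10_000)]
same_dist = [random.gauss(0, 1) for _ in range(10_000)]
shifted = [random.gauss(1.5, 1) for _ in range(10_000)]  # drifted mean

ref_h = histogram(reference, 50, -5.0, 5.0)
kl_same = kl_divergence(histogram(same_dist, 50, -5.0, 5.0), ref_h)
kl_shifted = kl_divergence(histogram(shifted, 50, -5.0, 5.0), ref_h)
print(kl_same < kl_shifted)  # drifted data diverges far more
```

In practice the reference histogram is computed once from training data, and incoming batches are scored against it on a schedule.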
Containerized Deployment
Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the model and source code
COPY model.pkl .
COPY src/ ./src/

# Expose the service port
EXPOSE 8000

# Start the server
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]
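For local development, a docker-compose file can wire the API container to a Redis feature store. The service names, image tag, and the `REDIS_HOST` environment variable below are illustrative placeholders, not values the application is known to read:

```yaml
version: "3.8"
services:
  ml-model:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REDIS_HOST=redis   # hypothetical env var for the feature store host
    depends_on:
      - redis
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
```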
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model
  template:
    metadata:
      labels:
        app: ml-model
    spec:
      containers:
        - name: ml-model
          image: your-registry/ml-model:latest
          ports:
            - containerPort: 8000
          resources:
            requests:
              memory: "512Mi"
              cpu: "500m"
            limits:
              memory: "1Gi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: LoadBalancer
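The Deployment above fixes the replica count at 3, so autoscaling is a natural next step. A HorizontalPodAutoscaler sketch, with illustrative thresholds, could look like:

```yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-model-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-model
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
```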
CI/CD Pipelines
GitHub Actions
name: ML Pipeline CI/CD

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          pip install -r requirements.txt
      - name: Run tests
        run: |
          pytest tests/
      - name: Model validation
        run: |
          python scripts/validate_model.py

  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Build and push Docker image
        run: |
          docker build -t your-registry/ml-model:${{ github.sha }} .
          docker push your-registry/ml-model:${{ github.sha }}
      - name: Deploy to Kubernetes
        run: |
          kubectl set image deployment/ml-model \
            ml-model=your-registry/ml-model:${{ github.sha }}
Summary
Building a real-time machine learning pipeline requires balancing service architecture, performance optimization, monitoring, and deployment.
Key takeaways:
- Build high-performance APIs with FastAPI
- Use Redis or Feast for low-latency feature storage
- Implement comprehensive model monitoring and drift detection
- Containerize and deploy on Kubernetes for scalable serving
- Automate releases with a CI/CD pipeline