
Building Real-Time Machine Learning Pipelines with Modern Tools

Learn how to design and build scalable machine learning pipelines for production environments.


Deploying machine learning models in production requires attention to scalability, latency, monitoring, and fault tolerance. This article walks through building a modern real-time ML pipeline.

Architecture Design

A typical real-time ML pipeline consists of the following components:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Client    │────▶│ API Gateway │────▶│Model Server │
└─────────────┘     └─────────────┘     └─────────────┘
                                               │
                                               ▼
                                        ┌─────────────┐
                                        │Feature Store│
                                        └─────────────┘

Model Serving

FastAPI Service

FastAPI is an ideal choice for building high-performance ML APIs:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import numpy as np
import joblib
import torch

app = FastAPI(title="ML Model API")

# Load the model
model = joblib.load("model.pkl")
# Or, for a PyTorch model:
# model = torch.load("model.pth")
# model.eval()

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: float
    probability: float | None = None
    model_version: str
    latency_ms: float

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    import time
    start_time = time.time()

    try:
        # Preprocess the input
        features = np.array(request.features).reshape(1, -1)

        # Run inference
        prediction = model.predict(features)[0]

        # Optional: get the prediction probability
        probability = None
        if hasattr(model, 'predict_proba'):
            probability = model.predict_proba(features)[0].max()

        # Compute latency in milliseconds
        latency = (time.time() - start_time) * 1000

        return PredictionResponse(
            prediction=float(prediction),
            probability=probability,
            model_version="1.0.0",
            latency_ms=latency
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    return {"status": "healthy", "model_loaded": model is not None}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
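
Once the service is running, it can be smoke-tested with any HTTP client. A quick check using the requests library (the URL matches the local uvicorn setup above; the feature vector is only an example and must match the input length the model was trained on):

import requests

# Call the prediction endpoint with a sample feature vector
resp = requests.post(
    "http://localhost:8000/predict",
    json={"features": [0.1, 0.5, 1.2, 3.4]},
)
resp.raise_for_status()
print(resp.json())  # {"prediction": ..., "probability": ..., "model_version": "1.0.0", "latency_ms": ...}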

Batch Inference

For high-throughput scenarios, batching requests can significantly improve performance:

from fastapi import FastAPI
from collections import deque
import asyncio

app = FastAPI()
batch_queue = deque()
BATCH_SIZE = 32
BATCH_TIMEOUT = 0.1  # 100 ms

async def process_batch():
    while True:
        # Wait briefly so that requests can accumulate into a batch
        await asyncio.sleep(BATCH_TIMEOUT)
        if not batch_queue:
            continue

        # Drain up to BATCH_SIZE queued requests and run a single batched inference
        batch = [batch_queue.popleft() for _ in range(min(BATCH_SIZE, len(batch_queue)))]
        predictions = model.predict_batch(batch)
        # Return the results to the waiting callers...

@app.on_event("startup")
async def startup_event():
    asyncio.create_task(process_batch())
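
One piece is still missing from this sketch: each request handler needs a way to get its own result back from the batch worker. A common pattern, shown below as a rough sketch (enqueue_request and the (features, future) tuple layout are illustrative, and PredictionRequest is reused from the first service), is to pair every queued item with an asyncio.Future that the worker resolves in place of the "# Return the results..." step:

async def enqueue_request(features: list[float]) -> float:
    # Pair the request with a Future the batch worker can resolve later
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    batch_queue.append((features, future))
    return await future

@app.post("/predict_batched")
async def predict_batched(request: PredictionRequest):
    # The handler suspends until the batch worker calls future.set_result(...)
    prediction = await enqueue_request(request.features)
    return {"prediction": float(prediction)}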

Feature Store

Redis as a Feature Store

Redis provides low-latency access to features:

import redis
import json
import numpy as np
from typing import Dict, Any

class FeatureStore:
    def __init__(self, host="localhost", port=6379):
        self.client = redis.Redis(host=host, port=port, decode_responses=True)

    def store_features(self, entity_id: str, features: Dict[str, Any]):
        """Store features for an entity"""
        # Serialize numpy arrays to JSON
        serialized = {}
        for key, value in features.items():
            if isinstance(value, np.ndarray):
                serialized[key] = json.dumps(value.tolist())
            else:
                serialized[key] = json.dumps(value)

        self.client.hset(f"features:{entity_id}", mapping=serialized)

    def get_features(self, entity_id: str, feature_names: list[str]) -> Dict[str, Any]:
        """Retrieve the requested features for an entity"""
        raw_features = self.client.hmget(f"features:{entity_id}", feature_names)

        features = {}
        for key, value in zip(feature_names, raw_features):
            if value:
                features[key] = json.loads(value)

        return features

    def update_feature(self, entity_id: str, feature_name: str, value: Any):
        """Update a single feature"""
        if isinstance(value, np.ndarray):
            value = json.dumps(value.tolist())
        else:
            value = json.dumps(value)

        self.client.hset(f"features:{entity_id}", feature_name, value)
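
In the serving path, the prediction endpoint typically pulls precomputed features from the store and merges them with request-time inputs. A brief sketch (the feature names, the user_id path parameter, and the way features are concatenated are illustrative choices):

feature_store = FeatureStore(host="localhost", port=6379)

@app.post("/predict/{user_id}")
async def predict_for_user(user_id: str, request: PredictionRequest):
    # Merge stored entity features with the request-time features
    stored = feature_store.get_features(user_id, ["age", "avg_session_length"])
    features = request.features + [stored.get("age", 0), stored.get("avg_session_length", 0.0)]
    prediction = model.predict(np.array(features).reshape(1, -1))[0]
    return {"prediction": float(prediction)}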

Feast Integration

Feast is an open-source feature store:

from feast import FeatureStore
from datetime import datetime
import pandas as pd

# Initialize Feast
store = FeatureStore(repo_path=".")

# Retrieve historical features (for training and backfills)
entity_df = pd.DataFrame({
    "user_id": [1, 2, 3],
    "event_timestamp": [
        datetime.now(),
        datetime.now(),
        datetime.now()
    ]
})

features = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:age",
        "user_features:country",
        "item_features:price"
    ]
).to_df()

# Retrieve online features (for low-latency serving)
online_features = store.get_online_features(
    features=[
        "user_features:age",
        "user_features:last_login"
    ],
    entity_rows=[
        {"user_id": 1},
        {"user_id": 2}
    ]
).to_dict()
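
The resulting dictionary maps each feature to a list of values, one per entity row. Exactly how the keys are spelled depends on the Feast version and the full_feature_names option, so the sketch below, which assumes bare feature names and numeric values, is only illustrative:

import numpy as np

# Stack the feature columns into a matrix, one row per entity, and score them
feature_names = ["age", "last_login"]
X = np.column_stack([online_features[name] for name in feature_names])
predictions = model.predict(X)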

Model Monitoring

Prometheus Monitoring

from fastapi import Response
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST

# Define metrics
prediction_counter = Counter('predictions_total', 'Total predictions')
prediction_latency = Histogram('prediction_latency_seconds', 'Prediction latency')
model_accuracy = Gauge('model_accuracy', 'Model accuracy')

@app.post("/predict")
@prediction_latency.time()
async def predict(request: PredictionRequest):
    prediction_counter.inc()

    # ... prediction logic ...

    return {"prediction": prediction}

@app.get("/metrics")
async def metrics():
    # Expose metrics in the Prometheus text format with the correct content type
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
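
The accuracy gauge is usually updated outside the request path, for example by a periodic task that scores recent predictions once delayed ground-truth labels arrive. A rough sketch (fetch_labeled_predictions is a placeholder for however labels reach your system):

import asyncio
from sklearn.metrics import accuracy_score

async def update_accuracy_metric():
    while True:
        # Placeholder: fetch recent predictions that now have ground-truth labels
        y_true, y_pred = fetch_labeled_predictions()
        if len(y_true) > 0:
            model_accuracy.set(accuracy_score(y_true, y_pred))
        await asyncio.sleep(300)  # re-evaluate every 5 minutes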

Data Drift Detection

from alibi_detect.cd import KSDrift
from scipy.stats import entropy
import numpy as np

class DriftMonitor:
    def __init__(self, reference_data):
        self.reference_data = reference_data
        # Kolmogorov-Smirnov drift detector fitted on the reference (training) data
        self.drift_detector = KSDrift(
            x_ref=reference_data,
            p_val=0.05
        )

    def check_drift(self, new_data):
        """Detect drift between new data and the reference distribution"""
        drift_result = self.drift_detector.predict(new_data)

        return {
            "is_drift": drift_result['data']['is_drift'],
            "p_value": drift_result['data']['p_val'],
            "distance": drift_result['data']['distance']
        }

    def distribution_similarity(self, new_data):
        """Compute distribution similarity via KL divergence"""
        ref_hist, _ = np.histogram(self.reference_data, bins=50)
        new_hist, _ = np.histogram(new_data, bins=50)

        # KL divergence (add a small epsilon to avoid division by zero)
        ref_hist = ref_hist + 1e-10
        new_hist = new_hist + 1e-10
        ref_hist = ref_hist / ref_hist.sum()
        new_hist = new_hist / new_hist.sum()

        kl_div = entropy(new_hist, ref_hist)

        return {"kl_divergence": kl_div}
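
In practice, the monitor is built once from training data and then fed recent production inputs on a schedule, such as a sliding window of the latest requests. A small usage sketch (the arrays here are synthetic placeholders for real training and production data):

import numpy as np

# Synthetic placeholders: use real training features and a window of recent requests
reference_data = np.random.normal(0, 1, size=(1000, 1))
recent_requests = np.random.normal(0.3, 1, size=(200, 1))

monitor = DriftMonitor(reference_data)
report = monitor.check_drift(recent_requests)

if report["is_drift"]:
    # Alert or trigger a retraining job when drift is detected
    print("Drift detected:", report)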

Containerized Deployment

Dockerfile

FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the model and application code
COPY model.pkl .
COPY src/ ./src/

# Expose the service port
EXPOSE 8000

# Start the service
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model
  template:
    metadata:
      labels:
        app: ml-model
    spec:
      containers:
      - name: ml-model
        image: your-registry/ml-model:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: ml-model-service
spec:
  selector:
    app: ml-model
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer

CI/CD Pipeline

GitHub Actions

name: ML Pipeline CI/CD

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.11'

    - name: Install dependencies
      run: |
        pip install -r requirements.txt

    - name: Run tests
      run: |
        pytest tests/

    - name: Model validation
      run: |
        python scripts/validate_model.py

  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
    - name: Build and push Docker image
      run: |
        docker build -t your-registry/ml-model:${{ github.sha }} .
        docker push your-registry/ml-model:${{ github.sha }}

    - name: Deploy to Kubernetes
      run: |
        kubectl set image deployment/ml-model \
          ml-model=your-registry/ml-model:${{ github.sha }}

Summary

Building a real-time machine learning pipeline means balancing service architecture, performance optimization, monitoring, and deployment.

Key takeaways:

  • Use FastAPI to build high-performance APIs
  • Use Redis or Feast for low-latency feature storage
  • Implement comprehensive model monitoring and drift detection
  • Adopt containers and Kubernetes for scalable deployment
  • Set up a CI/CD pipeline for automated deployments