共计 9134 个字符,预计需要花费 23 分钟才能阅读完成。
把之前的模型与导出形态(ONNX / TensorRT / OpenVINO)上成服务,支持多路视频、弹性扩缩与可观测。覆盖三条主线:
- Triton Inference Server:标准化推理服务(HTTP/gRPC/Prometheus),动态批处理、实例并行、多模型管理;
- DeepStream:端 / 边 /NVIDIA 平台的视频管线(多路解码→预处理→推理→跟踪→OSD→推送);
- FastAPI 微服务 :轻量 HTTP 服务与 WebSocket/ 帧流式推理,适配 CPU/ORT/OV。
并给出 基准 / 监控 / 排错 清单与 多路并发模板,实现从摄像头到看板的一条龙。
0) 总览与选型
场景矩阵
- 离线批 / 简易在线:FastAPI + ONNX Runtime / OpenVINO(轻量、好改);
- 通用在线服务:Triton(HTTP/gRPC,自动批、实例并行、多模型热更新);
- 视频流媒体:DeepStream(GStreamer 硬解码 + 零拷贝 + nvinfer/nvinferserver)。
经验法则:单机多路视频 → DeepStream;跨机房 / 弹性伸缩 → Triton(配 K8s);快速原型 /CPU 环境 → FastAPI。
1) Triton 快速上手(HTTP 8000 / gRPC 8001 / Metrics 8002)
1.1 模型库结构(Model Repository)
models/
yolov8_onnx/
1/
model.onnx
config.pbtxt
yolov8_trt/
1/
model.plan # TensorRT engine
config.pbtxt
config.pbtxt(ONNX 示例)
name: "yolov8_onnx"
platform: "onnxruntime_onnx"
max_batch_size: 8
input: [
{name: "images", data_type: TYPE_FP32, dims: [3, 640, 640] }
]
output: [
{name: "output0", data_type: TYPE_FP32, dims: [-1, 85] } # [N*anchors, 85]
]
instance_group: [ {kind: KIND_GPU, count: 1} ]
dynamic_batching: {preferred_batch_size: [ 4, 8]
max_queue_delay_microseconds: 2000
}
形状以导出的实际维度为准;若是
[B, 8400, 85]则将dims写为[8400, 85],并保留max_batch_size>0表示支持批。
1.2 运行 Triton(Docker)
docker run --gpus=all --rm -p8000:8000 -p8001:8001 -p8002:8002 \
-v $PWD/models:/models nvcr.io/nvidia/tritonserver:24.08-py3 \
tritonserver --model-repository=/models --strict-model-config=false
健康检查 :GET /v2/health/ready; 指标:GET :8002/metrics(Prometheus)。
1.3 Python gRPC 客户端(异步并发)
import asyncio, numpy as np, cv2
import tritonclient.grpc.aio as grpc
async def infer_loop(uri, model='yolov8_onnx', n=200):
cli = grpc.InferenceServerClient(url=uri)
iname, oname = 'images', 'output0'
for _ in range(n):
# 640 letterbox(简化:直接 resize)img = cv2.resize(cv2.imread('sample.jpg'), (640,640))
x = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype(np.float32)/255.0
x = np.transpose(x, (2,0,1))[None]
inp = grpc.InferInput(iname, x.shape, 'FP32'); inp.set_data_from_numpy(x)
out = grpc.InferRequestedOutput(oname)
res = await cli.infer(model, [inp], outputs=[out])
y = res.as_numpy(oname) # (N*anchors,85) 或 (1,8400,85)
await cli.close()
asyncio.run(infer_loop('localhost:8001'))
吞吐开关:dynamic_batching.max_queue_delay、preferred_batch_size、instance_group.count(同一 GPU 并行实例)、response_cache(开启需写入 config)。
1.4 Ensemble:前后处理拼装
把预处理 / 后处理做成 Python Backend,与主模型组装为流水线:
name: "yolo_ens"
platform: "ensemble"
max_batch_size: 8
input: [{name: "raw", data_type: TYPE_UINT8, dims: [-1] }]
output: [{name: "det", data_type: TYPE_FP32, dims: [-1, 6] }]
ensemble_scheduling: {step: [{ model_name: "preproc_py", input_map: {"raw":"raw"}, output_map:{"x":"x"} },
{model_name: "yolov8_onnx", input_map: {"images":"x"}, output_map:{"output0":"pred"} },
{model_name: "postproc_py", input_map: {"pred":"pred"}, output_map:{"det":"det"} }]
}
优点:服务端口径统一、客户端更轻。缺点:Python Backend 需注意 GIL/ 多进程与零拷贝(可用 CUDA/IPC Shared Memory)。
2) DeepStream 多路视频管线
2.1 典型 Pipeline(多路 RTSP)
rtsp → uridecodebin ×N → nvstreammux → nvinfer / nvinferserver → nvtracker → nvdsosd → sink
nvstreammux:合帧批处理(batch-size = 路数),对齐输入帧率;nvinfer:本地 TensorRT 引擎;nvinferserver:直连 Triton(推荐大规模管理时)。
2.2 nvinferserver 配置(片段)
# config_infer_primary.txt
infer_config {
unique_id: 1
network_type: 0 # detector
backend {
triton {
model_name: "yolov8_trt"
model_repo {root: "/models"}
grpc {url: "triton:8001"}
}
}
preprocess {network_input_order: NCHW; tensor_name: "images"}
postprocess {labelfile_path: labels.txt; nms_iou_threshold: 0.45; conf_sigmoid: true}
}
input_control {process_mode: 1}
docker-compose(DeepStream + Triton)
services:
triton:
image: nvcr.io/nvidia/tritonserver:24.08-py3
command: ["tritonserver","--model-repository=/models"]
volumes: ["./models:/models"]
deploy: {resources: { reservations: { devices: [{ capabilities: [gpu] }] } } }
ports: ["8000:8000","8001:8001","8002:8002"]
ds:
image: nvcr.io/nvidia/deepstream:7.0-devel
runtime: nvidia
environment: ["DISPLAY=:0"]
volumes: ["./cfg:/opt/cfg","./models:/models"]
command: ["bash","-lc","deepstream-app -c /opt/cfg/app.txt"]
2.3 DeepStream 性能要点
- 零拷贝:保持 NVMM 缓存;尽量避免 CPU 回传;
- batch-size 匹配路数与
nvstreammux的batched-push-timeout; - OSD 开关:在边上可渲染,在云端可只输出元数据(Kafka/Redis)降带宽;
- 跟踪器:KLT/NvDCF/NvSORT;多目标跟踪提高视频级体验但会吃算力。
3) FastAPI 轻量服务(HTTP + WebSocket)
3.1 最小 HTTP 推理服务
# app.py
from fastapi import FastAPI, UploadFile
import onnxruntime as ort, numpy as np, cv2
app = FastAPI()
sess = ort.InferenceSession('best.onnx', providers=['CPUExecutionProvider'])
name = sess.get_inputs()[0].name
@app.post('/predict')
async def predict(file: UploadFile):
data = await file.read()
im = cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
im = cv2.cvtColor(cv2.resize(im,(640,640)), cv2.COLOR_BGR2RGB)/255.0
x = np.transpose(im,(2,0,1))[None].astype(np.float32)
y = sess.run(None,{name:x})[0]
return {"shape": y.shape}
并发建议:Uvicorn + --workers(多进程)或 --loop uvloop;CPU 上用 ORT 的线程池参数控制内核运行。
3.2 WebSocket / 帧流式推理(多路并发模板)
# ws_app.py
from fastapi import FastAPI, WebSocket
import asyncio, cv2, numpy as np, onnxruntime as ort
app = FastAPI()
sess = ort.InferenceSession('best.onnx', providers=['CUDAExecutionProvider','CPUExecutionProvider'])
name = sess.get_inputs()[0].name
async def infer_frame(img):
img = cv2.cvtColor(cv2.resize(img,(640,640)), cv2.COLOR_BGR2RGB)/255.0
x = np.transpose(img,(2,0,1))[None].astype(np.float32)
return sess.run(None,{name:x})[0]
@app.websocket('/ws')
async def ws(ws: WebSocket):
await ws.accept()
while True:
data = await ws.receive_bytes()
img = cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
y = await asyncio.get_event_loop().run_in_executor(None, infer_frame, img)
await ws.send_text(str(y.shape))
Backpressure(背压):在接收端设置 环形缓冲(仅保留最新帧),以防生产者(摄像头)快于消费者(推理)。
4) 多路并发:三段式流水线(Reader → Infer → Writer)
架构:
- Reader:采集 / 解码(RTSP/ 文件),放入
asyncio.Queue(maxsize=k); - Infer:批处理聚合(可按时间或队列水位触发),发至 Triton/ 本地引擎;
- Writer:渲染 / 编码 / 推流(RTMP/RTSP/WebRTC/ 文件)。
关键代码片段(时间窗聚合批处理):
import asyncio, time
from collections import deque
class Batcher:
def __init__(self, max_bs=8, max_wait=0.002):
self.buf, self.max_bs, self.max_wait = deque(), max_bs, max_wait
self.t0 = None
def add(self, item):
now = time.perf_counter()
if not self.buf:
self.t0 = now
self.buf.append(item)
if len(self.buf)>=self.max_bs or (now-self.t0)>=self.max_wait:
batch = list(self.buf); self.buf.clear(); return batch
return None
经验:小延迟 换高吞吐,网关 / 直播常用
1–5ms聚合窗口;边侧可更激进。
5) 可观测性与压测
5.1 指标
- Triton:
:8002/metrics(Prometheus),关注nv_inference_request_duration_us、nv_inference_queue_duration_us、nv_inference_exec_count、批大小直方图; - DeepStream:
GST_DEBUG,NVDS_ENABLE_LATENCY_MEASUREMENT=1,输出帧级耗时; - 应用:导出
p50/p95/p99、丢帧率、CPU/GPU/ 显存,异常帧样本。
5.2 压测
- HTTP:
wrk/ab;gRPC:ghz;视频:回放多路文件并叠加随机抖动; - 过程:预热(50–200 次)→ 统计
avg/p95→ 阶梯升载直至 SLO 临界。
5.3 基线与告警
- SLO 示例:单帧 p95 < 25ms(数据中心)、端到端延迟 < 150ms(直播);
- 告警:队列积压 > 阈值、批大小降为 1 的比例异常升高、Triton 429(队列满)。
6) 性能调优清单
- 动态批处理:Triton
preferred_batch_size与max_queue_delay协同; - 实例并行:
instance_group.count>1 时可提升吞吐,但注意显存; - 内存:固定输入分辨率减少分配;Pinned Memory + 异步拷贝;
- INT8/FP16:参考第 3 课量化;
- 零拷贝:CUDA / IPC 共享内存;DeepStream 保持 NVMM;
- I/O:GStreamer 硬件解码(
nvh264dec等),避免 Python 解码瓶颈。
7) 部署与扩缩
- Docker/K8s:Triton 无状态(模型库挂载),结合 HPA 按
gpu.util/QPS 水位扩缩; - 多 GPU:一机多卡
--gpus all+ Triton 实例组按 GPU 亲和; - Jetson:DeepStream + TensorRT/INT8 + DLA(如有);
- 灰度 / 多版本:同名模型多版本目录(
1/ 2/),用--strict-model-config=false热加载; - 存储:大引擎放对象存储 / 本地 SSD,注意 IO 抖动。
8) 端到端样例(最小可运行组合)
- 导出模型(第 3 课):
yolo export model=best.pt format=engine half=True - 放入模型库:
models/yolov8_trt/1/model.plan+config.pbtxt - 起 Triton:
docker run ... tritonserver --model-repository=/models - DeepStream:
nvinferserver指向triton:8001;回放 4 路 RTSP - 看板:Prometheus 抓
:8002/metrics,Grafana 展p95/FPS/ 批大小 / 队列时延
9) 常见错误与排查
- 维度不符:config 与真实模型输出不一致 →
curl /v2/models/<name>看签名; - 批处理失效:观察
nv_inference_exec_count与直方图;延迟窗口太小或客户端独占请求; - DeepStream 掉帧:
nvstreammux超时、解码能力不足、OSD 吃满; - 显存 OOM:实例并行过多 / 输入分辨率过大 / 跟踪器缓存;
- 端到端延迟大:解码在 CPU、拷贝频繁、后处理在 Python 单线程;
- Jetson 构建失败:TRT/JetPack 版本不匹配。
10) 练习(60–120 分钟)
- 在 Triton 上分别开启 / 关闭 动态批处理,记录批直方图与 p95;
- DeepStream 同时回放 4/8/16 路视频,对比
nvstreammux的batched-push-timeout对延迟的影响; - 实现一个 三段式并发 脚手架(Reader/Infer/Writer),加入环形缓冲测丢帧率;
- 建立 Grafana 看板:展示
p50/p95/p99、批大小分布、队列时延、GPU 利用率; - 写一页短报告:当输入分辨率从 640→960 时,吞吐与 p95 的变化,以及最佳批大小选择。
小结
- Triton 负责“标准化服务与批处理 ”,DeepStream 负责“ 视频端到端 ”,FastAPI 负责“ 轻量快速起步”。
- 多路并发的核心是 批处理窗口与背压控制,I/O 往往比网络更卡脖子。
- 可观测性一定要先到位:以 p95/ 队列 / 批直方图为三大看门人,守住体验底线。
附录
给你一套 可直接落地的最小包 (Triton + DeepStream + Prometheus + Grafana)外加 FastAPI + WebSocket 客户端 示例。下载链接:https://pan.quark.cn/s/7da658d31aa8 提取码:YYn3
内容结构(精简版):
docker-compose.yml:一键起 Triton / DeepStream / Prometheus / Grafana。models/:含 ONNX 与 TensorRT 两套config.pbtxt示例(默认输入[3,640,640]、输出[8400,85]),版本目录1/已建好;把你的model.onnx或model.plan丢进去即可跑。cfg/app.txt+cfg/config_infer_primary.txt:最小 DeepStream 管线,nvinferserver → Triton gRPC;labels.txt已带上 COCO 类名。monitoring/prometheus.yml:默认抓取triton:8002/metrics。fastapi/ws_server.py&ws_client.py:WebSocket 推流 / 回传 小样;服务端可直连 Triton gRPC 或本地 ONNX Runtime。README.md:一步步跑通的命令与坑位说明。
开箱即用三步:
- 把你的模型放好
- TRT 引擎:
models/yolov8_trt/1/model.plan - ONNX:
models/yolov8_onnx/1/model.onnx
若模型签名不同,改models/*/config.pbtxt的输入 / 输出名与形状(默认输入名images,输出名output0)。
- TRT 引擎:
- 修改 DeepStream 数据源
cfg/app.txt里把source0.uri改成你的rtsp://...或file:///opt/cfg/xxx.mp4。
- 起服务
docker compose up -d # Triton: :8000(HTTP)/:8001(gRPC)/:8002(metrics) # Prometheus: :9090 Grafana: :3000 (admin/admin)
可选:WebSocket 推流 / 拉流演示
- 启服务端(走 Triton)
cd fastapi pip install -r requirements.txt export TRITON_URL=127.0.0.1:8001 export MODEL_NAME=yolov8_onnx # 或 yolov8_trt uvicorn ws_server:app --host 0.0.0.0 --port 8000 - 或启服务端(走 ONNX Runtime)
export MODEL_PATH=/abs/path/to/best.onnx uvicorn ws_server:app --host 0.0.0.0 --port 8000 - 客户端推流(摄像头 / 文件 /RTSP)
python ws_client.py --uri ws://127.0.0.1:8000/ws --src cam # 或:python ws_client.py --src file.mp4 --save out.mp4
几条端侧 / 实战注意:
- Triton 的
config.pbtxt需与模型真实签名一致(输入 / 输出名、形状、类型);不确定就用tritonclient拉get_model_config对一下。 - DeepStream 推理插件这份配置是 nvinferserver→Triton 的最小化示例;如果你要“本地直连引擎”,就把
primary-gie换成nvinfer方案并指向model-engine-file。 - Prometheus 已默认抓 Triton metrics;Grafana 里把 Prometheus 数据源设为
http://prometheus:9090,导入 Triton 相关 dashboard 即可。 - WS 帧率卡顿:把 JPEG 质量降到 70 左右、或缩小分辨率,或者在服务端做微型 动态小批(已给 Triton 开了
dynamic_batching模板)。

