Docker torchserve workflow部署流程
1. 先部署相关模型, 步骤见: Docker torchserve 部署模型流程
搭建涉及到的模型,如ocr_detection,ocr_judge,ocr_text, xxx_detection …
2. workflow文件
2.1 串行执行,示例1
(1)xxx_workflow.yaml
models:#global model paramsbatch-size: 1max-batch-delay : 10retry-attempts : 3timeout-ms : 5000ocr_detection:url : ocr_detection.mar #local or public URIocr_judge:url : ocr_judge.marocr_text:url : ocr_text.mar dag:ocr_detection : [first_processing]first_processing: [ocr_judge]ocr_judge : [second_processing]second_processing : [ocr_text]
(2)xxx_workflow_handler.py
import json
import numpy as npdef first_processing(data, context):if data:image = data[0].get("data") or data[0].get("body")image = image.decode('utf-8')image = json.loads(image)return [{"image": image}] # [{"image":["xxx","xxx"]}]return Nonedef second_processing(data, context):if data:image = data[0].get("data") or data[0].get("body")image = image.decode('utf-8')image = json.loads(image)return [{"image": image}] # [{"image":["xxx","xxx"]}]return None
2.2 串行执行,示例2
(1)xxx_workflow.yaml
models:#global model paramsbatch-size: 1max-batch-delay : 10retry-attempts : 3timeout-ms : 5000xxx_detection:url : xxx_detection.mar #local or public URIxxx_recognition:url : xxx_recognition.mardag:xxx_detection : [xxx_recognition]
2.2 并行执行,示例
(1)xxx_workflow.yaml
models:#global model paramsbatch-size: 1max-batch-delay : 10retry-attempts : 3timeout-ms : 5000xxx_branch_1:url : xxx_branch_1.mar #local or public URIxxx_branch_2:url : xxx_branch_2.marxxx_branch_3:url : xxx_branch_3.mardag:pre_processing : [xxx_branch_1,xxx_branch_2,xxx_branch_3]xxx_branch_1 : [aggregate_func]xxx_branch_2 : [aggregate_func]xxx_branch_3 : [aggregate_func]
(2)xxx_workflow_handler.py
import jsondef pre_processing(data, context):'''Empty node as a starting node since the DAG doesn't support multiple start nodes'''if data:text = data[0].get("data") or data[0].get("body")return [text]return Nonedef aggregate_func(data, context):'''Changes the output keys obtained from the individual modelto be more appropriate for the workflow output'''if data:xxx_branch_1_result = json.loads(data[0].get("xxx_branch_1"))xxx_branch_2_result = json.loads(data[0].get("xxx_branch_2"))xxx_branch_3_result = json.loads(data[0].get("xxx_branch_3"))response = {"xxx_branch_1_result": xxx_branch_1_result,"xxx_branch_2_result": xxx_branch_2_result,"xxx_branch_3_result": xxx_branch_3_result}return [response]
3. 打包workflow文件
torch-workflow-archiver -f --workflow-name xxx_workflow --spec-file xxx_workflow.yaml --handler xxx_workflow_handler.py --export-path /home/model-server/model-store/
4. 配置接口
(1)查询已注册的workflow
curl "http://localhost:8381/workflows"
(2)注册workflow并为其分配资源
将.war文件注册,注意:.war文件必须放在model-store文件夹下
,即/path/model-server/model-store
curl -X POST "$server/workflows?url=$model_url&workflow_name=$model_name"
# 示例
curl -X POST "localhost:8381/workflows?url=xxx_workflow.war&workflow_name=xxx_workflow"
(3)查看workflow状态
curl http://localhost:8381/workflows/xxx_workflow
(4)删除注册workflow
curl -X DELETE http://localhost:8381/workflows/xxx_workflow
5. workflow推理
# -*- coding: utf-8 -*-
import requests
import json
from PIL import Image
import base64
import iodef get_image_bytes(image_path):# 打开图像并转换为 RGBwith Image.open(image_path).convert("RGB") as img:# 创建一个字节流byte_stream = io.BytesIO()# 将图像保存到字节流中,格式为 JPEGimg.save(byte_stream, format="JPEG")# 获取字节流内容byte_stream.seek(0)return byte_stream.read()image_path = '/path/img1.jpg'image_bytes = get_image_bytes(image_path)
image_base64 = base64.b64encode(image_bytes).decode('utf-8')
data = {'data':json.dumps({'image':image_base64})}
response = requests.post('http://localhost:8380/wfpredict/xxx_workflow',data = data)
print(response)
print(response.status_code)
print(eval(response.text))
参考:
容器内/serve readme或示例
https://pytorch.org/serve/workflows.html