forked from p83651209/CPM-9G-8B
add stream inference output function
This commit is contained in:
parent ffa3ad00be · commit ccb188a04e
@ -0,0 +1,21 @@
# Streaming Output Operation Manual

# Environment Setup

docker path:

Streaming output requires specific environment dependencies, so inference and serving are run in a dedicated env:

conda activate stream_info

# Workflow

1. Convert the model: transform the trained checkpoint into the format supported by streaming output

   python convert.py

2. Model inference: python deploy_llm_8b_demo.py

   (1) Set the number of GPUs exposed via CUDA_VISIBLE_DEVICES

   (2) Adjust the model attributes used inside LocalLoader to match the model actually being served

   (3) When editing the LocalLoader call, update the path of the converted streaming-output model and its vocabulary (see the sketch after this list)

3. Test a request: python request_demo.py

   If you are unsure of the request IP/port, they can be found in the log file saved during the inference stage (error_8b.log)
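
A minimal sketch of the edits described in step 2, taken from the defaults in deploy_llm_8b_demo.py (replace the checkpoint and vocabulary paths with your own):

```python
import os
import libcpm

# (1) GPUs visible to the inference process
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3,4,5,6,7"

# (2)/(3) LocalLoader (defined in deploy_llm_8b_demo.py) is pointed at the
# converted streaming-output model and its vocabulary
model = libcpm.CPMCaterpillar(
    LocalLoader(
        "model_8b.ckpt",   # output of convert.py
        "vocabs.txt",      # vocabulary file
    ),
    memory_limit = 40 << 30,
)
```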
convert.py
@ -0,0 +1,84 @@
import torch
import struct
import numpy as np


def write_string(fp, v):
    # length-prefixed UTF-8 string: uint32 length + raw bytes
    v = v.encode("utf-8")
    fp.write(struct.pack("I", len(v)))
    fp.write(v)

def write_tuple(fp, v):
    # shape tuple: uint8 ndim + uint32 per dimension
    fp.write(struct.pack("B", len(v)))
    for i in v:
        fp.write(struct.pack("I", i))

def write_dtype(fp, v):
    # dtype tag: 0 = int8, 1 = float16
    sv = -1
    if v == np.int8:
        sv = 0
    elif v == np.float16:
        sv = 1
    if sv == -1:
        raise TypeError("Unknown dtype %s" % v)
    fp.write(struct.pack("B", sv))

def write_parameter(fp, name: str, value: torch.Tensor):
    # one record: name, shape, byte length, dtype tag, raw tensor bytes
    write_string(fp, name)
    write_tuple(fp, value.size())
    value = np.ascontiguousarray(value.cpu().numpy())
    value_bytes = value.tobytes()
    fp.write(struct.pack("I", len(value_bytes)))
    write_dtype(fp, value.dtype)
    fp.write(value_bytes)

def split(x, s):
    # reshape the leading dimension into (s, dim0 // s); currently unused here
    sizes = []
    for it in x.size():
        sizes.append(it)
    assert sizes[0] % s == 0
    sizes = [s, sizes[0] // s] + sizes[1:]
    return x.reshape(*sizes)


def main(src_model_path, dst_model_path, layer_num):
    # original checkpoint saved from training
    model = torch.load(src_model_path, map_location="cpu")
    params = {}

    params["input_embedding.weight"] = model["input_embedding.weight"].cpu()
    params["lm_head.weight"] = model["lm_head.weight"].cpu()
    params["output_layernorm.weight"] = model["encoder.output_layernorm.weight"].cpu()
    for i in range(layer_num):
        params[f"layers.{i}.ln_attn.weight"] = model[f"encoder.layers.{i}.self_att.layernorm_before_attention.weight"].cpu()

        params[f"layers.{i}.attn.project_q.weight"] = model[f"encoder.layers.{i}.self_att.self_attention.project_q.weight"]
        params[f"layers.{i}.attn.project_k.weight"] = model[f"encoder.layers.{i}.self_att.self_attention.project_k.weight"]
        params[f"layers.{i}.attn.project_v.weight"] = model[f"encoder.layers.{i}.self_att.self_attention.project_v.weight"]

        params[f"layers.{i}.attn.attn_out.weight"] = model[f"encoder.layers.{i}.self_att.self_attention.attention_out.weight"]

        params[f"layers.{i}.ln_ff.weight"] = model[f"encoder.layers.{i}.ffn.layernorm_before_ffn.weight"].cpu()

        params[f"layers.{i}.ff.w_in.weight"] = model[f"encoder.layers.{i}.ffn.ffn.w_in.w_0.weight"]
        params[f"layers.{i}.ff.w_gated.weight"] = model[f"encoder.layers.{i}.ffn.ffn.w_in.w_1.weight"]
        params[f"layers.{i}.ff.w_out.weight"] = model[f"encoder.layers.{i}.ffn.ffn.w_out.weight"]

    # converted model: uint32 parameter count followed by one record per parameter
    fout = open(dst_model_path, "wb")
    fout.write(struct.pack("I", len(params)))
    for name, value in params.items():
        write_parameter(fout, name, value)
    fout.close()


if __name__ == '__main__':
    src_model_path = "/home/wangyixuan/workplace/llm_service/sse/checkpoints-epoch-2/cpm9g-8b-sft-epoch-2.pt"
    dst_model_path = "model_8b.ckpt"

    # ~10B model: 32 layers
    # ~100B model: 80 layers
    layer_num = 32

    main(src_model_path, dst_model_path, layer_num)
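
For reference, a minimal sketch of how the records written by write_parameter can be read back; it mirrors the _load_* helpers in deploy_llm_8b_demo.py, and "model_8b.ckpt" is simply the default output path used above:

```python
import struct

def read_record(fp):
    # name: uint32 length + UTF-8 bytes
    name = fp.read(struct.unpack("I", fp.read(4))[0]).decode("utf-8")
    # shape: uint8 ndim + uint32 per dimension
    ndim = struct.unpack("B", fp.read(1))[0]
    shape = tuple(struct.unpack("I", fp.read(4))[0] for _ in range(ndim))
    # payload: uint32 byte length, uint8 dtype tag (0 = int8, 1 = float16), raw bytes
    size = struct.unpack("I", fp.read(4))[0]
    dtype = struct.unpack("B", fp.read(1))[0]
    data = fp.read(size)
    return name, shape, dtype, data

with open("model_8b.ckpt", "rb") as fp:
    num_parameters = struct.unpack("I", fp.read(4))[0]
    for _ in range(num_parameters):
        name, shape, dtype, data = read_record(fp)
        print(name, shape, dtype, len(data))
```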
deploy_llm_8b_demo.py
@ -0,0 +1,152 @@
import os
import struct
import json
from typing import List

import libcpm
from flask import Flask, Response, request

# from concurrent.futures import ThreadPoolExecutor
# executor = ThreadPoolExecutor(1)


os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3,4,5,6,7"


def _load_dtype(fp):
    dtype = struct.unpack("B", fp.read(1))[0]
    return dtype

def _load_string(fp):
    size = struct.unpack("I", fp.read(4))[0]
    return fp.read(size).decode("utf-8")

def _load_tuple(fp):
    ndim = struct.unpack("B", fp.read(1))[0]
    ret = []
    for i in range(ndim):
        ret.append(struct.unpack("I", fp.read(4))[0])
    return tuple(ret)

class LocalLoader(libcpm.ModelLoader):
    """Loads the converted checkpoint (written by convert.py) and the vocabulary."""

    def __init__(self, model_path: str, vocab_path: str):
        vocabs = []
        with open(vocab_path, "r") as fin:
            for line in fin:
                if line.startswith("\""):
                    vocabs.append(json.loads(line))
        self._vocabs = vocabs
        # print(len(vocabs), "tokens")

        with open(model_path, "rb") as fp:
            num_parameters = struct.unpack("I", fp.read(4))[0]
            parameters = {}
            for _ in range(num_parameters):
                param_name = "model." + _load_string(fp)
                _ = _load_tuple(fp)       # shape (unused here)
                param_size = struct.unpack("I", fp.read(4))[0]
                _ = _load_dtype(fp)       # dtype tag (unused here)
                param = fp.read(param_size)
                parameters[param_name] = param
            self._parameters = parameters

    def fetch_parameter(self, name):
        # print(name, len(self._parameters[name]))
        return self._parameters[name]

    # Hyperparameters of the 8B model; adjust these when serving a different model.
    @property
    def num_layers(self):
        return 32

    @property
    def dim_model(self):
        return 4096

    @property
    def num_heads(self):
        return 32

    @property
    def num_kv_heads(self):
        return 32

    @property
    def dim_head(self):
        return 128

    @property
    def dim_ff(self):
        return 14336

    @property
    def tokens(self):
        return self._vocabs

    @property
    def rope_theta(self):
        return 10000.0


model = libcpm.CPMCaterpillar(
    # converted streaming-output model and its vocabulary
    LocalLoader(
        "model_8b.ckpt",
        "vocabs.txt",
    ),
    memory_limit = 40 << 30,
)

app = Flask(__name__)
import logging
logging.basicConfig(filename='error_8b.log', level=logging.DEBUG)

@app.route("/llm", methods=["GET", "POST"])
def llm():
    content: str = request.json["content"]
    if "params" in request.json:
        params = request.json["params"]
    else:
        params = {}
    # ret = executor.submit(_llm, content).result()
    ret = _llm(content, params)
    return ret

def _llm(content, params):
    logging.debug("~ content:\n" + content)
    logging.debug("~ input_params:\n" + json.dumps(params, ensure_ascii=False))

    def generate_events(content):
        # map the chat markers onto the model's dialogue format
        ipt = content.replace("<用户>", "<sep>用户:")
        ipt = ipt.replace("<AI>", "<sep>AI:")
        ipt = ipt.lstrip("<sep>")  # drop the leading <sep> marker
        old_ans = ""
        logging.debug("~ ans:")
        # keep only the sampling parameters the backend understands
        USING_PARAMS = {"max_length", "repetition_penalty", "ngram_penalty", "seed", "temperature", "top_p", "top_k", "interval"}
        true_params = {}
        for p in USING_PARAMS:
            if p in params:
                true_params[p] = params[p]
        if "max_length" not in true_params:
            true_params["max_length"] = 4096

        logging.debug("~ true_params:\n" + json.dumps(true_params, ensure_ascii=False))
        for it in model.random_search(ipt, **true_params):
            ans = it["result"]
            if ans is not None:
                # emit only the newly generated suffix as a server-sent event
                return_data = "data:" + json.dumps({"text": ans[len(old_ans):]}, ensure_ascii=False) + "\n\n"
                yield return_data
                logging.debug("return_data[" + return_data.strip() + "]")
                old_ans = ans
            if it["stoped"]:
                break
        logging.debug("\n")
    return Response(generate_events(content), mimetype="text/event-stream")

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8888, debug=True, use_reloader=False)
request_demo.py
@ -0,0 +1,29 @@
import json
import pprint
import requests
import sseclient  # pip install sseclient-py


# content = "hello"


# host/port of the running deploy_llm_8b_demo.py server (see error_8b.log if unsure)
url = "http://10.1.2.1:8888/llm"

payload = json.dumps({
    "content": "<用户>hello<AI>"
})
headers = {
    "Content-Type": "application/json",
    "accept": "text/event-stream"
}

response = requests.request("POST", url, stream=True, headers=headers, data=payload)

# print(response.text)


client = sseclient.SSEClient(response)

# print each streamed chunk as it arrives
for event in client.events():
    pprint.pprint(event.data)
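
The server also accepts an optional "params" object, which it filters against USING_PARAMS in deploy_llm_8b_demo.py. A sketch of such a payload, reusing the imports above (the parameter values are illustrative only):

```python
payload = json.dumps({
    "content": "<用户>hello<AI>",
    # optional sampling parameters accepted by the /llm endpoint
    "params": {"max_length": 1024, "temperature": 0.8, "top_p": 0.9}
})
```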