diff --git a/stream_infer/README.md b/stream_infer/README.md
new file mode 100644
index 0000000..13e4d71
--- /dev/null
+++ b/stream_infer/README.md
@@ -0,0 +1,21 @@
+# Streaming Output Manual
+
+# Environment Setup
+docker path:
+
+Streaming output requires specific environment dependencies, so run inference and serving in a dedicated env:
+conda activate stream_info
+
+# Workflow:
+1 Convert the model: transform the trained checkpoint into the format supported by streaming output
+  python convert.py
+
+2 Model inference: python deploy_llm_8b_demo.py
+  (1) Set the number of CUDA_VISIBLE_DEVICES
+  (2) Update the model attributes in LocalLoader to match the model actually being served
+  (3) When instantiating LocalLoader, point it at the converted streaming model and its vocabulary file
+
+3 Test a request: python request_demo.py
+  If you do not know the request ip/port, it can be found in the log file saved during inference (error_8b.log)
+
+
diff --git a/stream_infer/convert.py b/stream_infer/convert.py
new file mode 100644
index 0000000..40b6670
--- /dev/null
+++ b/stream_infer/convert.py
@@ -0,0 +1,84 @@
+import torch
+import struct
+import numpy as np
+
+
+def write_string(fp, v):
+    v = v.encode("utf-8")
+    fp.write(struct.pack("I", len(v)))
+    fp.write(v)
+
+def write_tuple(fp, v):
+    fp.write(struct.pack("B", len(v)))
+    for i in v:
+        fp.write(struct.pack("I", i))
+
+def write_dtype(fp, v):
+    sv = -1
+    if v == np.int8:
+        sv = 0
+    elif v == np.float16:
+        sv = 1
+    if sv == -1:
+        raise TypeError("Unknown dtype %s" % v)
+    fp.write(struct.pack("B", sv))
+
+def write_parameter(fp, name: str, value: torch.Tensor):
+    write_string(fp, name)
+    write_tuple(fp, value.size())
+    value = np.ascontiguousarray(value.cpu().numpy())
+    value_bytes = value.tobytes()
+    fp.write(struct.pack("I", len(value_bytes)))
+    write_dtype(fp, value.dtype)
+    fp.write(value_bytes)
+
+def split(x, s):
+    sizes = []
+    for it in x.size():
+        sizes.append(it)
+    assert sizes[0] % s == 0
+    sizes = [s, sizes[0] // s] + sizes[1:]
+    return x.reshape(*sizes)
+
+
+def main(src_model_path, dst_model_path, layer_num):
+    # original checkpoint saved from training
+    model = torch.load(src_model_path, map_location="cpu")
+    params = {}
+
+    params["input_embedding.weight"] = model["input_embedding.weight"].cpu()
+    params["lm_head.weight"] = model["lm_head.weight"].cpu()
+    params["output_layernorm.weight"] = (model["encoder.output_layernorm.weight"]).cpu()
+    for i in range(layer_num):
+        params[f"layers.{i}.ln_attn.weight"] = model[f"encoder.layers.{i}.self_att.layernorm_before_attention.weight"].cpu()
+
+        params[f"layers.{i}.attn.project_q.weight"] = model[f"encoder.layers.{i}.self_att.self_attention.project_q.weight"]
+        params[f"layers.{i}.attn.project_k.weight"] = model[f"encoder.layers.{i}.self_att.self_attention.project_k.weight"]
+        params[f"layers.{i}.attn.project_v.weight"] = model[f"encoder.layers.{i}.self_att.self_attention.project_v.weight"]
+
+        params[f"layers.{i}.attn.attn_out.weight"] = model[f"encoder.layers.{i}.self_att.self_attention.attention_out.weight"]
+
+        params[f"layers.{i}.ln_ff.weight"] = model[f"encoder.layers.{i}.ffn.layernorm_before_ffn.weight"].cpu()
+
+        params[f"layers.{i}.ff.w_in.weight"] = model[f"encoder.layers.{i}.ffn.ffn.w_in.w_0.weight"]
+        params[f"layers.{i}.ff.w_gated.weight"] = model[f"encoder.layers.{i}.ffn.ffn.w_in.w_1.weight"]
+        params[f"layers.{i}.ff.w_out.weight"] = model[f"encoder.layers.{i}.ffn.ffn.w_out.weight"]
+
+    # converted model
+    fout = open(dst_model_path, "wb")
+    fout.write(struct.pack("I", len(params)))
+    for name, value in params.items():
+        write_parameter(fout, name, value)
+    fout.close()
+
+
+
+if __name__ == '__main__':
+    src_model_path = "/home/wangyixuan/workplace/llm_service/sse/checkpoints-epoch-2/cpm9g-8b-sft-epoch-2.pt"
+    dst_model_path = "model_8b.ckpt"
+
+    # 10B-scale model: 32 layers
+    # 100B-scale model: 80 layers
+    layer_num = 32
+
+    main(src_model_path, dst_model_path, layer_num)
\ No newline at end of file
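The checkpoint written by convert.py is a flat binary file: a uint32 parameter count, then for every parameter a length-prefixed UTF-8 name, a shape tuple, the tensor's byte length, a dtype id (0 = int8, 1 = float16) and the raw bytes. The following is a minimal standalone sketch, not part of this patch, that walks that layout to sanity-check model_8b.ckpt before serving it; the script itself is illustrative.

import struct
import sys

def read_string(fp):
    size = struct.unpack("I", fp.read(4))[0]
    return fp.read(size).decode("utf-8")

def read_shape(fp):
    ndim = struct.unpack("B", fp.read(1))[0]
    return tuple(struct.unpack("I", fp.read(4))[0] for _ in range(ndim))

def inspect(path):
    # mirrors the layout written by write_parameter() in convert.py
    with open(path, "rb") as fp:
        num_params = struct.unpack("I", fp.read(4))[0]
        print(num_params, "parameters")
        for _ in range(num_params):
            name = read_string(fp)
            shape = read_shape(fp)
            nbytes = struct.unpack("I", fp.read(4))[0]
            dtype_id = struct.unpack("B", fp.read(1))[0]  # 0 = int8, 1 = float16
            fp.seek(nbytes, 1)  # skip the raw tensor bytes
            print(name, shape, nbytes, "dtype_id:", dtype_id)

if __name__ == "__main__":
    inspect(sys.argv[1] if len(sys.argv) > 1 else "model_8b.ckpt")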
diff --git a/stream_infer/deploy_llm_8b_demo.py b/stream_infer/deploy_llm_8b_demo.py
new file mode 100644
index 0000000..d9724d3
--- /dev/null
+++ b/stream_infer/deploy_llm_8b_demo.py
@@ -0,0 +1,152 @@
+import os
+import struct
+import json
+from typing import List
+
+import libcpm
+from flask import Flask, Response, request
+
+# from concurrent.futures import ThreadPoolExecutor
+# executor = ThreadPoolExecutor(1)
+
+
+os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3,4,5,6,7"
+
+
+def _load_dtype(fp):
+    dtype = struct.unpack("B", fp.read(1))[0]
+    return dtype
+
+def _load_string(fp):
+    size = struct.unpack("I", fp.read(4))[0]
+    return fp.read(size).decode("utf-8")
+
+def _load_tuple(fp):
+    ndim = struct.unpack("B", fp.read(1))[0]
+    ret = []
+    for i in range(ndim):
+        ret.append(struct.unpack("I", fp.read(4))[0])
+    return tuple(ret)
+
+class LocalLoader(libcpm.ModelLoader):
+    def __init__(self,
+            model_path : str,
+            vocab_path : str,
+        ):
+        vocabs = []
+        with open(vocab_path, "r") as fin:
+            for line in fin:
+                if line.startswith("\""):
+                    vocabs.append(json.loads(line))
+        self._vocabs = vocabs
+        # print(len(vocabs), "tokens")
+
+        with open(model_path, "rb") as fp:
+            num_parameters = struct.unpack("I", fp.read(4))[0]
+            parameters = {}
+            for _ in range(num_parameters):
+                param_name = "model." + _load_string(fp)
+                _ = _load_tuple(fp)
+                param_size = struct.unpack("I", fp.read(4))[0]
+                _ = _load_dtype(fp)
+                param = fp.read(param_size)
+                parameters[param_name] = param
+        self._parameters = parameters
+
+    def fetch_parameter(self, name):
+        # print(name, len(self._parameters[name]))
+        return self._parameters[name]
+
+    @property
+    def num_layers(self):
+        return 32
+
+    @property
+    def dim_model(self):
+        return 4096
+
+    @property
+    def num_heads(self):
+        return 32
+
+    @property
+    def num_kv_heads(self):
+        return 32
+
+    @property
+    def dim_head(self):
+        return 128
+
+    @property
+    def dim_ff(self):
+        return 14336
+
+    @property
+    def tokens(self):
+        return self._vocabs
+
+    @property
+    def rope_theta(self):
+        return 10000.0
+
+
+
+model = libcpm.CPMCaterpillar(
+    # converted model checkpoint and vocab file
+    LocalLoader(
+        "model_8b.ckpt",
+        "vocabs.txt",
+    ),
+    memory_limit = 40 << 30,
+)
+
+app = Flask(__name__)
+import logging
+logging.basicConfig(filename='error_8b.log', level=logging.DEBUG)
+
+@app.route("/llm", methods=["get", "post"])
+def llm():
+    content: str = request.json["content"]
+    if "params" in request.json:
+        params = request.json["params"]
+    else:
+        params = {}
+    # ret = executor.submit(_llm, content).result()
+    ret = _llm(content, params)
+    return ret
+
+def _llm(content, params):
+    logging.debug("~ content:\n" + content)
+    logging.debug("~ input_params:\n" + json.dumps(params, ensure_ascii=False))
+
+    def generate_events(content):
+        ipt = content.replace("<用户>", "用户:")
+        ipt = ipt.replace("<AI>", "AI:")
+        ipt = ipt.lstrip("")
+        old_ans = ""
+        logging.debug("~ ans:")
+        USING_PARAMS = {"max_length", "repetition_penalty", "ngram_penalty", "seed", "temperature", "top_p", "top_k", "interval"}
+        true_params = {}
+        for p in USING_PARAMS:
+            if p in params:
+                true_params[p] = params[p]
+        if "max_length" not in true_params:
+            true_params["max_length"] = 4096
+
+        logging.debug("~ true_params:\n" + json.dumps(true_params, ensure_ascii=False))
+        for it in model.random_search(ipt, **true_params):
+            ans = it["result"]
+            if ans is not None:
+                return_data = "data:" + json.dumps({"text": ans[len(old_ans):]}, ensure_ascii=False) + "\n\n"
+                yield return_data
+                logging.debug("return_data[" + return_data.strip() + "]")
+                old_ans = ans
+            if it["stoped"]:
+                break
+        logging.debug("\n")
+    return Response(generate_events(content), mimetype="text/event-stream")
+
+if __name__ == "__main__":
+    app.run(host="0.0.0.0", port=8888, debug=True, use_reloader=False)
+
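The /llm handler above forwards only the keys listed in USING_PARAMS to model.random_search and falls back to max_length = 4096 when none is given. Below is a hedged sketch of a request body that sets some of those sampling parameters; the values are illustrative, not recommended settings.

payload = {
    "content": "<用户>hello",
    "params": {                      # only keys in USING_PARAMS are forwarded
        "max_length": 512,
        "temperature": 0.8,
        "top_p": 0.9,
        "top_k": 40,
        "repetition_penalty": 1.1,
        "seed": 42,
    },
}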
diff --git a/stream_infer/request_demo.py b/stream_infer/request_demo.py
new file mode 100644
index 0000000..64d9ec8
--- /dev/null
+++ b/stream_infer/request_demo.py
@@ -0,0 +1,29 @@
+import json
+import pprint
+import requests
+import sseclient  # pip install sseclient-py
+
+
+
+# content = "hello"
+
+
+url = "http://10.1.2.1:8888/llm"
+
+payload = json.dumps({
+    "content": "<用户>hello"
+})
+headers = {
+    'Content-Type': 'application/json',
+    "accept": "text/event-stream"
+}
+
+response = requests.request("POST", url, stream=True, headers=headers, data=payload)
+
+# print(response.text)
+
+
+client = sseclient.SSEClient(response)
+
+for event in client.events():
+    pprint.pprint(event.data)
\ No newline at end of file
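Each SSE event produced by the server carries a JSON payload whose "text" field contains only the newly generated delta (ans[len(old_ans):] on the server side). The following is a small sketch, assuming the same host/port as request_demo.py, that reassembles the full answer from those deltas:

import json
import requests
import sseclient  # pip install sseclient-py

response = requests.post(
    "http://10.1.2.1:8888/llm",
    stream=True,
    headers={"Content-Type": "application/json", "accept": "text/event-stream"},
    data=json.dumps({"content": "<用户>hello"}),
)
client = sseclient.SSEClient(response)

full_answer = ""
for event in client.events():
    chunk = json.loads(event.data)        # each event is {"text": "<new delta>"}
    full_answer += chunk.get("text", "")
print(full_answer)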