177 lines
7.4 KiB
Python
177 lines
7.4 KiB
Python
# Copyright Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
|
|
import os
|
|
import argparse
|
|
import torch
|
|
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoConfig
|
|
from msmodelslim.pytorch.llm_ptq.anti_outlier import AntiOutlier, AntiOutlierConfig
|
|
from msmodelslim.pytorch.llm_ptq.llm_ptq_tools import Calibrator, QuantConfig
|
|
|
|
|
|
from examples.convert.convert_utils import copy_tokenizer_files, modify_config
|
|
from examples.convert.model_slim.get_calibration_dataset import load_jsonl
|
|
|
|
|
|
# Device-type identifiers: accepted values for --device_type and used to
# choose device placement / dtype when loading the model.
CPU = "cpu"
NPU = "npu"
|
|
|
|
|
|
def cmd_bool(cmd_arg):
    """Parse the literal strings "True"/"False" into booleans.

    argparse `type=` callback: anything other than the two exact literals
    raises ValueError so argparse reports a usage error.
    """
    lookup = {"True": True, "False": False}
    if cmd_arg not in lookup:
        raise ValueError(f"{cmd_arg} should be a boolean")
    return lookup[cmd_arg]
|
|
|
|
|
|
def parse_arguments():
    """Define and parse the command-line options for the quantization script."""
    parser = argparse.ArgumentParser()

    # Model location and output directory.
    parser.add_argument('--model_path', help="model and tokenizer path")
    parser.add_argument('--save_directory')

    # Calibration data: inline texts, or a jsonl file of tokenizable samples.
    parser.add_argument('--calib_texts', type=str, nargs='+',
                        default=["What's deep learning?"])
    parser.add_argument('--calib_file', type=str,
                        help='CSV or Numpy file containing tokenized input. Alternative to text input.',
                        default=os.path.join(os.path.dirname(__file__), 'teacher_qualification.jsonl'))
    parser.add_argument('--calib_dataset_length', type=int,
                        help='Max calibration dataset length.', default=50)

    # Quantization bit-widths and device selection.
    parser.add_argument('--w_bit', type=int, default=8)
    parser.add_argument('--a_bit', type=int, default=8)
    parser.add_argument('--disable_names', type=str, nargs='+', default=None)
    parser.add_argument('--device_type', type=str, choices=[CPU, NPU], default=CPU)
    parser.add_argument('--fraction', type=float, default=0.01)
    parser.add_argument("--act_method", type=int, choices=[1, 2, 3], default=1,
                        help=" `1`: `MinMax`, `2`: `Histogram`, `3`: `Auto`")

    # Sparse / anti-outlier / smoothing switches (cmd_bool accepts "True"/"False").
    parser.add_argument('--co_sparse', type=cmd_bool, default=False)
    parser.add_argument('--anti_method', type=str, default='', help=" `m3`: `AWQ`")
    parser.add_argument('--disable_level', type=str, default='L0')
    parser.add_argument('--input_ids_name', type=str, default='input_ids')
    parser.add_argument('--attention_mask_name', type=str, default='attention_mask')
    parser.add_argument('--do_smooth', type=cmd_bool, default=False)
    parser.add_argument('--use_sigma', type=cmd_bool, default=False)
    parser.add_argument('--sigma_factor', type=float, default=3.0)
    parser.add_argument('--is_lowbit', type=cmd_bool, default=False)
    parser.add_argument('--mm_tensor', type=cmd_bool, default=True)
    parser.add_argument('--w_sym', type=cmd_bool, default=True)
    parser.add_argument('--use_kvcache_quant', type=cmd_bool, default=False)
    parser.add_argument('--open_outlier', type=cmd_bool, default=True)
    parser.add_argument('--group_size', type=int, default=64)

    return parser.parse_args()
|
|
|
|
|
|
class Quantifier:
    """Load a causal-LM checkpoint and drive anti-outlier processing,
    calibration and quantized-weight export.

    Args:
        model_path_or_name: path (or hub name) of the model and tokenizer.
        quant_config: msmodelslim QuantConfig passed to the Calibrator.
        anti_outlier_config: optional AntiOutlierConfig; when set, the
            anti-outlier pass runs before calibration.
        device_type: CPU or NPU (module-level constants).
        **kwargs: `tokenizer_args` dict forwarded to AutoTokenizer.
    """

    def __init__(self, model_path_or_name, quant_config=None, anti_outlier_config=None, device_type='cpu', **kwargs):
        self.device_type = device_type
        # On CPU keep all weights on CPU; otherwise let transformers place them.
        device_map = CPU if self.device_type == CPU else "auto"

        self.quant_config = quant_config
        self.anti_outlier_config = anti_outlier_config
        self.model_path_or_name = model_path_or_name
        self.config = AutoConfig.from_pretrained(self.model_path_or_name, trust_remote_code=True)
        # CPU calibration runs in float32; on NPU keep the checkpoint's native dtype.
        self.dtype = self.config.torch_dtype if self.device_type == NPU else torch.float32
        self.model = AutoModelForCausalLM.from_pretrained(
            pretrained_model_name_or_path=model_path_or_name,
            low_cpu_mem_usage=True, torch_dtype=self.dtype,
            device_map=device_map,
            use_safetensors=True, trust_remote_code=True)

        tokenizer_args = kwargs.get("tokenizer_args", {})
        self.tokenizer = AutoTokenizer.from_pretrained(
            model_path_or_name, use_fast=False, trust_remote_code=True, legacy=False, **tokenizer_args
        )

    def get_tokenized_data(self, input_texts,
                           input_ids_name='input_ids',
                           attention_mask_name='attention_mask'):
        """Tokenize each text and return a list of [input_ids, attention_mask]
        pairs moved to the target device.

        The key-name parameters exist because some tokenizers use
        non-default field names in their encoding dict.
        """
        tokenized_data = []
        for input_text in input_texts:
            inputs = self.tokenizer(input_text, return_tensors='pt', padding=True).to(self.device_type)
            tokenized_data.append(
                [inputs.data[input_ids_name], inputs.data[attention_mask_name]])
        return tokenized_data

    def convert(self, tokenized_data, save_path, disable_level):
        """Run the optional anti-outlier pass, calibrate, and save the
        quantized model (safetensors format) into `save_path`."""
        if self.device_type == NPU:
            # Avoid JIT-compiling operators online; use pre-built binary kernels.
            torch.npu.set_compile_mode(jit_compile=False)

        if self.anti_outlier_config is not None:
            anti_outlier = AntiOutlier(self.model, calib_data=tokenized_data, cfg=self.anti_outlier_config)
            anti_outlier.process()

        # Fix: os.mkdir fails when parent directories are missing and the
        # exists/mkdir pair races with concurrent creation; makedirs with
        # exist_ok=True handles both (and matches the __main__ section).
        os.makedirs(save_path, exist_ok=True)

        calibrator = Calibrator(self.model, self.quant_config, calib_data=tokenized_data, disable_level=disable_level)
        calibrator.run()
        calibrator.save(save_path, save_type=["safe_tensor"])
|
|
|
|
|
|
if __name__ == '__main__':
    args = parse_arguments()
    # Distributed rank (defaults to 0 for single-process runs); used as dev_id.
    rank = int(os.getenv("RANK", "0"))

    calib_file = args.calib_file
    # Prefer the jsonl calibration file; fall back to inline texts.
    calib_texts = load_jsonl(calib_file) if calib_file else args.calib_texts
    model_path = args.model_path
    save_directory = args.save_directory

    quant_conf = QuantConfig(
        w_bit=args.w_bit,
        a_bit=args.a_bit,
        disable_names=args.disable_names,
        dev_type=args.device_type,
        dev_id=rank,
        act_method=args.act_method,
        pr=1.0,  # randseed
        nonuniform=False,
        # NOTE(review): mm_tensor is hard-coded False here even though a
        # --mm_tensor CLI flag exists — confirm whether this is intentional.
        w_sym=args.w_sym,
        mm_tensor=False,
        co_sparse=args.co_sparse,
        fraction=args.fraction,
        sigma_factor=args.sigma_factor,
        use_sigma=args.use_sigma,
        is_lowbit=args.is_lowbit,
        do_smooth=args.do_smooth,
        use_kvcache_quant=args.use_kvcache_quant,
        open_outlier=args.open_outlier,
        group_size=args.group_size
    )

    # m3 (AWQ) needs the bit widths and device; any other non-empty
    # anti_method only passes the method name.
    anti_outlier_config = None
    if args.anti_method == 'm3':
        anti_outlier_config = AntiOutlierConfig(a_bit=args.a_bit, w_bit=args.w_bit,
                                                anti_method=args.anti_method, w_sym=args.w_sym,
                                                dev_type=args.device_type)
    elif args.anti_method:
        anti_outlier_config = AntiOutlierConfig(anti_method=args.anti_method)

    quantifier = Quantifier(
        model_path, quant_conf, anti_outlier_config,
        device_type=args.device_type
    )
    tokenized_calib_data = None
    if calib_texts is not None:
        tokenized_calib_data = quantifier.get_tokenized_data(
            calib_texts,
            input_ids_name=args.input_ids_name,
            attention_mask_name=args.attention_mask_name
        )

    # Fix: the previous os.path.exists guard was redundant (and race-prone);
    # makedirs with exist_ok=True already handles an existing directory.
    os.makedirs(save_directory, exist_ok=True)
    quantifier.convert(tokenized_calib_data, save_directory, args.disable_level)

    # To adapt the tool's sparse quantization (w_bit=4, a_bit=8), the
    # quant_type is temporarily rewritten to "w8a8s".
    quant_type = f"w{args.w_bit}a{args.a_bit}"
    is_sparse_compress = args.w_bit == 4 and args.a_bit == 8 and (args.co_sparse or args.is_lowbit)
    if is_sparse_compress:
        quant_type = "w8a8s"
    auto_config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
    modify_config(model_path, save_directory, auto_config.torch_dtype,
                  quant_type, args.use_kvcache_quant)
    copy_tokenizer_files(model_path, save_directory)