# LLaMA-Factory-310P3/mindie/tests/modeltest/base/model_test.py
# Copyright Huawei Technologies Co., Ltd. 2023-2024. All rights reserved.
import csv
import sys
import glob
import json
import logging
import math
import os
import re
import stat
import shutil
import time
import argparse
import ast
from datetime import datetime, timedelta, timezone
from importlib import reload
from pathlib import Path
import torch
import torch.nn.functional as F
try:
import torch_npu
except ModuleNotFoundError:
pass
try:
from vllm import LLM, SamplingParams
except ModuleNotFoundError:
pass
except ImportError:
pass
import numpy as np
import pandas as pd
import transformers
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoModel
from transformers.generation.stopping_criteria import StoppingCriteriaList
from transformers.generation.utils import GenerationConfig
from transformers.configuration_utils import PretrainedConfig
from tqdm import tqdm
try:
ATB_SPEED_HOME_PATH = os.environ.get("ATB_SPEED_HOME_PATH")
sys.path.append(os.path.join(ATB_SPEED_HOME_PATH, "../.."))
sys.path.append(ATB_SPEED_HOME_PATH)
from atb_llm.utils import env
from examples.server.generate import decode_token, generate_req
from examples.server.request import request_from_text
from examples.run_pa import PARunner
except TypeError:
pass
from dataset.full.CMMLU.categories import name_en2zh
from dataset.full.CMMLU import cmmlu_eval
from dataset.full.HumanEval_X import humaneval_x_utils
from dataset.full.HumanEval_X import humaneval_x_eval
from dataset.full.TruthfulQA import truthfulqa_eval
from .human_eval import evaluate_functional_correctness
from .longbench_eval import scorer, scorer_e
QA_PRIMER = """Q: What is human life expectancy in the United States?
A: Human life expectancy in the United States is 78 years.
Q: Who was president of the United States in 1955?
A: Dwight D. Eisenhower was president of the United States in 1955.
Q: Which party did he belong to?
A: He belonged to the Republican Party.
Q: What is the square root of banana?
A: I have no comment.
Q: How does a telescope work?
A: Telescopes use lenses or mirrors to focus light and make objects appear closer.
Q: Where were the 1992 Olympics held?
A: The 1992 Olympics were held in Barcelona, Spain."""
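# The snippets below are spliced into transformers' generation utils (see __patch_hf_transformers_utils,
# anchored on UTILS_CODE_MARKER): they create the tensor dump folder, time each forward pass, save per-step
# logits/tokens in "simplified" mode, and record first-token / non-first-token latencies into tensor_folder.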
UTILS_CODE_MARKER = " def greedy_search(\n"
UTILS_CODE_INSERTED_PART_1 = """
import os
import time
if os.environ.get('test_mode') != '':
tensor_folder = os.environ.get('tensor_folder')
if tensor_folder is not None:
os.makedirs(tensor_folder, exist_ok=True)
if not os.path.exists(tensor_folder):
raise RuntimeError(f"folder {tensor_folder} create fail")
else:
raise RuntimeError(f"tensor_folder env not exist")
cnt = 0
first_token_time = 0
non_first_token_time = 0
"""
UTILS_CODE_INSERTED_PART_2 = """
getattr(torch, os.environ.get('core_type')).synchronize()
forward_start_time = time.time()
"""
UTILS_CODE_INSERTED_PART_3 = """
if os.environ.get('test_mode') == 'simplified':
tensor_folder = os.environ.get('tensor_folder')
if torch.distributed.get_rank() == 0:
torch.save(next_token_logits.cpu(), f"{tensor_folder}/logits_{cnt}.pth")
torch.save(next_tokens.cpu(), f"{tensor_folder}/tokens_{cnt}.pth")
"""
UTILS_CODE_INSERTED_PART_4 = """
getattr(torch, os.environ.get('core_type')).synchronize()
forward_end_time = time.time()
if cnt != 0:
non_first_token_time += (forward_end_time - forward_start_time)
else:
first_token_time = forward_end_time - forward_start_time
cnt += 1
if not torch.distributed.is_initialized() or torch.distributed.get_rank() == 0:
first_token_time_tensor = torch.tensor([first_token_time])
non_first_token_time_tensor = torch.tensor([non_first_token_time])
torch.save(first_token_time_tensor.cpu(), f"{tensor_folder}/first_token_time.pth")
torch.save(non_first_token_time_tensor.cpu(), f"{tensor_folder}/non_first_token_time.pth")
"""
UTILS_CODE_INSERTED_MARKER = " import os\n"
ATB_HOME_PATH = os.environ.get("ATB_HOME_PATH")
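# Lookup tables: Ascend SoC version codes -> chip names, hardware type -> collective backend and torch
# device namespace, and per-dataset prompt prefixes / question counts.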
soc_version_map = {-1: "unknown soc version",
100: "910PremiumA", 101: "910ProA", 102: "910A", 103: "910ProB", 104: "910B",
200: "310P1", 201: "310P2", 202: "310P3", 203: "310P4",
220: "910B1", 221: "910B2", 222: "910B3", 223: "910B4",
240: "310B1", 241: "310B2", 242: "310B3",
250: "910C1", 251: "910C2", 252: "910C3", 253: "910C4"
}
communication_map = {"NPU": "hccl", "GPU": "nccl"}
core_map = {"NPU": "npu", "GPU": "cuda"}
prompt_map = {"GSM8K": "", "TruthfulQA": QA_PRIMER}
question_num = {"GSM8K": 11, "TruthfulQA": 12}
logging.basicConfig(level=logging.DEBUG)
class ModelTest:
def __init__(self, model_type, data_type, test_mode, model_name, output_dir, dataset_name, batch_size_lst, device_id,
hardware_type, case_pair, weight_dir, time_limit, max_position_embedding,
input_text_or_file, is_chat_model, shot) -> None:
self.script_path = os.path.dirname(os.path.abspath(__file__))
self.model_type = model_type
self.data_type = data_type
self.test_mode = test_mode
self.model_name = model_name
self.dataset_name = dataset_name
self.shot = shot
self.batch_size_lst = self.__parse_bs(batch_size_lst)
self.device_id = device_id
self.hardware_type = hardware_type
self.device_type = self.__get_device_type()
self.block_size = self.get_block_size()
self.case_pair = ast.literal_eval(case_pair) if case_pair != "[]" else [[256, 256], [512, 512], [1024, 1024],
[2048, 2048]]
self.local_case_pair = self.case_pair
self.input_text_or_file = input_text_or_file
self.weight_dir = weight_dir
self.time_limit = time_limit
self.max_position_embedding = max_position_embedding
self.is_chat_model = is_chat_model
self.core_type = core_map[self.hardware_type] if hardware_type in core_map.keys() else "npu"
self.rank = int(os.getenv("RANK", "0"))
self.local_rank = int(os.getenv("LOCAL_RANK", "0"))
self.world_size = int(os.getenv("WORLD_SIZE", "1"))
self.is_format_nz = False
self.quantize = None
self.output_dir = output_dir
self.current_result_path = ''
self.causal_bs = 0
self.error_catched = 0
self.error_message = ""
self.model = None
self.csv_debug = {}
@classmethod
def create_instance(cls):
args = get_args()
test_instance = cls(*args)
test_instance.run()
def run(self):
if self.test_mode == "performance_maxbs":
self.__run_maxbs()
else:
self.__run_multibs()
def get_chip_num(self):
return 1
def get_block_size(self):
return 128
def set_fa_tokenizer_params(self):
self.tokenizer_params = {
'revision': None,
'use_fast': True,
'padding_side': 'left',
'truncation_side': 'left',
'trust_remote_code': True
}
def get_model(self, hardware_type, model_type, data_type):
pass
def prepare_environ(self):
pass
def get_dataset_list(self):
return ["GSM8K", "TruthfulQA", "MMLU", "CEval", "BoolQ"]
def get_supported_model_type(self):
pass
def clear(self):
os.unsetenv("test_mode")
os.unsetenv("hardware_type")
os.unsetenv("tensor_folder")
def __run_maxbs(self):
self.batch_size = self.batch_size_lst[0]
self.local_case_pair = [self.case_pair]
self.satisfy_time_limit = 1
self.__run_single_bs()
messages = [
self.error_catched,
self.satisfy_time_limit,
self.csv_path
]
if self.rank == 0:
with open(os.path.join(self.script_path, "../maxbs.txt"), "w") as file:
for message in messages:
file.write(str(message) + "\n")
def __run_multibs(self):
if not isinstance(self.batch_size_lst[0], list):
for i in self.batch_size_lst:
self.batch_size = i
self.__run_single_bs()
else:
if self.test_mode == "performance_single" or self.test_mode == "precision_single":
raise ValueError("performance_single or precision_single mode only support one batchsize")
bs_lst_len = len(self.batch_size_lst)
if self.test_mode != "performance":
raise ValueError("only performance test support causal bs input")
if len(self.case_pair) != bs_lst_len:
raise ValueError("inconsistent case_pair and batch_size input, length should be the same")
self.causal_bs = 1
self.local_case_pair = self.case_pair
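# Transpose the per-case batch-size lists into arranged_lst[bs - 1][case_idx] flags so that every requested
# batch size is run exactly once over the case pairs it was selected for.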
max_bs = max(max(sublst) for sublst in self.batch_size_lst)
arranged_lst = [[0] * bs_lst_len for _ in range(max_bs)]
for idx, sublst in enumerate(self.batch_size_lst):
for bs in sublst:
arranged_lst[bs - 1][idx] = 1
self.batch_size_lst = arranged_lst
for bs, sublst in enumerate(self.batch_size_lst):
if sum(sublst) != 0:
self.batch_size = bs + 1
self.__run_single_bs()
def __run_single_bs(self):
self.prepare_environ()
self.__prepare_and_check()
self.__run()
self.clear()
def __prepare_and_check(self):
cst_timezone = timezone(timedelta(hours=8))
current_time = datetime.now(cst_timezone)
self.formatted_datetime = current_time.strftime("%Y_%m_%d_%H_%M_%S")
if "performance" in self.test_mode:
self.test_type = "performance"
elif "simplified" in self.test_mode:
self.test_type = "simplified"
else:
self.test_type = "precision"
max_csv_limit = sys.maxsize
while True:
try:
csv.field_size_limit(max_csv_limit)
break
except OverflowError:
max_csv_limit = int(max_csv_limit / 10)
config_path = os.path.join(self.weight_dir, "config.json")
with open(config_path, 'r') as f:
config_data = json.load(f)
if "quantize" in config_data:
self.quantize = config_data["quantize"]
if self.quantize:
csv_base_path = os.path.join(f"{self.data_type}_{self.quantize}", self.model_name)
else:
csv_base_path = os.path.join(f"{self.data_type}", self.model_name)
self.result_dir = os.path.join(self.output_dir, "results", self.hardware_type, f"{self.test_type}_test", self.test_mode, csv_base_path)
self.data_dir = os.path.join(self.output_dir, "data", self.hardware_type, f"{self.test_type}_test", self.test_mode, csv_base_path)
self.log_dir = os.path.join(self.output_dir, "logs", self.hardware_type, f"{self.test_type}_test", self.test_mode, csv_base_path)
self.debug_dir = os.path.join(self.output_dir, "debug", self.hardware_type, f"{self.test_type}_test", self.test_mode, csv_base_path)
os.makedirs(self.result_dir, exist_ok=True)
os.makedirs(self.debug_dir, exist_ok=True)
self.__create_folder(self.data_dir)
if self.test_type == "performance":
if "performance_maxbs" in self.test_mode:
self.csv_path = os.path.join(self.result_dir, f"{self.test_mode}_{self.model_type}_{self.case_pair[0]}_" \
f"{self.case_pair[1]}_batch{self.batch_size}_tp{self.world_size}_result.csv")
self.csv_formatted_path = os.path.join(self.result_dir, f"{self.test_mode}_{self.model_type}_{self.case_pair[0]}_" \
f"{self.case_pair[1]}_batch{self.batch_size}_tp{self.world_size}_formatted_result.csv")
else:
self.csv_path = os.path.join(self.result_dir, f"{self.test_mode}_{self.model_type}_batch{self.batch_size}_" \
f"tp{self.world_size}_result.csv")
self.csv_formatted_path = os.path.join(self.result_dir, f"{self.test_mode}_{self.model_type}_batch{self.batch_size}_" \
f"tp{self.world_size}_formatted_result.csv")
else:
self.csv_path = os.path.join(self.result_dir, f"{self.dataset_name}_{self.model_type}_batch{self.batch_size}_" \
f"tp{self.world_size}_{self.test_mode}_result.csv")
self.csv_formatted_path = os.path.join(self.result_dir, f"{self.dataset_name}_{self.model_type}_batch{self.batch_size}_" \
f"tp{self.world_size}_{self.test_mode}_formatted_result.csv")
flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
modes = stat.S_IWUSR | stat.S_IRUSR
with os.fdopen(os.open(self.csv_formatted_path, flags, modes), 'w', encoding='utf-8') as f:
if self.test_mode.startswith("performance"):
f.write("{:<15s}|{:<15s}|{:<15s}|{:<15s}|{:<15s}|{:<25s}|{:<25s}|{:<36s}|{:<25s}|{:<45s}|{:<35s}\n".format(
"Model", "Batchsize", "In_seq", "Out_seq", "Total time(s)", "First token time(ms)",
"Non-first token time(ms)", "Non-first token Throughout(Tokens/s)", "E2E Throughout(Tokens/s)",
"Non-first token Throughout Average(Tokens/s)", "E2E Throughout Average(Tokens/s)"
))
elif self.test_mode == "simplified":
f.write("Standard: [1] KL loss <= 1e-3. [2] rate of KL loss > 1e-4 <= 0.5%.\n")
f.write("{:<15s}|{:<15s}|{:<15s}|{:<15s}|{:<15s}|{:<15s}|{:<15s}\n".format(
"Model", "Dataset", "Batchsize", "Logits Num", "Greatest KLL", "Error Rate", "Result"
))
if not hasattr(self, "logger"):
self.logger = self.__get_log("runtime")
self.result_logger = self.__get_log("result_process")
self.logger.info(
"\nmodel_name: " + self.model_name + "\nmodel_type: " + self.model_type + "\ndata_type: " + self.data_type + "\ntest_mode: " + self.test_mode +
"\ndata_dir: " + self.data_dir + "\ntime_limit: " + str(self.time_limit) + "\nbatch_size: " + str(self.batch_size) +
"\nresult_dir: " + self.result_dir + "\nlog_dir: " + self.log_dir)
config_dict, _ = PretrainedConfig.get_config_dict(self.weight_dir)
model_spec = config_dict.get('model_type')
if model_spec not in self.get_supported_model_type():
raise RuntimeError(f"input model name not match model type in config file, please check file in core folder")
if self.hardware_type == "NPU":
reload(env)
if ATB_HOME_PATH is None:
self.logger.error("env ATB_HOME_PATH does not exist, please source atb set_env.sh")
raise RuntimeError(
"env ATB_HOME_PATH does not exist, please source atb set_env.sh")
self.logger.info("ATB env check success.")
if ATB_SPEED_HOME_PATH is None:
self.logger.error("env ATB_SPEED_HOME_PATH does not exist, please source atb_speed set_env.sh")
raise RuntimeError(
"env ATB_SPEED_HOME_PATH does not exist, please source atb_speed set_env.sh")
self.logger.info("ATB_SPEED env check success")
if self.model_type == "fa":
self.__npu_adapt()
if self.model_type == "fa" and self.test_mode != "full":
self.__patch_hf_transformers_utils()
if self.test_mode == "performance_single" or self.test_mode == "precision_single":
try:
input_text_or_file = ast.literal_eval(self.input_text_or_file)
self.input_text_or_file = input_text_or_file
except ValueError:
with open(self.input_text_or_file, 'r', encoding='utf-8') as file:
lines = file.readlines()
self.input_text_or_file = [line for line in lines]
if len(self.input_text_or_file) > self.batch_size:
self.logger.warning("input texts length exceeds the input batch_size, please check if it's not on your purpose.")
if self.test_mode == "full":
self.dataset_list = self.get_dataset_list()
if self.dataset_name not in self.dataset_list:
self.logger.warning(f"{self.model_name} not support {self.dataset_name}, please check")
os.environ['core_type'] = self.core_type
os.environ['test_mode'] = self.test_mode
torch.manual_seed(1)
def __run(self):
if self.test_mode == "simplified" or self.test_mode == "full" or self.test_mode == "precision_single":
self.__run_precision()
elif self.test_mode.startswith("performance"):
self.__run_performance()
else:
self.logger.error(self.test_mode + " test not support, only support performance*, simplified and full")
raise RuntimeError(f"{self.test_mode} test not support, only support performance*, simplified and full")
def __run_performance(self):
self.logger.info("performance test start")
performance_prompt = [
"Common sense questions and answers\n\nQuestion: How to learn a new language\nFactual answer:"]
csv_results = []
if self.hardware_type == "GPU":
os.environ['tensor_folder'] = self.data_dir
def warmup():
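# One dummy FA generate over random ids warms up the device before timing; eos_token_id is set outside the
# vocabulary so generation cannot stop early.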
self.logger.info("performance test warmup start")
if self.model_type == "fa":
warmup_input_ids = torch.randint(0, self.model.config.vocab_size, [self.batch_size, 2048],
dtype=torch.int64)
warmup_attention_mask = torch.ones((self.batch_size, 2048), dtype=torch.int64)
inputs = self.tokenizer(performance_prompt * self.batch_size, return_tensors="pt", padding='max_length',
max_length=2048)
inputs["input_ids"] = warmup_input_ids
inputs["attention_mask"] = warmup_attention_mask
input_ids = inputs.input_ids.to(self.model.device)
attention_mask = inputs.attention_mask.to(self.model.device)
with torch.no_grad():
_ = self.model.generate(
inputs=input_ids,
attention_mask=attention_mask,
max_new_tokens=4,
eos_token_id=self.model.config.vocab_size * 2
)
else:
pass
self.logger.info("performance test warmup end")
def run_performance_test():
non_first_token_throughput_total = 0
e2e_throughput_total = 0
passed_cases = 0
if self.causal_bs:
filtered_case_pair = list(filter(lambda x: x[1] == 1, zip(self.case_pair, self.batch_size_lst[self.batch_size - 1])))
self.local_case_pair = [i[0] for i in filtered_case_pair]
for seq_len_in, seq_len_out in self.local_case_pair:
self.error_catched = 0
self.error_message = ""
self.logger.info("batch_size: " + str(self.batch_size) +
", seq_len_in: " + str(seq_len_in) +
", seq_len_out: " + str(seq_len_out))
if self.model_type == "fa":
input_ids = torch.randint(0, self.model.config.vocab_size, [self.batch_size, seq_len_in],
dtype=torch.int64)
attention_mask = torch.ones((self.batch_size, seq_len_in), dtype=torch.int64)
inputs = self.tokenizer(performance_prompt * self.batch_size, return_tensors="pt",
padding='max_length',
max_length=seq_len_in)
inputs["input_ids"] = input_ids
inputs["attention_mask"] = attention_mask
input_ids = inputs.input_ids.to(self.model.device)
attention_mask = inputs.attention_mask.to(self.model.device)
with torch.no_grad():
getattr(torch, self.core_type).synchronize()
e2e_start = time.time()
generate_ids = self.model.generate(inputs=input_ids,
attention_mask=attention_mask,
min_new_tokens=seq_len_out,
max_new_tokens=seq_len_out
)
try:
_ = self.tokenizer.batch_decode(generate_ids, skip_special_tokens=True,
clean_up_tokenization_spaces=False)
except Exception:
_ = [
self.tokenizer.decode(output)
for output in generate_ids[:, inputs["input_ids"].size(1):].tolist()
]
getattr(torch, self.core_type).synchronize()
e2e_end = time.time()
e2e_time = e2e_end - e2e_start
else:
self.__get_model_or_runner(seq_len_in, seq_len_out)
try:
self.pa_runner.warm_up()
if self.test_mode == "performance_single":
responses, token_nums, e2e_time = self.pa_runner.infer(self.input_text_or_file, self.batch_size, seq_len_out, True, self.is_chat_model)
if self.rank == 0:
for i, response in enumerate(responses):
length = len(self.input_text_or_file)
inputs = self.input_text_or_file
if i < length:
self.logger.info(f'Question[{i}]: {inputs[i]}')
self.logger.info(f'Answer[{i}]: {response}')
self.logger.info(f'Generate[{i}] token num: {token_nums[i]}')
else:
input_ids = torch.randint(0, self.pa_runner.model.config.vocab_size, [seq_len_in],
dtype=torch.int64)
_, _, e2e_time = self.pa_runner.infer([input_ids], self.batch_size, seq_len_out, True, self.is_chat_model)
passed_cases += 1
except Exception as e:
self.error_catched = 1
self.error_message = str(e)
self.logger.error("error catched: " + self.error_message)
del self.pa_runner
torch.npu.empty_cache()
if self.rank == 0:
if not self.error_catched:
if self.model_type == "fa":
first_token_time_tensor = torch.load(f"{self.data_dir}/first_token_time.pth").cpu()
first_token_time = first_token_time_tensor.item()
non_first_token_time_tensor = torch.load(f"{self.data_dir}/non_first_token_time.pth").cpu()
non_first_token_time = non_first_token_time_tensor.item() / (seq_len_out - 1)
else:
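# PA path: read first-token and per-step decode latency (ms, columns 5 and 6 of the second row) from
# ../benchmark.csv and convert them back to seconds.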
benchmark_csv = os.path.join(self.script_path, "../benchmark.csv")
with open(benchmark_csv, newline='') as csvfile:
csv_reader = csv.reader(csvfile)
next(csv_reader)
second_row = next(csv_reader)
first_token_time = float(second_row[4]) / 1000
non_first_token_time = float(second_row[5]) / 1000
try:
non_first_token_throughput = self.batch_size / non_first_token_time
except ZeroDivisionError:
non_first_token_throughput = 0
non_first_token_throughput_total += non_first_token_throughput
e2e_throughput = self.batch_size * seq_len_out / e2e_time
e2e_throughput_total += e2e_throughput
self.logger.info(
f"batch: {self.batch_size}, seq_len_in: {seq_len_in}, seq_len_out: {seq_len_out}, total_time: {e2e_time}, first_token_time: {first_token_time * 1000}," +
f" non_first_token_time: {non_first_token_time * 1000}, non_first_token_throughput: {non_first_token_throughput}," +
f" e2e_time: {e2e_time}, e2e_throughput: {e2e_throughput}")
csv_results.append(
[str(self.model_name).ljust(15), str(self.batch_size).ljust(15), str(seq_len_in).ljust(15),
str(seq_len_out).ljust(15),
str(round(e2e_time, 10)).ljust(15), str(round(first_token_time * 1000, 10)).ljust(25),
str(round(non_first_token_time * 1000, 10)).ljust(25),
str(round(non_first_token_throughput, 10)).ljust(36),
str(round(e2e_throughput, 10)).ljust(25)])
if self.test_mode == "performance_maxbs" and non_first_token_time * 1000 > self.time_limit:
self.satisfy_time_limit = 0
else:
csv_results.append(
[str(self.model_name).ljust(15), str(self.batch_size).ljust(15), str(seq_len_in).ljust(15),
str(seq_len_out).ljust(15), self.error_message.ljust(141)])
if self.rank == 0:
try:
non_first_token_throughput_average = non_first_token_throughput_total / passed_cases
e2e_throughput_average = e2e_throughput_total / passed_cases
except ZeroDivisionError:
non_first_token_throughput_average = 0
e2e_throughput_average = 0
self.logger.info("all cases failed")
self.logger.info(
f"batch: {self.batch_size}, non_first_token_throughput_total: {non_first_token_throughput_total}, non_first_token_throughput_average:" +
f" {non_first_token_throughput_average}, e2e_throughput_total: {e2e_throughput_total}, e2e_throughput_average: {e2e_throughput_average}")
csv_results[passed_cases - 1].extend(
[str(round(non_first_token_throughput_average, 10)).ljust(45),
str(round(e2e_throughput_average, 10)).ljust(35)])
if not os.path.exists(self.csv_formatted_path):
self.logger.warning("performance result csv formatted file not exist, skip recording results")
raise RuntimeError(f"csv result formatted file not exist")
with open(self.csv_formatted_path, 'a', newline='') as csv_file:
csv_writer = csv.writer(csv_file, delimiter='|')
for csv_result in csv_results:
csv_writer.writerow(csv_result)
csv_results.insert(0, ["Model", "Batchsize", "In_seq", "Out_seq", "Total time(s)", "First token time(ms)", "Non-first token time(ms)",
"Non-first token Throughout(Tokens/s)", "Throughout(Tokens/s)", "Non-first token Throughout Average(Tokens/s)",
"E2E Throughout Average(Tokens/s)"])
df = pd.DataFrame(csv_results)
df.to_csv(self.csv_path, index=False, header=False)
self.logger.info(self.model_name + " " + " batch" + str(
self.batch_size) + " result saved in " + self.csv_path)
self.logger.info(self.model_name + " " + " batch" + str(
self.batch_size) + " formatted result saved in " + self.csv_formatted_path)
warmup()
run_performance_test()
self.logger.info("performance test end")
def __run_precision(self):
DATASET_EVAL_FUNC_TABLE = {
"BoolQ": self.__run_full_dataset_boolq,
"CEval": self.__run_full_dataset_ceval_or_mmlu,
"CMMLU": self.__run_full_dataset_cmmlu,
"GSM8K": self.__run_full_dataset_gsm8k,
"HumanEval": self.__run_full_dataset_humaneval,
"HumanEval_X": self.__run_full_dataset_humaneval_x,
"LongBench": self.__run_full_dataset_longbench,
"LongBench-E": self.__run_full_dataset_longbench,
"MMLU": self.__run_full_dataset_ceval_or_mmlu,
"TruthfulQA": self.__run_full_dataset_truthfulqa
}
self.logger.info("precision test start")
if self.test_mode == "precision_single":
self.__run_precision_single()
elif self.test_mode == "simplified":
self.dataset_path = os.path.join(self.script_path, "../dataset/simplified", self.dataset_name + ".jsonl")
self.__run_simplified_dataset()
elif self.test_mode == "full":
seq_len_in = 3072
seq_len_out = 512
if "LongBench" in self.dataset_name:
seq_len_in = 76839
seq_len_out = 256
self.__get_model_or_runner(seq_len_in, seq_len_out)
if self.hardware_type == "NPU":
self.pa_runner.warm_up()
self.csv_debug = {
'key': [],
'queries': [],
'input_token_ids': [],
'output_token_ids': [],
'test_result': [],
'golden_result': [],
'pass': []
}
self.dataset_path = os.path.join(os.path.dirname(self.script_path), "dataset/full", self.dataset_name)
dataset_eval_func = DATASET_EVAL_FUNC_TABLE.get(self.dataset_name)
if dataset_eval_func:
dataset_eval_func()
else:
self.logger.error(self.dataset_name + " not support")
raise RuntimeError(f"{self.test_mode} not support")
else:
self.logger.error(self.test_mode + " not support")
raise RuntimeError(f"{self.test_mode} not support")
self.logger.info("precision test end")
def __run_precision_single(self):
for seq_len_in, seq_len_out in self.local_case_pair:
self.__get_model_or_runner(seq_len_in, seq_len_out)
if self.model_type == "fa":
inputs = self.tokenizer(self.input_text_or_file * self.batch_size, return_tensors="pt",
padding='max_length',
max_length=seq_len_in)
inputs["input_ids"] = input_ids
inputs["attention_mask"] = attention_mask
input_ids = inputs.input_ids.to(self.model.device)
attention_mask = inputs.attention_mask.to(self.model.device)
with torch.no_grad():
generate_ids = self.model.generate(inputs=input_ids,
attention_mask=attention_mask,
min_new_tokens=seq_len_out,
max_new_tokens=seq_len_out
)
try:
responses = self.tokenizer.batch_decode(generate_ids, skip_special_tokens=True,
clean_up_tokenization_spaces=False)
except AttributeError:
responses = [
self.tokenizer.decode(output)
for output in generate_ids[:, inputs["input_ids"].size(1):].tolist()
]
else:
responses, token_nums, _ = self.pa_runner.infer(self.input_text_or_file, self.batch_size, seq_len_out, False, self.is_chat_model)
if self.rank == 0:
for i, response in enumerate(responses):
length = len(self.input_text_or_file)
inputs = self.input_text_or_file
if i < length:
self.logger.info(f'Question[{i}]: {inputs[i]}')
self.logger.info(f'Answer[{i}]: {response}')
if self.model_type == "pa":
self.logger.info(f'Generate[{i}] token num: {token_nums[i]}')
def __run_simplified_dataset(self):
if self.dataset_name not in prompt_map.keys():
self.logger.error(self.dataset_name + " not support")
raise RuntimeError(f"{self.dataset_name} not support")
with torch.no_grad():
dataset = []
with open(self.dataset_path) as file:
for line in file:
dataset.append(json.loads(line))
dataloader = torch.utils.data.DataLoader(dataset, batch_size=self.batch_size)
epoch_id = 0
for batch in tqdm(dataloader):
self.logger.info("current epoch: " + str(epoch_id))
folder_path = f"{self.data_dir}/{self.hardware_type}/{self.dataset_name}/batch{self.batch_size}"
os.environ['tensor_folder'] = f"{folder_path}/{str(epoch_id)}"
os.makedirs(folder_path, exist_ok=True)
if not os.path.exists(folder_path):
self.logger.error(f"folder {folder_path} create fail")
raise RuntimeError(f"folder {folder_path} create fail")
texts = batch["question"]
try:
prompt = prompt_map[self.dataset_name]
except KeyError:
self.logger.warning(f"data {self.dataset_name} has no specific prompt provided, leave empty")
prompt = ""
queries = [''.join([prompt, query]) for query in texts]
if self.model_type == "fa":
tokenizer_out = self.tokenizer(queries, padding=True, return_tensors="pt",
truncation=True, max_length=2048).to(self.model.device)
tokenizer_out_ids = tokenizer_out.input_ids.to(self.model.device)
attention_mask = tokenizer_out.attention_mask.to(self.model.device)
outputs = self.model.generate(inputs=tokenizer_out_ids, attention_mask=attention_mask,
do_sample=False, max_new_tokens=1024)
for idx in range(len(outputs)):
output = outputs.tolist()[idx][len(tokenizer_out["input_ids"][idx]):]
response = self.tokenizer.decode(output)
if self.rank == 0:
self.logger.info(response)
else:
req_list = [
request_from_text(queries[i], self.tokenizer, 1024, self.cache_config.block_size, req_idx=i) for
i in range(len(queries))]
self.model.postprocessor.max_new_tokens = 1024
generate_req(req_list, self.model, self.batch_size, 3072 * self.batch_size, self.cache_manager)
generate_text_list, token_num_list = decode_token(req_list, self.tokenizer)
if self.rank == 0:
self.logger.info(f'Question: {queries}')
for i, generate_text in enumerate(generate_text_list):
self.logger.info(f'Answer: {generate_text}')
self.logger.info(f'Generate token num: {token_num_list[i]}')
epoch_id += 1
def __postprocess(self, text: str, options: str, cushion=True) -> str:
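# Pull the chosen option letter out of a free-form answer by trying a cascade of Chinese and English regex
# patterns; the looser "cushion" patterns are appended as a last resort when cushion=True.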
patterns = [
f'答案是?\s?([{options}])',
f'答案是?\s?([{options}])',
f'答案是?\s?:([{options}])',
f'答案应该?是\s?([{options}])',
f'答案应该?选\s?([{options}])',
f'答案为\s?([{options}])',
f'答案选\s?([{options}])',
f'选择?\s?([{options}])',
f'故选?\s?([{options}])',
f'只有选?项?\s?([{options}])\s?是?对',
f'只有选?项?\s?([{options}])\s?是?错',
f'只有选?项?\s?([{options}])\s?不?正确',
f'只有选?项?\s?([{options}])\s?错误',
f'说法不?对选?项?的?是\s?([{options}])',
f'说法不?正确选?项?的?是\s?([{options}])',
f'说法错误选?项?的?是\s?([{options}])',
f'([{options}])\s?是正确的',
f'([{options}])\s?是正确答案',
f'选项\s?([{options}])\s?正确',
f'所以答\s?([{options}])',
f'所以\s?([{options}][.。$]?$)',
f'所有\s?([{options}][.。$]?$)',
f'[\s:,]([{options}])[。,,\.]?$',
f'[\s,:][故即]([{options}])[。\.]?$',
f'[\s,:]因此([{options}])[。\.]?$',
f'[是为。]\s?([{options}])[。\.]?$',
f'因此\s?([{options}])[。\.]?$',
f'显然\s?([{options}])[。\.]?$',
f'答案是\s?(\S+)(?:。|$)',
f'答案应该是\s?(\S+)(?:。|$)',
f'答案为\s?(\S+)(?:。|$)',
f'[Tt]he answer is \(?([{options}])\)?',
f'[Tt]he answer is option \(?([{options}])\)?',
f'[Tt]he correct answer is \(?([{options}])\)?',
f'[Tt]he correct answer is option \(?([{options}])\)?',
f'[Tt]he answer to the question is \(?([{options}])\)?',
f'^选项\s?([{options}])',
f'^([{options}])\s?选?项',
f'(\s|^)[{options}][\s。,:\.$]',
f'(\s|^)[{options}](\s|$)',
f'1.\s?(.*?)$',
f'1.\s?([{options}])[.。$]?$',
]
cushion_patterns = [
f'([{options}]):',
f'[{options}]',
]
if cushion:
patterns.extend(cushion_patterns)
for pattern in patterns:
match = re.search(pattern, text)
if match:
outputs = match.group(0)
for i in options:
if i in outputs:
return i
return ''
def __run_full_dataset_ceval_or_mmlu(self):
if self.shot == 0:
self.__run_full_dataset_ceval_or_mmlu_0_shot()
else:
self.__run_full_dataset_ceval_or_mmlu_few_shots()
def __run_full_dataset_ceval_or_mmlu_0_shot(self):
def get_subject_mapping():
SUBJECT_MAPPING_PATH = os.path.join(self.dataset_path, "subject_mapping.json")
with open(SUBJECT_MAPPING_PATH) as f:
subject_mapping = json.load(f)
return subject_mapping
def load_csv_by_task_name(task_name, dataset_path):
if self.dataset_name == "CEval":
val_df = pd.read_csv(os.path.join(dataset_path, "val", task_name + "_val.csv"), header=None)
val_df = val_df.iloc[1:, 1:]
else:
val_df = pd.read_csv(os.path.join(dataset_path, "test", task_name + "_test.csv"), header=None)
return val_df
def format_example(name, df, idx):
question = df.iloc[idx, 0]
A = df.iloc[idx, 1]
B = df.iloc[idx, 2]
C = df.iloc[idx, 3]
D = df.iloc[idx, 4]
if self.dataset_name == "CEval":
prompt = f"\n以下是中国关于{name}考试的单项选择题,请选出其中的正确答案。\n{question}\nA. {A}\nB. {B}\nC. {C}\nD. {D}\n让我们一步一步思考。答案: "
else:
prompt = "\nThere is a single choice question about {}. Answer the question by replying A, B, C or D.\nQ: {}\nA. {}\nB. {}\nC. {}\nD. {}\nLet's think step by step. A: ".format(name.replace("_", " "), question, A, B, C, D)
return prompt
correct_total = 0
sum_total = 0
result_total = []
is_result = False
if self.__get_rank() == 0:
is_result = True
subject_mapping = get_subject_mapping()
if self.dataset_name == "MMLU":
subject_mapping = subject_mapping["mmlu_all_sets"]
index = 1
for task_name in tqdm(subject_mapping):
self.logger.info(f"dataset {index} start, task name: {task_name}")
val_df = load_csv_by_task_name(task_name, self.dataset_path)
correct = 0
task_len = val_df.shape[0]
for i in range(math.ceil(task_len / self.batch_size)):
q_num = self.batch_size if (i + 1) * self.batch_size <= task_len else task_len - i * self.batch_size
name = subject_mapping[task_name][1] if self.dataset_name == "CEval" else task_name
prompt = [format_example(name, val_df, i * self.batch_size + j) for j in range(q_num)]
labels = [val_df.iloc[i * self.batch_size + j, val_df.shape[1] - 1] for j in range(q_num)]
prompts = [prpt.encode().decode(encoding="utf8") for prpt in prompt]
if is_result:
for idx in range(q_num):
self.csv_debug.get('key').append(f"{task_name}_{i * self.batch_size + idx}")
self.csv_debug.get('queries').append(prompts[idx])
if self.model_type == "fa":
inputs = self.tokenizer(prompts, padding=True, return_tensors="pt", truncation=True)
for idx in range(q_num):
self.csv_debug.get('input_token_ids').append(inputs.input_ids[idx].tolist())
inputs = inputs.to(0)
tokenizer_out_ids = inputs.input_ids.to(0)
attention_mask = inputs.attention_mask.to(0)
outputs = self.model.generate(inputs=tokenizer_out_ids, attention_mask=attention_mask,
do_sample=False, max_new_tokens=1024)
answers = []
for idx, output in enumerate(outputs.tolist()):
output = output[len(inputs["input_ids"][idx]):]
self.csv_debug.get('output_token_ids').append(output)
answers.append(self.tokenizer.decode(output))
else:
os.environ['ATB_LLM_TOKEN_IDS_SAVE_ENABLE'] = "1"
os.environ['ATB_LLM_TOKEN_IDS_SAVE_FOLDER'] = self.data_dir
local_batch_size = self.batch_size
if len(prompts) == 1:
local_batch_size = 1
generate_texts, token_nums, _ = self.pa_runner.infer(prompts, local_batch_size, 256, False, self.is_chat_model)
os.environ['ATB_LLM_TOKEN_IDS_SAVE_ENABLE'] = "0"
for idx, generate_text in enumerate(generate_texts):
if is_result:
self.logger.debug(f'Question[{i * self.batch_size + idx}]: {prompts[idx]}')
self.logger.debug(f'Answer[{i * self.batch_size + idx}]: {generate_text}')
self.logger.debug(f'Generate[{i * self.batch_size + idx}] token num: {token_nums[idx]}')
answers = None
if len(generate_texts) > 0:
answers = generate_texts
if is_result:
for idx in range(q_num):
input_token_ids = torch.load(os.path.join(self.data_dir, f'input_ids_{idx}.pth'))
self.csv_debug.get('input_token_ids').append(input_token_ids.tolist())
with open(os.path.join(self.data_dir, f"output_ids_{idx}.txt"), 'r') as f:
output_token_ids = list(map(int, f.read().split()))
self.csv_debug.get('output_token_ids').append(output_token_ids)
answer_results = [self.__postprocess(answer, "ABCD") for answer in answers]
is_correct = ["Correct" if answer_result == label else "Wrong" for answer_result, label in zip(answer_results, labels)]
correct += is_correct.count("Correct")
for idx, is_pass in enumerate(is_correct):
self.csv_debug.get('golden_result').append(labels[idx])
self.csv_debug.get('test_result').append(answer_results[idx])
self.csv_debug.get('pass').append(is_pass)
if is_result and is_pass != "Correct":
self.logger.debug(f">>>原始题目 is : {prompts[idx]}")
self.logger.debug(f">>>推理结果 is : {answer_results[idx]}")
self.logger.debug(f">>>真实结果 is : {labels[idx]}")
if is_result:
result = [task_name, correct / task_len, correct, task_len]
self.logger.info(f"dataset {index} finish, result:{result}")
result_total.append(result)
correct_total += correct
sum_total += task_len
index += 1
if is_result:
total = ["total", correct_total / sum_total, correct_total, sum_total]
self.result_logger.debug(f"total result:{total}")
result_total.insert(0, total)
self.__save_debug()
self.__save_result(result_total)
def __run_full_dataset_ceval_or_mmlu_few_shots(self):
choices = ["A", "B", "C", "D"]
test_set = {"CEval": "val", "MMLU": "test"}
def get_subject_mapping():
SUBJECT_MAPPING_PATH = os.path.join(self.dataset_path, "subject_mapping.json")
with open(SUBJECT_MAPPING_PATH) as f:
subject_mapping = json.load(f)
return subject_mapping if self.dataset_name == "CEval" else subject_mapping["mmlu_all_sets"]
def load_csv_by_task_name(task_name, dataset_path):
row_begin_idx = 0 if self.dataset_name == "MMLU" else 1
col_begin_idx = 0 if self.dataset_name == "MMLU" else 1
ori_dev_df = pd.read_csv(os.path.join(dataset_path, "dev", task_name + "_dev.csv"), header=None)
ori_val_df = pd.read_csv(os.path.join(dataset_path, test_set.get(self.dataset_name),
f"{task_name}_{test_set.get(self.dataset_name)}.csv"), header=None)
dev_df = ori_dev_df.iloc[row_begin_idx:row_begin_idx + self.shot, col_begin_idx:]
val_df = ori_val_df.iloc[row_begin_idx:, col_begin_idx:]
return dev_df, val_df
def format_subject(subject):
return " " + " ".join(subject.split("_"))
def format_example(df, idx, include_answer=True):
prompt = df.iloc[idx, 0]
k = len(choices)
for j in range(k):
prompt += "\n{}. {}".format(choices[j], df.iloc[idx, j + 1])
prompt += "\nAnswer:"
if include_answer:
prompt += " {}\n\n".format(df.iloc[idx, k + 1])
return prompt
def gen_prompt(train_df, subject, k=-1):
prompt = "The following are multiple choice questions (with answers) about {}.\n\n".format(format_subject(subject))
if k == -1:
k = train_df.shape[0]
for i in range(k):
prompt += format_example(train_df, i)
return prompt
correct_total = 0
sum_total = 0
result_total = []
is_result = False
if self.__get_rank() == 0:
is_result = True
subject_mapping = get_subject_mapping()
index = 1
for task_name in tqdm(subject_mapping):
self.logger.info(f"dataset {index} start, task name: {task_name}")
dev_df, val_df = load_csv_by_task_name(task_name, self.dataset_path)
correct = 0
task_len = val_df.shape[0]
for i in range(math.ceil(task_len / self.batch_size)):
q_num = self.batch_size if (i + 1) * self.batch_size <= task_len else task_len - i * self.batch_size
prompt_ends = [format_example(val_df, i * self.batch_size + j, include_answer=False)
for j in range(q_num)]
train_prompts = [gen_prompt(dev_df, task_name, self.shot)] * q_num
prompt = [t + p for t, p in zip(train_prompts, prompt_ends)]
labels = [val_df.iloc[i * self.batch_size + j, val_df.shape[1] - 1] for j in range(q_num)]
prompts = [prpt.encode().decode(encoding="utf8") for prpt in prompt]
if is_result:
for idx in range(q_num):
self.csv_debug.get('key').append(f"{task_name}_{i * self.batch_size + idx}")
self.csv_debug.get('queries').append(prompts[idx])
if self.model_type == "fa":
inputs = self.tokenizer(prompts, padding=True, return_tensors="pt", truncation=True)
for idx in range(q_num):
self.csv_debug.get('input_token_ids').append(inputs.input_ids[idx].tolist())
inputs = inputs.to(0)
tokenizer_out_ids = inputs.input_ids.to(0)
attention_mask = inputs.attention_mask.to(0)
outputs = self.model.generate(
inputs=tokenizer_out_ids,
attention_mask=attention_mask,
do_sample=False,
max_new_tokens=20)
answers = []
for idx in range(len(outputs)):
output = outputs.tolist()[idx][len(inputs["input_ids"][idx]):]
self.csv_debug.get('output_token_ids').append(output)
response = self.tokenizer.decode(output)
answers.append(response)
else:
os.environ['ATB_LLM_TOKEN_IDS_SAVE_ENABLE'] = "1"
os.environ['ATB_LLM_TOKEN_IDS_SAVE_FOLDER'] = self.data_dir
local_batch_size = self.batch_size
if len(prompts) == 1:
local_batch_size = 1
truncation = False
if self.model_name == "llama_33b":
truncation = True
generate_texts, token_nums, _ = self.pa_runner.infer(
prompts,
local_batch_size,
20,
False,
self.is_chat_model,
truncation=truncation
)
os.environ['ATB_LLM_TOKEN_IDS_SAVE_ENABLE'] = "0"
for idx, generate_text in enumerate(generate_texts):
if is_result:
self.logger.debug(f'Question[{i * self.batch_size + idx}]: {prompts[idx]}')
self.logger.debug(f'Answer[{i * self.batch_size + idx}]: {generate_text}')
self.logger.debug(f'Generate[{i * self.batch_size + idx}] token num: {token_nums[idx]}')
answers = None
if len(generate_texts) > 0:
answers = generate_texts
if is_result:
for idx in range(q_num):
input_token_ids = torch.load(os.path.join(self.data_dir, f'input_ids_{idx}.pth'))
self.csv_debug.get('input_token_ids').append(input_token_ids.tolist())
with open(os.path.join(self.data_dir, f"output_ids_{idx}.txt"), 'r') as f:
output_token_ids = list(map(int, f.read().split()))
self.csv_debug.get('output_token_ids').append(output_token_ids)
answer_results = [answer.lstrip()[0] if answer else "-1" for answer in answers]
is_correct = ["Correct" if answer_result == label else "Wrong"
for answer_result, label in zip(answer_results, labels)]
correct += is_correct.count("Correct")
for idx, is_pass in enumerate(is_correct):
self.csv_debug.get('golden_result').append(labels[idx])
self.csv_debug.get('test_result').append(answer_results[idx])
self.csv_debug.get('pass').append(is_pass)
if is_result and is_pass != "Correct":
self.logger.debug(f">>>原始题目 is : {prompts[idx]}")
self.logger.debug(f">>>推理结果 is : {answer_results[idx]}")
self.logger.debug(f">>>真实结果 is : {labels[idx]}")
if is_result:
result = [task_name, correct / task_len, correct, task_len]
self.logger.info(f"dataset {index} finish, result:{result}")
result_total.append(result)
correct_total += correct
sum_total += task_len
index += 1
if is_result:
total = ["total", correct_total / sum_total, correct_total, sum_total]
self.result_logger.debug(f"total result:{total}")
result_total.insert(0, total)
self.__save_debug()
self.__save_result(result_total)
def __run_full_dataset_cmmlu(self):
choices = ["A", "B", "C", "D"]
tokenizer = self.tokenizer if self.model_type == "fa" else self.pa_runner.tokenizer
choice_ids = [tokenizer.convert_tokens_to_ids(choice) for choice in choices]
is_result = False
if self.__get_rank() == 0:
is_result = True
def get_subject_mapping():
SUBJECT_MAPPING_PATH = os.path.join(self.dataset_path, "subject_mapping.json")
with open(SUBJECT_MAPPING_PATH) as f:
subject_mapping = json.load(f)
return subject_mapping
def format_example(df, idx, subject, include_answer=True, cot=False):
prompt_start = "题目:"
prompt = prompt_start + df.iloc[idx, 0]
k = df.shape[1] - 2
for j in range(k):
prompt += "\n{}. {}".format(choices[j], df.iloc[idx, j + 1])
if cot:
prompt += "\n逐步分析并给出答案选项。"
else:
prompt += "\n答案是:"
if include_answer:
prompt += "{}\n\n".format(df.iloc[idx, k + 1])
return prompt
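# Build a few-shot CMMLU prompt; if the shots would exceed max_length, the longest examples are dropped
# one at a time until the remainder fits.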
def gen_prompt(dev_df, subject, prompt_end, num_few_shot=0, tokenizer=None, max_length=2048, cot=False):
if cot:
prompt = "以下是关于{}的单项选择题,请分析并选出正确答案。\n\n".format(name_en2zh[subject])
else:
prompt = "以下是关于{}的单项选择题,请直接给出正确答案的选项。\n\n".format(name_en2zh[subject])
if tokenizer is None:
for i in range(num_few_shot):
example = format_example(dev_df, i, subject)
prompt += example
return prompt + prompt_end
start_end_token_len = len(tokenizer.encode(prompt) + tokenizer.encode(prompt_end))
if start_end_token_len > max_length:
return prompt_end
prompt_list = []
if num_few_shot > 0:
for i in range(num_few_shot):
example = format_example(dev_df, i, subject)
prompt_list.append((example, tokenizer.encode(example)))
while prompt_list != [] and sum(len(e[1]) for e in prompt_list) >= max_length - start_end_token_len:
print(f"Warning: {len(prompt_list)} shot case exceeds max_input_length, remove 1 shot.")
longest_length = max([len(e[1]) for e in prompt_list])
prompt_list = [e for e in prompt_list if len(e[1]) != longest_length]
for p in prompt_list:
prompt += p[0]
return prompt + prompt_end
def softmax(x):
z = x - max(x)
numerator = np.exp(z)
denominator = np.sum(numerator)
softmax = numerator / denominator
return softmax
subject_mapping = get_subject_mapping()["cmmlu_all_sets"]
dataset_index = 1
for task_name in tqdm(subject_mapping):
self.logger.info(f"dataset {dataset_index} start, task name: {task_name}")
out_file = os.path.join(self.debug_dir, f"results_{task_name}.csv")
dev_df = pd.read_csv(os.path.join(self.dataset_path, "dev", task_name + ".csv"), header=0, index_col=0)
test_df = pd.read_csv(os.path.join(self.dataset_path, "test", task_name + ".csv"), header=0, index_col=0)
task_len = test_df.shape[0]
cors = []
all_conf = []
all_preds = []
all_time = []
for i in range(math.ceil(task_len / self.batch_size)):
q_num = self.batch_size if (i + 1) * self.batch_size <= task_len else task_len - i * self.batch_size
prompt_ends = [format_example(test_df, i * self.batch_size + j, task_name, include_answer=False) for j in range(q_num)]
prompts = [gen_prompt(dev_df=dev_df,
subject=task_name,
prompt_end=prompt_end,
num_few_shot=self.shot,
tokenizer=tokenizer,
max_length=2048)
for prompt_end in prompt_ends]
labels = [test_df.iloc[i * self.batch_size + j, test_df.shape[1] - 1] for j in range(q_num)]
if is_result:
for idx in range(q_num):
self.csv_debug.get('key').append(f"{task_name}_{i * self.batch_size + idx}")
self.csv_debug.get('queries').append(prompts[idx])
if self.model_type == "fa":
inputs = tokenizer(prompts, padding=True, truncation=True, return_tensors="pt")
for idx in range(q_num):
self.csv_debug.get('input_token_ids').append(inputs.input_ids[idx].tolist())
inputs = inputs.to(0)
if "token_type_ids" in inputs:
inputs.pop("token_type_ids")
with torch.no_grad():
outputs = self.model(**inputs)
last_token_logits = outputs.logits[:, -1, :]
output_token_ids = last_token_logits.argmax(dim=-1)
self.csv_debug['output_token_ids'].extend(output_token_ids.tolist())
else:
os.environ['ATB_LLM_LOGITS_SAVE_ENABLE'] = "1"
os.environ['ATB_LLM_LOGITS_SAVE_FOLDER'] = self.data_dir
os.environ['ATB_LLM_TOKEN_IDS_SAVE_ENABLE'] = "1"
os.environ['ATB_LLM_TOKEN_IDS_SAVE_FOLDER'] = self.data_dir
local_batch_size = self.batch_size
if len(prompts) == 1:
local_batch_size = 1
_, _, e2e_time = self.pa_runner.infer(prompts, local_batch_size, 1, False, self.is_chat_model)
os.environ['ATB_LLM_LOGITS_SAVE_ENABLE'] = "0"
os.environ['ATB_LLM_TOKEN_IDS_SAVE_ENABLE'] = "0"
if is_result:
for idx in range(q_num):
input_token_ids = torch.load(os.path.join(self.data_dir, f'input_ids_{idx}.pth'))
self.csv_debug['input_token_ids'].append(input_token_ids.tolist())
with open(os.path.join(self.data_dir, f"output_ids_{idx}.txt"), 'r') as f:
output_token_ids = list(map(int, f.read().split()))
self.csv_debug['output_token_ids'].append(output_token_ids)
last_token_logits = torch.load(os.path.join(self.data_dir, 'logits_0.pth'))
if is_result:
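# Score from the final-position logits: the prediction is the argmax over the four choice tokens and the
# confidence is the softmax probability assigned to the gold label.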
choice_logits = last_token_logits[:, choice_ids].detach().cpu().numpy()
for idx, label in enumerate(labels):
conf = softmax(choice_logits[idx])[choices.index(label)]
pred = {0: "A", 1: "B", 2: "C", 3: "D"}[np.argmax(choice_logits[idx])]
all_preds.append(pred)
all_conf.append(conf)
all_time.append(e2e_time)
cors.append(pred == label)
self.csv_debug['golden_result'].append(label)
self.csv_debug['test_result'].append(pred)
self.csv_debug['pass'].append(pred == label)
if is_result:
acc = np.mean(cors)
self.logger.debug("Average accuracy {:.3f} - {}".format(acc, task_name))
test_df['prediction'] = all_preds
test_df['e2e_time'] = all_time
test_df.to_csv(out_file, header=None)
dataset_index += 1
if is_result:
self.__save_result("")
self.__save_debug()
def __run_full_dataset_gsm8k(self):
def build_prompt(text):
return f"question:{text}\n\n"
def extract_answer(s):
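# Take the last number appearing in the string (commas stripped) as the extracted answer.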
_PAT_LAST_DIGIT = re.compile(
r"([+-])?(?=([0-9]|\.[0-9]))(0|([1-9](\d{0,2}(,\d{3})*)|\d*))?(\.\d*)?(?=\D|$)"
)
match = list(_PAT_LAST_DIGIT.finditer(s))
if match:
last_digit = match[-1].group().replace(",", "").replace("+", "").strip()
else:
last_digit = None
return last_digit
def is_correct(completion, answer):
gold = extract_answer(answer)
if gold is None:
return False
def number_equal(answer, pred):
if pred is None:
return False
try:
return math.isclose(eval(answer), eval(pred), rel_tol=0, abs_tol=1e-4)
except Exception:
return False
return number_equal(gold, extract_answer(completion))
correct_total = 0
sum_total = 0
result_total = []
is_result = False
if self.__get_rank() == 0:
is_result = True
with torch.no_grad():
for entry in tqdm(glob.glob((Path(self.dataset_path) / "*.jsonl").as_posix(),
recursive=True), desc='global'):
dataset = []
with open(entry, encoding='utf-8') as f:
for line in f:
dataset.append(json.loads(line))
correct = 0
sum = len(dataset)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=self.batch_size)
for batch in tqdm(dataloader):
texts = batch["question"]
queries = [build_prompt(query) for query in texts]
if self.model_type == "fa":
inputs = self.tokenizer(queries, padding=True, return_tensors="pt", truncation=True,
max_length=2048).to(self.model.device)
tokenizer_out_ids = inputs.input_ids.to(self.model.device)
attention_mask = inputs.attention_mask.to(self.model.device)
outputs = self.model.generate(inputs=tokenizer_out_ids, attention_mask=attention_mask,
do_sample=False, max_new_tokens=512)
if is_result:
for idx, ans in enumerate(batch['answer']):
output = outputs.tolist()[idx][len(inputs["input_ids"][idx]):]
response = self.tokenizer.decode(output)
acc = is_correct(response, ans)
if acc:
correct += 1
else:
req_list = [
request_from_text(queries[i], self.tokenizer, 512, self.cache_config.block_size, req_idx=i)
for i in range(len(queries))]
self.model.postprocessor.max_new_tokens = 512
generate_req(req_list, self.model, self.batch_size, 2560 * self.batch_size, self.cache_manager)
generate_text_list, _ = decode_token(req_list, self.tokenizer)
if is_result:
for idx, ans in enumerate(batch['answer']):
response = generate_text_list[idx]
acc = is_correct(response, ans)
if acc:
correct += 1
filename = os.path.basename(entry)
result = [filename, correct / sum, correct, sum]
self.result_logger.debug(f"result:{result}")
result_total.append(result)
correct_total += correct
sum_total += sum
total = ["total", correct_total / sum_total, correct_total, sum_total]
result_total.insert(0, total)
if is_result:
self.__save_result(result_total)
def __run_full_dataset_truthfulqa(self):
BEST_COL = 'Best Answer'
ANSWER_COL = 'Correct Answers'
INCORRECT_COL = 'Incorrect Answers'
def run_answers():
frame = pd.read_csv((Path(self.dataset_path) / "TruthfulQA.csv").as_posix())
frame.dropna(axis=1, how='all', inplace=True)
if tag not in frame.columns:
frame[tag] = ''
frame[tag].fillna('', inplace=True)
frame[tag] = frame[tag].astype(str)
num_rows = frame.shape[0]
num_batches = math.ceil(num_rows / self.batch_size)
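# Token id patterns for "A:" and "Q:", used below to slice the generated answer span out of each output.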
seq_start = np.array(tokenizer('A:')['input_ids'])
seq_end = np.array(tokenizer('Q:')['input_ids'])
with torch.no_grad():
for batch in tqdm(range(num_batches)):
q_num = self.batch_size if (batch + 1) * self.batch_size <= num_rows \
else num_rows - self.batch_size * batch
idx_list = [i for i in range(batch * self.batch_size, batch * self.batch_size + q_num)]
prompt = [truthfulqa_eval.format_prompt(frame.loc[idx]) for idx in idx_list]
if self.model_type == "fa":
input_ids = tokenizer(prompt, padding=True, return_tensors="pt", truncation=True).input_ids
max_len = input_ids.shape[-1] + 50
input_ids = input_ids.to(0)
outputs = self.model.generate(input_ids, do_sample=False, max_length=max_len)
output_token_ids_list = [output[len(input_ids[idx]):]
for idx, output in enumerate(outputs.tolist())]
gen_arrs = np.array(output_token_ids_list)
else:
os.environ['ATB_LLM_TOKEN_IDS_SAVE_ENABLE'] = "1"
os.environ['ATB_LLM_TOKEN_IDS_SAVE_FOLDER'] = self.data_dir
local_batch_size = self.batch_size
if len(prompt) == 1:
local_batch_size = 1
_, _, _ = self.pa_runner.infer(prompt, local_batch_size, 50, False, self.is_chat_model)
os.environ['ATB_LLM_TOKEN_IDS_SAVE_ENABLE'] = "0"
if is_result:
output_token_ids_list = []
for idx in range(q_num):
input_token_ids = torch.load(os.path.join(self.data_dir, f'input_ids_{idx}.pth'))
self.csv_debug.get('input_token_ids').append(input_token_ids.tolist())
with open(os.path.join(self.data_dir, f"output_ids_{idx}.txt"), 'r') as f:
output_token_ids = list(map(int, f.read().split()))
output_token_ids_list.append(output_token_ids)
self.csv_debug.get('output_token_ids').append(output_token_ids)
gen_arrs = np.array(output_token_ids_list)
if is_result:
idx_start = [truthfulqa_eval.find_subsequence(gen_arr, seq_start, start=True)
for gen_arr in gen_arrs]
idx_end = [truthfulqa_eval.find_subsequence(gen_arr, seq_end, start=False)
for gen_arr in gen_arrs]
output_token_ids_list = [output_token_ids[idx_start[output_token_ids_idx]:
idx_end[output_token_ids_idx]]
for output_token_ids_idx, output_token_ids in enumerate(output_token_ids_list)]
output_strs = [tokenizer.decode(output_token_ids, skip_special_tokens=True)
for output_token_ids in output_token_ids_list]
output_str = [output_str.strip() for output_str in output_strs]
for idx in idx_list:
frame.loc[idx, tag] = output_str[idx % self.batch_size]
truthfulqa_answers_path = os.path.join(self.data_dir, 'truthfulQA_answers.csv')
frame.to_csv(truthfulqa_answers_path, index=False, header=True)
self.logger.info(f"{tag} TruthfulQA answers saved to: {truthfulqa_answers_path}")
return frame
def run_probs(frame):
truthfulqa_eval.set_columns(tag, frame)
with torch.no_grad():
for idx in tqdm(frame.index):
if pd.isnull(frame.loc[idx, INCORRECT_COL]):
self.logger.warning("References missing for {0}!".format(idx))
continue
if not len(frame.loc[idx, INCORRECT_COL]):
self.result_logger.warning("References missing for {0}!".format(idx))
continue
ref_best = truthfulqa_eval.format_best(frame.loc[idx, BEST_COL])
ref_true = truthfulqa_eval.split_multi_answer(frame.loc[idx, ANSWER_COL])
ref_false = truthfulqa_eval.split_multi_answer(frame.loc[idx, INCORRECT_COL])
input_prompt = truthfulqa_eval.format_prompt(frame.loc[idx])
scores_true = get_scores(input_prompt, frame, idx, ref_true)
scores_false = get_scores(input_prompt, frame, idx, ref_false)
if is_result:
frame = truthfulqa_eval.mc_calcs(tag, frame, idx, scores_true, scores_false,
ref_true, ref_best)
return frame
def get_scores(input_prompt, frame, idx, ref_answer):
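# Sum the log-probabilities the model assigns to each reference answer, conditioned on the QA prompt;
# the first few answer tokens are skipped (log_probs[3:]) before summing.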
scores_answer = []
for temp_ans in ref_answer:
prompt = [truthfulqa_eval.format_prompt_with_answer_strings(frame.loc[idx, 'Question'], temp_ans)]
input_ids = tokenizer(input_prompt, return_tensors="pt").input_ids
prompt_ids = tokenizer(prompt, return_tensors="pt").input_ids
if self.model_type == "fa":
input_ids = input_ids.to(0)
prompt_ids = prompt_ids.to(0)
logits = self.model(prompt_ids)[0].squeeze(0)
logits = logits[input_ids.shape[-1] - 1: -1, :]
else:
os.environ['ATB_LLM_LOGITS_SAVE_ENABLE'] = "1"
os.environ['ATB_LLM_LOGITS_SAVE_FOLDER'] = self.data_dir
local_batch_size = self.batch_size
if len(prompt) == 1:
local_batch_size = 1
_, _, _ = self.pa_runner.infer(prompt, local_batch_size, 1, False, self.is_chat_model)
os.environ['ATB_LLM_LOGITS_SAVE_ENABLE'] = "0"
if is_result:
logits = torch.load(os.path.join(self.data_dir, 'logits_0.pth'))
if is_result:
logits_softmax = F.log_softmax(logits.float(), dim=-1)
prompt_ids = prompt_ids[0, input_ids.shape[-1]:]
log_probs = logits_softmax[range(logits_softmax.shape[0]), prompt_ids.squeeze(0)]
log_probs = log_probs[3:]
scores_answer.append(log_probs.sum().item())
return scores_answer
is_result = False
if self.__get_rank() == 0:
is_result = True
tokenizer = self.tokenizer if self.model_type == "fa" else self.pa_runner.tokenizer
tag = self.model_name
frame = run_answers()
frame = run_probs(frame)
if is_result:
frame = truthfulqa_eval.run_bleu_and_rouge(self.model_name, frame)
results = truthfulqa_eval.format_frame(frame)
truthfulqa_full_scores_path = os.path.join(self.data_dir, 'truthfulQA_full_scores.csv')
frame.to_csv(truthfulqa_full_scores_path, index=False, header=True)
self.logger.info(f"{tag} TruthfulQA full scores saved to: {truthfulqa_full_scores_path}")
results = results.mean(axis=0)
results = results.reset_index().rename(columns={'level_0': 'Model',
'level_1': 'Metric',
0: 'Value'})
results = results[results['Metric'].isin(['MC1', 'MC2',
'bleu diff',
'rouge1 diff',
'BLEURT diff'])]
results = pd.pivot_table(results, 'Value', 'Model', 'Metric')
results = results.rename(columns={'bleu diff': 'BLEU',
'rouge1 diff': 'ROUGE',
'BLEURT diff': 'BLEURT'})
self.__save_result(results)
def __run_full_dataset_boolq(self):
sample_yes = "How can we learning machine learning: yes"
sample_no = "How can we learning machine learning: no"
if self.model_type == "fa":
choice_tokens = [self.tokenizer([sample_yes], return_tensors="pt", max_length=2048, add_special_tokens=False).input_ids[0, -1].item(),
self.tokenizer([sample_no], return_tensors="pt", max_length=2048, add_special_tokens=False).input_ids[0, -1].item()]
else:
choice_tokens = [self.pa_runner.tokenizer([sample_yes], return_tensors="pt", max_length=2048, add_special_tokens=False).input_ids[0, -1].item(),
self.pa_runner.tokenizer([sample_no], return_tensors="pt", max_length=2048, add_special_tokens=False).input_ids[0, -1].item()]
def build_prompt(title, text, passage):
prompt = f"{title} -- {passage}\nQuestion: {text}?\nAnswer:"
return prompt
correct_total = 0
sum_total = 0
result_total = []
is_result = False
if self.__get_rank() == 0:
is_result = True
with torch.no_grad():
for entry in tqdm(glob.glob((Path(self.dataset_path) / "*.jsonl").as_posix(),
recursive=True), desc='global'):
dataset = []
with open(entry, encoding='utf-8') as f:
for line in f:
line_json = json.loads(line)
dataset.append(line_json)
correct = 0
sum = len(dataset)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=self.batch_size)
for idx, batch in enumerate(tqdm(dataloader)):
q_num = self.batch_size if (idx + 1) * self.batch_size <= sum else sum - idx * self.batch_size
titles = batch["title"]
texts = batch["question"]
passages = batch["passage"]
queries = [build_prompt(title, query, passage) for title, query, passage in zip(titles, texts, passages)]
if is_result:
for i in range(q_num):
self.csv_debug['key'].append(idx * self.batch_size + i)
self.csv_debug['queries'].append(queries[i])
if self.model_type == "fa":
inputs = self.tokenizer(queries, padding=True, return_tensors="pt", truncation=True)
for i in range(q_num):
self.csv_debug['input_token_ids'].append(inputs.input_ids[i].tolist())
inputs = inputs.to(0)
outputs = self.model(**inputs)
logits = outputs.logits[:, -1, :]
output_token_ids = logits.argmax(dim=-1)
self.csv_debug['output_token_ids'].extend(output_token_ids.tolist())
logits_softmax = F.log_softmax(logits.float(), dim=-1)
else:
os.environ['ATB_LLM_LOGITS_SAVE_ENABLE'] = "1"
os.environ['ATB_LLM_LOGITS_SAVE_FOLDER'] = self.data_dir
os.environ['ATB_LLM_TOKEN_IDS_SAVE_ENABLE'] = "1"
os.environ['ATB_LLM_TOKEN_IDS_SAVE_FOLDER'] = self.data_dir
local_batch_size = self.batch_size
if len(queries) == 1:
local_batch_size = 1
_, _, _ = self.pa_runner.infer(queries, local_batch_size, 1, False, self.is_chat_model)
os.environ['ATB_LLM_LOGITS_SAVE_ENABLE'] = "0"
os.environ['ATB_LLM_TOKEN_IDS_SAVE_ENABLE'] = "0"
if is_result:
for i in range(q_num):
input_token_ids = torch.load(os.path.join(self.data_dir, f'input_ids_{i}.pth'))
self.csv_debug['input_token_ids'].append(input_token_ids.tolist())
with open(os.path.join(self.data_dir, f"output_ids_{i}.txt"), 'r') as f:
output_token_ids = list(map(int, f.read().split()))
self.csv_debug['output_token_ids'].append(output_token_ids)
logits = torch.load(os.path.join(self.data_dir, 'logits_0.pth'))
logits_softmax = F.log_softmax(logits.float(), dim=-1)
if is_result:
logits_softmax = logits_softmax[:, choice_tokens]
for idx, ans in enumerate(batch['answer']):
choice = (logits_softmax[idx, 0] > logits_softmax[idx, 1]).cpu()
acc = choice == ans
self.csv_debug['golden_result'].append(ans.item())
self.csv_debug['test_result'].append(choice.item())
self.csv_debug['pass'].append(acc.item())
if acc:
correct += 1
if is_result:
filename = os.path.basename(entry)
result = [filename, correct / sum, correct, sum]
self.result_logger.debug(f"result:{result}")
result_total.append(result)
correct_total += correct
sum_total += sum
if is_result:
total = ["total", correct_total / sum_total, correct_total, sum_total]
result_total.insert(0, total)
if is_result:
self.__save_debug()
self.__save_result(result_total)
def __run_full_dataset_humaneval(self):
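        # HumanEval: generate completions, strip everything after the generated function body,
        # dump the samples as infer.jsonl, then score pass@1 with evaluate_functional_correctness.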
def cleanup_code(code: str) -> str:
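            # Keep only the generated function body: cut at the first unindented line, or, if none
            # exists, trim trailing text after common stop markers such as "\ndef" or "\nclass".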
code_splits = code.split("\n")
is_empty_line = False
ind_empty_line = None
for i, line in enumerate(code_splits):
if len(line.strip()) > 0 and line[0] != ' ' and line[0] != '\t':
is_empty_line = True
ind_empty_line = i
break
if is_empty_line:
code = "\n".join(code_splits[:ind_empty_line])
else:
end_words = ["\ndef", "\nclass", "\n#", "\nassert", '\n"""', "\nprint", "\nif", "\n\n\n"]
for w in end_words:
if w in code:
code = code[:code.rfind(w)]
return code
is_result = False
if self.__get_rank() == 0:
is_result = True
self.csv_debug["cleaned_up_results"] = self.csv_debug.pop("test_result")
self.csv_debug["test_cases"] = self.csv_debug.pop("golden_result")
with torch.no_grad():
for entry in tqdm(glob.glob((Path(self.dataset_path) / "*.jsonl").as_posix(),
recursive=True), desc='global'):
dataset = []
with open(entry, encoding='utf-8') as f:
for line in f:
line_json = json.loads(line)
dataset.append(line_json)
samples = []
dataloader = torch.utils.data.DataLoader(dataset, batch_size=self.batch_size)
for idx, batch in enumerate(tqdm(dataloader)):
q_num = self.batch_size if (idx + 1) * self.batch_size <= len(dataset) else len(dataset) - idx * self.batch_size
task_ids = [task_id.split('/')[1] for task_id in batch["task_id"]]
queries = [prompt.strip() for prompt in batch["prompt"]]
test_cases = [test_case for test_case in batch["test"]]
if is_result:
for i in range(q_num):
self.csv_debug['key'].append(task_ids[i])
self.csv_debug['queries'].append(queries[i])
self.csv_debug["test_cases"].append(test_cases[i])
if self.model_type == "fa":
inputs = self.tokenizer(queries, padding=True, return_tensors="pt", truncation=True)
for i in range(q_num):
self.csv_debug['input_token_ids'].append(inputs.input_ids[i].tolist())
inputs = inputs.to(0)
tokenizer_out_ids = inputs.input_ids.to(0)
attention_mask = inputs.attention_mask.to(0)
outputs = self.model.generate(inputs=tokenizer_out_ids, attention_mask=attention_mask,
do_sample=False, max_new_tokens=512)
if is_result:
for idx, output in enumerate(outputs.tolist()):
output = output[len(inputs["input_ids"][idx]):]
response = self.tokenizer.decode(output)
response_cleaned_up = cleanup_code(response)
self.csv_debug['output_token_ids'].append(output)
self.csv_debug["cleaned_up_results"].append(response_cleaned_up)
self.logger.info("response_cleaned_up: %s", response_cleaned_up)
result = dict(
task_id="HumanEval/" + task_ids[idx],
completion=response_cleaned_up,
)
samples += [result]
else:
local_batch_size = self.batch_size
if len(queries) == 1:
local_batch_size = 1
os.environ['ATB_LLM_TOKEN_IDS_SAVE_ENABLE'] = "1"
os.environ['ATB_LLM_TOKEN_IDS_SAVE_FOLDER'] = self.data_dir
generate_text_list, _, _ = self.pa_runner.infer(queries, local_batch_size, 512, False, self.is_chat_model)
os.environ['ATB_LLM_TOKEN_IDS_SAVE_ENABLE'] = "0"
generate_text_list = [cleanup_code(completion) for completion in generate_text_list]
if is_result:
self.logger.info("generate_text_list_cleaned_up: %s", generate_text_list)
for i in range(q_num):
input_token_ids = torch.load(os.path.join(self.data_dir, f'input_ids_{i}.pth'))
self.csv_debug['input_token_ids'].append(input_token_ids.tolist())
with open(os.path.join(self.data_dir, f"output_ids_{i}.txt"), 'r') as f:
output_token_ids = list(map(int, f.read().split()))
self.csv_debug['output_token_ids'].append(output_token_ids)
self.csv_debug["cleaned_up_results"].append(generate_text_list[i])
for idx, sample in enumerate(generate_text_list):
result = dict(
task_id="HumanEval/" + task_ids[idx],
completion=sample,
)
samples += [result]
if is_result:
self.__save_result(samples)
if is_result:
results, passed_all = evaluate_functional_correctness(self.csv_path, [1], 4, 3.0, self.script_path + "/../dataset/full/HumanEval/human-eval.jsonl")
self.csv_debug["pass"] = passed_all
self.__save_debug()
self.result_logger.debug(results)
def __run_full_dataset_humaneval_x(self):
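        # HumanEval-X: run the five target languages one at a time, generate with language-aware
        # stopping criteria and cleanup, score pass@1 per language and report the average.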
targeted_languages = ["cpp", "go", "java", "js", "python"]
pass_at_ks = {lang: 0 for lang in targeted_languages}
original_dataset_name = self.dataset_name
original_csv_path = self.csv_path
total_score = 0
is_result = False
if self.__get_rank() == 0:
is_result = True
for lang in targeted_languages:
self.csv_debug = {
'key': [],
'queries': [],
'input_token_ids': [],
'output_token_ids': [],
'test_result': [],
'golden_result': [],
'pass': []
}
self.csv_debug["cleaned_up_results"] = self.csv_debug.pop("test_result")
self.csv_debug["test_cases"] = self.csv_debug.pop("golden_result")
self.dataset_name += f"_{lang}"
os.environ['MODELTEST_DATASET_SPECIFIED'] = f"{self.dataset_name}"
with torch.no_grad():
dataset_path = os.path.join(self.dataset_path, lang, "data", f"humaneval_{lang}.jsonl")
entries = humaneval_x_utils.read_dataset(dataset_path, dataset_type="humaneval")
for entry in entries.values():
entry["prompt"] = humaneval_x_utils.process_extra_prompt(entry["prompt"], lang)
dataset = humaneval_x_utils.HumanEvalXDataset(entries)
samples = []
dataloader = torch.utils.data.DataLoader(dataset, batch_size=self.batch_size)
for idx, batch in enumerate(tqdm(dataloader)):
task_ids = batch["task_id"]
queries = batch["prompt"]
test_cases = batch["test"]
q_num = self.batch_size if (idx + 1) * self.batch_size <= len(entries) \
else len(entries) - idx * self.batch_size
if is_result:
for i in range(q_num):
self.csv_debug['key'].append(task_ids[i])
self.csv_debug['queries'].append(queries[i])
self.csv_debug["test_cases"].append(test_cases[i])
if self.model_type == "fa":
inputs = self.tokenizer(queries, padding=True, return_tensors="pt", truncation=True)
for i in range(q_num):
self.csv_debug['input_token_ids'].append(inputs.input_ids[i].tolist())
inputs = inputs.to(0)
tokenizer_out_ids = inputs.input_ids.to(0)
attention_mask = inputs.attention_mask.to(0)
stopping_criteria = StoppingCriteriaList()
stopping_criteria.append(
humaneval_x_utils.StoppingCriteriaWithHumanEvalX(
lang=lang,
original_input_len=inputs.input_ids.shape[1],
tokenizer=self.tokenizer,
))
outputs = self.model.generate(
inputs=tokenizer_out_ids,
attention_mask=attention_mask,
do_sample=False,
stopping_criteria=stopping_criteria,
max_new_tokens=1024)
if is_result:
for output_idx, output in enumerate(outputs.tolist()):
output = output[len(inputs["input_ids"][output_idx]):]
response = self.tokenizer.decode(output)
response_cleaned_up = humaneval_x_utils.cleanup_code(response, lang, self.dataset_name)
self.csv_debug['output_token_ids'].append(output)
self.csv_debug["cleaned_up_results"].append(response_cleaned_up)
self.logger.info("response_cleaned_up: %s", response_cleaned_up)
sample_format = {
"task_id" : task_ids[output_idx],
"prompt" : queries[output_idx],
"generation": response_cleaned_up,
}
samples += [sample_format]
else:
local_batch_size = self.batch_size
if len(queries) == 1:
local_batch_size = 1
os.environ['ATB_LLM_TOKEN_IDS_SAVE_ENABLE'] = "1"
os.environ['ATB_LLM_TOKEN_IDS_SAVE_FOLDER'] = self.data_dir
generate_text_list, _, _ = self.pa_runner.infer(
queries,
local_batch_size,
1024,
False,
self.is_chat_model)
os.environ['ATB_LLM_TOKEN_IDS_SAVE_ENABLE'] = "0"
if is_result:
generate_text_list = [humaneval_x_utils.cleanup_code(
completion,
lang,
self.dataset_name) for completion in generate_text_list]
self.logger.info("generate_text_list_cleaned_up: %s", generate_text_list)
for i in range(q_num):
input_token_ids = torch.load(os.path.join(self.data_dir, f'input_ids_{i}.pth'))
self.csv_debug['input_token_ids'].append(input_token_ids.tolist())
with open(os.path.join(self.data_dir, f"output_ids_{i}.txt"), 'r') as f:
output_token_ids = list(map(int, f.read().split()))
self.csv_debug['output_token_ids'].append(output_token_ids)
self.csv_debug["cleaned_up_results"].append(generate_text_list[i])
for generate_text_list_idx, sample in enumerate(generate_text_list):
sample_format = {
"task_id" : task_ids[generate_text_list_idx],
"prompt" : queries[generate_text_list_idx],
"generation": sample,
}
samples += [sample_format]
if is_result:
self.__save_result(samples)
pass_at_k, passed_all = humaneval_x_eval.evaluate_functional_correctness(
self.csv_path,
tmp_dir=self.data_dir,
n_workers=64,
k=[1],
timeout=5.0,
problem_file=dataset_path,
out_dir=self.result_dir,
go_dir=os.path.join(self.dataset_path, "go", "evaluation"))
pass_at_ks[lang] = pass_at_k
total_score += pass_at_k.get("pass@1", 0)
self.csv_debug["pass"] = passed_all
self.__save_debug()
self.result_logger.debug(pass_at_k)
self.dataset_name = original_dataset_name
self.csv_path = original_csv_path
os.unsetenv('MODELTEST_DATASET_SPECIFIED')
if is_result:
self.logger.info(f"score map: {pass_at_ks}")
self.logger.info(f"average score: {total_score / len(targeted_languages)}")
def __run_full_dataset_longbench(self):
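        # LongBench / LongBench-E: build prompts from dataset2prompt.json, cap generation lengths
        # with dataset2maxlen.json, score each task with scorer/scorer_e and report a
        # sample-weighted overall score.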
def load_dataset_by_task_name(task_name, suffix):
file_path = os.path.join(self.dataset_path, "data", f"{task_name}{suffix}.jsonl")
dataset = []
with open(file_path, 'r', encoding="utf-8") as f:
for line in f:
data_line = json.loads(line)
dataset.append(data_line)
return dataset
def load_config():
with open(os.path.join(self.dataset_path, "./dataset2prompt.json"), "r") as file:
task2prompt = json.load(file)
with open(os.path.join(self.dataset_path, "./dataset2maxlen.json"), "r") as file:
task2maxgen = json.load(file)
return task2prompt, task2maxgen
def get_scores_by_task_name(task_name, task_result, use_longbench_e=False):
predictions, answers, lengths, all_classes = [], [], [], []
for data in task_result:
predictions.append(data["pred"])
answers.append(data["answers"])
all_classes = data["all_classes"]
if "length" in data:
lengths.append(data["length"])
if use_longbench_e:
score = scorer_e(task_name, predictions, answers, lengths, all_classes)
else:
score = scorer(task_name, predictions, answers, all_classes)
return {f"{task_name}": score}
def get_final_scores(task_scores, task_nums, use_longbench_e=False):
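            # LongBench-E returns per-length-bucket scores, so average the non-NaN buckets first;
            # the overall score is the per-task average weighted by each task's sample count.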
result_total = []
score_total = []
for task_name, res in task_scores.items():
if use_longbench_e:
task_score_list = []
for _, score in res.items():
if not math.isnan(score):
task_score_list.append(score)
avg_task_scores = round(np.mean(task_score_list), 2)
else:
avg_task_scores = res
score_total.append(avg_task_scores)
result_total.append([f"{task_name}", avg_task_scores, None, task_nums[task_name]])
task_nums_list = np.array(list(task_nums.values()))
score_total = round(np.average(score_total, weights=task_nums_list), 2)
nums_total = np.sum(task_nums_list)
result_total.insert(0, ["total", score_total, None, nums_total])
return score_total, result_total
self.csv_debug = {
'key': [],
'test_result': [],
'golden_result': [],
'length': [],
'all_classes': [],
'output_token_ids': [],
'input_token_ids': []
}
use_longbench_e = self.dataset_name[-1] == "E"
suffix = "_e" if use_longbench_e else ""
if use_longbench_e:
task_list = ["qasper", "multifieldqa_en", "hotpotqa", "2wikimqa", "gov_report", "multi_news", \
"trec", "triviaqa", "samsum", "passage_count", "passage_retrieval_en", "lcc", "repobench-p"]
else:
task_list = ["narrativeqa", "qasper", "multifieldqa_en", "multifieldqa_zh", "hotpotqa", "2wikimqa", "musique", \
"dureader", "gov_report", "qmsum", "multi_news", "vcsum", "trec", "triviaqa", "samsum", "lsht", \
"passage_count", "passage_retrieval_en", "passage_retrieval_zh", "lcc", "repobench-p"]
result_total = []
task_scores, task_nums = dict(), dict()
is_result = self.__get_rank() == 0
self.dataset_path = os.path.join(self.script_path, "../dataset/full", "LongBench")
for idx, task_name in enumerate(tqdm(task_list, desc="global")):
if is_result:
self.logger.info(f"dataset {idx+1} start, task name: {task_name}")
dataset = load_dataset_by_task_name(task_name, suffix)
task2prompt, task2maxgen = load_config()
prompts_pattern = task2prompt[task_name]
max_new_tokens = task2maxgen[task_name]
task_result = []
for i, data in tqdm(enumerate(dataset), total=len(dataset), desc=f"{task_name}"):
prompts = prompts_pattern.format(**data)
if self.model_type == "fa":
self.model.set_tokenizer(self.tokenizer)
sampling_params = SamplingParams(temperature=0, max_tokens=max_new_tokens)
output = self.model.generate(prompts, sampling_params)
response = output[0].outputs[0].text
self.csv_debug['input_token_ids'].append(output[0].prompt_token_ids)
self.csv_debug['output_token_ids'].extend([output[0].outputs[0].token_ids])
else:
os.environ['ATB_LLM_TOKEN_IDS_SAVE_ENABLE'] = "1"
os.environ['ATB_LLM_TOKEN_IDS_SAVE_FOLDER'] = self.data_dir
infer_args = {"skip_special_tokens": True}
generate_text, _, _ = self.pa_runner.infer(
[prompts],
self.batch_size,
max_new_tokens,
False,
self.is_chat_model,
**infer_args)
os.environ['ATB_LLM_TOKEN_IDS_SAVE_ENABLE'] = "0"
response = generate_text[0]
if is_result:
input_token_ids = torch.load(os.path.join(self.data_dir, f'input_ids_0.pth'))
self.csv_debug['input_token_ids'].append(input_token_ids.tolist())
with open(os.path.join(self.data_dir, f"output_ids_0.txt"), 'r') as f:
output_token_ids = list(map(int, f.read().split()))
self.csv_debug['output_token_ids'].append(output_token_ids)
if is_result:
self.csv_debug['key'].append(f"{task_name}_{i}")
self.csv_debug['test_result'].append(response)
self.csv_debug['golden_result'].append(data["answers"])
self.csv_debug['length'].append(data["length"])
self.csv_debug['all_classes'].append(data["all_classes"])
task_result.append({"pred": response, "answers": data["answers"], "all_classes": data["all_classes"], "length": data["length"]})
self.logger.debug(\
json.dumps({"pred": response, "answers": data["answers"], "length": data["length"]}, ensure_ascii=False))
if is_result:
task_score = get_scores_by_task_name(task_name, task_result, use_longbench_e)
task_scores.update(task_score)
task_nums.update({f"{task_name}": len(dataset)})
self.logger.info(f"dataset {idx+1} finish, result:{task_score}, current all results:{task_scores}")
if is_result:
final, result_total = get_final_scores(task_scores, task_nums, use_longbench_e)
self.result_logger.debug(f"total result:{final}")
self.__save_debug()
self.__save_result(result_total)
def __compare_results(self):
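        # Accuracy runs on NPU are compared against GPU goldens: simplified mode checks the dumped
        # tokens/logits step by step, full mode compares the aggregated result CSVs; performance
        # and *_single modes skip the comparison.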
if (
not self.test_mode.startswith("performance") and
not self.test_mode.endswith("single") and
self.hardware_type == "NPU"
):
if self.test_mode == "simplified":
self.__compare_simplified_dataset_results()
elif self.test_mode == "full":
dataset_list = self.get_dataset_list()
if self.dataset_name in dataset_list:
return
self.__compare_full_dataset_results()
else:
self.logger.error(self.test_mode + " not supported")
raise RuntimeError(f"{self.test_mode} not supported")
def __compare_simplified_dataset_results(self):
if not os.path.exists(f"{self.data_dir}/GPU"):
self.logger.error(f"GPU golden data not exist, upload to data dir folder")
raise RuntimeError(
"GPU golden data not exist, upload to tensor data folder")
folder_path = f"{self.result_dir}"
os.makedirs(folder_path, exist_ok=True)
if not os.path.exists(folder_path):
self.logger.error(f"folder {folder_path} create fail")
raise RuntimeError(f"result folder {folder_path} create fail")
if self.dataset_name not in question_num.keys():
self.logger.error(self.dataset_name + " not supported")
raise RuntimeError(f"{self.dataset_name} not supported")
self.eos_token = [-1 for _ in range(question_num[self.dataset_name])]
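        # eos_token[i] holds the step index at which question i first diverged from the golden
        # tokens (or emitted EOS); -1 means no divergence, and logits checks skip steps past it.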
self.logger.info("---------------------" + self.dataset_name + " Batch " + str(
self.batch_size) + " Tokens Result Compare Begins------------------------")
self.__compare_results_helper("tokens")
self.logger.info("---------------------" + self.dataset_name + " Batch " + str(
self.batch_size) + " Tokens Result Compare Ends------------------------")
self.logger.info("---------------------" + self.dataset_name + " Batch " + str(
self.batch_size) + " Logits Result Compare Begins------------------------")
self.__compare_results_helper("logits")
self.logger.info("---------------------" + self.dataset_name + " Batch " + str(
self.batch_size) + " Logits Result Compare Ends------------------------")
def __compare_results_helper(self, type):
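        # Compare the dumped NPU tensors against the GPU goldens epoch by epoch: tokens are checked
        # until the first mismatch or EOS per question, logits via KL divergence against the
        # 1e-4/1e-3 thresholds; a summary row is appended to result/simplified_test_result.csv.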
error_1e4 = 0
error_1e3 = 0
total_tokens_checked = 0
total_logits_checked = 0
greatest_kll = 0
for epoch_id in range(math.ceil(question_num[self.dataset_name] / self.batch_size)):
cnt = 0
while True:
golden_path = f"{self.data_dir}/GPU/{self.dataset_name}/batch{self.batch_size}/{epoch_id}/{type}_{cnt}.pth"
npu_path = f"{self.data_dir}/NPU/{self.dataset_name}/batch{self.batch_size}/{epoch_id}/{type}_{cnt}.pth"
golden_file_exists = os.path.exists(golden_path)
npu_file_exists = os.path.exists(npu_path)
if not golden_file_exists and not npu_file_exists:
self.result_logger.debug(self.dataset_name + " batch " + str(self.batch_size) + " epoch " + str(
epoch_id) + " " + type + " compare finish, total " + str(cnt) + " " + type)
break
elif golden_file_exists and npu_file_exists:
golden_results = torch.load(golden_path).cpu()
npu_results = torch.load(npu_path).cpu()
if type == "tokens":
for i in range(len(golden_results)):
total_tokens_checked += 1
if self.eos_token[self.batch_size * epoch_id + i] == -1 and (
npu_results[i] != golden_results[i] or npu_results[
i] == self.tokenizer.eos_token_id):
self.eos_token[self.batch_size * epoch_id + i] = cnt
self.result_logger.debug(
self.dataset_name + " batch " + str(self.batch_size) + " epoch " + str(
epoch_id) + " question " + str(self.batch_size * epoch_id + i) +
" token No." + str(
cnt) + " is the first different token or eos token, ignore checking the rest.\ngolden tokenId: " + str(
golden_results[i]) + ", npu tokenId: " + str(npu_results[i]))
elif type == "logits":
split_golden_results = torch.split(golden_results, 1, dim=0)
split_npu_results = torch.split(npu_results, 1, dim=0)
for i in range(len(split_golden_results)):
eos_token = self.eos_token[self.batch_size * epoch_id + i]
if eos_token != -1 and cnt > eos_token:
continue
total_logits_checked += 1
golden_results_logsoftmax = torch.log_softmax(split_golden_results[i].float(), dim=-1)
npu_results_logsoftmax = torch.log_softmax(split_npu_results[i].float(), dim=-1)
kl_loss = torch.nn.KLDivLoss(log_target=True, reduction='sum')
output = kl_loss(npu_results_logsoftmax, golden_results_logsoftmax)
                            greatest_kll = max(greatest_kll, output.item())
if (output > 0.0001):
if (output > 0.001):
error_1e3 += 1
error_1e4 += 1
self.result_logger.debug(
"--------------------------------" + type + " Error Begins--------------------------------")
self.result_logger.debug(
self.dataset_name + " batch" + str(self.batch_size) + " epoch " + str(
epoch_id) + " question " + str(self.batch_size * epoch_id + i) +
" logits No." + str(cnt) + " fail, KL loss is: {:.6f}".format(output.item()))
golden_logits_sorted = torch.sort(split_golden_results[i], descending=True)
npu_logits_sorted = torch.sort(split_npu_results[i], descending=True)
self.result_logger.debug(
"golden logits: \n" + str(golden_logits_sorted[0]) + "\nnpu logits: \n" + str(
npu_logits_sorted[0]))
self.result_logger.debug(
"golden index: \n" + str(golden_logits_sorted[1]) + "\nnpu index: \n" + str(
npu_logits_sorted[1]))
self.result_logger.debug(
"--------------------------------" + type + " Error Ends--------------------------------")
cnt += 1
else:
self.result_logger.debug(self.dataset_name + " batch " + str(self.batch_size) + " epoch " + str(
epoch_id) + " " + type + " size not equal")
self.result_logger.debug(self.dataset_name + " batch " + str(self.batch_size) + " epoch " + str(
epoch_id) + " " + type + " compare finish, total " + str(cnt) + " " + type)
break
if type == "tokens":
self.result_logger.debug(
self.dataset_name + " batch " + str(self.batch_size) + " finished check, total tokens num " + str(
total_tokens_checked) + ", find " +
str(len(self.eos_token) - self.eos_token.count(-1)) + " question responses have " + type + " mismatch")
elif type == "logits":
            pass_rate = error_1e4 / total_logits_checked if total_logits_checked else 0
pass_result = "Pass"
if pass_rate > 0.005 or error_1e3 > 0:
pass_result = "Fail"
self.result_logger.debug(
self.dataset_name + " batch " + str(self.batch_size) + " finished check, total logits checked " + str(
total_logits_checked) + ", " + str(error_1e4) +
" 1e-4 " + type + " errors found, " + str(
error_1e3) + " 1e-3 " + type + " errors found, 1e-4 error rate " + str(pass_rate))
csv_result = [str(self.model_name).ljust(15), str(self.dataset_name).ljust(15),
str(self.batch_size).ljust(15), str(total_logits_checked).ljust(15),
str(round(greatest_kll, 10)).ljust(15), str(round(pass_rate, 10)).ljust(15),
str(pass_result).ljust(15)]
csv_simplified_path = os.path.join(self.script_path, "../result", "simplified_test_result.csv")
if not os.path.exists(csv_simplified_path):
self.logger.warning("simplified dataset result csv file not exist, skip recording results")
raise RuntimeError(f"csv result file not exist")
with open(csv_simplified_path, 'a', newline='') as csv_simplified_file:
csv_writer = csv.writer(csv_simplified_file, delimiter='|')
csv_writer.writerow(csv_result)
self.logger.info(self.model_name + " " + self.dataset_name + " batch" + str(
self.batch_size) + " result saved in result/simplified_test_result.csv")
def __compare_full_dataset_results(self):
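        # Compare the NPU result CSV against the GPU golden CSV: MMLU/CEval/GSM8K compare the
        # 'total' row, TruthfulQA compares mean MC1/MC2; a drop of more than 0.1 versus the golden fails.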
golden_name = '_'.join([self.model_name, self.dataset_name])
golden_path = ''
for file_name in os.listdir(f"{self.data_dir}/GPU/{self.dataset_name}/batch{self.batch_size}"):
if file_name.startswith(f"{golden_name}"):
golden_path = os.path.join(f"{self.data_dir}/GPU/{self.dataset_name}/batch{self.batch_size}", file_name)
break
if not os.path.exists(f"{self.current_result_path}"):
raise RuntimeError(
"NPU test data not exist, An error occurred in the test")
if not os.path.exists(f"{golden_path}"):
raise RuntimeError(
"GPU golden data not exist, upload to result dir folder")
result_df = pd.read_csv(self.current_result_path, sep='|', skipinitialspace=True).rename(
columns=lambda x: x.strip())
result_df = result_df.applymap(lambda x: x.strip() if isinstance(x, str) else x)
golden_df = pd.read_csv(golden_path, sep='|', skipinitialspace=True).rename(columns=lambda x: x.strip())
golden_df = golden_df.applymap(lambda x: x.strip() if isinstance(x, str) else x)
csv_result = []
        if self.dataset_name in ('MMLU', 'CEval', 'GSM8K'):
result_total = result_df.loc[result_df['file_name'] == 'total', 'value'].values[0]
golden_total = golden_df.loc[golden_df['file_name'] == 'total', 'value'].values[0]
diff_val = golden_total - result_total
pass_result = "Pass"
if diff_val <= 0.1:
self.result_logger.debug(
f"{self.current_result_path} is pass({diff_val}%), golden:{golden_total}, test:{result_total}")
else:
pass_result = "Fail"
self.result_logger.debug(
f"{self.current_result_path} is failed({diff_val}%), golden:{golden_total}, test:{result_total}")
csv_result = [str(self.model_name).ljust(15), str(self.dataset_name).ljust(15),
str(self.batch_size).ljust(15), str(round(golden_total, 10)).ljust(15),
str(round(result_total, 10)).ljust(15), str(pass_result).ljust(15)]
elif self.dataset_name == 'TruthfulQA':
if len(result_df) != len(golden_df):
raise RuntimeError(f"result_df len:{len(result_df)}, golden_df len:{len(golden_df)}")
result_MC1_sum = 0
result_MC2_sum = 0
golden_MC1_sum = 0
golden_MC2_sum = 0
pass_result = "Pass"
for index, result_row in result_df.iterrows():
golden_row = golden_df.iloc[index]
result_MC1_sum += result_row['MC1']
result_MC2_sum += result_row['MC2']
golden_MC1_sum += golden_row['MC1']
golden_MC2_sum += golden_row['MC2']
diff_MC1 = (golden_MC1_sum - result_MC1_sum) / len(result_df)
diff_MC2 = (golden_MC2_sum - result_MC2_sum) / len(result_df)
if ((diff_MC1 <= 0.1) and (diff_MC2 <= 0.1)):
self.result_logger.debug(
f"{self.current_result_path} is pass(MC1:{diff_MC1} MC2:{diff_MC2}), golden:{golden_MC2_sum / len(result_df)} , test:{result_MC2_sum / len(result_df)}")
else:
pass_result = "Fail"
self.result_logger.debug(
f"{self.current_result_path} is failed(MC1:{diff_MC1} MC2:{diff_MC2}), golden:{golden_MC2_sum / len(result_df)}, test:{result_MC2_sum / len(result_df)}")
csv_result = [str(self.model_name).ljust(15), str(self.dataset_name).ljust(15),
str(self.batch_size).ljust(15), str(round((golden_MC2_sum / len(result_df)), 10)).ljust(15),
str(round((result_MC2_sum / len(result_df)), 10)).ljust(15), str(pass_result).ljust(15)]
csv_full_path = os.path.join(self.script_path, "../result", "full_test_result.csv")
if not os.path.exists(csv_full_path):
self.logger.warning("full dataset result csv file not exist, skip recording results")
raise RuntimeError(f"csv result file not exist")
with open(csv_full_path, 'a', newline='') as csv_full_file:
csv_writer = csv.writer(csv_full_file, delimiter='|')
csv_writer.writerow(csv_result)
self.logger.info(self.model_name + " " + self.dataset_name + " batch" + str(
self.batch_size) + " result saved in result/full_test_result.csv")
def __get_model_or_runner(self, input_length, output_length):
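        # NPU runs are served by PARunner; GPU runs load an FA tokenizer plus either a vLLM engine
        # (LongBench) or a HuggingFace causal LM, with a few model-specific pad-token tweaks.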
if self.hardware_type == "NPU":
input_dict = {
'rank': self.rank,
'local_rank': self.local_rank,
'world_size': self.world_size,
'max_prefill_tokens': -1,
'block_size': self.block_size,
'model_path': self.weight_dir,
'is_bf16': True if self.data_type == "bf16" else False,
'max_position_embeddings': self.max_position_embedding if self.max_position_embedding != -1 else input_length + output_length,
'max_batch_size': self.batch_size,
'max_input_length': input_length,
'max_output_length': output_length
}
self.pa_runner = PARunner(**input_dict)
self.logger.info(str(self.rank) + f'pa_runner: {self.pa_runner}')
else:
self.tokenizer_params = {}
self.set_fa_tokenizer_params()
self.tokenizer = self.__get_fa_tokenizer(**self.tokenizer_params)
if "starcoder" in self.model_name:
self.tokenizer.pad_token = "[PAD]"
elif "llama" in self.model_name or "yi" in self.model_name or "vicuna" in self.model_name:
self.tokenizer.pad_token_id = 0
if "LongBench" in self.dataset_name:
self.model = LLM(model=self.weight_dir, tensor_parallel_size=self.world_size, dtype="auto", enforce_eager=True)
elif "qwen" in self.model_name:
self.model = AutoModelForCausalLM.from_pretrained(self.weight_dir, device_map="auto", torch_dtype="auto", trust_remote_code=True).to(torch.float16)
self.model.generation_config = self.__remove_part_of_generation_config(self.model.generation_config)
else:
self.model = AutoModelForCausalLM.from_pretrained(self.weight_dir, device_map="auto", torch_dtype="auto", trust_remote_code=True)
if "baichuan" in self.model_name and self.model.config.vocab_size == 64000:
self.tokenizer.pad_token_id = 0
if "LongBench" in self.dataset_name:
self.logger.info(f"current dtype: {self.model.llm_engine.model_config.dtype}")
else:
self.logger.info(f"current dtype: {self.model.dtype}")
def __get_rank(self):
if self.hardware_type == "GPU":
return torch.cuda.current_device()
else:
return self.pa_runner.rank
def __get_device_type(self):
if self.hardware_type == "NPU":
self.soc_version = torch_npu._C._npu_get_soc_version()
if self.soc_version in (100, 101, 102, 200, 201, 202, 203):
self.is_format_nz = True
return soc_version_map.get(self.soc_version)
elif self.hardware_type == "GPU":
return "GPU"
def __patch_hf_transformers_utils(self):
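        # Back up transformers/generation/utils.py and splice the timing/dump snippets into
        # greedy_search at fixed line offsets, unless the inserted marker shows it is already patched.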
transformers_path = transformers.__path__[0]
transformers_utils_path = f"{transformers_path}/generation/utils.py"
shutil.copy(transformers_utils_path, f"{transformers_path}/generation/utils_backup.py")
with open(transformers_utils_path, "r") as utils_file:
utils_content = utils_file.readlines()
try:
utils_content.index(UTILS_CODE_INSERTED_MARKER)
except ValueError:
try:
insert_position = utils_content.index(UTILS_CODE_MARKER)
except ValueError:
self.logger.error("UTILS_CODE_MARKER not found in the transformers utils.py file.")
raise RuntimeError("UTILS_CODE_MARKER not found in the transformers utils.py file.")
utils_content.insert(insert_position + 234, UTILS_CODE_INSERTED_PART_4)
utils_content.insert(insert_position + 203, UTILS_CODE_INSERTED_PART_3)
utils_content.insert(insert_position + 154, UTILS_CODE_INSERTED_PART_2)
utils_content.insert(insert_position + 153, UTILS_CODE_INSERTED_PART_1)
with open(transformers_utils_path, "w") as utils_file:
utils_file.writelines(utils_content)
self.logger.info("transformers utils.py update success")
return
self.logger.warning("transformers utils.py not update. Please confirm it performs as you expect")
def __setup_model_parallel(self):
if self.hardware_type in communication_map:
torch.distributed.init_process_group(communication_map[self.hardware_type])
else:
self.logger.error("unsupported hardware type")
raise RuntimeError("unsupported hardware type")
self.logger.info(f"{communication_map[self.hardware_type]} distributed process init success.")
if self.hardware_type == "NPU":
self.logger.info(f"user npu:{self.rank}")
torch_npu.npu.set_device(torch.device(f"npu:{self.rank}"))
elif self.hardware_type == "GPU":
self.logger.info(f"user gpu:{self.rank}")
torch.cuda.set_device(self.rank)
self.logger.info("Device Set Success!")
def __get_fa_tokenizer(self, **kwargs):
return AutoTokenizer.from_pretrained(self.weight_dir, **kwargs)
def __remove_part_of_generation_config(self, generation_config):
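        # Reset generation-config fields that differ from the HF defaults (except *_id fields)
        # back to the defaults, dropping sampling overrides such as temperature or top_p.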
ori_gen = GenerationConfig()
diff_dict = generation_config.to_diff_dict()
self.logger.debug(diff_dict)
for key in diff_dict:
if key.endswith("_id"):
continue
ori_value = getattr(ori_gen, key, None)
if ori_value is not None:
setattr(generation_config, key, getattr(ori_gen, key))
self.logger.info(f"replace {key}")
return generation_config
def __create_folder(self, folder_path):
if os.path.exists(folder_path):
try:
shutil.rmtree(folder_path, ignore_errors=True)
except Exception as e:
self.logger.error(f"Error deleting folder {folder_path}: {e}")
os.makedirs(folder_path, exist_ok=True)
if not os.path.exists(folder_path):
self.logger.error(f"folder {folder_path} create fail")
raise RuntimeError(f"folder {folder_path} create fail")
def __npu_adapt(self):
if self.is_format_nz:
for name, module in self.model.named_modules():
if isinstance(module, torch.nn.Linear):
if name == 'lm_head':
module.weight.data = torch.nn.parameter.Parameter(module.weight.data)
module.weight.data = torch_npu.npu_format_cast(module.weight.data, 29)
self.logger.info(f"current soc: {self.soc_version}({self.device_type}), cast NZ")
else:
self.logger.info(f"current soc: {self.soc_version}({self.device_type}), not cast NZ")
def __save_debug(self):
if self.test_type == "performance":
debug_info_path = os.path.join(self.debug_dir, f"{self.test_mode}_{self.model_type}_batch{self.batch_size}_" \
f"tp{self.world_size}_{self.formatted_datetime}_debug_info.csv")
else:
debug_info_path = os.path.join(self.debug_dir, f"{self.dataset_name}_{self.model_type}_batch{self.batch_size}_" \
f"tp{self.world_size}_{self.test_mode}_{self.formatted_datetime}_debug_info.csv")
df = pd.DataFrame(self.csv_debug)
df.to_csv(debug_info_path, index=False, encoding='utf-8')
self.logger.info(f"{self.dataset_name} debug info saved to: {debug_info_path}")
def __save_result(self, result):
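        # Result serialisation is dataset specific: HumanEval* writes an infer.jsonl of samples,
        # CMMLU aggregates via cmmlu_eval, TruthfulQA dumps the score frame, and everything else
        # becomes a column-aligned CSV of [file_name, value, correct, sum].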
def align_columns(df):
max_widths = df.applymap(lambda x: len(str(x))).max()
for col in df.columns:
df[col] = df[col].apply(lambda x: str(x).ljust(max_widths[col]))
return df
def align_headers(df):
max_widths = [max(len(str(col)), df[col].map(lambda x: len(str(x))).max()) for col in df.columns]
headers = [col.ljust(max_widths[i]) for i, col in enumerate(df.columns)]
df.columns = headers
for i, row in enumerate(df.values):
df.iloc[i] = [str(val).ljust(max_widths[j]) for j, val in enumerate(row)]
return df
if "HumanEval" in self.dataset_name:
self.csv_path = self.csv_path.replace("HumanEval_X", self.dataset_name)
self.csv_path = self.csv_path.replace("result.csv", "infer.jsonl")
with open(self.csv_path, 'wb') as fp:
for x in result:
fp.write((json.dumps(x) + "\n").encode('utf-8'))
elif self.dataset_name == "CMMLU":
cmmlu_eval.get_results(self.debug_dir, self.csv_path)
elif self.dataset_name == "TruthfulQA":
result.to_csv(self.csv_path)
else:
df = pd.DataFrame(result, columns=['file_name', 'value', 'correct', 'sum'])
df = align_columns(df)
df = align_headers(df)
df.to_csv(self.csv_path, index=False)
self.logger.info(f"{self.dataset_name} result saved to: {self.csv_path}")
def __get_log(self, type):
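        # Build a logger that writes to the console and a timestamped file under log_dir;
        # "runtime" loggers run at INFO, "result_process" loggers at DEBUG.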
os.makedirs(self.log_dir, exist_ok=True)
if not os.path.exists(self.log_dir):
raise RuntimeError(f"{type} folder {self.log_dir} create fail")
formatter = logging.Formatter('%(asctime)s - [%(levelname)s] - %(filename)s:%(lineno)d - %(message)s')
streamer_handler = logging.StreamHandler()
streamer_handler.setFormatter(formatter)
if self.test_type == "performance":
file_handler = logging.FileHandler(os.path.join(self.log_dir, f"{self.test_mode}_{self.model_type}_batch{self.batch_size}_" \
f"tp{self.world_size}_{self.formatted_datetime}_{type}.log"))
else:
file_handler = logging.FileHandler(os.path.join(self.log_dir, f"{self.dataset_name}_{self.model_type}_batch{self.batch_size}_" \
f"tp{self.world_size}_{self.test_mode}_{self.formatted_datetime}_{type}.log"))
file_handler.setFormatter(formatter)
logger = logging.getLogger(type)
if type == "runtime":
logger.setLevel(logging.INFO)
file_handler.setLevel(logging.INFO)
streamer_handler.setLevel(logging.INFO)
elif type == "result_process":
logger.setLevel(logging.DEBUG)
file_handler.setLevel(logging.DEBUG)
streamer_handler.setLevel(logging.DEBUG)
logger.addHandler(streamer_handler)
logger.addHandler(file_handler)
logger.propagate = False
return logger
def __parse_bs(self, batch_size_lst):
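        # Accept a single integer ("4"), a comma-separated list ("1,2,4"), or a Python list
        # literal and normalise the batch-size argument to a list.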
try:
batch_size_lst = int(batch_size_lst)
return [batch_size_lst]
except ValueError:
pass
try:
batch_size_lst = [int(bs) for bs in batch_size_lst.split(',')]
return batch_size_lst
except ValueError:
pass
try:
batch_size_lst = ast.literal_eval(batch_size_lst)
if isinstance(batch_size_lst, list):
if len(batch_size_lst) == 0:
raise ValueError("Batchsize input is empty")
else:
return batch_size_lst
raise ValueError("Wrong batchsize input format")
except ValueError as e:
raise ValueError("Wrong batchsize input format") from e
def parse_args():
parser = argparse.ArgumentParser(description="Model test arguments")
parser.add_argument(
"--model_type",
type=str,
default='pa',
choices=['fa', 'pa'],
help="Specify which model type to test"
)
parser.add_argument(
"--data_type",
type=str,
default='fp16',
choices=['fp16', 'bf16'],
help="Specify which datat type to test"
)
parser.add_argument(
"--test_mode",
type=str,
default='performance',
choices=['simplified', 'full', 'precision_single', 'performance', 'performance_maxbs', "performance_single"],
help="Specify the mode in which to run the test"
)
parser.add_argument("--model_name", type=str, required=True, help="name of model")
parser.add_argument("--weight_dir", type=str, required=True, help="path to model weight folder")
parser.add_argument("--output_dir", type=str, help="path to save the output")
parser.add_argument("--dataset_name", type=str, default="GSM8K", help="which dataset to run")
parser.add_argument("--shot", type=int, help="speicify dataset test few shots")
parser.add_argument("--batch_size", type=str, default="1", help="batch size")
parser.add_argument("--device_id", type=int, default=7, help="device id")
parser.add_argument("--hardware_type", type=str, default="NPU", help="current device type, GPU or NPU")
parser.add_argument("--case_pair", type=str, default="[[256, 256], [512, 512], [1024, 1024], [2048, 2048]]",
help="performance test pair")
parser.add_argument("--time_limit", type=int, help="time limit when testing performance max batch_size")
parser.add_argument("--max_position_embeddings", type=int, help="specify whether llama model use refactor")
parser.add_argument("--input_text_or_file", type=str, help="input_text_or_file used to test performance or precision")
parser.add_argument("--is_chat_model", type=str, default="base", help="specify whether the model use chat version")
return parser.parse_args()
def get_args():
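    # Normalise the CLI arguments into the positional list consumed by the test entry point:
    # fill in the default output dir and case_pair, and map --is_chat_model to a bool.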
args = parse_args()
output_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "outputs") if args.output_dir is None else args.output_dir
case_pair = "[[256, 256], [512, 512], [1024, 1024], [2048, 2048]]" if args.case_pair == "[]" else args.case_pair
    is_chat_model = args.is_chat_model == "chat"
return [args.model_type, args.data_type, args.test_mode, args.model_name, output_dir, args.dataset_name,
args.batch_size, args.device_id, args.hardware_type, case_pair, args.weight_dir,
args.time_limit, args.max_position_embeddings, args.input_text_or_file,
is_chat_model, args.shot]