# convert_data.py
import numpy as np
import jsonlines
import os
import sys

eval_plus_path = os.path.dirname(os.path.abspath(__file__)) + "/evalplus/"
sys.path = [eval_plus_path] + sys.path
from evalplus.data import get_human_eval_plus, get_mbpp_plus

# MBPP tasks whose outputs are compared as sets (element order is ignored).
MBPP_OUTPUT_SET_EQ_TASKS = [
    "similar_elements",  # Mbpp/2
    "find_char_long",  # Mbpp/7
    "common_in_nested_lists",  # Mbpp/111
    "extract_singly",  # Mbpp/140
    "larg_nnum",  # Mbpp/232
    "intersection_array",  # Mbpp/249
    "find_dissimilar",  # Mbpp/579
    "Diff",  # Mbpp/769
]
# MBPP tasks where only "is the result None or not" is compared.
MBPP_OUTPUT_NOT_NONE_TASKS = ["check_str", "text_match_three", "text_starta_endb"]

# Prompt sent to the model; filled with (language, function stub).
_PROMPT_TEMPLATE = "Can you complete the following Python function?\n```{}\n{}\n```"

# Source prepended to a reference solution before it is exec'ed by
# ``trusted_exec``.  It defines two decorators: ``float_to_decimal`` turns
# float arguments (also inside lists/dicts) into 100-digit ``Decimal`` values,
# and ``decimal_to_float`` turns ``Decimal`` results back into floats.
# It must stay at column 0 because it is concatenated with top-level code.
_HIGH_ACCURACY_PRELUDE = """
from decimal import Decimal, getcontext
from functools import wraps
getcontext().prec = 100

def convert_to_decimal(value):
    if isinstance(value, float):
        return Decimal(str(value))
    elif isinstance(value, list):
        return [convert_to_decimal(item) for item in value]
    elif isinstance(value, dict):
        return {k: convert_to_decimal(v) for k, v in value.items()}
    return value

def float_to_decimal(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        new_args = [convert_to_decimal(arg) for arg in args]
        new_kwargs = {k: convert_to_decimal(v) for k, v in kwargs.items()}
        result = func(*new_args, **new_kwargs)
        return result
    return wrapper

def convert_to_float(value):
    if isinstance(value, Decimal):
        return float(value)
    elif isinstance(value, list):
        return [convert_to_float(item) for item in value]
    elif isinstance(value, dict):
        return {k: convert_to_float(v) for k, v in value.items()}
    return value

def decimal_to_float(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Execute the wrapped function
        result = func(*args, **kwargs)

        # Convert the result back to float, if necessary
        result = convert_to_float(result)
        return result
    return wrapper
"""


def convert_file(root_dir):
    """Convert the EvalPlus HumanEval+/MBPP+ datasets into JSONL eval files.

    For every task a prompt and a self-checking test script are generated.
    The test script embeds the canonical solution (renamed to
    ``<entry_point>_ground_truth``) and asserts that the candidate function
    agrees with it on every input.

    Files written under ``{root_dir}/evalplus_v2/``: ``humaneval.jsonl``,
    ``humaneval_plus.jsonl``, ``mbpp.jsonl``, ``mbpp_plus.jsonl``,
    ``evalplus.jsonl`` (all four concatenated) and
    ``evalplus.jsonl.sampled`` (10 random records).

    Args:
        root_dir: Directory prefix under which ``evalplus_v2/`` is created.
    """
    from copy import deepcopy
    import re
    import time  # needed by trusted_exec(record_time=True); was missing

    import tqdm

    # Raise the int <-> str conversion limit.  The function only exists on
    # interpreters that ship the limit, so guard the call.
    if hasattr(sys, "set_int_max_str_digits"):
        sys.set_int_max_str_digits(10000000)

    def write_jsonl_file(objs, target_path):
        """Write each item of ``objs`` as one JSON line to ``target_path``."""
        os.makedirs(os.path.dirname(target_path), exist_ok=True)
        with jsonlines.open(target_path, "w") as w:
            for obj in objs:
                w.write(obj)
        print(f"Successfully saving to {target_path}")

    def get_prompt(doc, language):
        """Build the completion prompt for ``doc["prompt"]`` in ``language``."""
        return _PROMPT_TEMPLATE.format(language.lower(), doc["prompt"].strip())

    def create_high_accuracy_function(code, entry_point):
        """Return ``code`` with the Decimal prelude and decorated entry point.

        Every line matching ``def <entry_point>(...)`` gets the
        ``@float_to_decimal`` / ``@decimal_to_float`` decorators inserted
        directly above it.
        """
        pattern = re.compile(rf"def {entry_point}\(.*?\)")
        decorated = []
        for line in (_HIGH_ACCURACY_PRELUDE + code).split("\n"):
            if pattern.search(line) is not None:
                decorated.append("@float_to_decimal")
                decorated.append("@decimal_to_float")
            decorated.append(line)
        return "\n".join(decorated)

    def trusted_exec(code, inputs, entry_point, record_time=False, output_not_none=False):
        """Execute trusted reference ``code`` and run ``entry_point`` on ``inputs``.

        Args:
            code: Source defining ``entry_point`` (exec'ed in a fresh namespace).
            inputs: Iterable of positional-argument lists.
            entry_point: Name of the function to call.
            record_time: If true, also return per-call wall-clock durations.
            output_not_none: If true, replace each result by ``result is not None``.

        Returns:
            The list of results, or ``(results, durations)`` if ``record_time``.
        """
        exec_globals = {}
        # Skip the Decimal wrapper for code using ``**`` and for a few listed
        # tasks (marked as special cases of ``a ** b`` by the original author).
        if "**" not in code and entry_point not in ["triangle_area", "angle_complex", "volume_sphere"]:
            code = create_high_accuracy_function(code, entry_point)
        exec(code, exec_globals)
        fn = exec_globals[entry_point]

        rtime = []
        ret = []
        for inp in inputs:
            inp = deepcopy(inp)  # the callee may mutate its arguments
            if record_time:
                start = time.time()
                ret.append(fn(*inp))
                rtime.append(time.time() - start)
            else:
                ret.append(fn(*inp))

        if output_not_none:
            ret = [i is not None for i in ret]

        if record_time:
            return ret, rtime
        return ret

    def render_call_args(case):
        """Render one test case as Python source for a call's argument list.

        Strings are rendered with ``repr`` so quotes, backslashes and control
        characters survive the round trip; the case itself is never modified.
        """
        return ", ".join(repr(arg) if isinstance(arg, str) else str(arg) for arg in case)

    def start_check_script(entry_point, prompt, correct_solution):
        """Return the script header: ground-truth definition plus ``def check():``."""
        script = "import numpy as np\n" + prompt + correct_solution
        script = script.replace(f"def {entry_point}(", f"def {entry_point}_ground_truth(")
        return script + "def check():\n"

    def create_check_function(test_cases, entry_point, outputs):
        """Build a static check script comparing calls against precomputed ``outputs``."""
        lines = ["def check():\n"]
        for case, output in zip(test_cases, outputs):
            expected = repr(output) if isinstance(output, str) else str(output)
            lines.append(f"\tassert {entry_point}({render_call_args(case)}) == {expected}\n")
        lines.append("check()")
        return "".join(lines)

    def create_dynamic_check_function_equal_or_zero(test_cases, entry_point, prompt, correct_solution, check_style="np.allclose", atol=0):
        """Build a check script accepting the ground-truth value or ``0``.

        Used for the ``are_equivalent`` and ``sum_div`` tasks.  ``check_style``
        and ``atol`` are accepted for signature parity and are not used.
        """
        parts = [start_check_script(entry_point, prompt, correct_solution)]
        for case in test_cases:
            args = render_call_args(case)
            call = f"{entry_point}({args})"
            truth = f"{entry_point}_ground_truth({args})"
            parts.append(f"\tassert {call} == {truth} or {call} == 0\n")
        parts.append("check()")
        return "".join(parts)

    def create_dynamic_check_function(test_cases, entry_point, prompt, correct_solution, check_style="np.allclose", atol=0):
        """Build a check script comparing the candidate with the ground truth.

        Args:
            test_cases: Iterable of positional-argument lists.
            entry_point: Name of the function under test.
            prompt: Function stub that precedes ``correct_solution``.
            correct_solution: Canonical solution body.
            check_style: ``"np.allclose"`` (float tolerance), ``"=="``,
                ``"set"`` (order-insensitive) or ``"not_none"``.  The legacy
                misspelling ``"np.allcose"`` is still accepted.
            atol: Absolute tolerance used by the ``np.allclose`` style.

        Raises:
            ValueError: If ``check_style`` is not one of the supported values.
        """
        if check_style not in ("np.allclose", "np.allcose", "==", "set", "not_none"):
            raise ValueError(f"Unsupported check_style: {check_style!r}")

        parts = [start_check_script(entry_point, prompt, correct_solution)]
        for case in test_cases:
            args = render_call_args(case)
            call = f"{entry_point}({args})"
            truth = f"{entry_point}_ground_truth({args})"
            if check_style in ("np.allclose", "np.allcose"):
                parts.append(f"\tassert np.allclose({call}, {truth}, rtol=1e-07, atol={atol})\n")
            elif check_style == "==":
                parts.append(f"\tassert {call} == {truth}\n")
            elif check_style == "set":
                parts.append(f"\tassert set({call}) == set({truth})\n")
            else:  # "not_none"
                parts.append(f"\tif isinstance({call}, bool):\n")
                parts.append(f"\t\tassert {call} == ({truth} is not None)\n")
                parts.append("\telse:\n")
                parts.append(f"\t\tassert ({call} is not None) == ({truth} is not None)\n")
        parts.append("check()")
        return "".join(parts)

    def create_dynamic_check_function_find_zero(test_cases, entry_point, prompt, correct_solution, atol=0):
        """Build a check script for ``find_zero``.

        Instead of comparing with the ground truth, the generated asserts
        require ``abs(poly(args, candidate(args))) <= atol``.
        """
        parts = [start_check_script(entry_point, prompt, correct_solution)]
        for case in test_cases:
            args = render_call_args(case)
            parts.append(f"\tassert abs(poly({args}, {entry_point}({args}))) <= {atol}\n")
        parts.append("check()")
        return "".join(parts)

    def convert(objs, test_set="base_input", task_name="evalplus/humaneval"):
        """Turn EvalPlus task records into evaluation records.

        Args:
            objs: Iterable of task dicts; the keys read here are ``prompt``,
                ``canonical_solution``, ``entry_point``, ``base_input``,
                ``plus_input`` and ``atol``.
            test_set: ``"base_input"`` for base tests only; any other value
                appends ``plus_input`` (unless it is a dict).
            task_name: Value stored in each record's ``task`` field.

        Returns:
            A list of JSON-serialisable record dicts.
        """
        data = []
        for obj in tqdm.tqdm(objs):
            entry_point = obj["entry_point"]
            prompt = get_prompt(obj, language="python")
            if test_set == "base_input" or isinstance(obj["plus_input"], dict):
                inputs = obj["base_input"]
            else:
                inputs = obj["base_input"] + obj["plus_input"]

            # Run the reference on the first base input only; its result type
            # decides whether a float-tolerant comparison is needed.
            outputs = trusted_exec(
                code=obj["prompt"] + obj["canonical_solution"],
                inputs=[obj["base_input"][0]],
                entry_point=entry_point,
            )
            atol = obj["atol"]
            if atol == 0:
                atol = 1e-6  # enforce atol for float comparison

            common = dict(
                test_cases=inputs,
                entry_point=entry_point,
                prompt=obj["prompt"],
                correct_solution=obj["canonical_solution"],
                atol=atol,
            )
            first = outputs[0]
            is_float_output = isinstance(first, float) or (
                isinstance(first, list) and len(first) > 0 and isinstance(first[0], float)
            )
            if entry_point == "find_zero":  # humaneval
                tests = create_dynamic_check_function_find_zero(**common)
            elif entry_point in MBPP_OUTPUT_NOT_NONE_TASKS:
                tests = create_dynamic_check_function(check_style="not_none", **common)
            elif entry_point in MBPP_OUTPUT_SET_EQ_TASKS:  # mbpp
                tests = create_dynamic_check_function(check_style="set", **common)
            elif entry_point in ("are_equivalent", "sum_div"):  # mbpp
                tests = create_dynamic_check_function_equal_or_zero(**common)
            elif is_float_output:
                tests = create_dynamic_check_function(check_style="np.allclose", **common)
            else:
                tests = create_dynamic_check_function(check_style="==", **common)

            data.append({
                "prompt": prompt,
                "test": tests,
                "entry_point": entry_point,
                "tags": "coding,en,python,core",
                "task": task_name,
                "source": "evalplus",
                "eval_args": {
                    "greedy": True,
                    # "seed": 1234,
                    "out_seq_length": 1024,
                    "repetition_penalty": 1.0,
                    "temperature": 1.0,
                    "top_k": -1,
                    "top_p": 0.95,
                    "presence_penalty": 0,
                    "system_str": "You are an intelligent programming assistant to produce Python algorithmic solutions",
                },
                "extra_response_prefix": "```python\n",
            })
        return data

    humaneval_data = get_human_eval_plus()
    data1 = convert(humaneval_data.values(), test_set="base_input", task_name="evalplus/humaneval")
    write_jsonl_file(data1, f"{root_dir}/evalplus_v2/humaneval.jsonl")
    data2 = convert(humaneval_data.values(), test_set="plus_input", task_name="evalplus/humaneval_plus")
    write_jsonl_file(data2, f"{root_dir}/evalplus_v2/humaneval_plus.jsonl")

    mbpp_data = get_mbpp_plus()
    data3 = convert(mbpp_data.values(), test_set="base_input", task_name="evalplus/mbpp")
    write_jsonl_file(data3, f"{root_dir}/evalplus_v2/mbpp.jsonl")
    data4 = convert(mbpp_data.values(), test_set="plus_input", task_name="evalplus/mbpp_plus")
    write_jsonl_file(data4, f"{root_dir}/evalplus_v2/mbpp_plus.jsonl")

    all_data = data1 + data2 + data3 + data4
    write_jsonl_file(all_data, f"{root_dir}/evalplus_v2/evalplus.jsonl")

    # Small random sample of 10 records, written to a separate file.
    all_data = np.random.choice(all_data, 10)
    write_jsonl_file(all_data, f"{root_dir}/evalplus_v2/evalplus.jsonl.sampled")


if __name__ == "__main__":
    convert_file(root_dir="data/eval/code/")