/ qwencoder-eval / instruct / eval_plus / convert_data.py
convert_data.py
  1  import numpy as np
  2  import jsonlines
  3  import os
  4  import sys
  5  
  6  eval_plus_path = os.path.dirname(os.path.abspath(__file__)) + "/evalplus/"
  7  sys.path = [eval_plus_path] + sys.path
  8  from evalplus.data import get_human_eval_plus, get_mbpp_plus
  9  
 10  MBPP_OUTPUT_SET_EQ_TASKS = [
 11      "similar_elements",  # Mbpp/2
 12      "find_char_long",  # Mbpp/7
 13      "common_in_nested_lists",  # Mbpp/111
 14      "extract_singly",  # Mbpp/140
 15      "larg_nnum",  # Mbpp/232
 16      "intersection_array",  # Mbpp/249
 17      "find_dissimilar",  # Mbpp/579
 18      "Diff",  # Mbpp/769
 19  ]
 20  MBPP_OUTPUT_NOT_NONE_TASKS = ["check_str", "text_match_three", "text_starta_endb"]
 21  
 22  
 23  def convert_file(root_dir):
 24      import jsonlines
 25      from copy import deepcopy
 26      import sys
 27      import tqdm
 28      import re
 29      sys.set_int_max_str_digits(10000000)
 30  
 31      def write_jsonl_file(objs, target_path):
 32          os.makedirs(os.path.dirname(target_path), exist_ok=True)
 33          with jsonlines.open(target_path, "w") as w:
 34              for obj in objs:
 35                  w.write(obj)
 36          print(f"Successfully saving to {target_path}")
 37  
 38      # def get_humaneval_prompt(doc, language):
 39      #     language = language.lower()
 40      #     question = doc["prompt"].strip()
 41      #     return """
 42      # Please continue to complete the function and return all completed code in a codeblock. Here is the given code to do completion:
 43      # ```{}
 44      # {}
 45      # ```
 46      # """.strip().format(
 47      #         language.lower(), question.strip()
 48      #     )
 49  
 50      def get_prompt(doc, language):
 51          language = language.lower()
 52          question = doc["prompt"].strip()
 53          return """
 54  Can you complete the following Python function?
 55  ```{}
 56  {}
 57  ```
 58      """.strip().format(language.lower(), question.strip())
 59  
 60      def create_high_accuracy_function(code, entry_point):
 61          high_accuracy = """
 62  from decimal import Decimal, getcontext
 63  from functools import wraps
 64  getcontext().prec = 100
 65  
 66  def convert_to_decimal(value):
 67      if isinstance(value, float):
 68          return Decimal(str(value))
 69      elif isinstance(value, list):
 70          return [convert_to_decimal(item) for item in value]
 71      elif isinstance(value, dict):
 72          return {k: convert_to_decimal(v) for k, v in value.items()}
 73      return value
 74  
 75  def float_to_decimal(func):
 76      @wraps(func)
 77      def wrapper(*args, **kwargs):
 78          new_args = [convert_to_decimal(arg) for arg in args]
 79          new_kwargs = {k: convert_to_decimal(v) for k, v in kwargs.items()}
 80          result = func(*new_args, **new_kwargs)
 81          return result
 82      return wrapper
 83  
 84  def convert_to_float(value):
 85      if isinstance(value, Decimal):
 86          return float(value)
 87      elif isinstance(value, list):
 88          return [convert_to_float(item) for item in value]
 89      elif isinstance(value, dict):
 90          return {k: convert_to_float(v) for k, v in value.items()}
 91      return value
 92  
 93  def decimal_to_float(func):
 94      @wraps(func)
 95      def wrapper(*args, **kwargs):
 96          # Execute the wrapped function
 97          result = func(*args, **kwargs)
 98          
 99          # Convert the result back to float, if necessary
100          result = convert_to_float(result)
101          return result
102      return wrapper
103  """
104          """Execute trusted code in place."""
105          code = high_accuracy + code
106          code = code.split("\n")
107          new_code = []
108          cnt = 0
109          for c in code:
110              if re.search(rf"def {entry_point}\(.*?\)", c) is not None:
111                  cnt += 1
112                  new_code.append("@float_to_decimal")
113                  new_code.append("@decimal_to_float")
114              new_code.append(c)
115          code = "\n".join(new_code)
116          return code
117  
118      def trusted_exec(code, inputs, entry_point, record_time=False, output_not_none=False):
119          exec_globals = {}
120          # if entry_point not in ["triangle_area", "angle_complex", "volume_sphere"]: # avoid special case (a ** b)
121          #     code = create_high_accuracy_function(code, entry_point)
122          if "**" not in code and entry_point not in ["triangle_area", "angle_complex", "volume_sphere"]:
123              code = create_high_accuracy_function(code, entry_point)
124          #print(code)
125          exec(code, exec_globals)
126          fn = exec_globals[entry_point]
127  
128          rtime = []
129          ret = []
130          for inp in inputs:
131              inp = deepcopy(inp)
132              if record_time:
133                  start = time.time()
134                  ret.append(fn(*inp))
135                  rtime.append(time.time() - start)
136              else:
137                  ret.append(fn(*inp))
138  
139          if output_not_none:
140              ret = [i is not None for i in ret]
141  
142          if record_time:
143              return ret, rtime
144          else:
145              return ret
146  
147      def convert(objs, test_set="base_input", task_name=f"evalplus/humaneval"):
148          type
149          data = []
150          for obj in tqdm.tqdm(objs):
151              prompt = get_prompt(obj, language="python")
152              if test_set == "base_input":
153                  inputs = obj["base_input"]
154              else:
155                  inputs = obj["base_input"] + obj["plus_input"] if not isinstance(obj["plus_input"], dict) else obj["base_input"]
156              #outputs = trusted_exec(code = obj["prompt"] + obj["canonical_solution"], inputs = obj["base_input"], entry_point = obj["entry_point"])
157              #tests = create_check_function(test_cases = inputs, entry_point=obj["entry_point"], outputs = outputs)
158              outputs = trusted_exec(code=obj["prompt"] + obj["canonical_solution"], inputs=[obj["base_input"][0]], entry_point=obj["entry_point"])
159              atol = obj["atol"]
160              if atol == 0:
161                  atol = 1e-6  # enforce atol for float comparison
162                  #```python
163              if obj["entry_point"] == "find_zero":  #humaneval
164                  tests = create_dynamic_check_function_find_zero(test_cases=inputs, entry_point=obj["entry_point"], prompt=obj["prompt"], correct_solution=obj["canonical_solution"], atol=atol)
165              elif obj["entry_point"] in MBPP_OUTPUT_NOT_NONE_TASKS:
166                  tests = create_dynamic_check_function(test_cases=inputs, entry_point=obj["entry_point"], prompt=obj["prompt"], correct_solution=obj["canonical_solution"], check_style="not_none", atol=atol)
167              elif obj["entry_point"] in MBPP_OUTPUT_SET_EQ_TASKS:  # mbpp
168                  tests = create_dynamic_check_function(test_cases=inputs, entry_point=obj["entry_point"], prompt=obj["prompt"], correct_solution=obj["canonical_solution"], check_style="set", atol=atol)
169              elif obj["entry_point"] == "are_equivalent":  # mbpp
170                  tests = create_dynamic_check_function_are_equivalent(test_cases=inputs, entry_point=obj["entry_point"], prompt=obj["prompt"], correct_solution=obj["canonical_solution"], atol=atol)
171              elif obj["entry_point"] == "sum_div":  # mbpp
172                  tests = create_dynamic_check_function_sum_div(test_cases=inputs, entry_point=obj["entry_point"], prompt=obj["prompt"], correct_solution=obj["canonical_solution"], atol=atol)
173              elif isinstance(outputs[0], float) or (isinstance(outputs[0], list) and len(outputs[0]) > 0 and isinstance(outputs[0][0], float)):
174                  tests = create_dynamic_check_function(test_cases=inputs, entry_point=obj["entry_point"], prompt=obj["prompt"], correct_solution=obj["canonical_solution"], check_style="np.allcose", atol=atol)
175              else:
176                  tests = create_dynamic_check_function(test_cases=inputs, entry_point=obj["entry_point"], prompt=obj["prompt"], correct_solution=obj["canonical_solution"], check_style="==", atol=atol)
177              data.append({
178                  "prompt": prompt,
179                  "test": tests,
180                  "entry_point": obj["entry_point"],
181                  "tags": f"coding,en,python,core",
182                  "task": task_name,
183                  "source": f"evalplus",
184                  "eval_args": {
185                      "greedy": True,
186                      #"seed": 1234,
187                      "out_seq_length": 1024,
188                      "repetition_penalty": 1.0,
189                      "temperature": 1.0,
190                      "top_k": -1,
191                      "top_p": 0.95,
192                      "presence_penalty": 0,
193                      "system_str": "You are an intelligent programming assistant to produce Python algorithmic solutions",
194                  },
195                  "extra_response_prefix": "```python\n"
196              })
197          return data
198  
199      def create_check_function(test_cases, entry_point, outputs):
200          test_cases_str = "def check():\n"
201          for case, output in zip(test_cases, outputs):
202              for i in range(len(case)):
203                  if isinstance(case[i], str) and "\n" in case[i]:
204                      case[i] = case[i].replace("\n", "\\n")
205              input_params = ", ".join([str(c) if not isinstance(c, str) else f"'{c}'" for c in case])
206              output = str(output) if not isinstance(output, str) else f"'{output}'"
207              single_test_case_str = f"\tassert {entry_point}({input_params}) == {output}\n"
208              test_cases_str += single_test_case_str
209          test_cases_str += "check()"
210          return test_cases_str
211  
212      def create_dynamic_check_function_are_equivalent(test_cases, entry_point, prompt, correct_solution, check_style="np.allclose", atol=0):
213          test_cases_str = "import numpy as np\n" + prompt + correct_solution
214          test_cases_str = test_cases_str.replace(f"def {entry_point}(", f"def {entry_point}_ground_truth(")
215          test_cases_str += "def check():\n"
216          for case in test_cases:
217              for i in range(len(case)):
218                  if isinstance(case[i], str) and "\n" in case[i]:
219                      case[i] = case[i].replace("\n", "\\n")
220              input_params = ", ".join([str(c) if not isinstance(c, str) else f"'{c}'" for c in case])
221              single_test_case_str = f"\tassert {entry_point}({input_params}) == {entry_point}_ground_truth({input_params}) or {entry_point}({input_params}) == 0\n"
222              test_cases_str += single_test_case_str
223          test_cases_str += "check()"
224          return test_cases_str
225  
226      def create_dynamic_check_function_sum_div(test_cases, entry_point, prompt, correct_solution, check_style="np.allclose", atol=0):
227          test_cases_str = "import numpy as np\n" + prompt + correct_solution
228          test_cases_str = test_cases_str.replace(f"def {entry_point}(", f"def {entry_point}_ground_truth(")
229          test_cases_str += "def check():\n"
230          for case in test_cases:
231              for i in range(len(case)):
232                  if isinstance(case[i], str) and "\n" in case[i]:
233                      case[i] = case[i].replace("\n", "\\n")
234              input_params = ", ".join([str(c) if not isinstance(c, str) else f"'{c}'" for c in case])
235              single_test_case_str = f"\tassert {entry_point}({input_params}) == {entry_point}_ground_truth({input_params}) or {entry_point}({input_params}) == 0\n"
236              test_cases_str += single_test_case_str
237          test_cases_str += "check()"
238          return test_cases_str
239  
240      def create_dynamic_check_function(test_cases, entry_point, prompt, correct_solution, check_style="np.allclose", atol=0):
241          test_cases_str = "import numpy as np\n" + prompt + correct_solution
242          test_cases_str = test_cases_str.replace(f"def {entry_point}(", f"def {entry_point}_ground_truth(")
243          test_cases_str += "def check():\n"
244          for case in test_cases:
245              for i in range(len(case)):
246                  if isinstance(case[i], str) and "\n" in case[i]:
247                      case[i] = case[i].replace("\n", "\\n")
248              input_params = ", ".join([str(c) if not isinstance(c, str) else f"'{c}'" for c in case])
249              if check_style == "np.allcose":
250                  single_test_case_str = f"\tassert np.allclose({entry_point}({input_params}), {entry_point}_ground_truth({input_params}), rtol=1e-07, atol={atol})\n"
251              elif check_style == "==":
252                  single_test_case_str = f"\tassert {entry_point}({input_params}) == {entry_point}_ground_truth({input_params})\n"
253              elif check_style == "set":
254                  single_test_case_str = f"\tassert set({entry_point}({input_params})) == set({entry_point}_ground_truth({input_params}))\n"
255              elif check_style == "not_none":
256                  single_test_case_str = f"\tif isinstance({entry_point}({input_params}), bool):\n"
257                  single_test_case_str += f"\t\tassert {entry_point}({input_params}) == ({entry_point}_ground_truth({input_params}) is not None)\n"
258                  single_test_case_str += f"\telse:\n"
259                  single_test_case_str += f"\t\tassert ({entry_point}({input_params}) is not None) == ({entry_point}_ground_truth({input_params}) is not None)\n"
260              test_cases_str += single_test_case_str
261          test_cases_str += "check()"
262          return test_cases_str
263  
264      def create_dynamic_check_function_find_zero(test_cases, entry_point, prompt, correct_solution, atol=0):
265          test_cases_str = "import numpy as np\n" + prompt + correct_solution
266          test_cases_str = test_cases_str.replace(f"def {entry_point}(", f"def {entry_point}_ground_truth(")
267          test_cases_str += "def check():\n"
268          for case in test_cases:
269              for i in range(len(case)):
270                  if isinstance(case[i], str) and "\n" in case[i]:
271                      case[i] = case[i].replace("\n", "\\n")
272              input_params = ", ".join([str(c) if not isinstance(c, str) else f"'{c}'" for c in case])
273              single_test_case_str = f"\tassert abs(poly({input_params}, {entry_point}({input_params}))) <= {atol}\n"
274              test_cases_str += single_test_case_str
275          test_cases_str += "check()"
276          return test_cases_str
277  
278      humaneval_data = get_human_eval_plus()
279      data1 = convert(humaneval_data.values(), test_set="base_input", task_name="evalplus/humaneval")
280      write_jsonl_file(data1, f"{root_dir}/evalplus_v2/humaneval.jsonl")
281      data2 = convert(humaneval_data.values(), test_set="plus_input", task_name="evalplus/humaneval_plus")
282      write_jsonl_file(data2, f"{root_dir}/evalplus_v2/humaneval_plus.jsonl")
283  
284      mbpp_data = get_mbpp_plus()
285      data3 = convert(mbpp_data.values(), test_set="base_input", task_name="evalplus/mbpp")
286      write_jsonl_file(data3, f"{root_dir}/evalplus_v2/mbpp.jsonl")
287      data4 = convert(mbpp_data.values(), test_set="plus_input", task_name="evalplus/mbpp_plus")
288      write_jsonl_file(data4, f"{root_dir}/evalplus_v2/mbpp_plus.jsonl")
289  
290      all_data = data1 + data2 + data3 + data4
291      write_jsonl_file(all_data, f"{root_dir}/evalplus_v2/evalplus.jsonl")
292  
293      all_data = np.random.choice(all_data, 10)
294      write_jsonl_file(all_data, f"{root_dir}/evalplus_v2/evalplus.jsonl.sampled")
295  
296  
297  if __name__ == "__main__":
298      convert_file(root_dir="data/eval/code/")