/ mlflow / utils / string_utils.py
string_utils.py
import math
import re
import shlex
from datetime import datetime
from typing import Any

from mlflow.utils.os import is_windows
  7  
  8  
  9  def strip_prefix(original: str, prefix: str) -> str:
 10      if original.startswith(prefix):
 11          return original[len(prefix) :]
 12      return original
 13  
 14  
 15  def strip_suffix(original: str, suffix: str) -> str:
 16      if original.endswith(suffix) and suffix != "":
 17          return original[: -len(suffix)]
 18      return original
 19  
 20  
 21  def is_string_type(item: Any) -> bool:
 22      return isinstance(item, str)
 23  
 24  
 25  def generate_feature_name_if_not_string(s: Any) -> str:
 26      if isinstance(s, str):
 27          return s
 28  
 29      return f"feature_{s}"
 30  
 31  
 32  def truncate_str_from_middle(s: str, max_length: int) -> str:
 33      assert max_length > 5
 34      if len(s) <= max_length:
 35          return s
 36      else:
 37          left_part_len = (max_length - 3) // 2
 38          right_part_len = max_length - 3 - left_part_len
 39          return f"{s[:left_part_len]}...{s[-right_part_len:]}"
 40  
 41  
 42  def _create_table(
 43      rows: list[list[str]], headers: list[str], column_sep: str = " " * 2, min_column_width: int = 4
 44  ) -> str:
 45      """
 46      Creates a table from a list of rows and headers.
 47  
 48      Example
 49      =======
 50      >>> print(_create_table([["a", "b", "c"], ["d", "e", "f"]], ["x", "y", "z"]))
 51      x     y     z
 52      ----  ----  ----
 53      a     b     c
 54      d     e     f
 55      """
 56      column_widths = [
 57          max(len(max(col, key=len)), len(header) + 2, min_column_width)
 58          for col, header in zip(zip(*rows), headers)
 59      ]
 60      aligned_rows = [
 61          column_sep.join(header.ljust(width) for header, width in zip(headers, column_widths)),
 62          column_sep.join("-" * width for width in column_widths),
 63          *(
 64              column_sep.join(cell.ljust(width) for cell, width in zip(row, column_widths))
 65              for row in rows
 66          ),
 67      ]
 68      return "\n".join(aligned_rows)
 69  
 70  
# Source: https://github.com/smoofra/mslex/blob/3338c347324d52af619ba39cebfdf7cbf46fa51b/mslex.py#L89-L139
# Regex fragments used by `mslex_quote` below.
# One of the characters " ^ & | < > ( ) % !, captured as group 1 so a substitution can
# re-emit the matched character.
cmd_meta = r"([\"\^\&\|\<\>\(\)\%\!])"
# Any whitespace character or any character matched by `cmd_meta` (no capture group).
cmd_meta_or_space = r"[\s\"\^\&\|\<\>\(\)\%\!]"
# One of the characters " % !, captured as group 1.
cmd_meta_inside_quotes = r"([\"\%\!])"
 75  
 76  
def mslex_quote(s: str, for_cmd: bool = True) -> str:
    """
    Quote a string for use as a command line argument in DOS or Windows.

    On windows, before a command line argument becomes a char* in a
    program's argv, it must be parsed by both cmd.exe, and by
    CommandLineToArgvW.

    If for_cmd is true, then this will quote the string so it will
    be parsed correctly by cmd.exe and then by CommandLineToArgvW.

    If for_cmd is false, then this will quote the string so it will
    be parsed correctly when passed directly to CommandLineToArgvW.

    For some strings there is no way to quote them so they will
    parse correctly in both situations.

    Args:
        s: The argument text to quote.
        for_cmd: Selects between the two quoting modes described above.

    Returns:
        The quoted argument. An empty ``s`` yields ``""`` (two double-quote characters),
        and ``s`` is returned unchanged when nothing in it matches ``cmd_meta_or_space``.
    """
    # An empty argument is written as an explicit pair of double quotes.
    if not s:
        return '""'
    # No whitespace and no character from cmd_meta: nothing to quote.
    if not re.search(cmd_meta_or_space, s):
        return s
    if for_cmd and re.search(cmd_meta, s):
        if not re.search(cmd_meta_inside_quotes, s):
            # None of the characters " % ! occur, so the string is simply wrapped in
            # double quotes. A trailing run of backslashes is appended a second time
            # before the closing quote (presumably so that the run does not escape the
            # closing quote -- see the mslex source linked above the regex constants).
            if m := re.search(r"\\+$", s):
                return '"' + s + m.group() + '"'
            else:
                return '"' + s + '"'
        # No whitespace and no double quote: prefix every cmd_meta character with "^".
        if not re.search(r"[\s\"]", s):
            return re.sub(cmd_meta, r"^\1", s)
        # Otherwise quote as for for_cmd=False first, then prefix every cmd_meta
        # character of that result with "^".
        return re.sub(cmd_meta, r"^\1", mslex_quote(s, for_cmd=False))
    # Split s into three kinds of token: backslashes (possibly none) followed by one or
    # more double quotes, a run of backslashes alone, or a run of any other characters.
    # NOTE: `i` is a one-shot iterator that is consumed by `parts()` below.
    i = re.finditer(r"(\\*)(\"+)|(\\+)|([^\\\"]+)", s)

    def parts():
        # Yields the pieces of the result: an opening quote, the transformed tokens,
        # and a closing quote.
        yield '"'
        for m in i:
            _, end = m.span()
            slashes, quotes, onlyslashes, text = m.groups()
            if quotes:
                # Backslashes directly before quotes are emitted twice, and every
                # double quote is emitted as \".
                yield slashes
                yield slashes
                yield r"\"" * len(quotes)
            elif onlyslashes:
                if end == len(s):
                    # A backslash run that ends the string is emitted twice; the
                    # closing quote follows immediately after it.
                    yield onlyslashes
                    yield onlyslashes
                else:
                    # Elsewhere a backslash run is copied as is.
                    yield onlyslashes
            else:
                yield text
        yield '"'

    return "".join(parts())
129  
130  
def quote(s: str) -> str:
    """
    Quotes ``s`` as a command line argument for the current platform.

    Uses ``mslex_quote`` when ``is_windows()`` reports true and ``shlex.quote`` otherwise.
    """
    if is_windows():
        return mslex_quote(s)
    return shlex.quote(s)
133  
134  
135  def _backtick_quote(s: str) -> str:
136      """
137      Quotes the given string with backticks if it is not already quoted with backticks.
138      """
139      return f"`{s}`" if not (s.startswith("`") and s.endswith("`")) else s
140  
141  
142  def format_table_cell_value(field: str, cell_value: Any, values: list[Any] | None = None) -> str:
143      """
144      Format cell values for table display with field-specific formatting.
145  
146      Args:
147          field: The field name (e.g., "info.request_time")
148          cell_value: The value to format
149          values: List of extracted values (for multiple values handling)
150  
151      Returns:
152          Formatted string value suitable for table display
153      """
154      if values is None:
155          values = [cell_value] if cell_value is not None else []
156  
157      # Handle empty/missing values
158      if not values:
159          return "N/A"
160      elif len(values) == 1:
161          cell_value = values[0]
162      else:
163          # Multiple values - join them
164          cell_value = ", ".join(str(v) for v in values[:3])  # Limit to first 3
165          if len(values) > 3:
166              cell_value += f", ... (+{len(values) - 3} more)"
167  
168      # Format specific fields
169      if field == "info.request_time" and cell_value != "N/A":
170          # Convert ISO timestamp to readable format
171          try:
172              dt = datetime.fromisoformat(str(cell_value).replace("Z", "+00:00"))
173              cell_value = dt.strftime("%Y-%m-%d %H:%M:%S %Z")
174          except Exception:
175              pass  # Keep original if conversion fails
176      elif field == "info.execution_duration_ms" and cell_value != "N/A" and cell_value is not None:
177          try:
178              duration_ms = float(cell_value)
179              if duration_ms < 1000:
180                  cell_value = f"{int(duration_ms)}ms"
181              else:
182                  cell_value = f"{duration_ms / 1000:.1f}s"
183          except (ValueError, TypeError):
184              pass  # Keep original if conversion fails
185      elif field in ["info.request_preview", "info.response_preview"]:
186          # Truncate previews to keep table readable
187          if len(str(cell_value)) > 20:
188              cell_value = str(cell_value)[:17] + "..."
189  
190      return str(cell_value)