/ aprxc.py
aprxc.py
  1  #!/usr/bin/env python
  2  #
  3  # Copyright © 2024 Fabian Neumann
  4  # Licensed under the European Union Public Licence (EUPL).
  5  # https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
  6  # SPDX-License-Identifier: EUPL-1.2
  7  
  8  import argparse
  9  import math
 10  import sys
 11  from collections import Counter
 12  from collections.abc import Hashable, Iterable
 13  from itertools import chain
 14  from random import getrandbits
 15  from textwrap import dedent
 16  from typing import Self
 17  
 18  __version__ = "1.1.3"
 19  
 20  
 21  class ApproxiCount:
 22      """
 23      A class to estimate the number of distinct elements in an iterable.
 24  
 25      It uses the 'F0-Estimator' algorithm by S. Chakraborty, N. V. Vinodchandran
 26      and K. S. Meel, as described in their 2023 paper "Distinct Elements in
 27      Streams: An Algorithm for the (Text) Book"
 28      (https://arxiv.org/pdf/2301.10191#section.2).
 29      """
 30  
 31      def __init__(
 32          self,
 33          m: int = sys.maxsize,
 34          *,
 35          e: float = 0.1,
 36          d: float = 0.1,
 37          top: int = 0,
 38          cheat: bool = True,
 39          _debug: bool = False,
 40      ) -> None:
 41  
 42          self.n = min(m, int(math.ceil((12 / e**2) * math.log2((8 * m) / d))))
 43          self._round: int = 0
 44          self._total: int = 0
 45          self._memory: set[Hashable] = set()
 46  
 47          self.cheat = cheat
 48          self.top = top
 49          self._counters: Counter[int] = Counter()
 50  
 51          self._debug = _debug
 52          self._mean_inacc = 0.0
 53          self._max_inacc = 0.0
 54  
 55      def count(self, item: Hashable) -> None:
 56          self._total += 1
 57  
 58          if getrandbits(self._round) == 0:
 59              self._memory.add(item)
 60              if self.top:
 61                  self._counters[item] += 2**self._round
 62          else:
 63              self._memory.discard(item)
 64  
 65          if len(self._memory) == self.n:
 66              self._round += 1
 67              self._memory = {item for item in self._memory if getrandbits(1)}
 68              if self.top:
 69                  self._counters = Counter(dict(self._counters.most_common(self.n)))
 70  
 71          if self._debug:
 72              self._print_debug()
 73  
 74      def _print_debug(self) -> None:
 75          inacc = abs((self._total - self.unique) / self._total)
 76          self._mean_inacc = (
 77              (self._mean_inacc * (self._total - 1)) + inacc
 78          ) / self._total
 79          self._max_inacc = max(self._max_inacc, inacc)
 80          if self._total % 50_000 == 0:
 81              sys.stdout.write(
 82                  f"{self._total=} {self.unique=} {self._round=}"
 83                  f" {self.n} {len(self._memory)=}"
 84                  f" {inacc=:.2%} (mean: {self._mean_inacc:.3%} max: {self._max_inacc:.3%})"
 85                  "\n"
 86              )
 87  
 88      @property
 89      def unique(self) -> int:
 90          # If `cheat` is True, we diverge slightly from the paper's algorithm:
 91          # normally it overestimates in 50%, and underestimates in 50% of cases.
 92          # But as we count the total number of items seen, we can use that as an
 93          # upper bound of possible unique values.
 94          result: int = int(len(self._memory) / (1 / 2 ** (self._round)))
 95          return min(self._total, result) if self.cheat else result
 96  
 97      def is_exact(self) -> bool:
 98          # During the first round, i.e. before the first random cleanup of our
 99          # memory set, our reported counts are exact.
100          return self._round == 0
101  
102      def get_top(self) -> list[tuple[int, bytes]]:
103          # EXPERIMENTAL
104          return [(c, item) for item, c in self._counters.most_common(self.top)]
105  
106      @classmethod
107      def from_iterable(cls, iterable: Iterable, /, **kw) -> Self:
108          inst = cls(**kw)
109          for x in iterable:
110              if inst._debug and inst._total > 10_000_000:  # noqa: PLR2004
111                  inst._print_debug()
112                  break
113              inst.count(x)
114          return inst
115  
116  
117  Aprxc = ApproxiCount
118  
119  
120  def run() -> None:
121      parser = argparse.ArgumentParser(
122          prog="aprxc",
123          formatter_class=argparse.RawDescriptionHelpFormatter,
124          description=dedent(
125              """
126              Estimate the number of distinct lines in a file or stream.
127  
128              Motivation:
129              Easier to remember and always faster than `sort | uniq -c | wc -l`.
130              Uses a fixed amount of memory for huge datasets, unlike the
131              ever-growing footprint of `awk '!a[$0]++' | wc -l`.
132              Counts accurately for the first ~83k unique values (on 64-bit
133              systems), with a deviation of about 0.4-1% after that.
134              """
135          ),
136      )
137      parser.add_argument(
138          "path",
139          type=argparse.FileType("rb"),
140          default=[sys.stdin.buffer],
141          nargs="*",
142          help="Input file path(s) and/or '-' for stdin (default: stdin)",
143      )
144      parser.add_argument(
145          "--top",
146          "-t",
147          type=int,
148          nargs="?",
149          const=10,
150          default=0,
151          metavar="X",
152          help="EXPERIMENTAL: Show X most common values. Off by default. If enabled, X defaults to 10.",
153      )
154      parser.add_argument(
155          "--size",
156          "-s",
157          type=int,
158          default=sys.maxsize,
159          help="Total amount of data items, if known in advance. (Can be approximated.)",
160      )
161      parser.add_argument("--epsilon", "-E", type=float, default=0.1)
162      parser.add_argument("--delta", "-D", type=float, default=0.1)
163      parser.add_argument(
164          "--cheat",
165          action=argparse.BooleanOptionalAction,
166          default=True,
167          help="Use 'total seen' number as upper bound for unique count.",
168      )
169      parser.add_argument("--verbose", "-v", action="store_true")
170      parser.add_argument(
171          "--version", "-V", action="version", version=f"%(prog)s {__version__}"
172      )
173      parser.add_argument("--debug", action="store_true")
174  
175      config = parser.parse_args()
176  
177      aprxc: ApproxiCount = ApproxiCount.from_iterable(
178          chain.from_iterable(config.path),
179          m=config.size,
180          e=config.epsilon,
181          d=config.delta,
182          top=config.top,
183          cheat=config.cheat,
184          _debug=config.debug,
185      )
186      sys.stdout.write(
187          " ".join(
188              [
189                  str(aprxc.unique),
190                  (
191                      ("(exact)" if aprxc.is_exact() else "(approximate)")
192                      if config.verbose
193                      else ""
194                  ),
195              ]
196          ).strip()
197      )
198      sys.stdout.write("\n")
199      if config.top:
200          sys.stdout.write(f"# {config.top} most common:\n")
201          for count, value in aprxc.get_top():
202              s: str = value.decode("utf-8", "backslashreplace")
203              s = s.strip()
204              sys.stdout.write(f"{count!s} {s}\n")
205  
206  
207  if __name__ == "__main__":
208      run()