/ aprxc.py
aprxc.py
1 #!/usr/bin/env python 2 # 3 # Copyright © 2024 Fabian Neumann 4 # Licensed under the European Union Public Licence (EUPL). 5 # https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12 6 # SPDX-License-Identifier: EUPL-1.2 7 8 import argparse 9 import math 10 import sys 11 from collections import Counter 12 from collections.abc import Hashable, Iterable 13 from itertools import chain 14 from random import getrandbits 15 from textwrap import dedent 16 from typing import Self 17 18 __version__ = "1.1.3" 19 20 21 class ApproxiCount: 22 """ 23 A class to estimate the number of distinct elements in an iterable. 24 25 It uses the 'F0-Estimator' algorithm by S. Chakraborty, N. V. Vinodchandran 26 and K. S. Meel, as described in their 2023 paper "Distinct Elements in 27 Streams: An Algorithm for the (Text) Book" 28 (https://arxiv.org/pdf/2301.10191#section.2). 29 """ 30 31 def __init__( 32 self, 33 m: int = sys.maxsize, 34 *, 35 e: float = 0.1, 36 d: float = 0.1, 37 top: int = 0, 38 cheat: bool = True, 39 _debug: bool = False, 40 ) -> None: 41 42 self.n = min(m, int(math.ceil((12 / e**2) * math.log2((8 * m) / d)))) 43 self._round: int = 0 44 self._total: int = 0 45 self._memory: set[Hashable] = set() 46 47 self.cheat = cheat 48 self.top = top 49 self._counters: Counter[int] = Counter() 50 51 self._debug = _debug 52 self._mean_inacc = 0.0 53 self._max_inacc = 0.0 54 55 def count(self, item: Hashable) -> None: 56 self._total += 1 57 58 if getrandbits(self._round) == 0: 59 self._memory.add(item) 60 if self.top: 61 self._counters[item] += 2**self._round 62 else: 63 self._memory.discard(item) 64 65 if len(self._memory) == self.n: 66 self._round += 1 67 self._memory = {item for item in self._memory if getrandbits(1)} 68 if self.top: 69 self._counters = Counter(dict(self._counters.most_common(self.n))) 70 71 if self._debug: 72 self._print_debug() 73 74 def _print_debug(self) -> None: 75 inacc = abs((self._total - self.unique) / self._total) 76 self._mean_inacc = ( 77 (self._mean_inacc * (self._total - 1)) + inacc 78 ) / self._total 79 self._max_inacc = max(self._max_inacc, inacc) 80 if self._total % 50_000 == 0: 81 sys.stdout.write( 82 f"{self._total=} {self.unique=} {self._round=}" 83 f" {self.n} {len(self._memory)=}" 84 f" {inacc=:.2%} (mean: {self._mean_inacc:.3%} max: {self._max_inacc:.3%})" 85 "\n" 86 ) 87 88 @property 89 def unique(self) -> int: 90 # If `cheat` is True, we diverge slightly from the paper's algorithm: 91 # normally it overestimates in 50%, and underestimates in 50% of cases. 92 # But as we count the total number of items seen, we can use that as an 93 # upper bound of possible unique values. 94 result: int = int(len(self._memory) / (1 / 2 ** (self._round))) 95 return min(self._total, result) if self.cheat else result 96 97 def is_exact(self) -> bool: 98 # During the first round, i.e. before the first random cleanup of our 99 # memory set, our reported counts are exact. 100 return self._round == 0 101 102 def get_top(self) -> list[tuple[int, bytes]]: 103 # EXPERIMENTAL 104 return [(c, item) for item, c in self._counters.most_common(self.top)] 105 106 @classmethod 107 def from_iterable(cls, iterable: Iterable, /, **kw) -> Self: 108 inst = cls(**kw) 109 for x in iterable: 110 if inst._debug and inst._total > 10_000_000: # noqa: PLR2004 111 inst._print_debug() 112 break 113 inst.count(x) 114 return inst 115 116 117 Aprxc = ApproxiCount 118 119 120 def run() -> None: 121 parser = argparse.ArgumentParser( 122 prog="aprxc", 123 formatter_class=argparse.RawDescriptionHelpFormatter, 124 description=dedent( 125 """ 126 Estimate the number of distinct lines in a file or stream. 127 128 Motivation: 129 Easier to remember and always faster than `sort | uniq -c | wc -l`. 130 Uses a fixed amount of memory for huge datasets, unlike the 131 ever-growing footprint of `awk '!a[$0]++' | wc -l`. 132 Counts accurately for the first ~83k unique values (on 64-bit 133 systems), with a deviation of about 0.4-1% after that. 134 """ 135 ), 136 ) 137 parser.add_argument( 138 "path", 139 type=argparse.FileType("rb"), 140 default=[sys.stdin.buffer], 141 nargs="*", 142 help="Input file path(s) and/or '-' for stdin (default: stdin)", 143 ) 144 parser.add_argument( 145 "--top", 146 "-t", 147 type=int, 148 nargs="?", 149 const=10, 150 default=0, 151 metavar="X", 152 help="EXPERIMENTAL: Show X most common values. Off by default. If enabled, X defaults to 10.", 153 ) 154 parser.add_argument( 155 "--size", 156 "-s", 157 type=int, 158 default=sys.maxsize, 159 help="Total amount of data items, if known in advance. (Can be approximated.)", 160 ) 161 parser.add_argument("--epsilon", "-E", type=float, default=0.1) 162 parser.add_argument("--delta", "-D", type=float, default=0.1) 163 parser.add_argument( 164 "--cheat", 165 action=argparse.BooleanOptionalAction, 166 default=True, 167 help="Use 'total seen' number as upper bound for unique count.", 168 ) 169 parser.add_argument("--verbose", "-v", action="store_true") 170 parser.add_argument( 171 "--version", "-V", action="version", version=f"%(prog)s {__version__}" 172 ) 173 parser.add_argument("--debug", action="store_true") 174 175 config = parser.parse_args() 176 177 aprxc: ApproxiCount = ApproxiCount.from_iterable( 178 chain.from_iterable(config.path), 179 m=config.size, 180 e=config.epsilon, 181 d=config.delta, 182 top=config.top, 183 cheat=config.cheat, 184 _debug=config.debug, 185 ) 186 sys.stdout.write( 187 " ".join( 188 [ 189 str(aprxc.unique), 190 ( 191 ("(exact)" if aprxc.is_exact() else "(approximate)") 192 if config.verbose 193 else "" 194 ), 195 ] 196 ).strip() 197 ) 198 sys.stdout.write("\n") 199 if config.top: 200 sys.stdout.write(f"# {config.top} most common:\n") 201 for count, value in aprxc.get_top(): 202 s: str = value.decode("utf-8", "backslashreplace") 203 s = s.strip() 204 sys.stdout.write(f"{count!s} {s}\n") 205 206 207 if __name__ == "__main__": 208 run()