# source.py
import datetime
import logging
import re
import time
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from urllib.parse import parse_qsl, urlparse

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream


class EtherscanStream(HttpStream):
    """
    Base stream for the Etherscan v2 ``account`` module.

    Produces one slice per configured wallet. Paginated streams request records
    newest-first (``sort=desc``) and stop once records fall before the wallet's
    ``start_date``. Streams whose name ends in ``balance`` issue a single,
    unpaginated request per slice.
    """

    url_base = "https://api.etherscan.io/"
    WEI_DECIMALS = 18
    GWEI_DECIMALS = 9
    EMPTY_ADDRESS = "0x0000000000000000000000000000000000000000"
    # Substring used to recognise rate-limit errors in Etherscan's "result" field.
    RATE_LIMIT_MARKER = "rate limit reached"
    # Captures the number from a "(N/..." fragment of a rate-limit message.
    RATE_LIMIT_PATTERN = re.compile(r"\((\d+)\s*/")

    primary_key = None
    cursor_field = []

    def __init__(
        self,
        api_key: str,
        wallets: list[dict],
        chain_id: str,
        backfill: bool,
        sleep_seconds: int,
        pagination_offset: int,
        **kwargs,
    ):
        """
        :param api_key: Etherscan API key, sent as ``apikey`` on every request.
        :param wallets: wallet configs, each with ``address``, ``name`` and ``tags``.
        :param chain_id: value for the v2 ``chainid`` parameter (also emitted as an int).
        :param backfill: when true, sync from 2015-07-30 instead of only yesterday.
        :param sleep_seconds: throttle (seconds) applied between slices and pages.
        :param pagination_offset: page size (``offset``) for paginated streams.
        :param kwargs: extra config keys; ignored here.
        """
        super().__init__()
        self.api_key = api_key
        self.wallets = wallets
        self.chain_id = chain_id
        self.pagination_offset = pagination_offset
        self.wallet_info = {
            wallet["address"]: {"tags": wallet["tags"], "name": wallet["name"]}
            for wallet in self.wallets
        }
        self.is_balance_stream = self.name.endswith("balance")
        self.sleep_seconds = sleep_seconds
        self.logger.info(f"{self.name} > Sleep per request: {self.sleep_seconds}s")

        yesterday = datetime.datetime.now().date() - datetime.timedelta(days=1)
        # Syncing since Ethereum first transaction
        start_date = yesterday if not backfill else datetime.date(year=2015, month=7, day=30)

        # Per-wallet inclusive date window used to filter records and stop pagination.
        self.historical_mapping = {
            wallet["address"]: {"start_date": start_date, "end_date": yesterday}
            for wallet in self.wallets
        }

    def stream_slices(
        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """Yield one slice (address, name, tags) per wallet, sleeping ``sleep_seconds`` before each."""
        for wallet in self.wallets:
            selected = self.historical_mapping[wallet["address"]]
            msg = f"{self.name} > stream_slice: Fetching data for {wallet['name']}" + (
                "" if self.is_balance_stream else f" from {selected['start_date']} to {selected['end_date']}"
            )
            self.logger.info(msg)
            time.sleep(self.sleep_seconds)
            yield {
                "address": wallet["address"],
                "name": wallet["name"],
                "tags": wallet["tags"],
            }

    def path(self, **kwargs) -> str:
        """All account endpoints share the v2 path; the endpoint is chosen by ``action``."""
        return "v2/api"

    def backoff_time(self, response: requests.Response) -> Optional[float]:
        """
        Seconds to wait before retrying ``response``.

        Returns None (defer to the CDK default) when the body is not JSON or has
        no ``result``. For rate-limit messages, the number in a "(N/..." fragment
        is used, falling back to 1 second when the fragment is absent. Balance
        streams otherwise wait 1 second; all other cases wait 0 seconds.
        """
        try:
            output = response.json()
        except ValueError:
            # e.g. an HTML error page from a proxy: nothing to parse a hint from.
            return None
        result = output.get("result") if isinstance(output, dict) else None
        if not result:
            return None

        message = str(result)
        seconds = 0
        if self.RATE_LIMIT_MARKER in message.lower():
            match = self.RATE_LIMIT_PATTERN.search(message)
            # Not every rate-limit message carries a "(N/...)" hint.
            seconds = int(match.group(1)) if match else 1
        elif self.is_balance_stream:
            seconds = 1

        self.logger.info(f"{self.name} > backoff_time: {seconds}s")
        return seconds

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Decide which page to request next.

        Returns None for balance streams, empty or short pages, non-rate-limit
        errors, and pages whose oldest record is before ``start_date``. On a
        rate-limit error it sleeps and returns the same page so it is retried.
        """
        if self.is_balance_stream:
            return None

        result = response.json().get("result", [])
        params = self.get_params(response)
        current_page = int(params.get("page", 1))

        if isinstance(result, str):
            if self.RATE_LIMIT_MARKER not in result.lower():
                # Other error strings are presumably not transient; retrying the same page
                # would otherwise loop forever, so stop paginating this wallet instead.
                self.logger.warning(
                    f"{self.name} > next_page_token: stopping pagination for "
                    f"{params.get('address')} at page {current_page}: {result}"
                )
                return None
            time.sleep(self.backoff_time(response) or 0)
            # Retry same page
            return {"page": current_page}

        if not result:
            return None

        # Last page may have less records
        if len(result) < self.pagination_offset:
            return None

        # Timestamp key casing differs per endpoint ("timeStamp" vs "timestamp").
        oldest_record = {key.lower(): value for key, value in result[-1].items()}
        earliest_date = self.to_datetime(oldest_record["timestamp"])
        # With sort=desc the last record is the oldest on the page: keep paging until it
        # predates start_date. (A page made only of records newer than end_date must not stop
        # pagination, otherwise older in-window records would never be fetched.)
        if self.has_finished(params["address"], earliest_date):
            next_token = None
        else:
            next_token = {"page": current_page + 1}
        time.sleep(self.sleep_seconds)
        return next_token

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """
        Build the common query parameters for a wallet slice.

        Returns an empty dict when there is no slice. Balance streams drop the
        pagination/sort parameters.
        """
        if not stream_slice:
            return {}

        params = {
            "chainid": self.chain_id,
            "address": stream_slice["address"],
            "apikey": self.api_key,
            "sort": "desc",
            "module": "account",
            "offset": self.pagination_offset,
            "page": next_page_token["page"] if next_page_token else 1,
        }
        if self.is_balance_stream:
            params.pop("sort")
            params.pop("offset")
            params.pop("page")

        # Never write the API key to the logs.
        loggable = {**params, "apikey": "***"}
        self.logger.info(f"{self.name} > request_params: {loggable}")
        return params

    def to_datetime(self, timestamp: str) -> datetime.datetime:
        """
        Convert the default Etherscan timestamp (Unix seconds, as a string) to a
        naive local-time ``datetime``.
        """
        return datetime.datetime.fromtimestamp(int(timestamp))

    def is_valid(self, wallet_address: str, timestamp: datetime.datetime) -> bool:
        """
        Check if the transaction is between the specified wallet's start and end date (inclusive).
        """
        window = self.historical_mapping[wallet_address]
        return window["start_date"] <= timestamp.date() <= window["end_date"]

    def has_finished(self, wallet_address: str, timestamp: datetime.datetime) -> bool:
        """
        Check if the given timestamp is before the start date.
        If `"sort": "desc"` is modified then `start_date` should be replaced with `end_date`
        """
        start_date = self.historical_mapping[wallet_address]["start_date"]
        return timestamp.date() < start_date

    def get_params(self, response: requests.Response) -> dict:
        """
        Get the query parameters used for the GET request that produced ``response``.
        """
        parsed_url = urlparse(response.request.path_url)
        return dict(parse_qsl(parsed_url.query))

    def camel_to_title(self, text: str) -> Optional[str]:
        """
        Convert transaction Method information into etherscan.io format,
        e.g. ``"transferFrom(address,address,uint256)"`` -> ``"Transfer From"``.

        Returns None for an empty string.
        """
        if not text:
            return None

        name = text.split("(", 1)[0]
        # Insert a space before every uppercase letter except a leading one.
        name = re.sub(r"(?<!^)(?=[A-Z])", " ", name)
        name = name.replace("_", " ")
        return name.title()

    def get_transactions(self, response: requests.Response) -> list:
        """
        Get the transactions from the current request.

        Returns an empty list when ``result`` is missing, null, or not a list.
        Error responses carry a message string in ``result``; iterating it would
        yield characters and raise ``TypeError`` downstream.
        """
        data: dict = response.json()
        txs = data.get("result", [])
        return txs if isinstance(txs, list) else []

    def _apply_movement(self, point: dict) -> dict:
        """
        Fill an empty ``to_address`` with the wallet address and set ``movement``
        to "out"/"in" depending on which side matches the wallet (case-insensitive).
        """
        if not point["to_address"]:
            point["to_address"] = point["wallet_address"]

        wallet = point["wallet_address"].lower()
        if point["from_address"].lower() == wallet:
            point["movement"] = "out"
        elif point["to_address"].lower() == wallet:
            point["movement"] = "in"
        return point

    def _extract_balance(self, data: Mapping[str, Any], wallet_address: str) -> Optional[Any]:
        """
        Return ``result`` from a balance response when it is an integer string,
        otherwise log a warning and return None (an error message was returned
        instead of a balance).
        """
        result = data.get("result")
        if not str(result).isdigit():
            self.logger.warning(
                f"{self.name} > parse_response: skipping {wallet_address}, unexpected balance result: {result}"
            )
            return None
        return result


class WalletTransactions(EtherscanStream):
    """
    A transaction where an EOA (Externally Owned Address, or typically referred to as a wallet address) sends ETH directly to another EOA.
    When viewing an address on Etherscan, this type of transaction will be shown under the Transaction tab.
    """

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Common parameters plus ``action=txlist``."""
        return {
            **super().request_params(stream_state, stream_slice, next_page_token),
            "action": "txlist",
        }

    def parse_response(
        self,
        response,
        *,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ):
        """Yield one record per transaction dated inside the wallet's window."""
        txs = self.get_transactions(response)
        address = self.get_params(response)["address"]
        selected = self.wallet_info.get(address, {})

        for trx in txs:
            timestamp = self.to_datetime(trx["timeStamp"])
            # Records are newest-first: the first one before start_date ends the page.
            if self.has_finished(address, timestamp):
                break
            if not self.is_valid(address, timestamp):
                continue

            method_call = trx["functionName"] or None

            point = {
                "wallet_address": address,
                "wallet_name": selected.get("name"),
                "tags": selected.get("tags"),
                "hash": trx["hash"],
                "block": int(trx["blockNumber"]),
                "timestamp": timestamp,
                "from_address": trx["from"],
                "movement": None,
                "to_address": trx["to"],
                "amount": trx["value"],
                "token_name": "Ethereum",
                "token_symbol": "ETH",
                "token_decimal": self.WEI_DECIMALS,
                "chain_id": int(self.chain_id),
                "gas_price": trx["gasPrice"],
                "gas_used": trx["gasUsed"],
                "gas_decimals": self.WEI_DECIMALS,
                "method_id": trx["methodId"],
                "method_call": method_call,
                "method_name": self.camel_to_title(method_call) if method_call else trx["methodId"],
                "is_error": bool(int(trx["isError"])),
            }
            yield self._apply_movement(point)


class WalletInternalTransactions(EtherscanStream):
    """
    This refers to a transfer of ETH that is carried out through a smart contract as an intermediary.
    When viewing an address on Etherscan, this type of transaction will be shown under the Internal Txns tab
    """

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Common parameters plus ``action=txlistinternal``."""
        return {
            **super().request_params(stream_state, stream_slice, next_page_token),
            "action": "txlistinternal",
        }

    def parse_response(
        self,
        response,
        *,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ):
        """Yield one record per internal transaction dated inside the wallet's window."""
        txs = self.get_transactions(response)
        address = self.get_params(response)["address"]
        selected = self.wallet_info.get(address, {})

        for trx in txs:
            timestamp = self.to_datetime(trx["timeStamp"])
            # Records are newest-first: the first one before start_date ends the page.
            if self.has_finished(address, timestamp):
                break
            if not self.is_valid(address, timestamp):
                continue

            point = {
                "wallet_address": address,
                "wallet_name": selected.get("name"),
                "tags": selected.get("tags"),
                "hash": trx["hash"],
                "block": int(trx["blockNumber"]),
                "timestamp": timestamp,
                "from_address": trx["from"],
                "movement": None,
                "to_address": trx["to"],
                "amount": trx["value"],
                "token_name": "Ethereum",
                "token_symbol": "ETH",
                "token_decimal": self.WEI_DECIMALS,
                "chain_id": int(self.chain_id),
                "gas": trx["gas"],
                "gas_used": trx["gasUsed"],
                "gas_decimals": self.WEI_DECIMALS,
                "is_error": bool(int(trx["isError"])),
            }
            yield self._apply_movement(point)


class WalletTokenTransactions(EtherscanStream):
    """
    Transactions of ERC-20 or ERC-721 tokens are labelled as Token Transfer transactions.
    When viewing an address on Etherscan, this type of transaction will be shown under either the Erc20 Token Txns or Erc721 Token Txns tab, depending on the respective token type.
    """

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Common parameters plus ``action=tokentx``."""
        return {
            **super().request_params(stream_state, stream_slice, next_page_token),
            "action": "tokentx",
        }

    def parse_response(
        self,
        response,
        *,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ):
        """Yield one record per token transfer dated inside the wallet's window."""
        txs = self.get_transactions(response)
        address = self.get_params(response)["address"]
        selected = self.wallet_info.get(address, {})

        for trx in txs:
            timestamp = self.to_datetime(trx["timeStamp"])
            # Records are newest-first: the first one before start_date ends the page.
            if self.has_finished(address, timestamp):
                break
            if not self.is_valid(address, timestamp):
                continue

            method_call = trx["functionName"] or None

            point = {
                "wallet_address": address,
                "wallet_name": selected.get("name"),
                "tags": selected.get("tags"),
                "hash": trx["hash"],
                "method_id": trx["methodId"],
                "method_call": method_call,
                "method_name": self.camel_to_title(method_call) if method_call else trx["methodId"],
                "block": int(trx["blockNumber"]),
                "timestamp": timestamp,
                "from_address": trx["from"],
                "movement": None,
                "to_address": trx["to"],
                "amount": trx["value"],
                "token_name": trx["tokenName"],
                "token_symbol": trx["tokenSymbol"],
                "token_decimal": int(trx["tokenDecimal"]),
                "token_address": trx["contractAddress"],
                "gas_price": trx["gasPrice"],
                "gas_used": trx["gasUsed"],
                "gas_decimals": self.WEI_DECIMALS,
                "chain_id": int(self.chain_id),
                "is_error": False,
            }
            yield self._apply_movement(point)


class NativeBalance(EtherscanStream):
    """
    Current native balance (ETH) for the wallets

    NOTE: Used for debugging purposes
    """

    def next_page_token(self, response: requests.Response):
        """Single request per slice: throttle, then stop."""
        time.sleep(self.sleep_seconds)
        return None

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Common parameters plus ``action=balance``."""
        return {
            **super().request_params(stream_state, stream_slice, next_page_token),
            "action": "balance",
        }

    def parse_response(
        self,
        response,
        *,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ):
        """Yield a single balance snapshot for the wallet, or nothing if Etherscan returned an error."""
        data: dict = response.json()
        wallet_address = self.get_params(response)["address"]
        amount = self._extract_balance(data, wallet_address)
        if amount is None:
            return
        wallet = self.wallet_info[wallet_address]
        yield {
            "timestamp": datetime.datetime.now(),
            "wallet_address": wallet_address,
            "wallet_name": wallet["name"],
            "tags": wallet["tags"],
            "token_symbol": "ETH",
            "token_decimal": self.WEI_DECIMALS,
            "amount": amount,
            "chain_id": int(self.chain_id),
        }


class TokenBalance(EtherscanStream):
    """
    Current ERC-20 token balance

    NOTE: Used for debugging purposes
    """

    def __init__(
        self,
        api_key: str,
        wallets: list[dict],
        chain_id: str,
        backfill: bool,
        sleep_seconds: int,
        pagination_offset: int,
        tokens: list[dict[str, str]],
        **kwargs,
    ):
        """
        :param tokens: tokens to query, each a dict with ``name`` (used as the
            symbol) and ``address`` (the token contract address).
        Other parameters are as for ``EtherscanStream``.
        """
        super().__init__(api_key, wallets, chain_id, backfill, sleep_seconds, pagination_offset, **kwargs)
        self.tokens = tokens

    def next_page_token(self, response: requests.Response):
        """Single request per slice: throttle, then stop."""
        time.sleep(self.sleep_seconds)
        return None

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Common parameters plus ``action=tokenbalance`` for the slice's token contract."""
        params = {
            **super().request_params(stream_state, stream_slice, next_page_token),
            "action": "tokenbalance",
            "contractaddress": stream_slice["token_address"],
        }
        self.logger.info(f"{self.name} > request_params: {stream_slice['token_symbol']} ({stream_slice['token_address']})")
        return params

    def stream_slices(
        self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        """Expand each wallet slice into one slice per configured token."""
        for current_stream_slice in super().stream_slices(sync_mode, cursor_field, stream_state):
            for token in self.tokens:
                time.sleep(self.sleep_seconds)
                yield {
                    **current_stream_slice,
                    "token_symbol": token["name"],
                    "token_address": token["address"],
                }

    def parse_response(
        self,
        response,
        *,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ):
        """Yield a single token-balance snapshot, or nothing if Etherscan returned an error."""
        data: dict = response.json()
        wallet_address = self.get_params(response)["address"]
        amount = self._extract_balance(data, wallet_address)
        if amount is None:
            return
        wallet = self.wallet_info[wallet_address]
        yield {
            "timestamp": datetime.datetime.now(),
            "wallet_address": wallet_address,
            "wallet_name": wallet["name"],
            "tags": wallet["tags"],
            "amount": amount,
            "token_symbol": stream_slice["token_symbol"],
            "token_address": stream_slice["token_address"],
            # NOTE(review): every token is reported with 18 decimals; tokens using a different
            # decimal count will be mis-scaled downstream — confirm this is acceptable.
            "token_decimal": self.WEI_DECIMALS,
            "chain_id": int(self.chain_id),
        }


class MinedBlocks(EtherscanStream):
    """
    Get newly minted ETH block rewards earned by the wallet
    """

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Common parameters plus ``action=getminedblocks``."""
        return {
            **super().request_params(stream_state, stream_slice, next_page_token),
            "action": "getminedblocks",
        }

    def parse_response(
        self,
        response,
        *,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ):
        """Yield one incoming record (from the zero address) per mined block inside the window."""
        txs = self.get_transactions(response)
        address = self.get_params(response)["address"]
        selected = self.wallet_info.get(address, {})

        for trx in txs:
            timestamp = self.to_datetime(trx["timeStamp"])
            # Records are newest-first: the first one before start_date ends the page.
            if self.has_finished(address, timestamp):
                break
            if not self.is_valid(address, timestamp):
                continue

            yield {
                "wallet_address": address,
                "wallet_name": selected.get("name"),
                "tags": selected.get("tags"),
                "block": int(trx["blockNumber"]),
                "timestamp": timestamp,
                "from_address": self.EMPTY_ADDRESS,
                "movement": "in",
                "to_address": address,
                "amount": trx["blockReward"],
                "token_name": "Ethereum",
                "token_symbol": "ETH",
                "token_decimal": self.WEI_DECIMALS,
                "chain_id": int(self.chain_id),
                "is_error": False,
            }


class BeaconWithdrawals(EtherscanStream):
    """
    Beacon-chain withdrawals credited to the wallet (``action=txsBeaconWithdrawal``).
    Amounts are emitted with ``GWEI_DECIMALS`` decimals.
    """

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Common parameters plus ``action=txsBeaconWithdrawal``."""
        return {
            **super().request_params(stream_state, stream_slice, next_page_token),
            "action": "txsBeaconWithdrawal",
        }

    def parse_response(
        self,
        response,
        *,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ):
        """Yield one incoming record per withdrawal inside the window."""
        txs = self.get_transactions(response)
        address = self.get_params(response)["address"]
        selected = self.wallet_info.get(address, {})

        for trx in txs:
            # This endpoint uses a lowercase "timestamp" key.
            timestamp = self.to_datetime(trx["timestamp"])
            # Records are newest-first: the first one before start_date ends the page.
            if self.has_finished(address, timestamp):
                break
            if not self.is_valid(address, timestamp):
                continue

            yield {
                "wallet_address": address,
                "wallet_name": selected.get("name"),
                "tags": selected.get("tags"),
                "block": int(trx["blockNumber"]),
                "timestamp": timestamp,
                "from_address": self.EMPTY_ADDRESS,
                "movement": "in",
                "to_address": address,
                "amount": trx["amount"],
                "token_name": "Ethereum",
                "token_symbol": "ETH",
                "token_decimal": self.GWEI_DECIMALS,
                "chain_id": int(self.chain_id),
                "is_error": False,
                "withdrawal_index": trx["withdrawalIndex"],
                "validator_index": trx["validatorIndex"],
            }


class SourceEtherscan(AbstractSource):
    """Airbyte source exposing Etherscan wallet transactions, rewards and balances."""

    url = "https://api.etherscan.io/v2/api"
    # Upper bound per connection-check request so a stalled connection cannot hang the check.
    REQUEST_TIMEOUT_SECONDS = 30

    @staticmethod
    def _failure_reason(response: requests.Response) -> Optional[str]:
        """Return why a connection-check response counts as a failure, or None when it is OK."""
        if response.status_code != 200:
            return f"HTTP {response.status_code}"
        try:
            payload = response.json()
        except ValueError:
            return "response body is not JSON"
        # Per the Etherscan API docs, errors such as an invalid API key are reported in the
        # JSON "status"/"result" fields, so HTTP 200 alone does not prove the request worked.
        if not isinstance(payload, dict) or payload.get("status") != "1":
            detail = payload.get("result") if isinstance(payload, dict) else payload
            return f"Etherscan error: {detail}"
        return None

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        """
        Query the balance of every configured wallet.

        :returns: ``(True, None)`` when all succeed, otherwise ``(False, message)``
            listing each failing wallet.
        """
        logger.info(f"URL: {self.url}")
        failed = []
        wallets: list[dict] = config["wallets"]
        for wallet in wallets:
            params = {
                "chainid": config["chain_id"],
                "module": "account",
                "action": "balance",
                "address": wallet["address"],
            }
            # Logged before the API key is added so the key never reaches the logs.
            logger.info(f"Params: {params}")
            params["apikey"] = config["api_key"]
            try:
                response = requests.get(self.url, params=params, timeout=self.REQUEST_TIMEOUT_SECONDS)
            except requests.RequestException as err:
                failed.append(f"Failed connection check for {wallet}: {err}")
                continue

            logger.info(f"Status Code: {response.status_code}")
            reason = self._failure_reason(response)
            if reason is None:
                continue

            failed.append(f"Failed connection check for {wallet}: {reason}")

        return len(failed) == 0, "\n".join(failed) if failed else None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Instantiate every stream with the same config (wallets reduced to tags/name/address)."""
        params = {
            **config,
            "wallets": [
                {
                    "tags": wallet["tags"],
                    "name": wallet["name"],
                    # NOTE: used exactly as configured (not lowercased); movement detection
                    # compares addresses case-insensitively.
                    "address": wallet["address"],
                }
                for wallet in config["wallets"]
            ],
        }
        return [
            WalletTransactions(**params),
            WalletInternalTransactions(**params),
            WalletTokenTransactions(**params),
            NativeBalance(**params),
            TokenBalance(**params),
            MinedBlocks(**params),
            BeaconWithdrawals(**params),
        ]