source.py
  1  import requests, logging, re, datetime, time
  2  from urllib.parse import urlparse, parse_qsl
  3  from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
  4  from airbyte_cdk.sources import AbstractSource
  5  from airbyte_cdk.sources.streams import Stream
  6  from airbyte_cdk.sources.streams.http import HttpStream
  7  from airbyte_cdk.models import SyncMode
  8  
  9  class EtherscanStream(HttpStream):
 10      url_base = "https://api.etherscan.io/"
 11      WEI_DECIMALS = 18
 12      GWEI_DECIMALS = 9
 13      EMPTY_ADDRESS = "0x0000000000000000000000000000000000000000"
 14  
 15      primary_key = None
 16      cursor_field = []
 17  
 18      def __init__(self, api_key: str, wallets: list[dict], chain_id: str, backfill: bool, sleep_seconds: int, pagination_offset: int, **kwargs):
 19          super().__init__()
 20          self.api_key = api_key
 21          self.wallets = wallets
 22          self.chain_id = chain_id
 23          self.pagination_offset = pagination_offset
 24          self.wallet_info = {
 25              wallet["address"]: {
 26                  "tags": wallet["tags"],
 27                  "name": wallet["name"]
 28              }
 29              for wallet in self.wallets
 30          }
 31          self.is_balance_stream = self.name.endswith('balance')
 32          self.sleep_seconds = sleep_seconds
 33          self.logger.info(f"{self.name} > Sleep per request: {self.sleep_seconds}s")
 34  
 35          yesterday = datetime.datetime.now().date() - datetime.timedelta(days=1)
 36          # Syncing since Ethereum first transaction
 37          start_date = yesterday if not backfill else datetime.date(year=2015, month=7, day=30)
 38  
 39          self.historical_mapping = {
 40              wallet["address"]: {
 41                  "start_date": start_date,
 42                  "end_date": yesterday,
 43              }
 44              for wallet in self.wallets
 45          }
 46  
 47      def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, Any]]]:
 48  
 49          for wallet in self.wallets:
 50              selected = self.historical_mapping[wallet["address"]]
 51              msg = f"{self.name} > stream_slice: Fetching data for {wallet['name']}" + ("" if self.is_balance_stream else f" from {selected['start_date']} to {selected['end_date']}")
 52              self.logger.info(msg)
 53              time.sleep(self.sleep_seconds)
 54              yield {
 55                  "address": wallet["address"],
 56                  "name": wallet["name"],
 57                  "tags": wallet["tags"],
 58              }
 59  
 60      def path(self, **kwargs) -> str:
 61          return "v2/api"
 62  
 63      def backoff_time(self, response: requests.Response) -> Optional[float]:
 64          output: dict = response.json()
 65          result = output.get("result")
 66          if not result:
 67              return None
 68  
 69          seconds = 0
 70          if "rate limit reached" in str(result).lower():
 71              match = re.search(r"\((\d+)\s*/", str(result))
 72              seconds = int(match.group(1))
 73  
 74          elif self.is_balance_stream:
 75              seconds = 1
 76  
 77          self.logger.info(f"{self.name} > backoff_time: {seconds}s")
 78          return seconds
 79  
 80      def next_page_token(self, response: requests.Response):
 81  
 82          if self.is_balance_stream:
 83              return None
 84  
 85          result = response.json().get("result", [])
 86          if isinstance(result, str):
 87              seconds = self.backoff_time(response)
 88              time.sleep(seconds)
 89              # Retry same page
 90              current_page = int(self.get_params(response).get("page", 1))
 91              return {"page": current_page}
 92  
 93          if not result:
 94              return None
 95  
 96          params = self.get_params(response)
 97          current_page = int(params.get("page", 1))
 98          # Last page may have less records
 99          if len(result) < self.pagination_offset:
100              return None
101  
102          to_lower = lambda result: {key.lower(): value for key, value in result.items()}
103          earliest_date = self.to_datetime(to_lower(result[-1])["timestamp"])
104          params = {"page": current_page + 1} if self.is_valid(params["address"], earliest_date) else None
105          time.sleep(self.sleep_seconds)
106          return params
107  
108      def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None) -> MutableMapping[str, Any]:
109          if not stream_slice:
110              return {}
111  
112          params = {
113              "chainid": self.chain_id,
114              "address": stream_slice["address"],
115              "apikey": self.api_key,
116              "sort": "desc",
117              "module": "account",
118              "offset": self.pagination_offset,
119              "page": next_page_token["page"] if next_page_token else 1
120          }
121          if self.is_balance_stream:
122              params.pop("sort")
123              params.pop("offset")
124              params.pop("page")
125  
126          self.logger.info(f"{self.name} > request_params: {params}")
127          return params
128  
129      def to_datetime(self, timestamp: str) -> datetime.datetime:
130          """
131          Convert the default Etherscan timestamp to a `datetime`
132          """
133          return datetime.datetime.fromtimestamp(int(timestamp))
134  
135      def is_valid(self, wallet_address: str, timestamp: datetime.datetime) -> bool:
136          """
137          Check if the transaction is between the specified wallet's start and end date
138          """
139          start_date = self.historical_mapping[wallet_address]["start_date"]
140          end_date = self.historical_mapping[wallet_address]["end_date"]
141          return timestamp.date() <= end_date and timestamp.date() >= start_date
142  
143      def has_finished(self, wallet_address: str, timestamp: datetime.datetime) -> bool:
144          """
145          Check if the given timestamp is before the start date.
146          If `"sort": "desc"` is modified then `start_date` should be replaced with `end_date`
147          """
148          start_date = self.historical_mapping[wallet_address]["start_date"]
149          return timestamp.date() < start_date
150  
151      def get_params(self, response: requests.Response) -> dict:
152          """
153          Get the parameters used for the GET request
154          """
155          parsed_url = urlparse(response.request.path_url)
156          return dict(parse_qsl(parsed_url.query))
157  
158      def camel_to_title(self, text: str) -> str:
159          """
160          Convert transaction Method information into etherscan.io format
161          """
162          if len(text) == 0:
163              return None
164  
165          name = text.split("(", 1)[0]
166          name = re.sub(r'(?<!^)(?=[A-Z])', ' ', name)
167          name = name.replace("_", " ")
168          return name.title()
169  
170      def get_transactions(self, response: requests.Response) -> list:
171          """
172          Get the transactions from the current request.
173          Avoids `TypeError: 'NoneType' object is not iterable`
174          """
175          data: dict = response.json()
176          txs: list[dict] = data.get("result", [])
177          txs = txs if txs else []
178          return txs
179  
class WalletTransactions(EtherscanStream):
    """
    Normal transactions (``action=txlist``) of each configured wallet.

    A transaction where an EOA (Externally Owned Address, or typically referred to as a wallet address) sends ETH directly to another EOA.
    When viewing an address on Etherscan, this type of transaction will be shown under the Transaction tab.
    """
    def __init__(self, api_key: str, wallets: list[dict], chain_id: str, backfill: bool, sleep_seconds: int, pagination_offset: int, **kwargs):
        super().__init__(api_key, wallets, chain_id, backfill, sleep_seconds, pagination_offset, **kwargs)

    def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None) -> MutableMapping[str, Any]:
        """Shared account-module parameters plus ``action=txlist``."""
        base_params = super().request_params(stream_state, stream_slice, next_page_token)
        return {**base_params, "action": "txlist"}

    @staticmethod
    def _movement(wallet: str, sender: str, receiver: str) -> Optional[str]:
        """Classify a transfer: "out" if the wallet sent it, "in" if it received it, else None."""
        wallet = wallet.lower()
        if sender.lower() == wallet:
            return "out"
        if receiver.lower() == wallet:
            return "in"
        return None

    def parse_response(self, response, *, stream_state: Mapping[str, Any], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None):
        """
        Yield one record per transaction dated inside the wallet's sync window.

        Results arrive newest-first, so the loop stops at the first transaction older than
        the window's start date. Transactions newer than the end date are skipped.
        """
        records = self.get_transactions(response)
        address = self.get_params(response)["address"]
        owner = self.wallet_info.get(address, {})

        for record in records:
            moment = self.to_datetime(record["timeStamp"])
            if self.has_finished(address, moment):
                break
            if not self.is_valid(address, moment):
                continue

            function_name = record["functionName"]
            if len(function_name) == 0:
                method_call = None
                method_name = record["methodId"]
            else:
                method_call = function_name
                method_name = self.camel_to_title(function_name)

            point = {
                "wallet_address": address,
                "wallet_name": owner["name"],
                "tags": owner["tags"],
                "hash": record["hash"],
                "block": int(record["blockNumber"]),
                "timestamp": moment,
                "from_address": record["from"],
                "movement": None,
                "to_address": record["to"],
                "amount": record["value"],
                "token_name": "Ethereum",
                "token_symbol": "ETH",
                "token_decimal": self.WEI_DECIMALS,
                "chain_id": int(self.chain_id),
                "gas_price": record["gasPrice"],
                "gas_used": record["gasUsed"],
                "gas_decimals": self.WEI_DECIMALS,
                "method_id": record["methodId"],
                "method_call": method_call,
                "method_name": method_name,
                "is_error": bool(int(record["isError"])),
            }
            # An empty "to" (presumably a contract creation) is attributed to the wallet itself.
            if len(point["to_address"]) == 0:
                point["to_address"] = address

            point["movement"] = self._movement(address, point["from_address"], point["to_address"])
            yield point
242  
class WalletInternalTransactions(EtherscanStream):
    """
    Internal transactions (``action=txlistinternal``) of each configured wallet.

    This refers to a transfer of ETH that is carried out through a smart contract as an intermediary.
    When viewing an address on Etherscan, this type of transaction will be shown under the Internal Txns tab
    """
    def __init__(self, api_key: str, wallets: list[dict], chain_id: str, backfill: bool, sleep_seconds: int, pagination_offset: int, **kwargs):
        super().__init__(api_key, wallets, chain_id, backfill, sleep_seconds, pagination_offset, **kwargs)

    def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None) -> MutableMapping[str, Any]:
        """Shared account-module parameters plus ``action=txlistinternal``."""
        base_params = super().request_params(stream_state, stream_slice, next_page_token)
        return {**base_params, "action": "txlistinternal"}

    @staticmethod
    def _movement(wallet: str, sender: str, receiver: str) -> Optional[str]:
        """Classify a transfer: "out" if the wallet sent it, "in" if it received it, else None."""
        wallet = wallet.lower()
        if sender.lower() == wallet:
            return "out"
        if receiver.lower() == wallet:
            return "in"
        return None

    def parse_response(self, response, *, stream_state: Mapping[str, Any], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None):
        """
        Yield one record per internal transaction dated inside the wallet's sync window.

        Results arrive newest-first: iteration stops at the first entry older than the
        window's start, and entries newer than its end are skipped.
        """
        records = self.get_transactions(response)
        address = self.get_params(response)["address"]
        owner = self.wallet_info.get(address, {})

        for record in records:
            moment = self.to_datetime(record["timeStamp"])
            if self.has_finished(address, moment):
                break
            if not self.is_valid(address, moment):
                continue

            point = {
                "wallet_address": address,
                "wallet_name": owner["name"],
                "tags": owner["tags"],
                "hash": record["hash"],
                "block": int(record["blockNumber"]),
                "timestamp": moment,
                "from_address": record["from"],
                "movement": None,
                "to_address": record["to"],
                "amount": record["value"],
                "token_name": "Ethereum",
                "token_symbol": "ETH",
                "token_decimal": self.WEI_DECIMALS,
                "chain_id": int(self.chain_id),
                "gas": record["gas"],
                "gas_used": record["gasUsed"],
                "gas_decimals": self.WEI_DECIMALS,
                "is_error": bool(int(record["isError"])),
            }
            # An empty "to" is attributed to the wallet itself.
            if len(point["to_address"]) == 0:
                point["to_address"] = address

            point["movement"] = self._movement(address, point["from_address"], point["to_address"])
            yield point
300  
class WalletTokenTransactions(EtherscanStream):
    """
    Token transfers (``action=tokentx``) of each configured wallet.

    Transactions of ERC-20 or ERC-721 tokens are labelled as Token Transfer transactions.
    When viewing an address on Etherscan, this type of transaction will be shown under either the Erc20 Token Txns or Erc721 Token Txns tab, depending on the respective token type.
    """
    def __init__(self, api_key: str, wallets: list[dict], chain_id: str, backfill: bool, sleep_seconds: int, pagination_offset: int, **kwargs):
        super().__init__(api_key, wallets, chain_id, backfill, sleep_seconds, pagination_offset, **kwargs)

    def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None) -> MutableMapping[str, Any]:
        """Shared account-module parameters plus ``action=tokentx``."""
        base_params = super().request_params(stream_state, stream_slice, next_page_token)
        return {**base_params, "action": "tokentx"}

    @staticmethod
    def _movement(wallet: str, sender: str, receiver: str) -> Optional[str]:
        """Classify a transfer: "out" if the wallet sent it, "in" if it received it, else None."""
        wallet = wallet.lower()
        if sender.lower() == wallet:
            return "out"
        if receiver.lower() == wallet:
            return "in"
        return None

    def parse_response(self, response, *, stream_state: Mapping[str, Any], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None):
        """
        Yield one record per token transfer dated inside the wallet's sync window.

        Token metadata (name, symbol, decimals, contract) is taken from each transfer.
        ``is_error`` is always False for this action.
        """
        records = self.get_transactions(response)
        address = self.get_params(response)["address"]
        owner = self.wallet_info.get(address, {})

        for record in records:
            moment = self.to_datetime(record["timeStamp"])
            if self.has_finished(address, moment):
                break
            if not self.is_valid(address, moment):
                continue

            function_name = record["functionName"]
            if len(function_name) == 0:
                method_call = None
                method_name = record["methodId"]
            else:
                method_call = function_name
                method_name = self.camel_to_title(function_name)

            point = {
                "wallet_address": address,
                "wallet_name": owner.get("name"),
                "tags": owner.get("tags"),
                "hash": record["hash"],
                "method_id": record["methodId"],
                "method_call": method_call,
                "method_name": method_name,
                "block": int(record["blockNumber"]),
                "timestamp": moment,
                "from_address": record["from"],
                "movement": None,
                "to_address": record["to"],
                "amount": record["value"],
                "token_name": record["tokenName"],
                "token_symbol": record["tokenSymbol"],
                "token_decimal": int(record["tokenDecimal"]),
                "token_address": record["contractAddress"],
                "gas_price": record["gasPrice"],
                "gas_used": record["gasUsed"],
                "gas_decimals": self.WEI_DECIMALS,
                "chain_id": int(self.chain_id),
                "is_error": False,
            }
            # An empty "to" is attributed to the wallet itself.
            if len(point["to_address"]) == 0:
                point["to_address"] = address

            point["movement"] = self._movement(address, point["from_address"], point["to_address"])
            yield point
364  
class NativeBalance(EtherscanStream):
    """
    Current native balance (ETH) for the wallets

    NOTE: Used for debugging purposes
    """

    def __init__(self, api_key: str, wallets: list[dict], chain_id: str, backfill: bool, sleep_seconds: int, pagination_offset: int, **kwargs):
        super().__init__(api_key, wallets, chain_id, backfill, sleep_seconds, pagination_offset, **kwargs)

    def next_page_token(self, response: requests.Response):
        """Single request per wallet: pause for ``sleep_seconds`` and finish the slice."""
        time.sleep(self.sleep_seconds)
        return None

    def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None) -> MutableMapping[str, Any]:
        """Shared account-module parameters plus ``action=balance``."""
        params = {
            **super().request_params(stream_state, stream_slice, next_page_token),
            "action": "balance",
        }
        return params

    def parse_response(self, response, *, stream_state: Mapping[str, Any], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None):
        """
        Yield a single snapshot record with the wallet's balance in wei.

        Etherscan reports failures such as rate limiting as HTTP 200 with a message in
        ``result``. Anything other than a string of digits is logged and skipped, so the
        error text is never emitted as an ``amount``.
        """
        data: dict = response.json()
        params = self.get_params(response)
        wallet_address = params["address"]
        wallet = self.wallet_info[wallet_address]

        amount = data.get("result")
        if not (isinstance(amount, str) and amount.isdigit()):
            self.logger.warning(f"{self.name} > parse_response: skipping {wallet_address}, unexpected result: {amount}")
            return

        point = {
            "timestamp": datetime.datetime.now(),
            "wallet_address": wallet_address,
            "wallet_name": wallet["name"],
            "tags": wallet["tags"],
            "token_symbol": "ETH",
            "token_decimal": self.WEI_DECIMALS,
            "amount": amount,
            "chain_id": int(self.chain_id)
        }
        yield point
402  
class TokenBalance(EtherscanStream):
    """
    Current ERC-20 token balance

    One slice (and one request) per wallet x configured token.

    NOTE: Used for debugging purposes
    """

    def __init__(self, api_key: str, wallets: list[dict], chain_id: str, backfill: bool, sleep_seconds: int, pagination_offset: int, tokens: list[dict[str, str]], **kwargs):
        """
        :param tokens: token dicts with ``name`` (used as the symbol) and ``address`` (contract) keys.
        """
        super().__init__(api_key, wallets, chain_id, backfill, sleep_seconds, pagination_offset, **kwargs)
        self.tokens = tokens

    def next_page_token(self, response: requests.Response):
        """Single request per slice: pause for ``sleep_seconds`` and finish the slice."""
        time.sleep(self.sleep_seconds)
        return None

    def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None) -> MutableMapping[str, Any]:
        """Shared account-module parameters plus ``action=tokenbalance`` for the slice's contract."""
        params = {
            **super().request_params(stream_state, stream_slice, next_page_token),
            "action": "tokenbalance",
            "contractaddress": stream_slice["token_address"]
        }
        self.logger.info(f"{self.name} > request_params: {stream_slice['token_symbol']} ({stream_slice['token_address']})")
        return params

    def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None) -> Iterable[Optional[Mapping[str, Any]]]:
        """Expand every wallet slice into one slice per configured token."""
        for current_stream_slice in super().stream_slices(sync_mode, cursor_field, stream_state):
            for token in self.tokens:
                time.sleep(self.sleep_seconds)
                yield {
                    **current_stream_slice,
                    "token_symbol": token["name"],
                    "token_address": token["address"]
                }

    def parse_response(self, response, *, stream_state: Mapping[str, Any], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None):
        """
        Yield a single snapshot record with the wallet's raw balance of the slice's token.

        Etherscan reports failures such as rate limiting as HTTP 200 with a message in
        ``result``. Anything other than a string of digits is logged and skipped, so the
        error text is never emitted as an ``amount``.
        """
        data: dict = response.json()
        params = self.get_params(response)
        wallet_address = params["address"]
        wallet = self.wallet_info[wallet_address]

        amount = data.get("result")
        if not (isinstance(amount, str) and amount.isdigit()):
            self.logger.warning(f"{self.name} > parse_response: skipping {wallet_address} / {stream_slice['token_symbol']}, unexpected result: {amount}")
            return

        point = {
            "timestamp": datetime.datetime.now(),
            "wallet_address": wallet_address,
            "wallet_name": wallet["name"],
            "tags": wallet["tags"],
            "amount": amount,
            "token_symbol": stream_slice["token_symbol"],
            "token_address": stream_slice["token_address"],
            # NOTE(review): every token is reported with 18 decimals (WEI_DECIMALS); only
            # ``result`` is read from the response. Confirm this holds for every configured token.
            "token_decimal": self.WEI_DECIMALS,
            "chain_id": int(self.chain_id)
        }
        yield point
455  
456  
class MinedBlocks(EtherscanStream):
    """
    Get newly minted ETH block rewards earned by the wallet

    Each mined block (``action=getminedblocks``) becomes an incoming record sent from the
    zero address to the wallet, with the block reward as the amount.
    """
    def __init__(self, api_key: str, wallets: list[dict], chain_id: str, backfill: bool, sleep_seconds: int, pagination_offset: int, **kwargs):
        super().__init__(api_key, wallets, chain_id, backfill, sleep_seconds, pagination_offset, **kwargs)

    def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None) -> MutableMapping[str, Any]:
        """Shared account-module parameters plus ``action=getminedblocks``."""
        query = super().request_params(stream_state, stream_slice, next_page_token)
        query["action"] = "getminedblocks"
        return query

    def _blocks_in_window(self, address, blocks):
        """Yield ``(block, mined_at)`` for blocks inside the window; stop at the first older one."""
        for block in blocks:
            mined_at = self.to_datetime(block["timeStamp"])
            if self.has_finished(address, mined_at):
                return
            if self.is_valid(address, mined_at):
                yield block, mined_at

    def parse_response(self, response, *, stream_state: Mapping[str, Any], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None):
        """Yield one reward record per mined block inside the wallet's sync window."""
        blocks = self.get_transactions(response)
        address = self.get_params(response)["address"]
        owner = self.wallet_info.get(address, {})

        for block, mined_at in self._blocks_in_window(address, blocks):
            yield {
                "wallet_address": address,
                "wallet_name": owner.get("name"),
                "tags": owner.get("tags"),
                "block": int(block["blockNumber"]),
                "timestamp": mined_at,
                "from_address": self.EMPTY_ADDRESS,
                "movement": "in",
                "to_address": address,
                "amount": block["blockReward"],
                "token_name": "Ethereum",
                "token_symbol": "ETH",
                "token_decimal": self.WEI_DECIMALS,
                "chain_id": int(self.chain_id),
                "is_error": False,
            }
502  
class BeaconWithdrawals(EtherscanStream):
    """
    Beacon-chain withdrawals (``action=txsBeaconWithdrawal``) received by each wallet.

    Each withdrawal becomes an incoming record sent from the zero address. ``amount`` is
    reported with GWEI_DECIMALS (9) decimals.
    """

    def __init__(self, api_key: str, wallets: list[dict], chain_id: str, backfill: bool, sleep_seconds: int, pagination_offset: int, **kwargs):
        super().__init__(api_key, wallets, chain_id, backfill, sleep_seconds, pagination_offset, **kwargs)

    def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None) -> MutableMapping[str, Any]:
        """Shared account-module parameters plus ``action=txsBeaconWithdrawal``."""
        query = super().request_params(stream_state, stream_slice, next_page_token)
        query["action"] = "txsBeaconWithdrawal"
        return query

    def _withdrawals_in_window(self, address, withdrawals):
        """Yield ``(withdrawal, moment)`` for entries inside the window; stop at the first older one."""
        for withdrawal in withdrawals:
            # This action names the field "timestamp" (lower-case s), unlike the others.
            moment = self.to_datetime(withdrawal["timestamp"])
            if self.has_finished(address, moment):
                return
            if self.is_valid(address, moment):
                yield withdrawal, moment

    def parse_response(self, response, *, stream_state: Mapping[str, Any], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None):
        """Yield one incoming record per withdrawal inside the wallet's sync window."""
        withdrawals = self.get_transactions(response)
        address = self.get_params(response)["address"]
        owner = self.wallet_info.get(address, {})

        for withdrawal, moment in self._withdrawals_in_window(address, withdrawals):
            yield {
                "wallet_address": address,
                "wallet_name": owner.get("name"),
                "tags": owner.get("tags"),
                "block": int(withdrawal["blockNumber"]),
                "timestamp": moment,
                "from_address": self.EMPTY_ADDRESS,
                "movement": "in",
                "to_address": address,
                "amount": withdrawal["amount"],
                "token_name": "Ethereum",
                "token_symbol": "ETH",
                "token_decimal": self.GWEI_DECIMALS,
                "chain_id": int(self.chain_id),
                "is_error": False,
                "withdrawal_index": withdrawal["withdrawalIndex"],
                "validator_index": withdrawal["validatorIndex"],
            }
548  
class SourceEtherscan(AbstractSource):
    """Airbyte source exposing Etherscan wallet activity and balance streams."""

    url = "https://api.etherscan.io/v2/api"
    # Seconds to wait for each connection-check request before treating it as failed.
    REQUEST_TIMEOUT_SECONDS = 30

    @staticmethod
    def _rejection_reason(response: requests.Response) -> Optional[str]:
        """
        Return why Etherscan rejected a balance request, or ``None`` if it was accepted.

        Etherscan answers HTTP 200 even for errors such as an invalid API key, so the JSON
        ``status`` is checked too. A rate-limit message counts as accepted, because the key
        was recognised and the call was only throttled.
        """
        if response.status_code != 200:
            return f"HTTP {response.status_code}"
        try:
            body = response.json()
        except ValueError:
            return "response is not JSON"
        if not isinstance(body, dict):
            return "unexpected response body"
        if str(body.get("status")) == "1":
            return None
        result = str(body.get("result") or "")
        if "rate limit reached" in result.lower():
            return None
        return result or str(body.get("message") or "unknown error")

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, any]:
        """
        Verify that the ETH balance of every configured wallet can be fetched.

        :returns: ``(True, None)`` when every wallet passes, otherwise ``(False, message)``
            with one line per failing wallet.
        """
        logger.info(f"URL: {self.url}")
        failed = []
        wallets: list[dict] = config["wallets"]
        for wallet in wallets:
            params = {
                "chainid": config["chain_id"],
                "module": "account",
                "action": "balance",
                "address": wallet["address"]
            }
            logger.info(f"Params: {params}")
            # Added only after logging so the API key never ends up in the logs.
            params["apikey"] = config["api_key"]
            try:
                response = requests.get(self.url, params=params, timeout=self.REQUEST_TIMEOUT_SECONDS)
            except requests.RequestException as error:
                # Report only the exception type: its message embeds the request URL (and the API key).
                failed.append(f"Failed connection check for {wallet}: {type(error).__name__}")
                continue
            logger.info(f"Status Code: {response.status_code}")
            reason = self._rejection_reason(response)
            if reason is None:
                continue

            failed.append(f"Failed connection check for {wallet}: {reason}")

        return len(failed) == 0, "\n".join(failed) if failed else None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Build every stream from the config, keeping only ``tags``/``name``/``address`` per wallet."""
        params = {
            **config,
            "wallets": [
                {
                    "tags": wallet["tags"],
                    "name": wallet["name"],
                    # Kept exactly as configured: this value keys wallet_info/historical_mapping
                    # and is read back from the request URL when parsing responses.
                    "address": wallet["address"]
                }
                for wallet in config["wallets"]
            ]
        }
        streams = [
            WalletTransactions(**params),
            WalletInternalTransactions(**params),
            WalletTokenTransactions(**params),
            NativeBalance(**params),
            TokenBalance(**params),
            MinedBlocks(**params),
            BeaconWithdrawals(**params)
        ]
        return streams