source.py
 1  from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
 2  from .stream import Transactions
 3  import logging
 4  import requests
 5  from airbyte_cdk.sources import AbstractSource
 6  from airbyte_cdk.sources.streams import Stream
 7  
 8  logger = logging.getLogger("airbyte")
 9  # Source
class SourceBitcoinExplorer(AbstractSource):
    """Airbyte source backed by the blockchain.info ``rawaddr`` REST endpoint.

    The connector config is read as a mapping with a ``wallets`` entry: an
    iterable of mappings, each carrying an ``address`` key.
    """

    # Upper bound (seconds) for each connectivity probe. Without it
    # ``requests.get`` waits indefinitely on an unresponsive server.
    _CHECK_TIMEOUT_SECONDS = 30

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        """Probe the ``rawaddr`` endpoint once for every configured wallet.

        Every wallet is checked even after an earlier one fails, so the
        returned message lists all problems at once.

        :param logger: logger that receives one info line per reachable wallet.
        :param config: connector configuration; ``config["wallets"]`` is
            iterated and ``wallet["address"]`` is appended to the endpoint URL.
        :return: ``(True, None)`` when every request answered with HTTP 200,
            otherwise ``(False, message)`` where ``message`` holds one line
            per failed wallet.
        :raises KeyError: if ``wallets`` or a wallet's ``address`` is missing.
        """
        failed = []
        for wallet in config["wallets"]:
            url = "https://blockchain.info/rawaddr/" + wallet["address"]

            try:
                response = requests.get(url, timeout=self._CHECK_TIMEOUT_SECONDS)
            except requests.RequestException as err:
                # Transport-level failures (DNS, refused connection, timeout)
                # are reported through the return value instead of escaping
                # as an exception, the same way bad status codes are.
                failed.append(f"Could not make REST GET request {err} - {url}")
                continue

            if response.status_code != 200:
                failed.append(f"Could not make REST GET request {response.status_code} - {url}")
                continue
            logger.info(f"Successfully connected to {url}")

        return not failed, "\n".join(failed) if failed else None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Return the streams exposed by this source.

        :param config: connector configuration; ``config["wallets"]`` is
            handed to the ``Transactions`` stream unchanged.
        :return: a single-element list holding the ``Transactions`` stream.
        """
        return [
            Transactions(wallets=config["wallets"]),
        ]