source.py
import logging
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple

import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream

from .stream import Transactions

logger = logging.getLogger("airbyte")

# Base endpoint queried once per configured wallet address during the
# connection check.
_RAW_ADDRESS_URL = "https://blockchain.info/rawaddr/"

# Upper bound (seconds) on each connection-check request. `requests` applies
# no timeout by default, so without this an unresponsive host would block the
# check indefinitely.
# NOTE(review): 30s is a conservative guess -- tune to the deployment's needs.
_REQUEST_TIMEOUT_SECONDS = 30


# Source
class SourceBitcoinExplorer(AbstractSource):
    """Source exposing a ``Transactions`` stream for the configured wallets."""

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        """Check that every configured wallet address can be fetched.

        Issues one GET request per entry of ``config["wallets"]`` against
        ``https://blockchain.info/rawaddr/<address>``. A wallet counts as
        failed when the request raises a transport error or the response
        status is not 200; the remaining wallets are still checked.

        Args:
            logger: Logger used to report each successful request.
            config: Connector configuration; must contain a ``"wallets"``
                iterable whose items each have an ``"address"`` key.

        Returns:
            ``(True, None)`` when every request succeeded, otherwise
            ``(False, message)`` where ``message`` holds one line per failure.
        """
        failed = []
        for wallet in config["wallets"]:
            url = _RAW_ADDRESS_URL + wallet["address"]

            try:
                response = requests.get(url, timeout=_REQUEST_TIMEOUT_SECONDS)
            except requests.RequestException as error:
                # Report transport failures (DNS, refused connection, timeout)
                # as a failed check instead of letting them abort the loop.
                failed.append(f"Could not make REST GET request to {url}: {error}")
                continue

            if response.status_code != 200:
                failed.append(f"Could not make REST GET request {response.status_code} - {url}")
                continue

            logger.info("Successfully connected to %s", url)

        if failed:
            return False, "\n".join(failed)
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Return the streams of this source.

        Args:
            config: Connector configuration; its ``"wallets"`` value is passed
                to the ``Transactions`` stream unchanged.

        Returns:
            A single-element list holding the ``Transactions`` stream.
        """
        return [
            Transactions(wallets=config["wallets"]),
        ]