# source.py
import logging
import os
import time
from typing import Any, Iterable, List, Mapping, Optional, Tuple

import requests
from airbyte_cdk.models.airbyte_protocol import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream

logger = logging.getLogger("airbyte")


class ApiStream(HttpStream):
    """Base class for streams served from ``<url>/api/v2``.

    Responses are treated as single-page unless a subclass overrides
    ``next_page_token``.
    """

    primary_key: Optional[str] = None

    def __init__(self, url: str):
        """
        Parameters:
            - `url` - the base URL of the API, with or without a trailing slash
        """
        super().__init__()
        self.__url_base = url + ("api/v2" if url.endswith("/") else "/api/v2")

    @property
    def url_base(self) -> str:
        """The API root, i.e. the configured URL followed by ``/api/v2``."""
        return self.__url_base

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """No pagination by default."""
        return None

    def _pause_if_rate_limited(self) -> None:
        """
        Count one handled response and sleep for one second every `limit` responses.

        Relies on the subclass having set `self.count` and `self.limit`.
        """
        self.count += 1

        if self.count == self.limit:
            logger.info(f"Made {self.count} requests. Sleeping for 1s")
            self.count = 0
            time.sleep(1)


class Stats(ApiStream):
    """Single-record stream that emits the JSON body of the ``/stats`` endpoint."""

    def __init__(self, url: str, **kwargs):
        # Extra keyword arguments (e.g. `limit`) are accepted so the same parameter
        # dict can be shared with the other streams; they are ignored here.
        super().__init__(url)

    def path(self, **kwargs):
        return f"{self.url_base}/stats"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield the whole response body as one record."""
        data: dict[str, Any] = response.json()
        yield data


class Blocks(ApiStream):
    """Stream of blocks, paged from the newest block back to `starting_block`."""

    def __init__(self, url: str, starting_block: int, blocks_to_do: int = 0, limit: int = 9):
        """
        Parameters:
            - `url` - the base URL of the API
            - `starting_block` - the block that will terminate the REST API pagination.
                NOTE: You start from the current block on the blockchain
                and traverse backwards until `starting_block` is reached
            - `blocks_to_do` - number of block pages that will be fetched. If 0, then all
                missing blocks will be fetched. Use a positive value to cap the run
                (debug mode).
            - `limit` - the number of REST API requests that can be made before a small
                pause is made
        """
        super().__init__(url)
        self.__starting_block = starting_block
        self.__highest_block = starting_block
        self.__class_name = self.__class__.__name__
        self.completed = 0
        self.blocks_to_do = blocks_to_do

        if blocks_to_do > 0:
            logger.info(f"Blocks to do: {blocks_to_do}")

        self.limit = limit
        self.count = 0

    @property
    def use_cache(self) -> bool:
        """Turn on the CDK's response caching for this stream."""
        return True

    @property
    def starting_block(self) -> int:
        """
        Custom (non-CDK) attribute: block height at which pagination stops.
        """
        return self.__starting_block

    @property
    def highest_block(self) -> int:
        """
        Custom (non-CDK) attribute: highest block height seen so far in this run.
        """
        return self.__highest_block

    def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any]]:
        """
        Return the API's `next_page_params`, or None when pagination must stop.

        Pagination stops when the page has no items, when the `blocks_to_do` page
        cap is reached, or when the page reaches block 0 / `starting_block`.
        """
        data: dict = response.json()
        items = data.get("items")

        if not items:
            logger.info(f"{self.__class_name}: no 'items' in 'next_page_params' to process")
            return None

        # Count the page that was just handled *before* testing the cap, so exactly
        # `blocks_to_do` pages are fetched (the previous check ran two pages over).
        self.completed += 1
        if 0 < self.blocks_to_do <= self.completed:
            logger.info(f"{self.__class_name}: DEBUG MODE. Completed {self.blocks_to_do} blocks")
            return None

        biggest_block = items[0]["height"]
        smallest_block = items[-1]["height"]

        if smallest_block == 0 or biggest_block <= self.__starting_block:
            logger.info(f"starting_block: {self.starting_block}")
            logger.info(f"highest_block: {self.highest_block}")
            self.__starting_block = self.__highest_block
            return None

        return data.get("next_page_params")

    def path(self, **kwargs):
        return f"{self.url_base}/blocks"

    def request_params(self, stream_state: Optional[dict[str, Any]], stream_slice: Optional[dict[str, Any]] = None, next_page_token: Optional[dict[str, Any]] = None):
        """Send the previous page's `next_page_params` back as the query string."""
        # The first request has no token; return an empty mapping instead of None.
        return next_page_token or {}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        Yield every block on the page that is newer than `starting_block`.

        Also tracks the highest block height seen and applies the request throttle.
        """
        data: dict[str, Any] = response.json()
        # A missing, null or empty "items" is an empty page, not an error.
        items: list[dict] = data.get("items") or []

        if items:
            largest_block: int = items[0]["height"]
            smallest_block: int = items[-1]["height"]

            for item in items:
                if item["height"] <= self.__starting_block:
                    logger.info(f"Stopping execution - item[\"height\"] is {item['height']} and the starting_block is {self.__starting_block}")
                    break

                yield item

            logger.info(f"Processed {largest_block - smallest_block} blocks ({smallest_block} to {largest_block})")

            if self.__highest_block < largest_block:
                logger.info(f"Updated highest_block from {self.highest_block} to {largest_block} ({largest_block - self.__highest_block} blocks difference)")
                self.__highest_block = largest_block

        self._pause_if_rate_limited()


class Transactions(HttpSubStream, ApiStream):
    """Sub-stream of `Blocks`: the transactions of every block the parent emits."""

    def __init__(self, **kwargs):
        """
        Parameters:
            - `kwargs` - the same keyword arguments as `Blocks` (`url`, `starting_block`,
                `blocks_to_do`, `limit`); they are used to build the parent stream
        """
        # Hand the parent *instance* to HttpSubStream directly instead of passing the
        # class and overwriting `self.parent` afterwards.
        super().__init__(parent=Blocks(**kwargs), url=kwargs["url"])
        self.__class_name = self.__class__.__name__
        # Same default as Blocks, so `limit` stays optional here as well.
        self.limit = kwargs.get("limit", 9)
        self.count = 0

    def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None):
        """Build the transactions URL from the parent block's hash."""
        block: dict = stream_slice.get("parent")
        block_id = block["hash"]
        return f"{self.url_base}/blocks/{block_id}/transactions"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield each transaction on the page with its block hash added as `block_hash`."""
        data: dict = response.json()
        # `or []` also covers an explicit null, which `.get("items", [])` does not.
        items: list[dict] = data.get("items") or []

        # The block hash is the URL segment between "/blocks/" and "/transactions".
        url = response.url
        start_prefix = "/blocks/"
        start = url.index(start_prefix) + len(start_prefix)
        end = url.index("/transactions")

        block_id = url[start:end]
        logger.info(f"{self.__class_name}: Block {block_id} has {len(items)} transactions")

        for item in items:
            yield {**item, "block_hash": block_id}

        self._pause_if_rate_limited()


class SourceBlockchainExplorer(AbstractSource):
    """Airbyte source exposing the `Stats`, `Blocks` and `Transactions` streams."""

    # Store the current block for the next Airbyte run
    starting_block_path: str = "/data/starting_block.txt"

    def __init__(self):
        super().__init__()
        self.__blocks: Optional[Blocks] = None

    @property
    def blocks(self) -> Blocks:
        """
        Custom (non-CDK) attribute: the `Blocks` stream created by `streams()`.

        Raises:
            - `Exception` if read before `streams()` has created the stream
        """
        if not self.__blocks:
            raise Exception("Variable cannot be used before airbyte_cdk.entrypoint.launch runs")

        return self.__blocks

    def check_connection(self, logger: logging.Logger, config: dict) -> Tuple[bool, Any]:
        """
        Probe `<url_base>/stats` and report whether the request succeeded.

        Output:
            - `(True, message)` on success, `(False, error text)` on any request failure
        """
        url_base = config["url_base"]

        # NOTE(review): the streams request `<url_base>/api/v2/...` while this probe
        # requests `<url_base>/stats` - confirm the un-prefixed URL is intended.
        try:
            # The request itself is inside the `try` so that DNS, connection and
            # timeout errors are reported as a failed check instead of raising.
            response = requests.get(url_base + "/stats", timeout=30)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            return False, str(e)

        return True, f"Successfully connected to {url_base}"

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        Build the streams, resuming from the block height stored in
        `starting_block_path` (the file is created with 0 when missing).
        """
        params = {
            "url": config["url_base"],
            "limit": 9  # Based on https://docs.blockscout.com/setup/env-variables/backend-env-variables#api-rate-limits (API_RATE_LIMIT)
        }

        starting_block = 0
        if not os.path.exists(self.starting_block_path):
            self.set_block(starting_block)
            logger.info(f"Created text file {self.starting_block_path}")
        else:
            starting_block = self.get_block()
            logger.info(f"Loaded text file from volume - {self.starting_block_path}")

        logger.info(f"starting_block: {starting_block}")

        blocks_params = {
            **params,
            "starting_block": starting_block,
            # Optional setting: 0 means "fetch every missing block".
            "blocks_to_do": config.get("blocks_to_do", 0),
        }
        self.__blocks = Blocks(**blocks_params)
        return [Stats(**params), self.__blocks, Transactions(**blocks_params)]

    def get_block(self) -> int:
        """
        Custom Airbyte function. Get the latest block that
        has been completely uploaded from the previous Airbyte run.

        Output:
            - The last processed block, or 0 when the file is missing or empty
        """
        if not os.path.exists(self.starting_block_path):
            return 0

        with open(self.starting_block_path, "r") as f:
            content = f.read().strip()

        if not content:
            # An empty file is treated like a missing one instead of raising
            # ValueError from int("").
            logger.warning(f"{self.starting_block_path} is empty - starting from block 0")
            return 0

        return int(content)

    def set_block(self, value: int):
        """
        Custom Airbyte function. Write the given value (highest processed block)
        so the next Airbyte run can pick up from where the current run finished.

        Parameters:
            - `value` - the highest processed block within the current Airbyte run
        """
        # So local Docker development / new instance runs do not break
        os.makedirs(os.path.dirname(self.starting_block_path), exist_ok=True)

        with open(self.starting_block_path, "w") as f:
            f.write(str(value))