source.py
  1  from typing import Any, Iterable, List, Mapping, Optional, Tuple
  2  from airbyte_cdk.sources import AbstractSource
  3  from airbyte_cdk.sources.streams import Stream
  4  from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
  5  from airbyte_cdk.models.airbyte_protocol import SyncMode
  6  import time
  7  import requests
  8  import logging
  9  import os
 10  
# Module-wide logger named "airbyte", shared by the streams and the source class below.
logger = logging.getLogger("airbyte")
 12  
 13  class ApiStream(HttpStream):
 14  
 15      primary_key: Optional[str] = None
 16  
 17      def __init__(self, url: str):
 18          super().__init__()
 19          self.__url_base = url + ("api/v2" if url.endswith("/") else "/api/v2")
 20  
 21      @property
 22      def url_base(self) -> str:
 23          return self.__url_base
 24  
 25      def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
 26          return None
 27  
 28  
 29  
 30  class Stats(ApiStream):
 31  
 32      def __init__(self, url: str, **kwargs):
 33          super().__init__(url)
 34  
 35      def path(self, **kwargs):
 36          return f"{self.url_base}/stats"
 37          
 38      def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
 39          data: dict[str, Any] = response.json()
 40          yield data
 41  
 42  
 43  
 44  class Blocks(ApiStream):
 45      
 46      def __init__(self, url: str, starting_block: int, blocks_to_do: int = 0, limit: int = 9):
 47          """
 48          Parameters:
 49              - `url` - the base URL of the API
 50              - `starting_block` - the block that will terminate the REST API pagination.
 51                                   NOTE: You start from the current block on the blockchain 
 52                                   and go traverse backwards until `starting_block` is reached
 53              - `blocks_to_do` - number of block pages that will be fetched. If 0, then all missing
 54                                 blocks will be fetched. Put a POSITIVE value to
 55              - `limit` - the number of REST API requests that can be made before a small pause is put
 56          """
 57          super().__init__(url)
 58          self.__starting_block = starting_block
 59          self.__highest_block = starting_block       
 60          self.__class_name = self.__class__.__name__
 61          self.completed = 0
 62          self.blocks_to_do = blocks_to_do
 63          
 64          if blocks_to_do > 0:
 65              logger.info(f"Blocks to do: {blocks_to_do}")
 66          
 67          self.limit = limit
 68          self.count = 0
 69  
 70  
 71  
 72      @property  
 73      def use_cache(self) -> bool:  
 74          return True 
 75  
 76  
 77  
 78      @property
 79      def starting_block(self) -> int:
 80          """
 81          Custom Airibyte variable
 82          """
 83          return self.__starting_block
 84  
 85  
 86  
 87      @property
 88      def highest_block(self) -> int:
 89          """
 90          Custom Airibyte variable
 91          """
 92          return self.__highest_block
 93  
 94  
 95  
 96      def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any]]:
 97  
 98          data: dict = response.json()
 99          next_page_params = data.get("next_page_params")        
100          items = data.get("items")
101  
102          debug_mode = self.completed > self.blocks_to_do and self.blocks_to_do > 0
103          if not items or debug_mode:
104              message = f"no 'items' in 'next_page_params' to process" if not debug_mode else f"DEBUG MODE. Completed {self.blocks_to_do} blocks"
105              logger.info(f"{self.__class_name}: {message}")
106              return None
107          
108          self.completed += 1
109  
110          biggest_block = items[0]["height"]
111          smallest_block = items[-1]["height"]
112  
113          if smallest_block == 0 or biggest_block <= self.__starting_block:
114              logger.info(f"starting_block: {self.starting_block}")
115              logger.info(f"highest_block: {self.highest_block}")
116              self.__starting_block = self.__highest_block
117              return None
118          
119          return next_page_params
120  
121  
122  
123      def path(self, **kwargs):
124          url = f"{self.url_base}/blocks"
125          return url
126  
127  
128  
129      def request_params(self, stream_state: Optional[dict[str, Any]], stream_slice: Optional[dict[str, Any]] = None, next_page_token: Optional[dict[str, Any]] = None):
130          return next_page_token
131  
132  
133  
134      def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
135          data: dict[str, Any] = response.json()
136          items: Optional[list[dict]] = data.get("items")
137          largest_block: int = items[0]["height"]
138          
139          for item in items:
140  
141              if item["height"] <= self.__starting_block:
142                  logger.info(f"Stopping execution - item[\"height\"] is  {item['height']} and the starting_block is {self.__starting_block}")
143                  break
144  
145              yield item
146  
147          smallest_block = items[-1]['height']
148          logger.info(f"Processed {largest_block - smallest_block} blocks ({smallest_block} to {largest_block})")
149          
150          if self.__highest_block < largest_block:
151              logger.info(f"Updated highest_block from {self.highest_block} to {largest_block} ({largest_block - self.__highest_block} blocks difference)")
152              self.__highest_block = largest_block 
153  
154          self.count += 1
155  
156          if self.count == self.limit:
157              logger.info(f"Made {self.count} requests. Sleeping for 1s")
158              self.count = 0
159              time.sleep(1)
160  
161  
162  
class Transactions(HttpSubStream, ApiStream):
    """
    Transactions of every block emitted by a dedicated `Blocks` parent stream.

    Each record is an item of ``<api>/blocks/<hash>/transactions`` with an added
    ``block_hash`` field. All keyword arguments are forwarded to `Blocks`. ``url``
    is required; ``limit`` defaults to 9, matching `Blocks`.
    """

    def __init__(self, **kwargs):
        # Hand the parent *instance* to HttpSubStream directly instead of passing
        # the class and overwriting `self.parent` afterwards.
        super().__init__(parent=Blocks(**kwargs), url=kwargs["url"])
        self.__class_name = self.__class__.__name__
        self.limit = kwargs.get("limit", 9)
        # Requests made since the last rate-limit pause.
        self.count = 0

    def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None):
        """Absolute URL listing the transactions of the block in `stream_slice["parent"]`."""
        block: dict = stream_slice.get("parent")
        return f"{self.url_base}/blocks/{block['hash']}/transactions"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Follow ``next_page_params`` (as `Blocks` does) so a block with more
        transactions than one page holds is read completely. Returns None,
        meaning a single page, when the field is absent or null.
        """
        return response.json().get("next_page_params")

    def request_params(self, stream_state: Optional[dict[str, Any]], stream_slice: Optional[dict[str, Any]] = None, next_page_token: Optional[dict[str, Any]] = None):
        """Send the pagination params verbatim; an empty mapping for the first page."""
        return dict(next_page_token) if next_page_token else {}

    @staticmethod
    def _block_hash(response: requests.Response, stream_slice: Optional[Mapping[str, Any]]) -> str:
        """Return the hash of the block whose transactions `response` contains."""
        parent = (stream_slice or {}).get("parent") or {}
        if "hash" in parent:
            # Same value `path` used to build the request URL.
            return parent["hash"]

        # Fallback: recover the hash from ``.../blocks/<hash>/transactions``.
        url = response.url
        start_prefix = "/blocks/"
        start = url.index(start_prefix) + len(start_prefix)
        end = url.index("/transactions")
        return url[start:end]

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """
        Yield each transaction tagged with its ``block_hash`` and apply the simple
        rate limit (sleep 1s every `limit` requests).
        """
        data: dict = response.json()
        # `items` may be missing or null; both are treated as "no transactions".
        items: list[dict] = data.get("items") or []

        block_id = self._block_hash(response, kwargs.get("stream_slice"))
        logger.info(f"{self.__class_name}: Block {block_id} has {len(items)} transactions")

        for item in items:
            yield {**item, "block_hash": block_id}

        self.count += 1

        if self.count == self.limit:
            logger.info(f"Made {self.count} requests. Sleeping for 1s")
            self.count = 0
            time.sleep(1)
205  
206  
class SourceBlockchainExplorer(AbstractSource):
    """
    Airbyte source exposing the Stats, Blocks and Transactions streams of an
    explorer API served under ``<url_base>/api/v2``.

    Progress is persisted outside Airbyte state in a plain text file
    (`starting_block_path`). `set_block` is meant to be called with the highest
    processed block; NOTE(review): its caller is not part of this file.
    """

    # Store the current block for the next Airbyte run
    starting_block_path: str = "/data/starting_block.txt"

    def __init__(self):
        super().__init__()
        self.__blocks: Optional[Blocks] = None

    @property
    def blocks(self) -> Blocks:
        """
        Custom Airbyte variable: the `Blocks` stream instance built by `streams()`.

        Raises:
            Exception: if accessed before `streams()` has run.
        """
        if self.__blocks is None:
            raise Exception("Variable cannot be used before airbyte_cdk.entrypoint.launch runs")
        return self.__blocks

    def check_connection(self, logger: logging.Logger, config: dict) -> Tuple[bool, Any]:
        """
        Probe the ``/api/v2/stats`` endpoint that the `Stats` stream reads.

        Returns:
            ``(True, message)`` on a 2xx response. Otherwise ``(False, error_text)``,
            including network failures, which previously propagated as exceptions.
        """
        url_base = config["url_base"]
        # Same joining rule as ApiStream, so the check hits the API the streams use.
        api_base = url_base + ("api/v2" if url_base.endswith("/") else "/api/v2")

        try:
            response = requests.get(f"{api_base}/stats", timeout=30)
            response.raise_for_status()
        except requests.RequestException as e:
            return False, str(e)

        return True, f"Successfully connected to {config['url_base']}"

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        Build the Stats, Blocks and Transactions streams.

        The starting block is read from `starting_block_path`. When that file does
        not exist, it is created with 0, so the run traverses down to block 0.
        ``blocks_to_do`` is optional in `config` and defaults to 0 (no page cap).
        """
        params = {
            "url": config["url_base"],
            "limit": 9,  # Based on https://docs.blockscout.com/setup/env-variables/backend-env-variables#api-rate-limits (API_RATE_LIMIT)
        }

        if os.path.exists(self.starting_block_path):
            starting_block = self.get_block()
            logger.info(f"Loaded text file from volume - {self.starting_block_path}")
        else:
            starting_block = 0
            self.set_block(starting_block)
            logger.info(f"Created text file {self.starting_block_path}")

        logger.info(f"starting_block: {starting_block}")

        blocks_params = {
            **params,
            "starting_block": starting_block,
            "blocks_to_do": config.get("blocks_to_do", 0),
        }
        self.__blocks = Blocks(**blocks_params)
        return [Stats(**params), self.__blocks, Transactions(**blocks_params)]

    def get_block(self) -> int:
        """
        Custom Airbyte function. Get the latest block that has been completely
        uploaded by the previous Airbyte run.

        Output:
            - The last processed block. 0 if the file is missing or empty.
        """
        if not os.path.exists(self.starting_block_path):
            return 0

        with open(self.starting_block_path, "r", encoding="utf-8") as f:
            content = f.read().strip()

        # An empty file is treated like a missing one instead of raising ValueError.
        return int(content) if content else 0

    def set_block(self, value: int):
        """
        Custom Airbyte function. Write the given value (highest processed block)
        so the next Airbyte run can pick up from where the current run finished.

        Parameters:
            - `value` - the highest processed block within the current Airbyte run
        """
        # So local Docker development / new instance runs do not break.
        # os.makedirs("") would raise, so only create a directory when the path has one.
        directory = os.path.dirname(self.starting_block_path)
        if directory:
            os.makedirs(directory, exist_ok=True)

        with open(self.starting_block_path, "w", encoding="utf-8") as f:
            f.write(str(value))