source.py
  1  from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
  2  from datetime import datetime, timedelta
  3  import logging
  4  import requests
  5  import re
  6  
  7  from airbyte_cdk.sources import AbstractSource
  8  from airbyte_cdk.sources.streams import Stream
  9  from airbyte_cdk.sources.streams.http import HttpStream
 10  from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
 11  
 12  logger = logging.getLogger("airbyte")
 13  
 14  class BlueskyStream(HttpStream):
 15      url_base = "https://bsky.social"
 16      
 17      def __init__(self, search_terms: List[str] = None, limit: int = 25, **kwargs):
 18          super().__init__(**kwargs)
 19          self.search_terms = search_terms or []
 20          self.limit = limit
 21      
 22      def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
 23          return None
 24      
 25      def request_headers(
 26          self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
 27      ) -> MutableMapping[str, Any]:
 28          return {"Accept": "application/json"}
 29  
 30  class PostsStream(BlueskyStream):
 31      
 32      primary_key = "uri"
 33      cursor_field = "indexed_at"
 34      
 35      @property
 36      def name(self) -> str:
 37          return "posts" #otherwise it will lokk for the name of the classe (posts_stream) in the schema folder
 38      
 39      def path(self, **kwargs) -> str:
 40          return "/xrpc/app.bsky.feed.searchPosts"
 41      
 42      def _find_matching_term(self, post_text: str) -> Optional[str]:
 43          if not post_text or not self.search_terms:
 44              return None
 45          
 46          post_text_lower = post_text.lower()
 47          
 48          for term in self.search_terms:
 49              term_lower = term.lower()
 50              
 51              if term.startswith('#'):
 52                  if term_lower in post_text_lower:
 53                      return term
 54              elif term.startswith('from:'):
 55                  continue 
 56              else:
 57                  if re.search(r'\b' + re.escape(term_lower) + r'\b', post_text_lower):
 58                      return term
 59          
 60          return self.search_terms[0] if self.search_terms else None
 61      
 62      def request_params(
 63          self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
 64      ) -> MutableMapping[str, Any]:
 65          # Bluesky API has a maximum limit of 100
 66          api_limit = min(self.limit, 100)
 67          params = {"limit": api_limit}
 68          if stream_slice and "search_term" in stream_slice:
 69              params["q"] = stream_slice["search_term"]
 70          return params
 71      
 72      def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
 73          if not self.search_terms:
 74              yield {}
 75              return
 76              
 77          for term in self.search_terms:
 78              yield {"search_term": term}
 79      
 80      def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]:
 81          try:
 82              data = response.json()
 83              posts = data.get('posts', [])
 84              
 85              current_term = stream_slice.get("search_term") if stream_slice else None
 86              logger.info(f"Found {len(posts)} posts for term: {current_term}")
 87              
 88              for post in posts:
 89                  post_text = post.get('record', {}).get('text', '')
 90                  
 91                  # Determine which term matched this post
 92                  matched_term = current_term or self._find_matching_term(post_text)
 93                  
 94                  # Extract data (flattened for easier SQL queries)
 95                  post_data = {
 96                      "uri": post.get('uri'),
 97                      "cid": post.get('cid'),
 98                      "author_did": post.get('author', {}).get('did'),
 99                      "author_handle": post.get('author', {}).get('handle'),
100                      "author_display_name": post.get('author', {}).get('displayName'),
101                      "author_avatar": post.get('author', {}).get('avatar'),
102                      "text": post_text,
103                      "created_at": post.get('record', {}).get('createdAt'),
104                      "langs": post.get('record', {}).get('langs'),
105                      "embed": post.get('embed'),
106                      "reply_count": post.get('replyCount', 0),
107                      "repost_count": post.get('repostCount', 0),
108                      "like_count": post.get('likeCount', 0),
109                      "indexed_at": post.get('indexedAt'),
110                      "labels": post.get('labels'),
111                      "search_term": matched_term
112                  }
113                  
114                  yield post_data
115                  
116          except Exception as e:
117              logger.error(f"Error parsing response: {e}")
118              raise
119  
class SourceBlueskyFetcher(AbstractSource):
    """Airbyte source that searches Bluesky posts through the ``bsky.social`` XRPC API."""

    _LOGIN_URL = "https://bsky.social/xrpc/com.atproto.server.createSession"
    # Seconds to wait for the login endpoint; without a timeout ``requests``
    # can block indefinitely on an unresponsive server.
    _LOGIN_TIMEOUT_SECONDS = 30

    def _get_access_token(self, config: Mapping[str, Any]) -> Optional[str]:
        """Log in with the configured credentials and return the session token.

        Args:
            config: Connector configuration; ``identifier`` and ``password`` are read.

        Returns:
            The ``accessJwt`` value of the login response, or ``None`` when
            the response does not contain one.

        Raises:
            requests.RequestException: On connection errors, timeouts or a
                non-success HTTP status.
        """
        response = requests.post(
            self._LOGIN_URL,
            json={
                "identifier": config.get("identifier"),
                "password": config.get("password"),
            },
            timeout=self._LOGIN_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        return response.json().get("accessJwt")

    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        """Verify that the configured credentials can open a session.

        Returns:
            ``(True, None)`` on success, otherwise ``(False, reason)``. Errors
            are reported through the returned tuple rather than raised.
        """
        try:
            identifier = config.get("identifier")
            password = config.get("password")

            if not identifier or not password:
                return False, "Identifier and password are required"

            access_token = self._get_access_token(config)

            if not access_token:
                return False, "Failed to get access token"

            return True, None

        except Exception as e:
            return False, f"Connection failed: {str(e)}"

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Authenticate and build the list of streams exposed by this source.

        Search terms come from ``search_terms``; when that is empty, a
        ``search_query`` value is used as the single term.

        Raises:
            Exception: When the login fails or returns no access token; the
                original error is chained as the cause.
        """
        search_terms = config.get("search_terms") or []
        # A bare string would otherwise be iterated character by character.
        if isinstance(search_terms, str):
            search_terms = [search_terms]

        if not search_terms and config.get("search_query"):
            search_terms = [config.get("search_query")]

        try:
            access_token = self._get_access_token(config)
            # Fail here instead of sending a literal "None" bearer token later.
            if not access_token:
                raise ValueError("login response did not contain an access token")
            auth = TokenAuthenticator(token=access_token)
        except Exception as e:
            raise Exception(f"Authentication failed: {str(e)}. Check your credentials.") from e

        return [
            PostsStream(
                search_terms=search_terms,
                limit=config.get("limit", 25),
                authenticator=auth,
            )
        ]