source.py
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from datetime import datetime, timedelta
import logging
import requests
import re

from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator

logger = logging.getLogger("airbyte")

# Largest `limit` the searchPosts endpoint accepts per request.
MAX_SEARCH_LIMIT = 100

# Seconds to wait for the login request; without a timeout `requests` can block forever.
LOGIN_TIMEOUT_SECONDS = 30


class BlueskyStream(HttpStream):
    """Base HTTP stream for the Bluesky XRPC API hosted at bsky.social.

    Holds the search configuration shared by concrete streams. Authentication is
    supplied by the caller via the ``authenticator`` keyword forwarded to ``HttpStream``.
    """

    url_base = "https://bsky.social"

    def __init__(self, search_terms: Optional[List[str]] = None, limit: int = 25, **kwargs):
        """
        Args:
            search_terms: Search queries to run; ``None`` is treated as an empty list.
            limit: Requested page size; clamped to the API's accepted range when sent.
            **kwargs: Forwarded unchanged to ``HttpStream`` (e.g. ``authenticator``).
        """
        super().__init__(**kwargs)
        self.search_terms = search_terms or []
        self.limit = limit

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # Pagination is not implemented: only the first page of each request is read.
        # NOTE(review): searchPosts responses appear to carry a `cursor`; following it
        # would allow more than MAX_SEARCH_LIMIT posts per term — confirm before adding.
        return None

    def request_headers(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Request JSON responses; credentials are expected to come from the authenticator."""
        return {"Accept": "application/json"}


class PostsStream(BlueskyStream):
    """Posts returned by ``app.bsky.feed.searchPosts``, read one slice per search term.

    Each post is flattened into a single-level record (easier to query in SQL) and
    tagged with the search term that produced it.
    """

    primary_key = "uri"
    # NOTE(review): a cursor field is declared, but nothing in this class reads or
    # updates stream state, so incremental syncs appear to re-read all results — confirm.
    cursor_field = "indexed_at"

    @property
    def name(self) -> str:
        # Without this override the stream name would be derived from the class name
        # (posts_stream), and that name would be used to look up the schema file.
        return "posts"

    def path(self, **kwargs) -> str:
        return "/xrpc/app.bsky.feed.searchPosts"

    def _find_matching_term(self, post_text: str) -> Optional[str]:
        """Return the configured search term that appears in ``post_text``.

        Matching is case-insensitive and whole-token. A term matches only when it is
        not directly preceded or followed by a word character. So ``#ai`` does not
        match inside ``#airbyte``, and ``cat`` does not match inside ``concat``.
        Empty terms and ``from:`` terms are skipped. The ``from:`` terms presumably
        filter by author rather than by text.

        Returns:
            The first matching term, with its original casing. If no term matches,
            the first configured term is returned as a fallback. Returns ``None`` when
            there is no text or there are no configured terms.
        """
        if not post_text or not self.search_terms:
            return None

        haystack = post_text.lower()
        for term in self.search_terms:
            if not term or term.startswith("from:"):
                continue
            # Lookarounds instead of \b: \b requires a word character on one side, so it
            # never matches terms that begin or end with punctuation such as '#' or '+'.
            pattern = r"(?<!\w)" + re.escape(term.lower()) + r"(?!\w)"
            if re.search(pattern, haystack):
                return term

        # Attribute unmatched posts to the first term rather than leaving them untagged.
        return self.search_terms[0]

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Build the query string: the page size plus the slice's search term, if any."""
        # Clamp into [1, MAX_SEARCH_LIMIT]. The API caps `limit` at 100, and a
        # non-positive value would be a pointless (likely rejected) request.
        params: MutableMapping[str, Any] = {
            "limit": max(1, min(self.limit, MAX_SEARCH_LIMIT)),
        }
        if stream_slice and "search_term" in stream_slice:
            params["q"] = stream_slice["search_term"]
        return params

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        """Yield one slice per search term, or a single empty slice when none are configured."""
        if not self.search_terms:
            yield {}
            return

        for term in self.search_terms:
            yield {"search_term": term}

    def parse_response(
        self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs
    ) -> Iterable[Mapping]:
        """Flatten each post in a searchPosts response into a single-level record.

        Raises:
            Whatever decoding or extraction raises; the error is logged, then re-raised.
        """
        try:
            data = response.json()
            # `or []` / `or {}` below also cover keys that are present but null.
            posts = data.get("posts") or []

            current_term = stream_slice.get("search_term") if stream_slice else None
            logger.info("Found %d posts for term: %s", len(posts), current_term)

            for post in posts:
                author = post.get("author") or {}
                record = post.get("record") or {}
                post_text = record.get("text", "")

                yield {
                    "uri": post.get("uri"),
                    "cid": post.get("cid"),
                    "author_did": author.get("did"),
                    "author_handle": author.get("handle"),
                    "author_display_name": author.get("displayName"),
                    "author_avatar": author.get("avatar"),
                    "text": post_text,
                    "created_at": record.get("createdAt"),
                    "langs": record.get("langs"),
                    "embed": post.get("embed"),
                    "reply_count": post.get("replyCount", 0),
                    "repost_count": post.get("repostCount", 0),
                    "like_count": post.get("likeCount", 0),
                    "indexed_at": post.get("indexedAt"),
                    "labels": post.get("labels"),
                    # Prefer the slice's own term; fall back to matching against the text.
                    "search_term": current_term or self._find_matching_term(post_text),
                }

        except Exception as e:
            logger.error("Error parsing response: %s", e)
            raise


class SourceBlueskyFetcher(AbstractSource):
    """Airbyte source that searches Bluesky posts for configured terms.

    Config keys read here:
    - ``identifier`` and ``password``: login credentials.
    - ``search_terms``: a list of terms. ``search_query`` is used as a single-term
      fallback when no list is given.
    - ``limit``: the requested page size.
    """

    def _get_access_token(
        self, config: Mapping[str, Any], timeout: float = LOGIN_TIMEOUT_SECONDS
    ) -> Optional[str]:
        """Create a session with the configured credentials and return its access JWT.

        Args:
            config: Connector config providing ``identifier`` and ``password``.
            timeout: Seconds to wait for the login request before giving up.

        Returns:
            The ``accessJwt`` field of the session response, or ``None`` if it is absent.

        Raises:
            requests.HTTPError: The login endpoint returned an error status.
            requests.RequestException: Network failure or timeout.
        """
        login_url = f"{BlueskyStream.url_base}/xrpc/com.atproto.server.createSession"

        response = requests.post(
            login_url,
            json={
                "identifier": config.get("identifier"),
                "password": config.get("password"),
            },
            timeout=timeout,
        )
        response.raise_for_status()
        return response.json().get("accessJwt")

    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        """Verify that credentials are present and that a session can be created."""
        try:
            if not config.get("identifier") or not config.get("password"):
                return False, "Identifier and password are required"

            if not self._get_access_token(config):
                return False, "Failed to get access token"

            return True, None

        except Exception as e:
            return False, f"Connection failed: {str(e)}"

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Build the posts stream, logging in once up front.

        Raises:
            Exception: Login failed or the session response carried no access token.
        """
        search_terms = config.get("search_terms", [])

        # Fall back to the single `search_query` setting when no term list is given.
        if not search_terms and config.get("search_query"):
            search_terms = [config.get("search_query")]

        try:
            access_token = self._get_access_token(config)
            if not access_token:
                # Otherwise the authenticator would send a literal "None" token.
                raise ValueError("no access token in session response")
            auth = TokenAuthenticator(token=access_token)
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise Exception(f"Authentication failed: {str(e)}. Check your credentials.") from e

        return [
            PostsStream(
                search_terms=search_terms,
                limit=config.get("limit", 25),
                authenticator=auth,
            )
        ]