source.py
"""Telegram Bot API Airbyte Source Connector."""

import logging
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple

import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import IncrementalMixin, Stream
from airbyte_cdk.sources.streams.http import HttpStream

logger = logging.getLogger("airbyte")

# Root of the Bot API; methods live under f"{TELEGRAM_API_ROOT}/bot<token>/<method>".
TELEGRAM_API_ROOT = "https://api.telegram.org"

# Seconds to wait on the ad-hoc requests made outside HttpStream (connection check and
# member count). Without a timeout, requests.get can block forever on a stalled socket.
REQUEST_TIMEOUT_SECONDS = 30


def _redact_token(text: str, bot_token: str) -> str:
    """Return *text* with every occurrence of *bot_token* masked.

    The token is part of every request URL, so exception messages raised by
    ``requests`` (e.g. connection errors that quote the URL) can otherwise leak it
    into logs and connection-check results.
    """
    if not bot_token:
        return text
    return text.replace(bot_token, "***")


def _normalize_chat_ids(chat_ids: Any) -> List[str]:
    """Return the configured chat ids as a list of non-empty strings.

    Incoming updates are matched on ``str(chat["id"])``, so numeric ids coming from
    the config (e.g. ``-1001234567890``) must be stringified or they never match.
    A bare string is treated as a comma-separated list of ids.
    """
    if not chat_ids:
        return []
    if isinstance(chat_ids, str):
        chat_ids = chat_ids.split(",")
    normalized = (str(chat_id).strip() for chat_id in chat_ids)
    return [chat_id for chat_id in normalized if chat_id]


class TelegramStream(HttpStream):
    """Base stream for Telegram Bot API.

    Subclasses provide ``path`` (the Bot API method name) and ``primary_key``.
    The default implementation is unpaginated and sends no query parameters.
    """

    def __init__(self, bot_token: str, **kwargs):
        # Set before the parent initializer so ``url_base`` is usable as soon as
        # the stream exists.
        self.bot_token = bot_token
        super().__init__(**kwargs)

    @property
    def url_base(self) -> str:
        # The bot token is part of the URL path: https://api.telegram.org/bot<token>/<method>
        return f"{TELEGRAM_API_ROOT}/bot{self.bot_token}/"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        return None

    def request_params(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        return {}


class Messages(TelegramStream, IncrementalMixin):
    """
    Stream for fetching Telegram messages via getUpdates.

    Telegram keeps unacknowledged updates for at most ~24 hours. An update counts as
    acknowledged once getUpdates is called with an ``offset`` greater than its
    ``update_id``; Telegram then discards it.

    - Uses timeout=0 (instant response, not waiting for more updates).
    - Pages through the queue with ``limit``/``offset`` until a short page comes back.
    - Checkpoints ``{"offset": <highest update_id seen> + 1}`` so the next sync resumes
      after (and acknowledges) every update already processed.
    - Only ``message`` and ``channel_post`` updates are emitted, optionally restricted
      to ``chat_ids``.
    """

    primary_key = "update_id"
    # update_id increases sequentially, so it doubles as the incremental cursor.
    cursor_field = "update_id"
    # getUpdates accepts 1-100 (default 100); a full page means more may be waiting.
    page_size = 100

    def __init__(self, bot_token: str, chat_ids: Optional[List[str]] = None, **kwargs):
        super().__init__(bot_token=bot_token, **kwargs)
        self.chat_ids = _normalize_chat_ids(chat_ids)
        # Next offset to request (highest update_id seen + 1); None until state/data exists.
        self._offset: Optional[int] = None

    @property
    def state(self) -> MutableMapping[str, Any]:
        """Checkpointed stream state: ``{"offset": <next update_id to request>}``."""
        return {"offset": self._offset} if self._offset is not None else {}

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        offset = (value or {}).get("offset")
        self._offset = int(offset) if offset is not None else None

    def _advance_offset(self, update_id: int) -> None:
        """Move the checkpoint past *update_id* (never backwards)."""
        next_offset = update_id + 1
        if self._offset is None or next_offset > self._offset:
            self._offset = next_offset

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        return "getUpdates"

    def request_params(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        params = {
            "timeout": 0,  # immediate response
            "limit": self.page_size,
            "allowed_updates": '["message", "channel_post"]',  # we don't care about edits here
        }

        # Resume point, most specific first: the next page within this sync, then the
        # checkpoint loaded through the state setter, then the raw state passed in.
        if next_page_token and "offset" in next_page_token:
            params["offset"] = next_page_token["offset"]
        elif self._offset is not None:
            params["offset"] = self._offset
        elif stream_state and "offset" in stream_state:
            params["offset"] = stream_state["offset"]

        return params

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return the offset for the next getUpdates call, or None once the queue is drained.

        getUpdates returns at most ``limit`` updates per call, so a full page means
        more updates may be pending. Requesting the next page with this offset also
        acknowledges every update on the current page.
        """
        data = response.json()
        if not data.get("ok"):
            return None
        results = data.get("result") or []
        if len(results) < self.page_size:
            return None
        return {"offset": max(update["update_id"] for update in results) + 1}

    def parse_response(
        self,
        response: requests.Response,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
        **kwargs,
    ) -> Iterable[Mapping]:
        data = response.json()

        if not data.get("ok"):
            logger.error("Telegram API error: %s", data)
            return

        results = data.get("result", [])
        logger.info("Fetched %d updates from Telegram", len(results))

        wanted_chats = set(self.chat_ids)
        for update in results:
            # Advance the cursor for every update, including ones skipped below, so
            # filtered-out updates are acknowledged rather than re-fetched each sync.
            self._advance_offset(update["update_id"])

            message = update.get("message") or update.get("channel_post")
            if not message:
                continue

            chat = message.get("chat") or {}
            if wanted_chats and str(chat.get("id", "")) not in wanted_chats:
                continue

            yield self._build_record(update, message, chat)

    def _build_record(
        self,
        update: Mapping[str, Any],
        message: Mapping[str, Any],
        chat: Mapping[str, Any],
    ) -> Mapping[str, Any]:
        """Flatten one update into the stream's record shape (raw payload kept in raw_update)."""
        # Optional sub-objects are omitted by Telegram when absent; "or {}" also covers nulls.
        sender = message.get("from") or {}
        return {
            "update_id": update["update_id"],
            "update_type": self._get_update_type(update),
            "message_id": message.get("message_id"),
            "date": message.get("date"),
            "text": message.get("text"),
            "caption": message.get("caption"),
            "chat_id": chat.get("id"),
            "chat_title": chat.get("title"),
            "chat_type": chat.get("type"),
            "chat_username": chat.get("username"),
            "is_forum": chat.get("is_forum", False),
            "from_id": sender.get("id"),
            "from_username": sender.get("username"),
            "from_first_name": sender.get("first_name"),
            "from_last_name": sender.get("last_name"),
            "from_is_bot": sender.get("is_bot", False),
            "message_thread_id": message.get("message_thread_id"),
            "is_topic_message": message.get("is_topic_message", False),
            "reply_to_message_id": (message.get("reply_to_message") or {}).get("message_id"),
            "forward_from_chat_id": (message.get("forward_from_chat") or {}).get("id"),
            "forward_date": message.get("forward_date"),
            "has_photo": bool(message.get("photo")),
            "has_video": bool(message.get("video")),
            "has_document": bool(message.get("document")),
            "has_audio": bool(message.get("audio")),
            "has_voice": bool(message.get("voice")),
            "has_sticker": bool(message.get("sticker")),
            "has_poll": bool(message.get("poll")),
            "has_location": bool(message.get("location")),
            "raw_update": update,
        }

    def _get_update_type(self, update: Mapping[str, Any]) -> str:
        if "message" in update:
            return "message"
        elif "channel_post" in update:
            return "channel_post"
        return "unknown"

    @property
    def state_checkpoint_interval(self) -> int:
        return 100


class ChatInfo(TelegramStream):
    """
    Stream for fetching information about configured chats.
    Includes member count and chat details.

    One getChat request is made per configured chat id (one slice each), plus a
    best-effort getChatMemberCount request per chat.
    """

    primary_key = "id"

    def __init__(self, bot_token: str, chat_ids: List[str], **kwargs):
        super().__init__(bot_token=bot_token, **kwargs)
        self.chat_ids = _normalize_chat_ids(chat_ids)

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        return "getChat"

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        for chat_id in self.chat_ids:
            yield {"chat_id": chat_id}

    def request_params(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        return {"chat_id": stream_slice["chat_id"]}

    def parse_response(
        self,
        response: requests.Response,
        stream_slice: Mapping[str, Any] = None,
        **kwargs,
    ) -> Iterable[Mapping]:
        data = response.json()

        # NOTE(review): Telegram usually pairs ok=false with a 4xx status, which HttpStream
        # may raise on before this runs; this branch is a fallback — confirm CDK behavior.
        if not data.get("ok"):
            logger.warning("Failed to get chat info for %s: %s", stream_slice["chat_id"], data)
            return

        chat = data.get("result", {})

        # Get member count in a separate request
        member_count = self._get_member_count(stream_slice["chat_id"])

        chat_info = {
            "id": chat.get("id"),
            "type": chat.get("type"),
            "title": chat.get("title"),
            "username": chat.get("username"),
            "first_name": chat.get("first_name"),
            "last_name": chat.get("last_name"),
            "is_forum": chat.get("is_forum", False),
            "description": chat.get("description"),
            "invite_link": chat.get("invite_link"),
            "linked_chat_id": chat.get("linked_chat_id"),
            "member_count": member_count,
        }

        yield chat_info

    def _get_member_count(self, chat_id: str) -> Optional[int]:
        """Fetch member count for a chat.

        Best-effort: network/JSON errors and ``ok=false`` replies are logged and
        yield None instead of failing the sync.
        """
        try:
            response = requests.get(
                f"{self.url_base}getChatMemberCount",
                params={"chat_id": chat_id},
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            logger.warning(
                "Failed to get member count for %s: %s",
                chat_id,
                _redact_token(str(e), self.bot_token),
            )
            return None

        if not data.get("ok"):
            logger.warning("Failed to get member count for %s: %s", chat_id, data)
            return None
        return data.get("result")


class SourceTelegramFetcher(AbstractSource):
    """Airbyte source exposing Telegram bot updates and (optionally) chat metadata."""

    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        """Verify the bot token by calling getMe.

        Returns ``(True, None)`` on success, otherwise ``(False, <reason>)`` with the
        bot token redacted from any error text.
        """
        bot_token = config["bot_token"]
        try:
            response = requests.get(
                f"{TELEGRAM_API_ROOT}/bot{bot_token}/getMe",
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers non-JSON bodies (e.g. a proxy error page).
            return False, f"Connection error: {_redact_token(str(e), bot_token)}"

        if data.get("ok"):
            bot_info = data.get("result", {})
            logger.info(
                "Connected to Telegram as @%s (%s)",
                bot_info.get("username"),
                bot_info.get("first_name"),
            )
            return True, None
        return False, f"Telegram API error: {data.get('description', 'Unknown error')}"

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Build the Messages stream, plus ChatInfo when chat ids are configured."""
        bot_token = config["bot_token"]
        chat_ids = config.get("chat_ids", [])

        streams = [
            Messages(bot_token=bot_token, chat_ids=chat_ids),
        ]
        if chat_ids:
            streams.append(ChatInfo(bot_token=bot_token, chat_ids=chat_ids))

        return streams