source.py
  1  """Telegram Bot API Airbyte Source Connector."""
  2  
  3  from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
  4  import logging
  5  import requests
  6  from airbyte_cdk.sources import AbstractSource
  7  from airbyte_cdk.sources.streams import Stream
  8  from airbyte_cdk.sources.streams.http import HttpStream
  9  
# Module-wide logger shared by every stream and the source below.
# NOTE(review): the fixed name "airbyte" (rather than __name__) is presumably
# chosen so messages flow through the Airbyte CDK's logger — confirm.
logger = logging.getLogger("airbyte")
 11  
 12  
 13  class TelegramStream(HttpStream):
 14      """Base stream for Telegram Bot API."""
 15  
 16      url_base = "https://api.telegram.org/"
 17  
 18      def __init__(self, bot_token: str, **kwargs):
 19          super().__init__(**kwargs)
 20          self.bot_token = bot_token
 21  
 22      @property
 23      def url_base(self) -> str:
 24          return f"https://api.telegram.org/bot{self.bot_token}/"
 25  
 26      def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
 27          return None
 28  
 29      def request_params(
 30          self,
 31          stream_state: Optional[Mapping[str, Any]],
 32          stream_slice: Optional[Mapping[str, Any]] = None,
 33          next_page_token: Optional[Mapping[str, Any]] = None,
 34      ) -> MutableMapping[str, Any]:
 35          return {}
 36  
 37  
 38  class Messages(TelegramStream):
 39      """
 40      Stream for fetching Telegram messages via getUpdates.
 41  
 42      Telegram only retains unacknowledged updates for ~24 hours.
 43      Once fetched with an offset, updates are marked as acknowledged and removed.
 44  
 45      This stream uses timeout=0 (instant response, not waiting for more updates).
 46      """
 47  
 48      primary_key = "update_id"
 49  
 50      def __init__(self, bot_token: str, chat_ids: List[str] = None, **kwargs):
 51          super().__init__(bot_token=bot_token, **kwargs)
 52          self.chat_ids = chat_ids or []
 53  
 54      def path(
 55          self,
 56          stream_state: Mapping[str, Any] = None,
 57          stream_slice: Mapping[str, Any] = None,
 58          next_page_token: Mapping[str, Any] = None,
 59      ) -> str:
 60          return "getUpdates"
 61  
 62      def request_params(
 63          self,
 64          stream_state: Optional[Mapping[str, Any]],
 65          stream_slice: Optional[Mapping[str, Any]] = None,
 66          next_page_token: Optional[Mapping[str, Any]] = None,
 67      ) -> MutableMapping[str, Any]:
 68          params = {
 69              "timeout": 0,  # immediate response
 70              "allowed_updates": '["message", "channel_post"]', # we don't care about edits here
 71          }
 72  
 73          # Use offset from state to get only new messages
 74          if next_page_token and "offset" in next_page_token:
 75              params["offset"] = next_page_token["offset"]
 76          elif stream_state and "offset" in stream_state:
 77              params["offset"] = stream_state["offset"]
 78  
 79          return params
 80  
 81      def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
 82          """Telegram returns all available updates at once, no pagination needed."""
 83          return None
 84  
 85      def parse_response(
 86          self,
 87          response: requests.Response,
 88          stream_slice: Mapping[str, Any] = None,
 89          stream_state: Mapping[str, Any] = None,
 90          **kwargs,
 91      ) -> Iterable[Mapping]:
 92          data = response.json()
 93  
 94          if not data.get("ok"):
 95              logger.error("Telegram API error: %s", data)
 96              return
 97  
 98          results = data.get("result", [])
 99          logger.info("Fetched %d updates from Telegram", len(results))
100  
101          for update in results:
102              message = update.get("message") or update.get("channel_post")
103  
104              if not message:
105                  continue
106  
107              chat = message.get("chat", {})
108              chat_id = str(chat.get("id", ""))
109  
110              if self.chat_ids and chat_id not in self.chat_ids:
111                  continue
112  
113              enriched_update = {
114                  "update_id": update["update_id"],
115                  "update_type": self._get_update_type(update),
116                  "message_id": message.get("message_id"),
117                  "date": message.get("date"),
118                  "text": message.get("text"),
119                  "caption": message.get("caption"),
120                  "chat_id": chat.get("id"),
121                  "chat_title": chat.get("title"),
122                  "chat_type": chat.get("type"),
123                  "chat_username": chat.get("username"),
124                  "is_forum": chat.get("is_forum", False),
125                  "from_id": message.get("from", {}).get("id"),
126                  "from_username": message.get("from", {}).get("username"),
127                  "from_first_name": message.get("from", {}).get("first_name"),
128                  "from_last_name": message.get("from", {}).get("last_name"),
129                  "from_is_bot": message.get("from", {}).get("is_bot", False),
130                  "message_thread_id": message.get("message_thread_id"),
131                  "is_topic_message": message.get("is_topic_message", False),
132                  "reply_to_message_id": message.get("reply_to_message", {}).get("message_id"),
133                  "forward_from_chat_id": message.get("forward_from_chat", {}).get("id"),
134                  "forward_date": message.get("forward_date"),
135                  "has_photo": bool(message.get("photo")),
136                  "has_video": bool(message.get("video")),
137                  "has_document": bool(message.get("document")),
138                  "has_audio": bool(message.get("audio")),
139                  "has_voice": bool(message.get("voice")),
140                  "has_sticker": bool(message.get("sticker")),
141                  "has_poll": bool(message.get("poll")),
142                  "has_location": bool(message.get("location")),
143                  "raw_update": update,
144              }
145  
146              yield enriched_update
147  
148      def _get_update_type(self, update: Mapping[str, Any]) -> str:
149          if "message" in update:
150              return "message"
151          elif "channel_post" in update:
152              return "channel_post"
153          return "unknown"
154  
155      @property
156      def state_checkpoint_interval(self) -> int:
157          return 100
158  
159  
class ChatInfo(TelegramStream):
    """
    Stream with one record per configured chat, built from ``getChat``.

    Each record is enriched with ``member_count`` from a separate
    ``getChatMemberCount`` request. That lookup is best-effort and yields
    ``None`` when it fails.
    """

    primary_key = "id"

    # Seconds to wait for the auxiliary getChatMemberCount request; requests
    # never times out on its own.
    member_count_timeout = 30

    def __init__(self, bot_token: str, chat_ids: List[str], **kwargs):
        """
        :param bot_token: Telegram bot token.
        :param chat_ids: chats to describe (numeric ids or ``@username``); None is treated as empty.
        """
        super().__init__(bot_token=bot_token, **kwargs)
        self.chat_ids = chat_ids or []

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        return "getChat"

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        """One slice (and therefore one getChat request) per configured chat."""
        for chat_id in self.chat_ids:
            yield {"chat_id": chat_id}

    def request_params(
        self,
        stream_state: Optional[Mapping[str, Any]],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        return {"chat_id": stream_slice["chat_id"]}

    def parse_response(
        self,
        response: requests.Response,
        stream_slice: Mapping[str, Any] = None,
        **kwargs,
    ) -> Iterable[Mapping]:
        """Yield a single chat record, or nothing if Telegram reports an error for this chat."""
        chat_id = stream_slice["chat_id"]
        data = response.json()

        if not data.get("ok"):
            logger.warning("Failed to get chat info for %s: %s", chat_id, data)
            return

        chat = data.get("result") or {}

        # Member count is not part of getChat and needs its own request.
        member_count = self._get_member_count(chat_id)

        yield {
            "id": chat.get("id"),
            "type": chat.get("type"),
            "title": chat.get("title"),
            "username": chat.get("username"),
            "first_name": chat.get("first_name"),
            "last_name": chat.get("last_name"),
            "is_forum": chat.get("is_forum", False),
            "description": chat.get("description"),
            "invite_link": chat.get("invite_link"),
            "linked_chat_id": chat.get("linked_chat_id"),
            "member_count": member_count,
        }

    def _get_member_count(self, chat_id: str) -> Optional[int]:
        """
        Best-effort fetch of a chat's member count.

        :return: the count, or None if the request fails, the body is not JSON,
            or Telegram reports an error.
        """
        url = f"{self.url_base}getChatMemberCount"
        try:
            response = requests.get(url, params={"chat_id": chat_id}, timeout=self.member_count_timeout)
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # requests' error text can include the URL, which contains the bot token.
            logger.warning("Failed to get member count for %s: %s", chat_id, self._redact(str(e)))
            return None

        if isinstance(data, dict) and data.get("ok"):
            return data.get("result")

        logger.warning("Failed to get member count for %s: %s", chat_id, data)
        return None

    def _redact(self, text: str) -> str:
        """Mask the bot token in text destined for logs."""
        return text.replace(self.bot_token, "<redacted>") if self.bot_token else text
236  
237  
class SourceTelegramFetcher(AbstractSource):
    """Airbyte source reading messages (and optionally chat metadata) via the Telegram Bot API."""

    # Seconds to wait for the getMe call; requests never times out on its own.
    _check_timeout_seconds = 30

    def check_connection(self, logger, config) -> Tuple[bool, Any]:
        """
        Verify the bot token by calling ``getMe``.

        :return: ``(True, None)`` on success, otherwise ``(False, <reason>)``.
            The reason never contains the bot token.
        """
        bot_token = config.get("bot_token")
        if not bot_token:
            return False, "Missing required config field 'bot_token'"

        url = f"https://api.telegram.org/bot{bot_token}/getMe"
        try:
            response = requests.get(url, timeout=self._check_timeout_seconds)
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # Exception text can include the request URL, which embeds the token.
            return False, f"Connection error: {str(e).replace(bot_token, '<redacted>')}"

        if not isinstance(data, dict) or not data.get("ok"):
            description = data.get("description", "Unknown error") if isinstance(data, dict) else "Unknown error"
            return False, f"Telegram API error: {description}"

        bot_info = data.get("result") or {}
        logger.info(
            "Connected to Telegram as @%s (%s)",
            bot_info.get("username"),
            bot_info.get("first_name"),
        )
        return True, None

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Build the streams; ChatInfo is only offered when chat_ids are configured."""
        bot_token = config["bot_token"]
        # `or []` also covers an explicit null in the config.
        chat_ids = config.get("chat_ids") or []

        streams: List[Stream] = [Messages(bot_token=bot_token, chat_ids=chat_ids)]
        if chat_ids:
            streams.append(ChatInfo(bot_token=bot_token, chat_ids=chat_ids))

        return streams
271          return streams