/ crons / telegram.py
telegram.py
  1  #!/usr/bin/env python3
  2  """Telegram Poller — cron-friendly script.
  3  
  4  Polls Telegram for pending updates on all projects with a telegram_token configured,
  5  processes each message through the project's chat pipeline, sends responses, then exits.
  6  
  7  Usage:
  8      uv run python crons/telegram.py
  9  """
 10  
 11  import asyncio
 12  import json
 13  import logging
 14  
 15  logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
 16  logger = logging.getLogger("restai.telegram")
 17  
 18  from restai import config
 19  from restai.settings import ensure_settings_table
 20  from restai.database import get_db_wrapper, engine as db_engine
 21  from restai.models.databasemodels import ProjectDatabase
 22  from restai.brain import Brain
 23  from restai.telegram import get_updates, send_message
 24  
 25  
 26  def main():
 27      ensure_settings_table(db_engine)
 28  
 29      brain = Brain(lightweight=True)
 30      db = get_db_wrapper()
 31  
 32      from restai.cron_log import CronLogger
 33      cron = CronLogger("telegram")
 34      processed = 0
 35  
 36      try:
 37          from restai.utils.crypto import decrypt_field
 38  
 39          projects = db.db.query(ProjectDatabase).all()
 40          logger.info(f"Scanning {len(projects)} project(s) for Telegram tokens")
 41  
 42          enabled_count = 0
 43  
 44          for proj in projects:
 45              opts = json.loads(proj.options) if proj.options else {}
 46              token = decrypt_field(opts.get("telegram_token") or "")
 47              if not token:
 48                  continue
 49  
 50              # Parse the per-project allowlist. Empty/missing = open to all
 51              # (legacy behavior). Tolerant of whitespace, trailing commas,
 52              # and stray non-numeric junk so an admin's typo doesn't lock
 53              # everyone out silently.
 54              allowed_raw = (opts.get("telegram_allowed_chat_ids") or "").strip()
 55              allowed_ids: set[int] = set()
 56              if allowed_raw:
 57                  for piece in allowed_raw.replace(";", ",").split(","):
 58                      piece = piece.strip()
 59                      if not piece:
 60                          continue
 61                      try:
 62                          allowed_ids.add(int(piece))
 63                      except ValueError:
 64                          logger.warning(f"  ⚠ ignoring non-numeric chat id in allowlist: {piece!r}")
 65  
 66              enabled_count += 1
 67              logger.info(
 68                  f"Polling project '{proj.name}' (id={proj.id})"
 69                  + (f" — allowlist: {sorted(allowed_ids)}" if allowed_ids else " — allowlist: open")
 70              )
 71  
 72              updates, err = get_updates(token, offset=0, timeout=1)
 73              if err is not None:
 74                  logger.warning(f"Telegram API error for project {proj.name} (id={proj.id}): {err}")
 75                  continue
 76              if not updates:
 77                  logger.info(f"No new Telegram updates for project '{proj.name}'")
 78                  continue
 79  
 80              logger.info(f"Got {len(updates)} update(s) for project '{proj.name}'")
 81  
 82              for update in updates:
 83                  message = update.get("message")
 84                  if not message:
 85                      logger.info(f"Skipping non-message update: {list(update.keys())}")
 86                      continue
 87  
 88                  text = message.get("text")
 89                  chat_id = message.get("chat", {}).get("id")
 90                  if not text or not chat_id:
 91                      logger.info(f"Skipping message with no text/chat_id (text={bool(text)}, chat_id={chat_id})")
 92                      continue
 93  
 94                  from_user = message.get("from", {}).get("username") or message.get("from", {}).get("id")
 95                  logger.info(f"  ← message from {from_user} (chat={chat_id}): {text[:200]!r}")
 96  
 97                  # Built-in shortcut: replies with the chat_id so admins (or
 98                  # would-be users) can grab their id without going through
 99                  # the agent. Always allowed — it's the primary way an
100                  # unauthorized user finds out what id they need to ask the
101                  # admin to add to the allowlist.
102                  if text.strip().lower() in ("/chatid", "/myid"):
103                      logger.info(f"  → replying with chat id {chat_id}")
104                      try:
105                          send_message(token, chat_id, f"Chat ID: {chat_id}")
106                      except Exception as e:
107                          logger.warning(f"Failed to reply to /chatid: {e}")
108                      processed += 1
109                      continue
110  
111                  # Allowlist gate. Skip silently when the user isn't on the
112                  # list — well, almost silently: we send a one-line "not
113                  # authorized" reply so the user sees the bot is alive and
114                  # knows to ask the admin.
115                  if allowed_ids and chat_id not in allowed_ids:
116                      logger.info(f"  ✗ chat {chat_id} not in allowlist for project '{proj.name}', rejecting")
117                      try:
118                          send_message(
119                              token, chat_id,
120                              "You are not authorized to use this bot. "
121                              f"If you should be, ask the admin to add chat id {chat_id} to the project's allowlist.",
122                          )
123                      except Exception as e:
124                          logger.warning(f"Failed to send unauthorized reply: {e}")
125                      processed += 1
126                      continue
127  
128                  try:
129                      send_typing(token, chat_id)
130                      logger.info(f"  → invoking project '{proj.name}' agent for chat {chat_id}")
131                      response = asyncio.run(_process_message(brain, db, proj.id, text, chat_id))
132                      if response:
133                          logger.info(f"  → sending response ({len(response)} chars): {response[:200]!r}")
134                          send_message(token, chat_id, response)
135                      else:
136                          logger.warning(f"  ✗ project '{proj.name}' returned no response for chat {chat_id}")
137                      processed += 1
138                  except Exception as e:
139                      logger.exception(f"Error processing Telegram message for project {proj.name} (id={proj.id}): {e}")
140  
141              # Acknowledge processed updates so Telegram doesn't redeliver.
142              if updates:
143                  last_offset = updates[-1]["update_id"] + 1
144                  logger.info(f"Acking {len(updates)} update(s) up to offset {last_offset}")
145                  _, ack_err = get_updates(token, offset=last_offset, timeout=1)
146                  if ack_err is not None:
147                      logger.warning(f"Failed to ack updates for project {proj.name}: {ack_err}")
148  
149          logger.info(f"Tick complete: {enabled_count} project(s) with Telegram, {processed} message(s) processed")
150          cron.finish(items_processed=processed)
151      except Exception as e:
152          cron.error(f"Telegram poller crashed: {e}", details=__import__("traceback").format_exc())
153          cron.finish()
154      finally:
155          db.db.close()
156  
157  
158  async def _process_message(brain, db, project_id, text, chat_id):
159      from restai.models.models import ChatModel, User
160      from restai.helper import chat_main
161      from fastapi import BackgroundTasks
162  
163      project = brain.find_project(project_id, db)
164      if not project:
165          logger.warning(f"_process_message: project {project_id} not found")
166          return None
167  
168      # `chat_id=f"telegram_{chat_id}"` keeps each Telegram chat as its own
169      # conversation in the agent's memory store across cron ticks.
170      chat_input = ChatModel(question=text, id=f"telegram_{chat_id}")
171  
172      user_db = db.get_user_by_username("admin")
173      if not user_db:
174          logger.warning("_process_message: no 'admin' user — cannot run agent")
175          return None
176      user = User.model_validate(user_db)
177  
178      background_tasks = BackgroundTasks()
179      # chat_main signature: (request, brain, project, chat_input, user, db, background_tasks).
180      # The Request slot is `_` (unused) inside chat_main, so None is fine.
181      result = await chat_main(None, brain, project, chat_input, user, db, background_tasks)
182      await background_tasks()
183  
184      if isinstance(result, dict):
185          return result.get("answer", "")
186      if result is None:
187          logger.warning(f"_process_message: chat_main returned None for project {project_id}")
188          return None
189      logger.warning(f"_process_message: chat_main returned unexpected type {type(result).__name__}")
190      return None
191  
192  
193  def send_typing(token, chat_id):
194      """Send typing indicator to Telegram chat."""
195      import requests
196      try:
197          requests.post(
198              f"https://api.telegram.org/bot{token}/sendChatAction",
199              json={"chat_id": chat_id, "action": "typing"},
200              timeout=5,
201          )
202      except Exception:
203          pass
204  
205  
206  if __name__ == "__main__":
207      main()