telegram.py
1 #!/usr/bin/env python3 2 """Telegram Poller — cron-friendly script. 3 4 Polls Telegram for pending updates on all projects with a telegram_token configured, 5 processes each message through the project's chat pipeline, sends responses, then exits. 6 7 Usage: 8 uv run python crons/telegram.py 9 """ 10 11 import asyncio 12 import json 13 import logging 14 15 logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") 16 logger = logging.getLogger("restai.telegram") 17 18 from restai import config 19 from restai.settings import ensure_settings_table 20 from restai.database import get_db_wrapper, engine as db_engine 21 from restai.models.databasemodels import ProjectDatabase 22 from restai.brain import Brain 23 from restai.telegram import get_updates, send_message 24 25 26 def main(): 27 ensure_settings_table(db_engine) 28 29 brain = Brain(lightweight=True) 30 db = get_db_wrapper() 31 32 from restai.cron_log import CronLogger 33 cron = CronLogger("telegram") 34 processed = 0 35 36 try: 37 from restai.utils.crypto import decrypt_field 38 39 projects = db.db.query(ProjectDatabase).all() 40 logger.info(f"Scanning {len(projects)} project(s) for Telegram tokens") 41 42 enabled_count = 0 43 44 for proj in projects: 45 opts = json.loads(proj.options) if proj.options else {} 46 token = decrypt_field(opts.get("telegram_token") or "") 47 if not token: 48 continue 49 50 # Parse the per-project allowlist. Empty/missing = open to all 51 # (legacy behavior). Tolerant of whitespace, trailing commas, 52 # and stray non-numeric junk so an admin's typo doesn't lock 53 # everyone out silently. 54 allowed_raw = (opts.get("telegram_allowed_chat_ids") or "").strip() 55 allowed_ids: set[int] = set() 56 if allowed_raw: 57 for piece in allowed_raw.replace(";", ",").split(","): 58 piece = piece.strip() 59 if not piece: 60 continue 61 try: 62 allowed_ids.add(int(piece)) 63 except ValueError: 64 logger.warning(f" ⚠ ignoring non-numeric chat id in allowlist: {piece!r}") 65 66 enabled_count += 1 67 logger.info( 68 f"Polling project '{proj.name}' (id={proj.id})" 69 + (f" — allowlist: {sorted(allowed_ids)}" if allowed_ids else " — allowlist: open") 70 ) 71 72 updates, err = get_updates(token, offset=0, timeout=1) 73 if err is not None: 74 logger.warning(f"Telegram API error for project {proj.name} (id={proj.id}): {err}") 75 continue 76 if not updates: 77 logger.info(f"No new Telegram updates for project '{proj.name}'") 78 continue 79 80 logger.info(f"Got {len(updates)} update(s) for project '{proj.name}'") 81 82 for update in updates: 83 message = update.get("message") 84 if not message: 85 logger.info(f"Skipping non-message update: {list(update.keys())}") 86 continue 87 88 text = message.get("text") 89 chat_id = message.get("chat", {}).get("id") 90 if not text or not chat_id: 91 logger.info(f"Skipping message with no text/chat_id (text={bool(text)}, chat_id={chat_id})") 92 continue 93 94 from_user = message.get("from", {}).get("username") or message.get("from", {}).get("id") 95 logger.info(f" ← message from {from_user} (chat={chat_id}): {text[:200]!r}") 96 97 # Built-in shortcut: replies with the chat_id so admins (or 98 # would-be users) can grab their id without going through 99 # the agent. Always allowed — it's the primary way an 100 # unauthorized user finds out what id they need to ask the 101 # admin to add to the allowlist. 102 if text.strip().lower() in ("/chatid", "/myid"): 103 logger.info(f" → replying with chat id {chat_id}") 104 try: 105 send_message(token, chat_id, f"Chat ID: {chat_id}") 106 except Exception as e: 107 logger.warning(f"Failed to reply to /chatid: {e}") 108 processed += 1 109 continue 110 111 # Allowlist gate. Skip silently when the user isn't on the 112 # list — well, almost silently: we send a one-line "not 113 # authorized" reply so the user sees the bot is alive and 114 # knows to ask the admin. 115 if allowed_ids and chat_id not in allowed_ids: 116 logger.info(f" ✗ chat {chat_id} not in allowlist for project '{proj.name}', rejecting") 117 try: 118 send_message( 119 token, chat_id, 120 "You are not authorized to use this bot. " 121 f"If you should be, ask the admin to add chat id {chat_id} to the project's allowlist.", 122 ) 123 except Exception as e: 124 logger.warning(f"Failed to send unauthorized reply: {e}") 125 processed += 1 126 continue 127 128 try: 129 send_typing(token, chat_id) 130 logger.info(f" → invoking project '{proj.name}' agent for chat {chat_id}") 131 response = asyncio.run(_process_message(brain, db, proj.id, text, chat_id)) 132 if response: 133 logger.info(f" → sending response ({len(response)} chars): {response[:200]!r}") 134 send_message(token, chat_id, response) 135 else: 136 logger.warning(f" ✗ project '{proj.name}' returned no response for chat {chat_id}") 137 processed += 1 138 except Exception as e: 139 logger.exception(f"Error processing Telegram message for project {proj.name} (id={proj.id}): {e}") 140 141 # Acknowledge processed updates so Telegram doesn't redeliver. 142 if updates: 143 last_offset = updates[-1]["update_id"] + 1 144 logger.info(f"Acking {len(updates)} update(s) up to offset {last_offset}") 145 _, ack_err = get_updates(token, offset=last_offset, timeout=1) 146 if ack_err is not None: 147 logger.warning(f"Failed to ack updates for project {proj.name}: {ack_err}") 148 149 logger.info(f"Tick complete: {enabled_count} project(s) with Telegram, {processed} message(s) processed") 150 cron.finish(items_processed=processed) 151 except Exception as e: 152 cron.error(f"Telegram poller crashed: {e}", details=__import__("traceback").format_exc()) 153 cron.finish() 154 finally: 155 db.db.close() 156 157 158 async def _process_message(brain, db, project_id, text, chat_id): 159 from restai.models.models import ChatModel, User 160 from restai.helper import chat_main 161 from fastapi import BackgroundTasks 162 163 project = brain.find_project(project_id, db) 164 if not project: 165 logger.warning(f"_process_message: project {project_id} not found") 166 return None 167 168 # `chat_id=f"telegram_{chat_id}"` keeps each Telegram chat as its own 169 # conversation in the agent's memory store across cron ticks. 170 chat_input = ChatModel(question=text, id=f"telegram_{chat_id}") 171 172 user_db = db.get_user_by_username("admin") 173 if not user_db: 174 logger.warning("_process_message: no 'admin' user — cannot run agent") 175 return None 176 user = User.model_validate(user_db) 177 178 background_tasks = BackgroundTasks() 179 # chat_main signature: (request, brain, project, chat_input, user, db, background_tasks). 180 # The Request slot is `_` (unused) inside chat_main, so None is fine. 181 result = await chat_main(None, brain, project, chat_input, user, db, background_tasks) 182 await background_tasks() 183 184 if isinstance(result, dict): 185 return result.get("answer", "") 186 if result is None: 187 logger.warning(f"_process_message: chat_main returned None for project {project_id}") 188 return None 189 logger.warning(f"_process_message: chat_main returned unexpected type {type(result).__name__}") 190 return None 191 192 193 def send_typing(token, chat_id): 194 """Send typing indicator to Telegram chat.""" 195 import requests 196 try: 197 requests.post( 198 f"https://api.telegram.org/bot{token}/sendChatAction", 199 json={"chat_id": chat_id, "action": "typing"}, 200 timeout=5, 201 ) 202 except Exception: 203 pass 204 205 206 if __name__ == "__main__": 207 main()