# restai/helper.py
import base64
import ipaddress
import json
import logging
import re
import socket
import time
from urllib.parse import urlparse

import requests
from fastapi import BackgroundTasks, HTTPException, Request
from starlette.responses import StreamingResponse

from restai.brain import Brain
from restai.budget import check_budget, check_rate_limit, check_api_key_quota, record_api_key_tokens
from restai.config import LOG_LEVEL
from restai.database import DBWrapper
from restai.models.models import ChatModel, InteractionModel, QuestionModel, User
from restai.project import Project
from restai.projects.agent import Agent
from restai.projects.base import ProjectBase
from restai.projects.block import Block
from restai.projects.rag import RAG
from restai.tools import log_inference

logging.basicConfig(level=LOG_LEVEL)
logger = logging.getLogger(__name__)

MAX_IMAGE_SIZE = 10 * 1024 * 1024  # 10 MiB

# Networks resolve_image must never fetch from (SSRF guard): loopback,
# RFC 1918 private ranges, link-local, and their IPv6 equivalents.
_BLOCKED_NETWORKS = [
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("169.254.0.0/16"),
    ipaddress.ip_network("::1/128"),
    ipaddress.ip_network("fc00::/7"),
]

_URL_PATTERN = re.compile(
    r"https?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(),]|%[0-9a-fA-F][0-9a-fA-F])+"
)
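
# Illustrative behavior of the pattern (hypothetical inputs):
#
#   _URL_PATTERN.match("https://example.com/cat.png")  # matches
#   _URL_PATTERN.match("example.com/cat.png")          # None (no scheme)
#   _URL_PATTERN.match("data:image/png;base64,AAAA")   # None (data URI), so
#                                                      # resolve_image passes
#                                                      # base64 payloads through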


def _is_private_ip(hostname: str) -> bool:
    """Resolve hostname and check if any resolved IP falls within blocked networks."""
    try:
        addrinfos = socket.getaddrinfo(hostname, None)
    except socket.gaierror:
        raise ValueError(f"Cannot resolve hostname: {hostname}")

    for addrinfo in addrinfos:
        ip = ipaddress.ip_address(addrinfo[4][0])
        for network in _BLOCKED_NETWORKS:
            if ip in network:
                return True
    return False
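
# Expected behavior under a conventional resolver (hostnames illustrative):
#
#   _is_private_ip("localhost")    # True (loopback)
#   _is_private_ip("10.1.2.3")     # True (getaddrinfo accepts IP literals)
#   _is_private_ip("example.com")  # False, assuming only public records
#
# Caveat: requests.get() in resolve_image re-resolves the name, so a record
# that flips between this check and the fetch (DNS rebinding) can still reach
# an internal address; pinning the resolved IP would close that gap.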


def resolve_image(image: str) -> str:
    """If image is a URL, fetch it and return base64-encoded data. Otherwise return as-is."""
    if re.match(_URL_PATTERN, image):
        parsed = urlparse(image)
        hostname = parsed.hostname
        if not hostname:
            raise ValueError("URL has no valid hostname.")

        if _is_private_ip(hostname):
            logger.warning("Blocked SSRF attempt to internal address: %s", hostname)
            raise ValueError(f"Access to internal/private addresses is not allowed: {hostname}")

        # Note: redirects are followed, so only the original hostname is
        # vetted against the blocklist here.
        response = requests.get(image, timeout=10, stream=True)
        response.raise_for_status()

        # Stream the body in chunks so an oversized (or unbounded) response
        # is cut off at MAX_IMAGE_SIZE instead of being buffered whole.
        chunks = []
        downloaded = 0
        for chunk in response.iter_content(chunk_size=8192):
            downloaded += len(chunk)
            if downloaded > MAX_IMAGE_SIZE:
                response.close()
                raise ValueError(f"Image exceeds maximum allowed size of {MAX_IMAGE_SIZE} bytes.")
            chunks.append(chunk)

        content = b"".join(chunks)
        if not content:
            raise ValueError("Downloaded image is empty.")
        return base64.b64encode(content).decode("utf-8")
    return image
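
# Usage sketch (hypothetical inputs): URLs are fetched and base64-encoded;
# anything else is assumed to already be base64 (or a data URI) and is
# returned untouched.
#
#   resolve_image("https://example.com/cat.png")  # fetched -> base64 string
#   resolve_image("iVBORw0KGgo...")               # returned as-is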


_IMAGE_ATTACHMENT_EXTS = (".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp")


def _is_image_attachment(att) -> bool:
    """True if a FileAttachment looks like an image, by MIME type or filename extension."""
    mime = (getattr(att, "mime_type", None) or "").lower()
    if mime.startswith("image/"):
        return True
    name = (getattr(att, "name", "") or "").lower()
    return name.endswith(_IMAGE_ATTACHMENT_EXTS)


def _attachment_meta(files):
    """Strip attachment bytes down to logging-safe metadata. `files` is the
    optional `FileAttachment[]` on Chat/QuestionModel."""
    if not files:
        return []
    meta = []
    for f in files:
        try:
            # Size counts characters of the (typically base64) payload,
            # not decoded bytes.
            size = len(f.content or "") if isinstance(f.content, str) else 0
        except Exception:
            size = 0
        meta.append({
            "name": getattr(f, "name", None) or "",
            "mime_type": getattr(f, "mime_type", None),
            "size": size,
        })
    return meta
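
# Shape of the rows this produces (values illustrative; size counts base64
# characters, not decoded bytes):
#
#   [{"name": "report.pdf", "mime_type": "application/pdf", "size": 13484}]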


def _normalize_image_inputs(model) -> None:
    """Canonicalize image input on a Chat/QuestionModel in place.

    Historically a user could attach an image either via `.image` (the
    single-image slot) or via `.files[]` (a FileAttachment whose mime or
    filename marks it as an image). These two paths were drifting apart:

    - The log viewer persisted only `.image`.
    - `FileAttachment.content` had no size cap while `.image` did.
    - Block projects only read `.image`, so an image sent via `.files[]`
      silently vanished.
    - Inside the agent, `_route_attachments` promoted the first image file
      into `image_url` locally, but never propagated that back to `.image`.

    This helper collapses `files[<image>]` into `.image` at the helper
    boundary so every downstream path sees a single source of truth.

    Rules (preserving today's agent behavior):
    - Explicit `.image` wins. If set, files-images are dropped.
    - Only the first image in `files[]` is promoted. Subsequent images
      are dropped (same as `_route_attachments` today).
    - Non-image files pass through untouched — they still go to the
      Docker sandbox when `terminal` is enabled.
    """
    files = getattr(model, "files", None) or []
    if not files:
        return

    first_image = None
    remaining = []
    for f in files:
        if _is_image_attachment(f):
            if first_image is None:
                first_image = f
            # Drop image files from `files` regardless — either we
            # promoted it to `.image` or we're discarding a second+ image.
            continue
        remaining.append(f)

    if first_image is not None and not getattr(model, "image", None):
        mime = getattr(first_image, "mime_type", None) or "image/png"
        model.image = f"data:{mime};base64,{first_image.content}"

    # Always rewrite `files` to the image-free remainder so downstream
    # code (agent's `_route_attachments`, attachment-meta logging, sandbox
    # upload) doesn't see the image twice.
    model.files = remaining
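
# Before/after sketch, assuming FileAttachment exposes name/mime_type/content
# (img_png, img_jpg and notes_txt are hypothetical attachments):
#
#   model.image = None
#   model.files = [img_png, img_jpg, notes_txt]
#   _normalize_image_inputs(model)
#   # model.image == "data:image/png;base64,<img_png.content>"
#   # model.files == [notes_txt]  (img_jpg dropped, matching _route_attachments)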


def _pick_image_for_log(explicit_image, files):
    """Return the image to persist in `output.image`.

    Post-`_normalize_image_inputs`, `.image` is the canonical source and
    `files` no longer contains images. Kept as a thin helper for symmetry
    with `_attachment_meta` and to stay resilient to direct callers that
    skip the normalization step.
    """
    if explicit_image:
        return explicit_image
    if not files:
        return None
    # Defensive fallback — only fires if a caller bypassed the normalizer.
    for f in files:
        if _is_image_attachment(f) and getattr(f, "content", None):
            mime = getattr(f, "mime_type", None) or "image/png"
            return f"data:{mime};base64,{f.content}"
    return None
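
# Sketch: once _normalize_image_inputs has run, this reduces to the explicit
# image, since files no longer contain any.
#
#   _pick_image_for_log("data:image/png;base64,AAAA", [])  # -> the data URI
#   _pick_image_for_log(None, [notes_txt])                 # -> None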


def _log_inference_error(
    project: Project,
    user: User,
    db: DBWrapper,
    *,
    question: str,
    image: str = None,
    attachments: list = None,
    status: str,
    error: str,
    system_prompt: str = None,
    context: dict = None,
    start_time: float = None,
):
    """Synchronously write an error row to the inference log before letting
    an HTTPException propagate. Never raises — logging failures are
    swallowed with a traceback so the original error still reaches the client."""
    latency_ms = int((time.perf_counter() - start_time) * 1000) if start_time else None
    output = {
        "question": question or "",
        "answer": "",
        "tokens": {"input": 0, "output": 0},
        "status": status,
        "error": str(error) if error is not None else None,
        "image": image,
        "attachments": attachments or [],
    }
    try:
        log_inference(
            project, user, output, db,
            latency_ms=latency_ms,
            system_prompt=system_prompt,
            context=context,
        )
    except Exception:
        logger.exception("Failed to write error row to inference log")


async def create_streaming_response_with_logging(
    generator,
    project: Project,
    user: User,
    db: DBWrapper,
    background_tasks: BackgroundTasks,
    start_time: float = None,
    system_prompt: str = None,
    context: dict = None,
    question: str = None,
    image: str = None,
    attachments: list = None,
) -> StreamingResponse:

    async def stream_with_logging():
        final_output = None
        completed = False

        try:
            try:
                async for chunk in generator:
                    if isinstance(chunk, dict):
                        # Safety: if the generator yields a dict (e.g. error fallback),
                        # wrap it as an SSE data event (terminated by a blank
                        # line) instead of yielding it raw.
                        final_output = chunk
                        yield "data: " + json.dumps(chunk) + "\n\n"
                        continue
                    if isinstance(chunk, str) and chunk.startswith("data: "):
                        try:
                            data = json.loads(chunk[len("data: "):])
                            if "answer" in data and "type" in data:
                                final_output = data
                        except json.JSONDecodeError:
                            pass
                    yield chunk
                completed = True
            except Exception as e:
                # Error mid-stream: record it, send a final SSE dict the
                # frontend can render, and stop iterating.
                logger.exception("Streaming inference failed: %s", e)
                err_output = {
                    "question": question or "",
                    "answer": f"Internal error: {e}",
                    "tokens": {"input": 0, "output": 0},
                    "type": project.props.type,
                    "status": "error",
                    "error": str(e),
                    "image": image,
                    "attachments": attachments or [],
                }
                final_output = err_output
                # The yields below can themselves fail with ClientDisconnect /
                # CancelledError if the client already left — swallow so the
                # finally block below still runs the log write.
                try:
                    yield "data: " + json.dumps({"text": err_output["answer"]}) + "\n\n"
                    yield "data: " + json.dumps(err_output) + "\n\n"
                except Exception:
                    pass

            # Best-effort close marker. Wrapped because yielding from a
            # generator the runtime is trying to cancel can re-raise — and
            # we don't want that to skip the finally block that does the
            # actual log write.
            try:
                yield "event: close\n\n"
            except Exception:
                pass
        finally:
            # Run log_inference SYNCHRONOUSLY here instead of via
            # background_tasks.add_task. BackgroundTasks only fire after the
            # response body finishes sending, so a client that disconnects
            # mid-stream (CancelledError / GeneratorExit propagates into
            # this generator) skips them entirely — losing the audit row,
            # the cost attribution, and the per-API-key quota counter.
            # The finally block runs regardless of how the generator exited
            # (success, mid-stream error, client cancel, server cancel),
            # so logging is guaranteed for any inference that actually
            # consumed model tokens.
            try:
                if final_output:
                    if image and not final_output.get("image"):
                        final_output["image"] = image
                    if attachments and not final_output.get("attachments"):
                        final_output["attachments"] = attachments
                    latency_ms = int((time.perf_counter() - start_time) * 1000) if start_time else None
                    log_inference(
                        project, user, final_output, db,
                        latency_ms=latency_ms, system_prompt=system_prompt, context=context,
                    )
                elif not completed:
                    # Client disconnected before the model finished — record
                    # a stub so the call still appears in the inference log
                    # (tokens are not known here; kept zero on purpose so we
                    # don't double-count cost vs whatever the model actually
                    # consumed before being cut off).
                    _log_inference_error(
                        project, user, db,
                        question=question, image=image, attachments=attachments,
                        status="disconnected",
                        error="Client disconnected before stream completed",
                        system_prompt=system_prompt, context=context,
                        start_time=start_time,
                    )
            except Exception:
                logger.exception("log_inference failed in stream_with_logging finally")

    return StreamingResponse(
        stream_with_logging(),
        media_type="text/event-stream",
    )
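
# Why the finally block above logs synchronously: a sketch of the failure
# mode with BackgroundTasks (names illustrative).
#
#   background_tasks.add_task(log_inference, project, user, output, db)
#   return StreamingResponse(gen(), media_type="text/event-stream")
#
# Starlette runs background tasks only after the response body has been fully
# sent. If the client disconnects mid-stream, the body never completes, the
# task never runs, and the audit/cost row is lost even though the model
# already consumed tokens. The generator's finally block, by contrast, runs
# on every exit path.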


def _apply_context(project: Project, interaction: InteractionModel) -> Project:
    """If the request includes context, apply it to the project's system prompt."""
    if not interaction.context:
        return project
    return project.with_context(interaction.context)


async def chat_main(
    _: Request,
    brain: Brain,
    project: Project,
    chat_input: ChatModel,
    user: User,
    db: DBWrapper,
    background_tasks: BackgroundTasks,
    start_time: float = None,
):
    # Canonicalize image input: folds `.files[<image>]` into `.image` so
    # every downstream path (agent vision flow, block interpreter, log
    # viewer) sees a single source of truth. If `.image` is already set,
    # image attachments in `.files[]` are dropped rather than promoted.
    _normalize_image_inputs(chat_input)

    # Capture request metadata up front so every error path below can
    # write a log row tagged with what the user actually sent.
    _files = getattr(chat_input, "files", None)
    _image = _pick_image_for_log(chat_input.image, _files)
    _attachments = _attachment_meta(_files)
    _question = chat_input.question
    _sys = project.props.system
    _ctx = chat_input.context

    try:
        check_budget(project, db)
    except HTTPException as e:
        _log_inference_error(
            project, user, db,
            question=_question, image=_image, attachments=_attachments,
            status="budget", error=getattr(e, "detail", str(e)),
            system_prompt=_sys, context=_ctx, start_time=start_time,
        )
        raise

    try:
        check_rate_limit(project, db)
    except HTTPException as e:
        _log_inference_error(
            project, user, db,
            question=_question, image=_image, attachments=_attachments,
            status="rate_limit", error=getattr(e, "detail", str(e)),
            system_prompt=_sys, context=_ctx, start_time=start_time,
        )
        raise

    try:
        check_api_key_quota(user, db)
    except HTTPException as e:
        _log_inference_error(
            project, user, db,
            question=_question, image=_image, attachments=_attachments,
            status="quota", error=getattr(e, "detail", str(e)),
            system_prompt=_sys, context=_ctx, start_time=start_time,
        )
        raise

    try:
        project = _apply_context(project, chat_input)

        proj_logic: ProjectBase
        match project.props.type:
            case "rag":
                proj_logic = RAG(brain)
            case "agent":
                proj_logic = Agent(brain)
                if chat_input.image:
                    chat_input.image = resolve_image(chat_input.image)
                    _image = chat_input.image  # keep log-image in sync with resolved form
            case "block":
                proj_logic = Block(brain)
            case _:
                raise HTTPException(status_code=400, detail="Invalid project type")

        if chat_input.stream:
            return await create_streaming_response_with_logging(
                proj_logic.chat(project, chat_input, user, db),
                project,
                user,
                db,
                background_tasks,
                start_time=start_time,
                system_prompt=_sys,
                context=_ctx,
                question=_question,
                image=_image,
                attachments=_attachments,
            )
        else:
            output_generator = proj_logic.chat(project, chat_input, user, db)
            async for line in output_generator:
                latency_ms = int((time.perf_counter() - start_time) * 1000) if start_time else None
                # Splice request metadata into the output dict so the log
                # row carries the image + attachments without each project
                # type having to remember to copy them in.
                if _image and not line.get("image"):
                    line["image"] = _image
                if _attachments and not line.get("attachments"):
                    line["attachments"] = _attachments
                # Log synchronously, NOT via background_tasks.add_task —
                # those only fire after the response body has been
                # successfully written, so a client disconnect between
                # `return line` and serialization would skip them and
                # silently lose audit / cost / quota counting.
                log_inference(
                    project, user, line, db,
                    latency_ms=latency_ms, system_prompt=_sys, context=_ctx,
                )
                return line
            return None
    except HTTPException as e:
        _log_inference_error(
            project, user, db,
            question=_question, image=_image, attachments=_attachments,
            status="error", error=getattr(e, "detail", str(e)),
            system_prompt=_sys, context=_ctx, start_time=start_time,
        )
        raise
    except Exception as e:
        _log_inference_error(
            project, user, db,
            question=_question, image=_image, attachments=_attachments,
            status="error", error=str(e),
            system_prompt=_sys, context=_ctx, start_time=start_time,
        )
        raise


async def question_main(
    request: Request,
    brain: Brain,
    project: Project,
    q_input: QuestionModel,
    user: User,
    db: DBWrapper,
    background_tasks: BackgroundTasks,
    start_time: float = None,
):
    # Canonicalize image input: see `_normalize_image_inputs` doc. Ensures
    # `.image` is the single source of truth for every project type.
    _normalize_image_inputs(q_input)

    # Request metadata captured so every error path can log a row that
    # reflects what the user sent (including images + attachments that
    # the LLM never saw because we rejected the request up-front).
    _files = getattr(q_input, "files", None)
    _image = _pick_image_for_log(q_input.image, _files)
    _attachments = _attachment_meta(_files)
    _question = q_input.question
    _sys = project.props.system
    _ctx = q_input.context

    try:
        check_budget(project, db)
    except HTTPException as e:
        _log_inference_error(
            project, user, db,
            question=_question, image=_image, attachments=_attachments,
            status="budget", error=getattr(e, "detail", str(e)),
            system_prompt=_sys, context=_ctx, start_time=start_time,
        )
        raise

    try:
        check_rate_limit(project, db)
    except HTTPException as e:
        _log_inference_error(
            project, user, db,
            question=_question, image=_image, attachments=_attachments,
            status="rate_limit", error=getattr(e, "detail", str(e)),
            system_prompt=_sys, context=_ctx, start_time=start_time,
        )
        raise

    try:
        check_api_key_quota(user, db)
    except HTTPException as e:
        _log_inference_error(
            project, user, db,
            question=_question, image=_image, attachments=_attachments,
            status="quota", error=getattr(e, "detail", str(e)),
            system_prompt=_sys, context=_ctx, start_time=start_time,
        )
        raise

    project = _apply_context(project, q_input)

    # Check cache for all project types
    cached = await process_cache(project, q_input)
    if cached:
        latency_ms = int((time.perf_counter() - start_time) * 1000) if start_time else None
        if _image and not cached.get("image"):
            cached["image"] = _image
        if _attachments and not cached.get("attachments"):
            cached["attachments"] = _attachments
        # Sync log — see note in chat_main: BackgroundTasks fire only
        # after the body is sent, so a client disconnect would lose this.
        log_inference(
            project, user, cached, db,
            latency_ms=latency_ms, system_prompt=_sys, context=_ctx,
        )
        return cached

    result = None
    match project.props.type:
        case "rag":
            result = await question_rag(
                request, brain, project, q_input, user, db, background_tasks, start_time, _sys, _ctx
            )
        case "agent":
            result = await question_agent(
                brain, project, q_input, user, db, background_tasks, start_time, _sys, _ctx
            )
        case "block":
            result = await question_block(
                brain, project, q_input, user, db, background_tasks, start_time, _sys, _ctx
            )
        case _:
            _log_inference_error(
                project, user, db,
                question=_question, image=_image, attachments=_attachments,
                status="error", error="Invalid project type",
                system_prompt=_sys, context=_ctx, start_time=start_time,
            )
            raise HTTPException(status_code=400, detail="Invalid project type")

    # Populate the answer cache; swallow failures so a cache hiccup never
    # breaks a successful response.
    if result and project.cache and isinstance(result, dict) and "answer" in result:
        try:
            project.cache.add(q_input.question, result["answer"])
        except Exception:
            pass

    # Log retrieval events for RAG source analytics. Fire-and-forget via
    # background task is acceptable here: losing an analytics row on
    # disconnect is tolerable, unlike the audit/cost rows above.
    if result and isinstance(result, dict) and result.get("sources") and project.props.type == "rag":
        from restai.tools import log_retrieval_events
        background_tasks.add_task(log_retrieval_events, project, result["sources"], db)

    return result


async def question_rag(
    _: Request,
    brain: Brain,
    project: Project,
    q_input: QuestionModel,
    user: User,
    db: DBWrapper,
    background_tasks: BackgroundTasks,
    start_time: float = None,
    system_prompt: str = None,
    context: dict = None,
):
    _files = getattr(q_input, "files", None)
    _image = _pick_image_for_log(q_input.image, _files)
    _attachments = _attachment_meta(_files)
    _question = q_input.question

    try:
        proj_logic = RAG(brain)

        if project.props.type != "rag":
            raise HTTPException(
                status_code=400, detail="Only available for RAG projects."
            )

        if q_input.stream:
            return await create_streaming_response_with_logging(
                proj_logic.question(project, q_input, user, db),
                project,
                user,
                db,
                background_tasks,
                start_time=start_time,
                system_prompt=system_prompt,
                context=context,
                question=_question,
                image=_image,
                attachments=_attachments,
            )
        else:
            output_generator = proj_logic.question(project, q_input, user, db)
            async for line in output_generator:
                latency_ms = int((time.perf_counter() - start_time) * 1000) if start_time else None
                if _image and not line.get("image"):
                    line["image"] = _image
                if _attachments and not line.get("attachments"):
                    line["attachments"] = _attachments
                # Sync log — see chat_main note. BackgroundTasks would lose
                # this row on client disconnect.
                log_inference(
                    project, user, line, db,
                    latency_ms=latency_ms, system_prompt=system_prompt, context=context,
                )
                return line
    except HTTPException as e:
        _log_inference_error(
            project, user, db,
            question=_question, image=_image, attachments=_attachments,
            status="error", error=getattr(e, "detail", str(e)),
            system_prompt=system_prompt, context=context, start_time=start_time,
        )
        raise
    except Exception as e:
        logger.exception("question_rag failed")
        _log_inference_error(
            project, user, db,
            question=_question, image=_image, attachments=_attachments,
            status="error", error=str(e),
            system_prompt=system_prompt, context=context, start_time=start_time,
        )
        raise HTTPException(status_code=500, detail="Internal server error")


async def process_cache(project: Project, q_input: QuestionModel):
    """Return a log-ready output dict if the project cache already holds an
    answer for this question; otherwise None."""
    output = {
        "question": q_input.question,
        "type": "question",
        "sources": [],
        "tokens": {"input": 0, "output": 0},
    }

    if project.cache:
        answer = project.cache.verify(q_input.question)
        if answer is not None:
            output.update(
                {
                    "answer": answer,
                    "cached": True,
                }
            )
            return output

    return None
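
# Cache flow sketch: question_main consults this before dispatching and
# repopulates the cache from the result afterwards.
#
#   hit = await process_cache(project, q_input)
#   if hit:           # {"answer": ..., "cached": True, "tokens": {...}, ...}
#       return hit    # logged as a zero-token row; the model is never invoked
#   ...               # otherwise route to rag / agent / block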


async def question_agent(
    brain: Brain,
    project: Project,
    q_input: QuestionModel,
    user: User,
    db: DBWrapper,
    background_tasks: BackgroundTasks,
    start_time: float = None,
    system_prompt: str = None,
    context: dict = None,
):
    _files = getattr(q_input, "files", None)
    _image = _pick_image_for_log(q_input.image, _files)
    _attachments = _attachment_meta(_files)
    _question = q_input.question

    try:
        proj_logic: Agent = Agent(brain)

        if project.props.type != "agent":
            raise HTTPException(
                status_code=400, detail="Only available for AGENT projects."
            )

        if q_input.image:
            q_input.image = resolve_image(q_input.image)
            _image = q_input.image  # keep log-image in sync with resolved form

        if q_input.stream:
            return await create_streaming_response_with_logging(
                proj_logic.question(project, q_input, user, db),
                project,
                user,
                db,
                background_tasks,
                start_time=start_time,
                system_prompt=system_prompt,
                context=context,
                question=_question,
                image=_image,
                attachments=_attachments,
            )
        else:
            output_generator = proj_logic.question(project, q_input, user, db)
            async for line in output_generator:
                latency_ms = int((time.perf_counter() - start_time) * 1000) if start_time else None
                if _image and not line.get("image"):
                    line["image"] = _image
                if _attachments and not line.get("attachments"):
                    line["attachments"] = _attachments
                # Sync log — see chat_main note. BackgroundTasks would lose
                # this row on client disconnect.
                log_inference(
                    project, user, line, db,
                    latency_ms=latency_ms, system_prompt=system_prompt, context=context,
                )
                return line

    except HTTPException as e:
        _log_inference_error(
            project, user, db,
            question=_question, image=_image, attachments=_attachments,
            status="error", error=getattr(e, "detail", str(e)),
            system_prompt=system_prompt, context=context, start_time=start_time,
        )
        raise
    except Exception as e:
        logger.exception("question_agent failed")
        _log_inference_error(
            project, user, db,
            question=_question, image=_image, attachments=_attachments,
            status="error", error=str(e),
            system_prompt=system_prompt, context=context, start_time=start_time,
        )
        raise HTTPException(status_code=500, detail="Internal server error")


async def question_block(
    brain: Brain,
    project: Project,
    q_input: QuestionModel,
    user: User,
    db: DBWrapper,
    background_tasks: BackgroundTasks,
    start_time: float = None,
    system_prompt: str = None,
    context: dict = None,
):
    _files = getattr(q_input, "files", None)
    _image = _pick_image_for_log(q_input.image, _files)
    _attachments = _attachment_meta(_files)
    _question = q_input.question

    try:
        proj_logic = Block(brain)

        if project.props.type != "block":
            raise HTTPException(
                status_code=400, detail="Only available for BLOCK projects."
            )

        if q_input.image:
            q_input.image = resolve_image(q_input.image)
            _image = q_input.image  # keep log-image in sync with resolved form

        output_generator = proj_logic.question(project, q_input, user, db)
        async for line in output_generator:
            latency_ms = int((time.perf_counter() - start_time) * 1000) if start_time else None
            if _image and not line.get("image"):
                line["image"] = _image
            if _attachments and not line.get("attachments"):
                line["attachments"] = _attachments
            # Sync log — see chat_main note. BackgroundTasks would lose
            # this row on client disconnect.
            log_inference(
                project, user, line, db,
                latency_ms=latency_ms, system_prompt=system_prompt, context=context,
            )
            return line

    except HTTPException as e:
        _log_inference_error(
            project, user, db,
            question=_question, image=_image, attachments=_attachments,
            status="error", error=getattr(e, "detail", str(e)),
            system_prompt=system_prompt, context=context, start_time=start_time,
        )
        raise
    except Exception as e:
        logger.exception("question_block failed")
        _log_inference_error(
            project, user, db,
            question=_question, image=_image, attachments=_attachments,
            status="error", error=str(e),
            system_prompt=system_prompt, context=context, start_time=start_time,
        )
        raise HTTPException(status_code=500, detail="Internal server error")