/ api-gateway / app / routes / docs.py
docs.py
  1  """Document management routes (Module A)."""
  2  import logging
  3  from typing import Optional
  4  from fastapi import APIRouter, Depends, UploadFile, File, HTTPException, Body
  5  from sqlalchemy.orm import Session
  6  
  7  from app.database import get_db
  8  from app.schemas import (
  9      DocQueryRequest,
 10      DocQueryResponse,
 11      DocUploadResponse,
 12      DocStatusResponse,
 13      DocIndexRequest,
 14      DocIndexResponse,
 15      ErrorResponse,
 16      Citation
 17  )
 18  from app.services import (
 19      generate_request_id,
 20      hash_prompt,
 21      save_uploaded_file,
 22      create_doc_asset,
 23      create_audit_event,
 24      get_all_docs,
 25      check_duplicate_filename,
 26      get_pending_gcs_docs,
 27      update_doc_indexed_status,
 28      trigger_vertex_import,
 29      STORAGE_BACKEND,
 30  )
 31  from app.models import AuditModule, AuditStatus, IndexedStatus
 32  
# Module-level logger used by the route handlers in this file.
logger = logging.getLogger(__name__)

# Router for the document-management (Module A) endpoints: every route below is
# served under the "/docs" prefix and grouped under the "docs" tag.
router = APIRouter(prefix="/docs", tags=["docs"])
 36  
 37  
 38  @router.get("/config")
 39  async def get_storage_config():
 40      """
 41      Get storage backend configuration.
 42      
 43      Returns the active storage backend (local or gcs) for frontend display.
 44      """
 45      from app.services import STORAGE_BACKEND
 46      return {
 47          "storage_backend": STORAGE_BACKEND
 48      }
 49  
 50  
 51  @router.post("/upload", response_model=DocUploadResponse)
 52  async def upload_document(
 53      file: UploadFile = File(...),
 54      db: Session = Depends(get_db)
 55  ):
 56      """
 57      Upload a document file.
 58      
 59      Writes file to uploads volume, inserts doc_asset row, and logs audit event.
 60      """
 61      request_id = generate_request_id()
 62      
 63      try:
 64          # Get original filename
 65          original_filename = file.filename or "unnamed"
 66          
 67          # Check for duplicate filename (informational - we allow duplicates for MVP)
 68          existing_doc = check_duplicate_filename(db, original_filename)
 69          if existing_doc:
 70              logger.warning(f"Duplicate filename detected: {original_filename} (existing doc_id: {existing_doc.id}, request_id: {request_id})")
 71              # For MVP: allow duplicates but log it. Later we could reject or replace.
 72          
 73          # Save file to uploads volume
 74          storage_uri, filename = await save_uploaded_file(file, request_id)
 75          
 76          # Create doc_asset record
 77          doc_asset = create_doc_asset(db, filename, storage_uri, request_id)
 78          
 79          # Trigger Vertex AI Search import when using GCS (docs saved to gs://bucket/docs/)
 80          if STORAGE_BACKEND == "gcs" and storage_uri.startswith("gs://"):
 81              update_doc_indexed_status(db, doc_asset, IndexedStatus.INDEXING)
 82              success, err = trigger_vertex_import(storage_uri, request_id)
 83              if success:
 84                  update_doc_indexed_status(db, doc_asset, IndexedStatus.READY, datastore_ref=storage_uri)
 85              else:
 86                  update_doc_indexed_status(db, doc_asset, IndexedStatus.FAILED)
 87                  logger.warning(f"Vertex import failed for doc_id={doc_asset.id}: {err}")
 88          
 89          # Log audit event
 90          create_audit_event(
 91              db=db,
 92              module=AuditModule.MODULE_A,
 93              request_id=request_id,
 94              sources_json={"filename": filename, "doc_id": doc_asset.id},
 95              status=AuditStatus.SUCCESS
 96          )
 97          
 98          # Prepare response with optional duplicate warning
 99          duplicate_warning = None
100          if existing_doc:
101              duplicate_warning = f"A document with filename '{filename}' already exists (ID: {existing_doc.id}). This upload creates a new record."
102          
103          return DocUploadResponse(
104              request_id=request_id,
105              doc_id=doc_asset.id,
106              filename=filename,
107              message="Document uploaded successfully",
108              duplicate_warning=duplicate_warning
109          )
110          
111      except Exception as e:
112          logger.error(f"Upload failed (request_id: {request_id}): {e}", exc_info=True)
113          
114          # Log failed audit event
115          create_audit_event(
116              db=db,
117              module=AuditModule.MODULE_A,
118              request_id=request_id,
119              status=AuditStatus.FAILURE,
120              error=str(e)
121          )
122          
123          raise HTTPException(
124              status_code=500,
125              detail=ErrorResponse(
126                  request_id=request_id,
127                  error="Upload failed",
128                  detail=str(e)
129              ).dict()
130          )
131  
132  
@router.post("/index", response_model=DocIndexResponse)
async def trigger_indexing(
    request: Optional[DocIndexRequest] = Body(default=None),
    db: Session = Depends(get_db)
):
    """
    Trigger document indexing to Vertex AI Search.

    Processes the documents returned by ``get_pending_gcs_docs`` (PENDING
    docs with ``gs://`` storage URIs), optionally restricted to
    ``request.doc_id``. Only GCS uploads are indexable; local uploads
    cannot be indexed.

    Each document is set to INDEXING, then to READY (with ``datastore_ref``
    set to its storage URI) or FAILED. A failure for one document, whether
    reported or raised by ``trigger_vertex_import``, does not stop the rest
    of the batch. One audit event summarizes the run.

    Returns:
        DocIndexResponse with triggered/succeeded/failed counts and a
        per-document ``details`` list.
    """
    request_id = generate_request_id()
    doc_id = request.doc_id if request else None
    docs = get_pending_gcs_docs(db, doc_id)
    triggered = len(docs)
    succeeded = 0
    failed = 0
    details = []

    for doc in docs:
        update_doc_indexed_status(db, doc, IndexedStatus.INDEXING)
        try:
            success, err = trigger_vertex_import(doc.storage_uri, request_id)
        except Exception as exc:
            # Without this, a raise would leave the doc stuck in INDEXING
            # (presumably no longer selected as pending) and abort the batch
            # before the audit event is written.
            logger.error(f"Vertex import raised for doc_id={doc.id} (request_id: {request_id}): {exc}", exc_info=True)
            success, err = False, str(exc)

        if success:
            update_doc_indexed_status(db, doc, IndexedStatus.READY, datastore_ref=doc.storage_uri)
            succeeded += 1
            details.append({"doc_id": doc.id, "filename": doc.filename, "status": "ready", "error": None})
        else:
            update_doc_indexed_status(db, doc, IndexedStatus.FAILED)
            failed += 1
            logger.warning(f"Vertex import failed for doc_id={doc.id}: {err}")
            details.append({"doc_id": doc.id, "filename": doc.filename, "status": "failed", "error": err})

    create_audit_event(
        db=db,
        module=AuditModule.MODULE_A,
        request_id=request_id,
        sources_json={"triggered": triggered, "succeeded": succeeded, "failed": failed},
        status=AuditStatus.SUCCESS if failed == 0 else AuditStatus.FAILURE,
        error=None if failed == 0 else f"{failed} of {triggered} imports failed"
    )

    return DocIndexResponse(
        request_id=request_id,
        triggered=triggered,
        succeeded=succeeded,
        failed=failed,
        details=details
    )
179  
180  
@router.get("/status", response_model=DocStatusResponse)
async def get_docs_status(
    db: Session = Depends(get_db)
):
    """
    Get status of all documents.

    Serializes each doc returned by ``get_all_docs`` with its id, filename,
    indexing status, storage URI and datastore reference. The
    ``uploaded_at`` and ``deleted_at`` timestamps are rendered with
    ``isoformat()``, or ``None`` when unset. An audit event is logged for
    the lookup.

    Raises:
        HTTPException: 500 with an ``ErrorResponse`` payload if the lookup fails.
    """
    request_id = generate_request_id()

    try:
        docs = get_all_docs(db)

        documents = [
            {
                "id": doc.id,
                "filename": doc.filename,
                "uploaded_at": doc.uploaded_at.isoformat() if doc.uploaded_at else None,
                "indexed_status": doc.indexed_status.value,
                "storage_uri": doc.storage_uri,
                "datastore_ref": doc.datastore_ref,
                "deleted_at": doc.deleted_at.isoformat() if doc.deleted_at else None
            }
            for doc in docs
        ]

        create_audit_event(
            db=db,
            module=AuditModule.MODULE_A,
            request_id=request_id,
            sources_json={"doc_count": len(documents)},
            status=AuditStatus.SUCCESS
        )

        return DocStatusResponse(
            request_id=request_id,
            documents=documents
        )

    except Exception as e:
        logger.error(f"Status query failed (request_id: {request_id}): {e}", exc_info=True)

        # A failed statement can leave the session's transaction unusable;
        # roll back before writing the failure audit, and never let a
        # secondary audit failure replace the structured 500 below.
        try:
            db.rollback()
            create_audit_event(
                db=db,
                module=AuditModule.MODULE_A,
                request_id=request_id,
                status=AuditStatus.FAILURE,
                error=str(e)
            )
        except Exception:
            logger.error(f"Failed to record failure audit event (request_id: {request_id})", exc_info=True)

        raise HTTPException(
            status_code=500,
            detail=ErrorResponse(
                request_id=request_id,
                error="Failed to retrieve document status",
                detail=str(e)
            ).dict()
        ) from e
242  
243  
@router.post("/query", response_model=DocQueryResponse)
async def query_documents(
    request: DocQueryRequest,
    db: Session = Depends(get_db)
):
    """
    Query documents (stub implementation).

    Until Vertex AI Search is integrated, this always returns the refusal
    answer with an empty citation list. The query is audited with its
    ``prompt_hash``.

    Raises:
        HTTPException: 500 with an ``ErrorResponse`` payload if auditing or
        building the response fails.
    """
    request_id = generate_request_id()
    prompt_hash = hash_prompt(request.query)

    # Stub response: refuse when there are no citations (hard trust rule).
    answer = "Information not found in internal records."
    citations = []

    try:
        create_audit_event(
            db=db,
            module=AuditModule.MODULE_A,
            request_id=request_id,
            prompt_hash=prompt_hash,
            # NOTE(review): the raw query is persisted here alongside
            # prompt_hash; if hashing exists to avoid storing prompts, this
            # defeats it. Confirm intent.
            sources_json={"query": request.query, "citations_count": len(citations)},
            status=AuditStatus.SUCCESS,
            error=None
        )

        return DocQueryResponse(
            request_id=request_id,
            answer=answer,
            citations=citations
        )

    except Exception as e:
        logger.error(f"Query failed (request_id: {request_id}): {e}", exc_info=True)

        # The success audit write may have failed mid-transaction; roll back
        # before writing the failure audit, and guard it so a secondary
        # failure cannot replace the structured 500 below.
        try:
            db.rollback()
            create_audit_event(
                db=db,
                module=AuditModule.MODULE_A,
                request_id=request_id,
                prompt_hash=prompt_hash,
                status=AuditStatus.FAILURE,
                error=str(e)
            )
        except Exception:
            logger.error(f"Failed to record failure audit event (request_id: {request_id})", exc_info=True)

        raise HTTPException(
            status_code=500,
            detail=ErrorResponse(
                request_id=request_id,
                error="Query failed",
                detail=str(e)
            ).dict()
        ) from e