# docs.py
"""Document management routes (Module A)."""
import logging
from typing import Optional

from fastapi import APIRouter, Body, Depends, File, HTTPException, UploadFile
from sqlalchemy.orm import Session

from app.database import get_db
from app.models import AuditModule, AuditStatus, IndexedStatus
from app.schemas import (
    Citation,
    DocIndexRequest,
    DocIndexResponse,
    DocQueryRequest,
    DocQueryResponse,
    DocStatusResponse,
    DocUploadResponse,
    ErrorResponse,
)
from app.services import (
    STORAGE_BACKEND,
    check_duplicate_filename,
    create_audit_event,
    create_doc_asset,
    generate_request_id,
    get_all_docs,
    get_pending_gcs_docs,
    hash_prompt,
    save_uploaded_file,
    trigger_vertex_import,
    update_doc_indexed_status,
)

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/docs", tags=["docs"])


@router.get("/config")
async def get_storage_config():
    """
    Get storage backend configuration.

    Returns the active storage backend (local or gcs) for frontend display.
    """
    # STORAGE_BACKEND is imported at module level (the same binding
    # upload_document uses); no local re-import needed.
    return {"storage_backend": STORAGE_BACKEND}


@router.post("/upload", response_model=DocUploadResponse)
async def upload_document(
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
):
    """
    Upload a document file.

    Writes the file to the uploads volume, inserts a doc_asset row, logs an
    audit event, and — when the GCS backend is active — triggers a Vertex AI
    Search import for the new document.

    Raises:
        HTTPException: 500 with an ErrorResponse payload on any failure.
    """
    request_id = generate_request_id()

    try:
        # Get original filename
        original_filename = file.filename or "unnamed"

        # Check for duplicate filename (informational - we allow duplicates for MVP)
        existing_doc = check_duplicate_filename(db, original_filename)
        if existing_doc:
            logger.warning(
                f"Duplicate filename detected: {original_filename} "
                f"(existing doc_id: {existing_doc.id}, request_id: {request_id})"
            )
            # For MVP: allow duplicates but log it. Later we could reject or replace.

        # Save file to uploads volume
        storage_uri, filename = await save_uploaded_file(file, request_id)

        # Create doc_asset record
        doc_asset = create_doc_asset(db, filename, storage_uri, request_id)

        # Trigger Vertex AI Search import when using GCS (docs saved to gs://bucket/docs/)
        if STORAGE_BACKEND == "gcs" and storage_uri.startswith("gs://"):
            update_doc_indexed_status(db, doc_asset, IndexedStatus.INDEXING)
            success, err = trigger_vertex_import(storage_uri, request_id)
            if success:
                update_doc_indexed_status(
                    db, doc_asset, IndexedStatus.READY, datastore_ref=storage_uri
                )
            else:
                # Import failure marks the doc FAILED but does not fail the upload.
                update_doc_indexed_status(db, doc_asset, IndexedStatus.FAILED)
                logger.warning(f"Vertex import failed for doc_id={doc_asset.id}: {err}")

        # Log audit event
        create_audit_event(
            db=db,
            module=AuditModule.MODULE_A,
            request_id=request_id,
            sources_json={"filename": filename, "doc_id": doc_asset.id},
            status=AuditStatus.SUCCESS,
        )

        # Prepare response with optional duplicate warning.
        # Fix: the original message interpolated the literal '(unknown)'
        # instead of the uploaded file's actual name.
        duplicate_warning = None
        if existing_doc:
            duplicate_warning = (
                f"A document with filename '{original_filename}' already exists "
                f"(ID: {existing_doc.id}). This upload creates a new record."
            )

        return DocUploadResponse(
            request_id=request_id,
            doc_id=doc_asset.id,
            filename=filename,
            message="Document uploaded successfully",
            duplicate_warning=duplicate_warning,
        )

    except Exception as e:
        logger.error(f"Upload failed (request_id: {request_id}): {e}", exc_info=True)

        # Log failed audit event
        create_audit_event(
            db=db,
            module=AuditModule.MODULE_A,
            request_id=request_id,
            status=AuditStatus.FAILURE,
            error=str(e),
        )

        raise HTTPException(
            status_code=500,
            detail=ErrorResponse(
                request_id=request_id,
                error="Upload failed",
                detail=str(e),
            ).dict(),
        )


@router.post("/index", response_model=DocIndexResponse)
async def trigger_indexing(
    request: Optional[DocIndexRequest] = Body(default=None),
    db: Session = Depends(get_db),
):
    """
    Trigger document indexing to Vertex AI Search.

    Indexes PENDING docs with gs:// storage URIs. Optionally restrict to doc_id.
    Only applies when STORAGE_BACKEND=gcs; local uploads cannot be indexed.

    Returns:
        DocIndexResponse with triggered/succeeded/failed counts and per-doc details.
    """
    request_id = generate_request_id()
    # Request body is optional; absent doc_id means "index everything pending".
    doc_id = request.doc_id if request else None
    docs = get_pending_gcs_docs(db, doc_id)
    triggered = len(docs)
    succeeded = 0
    failed = 0
    details = []

    for doc in docs:
        # Mark INDEXING first so the status endpoint reflects in-flight work.
        update_doc_indexed_status(db, doc, IndexedStatus.INDEXING)
        success, err = trigger_vertex_import(doc.storage_uri, request_id)
        if success:
            update_doc_indexed_status(
                db, doc, IndexedStatus.READY, datastore_ref=doc.storage_uri
            )
            succeeded += 1
            details.append(
                {"doc_id": doc.id, "filename": doc.filename, "status": "ready", "error": None}
            )
        else:
            update_doc_indexed_status(db, doc, IndexedStatus.FAILED)
            failed += 1
            details.append(
                {"doc_id": doc.id, "filename": doc.filename, "status": "failed", "error": err}
            )

    # Single audit event summarizing the batch; FAILURE if any import failed.
    create_audit_event(
        db=db,
        module=AuditModule.MODULE_A,
        request_id=request_id,
        sources_json={"triggered": triggered, "succeeded": succeeded, "failed": failed},
        status=AuditStatus.SUCCESS if failed == 0 else AuditStatus.FAILURE,
        error=None if failed == 0 else f"{failed} of {triggered} imports failed",
    )

    return DocIndexResponse(
        request_id=request_id,
        triggered=triggered,
        succeeded=succeeded,
        failed=failed,
        details=details,
    )


@router.get("/status", response_model=DocStatusResponse)
async def get_docs_status(
    db: Session = Depends(get_db),
):
    """
    Get status of all documents.

    Returns list of documents with their indexing status.

    Raises:
        HTTPException: 500 with an ErrorResponse payload on any failure.
    """
    request_id = generate_request_id()

    try:
        docs = get_all_docs(db)

        documents = [
            {
                "id": doc.id,
                "filename": doc.filename,
                "uploaded_at": doc.uploaded_at.isoformat() if doc.uploaded_at else None,
                "indexed_status": doc.indexed_status.value,
                "storage_uri": doc.storage_uri,
                "datastore_ref": doc.datastore_ref,
                "deleted_at": doc.deleted_at.isoformat() if doc.deleted_at else None,
            }
            for doc in docs
        ]

        # Log audit event
        create_audit_event(
            db=db,
            module=AuditModule.MODULE_A,
            request_id=request_id,
            sources_json={"doc_count": len(documents)},
            status=AuditStatus.SUCCESS,
        )

        return DocStatusResponse(
            request_id=request_id,
            documents=documents,
        )

    except Exception as e:
        logger.error(f"Status query failed (request_id: {request_id}): {e}", exc_info=True)

        # Log failed audit event
        create_audit_event(
            db=db,
            module=AuditModule.MODULE_A,
            request_id=request_id,
            status=AuditStatus.FAILURE,
            error=str(e),
        )

        raise HTTPException(
            status_code=500,
            detail=ErrorResponse(
                request_id=request_id,
                error="Failed to retrieve document status",
                detail=str(e),
            ).dict(),
        )


@router.post("/query", response_model=DocQueryResponse)
async def query_documents(
    request: DocQueryRequest,
    db: Session = Depends(get_db),
):
    """
    Query documents (stub implementation).

    Returns refusal message with empty citations until Vertex AI Search is
    integrated. Logs an audit event with the hashed prompt.

    Raises:
        HTTPException: 500 with an ErrorResponse payload on any failure.
    """
    request_id = generate_request_id()
    # Only the hash of the prompt is persisted in the audit trail.
    prompt_hash = hash_prompt(request.query)

    # Stub response: refuse if no citations (hard trust rule)
    answer = "Information not found in internal records."
    citations = []

    try:
        # Log audit event for the query
        create_audit_event(
            db=db,
            module=AuditModule.MODULE_A,
            request_id=request_id,
            prompt_hash=prompt_hash,
            sources_json={"query": request.query, "citations_count": len(citations)},
            status=AuditStatus.SUCCESS,
            error=None,
        )

        return DocQueryResponse(
            request_id=request_id,
            answer=answer,
            citations=citations,
        )

    except Exception as e:
        logger.error(f"Query failed (request_id: {request_id}): {e}", exc_info=True)

        # Log failed audit event
        create_audit_event(
            db=db,
            module=AuditModule.MODULE_A,
            request_id=request_id,
            prompt_hash=prompt_hash,
            status=AuditStatus.FAILURE,
            error=str(e),
        )

        raise HTTPException(
            status_code=500,
            detail=ErrorResponse(
                request_id=request_id,
                error="Query failed",
                detail=str(e),
            ).dict(),
        )