044_bulk_ingest_jobs.py
1 """Bulk file ingest queue for RAG projects. 2 3 Admins upload CSV/XLSX/JSON/PDF/docx via the KB tab; each file lands 4 as a ``queued`` row here and ``crons/bulk_ingest.py`` picks it up. 5 Decouples large-file ingestion from the request/response cycle so an 6 admin uploading a 500-page PDF doesn't block on a 2-minute response. 7 """ 8 import sqlalchemy as sa 9 from alembic import op 10 11 12 revision = '044' 13 down_revision = '043' 14 branch_labels = None 15 depends_on = None 16 17 18 def upgrade(): 19 try: 20 op.create_table( 21 'bulk_ingest_jobs', 22 sa.Column('id', sa.Integer(), primary_key=True, index=True), 23 sa.Column('project_id', sa.Integer(), sa.ForeignKey('projects.id', ondelete='CASCADE'), nullable=False, index=True), 24 sa.Column('filename', sa.String(512), nullable=False), 25 sa.Column('mime_type', sa.String(255), nullable=True), 26 sa.Column('size_bytes', sa.Integer(), nullable=False, server_default='0'), 27 sa.Column('file_path', sa.String(1024), nullable=False), 28 sa.Column('method', sa.String(32), nullable=True), 29 sa.Column('splitter', sa.String(32), nullable=True, server_default='sentence'), 30 sa.Column('chunks', sa.Integer(), nullable=True, server_default='256'), 31 sa.Column('status', sa.String(16), nullable=False, server_default='queued', index=True), 32 sa.Column('error_message', sa.Text(), nullable=True), 33 sa.Column('documents_count', sa.Integer(), nullable=True), 34 sa.Column('chunks_count', sa.Integer(), nullable=True), 35 sa.Column('created_at', sa.DateTime(), nullable=False), 36 sa.Column('started_at', sa.DateTime(), nullable=True), 37 sa.Column('completed_at', sa.DateTime(), nullable=True), 38 ) 39 except Exception: 40 pass 41 42 43 def downgrade(): 44 try: 45 op.drop_table('bulk_ingest_jobs') 46 except Exception: 47 pass