/ migrations / versions / 044_bulk_ingest_jobs.py
044_bulk_ingest_jobs.py
 1  """Bulk file ingest queue for RAG projects.
 2  
 3  Admins upload CSV/XLSX/JSON/PDF/docx via the KB tab; each file lands
 4  as a ``queued`` row here and ``crons/bulk_ingest.py`` picks it up.
 5  Decouples large-file ingestion from the request/response cycle so an
 6  admin uploading a 500-page PDF doesn't block on a 2-minute response.
 7  """
 8  import sqlalchemy as sa
 9  from alembic import op
10  
11  
# Alembic revision identifiers: this migration is 044 and applies on top of 043.
revision: str = '044'
down_revision: str = '043'
# Not attached to a named branch and no cross-branch dependencies.
branch_labels = None
depends_on = None
16  
17  
def upgrade():
    """Create the ``bulk_ingest_jobs`` queue table.

    Tolerates a pre-existing table: if ``bulk_ingest_jobs`` is already
    present (e.g. created out-of-band) this is a no-op. Any other failure
    (missing ``projects`` table, DDL rejected by the backend, lost
    connection) propagates. Alembic therefore never records revision 044
    as applied when the table was not actually created.
    """
    # Probe for the table instead of wrapping the DDL in a blanket
    # ``except Exception: pass``. The blanket catch hid genuine errors.
    # Also, on PostgreSQL a failed statement aborts the surrounding
    # transaction, so later steps would fail with a misleading error anyway.
    # Offline (``--sql``) mode has no live connection to inspect, so it
    # always emits the CREATE TABLE.
    if not op.get_context().as_sql:
        existing_tables = sa.inspect(op.get_bind()).get_table_names()
        if 'bulk_ingest_jobs' in existing_tables:
            return

    op.create_table(
        'bulk_ingest_jobs',
        # NOTE(review): index=True on the primary key likely adds a separate
        # ix_ index duplicating the PK's own index. It is kept unchanged so
        # fresh databases match those that already ran this migration.
        sa.Column('id', sa.Integer(), primary_key=True, index=True),
        sa.Column(
            'project_id',
            sa.Integer(),
            sa.ForeignKey('projects.id', ondelete='CASCADE'),
            nullable=False,
            index=True,
        ),
        sa.Column('filename', sa.String(512), nullable=False),
        sa.Column('mime_type', sa.String(255), nullable=True),
        # NOTE(review): sa.Integer is 32-bit on most backends (~2 GiB cap).
        # Consider BigInteger in a follow-up migration if larger uploads are
        # accepted.
        sa.Column('size_bytes', sa.Integer(), nullable=False, server_default='0'),
        sa.Column('file_path', sa.String(1024), nullable=False),
        sa.Column('method', sa.String(32), nullable=True),
        sa.Column('splitter', sa.String(32), nullable=True, server_default='sentence'),
        sa.Column('chunks', sa.Integer(), nullable=True, server_default='256'),
        # Defaults to 'queued' when an insert omits it. The index is
        # presumably there so the bulk-ingest cron can find pending jobs
        # cheaply; confirm against the worker's query.
        sa.Column('status', sa.String(16), nullable=False, server_default='queued', index=True),
        sa.Column('error_message', sa.Text(), nullable=True),
        sa.Column('documents_count', sa.Integer(), nullable=True),
        sa.Column('chunks_count', sa.Integer(), nullable=True),
        # No server default: every insert must supply created_at explicitly.
        sa.Column('created_at', sa.DateTime(), nullable=False),
        sa.Column('started_at', sa.DateTime(), nullable=True),
        sa.Column('completed_at', sa.DateTime(), nullable=True),
    )
41  
42  
def downgrade():
    """Drop the ``bulk_ingest_jobs`` table if it exists.

    A missing table is treated as already downgraded (no-op). Any other
    failure propagates, so Alembic does not record the downgrade as
    successful when the drop did not happen.

    NOTE(review): this only drops the table. Nothing here removes the
    uploaded files referenced by ``file_path``.
    """
    # Probe instead of swallowing every exception around the DROP. Offline
    # (``--sql``) mode has no live connection to inspect, so it always emits
    # the DROP TABLE.
    if not op.get_context().as_sql:
        existing_tables = sa.inspect(op.get_bind()).get_table_names()
        if 'bulk_ingest_jobs' not in existing_tables:
            return

    op.drop_table('bulk_ingest_jobs')