"""Async background worker for processing ingestion jobs."""

import asyncio
import hashlib
import json
import logging
import shutil
from pathlib import Path

from kb import config, database, embeddings, staging
from kb.database import build_enriched_text
from kb.ingest import detector

logger = logging.getLogger("kb.worker")

# Seconds to wait before polling again when the queue is empty, and the
# back-off applied after an unexpected error in the worker loop.
_POLL_INTERVAL_SECONDS = 2

# Document types that ``_chunk_content`` knows how to split into chunks.
_SUPPORTED_DOC_TYPES = frozenset({"note", "pdf", "markdown", "code"})


async def ingestion_worker() -> None:
    """Main background loop that processes queued ingestion jobs.

    Runs indefinitely until cancelled. Each iteration selects the oldest
    job whose status is ``'queued'``, marks it as *processing*, and
    delegates to :func:`_process_job` in a worker thread via
    :func:`asyncio.to_thread`. When no job is queued the loop sleeps for
    ``_POLL_INTERVAL_SECONDS`` before polling again.

    Outcomes are written back to the job row: the status returned by
    :func:`_process_job` on success, or ``"failed"`` with the exception
    text when it raises.

    Raises:
        asyncio.CancelledError: Re-raised after logging so that task
            cancellation propagates to the caller.
    """
    logger.info("Ingestion worker started")
    while True:
        try:
            cfg = config.cfg

            conn = database.get_connection(cfg.db_path)
            try:
                row = conn.execute(
                    "SELECT * FROM jobs WHERE status = 'queued' "
                    "ORDER BY created_at ASC LIMIT 1"
                ).fetchone()
                if row is not None:
                    job_id = row["id"]
                    database.update_job_status(conn, job_id, "processing")
                    logger.info(
                        "Processing job %d (%s)", job_id, row["filename"]
                    )
            finally:
                conn.close()

            # Sleep only after the connection is closed so an idle worker
            # does not hold a database connection open between polls.
            if row is None:
                await asyncio.sleep(_POLL_INTERVAL_SECONDS)
                continue

            try:
                status, doc_id, chunk_count = await asyncio.to_thread(
                    _process_job, row
                )
            except Exception as exc:
                logger.exception("Job %d failed", job_id)
                _record_job_status(
                    cfg.db_path, job_id, "failed", error=str(exc)
                )
                continue

            _record_job_status(
                cfg.db_path,
                job_id,
                status,
                document_id=doc_id,
                chunk_count=chunk_count,
            )
            logger.info(
                "Job %d finished: status=%s doc_id=%s chunks=%s",
                job_id,
                status,
                doc_id,
                chunk_count,
            )
        except asyncio.CancelledError:
            logger.info("Ingestion worker cancelled")
            raise
        except Exception:
            # Top-level boundary: log and back off instead of letting one
            # bad iteration kill the worker.
            logger.exception("Unexpected error in ingestion worker loop")
            await asyncio.sleep(_POLL_INTERVAL_SECONDS)


def _record_job_status(db_path, job_id, status: str, **fields) -> None:
    """Update a job's status on a short-lived connection.

    Args:
        db_path: Database location passed to ``database.get_connection``.
        job_id: Identifier of the job row to update.
        status: New status value for the job.
        **fields: Extra keyword arguments forwarded unchanged to
            ``database.update_job_status`` (e.g. ``error``,
            ``document_id``, ``chunk_count``).
    """
    conn = database.get_connection(db_path)
    try:
        database.update_job_status(conn, job_id, status, **fields)
    finally:
        conn.close()


def _process_job(job_row) -> tuple[str, int | None, int]:
    """Synchronously process a single ingestion job.

    Steps, in order: resolve the document type, skip the job if the staged
    file's SHA-256 hash is already known, chunk the content, persist the
    document with its chunks, embeddings and tags, commit, and finally
    move the staged file into ``cfg.documents_dir``.

    Args:
        job_row: Row from the ``jobs`` table. The keys read here are
            ``id``, ``staging_path``, ``filename``, ``doc_type``,
            ``title`` and ``tags_json``.

    Returns:
        A tuple of ``(status, document_id, chunk_count)`` where *status* is
        one of ``"done"``, ``"skipped"``. Skipped jobs return
        ``("skipped", None, 0)``.

    Raises:
        ValueError: If the resolved document type is not supported, or if
            the embedder returns a different number of vectors than the
            number of chunks.
    """
    cfg = config.cfg
    conn = database.get_connection(cfg.db_path)
    staged_path = Path(job_row["staging_path"])
    try:
        # --- Determine document type and language -------------------------
        filename = job_row["filename"]
        doc_type, language = _resolve_doc_type(
            staged_path, filename, job_row["doc_type"]
        )
        if doc_type not in _SUPPORTED_DOC_TYPES:
            raise ValueError(f"Unsupported doc_type: {doc_type}")

        # --- Duplicate detection via content hash -------------------------
        # Done before chunking so duplicates are skipped without paying
        # for the chunking pass.
        content_hash = hashlib.sha256(staged_path.read_bytes()).hexdigest()
        if database.hash_exists(conn, content_hash):
            logger.info(
                "Duplicate detected for job %d, skipping", job_row["id"]
            )
            return ("skipped", None, 0)

        # --- Chunk the content --------------------------------------------
        chunks, language = _chunk_content(
            doc_type, language, staged_path, filename, cfg
        )

        # --- Persist document, chunks, and embeddings ---------------------
        title = job_row["title"] or filename
        doc_id = database.insert_document(
            conn,
            title=title,
            source_path=str(staged_path),
            content_hash=content_hash,
            doc_type=doc_type,
            language=language,
        )

        # Chunks are either plain strings or dicts carrying a "text" key
        # plus optional metadata.
        chunk_texts = [c if isinstance(c, str) else c["text"] for c in chunks]
        chunk_metas = [_chunk_metadata(c) for c in chunks]
        enriched_texts = [
            build_enriched_text(title, text, meta)
            for text, meta in zip(chunk_texts, chunk_metas)
        ]
        vectors = embeddings.embed_texts(enriched_texts)

        # strict=True: a vector/chunk count mismatch must fail the job
        # rather than silently store fewer chunks than reported.
        rows = zip(
            chunk_texts, chunk_metas, enriched_texts, vectors, strict=True
        )
        for idx, (chunk_text, meta, enriched, vector) in enumerate(rows):
            chunk_id = database.insert_chunk(
                conn,
                document_id=doc_id,
                chunk_index=idx,
                text=chunk_text,
                enriched_text=enriched,
                metadata=meta,
            )
            database.insert_embedding(conn, chunk_id, vector)

        # --- Tags ---------------------------------------------------------
        tags = json.loads(job_row["tags_json"] or "[]")
        if tags:
            database.tag_document(conn, doc_id, tags)

        conn.commit()

        # --- Move original file to persistent storage ---------------------
        _store_original(cfg, staged_path, filename, content_hash, doc_id)

        return ("done", doc_id, len(chunk_texts))
    finally:
        conn.close()
        # Single cleanup point for every exit path. Only clean up staging
        # if the file is still there (not moved).
        if staged_path.exists():
            staging.cleanup(staged_path)


def _resolve_doc_type(
    staged_path: Path, filename: str, forced_type
) -> tuple[str, str | None]:
    """Return ``(doc_type, language)`` for a staged file.

    A ``.note`` suffix on the staged path wins, then an explicitly forced
    type from the job row, and otherwise the type detected from
    *filename*. Language is only populated by detection.
    """
    if staged_path.suffix == ".note":
        return "note", None
    if forced_type:
        return forced_type, None
    return detector.detect_type(Path(filename))


def _chunk_content(doc_type, language, staged_path: Path, filename: str, cfg):
    """Chunk the staged file and return ``(chunks, language)``.

    Chunker modules are imported lazily inside each branch, as in the
    original code, so only the pipeline that is actually needed is loaded.
    For ``"code"`` without a language, the language is detected from
    *filename*.

    Raises:
        ValueError: If *doc_type* is not a supported type.
    """
    if doc_type == "note":
        from kb.ingest.note import chunk_note

        text = staged_path.read_text(encoding="utf-8")
        return chunk_note(text), language

    if doc_type == "pdf":
        from kb.ingest.docling_pipeline import chunk_document

        return chunk_document(staged_path, cfg.ingest_device), language

    if doc_type == "markdown":
        from kb.ingest.markdown import chunk_markdown

        text = staged_path.read_text(encoding="utf-8")
        return chunk_markdown(text), language

    if doc_type == "code":
        from kb.ingest.code import chunk_code

        text = staged_path.read_text(encoding="utf-8")
        if not language:
            _, language = detector.detect_type(Path(filename))
        return chunk_code(text, language), language

    raise ValueError(f"Unsupported doc_type: {doc_type}")


def _chunk_metadata(chunk):
    """Return a chunk's metadata dict (every key except ``"text"``) or None.

    Plain-string chunks and dict chunks with no extra keys yield ``None``.
    """
    if isinstance(chunk, str):
        return None
    return {k: v for k, v in chunk.items() if k != "text"} or None


def _store_original(
    cfg, staged_path: Path, filename: str, content_hash: str, doc_id
) -> None:
    """Move the staged file into ``cfg.documents_dir`` and record its path.

    The destination name is ``<content_hash><ext>``, with the extension
    taken from *filename* and falling back to the staged path's suffix.
    This step is best-effort: any failure is logged as a warning and not
    raised, because the document has already been committed. A staged file
    left behind is not removed here; the caller is expected to clean it up.
    """
    ext = Path(filename).suffix or staged_path.suffix
    dest = cfg.documents_dir / f"{content_hash}{ext}"
    try:
        cfg.documents_dir.mkdir(parents=True, exist_ok=True)
        shutil.move(str(staged_path), str(dest))
        conn_update = database.get_connection(cfg.db_path)
        try:
            conn_update.execute(
                "UPDATE documents SET stored_path = ?, original_filename = ? WHERE id = ?",
                (str(dest), filename, doc_id),
            )
            conn_update.commit()
        finally:
            conn_update.close()
        logger.info("Stored original file: %s", dest)
    except Exception as exc:
        logger.warning("Failed to store original file: %s", exc)