Files
kb/engine/kb/ingest/code.py
T
steve 9aab79d49b v2 restructure: Go client, Docker engine, release tooling
- Remove v1 Python CLI (src/kb_search/, tests/, root pyproject.toml, uv.lock, .venv)
- Add Go client with cross-platform build (client/)
- Add FastAPI engine with NVIDIA and multi-stage ROCm Dockerfiles (engine/)
- Add VERSION files for client and engine, wired into builds
- Add release.sh for automated build, tag, release, and Docker push
- Update README with build/release docs and ROCm migration note
- Clean up .gitignore for v2 project structure

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 21:52:25 +00:00

207 lines
6.0 KiB
Python

"""Chunking pipeline for source code files."""
from __future__ import annotations
import ast
import re
def _approx_tokens(text: str) -> int:
return len(text) // 4
def _fixed_token_chunks(text: str, max_tokens: int) -> list[str]:
"""Split text into fixed-size token chunks by lines."""
lines = text.split("\n")
pieces: list[str] = []
current: list[str] = []
current_len = 0
for line in lines:
line_tokens = _approx_tokens(line)
if current and current_len + line_tokens > max_tokens:
pieces.append("\n".join(current))
current = [line]
current_len = line_tokens
else:
current.append(line)
current_len += line_tokens
if current:
pieces.append("\n".join(current))
return pieces
def _chunk_python(text: str, max_tokens: int) -> list[dict]:
    """Chunk Python source along its top-level classes and functions.

    Each top-level ``def`` / ``async def`` / ``class`` becomes a region that
    also covers its decorators and the comment lines directly above it
    (blank lines between those comments are tolerated). Module-level code
    between regions, and after the last one, is emitted as its own chunk
    with empty metadata when it is estimated at more than 10 tokens; shorter
    fragments are dropped. Anything estimated above *max_tokens* is split
    further with ``_fixed_token_chunks``.

    Args:
        text: Python source code.
        max_tokens: Estimated-token budget per chunk.

    Returns:
        Chunk dicts with keys ``text``, ``chunk_index`` (contiguous from 0)
        and ``metadata`` (``{"name": "def foo"}`` / ``{"name": "class Foo"}``
        for definitions, ``{}`` for module-level code). Returns ``[]`` when
        the source cannot be parsed or has no top-level definitions, so the
        caller can fall back to another strategy.
    """
    # Split the way the parser counts lines: "\n", "\r\n" and a lone "\r"
    # each end a line. A plain split("\n") disagrees with ast line numbers
    # on CR-only files and would index past the end of `lines`. The "\r" of
    # a "\r\n" pair stays attached to its line, exactly as before.
    lines = re.split(r"\n|\r(?!\n)", text)
    try:
        tree = ast.parse(text)
    except (SyntaxError, ValueError, RecursionError):
        # Invalid syntax, embedded NUL bytes / un-encodable text (ValueError
        # on some Python versions), or nesting too deep for the parser.
        return []

    # (start, end, name) such that lines[start:end] is the whole region.
    regions: list[tuple[int, int, str]] = []
    for node in ast.iter_child_nodes(tree):
        if not isinstance(
            node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)
        ):
            continue
        first_line = node.lineno - 1  # 0-indexed
        if node.decorator_list:
            first_line = node.decorator_list[0].lineno - 1
        # Walk upwards over comments and blank lines; only a comment line
        # extends the region, so trailing blank lines are not absorbed.
        scan = first_line - 1
        while scan >= 0:
            stripped = lines[scan].strip()
            if stripped.startswith("#"):
                first_line = scan
            elif stripped:
                break
            scan -= 1
        prefix = "class " if isinstance(node, ast.ClassDef) else "def "
        # end_lineno is 1-indexed inclusive, i.e. an exclusive 0-indexed end.
        regions.append((first_line, node.end_lineno, f"{prefix}{node.name}"))
    if not regions:
        return []
    regions.sort(key=lambda region: region[0])

    chunks: list[dict] = []

    def emit(body: str, metadata: dict) -> None:
        """Append *body* as one chunk, or as several when it is over budget."""
        if _approx_tokens(body) > max_tokens:
            # Skip whitespace-only pieces: they carry nothing to index.
            pieces = [
                piece
                for piece in _fixed_token_chunks(body, max_tokens)
                if piece.strip()
            ]
        else:
            pieces = [body]
        for piece in pieces:
            chunks.append({
                "text": piece,
                "chunk_index": len(chunks),
                "metadata": dict(metadata),  # fresh dict per chunk
            })

    prev_end = 0
    for start, end, name in regions:
        # The comment scan is purely textual and can reach back into the
        # previous definition (e.g. the last line of a multi-line string
        # that happens to start with "#"); never emit those lines twice.
        start = max(start, prev_end)
        between = "\n".join(lines[prev_end:start]).strip()
        if _approx_tokens(between) > 10:
            emit(between, {})
        emit("\n".join(lines[start:end]).rstrip(), {"name": name})
        prev_end = end

    # Trailing module-level code after the last definition.
    tail = "\n".join(lines[prev_end:]).strip()
    if _approx_tokens(tail) > 10:
        emit(tail, {})
    return chunks
def _chunk_by_regex(text: str, pattern: str, max_tokens: int) -> list[dict]:
    """Chunk source code at lines where *pattern* matches.

    Every line whose beginning matches *pattern* starts a new block that
    runs up to the next matching line (or the end of the text). The stripped
    matched text becomes the block's ``name`` metadata. Non-blank content
    before the first match is emitted first with empty metadata. Anything
    estimated above *max_tokens*, the leading content included, is split
    further with ``_fixed_token_chunks``.

    Args:
        text: Source code to split.
        pattern: Regular expression applied with ``match`` to each line.
        max_tokens: Estimated-token budget per chunk.

    Returns:
        Chunk dicts with keys ``text``, ``chunk_index`` (contiguous from 0)
        and ``metadata``. Returns ``[]`` when no line matches, so the caller
        can fall back to another strategy.
    """
    lines = text.split("\n")
    matcher = re.compile(pattern)  # compiled once, reused for every line

    # (line index, matched text) for each line that opens a block.
    boundaries: list[tuple[int, str]] = []
    for line_no, line in enumerate(lines):
        found = matcher.match(line)
        if found:
            boundaries.append((line_no, found.group(0).strip()))
    if not boundaries:
        return []

    chunks: list[dict] = []

    def emit(body: str, metadata: dict) -> None:
        """Append *body* as one chunk, or as several when it is over budget."""
        if _approx_tokens(body) > max_tokens:
            # Skip whitespace-only pieces: they carry nothing to index.
            pieces = [
                piece
                for piece in _fixed_token_chunks(body, max_tokens)
                if piece.strip()
            ]
        else:
            pieces = [body]
        for piece in pieces:
            chunks.append({
                "text": piece,
                "chunk_index": len(chunks),
                "metadata": dict(metadata),  # fresh dict per chunk
            })

    # Content before the first match (imports, package clause, ...).
    preamble = "\n".join(lines[: boundaries[0][0]]).strip()
    if preamble:
        emit(preamble, {})

    # Each block ends where the next one starts; the last runs to the end.
    block_ends = [start for start, _ in boundaries[1:]] + [len(lines)]
    for (start, name), end in zip(boundaries, block_ends):
        emit("\n".join(lines[start:end]).rstrip(), {"name": name})
    return chunks
def chunk_code(
    text: str,
    language: str | None,
    max_tokens: int = 1024,
) -> list[dict]:
    """Split source code into chunks using language-aware strategies.

    ``"python"`` is chunked by top-level definitions via ``_chunk_python``;
    ``"bash"`` and ``"go"`` are chunked at function headers via
    ``_chunk_by_regex``. Any other language, or a strategy that yields
    nothing, falls back to fixed-size line-based chunking of the whole text.

    Args:
        text: Source code to split.
        language: Language identifier, or ``None`` when unknown.
        max_tokens: Estimated-token budget per chunk.

    Returns:
        A list of chunk dicts, each containing ``text``, ``chunk_index``
        and ``metadata``. Fallback chunks are stripped, never blank, have
        empty metadata and are numbered contiguously from 0.
    """
    chunks: list[dict] = []
    if language == "python":
        chunks = _chunk_python(text, max_tokens)
    elif language == "bash":
        chunks = _chunk_by_regex(
            text, r"^(?:\w+\s*\(\)|function\s+\w+)", max_tokens
        )
    elif language == "go":
        # Also consume the optional receiver and the identifier so the chunk
        # name reads "func Name" / "func (r *T) Name" instead of the bare
        # keyword "func". Both additions are optional, so every line the
        # plain "^func\s+" pattern matched still starts a block.
        chunks = _chunk_by_regex(
            text, r"^func\s+(?:\([^)]*\)\s*)?\w*", max_tokens
        )

    # Fallback: fixed-size token chunking.
    if not chunks:
        for piece in _fixed_token_chunks(text, max_tokens):
            piece = piece.strip()
            if piece:
                chunks.append({
                    "text": piece,
                    # Number only the chunks actually kept; skipped blank
                    # pieces must not leave gaps in the sequence.
                    "chunk_index": len(chunks),
                    "metadata": {},
                })
    return chunks