6fec627503
- Reject duplicate uploads at the API boundary (HTTP 409) instead of silently skipping in the background worker. Checks both ingested documents and in-flight jobs via content_hash on the jobs table. - Go client handles 409 with distinct messages for already-imported documents vs already-queued jobs. - Sanitize FTS5 search queries by quoting each token to prevent syntax errors from special characters like ?, *, ", (), AND, OR, NOT. - Add try/except safety net around FTS5 execute for edge cases. - Add main branch guard to release.sh to prevent releasing from feature branches. - Update specs and README to reflect new behaviour. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
267 lines
6.0 KiB
Go
267 lines
6.0 KiB
Go
package cmd
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
|
|
"github.com/kb-search/kb/internal/api"
|
|
"github.com/kb-search/kb/internal/output"
|
|
"github.com/spf13/cobra"
|
|
)
|
|
|
|
// uploadResult describes what happened when a single file was posted to the
// jobs endpoint.
type uploadResult struct {
	// Raw is the decoded JSON response body, kept so it can be echoed
	// verbatim in JSON output mode.
	Raw interface{}
	// Duplicate is true when the server answered with HTTP 409 Conflict.
	Duplicate bool
	// DocID and JobID mirror the "document_id" and "job_id" fields of a
	// duplicate response; each stays zero when the field is absent.
	DocID, JobID float64
	// Title mirrors the "title" field of a duplicate response.
	Title string
}
|
|
|
|
func (r *uploadResult) duplicateMsg() string {
|
|
if r.DocID > 0 {
|
|
return fmt.Sprintf("Already imported: %s (doc ID: %.0f)", r.Title, r.DocID)
|
|
}
|
|
return fmt.Sprintf("Already queued: %s (job ID: %.0f)", r.Title, r.JobID)
|
|
}
|
|
|
|
// supportedExts is the set of file extensions (lower-case, with the leading
// dot) that a directory walk collects for upload. Single-file uploads are not
// filtered through this set.
var supportedExts = map[string]bool{
	// Documents and markup.
	".pdf":  true,
	".docx": true,
	".html": true,
	".md":   true,
	".txt":  true,

	// Source code and scripts.
	".py": true,
	".sh": true,
	".go": true,
}
|
|
|
|
var addCmd = &cobra.Command{
|
|
Use: "add <path>",
|
|
Short: "Add a document or directory to the knowledge base",
|
|
Args: cobra.MaximumNArgs(1),
|
|
RunE: runAdd,
|
|
}
|
|
|
|
func init() {
|
|
addCmd.Flags().String("tags", "", "tags (comma-separated)")
|
|
addCmd.Flags().String("type", "", "document type")
|
|
addCmd.Flags().BoolP("recursive", "r", false, "recursively add directory contents")
|
|
addCmd.Flags().String("note", "", "add a text note instead of a file")
|
|
addCmd.Flags().String("title", "", "title for the note")
|
|
rootCmd.AddCommand(addCmd)
|
|
}
|
|
|
|
func runAdd(cmd *cobra.Command, args []string) error {
|
|
tags, _ := cmd.Flags().GetString("tags")
|
|
docType, _ := cmd.Flags().GetString("type")
|
|
recursive, _ := cmd.Flags().GetBool("recursive")
|
|
note, _ := cmd.Flags().GetString("note")
|
|
title, _ := cmd.Flags().GetString("title")
|
|
|
|
client := api.NewClient()
|
|
|
|
// Note mode
|
|
if note != "" {
|
|
fields := map[string]string{
|
|
"note": note,
|
|
}
|
|
if title != "" {
|
|
fields["title"] = title
|
|
}
|
|
if tags != "" {
|
|
fields["tags"] = tags
|
|
}
|
|
if docType != "" {
|
|
fields["type"] = docType
|
|
}
|
|
|
|
resp, err := client.PostMultipart("/api/v1/jobs", fields, nil)
|
|
if err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
if resp.StatusCode == http.StatusConflict {
|
|
var result interface{}
|
|
if err := api.DecodeJSON(resp, &result); err != nil {
|
|
return fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
if output.IsJSON() {
|
|
output.PrintJSON(result)
|
|
} else {
|
|
if m, ok := result.(map[string]interface{}); ok {
|
|
if docID, ok := m["document_id"].(float64); ok {
|
|
fmt.Printf("Already imported: %s (doc ID: %.0f)\n", m["title"], docID)
|
|
} else if jobID, ok := m["job_id"].(float64); ok {
|
|
fmt.Printf("Already queued: %s (job ID: %.0f)\n", m["title"], jobID)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if err := api.CheckError(resp); err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
var result interface{}
|
|
if err := api.DecodeJSON(resp, &result); err != nil {
|
|
return fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
|
|
if output.IsJSON() {
|
|
output.PrintJSON(result)
|
|
} else {
|
|
fmt.Println("Queued: note")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if len(args) == 0 {
|
|
return fmt.Errorf("path argument is required (or use --note)")
|
|
}
|
|
|
|
path := args[0]
|
|
info, err := os.Stat(path)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot access %s: %w", path, err)
|
|
}
|
|
|
|
if !info.IsDir() {
|
|
// Single file upload
|
|
result, err := uploadFile(client, path, tags, docType)
|
|
if err != nil {
|
|
fmt.Fprintln(os.Stderr, err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
if output.IsJSON() {
|
|
output.PrintJSON([]interface{}{result.Raw})
|
|
} else if result.Duplicate {
|
|
fmt.Println(result.duplicateMsg())
|
|
} else {
|
|
fmt.Printf("Queued: %s\n", filepath.Base(path))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Directory upload
|
|
var files []string
|
|
walkFn := func(p string, d os.DirEntry, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if d.IsDir() {
|
|
if !recursive && p != path {
|
|
return filepath.SkipDir
|
|
}
|
|
return nil
|
|
}
|
|
ext := strings.ToLower(filepath.Ext(p))
|
|
if supportedExts[ext] {
|
|
files = append(files, p)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if err := filepath.WalkDir(path, walkFn); err != nil {
|
|
return fmt.Errorf("failed to walk directory: %w", err)
|
|
}
|
|
|
|
var results []interface{}
|
|
queued := 0
|
|
duplicates := 0
|
|
for _, f := range files {
|
|
result, err := uploadFile(client, f, tags, docType)
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error uploading %s: %v\n", f, err)
|
|
continue
|
|
}
|
|
results = append(results, result.Raw)
|
|
if result.Duplicate {
|
|
duplicates++
|
|
if !output.IsJSON() {
|
|
fmt.Println(result.duplicateMsg())
|
|
}
|
|
} else {
|
|
queued++
|
|
if !output.IsJSON() {
|
|
fmt.Printf("Queued: %s\n", filepath.Base(f))
|
|
}
|
|
}
|
|
}
|
|
|
|
if output.IsJSON() {
|
|
output.PrintJSON(results)
|
|
} else if duplicates > 0 {
|
|
fmt.Printf("Queued: %d files, %d duplicates skipped\n", queued, duplicates)
|
|
} else {
|
|
fmt.Printf("Queued: %d files\n", queued)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func uploadFile(client *api.Client, path, tags, docType string) (*uploadResult, error) {
|
|
f, err := os.Open(path)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot open %s: %w", path, err)
|
|
}
|
|
defer f.Close()
|
|
|
|
fields := make(map[string]string)
|
|
if tags != "" {
|
|
fields["tags"] = tags
|
|
}
|
|
if docType != "" {
|
|
fields["type"] = docType
|
|
}
|
|
|
|
upload := &api.FileUpload{
|
|
FieldName: "file",
|
|
FileName: filepath.Base(path),
|
|
Reader: f,
|
|
}
|
|
|
|
resp, err := client.PostMultipart("/api/v1/jobs", fields, upload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if resp.StatusCode == http.StatusConflict {
|
|
var raw json.RawMessage
|
|
if err := api.DecodeJSON(resp, &raw); err != nil {
|
|
return nil, fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
var dupResp struct {
|
|
DocumentID float64 `json:"document_id"`
|
|
JobID float64 `json:"job_id"`
|
|
Title string `json:"title"`
|
|
}
|
|
json.Unmarshal(raw, &dupResp)
|
|
var rawIface interface{}
|
|
json.Unmarshal(raw, &rawIface)
|
|
return &uploadResult{
|
|
Raw: rawIface,
|
|
Duplicate: true,
|
|
DocID: dupResp.DocumentID,
|
|
JobID: dupResp.JobID,
|
|
Title: dupResp.Title,
|
|
}, nil
|
|
}
|
|
|
|
if err := api.CheckError(resp); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var result interface{}
|
|
if err := api.DecodeJSON(resp, &result); err != nil {
|
|
return nil, fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
return &uploadResult{Raw: result}, nil
|
|
}
|