-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathembed_documents.py
More file actions
executable file
·168 lines (128 loc) · 5.16 KB
/
embed_documents.py
File metadata and controls
executable file
·168 lines (128 loc) · 5.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/env python
"""
embed_documents.py
Main entry point for embedding documents into a vector database.
This script performs the following operations:
1. Loads configuration and initializes DB provider and loaders.
2. Fetches and embeds Git-sourced documents (Markdown, PDFs, etc.).
3. Fetches and embeds web documents (HTML and PDFs).
4. Chunks all documents and indexes them into the configured vector store.
This tool is designed for use in pipelines or manual indexing workflows.
Usage:
$ python embed_documents.py
Environment:
Requires a valid .env file or environment variables defined for:
- DB_TYPE, EMBEDDING_MODEL, TEMP_DIR
- CHUNK_SIZE, CHUNK_OVERLAP, LOG_LEVEL
- WEB_SOURCES, REPO_SOURCES
- Plus additional DB-specific variables based on DB_TYPE
Example:
$ DB_TYPE=QDRANT EMBEDDING_MODEL=BAAI/bge-large-en-v1.5 \
CHUNK_SIZE=20480 CHUNK_OVERLAP=2048 \
TEMP_DIR=/tmp \
python embed_documents.py
"""
import logging
import sys
from pathlib import Path
import requests
from config import Config
from loaders.git import GitLoader
from loaders.pdf import PDFLoader
from loaders.web import WebLoader
# Configuration is loaded once, at import time. The functions below read their
# settings (repo_sources, web_sources, temp_dir, db_provider) from this object.
config = Config.load()
# Module-level logger named after this module; used by every stage of the job.
logger = logging.getLogger(__name__)
def _fail_and_exit(message: str, exc: Exception) -> None:
    """
    Record a failure in the log (traceback included) and propagate it.

    Args:
        message (str): Context describing which stage failed.
        exc (Exception): The exception to log and then raise.

    Raises:
        Exception: Always raises ``exc``; this function never returns normally.

    Note that, despite its name, this helper does not terminate the process
    itself: it raises ``exc`` and leaves the decision to exit to the caller.
    """
    # logger.exception logs at ERROR level with exc_info enabled, so the
    # traceback of the exception currently being handled is attached.
    logger.exception("%s: %s", message, exc)
    raise exc
def _process_git_documents() -> None:
    """
    Embed documents coming from the configured Git repositories.

    Does nothing when ``config.repo_sources`` is empty. Otherwise the chunks
    returned by ``GitLoader(config).load()`` are handed to
    ``config.db_provider.add_documents``.

    Raises:
        Exception: Any failure while loading or indexing is logged and
            re-raised through ``_fail_and_exit``.
    """
    if config.repo_sources:
        logger.info("Starting Git-based document embedding...")
        try:
            chunks = GitLoader(config).load()
            if not chunks:
                logger.info("No documents found in Git sources.")
            else:
                logger.info("Adding %d Git document chunks to vector DB", len(chunks))
                config.db_provider.add_documents(chunks)
        except Exception as err:
            # Log with traceback and propagate so the job stops here.
            _fail_and_exit("Failed during Git document processing", err)
def _process_html_documents(html_urls: list) -> None:
"""Process HTML web documents and add to vector DB."""
if not html_urls:
return
logger.info("Starting HTML-based web document embedding...")
try:
web_loader = WebLoader(config)
web_chunks = web_loader.load(html_urls)
if web_chunks:
logger.info("Adding %d HTML web chunks to vector DB", len(web_chunks))
config.db_provider.add_documents(web_chunks)
else:
logger.info("No chunks produced from HTML URLs.")
except Exception as e:
_fail_and_exit("Failed during HTML web document processing", e)
def _process_pdf_documents(pdf_urls: list) -> None:
"""Download and process PDF documents from web URLs and add to vector DB."""
if not pdf_urls:
return
logger.info("Downloading PDF documents from web URLs...")
pdf_dir = Path(config.temp_dir) / "web_pdfs"
pdf_dir.mkdir(parents=True, exist_ok=True)
downloaded_files = []
for url in pdf_urls:
try:
response = requests.get(url)
response.raise_for_status()
filename = Path(url.split("/")[-1])
file_path = pdf_dir / filename
with open(file_path, "wb") as f:
f.write(response.content)
logger.info("Downloaded: %s", file_path)
downloaded_files.append(file_path)
except Exception as e:
_fail_and_exit(f"Failed to download {url}", e)
if downloaded_files:
try:
pdf_loader = PDFLoader(config)
pdf_chunks = pdf_loader.load(downloaded_files)
if pdf_chunks:
logger.info("Adding %d PDF web chunks to vector DB", len(pdf_chunks))
config.db_provider.add_documents(pdf_chunks)
else:
logger.info("No chunks produced from downloaded PDFs.")
except Exception as e:
_fail_and_exit("Failed during PDF web document processing", e)
def main() -> None:
    """
    Run the full embedding job: Git sources first, then web sources.

    Steps:
    1. Embed documents from the configured Git repos, if any.
    2. Split ``config.web_sources`` into PDF URLs (those ending in ".pdf",
       case-insensitively) and all other URLs, which are treated as HTML.
    3. Embed the HTML documents, then download and embed the PDFs.

    Each stage logs failures with a traceback and re-raises them via
    `_fail_and_exit`, so the first error stops the workflow.
    """
    _process_git_documents()

    # Partition the web sources in one pass, preserving their original order.
    html_urls, pdf_urls = [], []
    for source_url in config.web_sources:
        is_pdf = source_url.lower().endswith(".pdf")
        (pdf_urls if is_pdf else html_urls).append(source_url)

    _process_html_documents(html_urls)
    _process_pdf_documents(pdf_urls)

    logger.info("Embedding job complete.")
if __name__ == "__main__":
    # Top-level boundary: any exception escaping main() is logged at CRITICAL
    # level with its traceback, and the process ends with exit status 1.
    try:
        main()
    except Exception as fatal_error:
        logger.critical("Fatal error: %s", fatal_error, exc_info=True)
        sys.exit(1)