-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathdocker_entrypoint.py
More file actions
393 lines (316 loc) · 15.5 KB
/
docker_entrypoint.py
File metadata and controls
393 lines (316 loc) · 15.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
#!/usr/bin/env python3
"""
Apify Actor entry point for web2json-agent
Reads input from Apify, runs the parser generator, and saves results to dataset
"""
import os
import sys
import json
import base64
import tempfile
import shutil
import traceback
from pathlib import Path
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent))
from loguru import logger
from web2json.main import main as web2json_main
# Optional Apify SDK import: present on the Apify platform, usually absent locally.
try:
    from apify import Actor
except ImportError:
    # Without the SDK we fall back to reading INPUT.json straight from disk.
    APIFY_SDK_AVAILABLE = False
    logger.debug("Apify SDK not available, will use file-based input reading")
else:
    APIFY_SDK_AVAILABLE = True
async def get_apify_input_async():
    """Read the actor input through the Apify SDK (async, platform mode).

    Returns the input dict, or {} when the SDK call fails or yields nothing.
    """
    try:
        data = await Actor.get_input() or {}
        logger.info(f"✓ Successfully read input via Apify SDK")
        logger.info(f" Input keys: {list(data.keys())}")
        return data
    except Exception as exc:
        logger.error(f"Failed to read input via Apify SDK: {exc}")
        return {}
def get_apify_input():
    """Read input from Apify Actor - supports both SDK and file-based methods.

    Resolution order:
      1. Apify SDK, only when running on the platform (APIFY_IS_AT_HOME == '1').
      2. INPUT.json read from a list of well-known key-value-store paths.

    Returns:
        The parsed input dict, or {} when no input can be located.
    """
    # Method 1: Try Apify SDK first (works on Apify platform)
    if APIFY_SDK_AVAILABLE and os.environ.get('APIFY_IS_AT_HOME') == '1':
        logger.info("Running on Apify platform, using Apify SDK...")
        try:
            import asyncio
            # NOTE(review): asyncio.run() raises if an event loop is already
            # running in this thread — presumably this is only ever called
            # from synchronous code; confirm against callers.
            actor_input = asyncio.run(get_apify_input_async())
            if actor_input:
                return actor_input
        except Exception as e:
            logger.warning(f"Apify SDK method failed: {e}, falling back to file-based reading")
    # Method 2: File-based reading (works locally and as fallback)
    # Log environment for debugging
    logger.info("=" * 60)
    logger.info("Apify Environment Diagnostics")
    logger.info("=" * 60)
    logger.info(f"APIFY_INPUT_PATH: {os.environ.get('APIFY_INPUT_PATH', 'NOT SET')}")
    logger.info(f"APIFY_DEFAULT_KEY_VALUE_STORE_ID: {os.environ.get('APIFY_DEFAULT_KEY_VALUE_STORE_ID', 'NOT SET')}")
    logger.info(f"APIFY_IS_AT_HOME: {os.environ.get('APIFY_IS_AT_HOME', 'NOT SET')}")
    logger.info(f"Current working directory: {os.getcwd()}")
    # List files in current directory
    logger.info(f"Files in current directory: {os.listdir('.')}")
    # Check if apify_storage exists
    if os.path.exists("apify_storage"):
        logger.info(f"apify_storage exists: {os.listdir('apify_storage')}")
        if os.path.exists("apify_storage/key_value_stores"):
            logger.info(f"key_value_stores content: {os.listdir('apify_storage/key_value_stores')}")
    # Apify standard input locations (in priority order)
    possible_paths = [
        os.environ.get("APIFY_INPUT_PATH"),  # Custom path if set (may be None)
        "/apify_storage/key_value_stores/default/INPUT.json",  # Standard Apify location
        "apify_storage/key_value_stores/default/INPUT.json",  # Relative path
        "INPUT.json",  # Fallback in current directory
    ]
    # Try to find the default store ID dynamically
    default_store_id = os.environ.get("APIFY_DEFAULT_KEY_VALUE_STORE_ID")
    if default_store_id:
        # Store-specific paths outrank the literal "default" store paths.
        possible_paths.insert(1, f"/apify_storage/key_value_stores/{default_store_id}/INPUT.json")
        possible_paths.insert(2, f"apify_storage/key_value_stores/{default_store_id}/INPUT.json")
    logger.info(f"Searching for input in {len(possible_paths)} locations...")
    for i, input_path in enumerate(possible_paths, 1):
        if input_path:  # skip the None slot when APIFY_INPUT_PATH is unset
            logger.info(f" [{i}] Trying: {input_path}")
            if os.path.exists(input_path):
                logger.info(f" ✓ Found input file at: {input_path}")
                try:
                    with open(input_path, "r", encoding="utf-8") as f:
                        content = f.read()
                    logger.info(f" Input file size: {len(content)} bytes")
                    actor_input = json.loads(content)
                    logger.info(f" Successfully parsed JSON with {len(actor_input)} keys")
                    return actor_input
                except Exception as e:
                    # Corrupt/unreadable file: log and keep trying later paths.
                    logger.error(f" ✗ Failed to read/parse {input_path}: {e}")
            else:
                logger.debug(f" ✗ Not found: {input_path}")
    logger.warning("=" * 60)
    logger.warning("⚠ No input file found in any location")
    logger.warning("This usually means:")
    logger.warning("1. Input was not provided in Apify Console")
    logger.warning("2. Or Actor is running in local/test mode")
    logger.warning("=" * 60)
    logger.warning("Using empty default input")
    return {}
async def save_to_dataset_async(data):
    """Push one record to the Apify dataset via the SDK; re-raise on failure."""
    try:
        await Actor.push_data(data)
    except Exception as exc:
        logger.error(f"Failed to save via Apify SDK: {exc}")
        raise
    logger.info(f"✓ Saved record via Apify SDK")
def save_to_dataset_file(data):
    """Fallback: Save data to dataset using file-based method.

    Writes ``data`` as pretty-printed JSON into the local Apify dataset
    directory (``apify_storage/datasets/<store id>``), using a zero-padded
    sequential file name (000000001.json, 000000002.json, ...).

    Args:
        data: JSON-serializable record to persist.
    """
    dataset_dir = os.environ.get("APIFY_DEFAULT_DATASET_ID", "default")
    dataset_path = Path(f"apify_storage/datasets/{dataset_dir}")
    dataset_path.mkdir(parents=True, exist_ok=True)
    # Next sequence number = highest existing numeric file name + 1.
    # (Using len(files) + 1 would silently overwrite an existing record
    # whenever the sequence has gaps or non-numeric *.json files exist.)
    existing_nums = [
        int(p.stem) for p in dataset_path.glob("*.json") if p.stem.isdigit()
    ]
    next_num = max(existing_nums, default=0) + 1
    output_file = dataset_path / f"{next_num:09d}.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    logger.info(f"Saved data to {output_file}")
def _stage_html_inputs(actor_input, input_dir):
    """Write HTML supplied directly in the actor input ('html' mode) to input_dir.

    Accepts two input shapes, both optional but at least one required:
    'htmlContents' ([{name, html}]) and 'htmlFiles' ([{filename, content}],
    where content may be plain text or base64).
    """
    # Priority 1: htmlContents (direct text input - easiest)
    html_contents = actor_input.get("htmlContents", [])
    # Priority 2: htmlFiles (base64 or plain text - advanced)
    html_files = actor_input.get("htmlFiles", [])
    if not html_contents and not html_files:
        raise ValueError(
            "No HTML content provided. Please provide either:\n"
            "1. 'htmlContents' (recommended): array of {name, html}\n"
            "2. 'htmlFiles' (advanced): array of {filename, content}\n"
            "3. Or switch inputMode to 'url' and provide URLs"
        )
    if html_contents:
        logger.info(f"Processing {len(html_contents)} HTML content items (direct input)")
        for idx, content_item in enumerate(html_contents):
            page_name = content_item.get("name", f"page_{idx+1}")
            html_content = content_item.get("html", "")
            if not html_content:
                logger.warning(f"Skipping empty HTML content for {page_name}")
                continue
            # Ensure .html extension
            filename = page_name if page_name.endswith(".html") else f"{page_name}.html"
            (input_dir / filename).write_text(html_content, encoding="utf-8")
            logger.info(f"Saved HTML content: {filename} ({len(html_content)} chars)")
    if html_files:
        logger.info(f"Processing {len(html_files)} HTML files (base64/text)")
        for idx, file_data in enumerate(html_files):
            filename = file_data.get("filename", f"page_{idx+1}.html")
            content = file_data.get("content", "")
            if not content:
                logger.warning(f"Skipping empty content for {filename}")
                continue
            # Try to decode as base64 first, fall back to plain text.
            html_content = content
            try:
                # Heuristic: raw HTML starts with markup; base64 has no < or >.
                if "<" not in content[:100] and ">" not in content[:100]:
                    html_content = base64.b64decode(content).decode("utf-8")
                    logger.info(f"Decoded base64 content for {filename}")
            except Exception as e:
                logger.debug(f"Using content as plain text for {filename}: {e}")
            (input_dir / filename).write_text(html_content, encoding="utf-8")
            logger.info(f"Saved HTML file: {filename} ({len(html_content)} chars)")


def _fetch_url_inputs(actor_input, input_dir, url_mapping):
    """Fetch each input URL ('url' mode) and save the HTML into input_dir.

    Populates ``url_mapping`` with page_N.json -> source URL so result
    records can be traced back to the page they came from. Fetch failures
    are logged and skipped (best effort, one bad URL does not abort the run).
    """
    urls = actor_input.get("urls", [])
    # Tolerate a single URL passed as a bare string.
    if not isinstance(urls, list):
        urls = [urls] if urls else []
    if not urls:
        urls = [
            "https://quotes.toscrape.com/page/1/",
            "https://quotes.toscrape.com/page/2/"
        ]
        logger.info("No URLs provided, using default demo URLs")
    logger.info(f"Received {len(urls)} URLs to fetch")
    import requests  # local import: only needed in url mode
    for idx, url in enumerate(urls):
        if not url or not url.strip():
            logger.warning(f"Skipping empty URL at position {idx+1}")
            continue
        url = url.strip()
        try:
            logger.info(f"[{idx+1}/{len(urls)}] Fetching: {url}")
            response = requests.get(url, timeout=30)
            response.raise_for_status()
            filename = f"page_{idx+1}.html"
            (input_dir / filename).write_text(response.text, encoding="utf-8")
            # Store URL mapping: page_N.json -> URL
            url_mapping[f"page_{idx+1}.json"] = url
            logger.info(f"Fetched and saved: {url}")
        except Exception as e:
            logger.error(f"Failed to fetch {url}: {e}")


def _run_web2json(input_dir, output_dir, temp_dir, domain, iteration_rounds,
                  cluster_mode, schema_mode, predefined_schema):
    """Build the web2json CLI argument list and invoke its main() in-process."""
    args = [
        "-d", str(input_dir),
        "-o", str(output_dir / domain),
        "--iteration-rounds", str(iteration_rounds)
    ]
    if domain:
        args.extend(["--domain", domain])
    if cluster_mode:
        args.append("--cluster")
    # Save predefined schema if provided
    if schema_mode == "predefined" and predefined_schema:
        schema_file = Path(temp_dir) / "predefined_schema.json"
        with open(schema_file, "w", encoding="utf-8") as f:
            json.dump(predefined_schema, f, indent=2)
        args.extend(["--schema-template", str(schema_file)])
    logger.info(f"Running web2json with args: {' '.join(args)}")
    # web2json reads its options from sys.argv, so patch and always restore it.
    original_argv = sys.argv
    sys.argv = ["web2json"] + args
    try:
        web2json_main()
    finally:
        sys.argv = original_argv


async def run_actor_logic(actor_input, is_on_apify):
    """Run the full actor pipeline for one input payload.

    Stages the HTML inputs (direct content or fetched URLs) into a temp
    directory, runs the web2json parser generator over them, and pushes one
    record per result JSON to the dataset. On failure an error record is
    pushed; local runs exit(1), platform runs re-raise.

    Args:
        actor_input: Parsed actor input dict (inputMode, domain, urls, ...).
        is_on_apify: True when on the Apify platform — selects the SDK
            dataset writer instead of the local file-based writer.
    """
    logger.info(f"Received input: {json.dumps(actor_input, indent=2)}")

    async def save_record(data):
        # Route each output record to the sink for the current environment.
        if is_on_apify:
            await save_to_dataset_async(data)
        else:
            save_to_dataset_file(data)

    # Parse input parameters
    input_mode = actor_input.get("inputMode", "html")
    domain = actor_input.get("domain", "apify_output")
    iteration_rounds = actor_input.get("iterationRounds", 1)
    cluster_mode = actor_input.get("clusterMode", False)
    schema_mode = actor_input.get("schemaMode", "auto")
    predefined_schema = actor_input.get("predefinedSchema")

    # Create temporary directories (auto-cleaned when the block exits)
    with tempfile.TemporaryDirectory() as temp_dir:
        input_dir = Path(temp_dir) / "input_html"
        output_dir = Path(temp_dir) / "output"
        input_dir.mkdir(parents=True, exist_ok=True)
        output_dir.mkdir(parents=True, exist_ok=True)
        try:
            # Maps result file name (page_N.json) -> source URL (url mode only)
            url_mapping = {}
            if input_mode == "html":
                _stage_html_inputs(actor_input, input_dir)
            elif input_mode == "url":
                _fetch_url_inputs(actor_input, input_dir, url_mapping)
            else:
                raise ValueError(f"Invalid input mode: {input_mode}")

            # Check if any files were saved
            input_files = list(input_dir.glob("*.html"))
            if not input_files:
                raise ValueError("No HTML files were successfully processed")
            logger.info(f"Total input files: {len(input_files)}")

            _run_web2json(input_dir, output_dir, temp_dir, domain,
                          iteration_rounds, cluster_mode, schema_mode,
                          predefined_schema)

            # Collect results and save to Apify dataset
            result_dir = output_dir / domain / "result"
            if result_dir.exists():
                total_records = 0
                for result_file in result_dir.glob("*.json"):
                    with open(result_file, "r", encoding="utf-8") as f:
                        result_data = json.load(f)
                    # Add URL as the first field (if available)
                    source_url = url_mapping.get(result_file.name, "")
                    # Keep the original JSON structure; just prepend the URL.
                    record = {"url": source_url} if source_url else {}
                    record.update(result_data)
                    await save_record(record)
                    total_records += 1
                logger.info(f"Saved {total_records} records to dataset")
            else:
                logger.warning("No results found in output directory")
            logger.info("Web2JSON Agent completed successfully")
        except Exception as e:
            error_msg = str(e)
            logger.error(f"Error during execution: {error_msg}")
            # NOTE(review): exc_info is a stdlib-logging idiom; loguru uses
            # logger.opt(exception=True) / logger.exception — verify this
            # actually records the traceback.
            logger.debug("Full traceback:", exc_info=True)
            await save_record({
                "_type": "error",
                "error": error_msg,
                "traceback": traceback.format_exc()
            })
            if not is_on_apify:
                # Local runs signal failure through the exit code.
                sys.exit(1)
            raise


async def async_main():
    """Main entry point for Apify Actor (async version)."""
    logger.info("Starting web2json-agent Apify Actor")
    # Determine if running on Apify platform
    is_on_apify = APIFY_SDK_AVAILABLE and os.environ.get('APIFY_IS_AT_HOME') == '1'
    if is_on_apify:
        # Actor context manager handles init/exit and event emission.
        async with Actor:
            logger.info("✓ Apify Actor initialized")
            actor_input = await Actor.get_input() or {}
            logger.info("✓ Read input via Apify SDK")
            await run_actor_logic(actor_input, True)
        return
    else:
        actor_input = get_apify_input()
        await run_actor_logic(actor_input, False)
        return


def main():
    """Synchronous entry point that calls async_main."""
    import asyncio
    asyncio.run(async_main())


if __name__ == "__main__":
    main()