Skip to content

Commit b4d8d5b

Browse files
Deduplicate paginated job aggregation across managers
Co-authored-by: Shri Sukhani <shrisukhani@users.noreply.github.com>
1 parent fc54d05 commit b4d8d5b

File tree

10 files changed

+311
-294
lines changed

10 files changed

+311
-294
lines changed

hyperbrowser/client/managers/async_manager/crawl.py

Lines changed: 24 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
1-
import asyncio
2-
import time
31
from typing import Optional
42

53
from hyperbrowser.models.consts import POLLING_ATTEMPTS
64
from ...polling import (
7-
has_exceeded_max_wait,
5+
collect_paginated_results_async,
86
poll_until_terminal_status_async,
97
retry_operation_async,
108
)
@@ -73,8 +71,6 @@ async def start_and_wait(
7371
retry_delay_seconds=0.5,
7472
)
7573

76-
failures = 0
77-
page_fetch_start_time = time.monotonic()
7874
job_response = CrawlJobResponse(
7975
jobId=job_id,
8076
status=job_status,
@@ -84,37 +80,28 @@ async def start_and_wait(
8480
totalCrawledPages=0,
8581
batchSize=100,
8682
)
87-
first_check = True
88-
while (
89-
first_check
90-
or job_response.current_page_batch < job_response.total_page_batches
91-
):
92-
if has_exceeded_max_wait(page_fetch_start_time, max_wait_seconds):
93-
raise HyperbrowserError(
94-
f"Timed out fetching all pages for crawl job {job_id} after {max_wait_seconds} seconds"
95-
)
96-
try:
97-
tmp_job_response = await self.get(
98-
job_start_resp.job_id,
99-
GetCrawlJobParams(
100-
page=job_response.current_page_batch + 1, batch_size=100
101-
),
102-
)
103-
if tmp_job_response.data:
104-
job_response.data.extend(tmp_job_response.data)
105-
job_response.current_page_batch = tmp_job_response.current_page_batch
106-
job_response.total_crawled_pages = tmp_job_response.total_crawled_pages
107-
job_response.total_page_batches = tmp_job_response.total_page_batches
108-
job_response.batch_size = tmp_job_response.batch_size
109-
job_response.error = tmp_job_response.error
110-
failures = 0
111-
first_check = False
112-
except Exception as e:
113-
failures += 1
114-
if failures >= POLLING_ATTEMPTS:
115-
raise HyperbrowserError(
116-
f"Failed to get crawl batch page {job_response.current_page_batch} for job {job_id} after {POLLING_ATTEMPTS} attempts: {e}"
117-
)
118-
await asyncio.sleep(0.5)
83+
84+
def merge_page_response(page_response: CrawlJobResponse) -> None:
85+
if page_response.data:
86+
job_response.data.extend(page_response.data)
87+
job_response.current_page_batch = page_response.current_page_batch
88+
job_response.total_crawled_pages = page_response.total_crawled_pages
89+
job_response.total_page_batches = page_response.total_page_batches
90+
job_response.batch_size = page_response.batch_size
91+
job_response.error = page_response.error
92+
93+
await collect_paginated_results_async(
94+
operation_name=f"crawl job {job_id}",
95+
get_next_page=lambda page: self.get(
96+
job_start_resp.job_id,
97+
GetCrawlJobParams(page=page, batch_size=100),
98+
),
99+
get_current_page_batch=lambda page_response: page_response.current_page_batch,
100+
get_total_page_batches=lambda page_response: page_response.total_page_batches,
101+
on_page_success=merge_page_response,
102+
max_wait_seconds=max_wait_seconds,
103+
max_attempts=POLLING_ATTEMPTS,
104+
retry_delay_seconds=0.5,
105+
)
119106

120107
return job_response

hyperbrowser/client/managers/async_manager/scrape.py

Lines changed: 24 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
1-
import asyncio
2-
import time
31
from typing import Optional
42

53
from hyperbrowser.models.consts import POLLING_ATTEMPTS
64
from ...polling import (
7-
has_exceeded_max_wait,
5+
collect_paginated_results_async,
86
poll_until_terminal_status_async,
97
retry_operation_async,
108
)
@@ -79,8 +77,6 @@ async def start_and_wait(
7977
retry_delay_seconds=0.5,
8078
)
8179

82-
failures = 0
83-
page_fetch_start_time = time.monotonic()
8480
job_response = BatchScrapeJobResponse(
8581
jobId=job_id,
8682
status=job_status,
@@ -90,39 +86,29 @@ async def start_and_wait(
9086
totalScrapedPages=0,
9187
batchSize=100,
9288
)
93-
first_check = True
94-
95-
while (
96-
first_check
97-
or job_response.current_page_batch < job_response.total_page_batches
98-
):
99-
if has_exceeded_max_wait(page_fetch_start_time, max_wait_seconds):
100-
raise HyperbrowserError(
101-
f"Timed out fetching all pages for batch scrape job {job_id} after {max_wait_seconds} seconds"
102-
)
103-
try:
104-
tmp_job_response = await self.get(
105-
job_id,
106-
params=GetBatchScrapeJobParams(
107-
page=job_response.current_page_batch + 1, batch_size=100
108-
),
109-
)
110-
if tmp_job_response.data:
111-
job_response.data.extend(tmp_job_response.data)
112-
job_response.current_page_batch = tmp_job_response.current_page_batch
113-
job_response.total_scraped_pages = tmp_job_response.total_scraped_pages
114-
job_response.total_page_batches = tmp_job_response.total_page_batches
115-
job_response.batch_size = tmp_job_response.batch_size
116-
job_response.error = tmp_job_response.error
117-
failures = 0
118-
first_check = False
119-
except Exception as e:
120-
failures += 1
121-
if failures >= POLLING_ATTEMPTS:
122-
raise HyperbrowserError(
123-
f"Failed to get batch page {job_response.current_page_batch} for job {job_id} after {POLLING_ATTEMPTS} attempts: {e}"
124-
)
125-
await asyncio.sleep(0.5)
89+
90+
def merge_page_response(page_response: BatchScrapeJobResponse) -> None:
91+
if page_response.data:
92+
job_response.data.extend(page_response.data)
93+
job_response.current_page_batch = page_response.current_page_batch
94+
job_response.total_scraped_pages = page_response.total_scraped_pages
95+
job_response.total_page_batches = page_response.total_page_batches
96+
job_response.batch_size = page_response.batch_size
97+
job_response.error = page_response.error
98+
99+
await collect_paginated_results_async(
100+
operation_name=f"batch scrape job {job_id}",
101+
get_next_page=lambda page: self.get(
102+
job_id,
103+
params=GetBatchScrapeJobParams(page=page, batch_size=100),
104+
),
105+
get_current_page_batch=lambda page_response: page_response.current_page_batch,
106+
get_total_page_batches=lambda page_response: page_response.total_page_batches,
107+
on_page_success=merge_page_response,
108+
max_wait_seconds=max_wait_seconds,
109+
max_attempts=POLLING_ATTEMPTS,
110+
retry_delay_seconds=0.5,
111+
)
126112

127113
return job_response
128114

hyperbrowser/client/managers/async_manager/web/batch_fetch.py

Lines changed: 23 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,11 @@
1010
)
1111
from hyperbrowser.exceptions import HyperbrowserError
1212
from ....polling import (
13-
has_exceeded_max_wait,
13+
collect_paginated_results_async,
1414
poll_until_terminal_status_async,
1515
retry_operation_async,
1616
)
1717
from ....schema_utils import inject_web_output_schemas
18-
import asyncio
19-
import time
2018

2119

2220
class BatchFetchManager:
@@ -81,8 +79,6 @@ async def start_and_wait(
8179
retry_delay_seconds=0.5,
8280
)
8381

84-
failures = 0
85-
page_fetch_start_time = time.monotonic()
8682
job_response = BatchFetchJobResponse(
8783
jobId=job_id,
8884
status=job_status,
@@ -92,38 +88,28 @@ async def start_and_wait(
9288
totalPages=0,
9389
batchSize=100,
9490
)
95-
first_check = True
9691

97-
while (
98-
first_check
99-
or job_response.current_page_batch < job_response.total_page_batches
100-
):
101-
if has_exceeded_max_wait(page_fetch_start_time, max_wait_seconds):
102-
raise HyperbrowserError(
103-
f"Timed out fetching all pages for batch fetch job {job_id} after {max_wait_seconds} seconds"
104-
)
105-
try:
106-
tmp_job_response = await self.get(
107-
job_id,
108-
params=GetBatchFetchJobParams(
109-
page=job_response.current_page_batch + 1, batch_size=100
110-
),
111-
)
112-
if tmp_job_response.data:
113-
job_response.data.extend(tmp_job_response.data)
114-
job_response.current_page_batch = tmp_job_response.current_page_batch
115-
job_response.total_pages = tmp_job_response.total_pages
116-
job_response.total_page_batches = tmp_job_response.total_page_batches
117-
job_response.batch_size = tmp_job_response.batch_size
118-
job_response.error = tmp_job_response.error
119-
failures = 0
120-
first_check = False
121-
except Exception as e:
122-
failures += 1
123-
if failures >= POLLING_ATTEMPTS:
124-
raise HyperbrowserError(
125-
f"Failed to get batch page {job_response.current_page_batch} for job {job_id} after {POLLING_ATTEMPTS} attempts: {e}"
126-
)
127-
await asyncio.sleep(0.5)
92+
def merge_page_response(page_response: BatchFetchJobResponse) -> None:
93+
if page_response.data:
94+
job_response.data.extend(page_response.data)
95+
job_response.current_page_batch = page_response.current_page_batch
96+
job_response.total_pages = page_response.total_pages
97+
job_response.total_page_batches = page_response.total_page_batches
98+
job_response.batch_size = page_response.batch_size
99+
job_response.error = page_response.error
100+
101+
await collect_paginated_results_async(
102+
operation_name=f"batch fetch job {job_id}",
103+
get_next_page=lambda page: self.get(
104+
job_id,
105+
params=GetBatchFetchJobParams(page=page, batch_size=100),
106+
),
107+
get_current_page_batch=lambda page_response: page_response.current_page_batch,
108+
get_total_page_batches=lambda page_response: page_response.total_page_batches,
109+
on_page_success=merge_page_response,
110+
max_wait_seconds=max_wait_seconds,
111+
max_attempts=POLLING_ATTEMPTS,
112+
retry_delay_seconds=0.5,
113+
)
128114

129115
return job_response

hyperbrowser/client/managers/async_manager/web/crawl.py

Lines changed: 23 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,11 @@
1010
)
1111
from hyperbrowser.exceptions import HyperbrowserError
1212
from ....polling import (
13-
has_exceeded_max_wait,
13+
collect_paginated_results_async,
1414
poll_until_terminal_status_async,
1515
retry_operation_async,
1616
)
1717
from ....schema_utils import inject_web_output_schemas
18-
import asyncio
19-
import time
2018

2119

2220
class WebCrawlManager:
@@ -79,8 +77,6 @@ async def start_and_wait(
7977
retry_delay_seconds=0.5,
8078
)
8179

82-
failures = 0
83-
page_fetch_start_time = time.monotonic()
8480
job_response = WebCrawlJobResponse(
8581
jobId=job_id,
8682
status=job_status,
@@ -90,38 +86,28 @@ async def start_and_wait(
9086
totalPages=0,
9187
batchSize=100,
9288
)
93-
first_check = True
9489

95-
while (
96-
first_check
97-
or job_response.current_page_batch < job_response.total_page_batches
98-
):
99-
if has_exceeded_max_wait(page_fetch_start_time, max_wait_seconds):
100-
raise HyperbrowserError(
101-
f"Timed out fetching all pages for web crawl job {job_id} after {max_wait_seconds} seconds"
102-
)
103-
try:
104-
tmp_job_response = await self.get(
105-
job_id,
106-
params=GetWebCrawlJobParams(
107-
page=job_response.current_page_batch + 1, batch_size=100
108-
),
109-
)
110-
if tmp_job_response.data:
111-
job_response.data.extend(tmp_job_response.data)
112-
job_response.current_page_batch = tmp_job_response.current_page_batch
113-
job_response.total_pages = tmp_job_response.total_pages
114-
job_response.total_page_batches = tmp_job_response.total_page_batches
115-
job_response.batch_size = tmp_job_response.batch_size
116-
job_response.error = tmp_job_response.error
117-
failures = 0
118-
first_check = False
119-
except Exception as e:
120-
failures += 1
121-
if failures >= POLLING_ATTEMPTS:
122-
raise HyperbrowserError(
123-
f"Failed to get batch page {job_response.current_page_batch} for web crawl job {job_id} after {POLLING_ATTEMPTS} attempts: {e}"
124-
)
125-
await asyncio.sleep(0.5)
90+
def merge_page_response(page_response: WebCrawlJobResponse) -> None:
91+
if page_response.data:
92+
job_response.data.extend(page_response.data)
93+
job_response.current_page_batch = page_response.current_page_batch
94+
job_response.total_pages = page_response.total_pages
95+
job_response.total_page_batches = page_response.total_page_batches
96+
job_response.batch_size = page_response.batch_size
97+
job_response.error = page_response.error
98+
99+
await collect_paginated_results_async(
100+
operation_name=f"web crawl job {job_id}",
101+
get_next_page=lambda page: self.get(
102+
job_id,
103+
params=GetWebCrawlJobParams(page=page, batch_size=100),
104+
),
105+
get_current_page_batch=lambda page_response: page_response.current_page_batch,
106+
get_total_page_batches=lambda page_response: page_response.total_page_batches,
107+
on_page_success=merge_page_response,
108+
max_wait_seconds=max_wait_seconds,
109+
max_attempts=POLLING_ATTEMPTS,
110+
retry_delay_seconds=0.5,
111+
)
126112

127113
return job_response

0 commit comments

Comments (0)