From 8bbc62305e3d018867913f01761cb9de7065cbf1 Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Mon, 19 Apr 2021 14:18:17 -0700 Subject: [PATCH 01/22] adding TileSpecifier --- batch-setup/make_meta_tiles.py | 68 +++++++++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index f5e91eb..be89f0e 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -20,7 +20,6 @@ from ModestMaps.Core import Coordinate from multiprocessing import Pool - MissingTiles = namedtuple('MissingTiles', 'low_zoom_file high_zoom_file') @@ -325,6 +324,73 @@ def missing_file(self, missing): return missing.low_zoom_file +class TileSpecifier(object): + ORDER_KEY = "order" + MEM_GB_KEY = "mem_gb" + + """ + Provides the ability to sort tiles based on an ordering and specify memory reqs + """ + + def __init__(self, default_mem_gb=8, spec_dict={}): + """ + :param default_mem_gb: + :param spec_dict: keys are of form "//" value is a map with keys "mem_gb" and "order". e.g. + {"7/3/10": {"mem_gb": 2.5, "order": 12}, + "10/12/3": {"mem_gb": 0.3, "order": 10}} + """ + self.default_mem_gb = default_mem_gb + self.spec_dict = {} + + def from_ordering_file(self, filename, default_mem_gb=8): + """ + Expects a csv with at least these columns: + zoom, x, y, mem_gb + The lines in the file should be ordered in the desired queue ordering (e.g. first data line contains first tile to enqueue) + :return: + TileSpecifier with ordering and memory requirements in filename + """ + import csv + with open(filename) as fh: + order_idx = 0 + reader = csv.DictReader(fh) + for row in reader: + coord = Coordinate(row['zoom'], row['x'], row['y']) + self.spec_dict[serialize_coord(coord)] = \ + {self.MEM_GB_KEY: row[self.MEM_GB_KEY], self.ORDER_KEY: order_idx} + order_idx += 1 + + def reorder(self, coord_list): + """ + Using the sort ordering for this specifier, reorders the tiles in coord_list. + coords that are in the coord_list that aren't mentioned in the ordering will go first. + :return: + """ + return sorted(coord_list, lambda coord: self.get_ordering_val(coord)) + + def get_ordering_val(self, coord): + """ + returns ordering location for coord. The lower it is the earlier in the order. + If there is no ordering specified for this coordinate, returns 0 + """ + key = serialize_coord(coord) + if key in self.spec_dict: + return self.spec_dict[key][self.ORDER_KEY] + + return 0 + + def get_mem_reqs(self, coord): + """ + returns the specified memory requirements for the coordinate. If none are specified, returns default_gb + """ + key = serialize_coord(coord) + if key in self.spec_dict: + val = self.spec_dict[key] + return val[self.MEM_GB_KEY] + else: + return self.default_mem_gb + + # abstracts away the logic for a re-rendering loop, splitting between high and # low zoom tiles and stopping if all the tiles aren't rendered within a # certain number of retries. From 6335701d86f494cac057078b645d2c59ab72bfe9 Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Tue, 20 Apr 2021 11:33:28 -0700 Subject: [PATCH 02/22] calling tilequeue_batch_enqueue once for each line of tile_list_file --- batch-setup/make_meta_tiles.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index be89f0e..dcee6b6 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -295,13 +295,17 @@ def enqueue_tiles(config_file, tile_list_file, check_metatile_exists): from tilequeue.command import tilequeue_batch_enqueue from make_rawr_tiles import BatchEnqueueArgs - args = BatchEnqueueArgs(config_file, None, tile_list_file, None) os.environ['TILEQUEUE__BATCH__CHECK-METATILE-EXISTS'] = ( str(check_metatile_exists).lower()) - with open(args.config) as fh: + with open(config_file) as fh: cfg = make_config_from_argparse(fh) - tilequeue_batch_enqueue(cfg, args) + with open(tile_list_file, 'r') as tile_list: + coord_lines = tile_list.readlines() + + for coord_line in coord_lines: + args = BatchEnqueueArgs(config_file, coord_line, None) + tilequeue_batch_enqueue(cfg, args) # adaptor class for MissingTiles to see just the high zoom parts, this is used # along with the LowZoomLense to loop over missing tiles generically but From 312571cb38f39e89b57d38471854f4cb3fff8d0b Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Tue, 20 Apr 2021 12:04:16 -0700 Subject: [PATCH 03/22] integrate TileSpecifier, reorder input lines --- batch-setup/make_meta_tiles.py | 166 +++++++++++++++++++-------------- 1 file changed, 95 insertions(+), 71 deletions(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index dcee6b6..807f716 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -19,6 +19,7 @@ from tilequeue.store import make_s3_tile_key_generator from ModestMaps.Core import Coordinate from multiprocessing import Pool +import sys MissingTiles = namedtuple('MissingTiles', 'low_zoom_file high_zoom_file') @@ -248,6 +249,78 @@ def __call__(self, parent): return parent, size +class TileSpecifier(object): + ORDER_KEY = "order" + MEM_GB_KEY = "mem_gb" + + """ + Provides the ability to sort tiles based on an ordering and specify memory reqs + """ + + def __init__(self, default_mem_gb=8, spec_dict={}): + """ + :param default_mem_gb: + :param spec_dict: keys are of form "//" value is a map with keys "mem_gb" and "order". e.g. + {"7/3/10": {"mem_gb": 2.5, "order": 12}, + "10/12/3": {"mem_gb": 0.3, "order": 10}} + """ + self.default_mem_gb = default_mem_gb + self.spec_dict = {} + + @staticmethod + def from_ordering_file(filename, default_mem_gb=8): + """ + Expects a csv with at least these columns: + zoom, x, y, mem_gb + The lines in the file should be ordered in the desired queue ordering (e.g. first data line contains first tile to enqueue) + :return: + TileSpecifier with ordering and memory requirements expressed in + """ + import csv + + spec_dict = {} + with open(filename) as fh: + order_idx = 0 + reader = csv.DictReader(fh) + for row in reader: + coord = Coordinate(row['zoom'], row['x'], row['y']) + spec_dict[serialize_coord(coord)] = \ + {TileSpecifier.MEM_GB_KEY: row[TileSpecifier.MEM_GB_KEY], TileSpecifier.ORDER_KEY: order_idx} + order_idx += 1 + + return TileSpecifier(default_mem_gb, spec_dict) + + def reorder(self, coord_list): + """ + Using the sort ordering for this specifier, reorders the tiles in coord_list. + coords that are in the coord_list that aren't mentioned in the ordering will go first. + :return: + """ + return sorted(coord_list, lambda coord: self.get_ordering_val(coord)) + + def get_ordering_val(self, coord): + """ + returns ordering location for coord. The lower it is the earlier in the order. + If there is no ordering specified for this coordinate, returns 0 + """ + key = serialize_coord(coord) + if key in self.spec_dict: + return self.spec_dict[key][self.ORDER_KEY] + + return 0 + + def get_mem_reqs(self, coord): + """ + returns the specified memory requirements for the coordinate. If none are specified, returns default_gb + """ + key = serialize_coord(coord) + if key in self.spec_dict: + val = self.spec_dict[key] + return val[self.MEM_GB_KEY] + else: + return self.default_mem_gb + + def _big_jobs(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom, size_threshold, pool_size=30): """ @@ -290,7 +363,7 @@ def _big_jobs(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom, return big_jobs -def enqueue_tiles(config_file, tile_list_file, check_metatile_exists): +def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, tile_specifier=TileSpecifier()): from tilequeue.command import make_config_from_argparse from tilequeue.command import tilequeue_batch_enqueue from make_rawr_tiles import BatchEnqueueArgs @@ -303,6 +376,8 @@ def enqueue_tiles(config_file, tile_list_file, check_metatile_exists): with open(tile_list_file, 'r') as tile_list: coord_lines = tile_list.readlines() + reordered_lines = tile_specifier.reorder(coord_lines) + for coord_line in coord_lines: args = BatchEnqueueArgs(config_file, coord_line, None) tilequeue_batch_enqueue(cfg, args) @@ -328,79 +403,12 @@ def missing_file(self, missing): return missing.low_zoom_file -class TileSpecifier(object): - ORDER_KEY = "order" - MEM_GB_KEY = "mem_gb" - - """ - Provides the ability to sort tiles based on an ordering and specify memory reqs - """ - - def __init__(self, default_mem_gb=8, spec_dict={}): - """ - :param default_mem_gb: - :param spec_dict: keys are of form "//" value is a map with keys "mem_gb" and "order". e.g. - {"7/3/10": {"mem_gb": 2.5, "order": 12}, - "10/12/3": {"mem_gb": 0.3, "order": 10}} - """ - self.default_mem_gb = default_mem_gb - self.spec_dict = {} - - def from_ordering_file(self, filename, default_mem_gb=8): - """ - Expects a csv with at least these columns: - zoom, x, y, mem_gb - The lines in the file should be ordered in the desired queue ordering (e.g. first data line contains first tile to enqueue) - :return: - TileSpecifier with ordering and memory requirements in filename - """ - import csv - with open(filename) as fh: - order_idx = 0 - reader = csv.DictReader(fh) - for row in reader: - coord = Coordinate(row['zoom'], row['x'], row['y']) - self.spec_dict[serialize_coord(coord)] = \ - {self.MEM_GB_KEY: row[self.MEM_GB_KEY], self.ORDER_KEY: order_idx} - order_idx += 1 - - def reorder(self, coord_list): - """ - Using the sort ordering for this specifier, reorders the tiles in coord_list. - coords that are in the coord_list that aren't mentioned in the ordering will go first. - :return: - """ - return sorted(coord_list, lambda coord: self.get_ordering_val(coord)) - - def get_ordering_val(self, coord): - """ - returns ordering location for coord. The lower it is the earlier in the order. - If there is no ordering specified for this coordinate, returns 0 - """ - key = serialize_coord(coord) - if key in self.spec_dict: - return self.spec_dict[key][self.ORDER_KEY] - - return 0 - - def get_mem_reqs(self, coord): - """ - returns the specified memory requirements for the coordinate. If none are specified, returns default_gb - """ - key = serialize_coord(coord) - if key in self.spec_dict: - val = self.spec_dict[key] - return val[self.MEM_GB_KEY] - else: - return self.default_mem_gb - - # abstracts away the logic for a re-rendering loop, splitting between high and # low zoom tiles and stopping if all the tiles aren't rendered within a # certain number of retries. class TileRenderer(object): - def __init__(self, tile_finder, big_jobs, split_zoom, zoom_max, allowed_missing_tiles=0): + def __init__(self, tile_finder, big_jobs, split_zoom, zoom_max, tile_specifier=TileSpecifier(), allowed_missing_tiles=0): self.tile_finder = tile_finder self.big_jobs = big_jobs self.split_zoom = split_zoom @@ -432,7 +440,7 @@ def render(self, num_retries, lense): print("Enqueueing %d %s tiles (e.g. %s)" % (count, lense.description, ', '.join(sample))) enqueue_tiles(lense.config, missing_tile_file, - check_metatile_exists) + check_metatile_exists, tile_specifier) else: with self._missing() as missing: @@ -444,6 +452,17 @@ def render(self, num_retries, lense): % (count, lense.description, num_retries, ', '.join(sample))) +def create_tile_specifier(tile_specifier_file): + default_mem_gb=8 + tile_specifier = TileSpecifier() + if tile_specifier_file is not None: + try: + tile_specifier = TileSpecifier.from_ordering_file(tile_specifier_filename, default_mem_gb=8) + except: + print("Error creating TileSpecifier, will use the default. Error details: {}".format(sys.exc_info()[0])) + + return tile_specifier + if __name__ == '__main__': import argparse @@ -485,6 +504,9 @@ def render(self, num_retries, lense): parser.add_argument('--allowed-missing-tiles', default=2, type=int, help='The maximum number of missing metatiles allowed ' 'to continue the build process.') + parser.add_argument('--tile-specifier-file', + help="optional csv containing lines in desired queue order with columns " + "for zoom, x, y, and mem_gb to specify ordering and memory requirements") args = parser.parse_args() assert_run_id_format(args.run_id) @@ -512,11 +534,13 @@ def render(self, num_retries, lense): buckets.missing, buckets.meta, date_prefix, missing_bucket_date_prefix, region, args.key_format_type, args.config, metatile_max_zoom) + tile_specifier = create_tile_specifier(args.tile_specifier_file) + big_jobs = _big_jobs( buckets.rawr, missing_bucket_date_prefix, args.key_format_type, split_zoom, zoom_max, args.size_threshold) - tile_renderer = TileRenderer(tile_finder, big_jobs, split_zoom, zoom_max, args.allowed_missing_tiles) + tile_renderer = TileRenderer(tile_finder, big_jobs, split_zoom, zoom_max, args.allowed_missing_tiles, tile_specifier) tile_renderer.render(args.retries, LowZoomLense(args.low_zoom_config)) From d77d07adbe261796511615e946b233cc7846d465 Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Tue, 20 Apr 2021 12:23:18 -0700 Subject: [PATCH 04/22] overriding mem requirements --- batch-setup/make_meta_tiles.py | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index 807f716..f83853a 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -283,10 +283,14 @@ def from_ordering_file(filename, default_mem_gb=8): order_idx = 0 reader = csv.DictReader(fh) for row in reader: - coord = Coordinate(row['zoom'], row['x'], row['y']) - spec_dict[serialize_coord(coord)] = \ - {TileSpecifier.MEM_GB_KEY: row[TileSpecifier.MEM_GB_KEY], TileSpecifier.ORDER_KEY: order_idx} - order_idx += 1 + try: + coord = Coordinate(row['zoom'], row['x'], row['y']) + spec_dict[serialize_coord(coord)] = \ + {TileSpecifier.MEM_GB_KEY: row[TileSpecifier.MEM_GB_KEY], TileSpecifier.ORDER_KEY: order_idx} + order_idx += 1 + except: + print("Error for line parsed as %s in TileSpecifier.from_ordering_file" % row) + continue return TileSpecifier(default_mem_gb, spec_dict) @@ -309,16 +313,15 @@ def get_ordering_val(self, coord): return 0 - def get_mem_reqs(self, coord): + def get_mem_reqs_mb(self, coord_str, overprovision_multiplier=1.0): """ - returns the specified memory requirements for the coordinate. If none are specified, returns default_gb + returns the specified memory requirement in megabytes for the coordinate. If none are specified, + returns default_gb """ - key = serialize_coord(coord) - if key in self.spec_dict: - val = self.spec_dict[key] - return val[self.MEM_GB_KEY] + if coord_str in self.spec_dict: + return self.spec_dict[coord_str][self.MEM_GB_KEY] * 1024 * overprovision_multiplier else: - return self.default_mem_gb + return self.default_mem_gb * 1024 def _big_jobs(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom, @@ -378,10 +381,15 @@ def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, tile_speci reordered_lines = tile_specifier.reorder(coord_lines) - for coord_line in coord_lines: + overprovision_multiplier = 1.2 # 20% more memory than we think we need + for coord_line in reordered_lines: + # override memory requirements for this job with what the tile_specifier tells us + cfg["batch"]["memory"] = tile_specifier.get_mem_reqs_mb(coord_line, overprovision_multiplier) + args = BatchEnqueueArgs(config_file, coord_line, None) tilequeue_batch_enqueue(cfg, args) + # adaptor class for MissingTiles to see just the high zoom parts, this is used # along with the LowZoomLense to loop over missing tiles generically but # separately. @@ -463,6 +471,7 @@ def create_tile_specifier(tile_specifier_file): return tile_specifier + if __name__ == '__main__': import argparse From a4901c01776d08bb5bbb86400d97b4ad48daaa3a Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Tue, 20 Apr 2021 12:26:25 -0700 Subject: [PATCH 05/22] doubling memory request each retry --- batch-setup/make_meta_tiles.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index f83853a..70fb0fc 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -366,7 +366,7 @@ def _big_jobs(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom, return big_jobs -def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, tile_specifier=TileSpecifier()): +def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, retry_number, tile_specifier=TileSpecifier()): from tilequeue.command import make_config_from_argparse from tilequeue.command import tilequeue_batch_enqueue from make_rawr_tiles import BatchEnqueueArgs @@ -381,7 +381,11 @@ def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, tile_speci reordered_lines = tile_specifier.reorder(coord_lines) - overprovision_multiplier = 1.2 # 20% more memory than we think we need + if retry_number == 0: + overprovision_multiplier = 1.2 # overprovision by 20% at the start + else: + overprovision_multiplier = (2.0 ** retry_number) # double memory each time we retry + for coord_line in reordered_lines: # override memory requirements for this job with what the tile_specifier tells us cfg["batch"]["memory"] = tile_specifier.get_mem_reqs_mb(coord_line, overprovision_multiplier) @@ -448,7 +452,7 @@ def render(self, num_retries, lense): print("Enqueueing %d %s tiles (e.g. %s)" % (count, lense.description, ', '.join(sample))) enqueue_tiles(lense.config, missing_tile_file, - check_metatile_exists, tile_specifier) + check_metatile_exists, retry_number, tile_specifier) else: with self._missing() as missing: From ac7639e811e7314817ac3acc83badedf7e7c98c0 Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Tue, 20 Apr 2021 12:39:37 -0700 Subject: [PATCH 06/22] being paranoid --- batch-setup/make_meta_tiles.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index 70fb0fc..c28f866 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -284,6 +284,7 @@ def from_ordering_file(filename, default_mem_gb=8): reader = csv.DictReader(fh) for row in reader: try: + # TODO: add checks for keys coord = Coordinate(row['zoom'], row['x'], row['y']) spec_dict[serialize_coord(coord)] = \ {TileSpecifier.MEM_GB_KEY: row[TileSpecifier.MEM_GB_KEY], TileSpecifier.ORDER_KEY: order_idx} @@ -386,9 +387,11 @@ def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, retry_numb else: overprovision_multiplier = (2.0 ** retry_number) # double memory each time we retry + max_mem_mb = 32 * 1024 for coord_line in reordered_lines: # override memory requirements for this job with what the tile_specifier tells us - cfg["batch"]["memory"] = tile_specifier.get_mem_reqs_mb(coord_line, overprovision_multiplier) + cfg["batch"]["memory"] = max(int(tile_specifier.get_mem_reqs_mb(coord_line, overprovision_multiplier)), + max_mem_mb) args = BatchEnqueueArgs(config_file, coord_line, None) tilequeue_batch_enqueue(cfg, args) From 87c4ba36d08bdfb90189577043b665e135429e73 Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Thu, 22 Apr 2021 16:48:14 -0700 Subject: [PATCH 07/22] Updates based on testing live --- batch-setup/make_meta_tiles.py | 54 +++++++++++++++++----------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index c28f866..e8c9014 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -19,6 +19,7 @@ from tilequeue.store import make_s3_tile_key_generator from ModestMaps.Core import Coordinate from multiprocessing import Pool +import time import sys MissingTiles = namedtuple('MissingTiles', 'low_zoom_file high_zoom_file') @@ -149,7 +150,7 @@ def missing_tiles_split(self, split_zoom, zoom_max, big_jobs): self.read_metas_to_file(missing_meta_file, compress=True) - print("Splitting into high and low zoom lists") + print("[%s] Splitting into high and low zoom lists" % (time.ctime())) # contains zooms 0 until group zoom. the jobs between the group # zoom and RAWR zoom are merged into the parent at group zoom. @@ -190,11 +191,13 @@ def missing_tiles_split(self, split_zoom, zoom_max, big_jobs): for coord in missing_high: fh.write(serialize_coord(coord) + "\n") + print("[%s] Done splitting into high and low zoom lists" % (time.ctime())) yield MissingTiles(missing_low_file, missing_high_file) finally: shutil.rmtree(tmpdir) + @contextmanager def present_tiles(self): """ @@ -265,7 +268,7 @@ def __init__(self, default_mem_gb=8, spec_dict={}): "10/12/3": {"mem_gb": 0.3, "order": 10}} """ self.default_mem_gb = default_mem_gb - self.spec_dict = {} + self.spec_dict = spec_dict @staticmethod def from_ordering_file(filename, default_mem_gb=8): @@ -285,12 +288,12 @@ def from_ordering_file(filename, default_mem_gb=8): for row in reader: try: # TODO: add checks for keys - coord = Coordinate(row['zoom'], row['x'], row['y']) - spec_dict[serialize_coord(coord)] = \ - {TileSpecifier.MEM_GB_KEY: row[TileSpecifier.MEM_GB_KEY], TileSpecifier.ORDER_KEY: order_idx} + coord_str = serialize_coord(Coordinate(int(row['y']), int(row['x']), int(row['zoom']))) + spec_dict[coord_str] = \ + {TileSpecifier.MEM_GB_KEY: float(row[TileSpecifier.MEM_GB_KEY]), TileSpecifier.ORDER_KEY: order_idx} order_idx += 1 except: - print("Error for line parsed as %s in TileSpecifier.from_ordering_file" % row) + print("Error for line parsed as %s in TileSpecifier.from_ordering_file - details %s" % (row, sys.exc_info()[0])) continue return TileSpecifier(default_mem_gb, spec_dict) @@ -301,16 +304,16 @@ def reorder(self, coord_list): coords that are in the coord_list that aren't mentioned in the ordering will go first. :return: """ - return sorted(coord_list, lambda coord: self.get_ordering_val(coord)) + return sorted(coord_list, key=lambda coord: self.get_ordering_val(coord)) def get_ordering_val(self, coord): """ + coord is type string returns ordering location for coord. The lower it is the earlier in the order. If there is no ordering specified for this coordinate, returns 0 """ - key = serialize_coord(coord) - if key in self.spec_dict: - return self.spec_dict[key][self.ORDER_KEY] + if coord in self.spec_dict: + return self.spec_dict[coord][self.ORDER_KEY] return 0 @@ -344,7 +347,7 @@ def _big_jobs(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom, job_sizer = _JobSizer(rawr_bucket, prefix, key_format_type, rawr_zoom) big_jobs = CoordSet(group_zoom, min_zoom=group_zoom) - + print("[%s] Finding big jobs by counting the size of raw tiles" % time.ctime()) # loop over each column individually. this limits the number of concurrent # tasks, which means we don't waste memory maintaining a huge queue of # pending tasks. and when something goes wrong, the stacktrace isn't buried @@ -364,10 +367,11 @@ def _big_jobs(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom, if size >= size_threshold: big_jobs[coord] = True + print("[%s] Done finding big jobs" % time.ctime()) return big_jobs -def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, retry_number, tile_specifier=TileSpecifier()): +def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, tile_specifier=TileSpecifier()): from tilequeue.command import make_config_from_argparse from tilequeue.command import tilequeue_batch_enqueue from make_rawr_tiles import BatchEnqueueArgs @@ -378,24 +382,20 @@ def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, retry_numb cfg = make_config_from_argparse(fh) with open(tile_list_file, 'r') as tile_list: - coord_lines = tile_list.readlines() + coord_lines = [line.strip() for line in tile_list.readlines()] reordered_lines = tile_specifier.reorder(coord_lines) - if retry_number == 0: - overprovision_multiplier = 1.2 # overprovision by 20% at the start - else: - overprovision_multiplier = (2.0 ** retry_number) # double memory each time we retry - - max_mem_mb = 32 * 1024 + overprovision_multiplier = 1.2 # overprovision by 20% + print("[%s] Starting to enqueue %d tile batches" % (time.ctime(), len(reordered_lines))) for coord_line in reordered_lines: # override memory requirements for this job with what the tile_specifier tells us - cfg["batch"]["memory"] = max(int(tile_specifier.get_mem_reqs_mb(coord_line, overprovision_multiplier)), - max_mem_mb) + memory_mb = int(tile_specifier.get_mem_reqs_mb(coord_line, overprovision_multiplier)) + cfg.yml["batch"]["memory"] = memory_mb - args = BatchEnqueueArgs(config_file, coord_line, None) + args = BatchEnqueueArgs(config_file, coord_line, None, None) tilequeue_batch_enqueue(cfg, args) - + print("[%s] Done enqueuing tile batches" % time.ctime()) # adaptor class for MissingTiles to see just the high zoom parts, this is used # along with the LowZoomLense to loop over missing tiles generically but @@ -429,6 +429,7 @@ def __init__(self, tile_finder, big_jobs, split_zoom, zoom_max, tile_specifier=T self.split_zoom = split_zoom self.zoom_max = zoom_max self.allowed_missing_tiles = allowed_missing_tiles + self.tile_specifier = tile_specifier def _missing(self): return self.tile_finder.missing_tiles_split( @@ -455,7 +456,7 @@ def render(self, num_retries, lense): print("Enqueueing %d %s tiles (e.g. %s)" % (count, lense.description, ', '.join(sample))) enqueue_tiles(lense.config, missing_tile_file, - check_metatile_exists, retry_number, tile_specifier) + check_metatile_exists, self.tile_specifier) else: with self._missing() as missing: @@ -468,11 +469,10 @@ def render(self, num_retries, lense): def create_tile_specifier(tile_specifier_file): - default_mem_gb=8 tile_specifier = TileSpecifier() if tile_specifier_file is not None: try: - tile_specifier = TileSpecifier.from_ordering_file(tile_specifier_filename, default_mem_gb=8) + tile_specifier = TileSpecifier.from_ordering_file(tile_specifier_file, default_mem_gb=8) except: print("Error creating TileSpecifier, will use the default. Error details: {}".format(sys.exc_info()[0])) @@ -556,7 +556,7 @@ def create_tile_specifier(tile_specifier_file): buckets.rawr, missing_bucket_date_prefix, args.key_format_type, split_zoom, zoom_max, args.size_threshold) - tile_renderer = TileRenderer(tile_finder, big_jobs, split_zoom, zoom_max, args.allowed_missing_tiles, tile_specifier) + tile_renderer = TileRenderer(tile_finder, big_jobs, split_zoom, zoom_max, tile_specifier, args.allowed_missing_tiles) tile_renderer.render(args.retries, LowZoomLense(args.low_zoom_config)) From ffd4b9e216c72bd2d0b2682ee65a6ccb20c3013e Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Mon, 26 Apr 2021 13:09:09 -0700 Subject: [PATCH 08/22] updating instance type to c5 compute-optmimized --- batch-setup/batch_setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch-setup/batch_setup.py b/batch-setup/batch_setup.py index daa9556..a5a7689 100644 --- a/batch-setup/batch_setup.py +++ b/batch-setup/batch_setup.py @@ -196,7 +196,7 @@ def batch_setup(region_name, run_id, vpc_id, securityGroupIds, computeEnvironmen minvCpus=0, maxvCpus=max_vcpus, desiredvCpus=0, - instanceTypes=["r5"], + instanceTypes=["c5"], # although this is called "instanceRole", it really wants an instance _profile_ ARN. instanceRole=instanceProfileArn, tags={ From 76ff7166581555507635f49f717adfe04f1fb86b Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Fri, 30 Apr 2021 14:09:58 -0700 Subject: [PATCH 09/22] woops forgot to add time to other dockerfiles --- docker/meta-low-zoom-batch/Dockerfile | 2 +- docker/rawr-batch/Dockerfile | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/meta-low-zoom-batch/Dockerfile b/docker/meta-low-zoom-batch/Dockerfile index 2139dc3..1c8544e 100644 --- a/docker/meta-low-zoom-batch/Dockerfile +++ b/docker/meta-low-zoom-batch/Dockerfile @@ -1,7 +1,7 @@ FROM python:2 RUN apt-get -y update \ - && apt-get -y install libgeos-dev libboost-python-dev \ + && apt-get -y install libgeos-dev libboost-python-dev time \ && rm -rf /var/lib/apt/lists/* COPY raw_tiles /usr/src/raw_tiles diff --git a/docker/rawr-batch/Dockerfile b/docker/rawr-batch/Dockerfile index c3b0d05..d234ead 100644 --- a/docker/rawr-batch/Dockerfile +++ b/docker/rawr-batch/Dockerfile @@ -1,7 +1,7 @@ FROM python:2 RUN apt-get -y update \ - && apt-get -y install libgeos-dev \ + && apt-get -y install libgeos-dev time \ && rm -rf /var/lib/apt/lists/* COPY raw_tiles /usr/src/raw_tiles From 7e2ef955a14699f92be04654b357f77bd9c39d9f Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Fri, 7 May 2021 07:17:56 -0700 Subject: [PATCH 10/22] adding a better mem calculation --- batch-setup/make_meta_tiles.py | 49 ++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index 92c5d57..3a22594 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -317,13 +317,13 @@ def get_ordering_val(self, coord): return 0 - def get_mem_reqs_mb(self, coord_str, overprovision_multiplier=1.0): + def get_mem_reqs_mb(self, coord_str): """ returns the specified memory requirement in megabytes for the coordinate. If none are specified, returns default_gb """ if coord_str in self.spec_dict: - return self.spec_dict[coord_str][self.MEM_GB_KEY] * 1024 * overprovision_multiplier + return self.spec_dict[coord_str][self.MEM_GB_KEY] * 1024 else: return self.default_mem_gb * 1024 @@ -371,6 +371,27 @@ def _big_jobs(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom, return big_jobs +def viable_container_overrides(mem_mb): + """ + Turns a number into the next highest even multiple that AWS will accept, and the min number of CPUs you need for that amount + :param mem_mb: (int) the megabytes of memory you'd request in an ideal world + :return: the amount of mem you need to request for AWS batch to honor it, the amount of vcpus you must request + """ + if mem_mb < 512: + return 512 + + if mem_mb % 1024 == 0: + return mem_mb + + # truncate number / 1024, then bump by 1 + desired_mem_mb = (int(mem_mb) / 1024 + 1) * 1024 + + max_mem_per_vcpu = 8 * 1024 + vcpus = (desired_mem_mb - 1) / max_mem_per_vcpu + 1 + + return desired_mem_mb, vcpus + + def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, tile_specifier=TileSpecifier(), mem_multiplier=1.0, mem_max=32 * 1024): from tilequeue.command import make_config_from_argparse from tilequeue.command import tilequeue_batch_enqueue @@ -385,26 +406,26 @@ def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, tile_speci coord_lines = [line.strip() for line in tile_list.readlines()] reordered_lines = tile_specifier.reorder(coord_lines) - - # if we aren't already increasing our memory usage somewhere else, we want to - # overprovision by 20% to allow for changes in tile complexity over time - overprovision_multiplier = 1.0 - if mem_multiplier <= 1.0: - overprovision_multiplier = 1.2 - + print("[%s] Starting to enqueue %d tile batches" % (time.ctime(), len(reordered_lines))) for coord_line in reordered_lines: # override memory requirements for this job with what the tile_specifier tells us - mem_mb = int(tile_specifier.get_mem_reqs_mb(coord_line, overprovision_multiplier)) - update_memory_request(cfg, mem_mb, mem_multiplier, mem_max) + mem_mb = int(tile_specifier.get_mem_reqs_mb(coord_line)) + adjusted_mem = mem_mb * mem_multiplier + + # now that we know what we think we want, pick something AWS actually supports + viable_mem_request, required_min_cpus = viable_container_overrides(adjusted_mem) + + update_container_overrides(cfg, viable_mem_request, mem_max, required_min_cpus) args = BatchEnqueueArgs(config_file, coord_line, None, None) tilequeue_batch_enqueue(cfg, args) print("[%s] Done enqueuing tile batches" % time.ctime()) -def update_memory_request(cfg, mem_mb, mem_multiplier, mem_max): - adjusted_mem = mem_mb * mem_multiplier - cfg.yml["batch"]["memory"] = int(min(adjusted_mem, mem_max)) + +def update_container_overrides(cfg, mem_mb, mem_max, cpus): + cfg.yml["batch"]["memory"] = int(min(mem_mb, mem_max)) + cfg.yml["batch"]["vcpus"] = cpus # adaptor class for MissingTiles to see just the high zoom parts, this is used From 5b7e09207f5ab62f9d107ee366c39abe6408ae55 Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Fri, 7 May 2021 12:21:26 -0700 Subject: [PATCH 11/22] Switching to on-demand compute instances --- batch-setup/batch_setup.py | 31 +------------------------------ batch-setup/make_meta_tiles.py | 13 +++++++------ 2 files changed, 8 insertions(+), 36 deletions(-) diff --git a/batch-setup/batch_setup.py b/batch-setup/batch_setup.py index a5a7689..d3f977c 100644 --- a/batch-setup/batch_setup.py +++ b/batch-setup/batch_setup.py @@ -148,34 +148,6 @@ def batch_setup(region_name, run_id, vpc_id, securityGroupIds, computeEnvironmen boto_iam, boto_ec2, instanceRoleName, instanceRoleName) print("Using ECS instance profile %s" % instanceProfileArn) - # Create the spot fleet role - # https://docs.aws.amazon.com/batch/latest/userguide/spot_fleet_IAM_role.html - spotIamFleetRoleName = "AmazonEC2SpotFleetRole" - spotIamRoleDocument = { - "Version": "2012-10-17", - "Statement": [ - { - "Effect": "Allow", - "Principal": { - "Service": "spotfleet.amazonaws.com" - }, - "Action": "sts:AssumeRole" - } - ] - } - spotIamFleetRoleArn = get_or_create_role( - boto_iam, spotIamFleetRoleName, spotIamRoleDocument) - print("Using EC2 Spot Fleet role %s" % spotIamFleetRoleArn) - - spotFleetTaggingRolePolicy = find_policy( - boto_iam, 'AmazonEC2SpotFleetTaggingRole') - boto_iam.attach_role_policy( - PolicyArn=spotFleetTaggingRolePolicy['Arn'], - RoleName=spotIamFleetRoleName, - ) - print("Attached Spot Fleet Tagging Role to EC2 Spot Fleet role %s" - % spotIamFleetRoleArn) - # Create a compute environment for raw tile rendering try: response = boto_batch.describe_compute_environments( @@ -192,7 +164,7 @@ def batch_setup(region_name, run_id, vpc_id, securityGroupIds, computeEnvironmen state="ENABLED", serviceRole=serviceRoleArn, computeResources=dict( - type='SPOT', + type='EC2', minvCpus=0, maxvCpus=max_vcpus, desiredvCpus=0, @@ -205,7 +177,6 @@ def batch_setup(region_name, run_id, vpc_id, securityGroupIds, computeEnvironmen 'cost_resource_group': run_id, }, bidPercentage=60, - spotIamFleetRole=spotIamFleetRoleArn, subnets=subnet_ids, securityGroupIds=securityGroupIds, ) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index 3a22594..450779a 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -378,16 +378,17 @@ def viable_container_overrides(mem_mb): :return: the amount of mem you need to request for AWS batch to honor it, the amount of vcpus you must request """ if mem_mb < 512: - return 512 + return 512, 1 if mem_mb % 1024 == 0: - return mem_mb + return mem_mb, 1 - # truncate number / 1024, then bump by 1 - desired_mem_mb = (int(mem_mb) / 1024 + 1) * 1024 + mem_gb_truncated = mem_mb / 1024 + next_gb = mem_gb_truncated + 1 + desired_mem_mb = next_gb * 1024 max_mem_per_vcpu = 8 * 1024 - vcpus = (desired_mem_mb - 1) / max_mem_per_vcpu + 1 + vcpus = 1 + (desired_mem_mb - 1)/max_mem_per_vcpu return desired_mem_mb, vcpus @@ -413,7 +414,7 @@ def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, tile_speci mem_mb = int(tile_specifier.get_mem_reqs_mb(coord_line)) adjusted_mem = mem_mb * mem_multiplier - # now that we know what we think we want, pick something AWS actually supports + # now that we know what we want, pick something AWS actually supports viable_mem_request, required_min_cpus = viable_container_overrides(adjusted_mem) update_container_overrides(cfg, viable_mem_request, mem_max, required_min_cpus) From 010947874909a96041bafb4d88df9b155a17a152 Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Fri, 7 May 2021 16:24:17 -0700 Subject: [PATCH 12/22] what is happening with python float coercion --- batch-setup/make_meta_tiles.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index 450779a..88a74d0 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -377,18 +377,20 @@ def viable_container_overrides(mem_mb): :param mem_mb: (int) the megabytes of memory you'd request in an ideal world :return: the amount of mem you need to request for AWS batch to honor it, the amount of vcpus you must request """ + mem_mb = int(mem_mb) + if mem_mb < 512: return 512, 1 if mem_mb % 1024 == 0: return mem_mb, 1 - mem_gb_truncated = mem_mb / 1024 + mem_gb_truncated = int(mem_mb / 1024) next_gb = mem_gb_truncated + 1 desired_mem_mb = next_gb * 1024 max_mem_per_vcpu = 8 * 1024 - vcpus = 1 + (desired_mem_mb - 1)/max_mem_per_vcpu + vcpus = 1 + int((desired_mem_mb - 1)/max_mem_per_vcpu) return desired_mem_mb, vcpus From b60f89c67eb42bcf08a2d41c17d93eadbf664fac Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Tue, 18 May 2021 15:34:06 -0700 Subject: [PATCH 13/22] switching to rawr tiles size-based approach --- batch-setup/make_meta_tiles.py | 142 +++++++++++++++++++++++---------- 1 file changed, 99 insertions(+), 43 deletions(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index 88a74d0..93a0bbb 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -1,3 +1,5 @@ +import queue + from batch import Buckets from batch import run_go import yaml @@ -130,15 +132,15 @@ def read_metas_to_file(self, filename, present=False, compress=False): stdout=filename) @contextmanager - def missing_tiles_split(self, split_zoom, zoom_max, big_jobs): + def missing_tiles_split(self, split_zoom, zoom_max, job_set): """ To be used in a with-statement. Yields a MissingTiles object, giving information about the tiles which are missing. - High zoom jobs are output either at split_zoom (RAWR tile granularity) - or zoom_max (usually lower, e.g: 7) depending on whether big_jobs - contains a truthy value for the RAWR tile. The big_jobs are looked up - at zoom_max. + High zoom jobs are output between split_zoom (RAWR tile granularity) and zoom_max + (usually lower, e.g: 6) depending on whether job_list + contains a truthy value for the RAWR tile. The job_list jobs are are looked up + at starting at zoom_max, then at increasing zooms until split_zoom. """ self.run_batch_job() @@ -163,25 +165,31 @@ def missing_tiles_split(self, split_zoom, zoom_max, big_jobs): with gzip.open(missing_meta_file, 'r') as fh: for line in fh: - c = deserialize_coord(line) - if c.zoom < split_zoom: + this_coord = deserialize_coord(line) + if this_coord.zoom < split_zoom: # in order to not have too many jobs in the queue, we # group the low zoom jobs to the zoom_max (usually 7) - if c.zoom > zoom_max: - c = c.zoomTo(zoom_max).container() + if this_coord.zoom > zoom_max: + this_coord = this_coord.zoomTo(zoom_max).container() - missing_low[c] = True + missing_low[this_coord] = True else: - # if the group of jobs at zoom_max would be too big - # (according to big_jobs[]) then enqueue the original - # coordinate. this is to prevent a "long tail" of huge - # job groups. - job_coord = c.zoomTo(zoom_max).container() - if not big_jobs[job_coord]: - c = job_coord - - missing_high[c] = True + # check the job set at every zoom starting from + # zoom max - if we don't find it at a lower zoom + # register it at split_zoom + job_registered = False + for this_zoom in range(zoom_max, split_zoom): + lower_zoom_job_coord = this_coord.zoomTo(this_zoom) + if job_set[lower_zoom_job_coord]: + print("REMOVEME: Registered %s at lower zoom coord %s" % (serialize_coord(this_coord), serialize_coord(lower_zoom_job_coord))) + missing_high[lower_zoom_job_coord] = True + job_registered = True + break + + if not job_registered: + print("REMOVEME: Fell back to registering %s at split zoom: %s" % (serialize_coord(this_coord), serialize_coord(lower_zoom_job_coord))) + missing_high[this_coord] = True with open(missing_low_file, 'w') as fh: for coord in missing_low: @@ -238,7 +246,7 @@ def __call__(self, parent): assert dz >= 0 width = 1 << dz - size = 0 + sizes = {} for dx in range(width): for dy in range(width): coord = Coordinate( @@ -247,9 +255,19 @@ def __call__(self, parent): row=((parent.row << dz) + dy)) key = gen(self.prefix, coord, 'zip') response = s3.head_object(Bucket=self.bucket, Key=key) - size += response['ContentLength'] + sizes[coord] = response['ContentLength'] + + # now sum the sizes from rawr zoom to parent zoom + for coord in sizes.keys(): + print("REMOVEME: started at %s - size: %s" % (serialize_coord(coord), sizes[coord])) + for zoom in range(self.rawr_zoom, parent.zoom, -1): + parent_coord = coord.zoomTo(zoom-1).container() + if not sizes.has_key(parent_coord): + sizes.parent_coord = 0 + sizes[parent_coord] += sizes[coord] + print("REMOVEME: ended at %s - total size: %s" % (serialize_coord(parent_coord), sizes[parent_coord])) - return parent, size + return parent, sizes class TileSpecifier(object): @@ -297,6 +315,14 @@ def from_ordering_file(filename, default_mem_gb=8): continue return TileSpecifier(default_mem_gb, spec_dict) + @staticmethod + def from_coord_list(coord_list, default_mem_gb): + spec_dict = {} + for i in range(len(coord_list)): + coord_str = serialize_coord(coord_list[i]) + spec_dict[coord_str] = {TileSpecifier.ORDER_KEY: i} + + return TileSpecifier(default_mem_gb, spec_dict) def reorder(self, coord_list): """ @@ -328,14 +354,13 @@ def get_mem_reqs_mb(self, coord_str): return self.default_mem_gb * 1024 -def _big_jobs(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom, - size_threshold, pool_size=30): +def _distribute_jobs_by_raw_tile_size(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom, + size_threshold, pool_size=30): """ Look up the RAWR tiles in the rawr_bucket under the prefix and with the given key format, group the RAWR tiles (usually at zoom 10) by the job - group zoom (usually 7) and sum their sizes. Return a map-like object - which has a truthy value for those Coordinates at group_zoom which sum - to size_threshold or more. + group zoom (usually 7) and sum their sizes. Return an ordered list of job coordinates + by descending raw size sum. A pool size of 30 seems to work well; the point of the pool is to hide the latency of S3 requests, so pretty quickly hits the law of diminishing @@ -346,13 +371,15 @@ def _big_jobs(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom, p = Pool(pool_size) job_sizer = _JobSizer(rawr_bucket, prefix, key_format_type, rawr_zoom) - big_jobs = CoordSet(group_zoom, min_zoom=group_zoom) - print("[%s] Finding big jobs by counting the size of raw tiles" % time.ctime()) + grouped_by_rawr_tile_size = [] + print("[%s] Bucketizing jobs by zoom using size of raw tiles" % time.ctime()) # loop over each column individually. this limits the number of concurrent # tasks, which means we don't waste memory maintaining a huge queue of # pending tasks. and when something goes wrong, the stacktrace isn't buried # in a million others. num_coords = 1 << group_zoom + all_sizes = {} + grouping_queue = queue.SimpleQueue() for x in range(num_coords): # kick off tasks async. each one knows its own coordinate, so we only # need to track the handle to know when its finished. @@ -360,15 +387,38 @@ def _big_jobs(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom, for y in range(num_coords): coord = Coordinate(zoom=group_zoom, column=x, row=y) tasks.append(p.apply_async(job_sizer, (coord,))) + grouping_queue.put_nowait(coord) # queue for future size counting # collect tasks and put them into the big jobs list. for task in tasks: - coord, size = task.get() - if size >= size_threshold: - big_jobs[coord] = True + coord, sizes = task.get() + all_sizes = all_sizes.update(sizes) + print("REMOVEME: Finished the %s column" % x) + + # now use all_sizes plus the size_threshold to find the lowest zoom we can group the tiles in + counts_at_zoom = {} + while not grouping_queue.empty(): + this_coord = grouping_queue.get_nowait() + this_size = all_sizes[this_coord] + + if this_size <= size_threshold or this_coord.zoom == rawr_zoom: + grouped_by_rawr_tile_size.append(this_coord) + grouping_queue.task_done() + + if not counts_at_zoom.has_key(this_coord.zoom): + counts_at_zoom[this_coord.zoom] = 0 + counts_at_zoom += 1 + else: + top_left_child = this_coord.zoomBy(1) + + grouping_queue.put_nowait(top_left_child) + grouping_queue.put_nowait(top_left_child.down(1)) + grouping_queue.put_nowait(top_left_child.right(1)) + grouping_queue.put_nowait(top_left_child.down(1).right(1)) - print("[%s] Done finding big jobs" % time.ctime()) - return big_jobs + print("[%s] Done bucketizing jobs - count by zoom %s" % (time.ctime(), counts_at_zoom)) + ordered_job_list = sorted(grouped_by_rawr_tile_size, key=lambda coord: all_sizes[coord], reverse=True) + return ordered_job_list def viable_container_overrides(mem_mb): @@ -457,9 +507,9 @@ def missing_file(self, missing): # certain number of retries. class TileRenderer(object): - def __init__(self, tile_finder, big_jobs, split_zoom, zoom_max, tile_specifier=TileSpecifier(), allowed_missing_tiles=0): + def __init__(self, tile_finder, high_zoom_job_set, split_zoom, zoom_max, tile_specifier=TileSpecifier(), allowed_missing_tiles=0): self.tile_finder = tile_finder - self.big_jobs = big_jobs + self.high_zoom_job_set = high_zoom_job_set self.split_zoom = split_zoom self.zoom_max = zoom_max self.allowed_missing_tiles = allowed_missing_tiles @@ -467,7 +517,7 @@ def __init__(self, tile_finder, big_jobs, split_zoom, zoom_max, tile_specifier=T def _missing(self): return self.tile_finder.missing_tiles_split( - self.split_zoom, self.zoom_max, self.big_jobs) + self.split_zoom, self.zoom_max, self.high_zoom_job_set) def render(self, num_retries, lense): mem_max = 32 * 1024 # 32 GiB @@ -517,6 +567,12 @@ def create_tile_specifier(tile_specifier_file): return tile_specifier +def make_coord_set(coords, max_zoom, min_zoom): + coord_set = CoordSet(max_zoom, min_zoom) + for coord in coords: + coord_set[coord] = True + return coord_set + if __name__ == '__main__': import argparse @@ -551,7 +607,7 @@ def create_tile_specifier(tile_specifier_file): "prefixed with the date or hash first.") parser.add_argument('--metatile-size', default=8, type=int, help='Metatile size (in 256px tiles).') - parser.add_argument('--size-threshold', default=350000000, type=int, + parser.add_argument('--size-threshold', default=80_000_000, type=int, help='If all the RAWR tiles grouped together are ' 'bigger than this, split the job up into individual ' 'RAWR tiles.') @@ -572,7 +628,7 @@ def create_tile_specifier(tile_specifier_file): # TODO: split zoom and zoom max should come from config. split_zoom = 10 - zoom_max = 7 + zoom_max = 6 region = args.region or os.environ.get('AWS_DEFAULT_REGION') if region is None: @@ -588,13 +644,13 @@ def create_tile_specifier(tile_specifier_file): buckets.missing, buckets.meta, date_prefix, missing_bucket_date_prefix, region, args.key_format_type, args.config, metatile_max_zoom) - tile_specifier = create_tile_specifier(args.tile_specifier_file) - - big_jobs = _big_jobs( + jobs_list = _distribute_jobs_by_raw_tile_size( buckets.rawr, missing_bucket_date_prefix, args.key_format_type, split_zoom, zoom_max, args.size_threshold) - tile_renderer = TileRenderer(tile_finder, big_jobs, split_zoom, zoom_max, tile_specifier, args.allowed_missing_tiles) + tile_specifier = TileSpecifier.from_coord_list(jobs_list, 4) + + tile_renderer = TileRenderer(tile_finder, make_coord_set(jobs_list, split_zoom, zoom_max), split_zoom, zoom_max, tile_specifier, args.allowed_missing_tiles) tile_renderer.render(args.retries, LowZoomLense(args.low_zoom_config)) From 1a6469dd2bc716f8d7e7a5c22f5da03e17178b83 Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Tue, 18 May 2021 15:36:10 -0700 Subject: [PATCH 14/22] update --- batch-setup/make_meta_tiles.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index 93a0bbb..e741f32 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -259,13 +259,12 @@ def __call__(self, parent): # now sum the sizes from rawr zoom to parent zoom for coord in sizes.keys(): - print("REMOVEME: started at %s - size: %s" % (serialize_coord(coord), sizes[coord])) for zoom in range(self.rawr_zoom, parent.zoom, -1): parent_coord = coord.zoomTo(zoom-1).container() - if not sizes.has_key(parent_coord): - sizes.parent_coord = 0 + if parent_coord not in sizes: + sizes[parent_coord] = 0 sizes[parent_coord] += sizes[coord] - print("REMOVEME: ended at %s - total size: %s" % (serialize_coord(parent_coord), sizes[parent_coord])) + print("Completed parent %s" % serialize_coord(parent_coord)) return parent, sizes @@ -379,7 +378,7 @@ def _distribute_jobs_by_raw_tile_size(rawr_bucket, prefix, key_format_type, rawr # in a million others. num_coords = 1 << group_zoom all_sizes = {} - grouping_queue = queue.SimpleQueue() + grouping_queue = queue.Queue() for x in range(num_coords): # kick off tasks async. each one knows its own coordinate, so we only # need to track the handle to know when its finished. @@ -405,7 +404,7 @@ def _distribute_jobs_by_raw_tile_size(rawr_bucket, prefix, key_format_type, rawr grouped_by_rawr_tile_size.append(this_coord) grouping_queue.task_done() - if not counts_at_zoom.has_key(this_coord.zoom): + if not this_coord.zoom in counts_at_zoom: counts_at_zoom[this_coord.zoom] = 0 counts_at_zoom += 1 else: @@ -607,7 +606,7 @@ def make_coord_set(coords, max_zoom, min_zoom): "prefixed with the date or hash first.") parser.add_argument('--metatile-size', default=8, type=int, help='Metatile size (in 256px tiles).') - parser.add_argument('--size-threshold', default=80_000_000, type=int, + parser.add_argument('--size-threshold', default=80000000, type=int, help='If all the RAWR tiles grouped together are ' 'bigger than this, split the job up into individual ' 'RAWR tiles.') @@ -648,6 +647,8 @@ def make_coord_set(coords, max_zoom, min_zoom): buckets.rawr, missing_bucket_date_prefix, args.key_format_type, split_zoom, zoom_max, args.size_threshold) + sys.exit() + tile_specifier = TileSpecifier.from_coord_list(jobs_list, 4) tile_renderer = TileRenderer(tile_finder, make_coord_set(jobs_list, split_zoom, zoom_max), split_zoom, zoom_max, tile_specifier, args.allowed_missing_tiles) From 5ccfb9e91560f3dd3f751a6daf4278d4ee436db6 Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Tue, 18 May 2021 16:11:59 -0700 Subject: [PATCH 15/22] update doesn't work like that --- batch-setup/make_meta_tiles.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index e741f32..a7d5949 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -391,7 +391,7 @@ def _distribute_jobs_by_raw_tile_size(rawr_bucket, prefix, key_format_type, rawr # collect tasks and put them into the big jobs list. for task in tasks: coord, sizes = task.get() - all_sizes = all_sizes.update(sizes) + all_sizes.update(sizes) print("REMOVEME: Finished the %s column" % x) # now use all_sizes plus the size_threshold to find the lowest zoom we can group the tiles in From 5c6089a46524190bcddaf6001cea8cff554366ad Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Tue, 18 May 2021 16:47:17 -0700 Subject: [PATCH 16/22] bugs --- batch-setup/make_meta_tiles.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index a7d5949..c4170d5 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -404,7 +404,7 @@ def _distribute_jobs_by_raw_tile_size(rawr_bucket, prefix, key_format_type, rawr grouped_by_rawr_tile_size.append(this_coord) grouping_queue.task_done() - if not this_coord.zoom in counts_at_zoom: + if this_coord.zoom not in counts_at_zoom: counts_at_zoom[this_coord.zoom] = 0 counts_at_zoom += 1 else: @@ -444,7 +444,8 @@ def viable_container_overrides(mem_mb): return desired_mem_mb, vcpus -def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, tile_specifier=TileSpecifier(), mem_multiplier=1.0, mem_max=32 * 1024): +def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, tile_specifier=TileSpecifier(), + mem_multiplier=1.0, mem_max=32 * 1024): from tilequeue.command import make_config_from_argparse from tilequeue.command import tilequeue_batch_enqueue from make_rawr_tiles import BatchEnqueueArgs From 5b76da7faa6dddc739a982efacc11a8adbbb0b8d Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Tue, 18 May 2021 17:29:43 -0700 Subject: [PATCH 17/22] more dumb errors --- batch-setup/make_meta_tiles.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index c4170d5..d100815 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -406,7 +406,7 @@ def _distribute_jobs_by_raw_tile_size(rawr_bucket, prefix, key_format_type, rawr if this_coord.zoom not in counts_at_zoom: counts_at_zoom[this_coord.zoom] = 0 - counts_at_zoom += 1 + counts_at_zoom[this_coord.zoom] += 1 else: top_left_child = this_coord.zoomBy(1) From 0557b6494834a09e505927df61e139d40fd6e108 Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Tue, 18 May 2021 17:47:09 -0700 Subject: [PATCH 18/22] switching to m5s --- batch-setup/batch_setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch-setup/batch_setup.py b/batch-setup/batch_setup.py index d3f977c..12fbdfb 100644 --- a/batch-setup/batch_setup.py +++ b/batch-setup/batch_setup.py @@ -168,7 +168,7 @@ def batch_setup(region_name, run_id, vpc_id, securityGroupIds, computeEnvironmen minvCpus=0, maxvCpus=max_vcpus, desiredvCpus=0, - instanceTypes=["c5"], + instanceTypes=["m5"], # although this is called "instanceRole", it really wants an instance _profile_ ARN. instanceRole=instanceProfileArn, tags={ From dbe4f64e4c3f71f775036e9241cde49436f7d9cb Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Wed, 19 May 2021 13:56:53 -0700 Subject: [PATCH 19/22] adding count verification --- batch-setup/make_meta_tiles.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index d100815..4da13e8 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -416,6 +416,17 @@ def _distribute_jobs_by_raw_tile_size(rawr_bucket, prefix, key_format_type, rawr grouping_queue.put_nowait(top_left_child.down(1).right(1)) print("[%s] Done bucketizing jobs - count by zoom %s" % (time.ctime(), counts_at_zoom)) + # validate counts by zoom - expecting the equivalent of 4^10 zoom 10 jobs. + counts_at_zoom_sum = 0 + for z in counts_at_zoom.keys(): + count_at_this_zoom = counts_at_zoom[z] + zoom_10_equiv_count = count_at_this_zoom * (4 ** (10 - z)) + counts_at_zoom_sum += zoom_10_equiv_count + if counts_at_zoom_sum == 4**10: + print("Count of jobs by zoom is correct") + else: + print("Count of jobs by zoom is off by %s" % counts_at_zoom_sum - 4**10) + ordered_job_list = sorted(grouped_by_rawr_tile_size, key=lambda coord: all_sizes[coord], reverse=True) return ordered_job_list From de9072feb4959151fa1b940f4a68034eded932e5 Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Wed, 19 May 2021 13:58:35 -0700 Subject: [PATCH 20/22] adding missing tiles split info --- batch-setup/make_meta_tiles.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index 4da13e8..c3aaadb 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -197,6 +197,7 @@ def missing_tiles_split(self, split_zoom, zoom_max, job_set): with open(missing_high_file, 'w') as fh: for coord in missing_high: + print("REMOVEME: %s" % serialize_coord(coord)) fh.write(serialize_coord(coord) + "\n") print("[%s] Done splitting into high and low zoom lists" % (time.ctime())) @@ -659,7 +660,6 @@ def make_coord_set(coords, max_zoom, min_zoom): buckets.rawr, missing_bucket_date_prefix, args.key_format_type, split_zoom, zoom_max, args.size_threshold) - sys.exit() tile_specifier = TileSpecifier.from_coord_list(jobs_list, 4) From 06739a6c311e6fa85da3508fd32aff4dd25b7139 Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Wed, 19 May 2021 17:01:10 -0700 Subject: [PATCH 21/22] too much logging --- batch-setup/make_meta_tiles.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index c3aaadb..0319694 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -182,13 +182,11 @@ def missing_tiles_split(self, split_zoom, zoom_max, job_set): for this_zoom in range(zoom_max, split_zoom): lower_zoom_job_coord = this_coord.zoomTo(this_zoom) if job_set[lower_zoom_job_coord]: - print("REMOVEME: Registered %s at lower zoom coord %s" % (serialize_coord(this_coord), serialize_coord(lower_zoom_job_coord))) missing_high[lower_zoom_job_coord] = True job_registered = True break if not job_registered: - print("REMOVEME: Fell back to registering %s at split zoom: %s" % (serialize_coord(this_coord), serialize_coord(lower_zoom_job_coord))) missing_high[this_coord] = True with open(missing_low_file, 'w') as fh: From b2a0bfe702e817541e5769c1634262320556b935 Mon Sep 17 00:00:00 2001 From: Travis Grigsby Date: Wed, 19 May 2021 19:46:49 -0700 Subject: [PATCH 22/22] updating logging --- batch-setup/make_meta_tiles.py | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index 0319694..c162191 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -318,7 +318,7 @@ def from_coord_list(coord_list, default_mem_gb): spec_dict = {} for i in range(len(coord_list)): coord_str = serialize_coord(coord_list[i]) - spec_dict[coord_str] = {TileSpecifier.ORDER_KEY: i} + spec_dict[coord_str] = {TileSpecifier.ORDER_KEY: i, TileSpecifier.MEM_GB_KEY: default_mem_gb} return TileSpecifier(default_mem_gb, spec_dict) @@ -347,7 +347,7 @@ def get_mem_reqs_mb(self, coord_str): returns default_gb """ if coord_str in self.spec_dict: - return self.spec_dict[coord_str][self.MEM_GB_KEY] * 1024 + return self.spec_dict[coord_str][self.MEM_GB_KEY] * 1024 or self.default_mem_gb else: return self.default_mem_gb * 1024 @@ -478,7 +478,7 @@ def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, tile_speci # now that we know what we want, pick something AWS actually supports viable_mem_request, required_min_cpus = viable_container_overrides(adjusted_mem) - + print("REMOVEME: [%s] enqueueing %s at %s mem mb and %s cpus" % (time.ctime(), coord_line, viable_mem_request, required_min_cpus)) update_container_overrides(cfg, viable_mem_request, mem_max, required_min_cpus) args = BatchEnqueueArgs(config_file, coord_line, None, None) @@ -566,17 +566,6 @@ def render(self, num_retries, lense): % (count, lense.description, num_retries, ', '.join(sample))) -def create_tile_specifier(tile_specifier_file): - tile_specifier = TileSpecifier() - if tile_specifier_file is not None: - try: - tile_specifier = TileSpecifier.from_ordering_file(tile_specifier_file, default_mem_gb=8) - except: - print("Error creating TileSpecifier, will use the default. Error details: {}".format(sys.exc_info()[0])) - - return tile_specifier - - def make_coord_set(coords, max_zoom, min_zoom): coord_set = CoordSet(max_zoom, min_zoom) for coord in coords: @@ -658,7 +647,6 @@ def make_coord_set(coords, max_zoom, min_zoom): buckets.rawr, missing_bucket_date_prefix, args.key_format_type, split_zoom, zoom_max, args.size_threshold) - tile_specifier = TileSpecifier.from_coord_list(jobs_list, 4) tile_renderer = TileRenderer(tile_finder, make_coord_set(jobs_list, split_zoom, zoom_max), split_zoom, zoom_max, tile_specifier, args.allowed_missing_tiles)