diff --git a/batch-setup/batch_setup.py b/batch-setup/batch_setup.py index daa9556..12fbdfb 100644 --- a/batch-setup/batch_setup.py +++ b/batch-setup/batch_setup.py @@ -148,34 +148,6 @@ def batch_setup(region_name, run_id, vpc_id, securityGroupIds, computeEnvironmen boto_iam, boto_ec2, instanceRoleName, instanceRoleName) print("Using ECS instance profile %s" % instanceProfileArn) - # Create the spot fleet role - # https://docs.aws.amazon.com/batch/latest/userguide/spot_fleet_IAM_role.html - spotIamFleetRoleName = "AmazonEC2SpotFleetRole" - spotIamRoleDocument = { - "Version": "2012-10-17", - "Statement": [ - { - "Effect": "Allow", - "Principal": { - "Service": "spotfleet.amazonaws.com" - }, - "Action": "sts:AssumeRole" - } - ] - } - spotIamFleetRoleArn = get_or_create_role( - boto_iam, spotIamFleetRoleName, spotIamRoleDocument) - print("Using EC2 Spot Fleet role %s" % spotIamFleetRoleArn) - - spotFleetTaggingRolePolicy = find_policy( - boto_iam, 'AmazonEC2SpotFleetTaggingRole') - boto_iam.attach_role_policy( - PolicyArn=spotFleetTaggingRolePolicy['Arn'], - RoleName=spotIamFleetRoleName, - ) - print("Attached Spot Fleet Tagging Role to EC2 Spot Fleet role %s" - % spotIamFleetRoleArn) - # Create a compute environment for raw tile rendering try: response = boto_batch.describe_compute_environments( @@ -192,11 +164,11 @@ def batch_setup(region_name, run_id, vpc_id, securityGroupIds, computeEnvironmen state="ENABLED", serviceRole=serviceRoleArn, computeResources=dict( - type='SPOT', + type='EC2', minvCpus=0, maxvCpus=max_vcpus, desiredvCpus=0, - instanceTypes=["r5"], + instanceTypes=["m5"], # although this is called "instanceRole", it really wants an instance _profile_ ARN. 
instanceRole=instanceProfileArn, tags={ @@ -205,7 +177,6 @@ def batch_setup(region_name, run_id, vpc_id, securityGroupIds, computeEnvironmen 'cost_resource_group': run_id, }, bidPercentage=60, - spotIamFleetRole=spotIamFleetRoleArn, subnets=subnet_ids, securityGroupIds=securityGroupIds, ) diff --git a/batch-setup/make_meta_tiles.py b/batch-setup/make_meta_tiles.py index 323156a..c162191 100644 --- a/batch-setup/make_meta_tiles.py +++ b/batch-setup/make_meta_tiles.py @@ -1,3 +1,5 @@ +import queue + from batch import Buckets from batch import run_go import yaml @@ -19,7 +21,8 @@ from tilequeue.store import make_s3_tile_key_generator from ModestMaps.Core import Coordinate from multiprocessing import Pool - +import time +import sys MissingTiles = namedtuple('MissingTiles', 'low_zoom_file high_zoom_file') @@ -129,15 +132,15 @@ def read_metas_to_file(self, filename, present=False, compress=False): stdout=filename) @contextmanager - def missing_tiles_split(self, split_zoom, zoom_max, big_jobs): + def missing_tiles_split(self, split_zoom, zoom_max, job_set): """ To be used in a with-statement. Yields a MissingTiles object, giving information about the tiles which are missing. - High zoom jobs are output either at split_zoom (RAWR tile granularity) - or zoom_max (usually lower, e.g: 7) depending on whether big_jobs - contains a truthy value for the RAWR tile. The big_jobs are looked up - at zoom_max. + High zoom jobs are output between split_zoom (RAWR tile granularity) and zoom_max + (usually lower, e.g: 6) depending on whether job_set + contains a truthy value for the RAWR tile. The job_set jobs are looked up + starting at zoom_max, then at increasing zooms until split_zoom. 
""" self.run_batch_job() @@ -149,7 +152,7 @@ def missing_tiles_split(self, split_zoom, zoom_max, big_jobs): self.read_metas_to_file(missing_meta_file, compress=True) - print("Splitting into high and low zoom lists") + print("[%s] Splitting into high and low zoom lists" % (time.ctime())) # contains zooms 0 until group zoom. the jobs between the group # zoom and RAWR zoom are merged into the parent at group zoom. @@ -162,25 +165,29 @@ def missing_tiles_split(self, split_zoom, zoom_max, big_jobs): with gzip.open(missing_meta_file, 'r') as fh: for line in fh: - c = deserialize_coord(line) - if c.zoom < split_zoom: + this_coord = deserialize_coord(line) + if this_coord.zoom < split_zoom: # in order to not have too many jobs in the queue, we # group the low zoom jobs to the zoom_max (usually 7) - if c.zoom > zoom_max: - c = c.zoomTo(zoom_max).container() + if this_coord.zoom > zoom_max: + this_coord = this_coord.zoomTo(zoom_max).container() - missing_low[c] = True + missing_low[this_coord] = True else: - # if the group of jobs at zoom_max would be too big - # (according to big_jobs[]) then enqueue the original - # coordinate. this is to prevent a "long tail" of huge - # job groups. 
- job_coord = c.zoomTo(zoom_max).container() - if not big_jobs[job_coord]: - c = job_coord - - missing_high[c] = True + # check the job set at every zoom starting from + # zoom max - if we don't find it at a lower zoom + # register it at split_zoom + job_registered = False + for this_zoom in range(zoom_max, split_zoom): + lower_zoom_job_coord = this_coord.zoomTo(this_zoom) + if job_set[lower_zoom_job_coord]: + missing_high[lower_zoom_job_coord] = True + job_registered = True + break + + if not job_registered: + missing_high[this_coord] = True with open(missing_low_file, 'w') as fh: for coord in missing_low: @@ -188,13 +195,16 @@ def missing_tiles_split(self, split_zoom, zoom_max, big_jobs): with open(missing_high_file, 'w') as fh: for coord in missing_high: + print("REMOVEME: %s" % serialize_coord(coord)) fh.write(serialize_coord(coord) + "\n") + print("[%s] Done splitting into high and low zoom lists" % (time.ctime())) yield MissingTiles(missing_low_file, missing_high_file) finally: shutil.rmtree(tmpdir) + @contextmanager def present_tiles(self): """ @@ -235,7 +245,7 @@ def __call__(self, parent): assert dz >= 0 width = 1 << dz - size = 0 + sizes = {} for dx in range(width): for dy in range(width): coord = Coordinate( @@ -244,19 +254,111 @@ def __call__(self, parent): row=((parent.row << dz) + dy)) key = gen(self.prefix, coord, 'zip') response = s3.head_object(Bucket=self.bucket, Key=key) - size += response['ContentLength'] + sizes[coord] = response['ContentLength'] + + # now sum the sizes from rawr zoom to parent zoom + for coord in sizes.keys(): + for zoom in range(self.rawr_zoom, parent.zoom, -1): + parent_coord = coord.zoomTo(zoom-1).container() + if parent_coord not in sizes: + sizes[parent_coord] = 0 + sizes[parent_coord] += sizes[coord] + print("Completed parent %s" % serialize_coord(parent_coord)) + + return parent, sizes + + +class TileSpecifier(object): + ORDER_KEY = "order" + MEM_GB_KEY = "mem_gb" + + """ + Provides the ability to sort tiles based on an 
ordering and specify memory reqs + """ + + def __init__(self, default_mem_gb=8, spec_dict={}): + """ + :param default_mem_gb: memory (in GB) assumed for tiles that have no entry in spec_dict + :param spec_dict: keys are of form "zoom/x/y"; value is a map with keys "mem_gb" and "order". e.g. + {"7/3/10": {"mem_gb": 2.5, "order": 12}, + "10/12/3": {"mem_gb": 0.3, "order": 10}} + """ + self.default_mem_gb = default_mem_gb + self.spec_dict = spec_dict + + @staticmethod + def from_ordering_file(filename, default_mem_gb=8): + """ + Expects a csv with at least these columns: + zoom, x, y, mem_gb + The lines in the file should be ordered in the desired queue ordering (e.g. first data line contains first tile to enqueue) + :return: + TileSpecifier with the ordering and memory requirements expressed in the file + """ + import csv + + spec_dict = {} + with open(filename) as fh: + order_idx = 0 + reader = csv.DictReader(fh) + for row in reader: + try: + # TODO: add checks for keys + coord_str = serialize_coord(Coordinate(int(row['y']), int(row['x']), int(row['zoom']))) + spec_dict[coord_str] = \ + {TileSpecifier.MEM_GB_KEY: float(row[TileSpecifier.MEM_GB_KEY]), TileSpecifier.ORDER_KEY: order_idx} + order_idx += 1 + except: + print("Error for line parsed as %s in TileSpecifier.from_ordering_file - details %s" % (row, sys.exc_info()[0])) + continue + + return TileSpecifier(default_mem_gb, spec_dict) + @staticmethod + def from_coord_list(coord_list, default_mem_gb): + spec_dict = {} + for i in range(len(coord_list)): + coord_str = serialize_coord(coord_list[i]) + spec_dict[coord_str] = {TileSpecifier.ORDER_KEY: i, TileSpecifier.MEM_GB_KEY: default_mem_gb} + + return TileSpecifier(default_mem_gb, spec_dict) + + def reorder(self, coord_list): + """ + Using the sort ordering for this specifier, reorders the tiles in coord_list. + coords that are in the coord_list that aren't mentioned in the ordering will go first. 
+ :return: + """ + return sorted(coord_list, key=lambda coord: self.get_ordering_val(coord)) + + def get_ordering_val(self, coord): + """ + coord is type string + returns ordering location for coord. The lower it is the earlier in the order. + If there is no ordering specified for this coordinate, returns 0 + """ + if coord in self.spec_dict: + return self.spec_dict[coord][self.ORDER_KEY] + + return 0 - return parent, size + def get_mem_reqs_mb(self, coord_str): + """ + returns the specified memory requirement in megabytes for the coordinate. If none are specified, + returns default_gb + """ + if coord_str in self.spec_dict: + return self.spec_dict[coord_str][self.MEM_GB_KEY] * 1024 or self.default_mem_gb + else: + return self.default_mem_gb * 1024 -def _big_jobs(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom, - size_threshold, pool_size=30): +def _distribute_jobs_by_raw_tile_size(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom, + size_threshold, pool_size=30): """ Look up the RAWR tiles in the rawr_bucket under the prefix and with the given key format, group the RAWR tiles (usually at zoom 10) by the job - group zoom (usually 7) and sum their sizes. Return a map-like object - which has a truthy value for those Coordinates at group_zoom which sum - to size_threshold or more. + group zoom (usually 7) and sum their sizes. Return an ordered list of job coordinates + by descending raw size sum. A pool size of 30 seems to work well; the point of the pool is to hide the latency of S3 requests, so pretty quickly hits the law of diminishing @@ -267,13 +369,15 @@ def _big_jobs(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom, p = Pool(pool_size) job_sizer = _JobSizer(rawr_bucket, prefix, key_format_type, rawr_zoom) - big_jobs = CoordSet(group_zoom, min_zoom=group_zoom) - + grouped_by_rawr_tile_size = [] + print("[%s] Bucketizing jobs by zoom using size of raw tiles" % time.ctime()) # loop over each column individually. 
this limits the number of concurrent # tasks, which means we don't waste memory maintaining a huge queue of # pending tasks. and when something goes wrong, the stacktrace isn't buried # in a million others. num_coords = 1 << group_zoom + all_sizes = {} + grouping_queue = queue.Queue() for x in range(num_coords): # kick off tasks async. each one knows its own coordinate, so we only # need to track the handle to know when its finished. @@ -281,33 +385,110 @@ def _big_jobs(rawr_bucket, prefix, key_format_type, rawr_zoom, group_zoom, for y in range(num_coords): coord = Coordinate(zoom=group_zoom, column=x, row=y) tasks.append(p.apply_async(job_sizer, (coord,))) + grouping_queue.put_nowait(coord) # queue for future size counting # collect tasks and put them into the big jobs list. for task in tasks: - coord, size = task.get() - if size >= size_threshold: - big_jobs[coord] = True + coord, sizes = task.get() + all_sizes.update(sizes) + print("REMOVEME: Finished the %s column" % x) + + # now use all_sizes plus the size_threshold to find the lowest zoom we can group the tiles in + counts_at_zoom = {} + while not grouping_queue.empty(): + this_coord = grouping_queue.get_nowait() + this_size = all_sizes[this_coord] + + if this_size <= size_threshold or this_coord.zoom == rawr_zoom: + grouped_by_rawr_tile_size.append(this_coord) + grouping_queue.task_done() + + if this_coord.zoom not in counts_at_zoom: + counts_at_zoom[this_coord.zoom] = 0 + counts_at_zoom[this_coord.zoom] += 1 + else: + top_left_child = this_coord.zoomBy(1) + + grouping_queue.put_nowait(top_left_child) + grouping_queue.put_nowait(top_left_child.down(1)) + grouping_queue.put_nowait(top_left_child.right(1)) + grouping_queue.put_nowait(top_left_child.down(1).right(1)) + + print("[%s] Done bucketizing jobs - count by zoom %s" % (time.ctime(), counts_at_zoom)) + # validate counts by zoom - expecting the equivalent of 4^10 zoom 10 jobs. 
+ counts_at_zoom_sum = 0 + for z in counts_at_zoom.keys(): + count_at_this_zoom = counts_at_zoom[z] + zoom_10_equiv_count = count_at_this_zoom * (4 ** (10 - z)) + counts_at_zoom_sum += zoom_10_equiv_count + if counts_at_zoom_sum == 4**10: + print("Count of jobs by zoom is correct") + else: + print("Count of jobs by zoom is off by %s" % counts_at_zoom_sum - 4**10) + + ordered_job_list = sorted(grouped_by_rawr_tile_size, key=lambda coord: all_sizes[coord], reverse=True) + return ordered_job_list + + +def viable_container_overrides(mem_mb): + """ + Turns a number into the next highest even multiple that AWS will accept, and the min number of CPUs you need for that amount + :param mem_mb: (int) the megabytes of memory you'd request in an ideal world + :return: the amount of mem you need to request for AWS batch to honor it, the amount of vcpus you must request + """ + mem_mb = int(mem_mb) - return big_jobs + if mem_mb < 512: + return 512, 1 + if mem_mb % 1024 == 0: + return mem_mb, 1 -def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, mem_multiplier=1.0, mem_max=32 * 1024): + mem_gb_truncated = int(mem_mb / 1024) + next_gb = mem_gb_truncated + 1 + desired_mem_mb = next_gb * 1024 + + max_mem_per_vcpu = 8 * 1024 + vcpus = 1 + int((desired_mem_mb - 1)/max_mem_per_vcpu) + + return desired_mem_mb, vcpus + + +def enqueue_tiles(config_file, tile_list_file, check_metatile_exists, tile_specifier=TileSpecifier(), + mem_multiplier=1.0, mem_max=32 * 1024): from tilequeue.command import make_config_from_argparse from tilequeue.command import tilequeue_batch_enqueue from make_rawr_tiles import BatchEnqueueArgs - args = BatchEnqueueArgs(config_file, None, tile_list_file, None) os.environ['TILEQUEUE__BATCH__CHECK-METATILE-EXISTS'] = ( str(check_metatile_exists).lower()) - with open(args.config) as fh: + with open(config_file) as fh: cfg = make_config_from_argparse(fh) - update_memory_request(cfg, mem_multiplier, mem_max) - tilequeue_batch_enqueue(cfg, args) + with 
open(tile_list_file, 'r') as tile_list: + coord_lines = [line.strip() for line in tile_list.readlines()] + + reordered_lines = tile_specifier.reorder(coord_lines) + print("[%s] Starting to enqueue %d tile batches" % (time.ctime(), len(reordered_lines))) + for coord_line in reordered_lines: + # override memory requirements for this job with what the tile_specifier tells us + mem_mb = int(tile_specifier.get_mem_reqs_mb(coord_line)) + adjusted_mem = mem_mb * mem_multiplier -def update_memory_request(cfg, mem_multiplier, mem_max): - cfg.yml["batch"]["memory"] = int(min(cfg.yml["batch"]["memory"] * mem_multiplier, mem_max)) + # now that we know what we want, pick something AWS actually supports + viable_mem_request, required_min_cpus = viable_container_overrides(adjusted_mem) + print("REMOVEME: [%s] enqueueing %s at %s mem mb and %s cpus" % (time.ctime(), coord_line, viable_mem_request, required_min_cpus)) + update_container_overrides(cfg, viable_mem_request, mem_max, required_min_cpus) + + args = BatchEnqueueArgs(config_file, coord_line, None, None) + tilequeue_batch_enqueue(cfg, args) + print("[%s] Done enqueuing tile batches" % time.ctime()) + + +def update_container_overrides(cfg, mem_mb, mem_max, cpus): + cfg.yml["batch"]["memory"] = int(min(mem_mb, mem_max)) + cfg.yml["batch"]["vcpus"] = cpus # adaptor class for MissingTiles to see just the high zoom parts, this is used @@ -336,16 +517,17 @@ def missing_file(self, missing): # certain number of retries. 
class TileRenderer(object): - def __init__(self, tile_finder, big_jobs, split_zoom, zoom_max, allowed_missing_tiles=0): + def __init__(self, tile_finder, high_zoom_job_set, split_zoom, zoom_max, tile_specifier=TileSpecifier(), allowed_missing_tiles=0): self.tile_finder = tile_finder - self.big_jobs = big_jobs + self.high_zoom_job_set = high_zoom_job_set self.split_zoom = split_zoom self.zoom_max = zoom_max self.allowed_missing_tiles = allowed_missing_tiles + self.tile_specifier = tile_specifier def _missing(self): return self.tile_finder.missing_tiles_split( - self.split_zoom, self.zoom_max, self.big_jobs) + self.split_zoom, self.zoom_max, self.high_zoom_job_set) def render(self, num_retries, lense): mem_max = 32 * 1024 # 32 GiB @@ -372,7 +554,7 @@ def render(self, num_retries, lense): (count, lense.description, ', '.join(sample))) enqueue_tiles(lense.config, missing_tile_file, - check_metatile_exists, mem_multiplier, mem_max) + check_metatile_exists, self.tile_specifier, mem_multiplier, mem_max) else: with self._missing() as missing: @@ -384,6 +566,12 @@ def render(self, num_retries, lense): % (count, lense.description, num_retries, ', '.join(sample))) +def make_coord_set(coords, max_zoom, min_zoom): + coord_set = CoordSet(max_zoom, min_zoom) + for coord in coords: + coord_set[coord] = True + return coord_set + if __name__ == '__main__': import argparse @@ -418,13 +606,16 @@ def render(self, num_retries, lense): "prefixed with the date or hash first.") parser.add_argument('--metatile-size', default=8, type=int, help='Metatile size (in 256px tiles).') - parser.add_argument('--size-threshold', default=350000000, type=int, + parser.add_argument('--size-threshold', default=80000000, type=int, help='If all the RAWR tiles grouped together are ' 'bigger than this, split the job up into individual ' 'RAWR tiles.') parser.add_argument('--allowed-missing-tiles', default=2, type=int, help='The maximum number of missing metatiles allowed ' 'to continue the build process.') + 
parser.add_argument('--tile-specifier-file', + help="optional csv containing lines in desired queue order with columns " + "for zoom, x, y, and mem_gb to specify ordering and memory requirements") args = parser.parse_args() assert_run_id_format(args.run_id) @@ -436,7 +627,7 @@ def render(self, num_retries, lense): # TODO: split zoom and zoom max should come from config. split_zoom = 10 - zoom_max = 7 + zoom_max = 6 region = args.region or os.environ.get('AWS_DEFAULT_REGION') if region is None: @@ -452,11 +643,13 @@ def render(self, num_retries, lense): buckets.missing, buckets.meta, date_prefix, missing_bucket_date_prefix, region, args.key_format_type, args.config, metatile_max_zoom) - big_jobs = _big_jobs( + jobs_list = _distribute_jobs_by_raw_tile_size( buckets.rawr, missing_bucket_date_prefix, args.key_format_type, split_zoom, zoom_max, args.size_threshold) - tile_renderer = TileRenderer(tile_finder, big_jobs, split_zoom, zoom_max, args.allowed_missing_tiles) + tile_specifier = TileSpecifier.from_coord_list(jobs_list, 4) + + tile_renderer = TileRenderer(tile_finder, make_coord_set(jobs_list, split_zoom, zoom_max), split_zoom, zoom_max, tile_specifier, args.allowed_missing_tiles) tile_renderer.render(args.retries, LowZoomLense(args.low_zoom_config)) diff --git a/docker/meta-low-zoom-batch/Dockerfile b/docker/meta-low-zoom-batch/Dockerfile index 2139dc3..1c8544e 100644 --- a/docker/meta-low-zoom-batch/Dockerfile +++ b/docker/meta-low-zoom-batch/Dockerfile @@ -1,7 +1,7 @@ FROM python:2 RUN apt-get -y update \ - && apt-get -y install libgeos-dev libboost-python-dev \ + && apt-get -y install libgeos-dev libboost-python-dev time \ && rm -rf /var/lib/apt/lists/* COPY raw_tiles /usr/src/raw_tiles diff --git a/docker/rawr-batch/Dockerfile b/docker/rawr-batch/Dockerfile index c3b0d05..d234ead 100644 --- a/docker/rawr-batch/Dockerfile +++ b/docker/rawr-batch/Dockerfile @@ -1,7 +1,7 @@ FROM python:2 RUN apt-get -y update \ - && apt-get -y install libgeos-dev \ + && apt-get 
-y install libgeos-dev time \ && rm -rf /var/lib/apt/lists/* COPY raw_tiles /usr/src/raw_tiles