From 89745c300a70ed0cb0d72062d515d1e4646d51d0 Mon Sep 17 00:00:00 2001 From: Daniel Abercrombie Date: Tue, 22 Jan 2019 11:49:49 -0500 Subject: [PATCH 1/4] Updates that didn't make it to GitHub. --- lib/core/components/impl/mysqlstore.py | 26 +++- lib/core/components/impl/socketappserver.py | 2 +- lib/dataformat/_namespace.py | 49 ++++++-- lib/dataformat/site.py | 19 +-- lib/dealer/plugins/popularity.py | 10 ++ lib/detox/conditions.py | 11 +- lib/detox/detoxpolicy.py | 3 + lib/fileop/history.py | 35 ------ lib/fileop/impl/fts.py | 40 ++++++- lib/fileop/rlfsm.py | 112 +++++++++++++++--- lib/policy/condition.py | 30 +++++ lib/policy/predicates.py | 2 +- lib/policy/variables.py | 4 + lib/request/common.py | 27 +++++ lib/request/copy.py | 30 +++++ lib/source/impl/staticsiteinfo.py | 38 +++++- lib/source/siteinfo.py | 4 +- lib/utils/parallel.py | 4 + lib/web/modules/inventory/__init__.py | 21 ++-- lib/web/modules/inventory/_customize.py | 18 ++- lib/web/modules/inventory/transferrequests.py | 1 + lib/web/modules/request/copy.py | 16 ++- lib/web/modules/request/mixin.py | 24 ++++ lib/web/modules/transfers/history.py | 15 +-- lib/web/server.py | 7 +- 25 files changed, 439 insertions(+), 109 deletions(-) diff --git a/lib/core/components/impl/mysqlstore.py b/lib/core/components/impl/mysqlstore.py index 7f671a41..6ecb0ccd 100644 --- a/lib/core/components/impl/mysqlstore.py +++ b/lib/core/components/impl/mysqlstore.py @@ -2,10 +2,13 @@ import logging import fnmatch import hashlib +import os from dynamo.core.components.persistency import InventoryStore from dynamo.utils.interface.mysql import MySQL from dynamo.dataformat import Configuration, Partition, Dataset, Block, File, Site, SitePartition, Group, DatasetReplica, BlockReplica +from dynamo.policy.condition import Condition +from dynamo.policy.variables import site_variables LOG = logging.getLogger(__name__) @@ -17,6 +20,16 @@ def __init__(self, config): self._mysql = MySQL(config.db_params) + config_path = os.getenv('DYNAMO_SERVER_CONFIG', '/etc/dynamo/fom_config.json') + + self.fom_config = Configuration(config_path) + self.fom_conditions = [] + + for condition_text, module, conf in self.fom_config.rlfsm.transfer: + if condition_text is not None: # default + condition = Condition(condition_text, site_variables) + self.fom_conditions.append((condition, module, conf)) + def close(self): self._mysql.close() @@ -484,9 +497,9 @@ def _save_sites(self, sites): #override self._mysql.query('CREATE TABLE `sites_tmp` LIKE `sites`') - fields = ('id', 'name', 'host', 'storage_type', 'backend', 'status') + fields = ('id', 'name', 'host', 'storage_type', 'status') mapping = lambda site: (site.id, site.name, site.host, Site.storage_type_name(site.storage_type), \ - site.backend, Site.status_name(site.status)) + Site.status_name(site.status)) num = self._mysql.insert_many('sites_tmp', fields, mapping, sites, do_update = False) @@ -746,6 +759,10 @@ def _yield_sites(self, sites_tmp = None): #override sid = site_id ) + for cond, module, conf in self.fom_conditions: + if cond.match(site): + site.x509proxy = conf.x509proxy + all_chains = {} for protocol, chain_id, idx, lfn, pfn in self._mysql.xquery(mapping_sql, site_id): try: @@ -1119,6 +1136,7 @@ def delete_blockreplica(self, block_replica): #override return sql = 'DELETE FROM `block_replicas` WHERE `block_id` = %s AND `site_id` = %s' + self._mysql.query(sql, block_id, site_id) sql = 'DELETE FROM `block_replica_files` WHERE `block_id` = %s AND `site_id` = %s' @@ -1232,8 +1250,8 @@ def delete_partition(self, 
partition): #override self._mysql.query(sql, partition.name) def save_site(self, site): #override - fields = ('name', 'host', 'storage_type', 'backend', 'status') - self._mysql.insert_update('sites', fields, site.name, site.host, site.storage_type, site.backend, site.status) + fields = ('name', 'host', 'storage_type', 'status') + self._mysql.insert_update('sites', fields, site.name, site.host, site.storage_type, site.status) site_id = self._mysql.last_insert_id if site_id != 0: diff --git a/lib/core/components/impl/socketappserver.py b/lib/core/components/impl/socketappserver.py index 2d299551..789f9ad4 100644 --- a/lib/core/components/impl/socketappserver.py +++ b/lib/core/components/impl/socketappserver.py @@ -250,7 +250,7 @@ def _process_application(self, conn, addr): dn = '' for rdn in user_cert_data['subject']: - dn += '/' + '+'.join('%s=%s' % (DN_TRANSLATION[key], value) for key, value in rdn if key in DN_TRANSLATION) + dn += '/' + '+'.join('%s=%s' % (DN_TRANSLATION[key], value) for key, value in rdn) user_info = master.identify_user(dn = dn, check_trunc = True) diff --git a/lib/dataformat/_namespace.py b/lib/dataformat/_namespace.py index 5a7387a1..b84f2583 100644 --- a/lib/dataformat/_namespace.py +++ b/lib/dataformat/_namespace.py @@ -1,24 +1,43 @@ # Namespace-specific rules for e.g. object name conversions +import re + from exceptions import ObjectError def Dataset_format_software_version(value): - return value + if type(value) is str: + formatted = eval(value) + elif type(value) is not tuple: + # some iterable + formatted = tuple(value) + else: + formatted = value + + if type(formatted) is not tuple or len(formatted) != 4: + raise ObjectError('Invalid software version %s' % repr(value)) + + return formatted def Block_to_internal_name(name_str): - return name_str + # block name format: [8]-[4]-[4]-[4]-[12] where [n] is an n-digit hex. + try: + return long(name_str.replace('-', ''), 16) + except ValueError: + raise ObjectError('Invalid block name %s' % name_str) def Block_to_real_name(name): - return name + full_string = hex(name).replace('0x', '')[:-1] # last character is 'L' + if len(full_string) < 32: + full_string = '0' * (32 - len(full_string)) + full_string + + return full_string[:8] + '-' + full_string[8:12] + '-' + full_string[12:16] + '-' + full_string[16:20] + '-' + full_string[20:] def Block_to_full_name(dataset_name, block_real_name): return dataset_name + '#' + block_real_name def Block_from_full_name(full_name): - """ - @param full_name Full name of the block - @return (dataset name, block internal name) - """ + # return dataset name, block internal name + delim = full_name.find('#') if delim == -1: raise ObjectError('Invalid block name %s' % full_name) @@ -27,24 +46,30 @@ def Block_from_full_name(full_name): def customize_dataset(Dataset): # Enumerator for dataset type. - # Starting from 1 to play better with MySQL enums - Dataset._data_types = ['unknown', 'production', 'test'] + # Starting from 1 to play better with MySQL + Dataset._data_types = ['unknown', 'align', 'calib', 'cosmic', 'data', 'lumi', 'mc', 'raw', 'test'] for name, val in zip(Dataset._data_types, range(1, len(Dataset._data_types) + 1)): # e.g. 
Dataset.TYPE_UNKNOWN = 1 setattr(Dataset, 'TYPE_' + name.upper(), val) - Dataset.SoftwareVersion.field_names = ('version',) + Dataset.SoftwareVersion.field_names = ('cycle', 'major', 'minor', 'suffix') Dataset.format_software_version = staticmethod(Dataset_format_software_version) + Dataset.name_pattern = re.compile('/[^/]+/[^/]+/[^/]+') + def customize_block(Block): Block.to_internal_name = staticmethod(Block_to_internal_name) Block.to_real_name = staticmethod(Block_to_real_name) Block.to_full_name = staticmethod(Block_to_full_name) Block.from_full_name = staticmethod(Block_from_full_name) + hex_chars = '[0-9a-fA-F]' + Block.name_pattern = re.compile('{h}{{8}}-{h}{{4}}-{h}{{4}}-{h}{{4}}-{h}{{12}}'.format(h = hex_chars)) + def customize_file(File): - pass + File.checksum_algorithms = ('crc32', 'adler32') def customize_blockreplica(BlockReplica): - pass + BlockReplica._use_file_ids = True + diff --git a/lib/dataformat/site.py b/lib/dataformat/site.py index 22a194c6..dbd2f86a 100644 --- a/lib/dataformat/site.py +++ b/lib/dataformat/site.py @@ -8,7 +8,7 @@ class Site(object): """Represents a site. Owns lists of dataset and block replicas, which are organized into partitions.""" __slots__ = ['_name', 'id', 'host', 'storage_type', 'backend', 'status', 'filename_mapping', - '_dataset_replicas', 'partitions'] + '_dataset_replicas', 'partitions', 'x509proxy'] _storage_types = ['disk', 'mss', 'buffer', 'unknown'] TYPE_DISK, TYPE_MSS, TYPE_BUFFER, TYPE_UNKNOWN = range(1, len(_storage_types) + 1) @@ -90,7 +90,7 @@ def map(self, lfn): return None - def __init__(self, name, host = '', storage_type = TYPE_DISK, backend = '', status = STAT_UNKNOWN, filename_mapping = {}, sid = 0): + def __init__(self, name, host = '', storage_type = TYPE_DISK, backend = '', status = STAT_UNKNOWN, filename_mapping = {}, x509proxy = None, sid = 0): self._name = name self.host = host self.storage_type = Site.storage_type_val(storage_type) @@ -107,18 +107,21 @@ def __init__(self, name, host = '', storage_type = TYPE_DISK, backend = '', stat self.partitions = {} # {Partition: SitePartition} + self.x509proxy = x509proxy + def __str__(self): - return 'Site %s (host=%s, storage_type=%s, backend=%s, status=%s, id=%d)' % \ - (self._name, self.host, Site.storage_type_name(self.storage_type), self.backend, Site.status_name(self.status), self.id) + return 'Site %s (host=%s, storage_type=%s, backend=%s, status=%s, x509=%s, id=%d)' % \ + (self._name, self.host, Site.storage_type_name(self.storage_type), self.backend, Site.status_name(self.status), self.x509proxy, self.id) def __repr__(self): - return 'Site(%s,%s,\'%s\',%s,\'%s\',%s,%d)' % \ - (repr(self._name), repr(self.host), Site.storage_type_name(self.storage_type), repr(self.backend), Site.status_name(self.status), repr(self.filename_mapping), self.id) + return 'Site(%s,%s,\'%s\',%s,\'%s\',%s,%s,%d)' % \ + (repr(self._name), repr(self.host), Site.storage_type_name(self.storage_type), repr(self.backend), Site.status_name(self.status), repr(self.filename_mapping), repr(self.x509proxy), self.id) def __eq__(self, other): return self is other or \ (self._name == other._name and self.host == other.host and self.storage_type == other.storage_type and \ - self.backend == other.backend and self.status == other.status and self.filename_mapping == other.filename_mapping) + self.backend == other.backend and self.status == other.status and \ + self.filename_mapping == other.filename_mapping and self.x509proxy == other.x509proxy) def __ne__(self, other): return not self.__eq__(other) @@ -134,6 
+137,8 @@ def copy(self, other): for protocol, mapping in other.filename_mapping.iteritems(): self.filename_mapping[protocol] = Site.FileNameMapping(mapping._chains) + self.x509proxy = other.x509proxy + def embed_into(self, inventory, check = False): updated = False diff --git a/lib/dealer/plugins/popularity.py b/lib/dealer/plugins/popularity.py index 689b902d..b2804d7b 100644 --- a/lib/dealer/plugins/popularity.py +++ b/lib/dealer/plugins/popularity.py @@ -2,6 +2,8 @@ import math from dynamo.dataformat import Dataset +from dynamo.policy.variables import replica_variables +from dynamo.policy.condition import Condition from base import BaseHandler, DealerRequest LOG = logging.getLogger(__name__) @@ -19,6 +21,10 @@ def __init__(self, config): self.max_dataset_size = config.max_dataset_size * 1.e+12 self.max_replication = config.max_replication self.request_to_replica_threshold = config.request_to_replica_threshold + try: + self.condition = Condition(config.condition, replica_variables) + except: + self.condition = None self._datasets = [] @@ -36,6 +42,7 @@ def get_requests(self, inventory, policy): # override LOG.debug('Dataset %s request weight %f', dataset.name, request_weight) dataset_in_source_groups = False + for dr in dataset.replicas: for br in dr.block_replicas: if br.group.name in self.source_groups: @@ -46,6 +53,9 @@ def get_requests(self, inventory, policy): # override if not dataset_in_source_groups: continue + if not self.condition.match(dataset): + continue + if request_weight <= 0.: continue diff --git a/lib/detox/conditions.py b/lib/detox/conditions.py index 14e55c71..ca558e73 100644 --- a/lib/detox/conditions.py +++ b/lib/detox/conditions.py @@ -1,10 +1,17 @@ +import logging + from dynamo.policy.condition import Condition from dynamo.policy.variables import site_variables, replica_variables +from dynamo.policy.predicates import Predicate + +LOG = logging.getLogger(__name__) + class ReplicaCondition(Condition): def __init__(self, text): Condition.__init__(self, text, replica_variables) - + self.text = text + def get_matching_blocks(self, replica): """If this is a block-level condition, return the list of matching block replicas.""" @@ -18,3 +25,5 @@ def get_matching_blocks(self, replica): class SiteCondition(Condition): def __init__(self, text): Condition.__init__(self, text, site_variables) + + diff --git a/lib/detox/detoxpolicy.py b/lib/detox/detoxpolicy.py index 01111b54..efdbfac6 100644 --- a/lib/detox/detoxpolicy.py +++ b/lib/detox/detoxpolicy.py @@ -83,9 +83,11 @@ def evaluate(self, replica): if self.condition.match(replica): self.has_match = True + if issubclass(self.decision.action_cls, BlockAction): # block-level matching_block_replicas = self.condition.get_matching_blocks(replica) + if len(matching_block_replicas) == len(replica.block_replicas): # but all blocks matched - return dataset level action = self.decision.action_cls.dataset_level(self) @@ -246,3 +248,4 @@ def evaluate(self, replica): replica.block_replicas.update(block_replicas_tmp) return actions + diff --git a/lib/fileop/history.py b/lib/fileop/history.py index 070e80e8..71ec7d82 100644 --- a/lib/fileop/history.py +++ b/lib/fileop/history.py @@ -30,41 +30,6 @@ def histogram_binning(tmin,tmax): return (nbins,dt) -#class OperationFilter: -# """ -# Allows to generate a sql filter string to be applied to the query of historic file -# operations tables. 
-# """ -# def __init__(self,source_filter="",mss_filter=True,period='24h',upto='0h',exit_code='0'): -# self.source_filter = source_filter -# self.mss_filter = mss_filter -# self.period = period -# self.upto = upto -# self.exit_code = exit_code -# -# def generate_filter_string(self): -# -# where_sql = "" -# -# return where_sql -# -#class DeletionFilter(OperationFilter): -# """ -# Allows to generate a sql filter string to be applied to the query of historic file -# deletions tables. -# """ -# def __init__(self,source_filter="",mss_filter=True,period='24h',upto='0h',exit_code='0'): -# OperationFilter.__init__(self,source_filter="",mss_filter=True,period='24h',upto='0h',exit_code='0') -# -#class TransferFilter(OperationFilter): -# """ -# Allows to generate a sql filter string to be applied to the query of historic file -# transfer tables. -# """ -# def __init__(self,source_filter="",destination_filter="",mss_filter=True,period='24h',upto='0h',exit_code='0'): -# OperationFilter.__init__(self,source_filter="",mss_filter=True,period='24h',upto='0h',exit_code='0') -# self.destintation_filter = destination_filter - class Sites: """ Defines the sites. diff --git a/lib/fileop/impl/fts.py b/lib/fileop/impl/fts.py index a1088f86..e194954b 100644 --- a/lib/fileop/impl/fts.py +++ b/lib/fileop/impl/fts.py @@ -49,6 +49,7 @@ def __init__(self, config): # Proxy to be forwarded to FTS self.x509proxy = config.get('x509proxy', None) + self.x509proxy_orig = config.get('x509proxy', None) # Bookkeeping device self.db = MySQL(config.db_params) @@ -67,7 +68,15 @@ def num_pending_transfers(self): #override file_states = ['SUBMITTED', 'READY', 'ACTIVE', 'STAGING', 'STARTED'] jobs = self._ftscall('list_jobs', state_in = ['SUBMITTED', 'ACTIVE', 'STAGING']) + from random import shuffle + shuffle(jobs) + + total_count = 0 for job in jobs: + total_count = total_count + 1 +# if total_count > 50: +# LOG.info('-- in fts: total_count > ' + str(total_count)) +# break job_info = self._ftscall('get_job_status', job['job_id'], list_files = True) for file_info in job_info['files']: if file_info['file_state'] in file_states: @@ -87,7 +96,15 @@ def num_pending_deletions(self): #override file_states = ['SUBMITTED', 'READY', 'ACTIVE'] jobs = self._ftscall('list_jobs', state_in = ['SUBMITTED', 'ACTIVE']) + from random import shuffle + shuffle(jobs) + + total_count = 0 for job in jobs: + total_count = total_count + 1 +# if total_count > 1: +# LOG.info('-- in fts: total_count > ' + str(total_count)) +# break job_info = self._ftscall('get_job_status', job['job_id'], list_files = True) for file_info in job_info['dm']: if file_info['file_state'] in file_states: @@ -127,6 +144,20 @@ def start_transfers(self, batch_id, batch_tasks): #override dest_pfn = sub.destination.to_pfn(lfn, 'gfal2') source_pfn = task.source.to_pfn(lfn, 'gfal2') + self.x509proxy = self.x509proxy_orig + #if sub.destination.x509proxy is not None: + self.x509proxy = sub.destination.x509proxy + + if task.source.storage_type == Site.TYPE_MSS: + self.x509proxy = task.source.x509proxy + + #LOG.info("CCCCCCCCCCC") + #LOG.info("File is: %s" % sub.file.lfn) + #LOG.info("Destination is: %s" % sub.destination.name) + #LOG.info("Source is: %s" % task.source.name) + #LOG.info("x509 is: %s" % self.x509proxy) + #LOG.info("CCCCCCCCCCC") + if dest_pfn is None or source_pfn is None: # either gfal2 is not supported or lfn could not be mapped LOG.warning('Could not obtain PFN for %s at %s or %s', lfn, sub.destination.name, task.source.name) @@ -173,7 +204,8 @@ def start_transfers(self, 
batch_id, batch_tasks): #override if len(transfers) != 0: LOG.debug('Submit new transfer job for %d files', len(transfers)) - job = fts3.new_job(transfers, retry = self.fts_retry, overwrite = True, verify_checksum = verify_checksum, metadata = self.metadata_string) + job = fts3.new_job(transfers, retry = self.fts_retry, overwrite = True, + verify_checksum = verify_checksum, metadata = self.metadata_string) success = self._submit_job(job, 'transfer', batch_id, dict((pfn, task.id) for pfn, task in t_pfn_to_task.iteritems())) for transfer in transfers: @@ -317,6 +349,7 @@ def _ftscallurl(self, url): return self._do_ftscall(url = url) def _do_ftscall(self, binding = None, url = None): + if self._context is None: # request_class = Request -> use "requests"-based https call (instead of default PyCURL, # which may not be able to handle proxy certificates depending on the cURL installation) @@ -337,10 +370,12 @@ def _do_ftscall(self, binding = None, url = None): LOG.debug('FTS: %s', reqstring) wait_time = 1. + for attempt in xrange(10): try: if binding is not None: method, args, kwd = binding + return getattr(fts3, method)(context, *args, **kwd) else: return json.loads(context.get(url)) @@ -368,7 +403,7 @@ def _submit_job(self, job, optype, batch_id, pfn_to_tid): LOG.error('Failed to submit %s to FTS: Exception %s (%s)', optype, exc_type.__name__, str(exc)) return False - LOG.debug('FTS job id: %s', job_id) + LOG.info('FTS job id: %s', job_id) # list of file-level operations (one-to-one with pfn) try: @@ -456,6 +491,7 @@ def _get_status(self, batch_id, optype): result = self._ftscall('get_job_status', job_id = job_id, list_files = True) except: LOG.error('Failed to get job status for FTS job %s', job_id) + LOG.error(optype) continue if optype == 'transfer' or optype == 'staging': diff --git a/lib/fileop/rlfsm.py b/lib/fileop/rlfsm.py index 36d9e247..9c34d24f 100644 --- a/lib/fileop/rlfsm.py +++ b/lib/fileop/rlfsm.py @@ -5,6 +5,8 @@ import datetime import threading import logging +import datetime +import time from dynamo.fileop.base import FileQuery from dynamo.fileop.transfer import FileTransferOperation, FileTransferQuery @@ -45,6 +47,14 @@ def __init__(self, subscription, source): self.subscription = subscription self.source = source + def __str__(self): + s = '' + s += self.source.name + s += '->' + s += self.subscription.destination.name + s += ' ' + self.subscription.file.lfn + return s + class Desubscription(object): __slots__ = ['id', 'status', 'file', 'site'] @@ -88,7 +98,7 @@ def __init__(self, config = None): condition = Condition(condition_text, site_variables) self.transfer_operations.append((condition, FileTransferOperation.get_instance(module, conf))) - + if 'transfer_query' in config: self.transfer_queries = [] for condition_text, module, conf in config.transfer_query: @@ -189,7 +199,7 @@ def transfer_files(self, inventory): @param inventory The inventory. 
""" - + self._cleanup() LOG.debug('Clearing cancelled transfer tasks.') @@ -201,7 +211,7 @@ def transfer_files(self, inventory): return LOG.debug('Fetching subscription status from the file operation agent.') - self._update_status('transfer') + self._update_status('transfer', inventory) if self.cycle_stop.is_set(): return @@ -279,7 +289,7 @@ def issue_tasks(op, my_tasks): num_success = 0 num_failure = 0 num_batches = 0 - + for condition, op in self.transfer_operations: if condition is None: default_op = op @@ -310,6 +320,7 @@ def issue_tasks(op, my_tasks): num_success += ns num_failure += nf + if num_success + num_failure != 0: LOG.info('Issued transfer tasks: %d success, %d failure. %d batches.', num_success, num_failure, num_batches) else: @@ -339,7 +350,7 @@ def delete_files(self, inventory): return LOG.debug('Fetching deletion status from the file operation agent.') - completed = self._update_status('deletion') + completed = self._update_status('deletion', inventory) LOG.debug('Recording candidates for empty directories.') self._set_dirclean_candidates(completed, inventory) @@ -533,7 +544,8 @@ def get_subscriptions(self, inventory, op = None, status = None): subscriptions = [] - get_all = 'SELECT u.`id`, u.`status`, u.`delete`, f.`block_id`, f.`name`, s.`name`, u.`hold_reason` FROM `file_subscriptions` AS u' + get_all = 'SELECT u.`id`, u.`status`, u.`delete`, f.`block_id`, f.`name`, s.`name`, u.`hold_reason`, u.`created`' + get_all += ' FROM `file_subscriptions` AS u' get_all += ' INNER JOIN `files` AS f ON f.`id` = u.`file_id`' get_all += ' INNER JOIN `sites` AS s ON s.`id` = u.`site_id`' @@ -564,8 +576,10 @@ def get_subscriptions(self, inventory, op = None, status = None): COPY = 0 DELETE = 1 + now_time = int(time.time()) + for row in self.db.query(get_all): - sub_id, st, optype, block_id, file_name, site_name, hold_reason = row + sub_id, st, optype, block_id, file_name, site_name, hold_reason, created = row if site_name != _destination_name: _destination_name = site_name @@ -583,7 +597,8 @@ def get_subscriptions(self, inventory, op = None, status = None): if block_id != _block_id: lfile = inventory.find_file(file_name) if lfile is None: - # Dataset, block, or file was deleted from the inventory earlier in this process (deletion not reflected in the inventory store yet) + # Dataset, block, or file was deleted from the inventory earlier in this process + #(deletion not reflected in the inventory store yet) continue _block_id = block_id @@ -593,7 +608,8 @@ def get_subscriptions(self, inventory, op = None, status = None): else: lfile = block.find_file(file_name) if lfile is None: - # Dataset, block, or file was deleted from the inventory earlier in this process (deletion not reflected in the inventory store yet) + # Dataset, block, or file was deleted from the inventory earlier in this process + #(deletion not reflected in the inventory store yet) continue if dest_replica is None and st != 'cancelled': @@ -615,6 +631,13 @@ def get_subscriptions(self, inventory, op = None, status = None): tape_sources = None failed_sources = None + + #create_time = int( time.mktime(datetime.datetime(created).timetuple()) ) + #if (now_time-create_time) > 7*(60*60*24): + # LOG.info('---- very old request-----') + LOG.info(row) + + if st not in ('done', 'held', 'cancelled'): if dest_replica.has_file(lfile): LOG.debug('%s already exists at %s', file_name, site_name) @@ -625,6 +648,7 @@ def get_subscriptions(self, inventory, op = None, status = None): else: disk_sources = [] tape_sources = [] + skip_rest = 
False for replica in block.replicas: if replica.site == destination or replica.site.status != Site.STAT_READY: continue @@ -633,10 +657,21 @@ def get_subscriptions(self, inventory, op = None, status = None): if replica.site.storage_type == Site.TYPE_DISK: disk_sources.append(replica.site) elif replica.site.storage_type == Site.TYPE_MSS: + if 'DESY' in dest_replica.site.name: + LOG.info('Here comes the bride') + LOG.info(replica.site) + LOG.info(destination) + LOG.info(row) + #self.cancel_subscription(destination, lfile) + #self.subscribe_file(destination, lfile) + #skip_rest = True + #break tape_sources.append(replica.site) + if skip_rest: + continue if len(disk_sources) + len(tape_sources) == 0: - LOG.warning('Transfer of %s to %s has no source.', file_name, site_name) + LOG.info('Transfer of %s to %s has no source.', file_name, site_name) no_source.append(sub_id) st = 'held' @@ -666,6 +701,8 @@ def get_subscriptions(self, inventory, op = None, status = None): # This site failed for a recoverable reason break else: + LOG.info('Number of disk sources: '+ str(len(disk_sources))) + LOG.info('Number of tape sources: '+ str(len(tape_sources))) # last failure from all sites due to irrecoverable errors LOG.warning('Transfer of %s to %s failed from all sites.', file_name, site_name) all_failed.append(sub_id) @@ -736,7 +773,7 @@ def release_subscription(self, subscription): return self.db.query('DELETE FROM `failed_transfers` WHERE `subscription_id` = %s', subscription.id) - self.db.query('UPDATE `file_subscriptions` SET `status` = \'retry\' WHERE `id` = %s', subscription.id) + self.db.query('UPDATE `file_subscriptions` SET `status` = \'new\' WHERE `id` = %s', subscription.id) def _run_cycle(self, inventory): while True: @@ -852,7 +889,7 @@ def _get_cancelled_tasks(self, optype): sql += ' WHERE u.`status` = \'cancelled\' AND u.`delete` = %d' % delete return self.db.query(sql) - def _update_status(self, optype): + def _update_status(self, optype, inventory): if optype == 'transfer': site_columns = 'ss.`name`, sd.`name`' site_joins = ' INNER JOIN `sites` AS ss ON ss.`id` = q.`source_id`' @@ -873,6 +910,7 @@ def _update_status(self, optype): history_table_name = 'file_transfers' history_site_fields = ('source_id', 'destination_id') else: + LOG.info("WTFFFFFFFF UUU") history_table_name = 'file_deletions' history_site_fields = ('site_id',) @@ -900,28 +938,54 @@ def _update_status(self, optype): # Collect completed tasks + total_counter = 0 + for batch_id in self.db.query('SELECT `id` FROM `{op}_batches`'.format(op = optype)): results = [] + if optype == 'transfer': - for _, query in self.transfer_queries: + + total_counter = total_counter + 1 + +# if total_counter > 200: +# LOG.info('-- getiing out transfer_status as counter > ' + str(total_counter)) +# break + + + for condition, query in self.transfer_queries: + + LOG.info("xxxxxxxxxxxxxxxx") + LOG.info(condition) + LOG.info(query) + LOG.info("xxxxxxxxxxxxxxxx") + results = query.get_transfer_status(batch_id) if len(results) != 0: break else: - for _, query in self.deletion_queries: + for condition, query in self.deletion_queries: results = query.get_deletion_status(batch_id) if len(results) != 0: break + batch_complete = True + if len(results) == 0: + LOG.info('did I just fail? 
' + str(batch_id))
+
+
+            LOG.info("yyyyyyyyyy")
+            LOG.info(len(results))
+            LOG.info("yyyyyyyyyy")
             for task_id, status, exitcode, message, start_time, finish_time in results:
                 # start_time and finish_time can be None
 
                 LOG.debug('%s result: %d %s %d %s %s', optype, task_id, FileQuery.status_name(status), exitcode, start_time, finish_time)
 
                 if status == FileQuery.STAT_DONE:
+                    LOG.info('%s RESSSSULT: %d %s %d %s %s', optype, task_id, FileQuery.status_name(status), exitcode, start_time, finish_time)
                     num_success += 1
                 elif status == FileQuery.STAT_FAILED:
                     num_failure += 1
@@ -980,6 +1044,26 @@ def _update_status(self, optype):
             if self._read_only:
                 history_id = 0
             else:
+                #LOG.info("A:")
+                #LOG.info(history_table_name)
+                #LOG.info("B:")
+                #LOG.info(history_fields)
+                #LOG.info("C:")
+                #LOG.info(values)
+
+                #values_tmp = set()
+                #for v in values:
+                #    try:
+                #        tmp = v.replace(u"\u2018", "'").replace(u"\u2019", "'")
+                #        values_tmp.add(tmp)
+                #    except:
+                #        values_tmp.add(v)
+                #values = values_tmp
+
+
+                LOG.info(values)
+
+
                 history_id = self.history_db.db.insert_get_id(history_table_name, history_fields, values)
 
                 if optype == 'transfer':
diff --git a/lib/policy/condition.py b/lib/policy/condition.py
index 49390d34..b27d68c8 100644
--- a/lib/policy/condition.py
+++ b/lib/policy/condition.py
@@ -1,5 +1,11 @@
+import logging
+import subprocess
+import time
+
 from dynamo.policy.predicates import Predicate
 
+LOG = logging.getLogger(__name__)
+
 class Condition(object):
     """AND-chained Predicates."""
 
@@ -10,6 +16,20 @@ def __init__(self, text, variables):
 
         pred_strs = map(str.strip, text.split(' and '))
 
+        self.time_condition = None
+
+        tmp = ''
+        if ' until ' in pred_strs[-1]:
+            tmp = pred_strs[-1].split(' until ')
+            tmp[-1] = 'until ' + tmp[-1]
+        elif ' from ' in pred_strs[-1]:
+            tmp = pred_strs[-1].split(' from ')
+            tmp[-1] = 'from ' + tmp[-1]
+        if tmp != '':
+            pred_strs[-1] = tmp[0]
+            self.time_condition = tmp[-1]
+
+        # parsing the individual components
         for pred_str in pred_strs:
             words = pred_str.split()
 
@@ -42,6 +62,18 @@ def __repr__(self):
         return 'Condition(\'%s\')' % self.text
 
     def match(self, obj):
+        if self.time_condition is not None:
+            if 'until' in self.time_condition:
+                proc = subprocess.Popen(['date', '-d', self.time_condition.split('until ')[1], '+%s'], stdout = subprocess.PIPE, stderr = subprocess.PIPE)
+                unixt, err = proc.communicate()
+                if time.time() > float(unixt):
+                    return False
+            else: # from
+                proc = subprocess.Popen(['date', '-d', self.time_condition.split('from ')[1], '+%s'], stdout = subprocess.PIPE, stderr = subprocess.PIPE)
+                unixt, err = proc.communicate()
+                if time.time() < float(unixt):
+                    return False
+
         for predicate in self.predicates:
             if not predicate(obj):
                 return False
diff --git a/lib/policy/predicates.py b/lib/policy/predicates.py
index 47ff3b7a..36d06f50 100644
--- a/lib/policy/predicates.py
+++ b/lib/policy/predicates.py
@@ -1,4 +1,5 @@
 import re
+import time
 
 import dynamo.policy.attrs as attrs
 
@@ -26,7 +27,6 @@ def get(variable, op = '', rhs_expr = ''):
         if rhs_expr == '':
             raise InvalidOperator(op)
         return SetElementExpr.get(variable, op, rhs_expr)
-
     else:
         raise InvalidOperator(op)
 
diff --git a/lib/policy/variables.py b/lib/policy/variables.py
index 2dc7d91b..f296dae6 100644
--- a/lib/policy/variables.py
+++ b/lib/policy/variables.py
@@ -241,6 +241,10 @@ def __init__(self):
         BlockReplicaAttr.__init__(self, Attr.BOOL_TYPE)
 
     def _get(self, replica):
+        if not replica.is_complete():
+            # We don't want to delete incomplete blocks anyway
+            return False
+
         for rep in replica.block.replicas:
             if rep.site.storage_type == Site.TYPE_MSS and 
rep.is_complete(): return True diff --git a/lib/request/common.py b/lib/request/common.py index ed86b064..d47f16d6 100644 --- a/lib/request/common.py +++ b/lib/request/common.py @@ -1,4 +1,5 @@ import logging +import time from dynamo.utils.interface.mysql import MySQL from dynamo.history.history import HistoryDatabase @@ -110,6 +111,7 @@ def _make_temp_registry_tables(self, items, sites): @param items List of dataset and block names. @param sites List of site names. """ + LOG.info("aV4_0d_A %s" % str(time.time())) # Make temporary tables and fill copy_ids_tmp with ids of requests whose item and site lists fully cover the provided list of items and sites. columns = ['`item` varchar(512) CHARACTER SET latin1 COLLATE latin1_general_cs NOT NULL'] @@ -117,17 +119,28 @@ def _make_temp_registry_tables(self, items, sites): columns = ['`site` varchar(32) CHARACTER SET latin1 COLLATE latin1_general_cs NOT NULL'] self.registry.db.create_tmp_table('sites_tmp', columns) + LOG.info("aV4_0d_B %s" % str(time.time())) + if items is not None: self.registry.db.insert_many('items_tmp', ('item',), MySQL.make_tuple, items, db = self.registry.db.scratch_db) + + LOG.info(sites) + if sites is not None: self.registry.db.insert_many('sites_tmp', ('site',), MySQL.make_tuple, sites, db = self.registry.db.scratch_db) + + LOG.info("aV4_0d_C %s" % str(time.time())) + columns = [ '`id` int(10) unsigned NOT NULL AUTO_INCREMENT', 'PRIMARY KEY (`id`)' ] self.registry.db.create_tmp_table('ids_tmp', columns) + LOG.info("aV4_0d_D %s" % str(time.time())) + + sql = 'INSERT INTO `{db}`.`ids_tmp`' sql += ' SELECT r.`id` FROM `{op}_requests` AS r WHERE' sql += ' 0 NOT IN (SELECT (`site` IN (SELECT `site` FROM `{op}_request_sites` AS s WHERE s.`request_id` = r.`id`)) FROM `{db}`.`sites_tmp`)' @@ -135,9 +148,13 @@ def _make_temp_registry_tables(self, items, sites): sql += ' 0 NOT IN (SELECT (`item` IN (SELECT `item` FROM `{op}_request_items` AS i WHERE i.`request_id` = r.`id`)) FROM `{db}`.`items_tmp`)' self.registry.db.query(sql.format(db = self.registry.db.scratch_db, op = self.optype)) + LOG.info("aV4_0d_E %s" % str(time.time())) + self.registry.db.drop_tmp_table('items_tmp') self.registry.db.drop_tmp_table('sites_tmp') + LOG.info("aV4_0d_F %s" % str(time.time())) + return '`{db}`.`ids_tmp`'.format(db = self.registry.db.scratch_db) def _make_temp_history_tables(self, dataset_ids, block_ids, site_ids): @@ -201,19 +218,29 @@ def _make_temp_history_tables(self, dataset_ids, block_ids, site_ids): def _make_registry_constraints(self, request_id, statuses, users, items, sites): constraints = [] + LOG.info("aV4_0a %s" % str(time.time())) + if request_id is not None: constraints.append('r.`id` = %d' % request_id) + LOG.info("aV4_0b %s" % str(time.time())) + if statuses is not None: constraints.append('r.`status` IN ' + MySQL.stringify_sequence(statuses)) + LOG.info("aV4_0c %s" % str(time.time())) + if users is not None: constraints.append('r.`user` IN ' + MySQL.stringify_sequence(users)) + LOG.info("aV4_0d %s" % str(time.time())) + if items is not None or sites is not None: temp_table = self._make_temp_registry_tables(items, sites) constraints.append('r.`id` IN (SELECT `id` FROM {0})'.format(temp_table)) + LOG.info("aV4_0e %s" % str(time.time())) + if len(constraints) != 0: return ' WHERE ' + ' AND '.join(constraints) else: diff --git a/lib/request/copy.py b/lib/request/copy.py index 7937a079..6fb2fb83 100644 --- a/lib/request/copy.py +++ b/lib/request/copy.py @@ -24,6 +24,8 @@ def lock(self): #override self.registry.db.lock_tables(write = 
tables) def get_requests(self, request_id = None, statuses = None, users = None, items = None, sites = None): + LOG.info("aV4_0: %s" % str(time.time())) + all_requests = {} sql = 'SELECT r.`id`, r.`group`, r.`num_copies`, 0+r.`status`, UNIX_TIMESTAMP(r.`first_request_time`), UNIX_TIMESTAMP(r.`last_request_time`),' @@ -34,6 +36,9 @@ def get_requests(self, request_id = None, statuses = None, users = None, items = sql += ' ORDER BY r.`id`' _rid = 0 + + LOG.info("aV4_1: %s" % str(time.time())) + for rid, group, n, status, first_request, last_request, count, user, dn, a_item, a_site, a_status, a_update in self.registry.db.xquery(sql): if rid != _rid: _rid = rid @@ -45,6 +50,8 @@ def get_requests(self, request_id = None, statuses = None, users = None, items = if a_item is not None: request.actions.append(RequestAction(a_item, a_site, int(a_status), a_update)) + LOG.info("aV4_2: %s" % str(time.time())) + if len(all_requests) != 0: # get the sites sql = 'SELECT s.`request_id`, s.`site` FROM `copy_request_sites` AS s WHERE s.`request_id` IN (%s)' % ','.join('%d' % d for d in all_requests.iterkeys()) @@ -56,14 +63,20 @@ def get_requests(self, request_id = None, statuses = None, users = None, items = for rid, item in self.registry.db.xquery(sql): all_requests[rid].items.append(item) + LOG.info("aV4_3: %s" % str(time.time())) + if items is not None or sites is not None: self.registry.db.drop_tmp_table('ids_tmp') + LOG.info("aV4_4: %s" % str(time.time())) + if (request_id is not None and len(all_requests) != 0) or \ (statuses is not None and (set(statuses) <= set(['new', 'activated']) or set(statuses) <= set([Request.ST_NEW, Request.ST_ACTIVATED]))): # there's nothing in the archive return all_requests + LOG.info("aV4_5: %s" % str(time.time())) + # Pick up archived requests from the history DB archived_requests = {} @@ -75,10 +88,14 @@ def get_requests(self, request_id = None, statuses = None, users = None, items = sql += self._make_history_constraints(request_id, statuses, users, items, sites) sql += ' ORDER BY r.`id`' + LOG.info("aV4_6: %s" % str(time.time())) + for rid, group, n, status, request_time, reason, user, dn in self.history.db.xquery(sql): if rid not in all_requests: archived_requests[rid] = CopyRequest(rid, user, dn, group, n, int(status), request_time, request_time, 1, reason) + LOG.info("aV4_7: %s" % str(time.time())) + if len(archived_requests) != 0: # get the sites sql = 'SELECT s.`request_id`, h.`name` FROM `copy_request_sites` AS s' @@ -102,16 +119,24 @@ def get_requests(self, request_id = None, statuses = None, users = None, items = for rid, dataset, block in self.history.db.xquery(sql): archived_requests[rid].items.append(df.Block.to_full_name(dataset, block)) + LOG.info("aV4_8: %s" % str(time.time())) + all_requests.update(archived_requests) + LOG.info("aV4_9: %s" % str(time.time())) + if items is not None or sites is not None: self.history.db.drop_tmp_table('ids_tmp') + LOG.info("aV4_10: %s" % str(time.time())) + return all_requests def create_request(self, caller, items, sites, sites_original, group, ncopies): now = int(time.time()) + LOG.info("aX: %s" % str(time.time())) + if self._read_only: return CopyRequest(0, caller.name, caller.dn, group, ncopies, 'new', now, now, 1) @@ -123,8 +148,13 @@ def create_request(self, caller, items, sites, sites_original, group, ncopies): mapping = lambda site: (request_id, site) self.registry.db.insert_many('copy_request_sites', ('request_id', 'site'), mapping, sites) mapping = lambda item: (request_id, item) + + LOG.info("aY: %s" % 
str(time.time()))
+
         self.registry.db.insert_many('copy_request_items', ('request_id', 'item'), mapping, items)
 
+        LOG.info("aZ: %s" % str(time.time()))
+
         # Make an entry in history
         history_user_ids = self.history.save_users([(caller.name, caller.dn)], get_ids = True)
         history_site_ids = self.history.save_sites(sites_original, get_ids = True)
diff --git a/lib/source/impl/staticsiteinfo.py b/lib/source/impl/staticsiteinfo.py
index d1b99b92..14fbc315 100644
--- a/lib/source/impl/staticsiteinfo.py
+++ b/lib/source/impl/staticsiteinfo.py
@@ -1,6 +1,9 @@
 from dynamo.source.siteinfo import SiteInfoSource
 from dynamo.dataformat import Configuration, Site
 
+import logging
+LOG = logging.getLogger(__name__)
+
 class StaticSiteInfoSource(SiteInfoSource):
     """
     Site information source fully specified by the static configuration.
@@ -11,7 +14,21 @@ def __init__(self, config):
 
         self.config = Configuration(config.sites)
 
-    def get_site(self, name): #override
+        LOG.info('-----------in constructor of SiteInfoSource-------')
+        LOG.info(self.config)
+
+#        config_path = os.getenv('DYNAMO_SERVER_CONFIG', '/etc/dynamo/fom_config.json')
+#        self.fom_config = Configuration(config_path)
+#        self.fom_conditions = []
+
+#        for condition_text, module, conf in self.fom_config.rlfsm.transfer:
+#            if condition_text is not None: # default
+#                condition = Condition(condition_text, site_variables)
+#                self.fom_conditions.append((condition, module, conf))
+
+
+
+    def get_site(self, name, inventory): #override
         try:
             site_config = self.config[name]
         except KeyError:
@@ -20,13 +37,26 @@ def get_site(self, name): #override
         storage_type = Site.storage_type_val(site_config.storage_type)
         backend = site_config.backend
 
-        return Site(name, host = site_config.host, storage_type = storage_type, backend = backend)
+        LOG.info('--------in get_site-------------')
+        LOG.info(name)
+
+        site_obj = Site(name, host = site_config.host, storage_type = storage_type, backend = backend)
+        if name in inventory.sites:
+            old_site_obj = inventory.sites[name]
+            LOG.info('found ' + name)
+            LOG.info(old_site_obj.x509proxy)
+            site_obj.x509proxy = old_site_obj.x509proxy
+
+        #for cond, module, conf in self.fom_conditions:
+        #    if cond.match(site_obj):
+        #        site_obj.x509proxy = conf.x509proxy
+        return site_obj
 
-    def get_site_list(self): #override
+    def get_site_list(self, inventory): #override
         site_list = []
 
         for name in self.config.keys():
-            site_list.append(self.get_site(name))
+            site_list.append(self.get_site(name, inventory))
 
         return site_list
diff --git a/lib/source/siteinfo.py b/lib/source/siteinfo.py
index 04c357c2..2c7e984d 100644
--- a/lib/source/siteinfo.py
+++ b/lib/source/siteinfo.py
@@ -46,14 +46,14 @@ def __init__(self, config):
         else:
             self.exclude = None
 
-    def get_site(self, name):
+    def get_site(self, name, inventory):
         """
         @param name Name of the site
        @return A Site object with full info, or None if the site is not found. 
""" raise NotImplementedError('get_site') - def get_site_list(self): + def get_site_list(self, inventory): """ @return List of unlinked Site objects """ diff --git a/lib/utils/parallel.py b/lib/utils/parallel.py index a931b225..5a4ff121 100644 --- a/lib/utils/parallel.py +++ b/lib/utils/parallel.py @@ -1,10 +1,14 @@ +import logging import time import multiprocessing import threading import Queue +import sys from dynamo.dataformat import Configuration +LOG = logging.getLogger(__name__) + class FunctionWrapper(object): def __init__(self, function, start_sem, done_sem): self.function = function diff --git a/lib/web/modules/inventory/__init__.py b/lib/web/modules/inventory/__init__.py index a5b799a8..834ebc97 100644 --- a/lib/web/modules/inventory/__init__.py +++ b/lib/web/modules/inventory/__init__.py @@ -1,21 +1,13 @@ -from . import datasets -from . import groups -from . import sites -from . import stats -from . import inject -from . import delete -from . import blockreplicas -from . import subscriptions -from . import requestlist -from . import lfn2pfn -from . import nodes -from . import data -from . import transferrequests +import datasets +import groups +import stats +import inject +import delete +import blockreplicas, requestlist, subscriptions, lfn2pfn, nodes, data, transferrequests export_data = {} export_data.update(datasets.export_data) export_data.update(groups.export_data) -export_data.update(sites.export_data) export_data.update(stats.export_data) export_data.update(inject.export_data) export_data.update(delete.export_data) @@ -27,5 +19,6 @@ export_data.update(data.export_data) export_data.update(transferrequests.export_data) + export_web = {} export_web.update(stats.export_web) diff --git a/lib/web/modules/inventory/_customize.py b/lib/web/modules/inventory/_customize.py index 35e75995..80542b09 100644 --- a/lib/web/modules/inventory/_customize.py +++ b/lib/web/modules/inventory/_customize.py @@ -1,2 +1,16 @@ -def customize_stats(InventoryStatCategories): - pass +import collections +from dynamo.dataformat import Dataset, Site, Group + +def campaign_name(dataset): + sd = dataset.name[dataset.name.find('/', 1) + 1:dataset.name.rfind('/')] + return sd[:sd.find('-')] + +def customize_stats(categories): + categories.categories = collections.OrderedDict([ + ('campaign', ('Production campaign', Dataset, campaign_name)), + ('data_tier', ('Data tier', Dataset, lambda d: d.name[d.name.rfind('/') + 1:])), + ('dataset_status', ('Dataset status', Dataset, lambda d: Dataset.status_name(d.status))), + ('dataset', ('Dataset name', Dataset, lambda d: d.name)), + ('site', ('Site name', Site, lambda s: s.name)), + ('group', ('Group name', Group, lambda g: g.name)) + ]) diff --git a/lib/web/modules/inventory/transferrequests.py b/lib/web/modules/inventory/transferrequests.py index 4dab3053..e292ebce 100644 --- a/lib/web/modules/inventory/transferrequests.py +++ b/lib/web/modules/inventory/transferrequests.py @@ -39,6 +39,7 @@ def run(self, caller, request, inventory): req_id = int(line) break + LOG.info(req_id) req_hash = self.copy_manager.get_requests(request_id=req_id) LOG.info(req_id) LOG.info(req_hash) diff --git a/lib/web/modules/request/copy.py b/lib/web/modules/request/copy.py index 861590b3..e9e7ffe3 100644 --- a/lib/web/modules/request/copy.py +++ b/lib/web/modules/request/copy.py @@ -39,6 +39,8 @@ def run(self, caller, request, inventory): try: existing = None + LOG.info("aV: %s" % str(time.time())) + if 'request_id' in self.params: request_id = self.params['request_id'] @@ -55,6 +57,7 @@ 
def run(self, caller, request, inventory): else: # create a new request + LOG.info("aV1: %s" % str(time.time())) if 'item' not in self.params: raise MissingParameter('item') @@ -64,9 +67,14 @@ def run(self, caller, request, inventory): else: self.params['site'] = list(self.default_sites) + LOG.info("aV2: %s" % str(time.time())) constraints = self.make_constraints(by_id = False) + + LOG.info("aV3: %s" % str(time.time())) constraints['statuses'] = [Request.ST_NEW, Request.ST_ACTIVATED] + LOG.info("aV4: %s" % str(time.time())) existing_requests = self.manager.get_requests(**constraints) + LOG.info("aV5: %s" % str(time.time())) for request_id in sorted(existing_requests.iterkeys()): if existing_requests[request_id].status == Request.ST_NEW: @@ -74,6 +82,7 @@ def run(self, caller, request, inventory): break elif existing_requests[request_id].status == Request.ST_ACTIVATED: existing = existing_requests[request_id] + LOG.info("aV6: %s" % str(time.time())) if existing is None: if 'n' not in self.params: @@ -81,7 +90,8 @@ def run(self, caller, request, inventory): if 'group' not in self.params: self.params['group'] = self.default_group - + + LOG.info("aW: %s" % str(time.time())) request = self.manager.create_request(caller, self.params['item'], self.params['site'], self.params['site_orig'], self.params['group'], self.params['n']) else: @@ -117,6 +127,10 @@ def run(self, caller, request, inventory): self.parse_input(request, inventory, ('request_id', 'item', 'site', 'status', 'user')) constraints = self.make_constraints(by_id = False) + + LOG.info("PollCopy constraints:") + LOG.info(constraints) + existing_requests = self.manager.get_requests(**constraints) if 'item' in self.params and 'site' in self.params and \ diff --git a/lib/web/modules/request/mixin.py b/lib/web/modules/request/mixin.py index 8860bc30..86262405 100644 --- a/lib/web/modules/request/mixin.py +++ b/lib/web/modules/request/mixin.py @@ -1,22 +1,35 @@ import re import fnmatch +import logging +import time from dynamo.web.exceptions import MissingParameter, ExtraParameter, IllFormedRequest, InvalidRequest from dynamo.web.modules._common import yesno from dynamo.utils.interface.mysql import MySQL import dynamo.dataformat as df +LOG = logging.getLogger(__name__) + class ParseInputMixin(object): def __init__(self, config): # Parsed and formatted HTTP queries self.params = {} def parse_input(self, request, inventory, allowed_fields, required_fields = tuple()): + if self.input_data is not None: + LOG.info("Input data:") + LOG.info(self.input_data) + else: + LOG.info("No input data:") # JSON could have been uploaded if self.input_data is not None: + LOG.info("Updating input:") request.update(self.input_data) + LOG.info("Completed updating input.") # Check we have the right request fields + + LOG.info("A: %s" % str(time.time())) input_fields = set(request.keys()) allowed_fields = set(allowed_fields) @@ -30,6 +43,8 @@ def parse_input(self, request, inventory, allowed_fields, required_fields = tupl # Pick up the values and cast them to correct types + LOG.info("B: %s" % str(time.time())) + for key in ['request_id', 'n']: if key not in request: continue @@ -68,6 +83,7 @@ def parse_input(self, request, inventory, allowed_fields, required_fields = tupl # The only reason for this would be to make the registry not dependent on specific inventory store technology. if 'item' in self.params: + print "Found item." 
for item in self.params['item']: if item in inventory.datasets: # OK this is a known dataset @@ -81,6 +97,7 @@ def parse_input(self, request, inventory, allowed_fields, required_fields = tupl try: inventory.datasets[dataset_name].find_block(block_name, must_find = True) except: + print 'Invalid block name %s' % item raise InvalidRequest('Invalid block name %s' % item) if 'site' in self.params: @@ -91,6 +108,8 @@ def parse_input(self, request, inventory, allowed_fields, required_fields = tupl # Wildcard allowed if '*' in site or '?' in site or '[' in site: + LOG.info("C: %s" % str(time.time())) + self.params['site'].remove(site) pattern = re.compile(fnmatch.translate(site)) @@ -106,6 +125,8 @@ def parse_input(self, request, inventory, allowed_fields, required_fields = tupl if len(self.params['site']) == 0: self.params.pop('site') + LOG.info("D: %s" % str(time.time())) + if 'group' in self.params: try: inventory.groups[self.params['group']] @@ -117,6 +138,9 @@ def parse_input(self, request, inventory, allowed_fields, required_fields = tupl if status not in ('new', 'activated', 'completed', 'rejected', 'cancelled'): raise InvalidRequest('Invalid status value %s' % status) + LOG.info("Printing all request parameterss") + LOG.info(self.params) + def make_constraints(self, by_id = False): constraints = {} if 'request_id' in self.params: diff --git a/lib/web/modules/transfers/history.py b/lib/web/modules/transfers/history.py index d64e971c..80bee08e 100644 --- a/lib/web/modules/transfers/history.py +++ b/lib/web/modules/transfers/history.py @@ -10,9 +10,9 @@ LOG = logging.getLogger(__name__) class FileTransferHistory(WebModule): - def __init__(self, config): WebModule.__init__(self, config) + self.history = HistoryDatabase() def run(self, caller, request, inventory): @@ -51,7 +51,7 @@ def run(self, caller, request, inventory): # calculate the time limits to consider past_min = self._get_date_before_end(datetime.datetime.now(),upto) - tmax = int(past_min.strftime('%s')) # epochseconds: careful max/min in t and past invert + tmax = int(past_min.strftime('%s')) # epochseconds: careful max/min in t and past inverts past_max = self._get_date_before_end(past_min,period) tmin = int(past_max.strftime('%s')) # epochseconds @@ -66,7 +66,8 @@ def run(self, caller, request, inventory): # parse and extract the plotting data (timeseries guarantees an empty dictionary as data) start = time.time() - (min_value,max_value,avg_value,cur_value,data) = transfers.timeseries(graph,entity,tmin,tmax) + (min_value,max_value,avg_value,cur_value,data) = \ + transfers.timeseries(graph,entity,tmin,tmax) elapsed_processing = time.time() - start LOG.info('Parsed data: %7.3f sec', elapsed_processing) @@ -77,6 +78,7 @@ def run(self, caller, request, inventory): yaxis_label = 'Transfered Volume [GB]' summary_string = "Min: %.3f GB, Max: %.3f GB, Avg: %.3f GB, Last: %.3f GB" \ %(min_value,max_value,avg_value,cur_value) + if graph[0] == 'r': # cumulative volume yaxis_label = 'Transfered Rate [GB/sec]' summary_string = "Min: %.3f GB/s, Max: %.3f GB/s, Avg: %.3f GB/s, Last: %.3f GB/s" \ @@ -88,15 +90,14 @@ def run(self, caller, request, inventory): yaxis_label = 'Number of Transfers' summary_string = "Min: %.0f, Max: %.0f, Avg: %.0f, Last: %.0f" \ %(min_value,max_value,avg_value,cur_value) - - # add the unit per bin yaxis_label += unit # add text graphics information to the plot data[0]['yaxis_label'] = yaxis_label data[0]['title'] = 'Dynamo Transfers (%s by %s)'%(graph,entity) data[0]['subtitle'] = 'Time period: %s -- 
%s'%(str(past_max).split('.')[0],str(past_min).split('.')[0]) - data[0]['timing_string'] = 'db:%.2fs, processing:%.2fs'%(elapsed_db,elapsed_processing) + data[0]['timing_string'] = \ + 'db:%.2fs, processing:%.2fs'%(elapsed_db,elapsed_processing) data[0]['summary_string'] = summary_string return data @@ -155,7 +156,7 @@ def _time_constants(self,tmin,tmax,nbins): if nbins>0: dt = delta_t/nbins - if abs(dt-604800) < 1: + if abs(dt-604800) < 1: unit = ' / week'; elif abs(dt-86400.) < 1: unit = ' / day'; diff --git a/lib/web/server.py b/lib/web/server.py index c80c1b77..6b6069a5 100644 --- a/lib/web/server.py +++ b/lib/web/server.py @@ -551,9 +551,12 @@ def _internal_server_error(self): response += 'Traceback (most recent call last):\n' response += ''.join(traceback.format_tb(tb)) + '\n' response += '%s: %s\n' % (exc_type.__name__, str(exc)) - return response else: - return 'Internal server error! (' + exc_type.__name__ + ': ' + str(exc) + ')\n' + response = 'Internal server error! (' + exc_type.__name__ + ': ' + str(exc) + ')\n' + + LOG.error(response) + + return response class DummyInventory(object): """ From 6b0df3a31009e1c764ab3d937b164d0b81e7dcdc Mon Sep 17 00:00:00 2001 From: Daniel Abercrombie Date: Mon, 28 Jan 2019 10:40:00 -0500 Subject: [PATCH 2/4] New changes --- lib/policy/predicates.py | 1 - lib/registry/registry.py | 25 ++++++++++ lib/request/common.py | 26 ++-------- lib/request/copy.py | 50 ++++++++----------- lib/web/modules/inventory/transferrequests.py | 3 -- lib/web/modules/request/copy.py | 25 +++++----- lib/web/modules/request/mixin.py | 16 ++---- lib/web/modules/transfers/history.py | 2 +- lib/web/server.py | 4 ++ 9 files changed, 70 insertions(+), 82 deletions(-) diff --git a/lib/policy/predicates.py b/lib/policy/predicates.py index 36d06f50..ceb11d60 100644 --- a/lib/policy/predicates.py +++ b/lib/policy/predicates.py @@ -1,5 +1,4 @@ import re -import time import dynamo.policy.attrs as attrs diff --git a/lib/registry/registry.py b/lib/registry/registry.py index 444ff5ca..27475051 100644 --- a/lib/registry/registry.py +++ b/lib/registry/registry.py @@ -96,3 +96,28 @@ def unlock_app(self, app, user, service = None): self.db.query('ALTER TABLE `activity_lock` AUTO_INCREMENT = 1') self.db.unlock_tables() + + + +class CacheDatabase(RegistryDatabase): + """ + Similar to HistoryDatabase, this is just one abstraction layer that doesn't really hide the + backend technology for the registry. We still have the benefit of being able to use default + parameters to initialize the registry database handle. 
+ """ + + # default configuration + _config = Configuration() + + @staticmethod + def set_default(config): + CacheDatabase._config = Configuration(config) + + def __init__(self, config = None): + if config is None: + config = CacheDatabase._config + + self.db = MySQL(config.db_params) + + self.set_read_only(config.get('read_only', False)) + diff --git a/lib/request/common.py b/lib/request/common.py index d47f16d6..b419f971 100644 --- a/lib/request/common.py +++ b/lib/request/common.py @@ -3,7 +3,7 @@ from dynamo.utils.interface.mysql import MySQL from dynamo.history.history import HistoryDatabase -from dynamo.registry.registry import RegistryDatabase +from dynamo.registry.registry import RegistryDatabase#, CacheDatabase import dynamo.dataformat as df from dynamo.dataformat.request import Request, RequestAction @@ -34,10 +34,12 @@ def __init__(self, optype, config = None): self.registry = RegistryDatabase(config.get('registry', None)) self.history = HistoryDatabase(config.get('history', None)) + #self.cache = CacheDatabase(config.get('cache', None)) # we'll be using temporary tables self.registry.db.reuse_connection = True self.history.db.reuse_connection = True + #self.cache.db.reuse_connection = True self.optype = optype @@ -111,7 +113,6 @@ def _make_temp_registry_tables(self, items, sites): @param items List of dataset and block names. @param sites List of site names. """ - LOG.info("aV4_0d_A %s" % str(time.time())) # Make temporary tables and fill copy_ids_tmp with ids of requests whose item and site lists fully cover the provided list of items and sites. columns = ['`item` varchar(512) CHARACTER SET latin1 COLLATE latin1_general_cs NOT NULL'] @@ -119,8 +120,6 @@ def _make_temp_registry_tables(self, items, sites): columns = ['`site` varchar(32) CHARACTER SET latin1 COLLATE latin1_general_cs NOT NULL'] self.registry.db.create_tmp_table('sites_tmp', columns) - LOG.info("aV4_0d_B %s" % str(time.time())) - if items is not None: self.registry.db.insert_many('items_tmp', ('item',), MySQL.make_tuple, items, db = self.registry.db.scratch_db) @@ -129,17 +128,12 @@ def _make_temp_registry_tables(self, items, sites): if sites is not None: self.registry.db.insert_many('sites_tmp', ('site',), MySQL.make_tuple, sites, db = self.registry.db.scratch_db) - - LOG.info("aV4_0d_C %s" % str(time.time())) - columns = [ '`id` int(10) unsigned NOT NULL AUTO_INCREMENT', 'PRIMARY KEY (`id`)' ] self.registry.db.create_tmp_table('ids_tmp', columns) - LOG.info("aV4_0d_D %s" % str(time.time())) - sql = 'INSERT INTO `{db}`.`ids_tmp`' sql += ' SELECT r.`id` FROM `{op}_requests` AS r WHERE' @@ -148,13 +142,9 @@ def _make_temp_registry_tables(self, items, sites): sql += ' 0 NOT IN (SELECT (`item` IN (SELECT `item` FROM `{op}_request_items` AS i WHERE i.`request_id` = r.`id`)) FROM `{db}`.`items_tmp`)' self.registry.db.query(sql.format(db = self.registry.db.scratch_db, op = self.optype)) - LOG.info("aV4_0d_E %s" % str(time.time())) - self.registry.db.drop_tmp_table('items_tmp') self.registry.db.drop_tmp_table('sites_tmp') - LOG.info("aV4_0d_F %s" % str(time.time())) - return '`{db}`.`ids_tmp`'.format(db = self.registry.db.scratch_db) def _make_temp_history_tables(self, dataset_ids, block_ids, site_ids): @@ -218,29 +208,19 @@ def _make_temp_history_tables(self, dataset_ids, block_ids, site_ids): def _make_registry_constraints(self, request_id, statuses, users, items, sites): constraints = [] - LOG.info("aV4_0a %s" % str(time.time())) - if request_id is not None: constraints.append('r.`id` = %d' % request_id) - 
LOG.info("aV4_0b %s" % str(time.time())) - if statuses is not None: constraints.append('r.`status` IN ' + MySQL.stringify_sequence(statuses)) - LOG.info("aV4_0c %s" % str(time.time())) - if users is not None: constraints.append('r.`user` IN ' + MySQL.stringify_sequence(users)) - LOG.info("aV4_0d %s" % str(time.time())) - if items is not None or sites is not None: temp_table = self._make_temp_registry_tables(items, sites) constraints.append('r.`id` IN (SELECT `id` FROM {0})'.format(temp_table)) - LOG.info("aV4_0e %s" % str(time.time())) - if len(constraints) != 0: return ' WHERE ' + ' AND '.join(constraints) else: diff --git a/lib/request/copy.py b/lib/request/copy.py index 6fb2fb83..b99cab01 100644 --- a/lib/request/copy.py +++ b/lib/request/copy.py @@ -11,21 +11,22 @@ class CopyRequestManager(RequestManager): def __init__(self, config = None): RequestManager.__init__(self, 'copy', config) + LOG.info("Initializing CopyRequestManager with config:") + LOG.info(config) def lock(self): #override # Caller of this function is responsible for unlocking # Non-aliased locks are for insert & update statements later tables = [ ('copy_requests', 'r'), 'copy_requests', ('active_copies', 'a'), 'active_copies', - ('copy_request_items', 'i'), 'copy_request_items', ('copy_request_sites', 's'), 'copy_request_sites' + ('copy_request_items', 'i'), 'copy_request_items', ('copy_request_sites', 's'), 'copy_request_sites', + ('cached_copy_requests','c'), 'cached_copy_requests' ] if not self._read_only: self.registry.db.lock_tables(write = tables) def get_requests(self, request_id = None, statuses = None, users = None, items = None, sites = None): - LOG.info("aV4_0: %s" % str(time.time())) - all_requests = {} sql = 'SELECT r.`id`, r.`group`, r.`num_copies`, 0+r.`status`, UNIX_TIMESTAMP(r.`first_request_time`), UNIX_TIMESTAMP(r.`last_request_time`),' @@ -37,8 +38,6 @@ def get_requests(self, request_id = None, statuses = None, users = None, items = _rid = 0 - LOG.info("aV4_1: %s" % str(time.time())) - for rid, group, n, status, first_request, last_request, count, user, dn, a_item, a_site, a_status, a_update in self.registry.db.xquery(sql): if rid != _rid: _rid = rid @@ -50,8 +49,6 @@ def get_requests(self, request_id = None, statuses = None, users = None, items = if a_item is not None: request.actions.append(RequestAction(a_item, a_site, int(a_status), a_update)) - LOG.info("aV4_2: %s" % str(time.time())) - if len(all_requests) != 0: # get the sites sql = 'SELECT s.`request_id`, s.`site` FROM `copy_request_sites` AS s WHERE s.`request_id` IN (%s)' % ','.join('%d' % d for d in all_requests.iterkeys()) @@ -63,20 +60,14 @@ def get_requests(self, request_id = None, statuses = None, users = None, items = for rid, item in self.registry.db.xquery(sql): all_requests[rid].items.append(item) - LOG.info("aV4_3: %s" % str(time.time())) - if items is not None or sites is not None: self.registry.db.drop_tmp_table('ids_tmp') - LOG.info("aV4_4: %s" % str(time.time())) - if (request_id is not None and len(all_requests) != 0) or \ (statuses is not None and (set(statuses) <= set(['new', 'activated']) or set(statuses) <= set([Request.ST_NEW, Request.ST_ACTIVATED]))): # there's nothing in the archive return all_requests - LOG.info("aV4_5: %s" % str(time.time())) - # Pick up archived requests from the history DB archived_requests = {} @@ -88,14 +79,10 @@ def get_requests(self, request_id = None, statuses = None, users = None, items = sql += self._make_history_constraints(request_id, statuses, users, items, sites) sql += ' ORDER BY 
r.`id`' - LOG.info("aV4_6: %s" % str(time.time())) - for rid, group, n, status, request_time, reason, user, dn in self.history.db.xquery(sql): if rid not in all_requests: archived_requests[rid] = CopyRequest(rid, user, dn, group, n, int(status), request_time, request_time, 1, reason) - LOG.info("aV4_7: %s" % str(time.time())) - if len(archived_requests) != 0: # get the sites sql = 'SELECT s.`request_id`, h.`name` FROM `copy_request_sites` AS s' @@ -119,42 +106,31 @@ def get_requests(self, request_id = None, statuses = None, users = None, items = for rid, dataset, block in self.history.db.xquery(sql): archived_requests[rid].items.append(df.Block.to_full_name(dataset, block)) - LOG.info("aV4_8: %s" % str(time.time())) - all_requests.update(archived_requests) - LOG.info("aV4_9: %s" % str(time.time())) - if items is not None or sites is not None: self.history.db.drop_tmp_table('ids_tmp') - LOG.info("aV4_10: %s" % str(time.time())) - return all_requests def create_request(self, caller, items, sites, sites_original, group, ncopies): now = int(time.time()) - LOG.info("aX: %s" % str(time.time())) - if self._read_only: return CopyRequest(0, caller.name, caller.dn, group, ncopies, 'new', now, now, 1) # Make an entry in registry columns = ('group', 'num_copies', 'user', 'dn', 'first_request_time', 'last_request_time') values = (group, ncopies, caller.name, caller.dn, MySQL.bare('FROM_UNIXTIME(%d)' % now), MySQL.bare('FROM_UNIXTIME(%d)' % now)) + LOG.info(values) request_id = self.registry.db.insert_get_id('copy_requests', columns, values) mapping = lambda site: (request_id, site) self.registry.db.insert_many('copy_request_sites', ('request_id', 'site'), mapping, sites) mapping = lambda item: (request_id, item) - LOG.info("aY: %s" % str(time.time())) - self.registry.db.insert_many('copy_request_items', ('request_id', 'item'), mapping, items) - LOG.info("aZ: %s" % str(time.time())) - # Make an entry in history history_user_ids = self.history.save_users([(caller.name, caller.dn)], get_ids = True) history_site_ids = self.history.save_sites(sites_original, get_ids = True) @@ -174,6 +150,22 @@ def create_request(self, caller, items, sites, sites_original, group, ncopies): return self.get_requests(request_id = request_id)[request_id] + def create_cached_request(self, caller, item, sites_original, group, ncopies): + now = int(time.time()) + + # Make an entry in registry + columns = ('item', 'sites', 'group', 'num_copies', 'user', 'dn', 'request_time', 'status') + values = (item, sites_original, group, ncopies, caller.name, caller.dn, MySQL.bare('FROM_UNIXTIME(%d)' % now), 'new') + LOG.info(values) + cached_request_id = self.registry.db.insert_get_id('cached_copy_requests', columns, values) + + return_dict = {} + return_dict['request_id'] = cached_request_id + return_dict['item'] = item + return_dict['sites'] = sites_original + + return return_dict + def update_request(self, request): if self._read_only: return diff --git a/lib/web/modules/inventory/transferrequests.py b/lib/web/modules/inventory/transferrequests.py index e292ebce..5cdd57c7 100644 --- a/lib/web/modules/inventory/transferrequests.py +++ b/lib/web/modules/inventory/transferrequests.py @@ -39,10 +39,7 @@ def run(self, caller, request, inventory): req_id = int(line) break - LOG.info(req_id) req_hash = self.copy_manager.get_requests(request_id=req_id) - LOG.info(req_id) - LOG.info(req_hash) if req_id not in req_hash: return {'request':[]} diff --git a/lib/web/modules/request/copy.py b/lib/web/modules/request/copy.py index e9e7ffe3..a1501c97 100644 
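
create_cached_request above inserts into a cached_copy_requests table whose definition is not part of this patch. Judging only from the column list and the enum-style status values used elsewhere in the schema, the DDL is presumably something like the guess below; every type, width, and default is an assumption:

    # Guessed DDL -- not in the patch; inferred from the INSERT columns only.
    CACHED_COPY_REQUESTS_DDL = """
    CREATE TABLE `cached_copy_requests` (
      `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
      `item` varchar(512) NOT NULL,
      `sites` varchar(512) NOT NULL,
      `group` varchar(32) NOT NULL,
      `num_copies` tinyint(3) unsigned NOT NULL DEFAULT 1,
      `user` varchar(64) NOT NULL,
      `dn` varchar(256) NOT NULL,
      `request_time` datetime NOT NULL,
      `status` enum('new','activated','completed') NOT NULL DEFAULT 'new',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB;
    """
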
--- a/lib/web/modules/request/copy.py +++ b/lib/web/modules/request/copy.py @@ -32,15 +32,13 @@ def __init__(self, config): self.default_sites = config['request']['copy'].get('default_sites', []) def run(self, caller, request, inventory): - self.parse_input(request, inventory, ('request_id', 'item', 'site', 'group', 'n')) + self.parse_input(request, inventory, ('request_id', 'item', 'site', 'group', 'n', 'cache')) self.manager.lock() try: existing = None - LOG.info("aV: %s" % str(time.time())) - if 'request_id' in self.params: request_id = self.params['request_id'] @@ -57,7 +55,6 @@ def run(self, caller, request, inventory): else: # create a new request - LOG.info("aV1: %s" % str(time.time())) if 'item' not in self.params: raise MissingParameter('item') @@ -67,14 +64,10 @@ def run(self, caller, request, inventory): else: self.params['site'] = list(self.default_sites) - LOG.info("aV2: %s" % str(time.time())) constraints = self.make_constraints(by_id = False) - LOG.info("aV3: %s" % str(time.time())) constraints['statuses'] = [Request.ST_NEW, Request.ST_ACTIVATED] - LOG.info("aV4: %s" % str(time.time())) existing_requests = self.manager.get_requests(**constraints) - LOG.info("aV5: %s" % str(time.time())) for request_id in sorted(existing_requests.iterkeys()): if existing_requests[request_id].status == Request.ST_NEW: @@ -82,7 +75,6 @@ def run(self, caller, request, inventory): break elif existing_requests[request_id].status == Request.ST_ACTIVATED: existing = existing_requests[request_id] - LOG.info("aV6: %s" % str(time.time())) if existing is None: if 'n' not in self.params: @@ -91,9 +83,13 @@ def run(self, caller, request, inventory): if 'group' not in self.params: self.params['group'] = self.default_group - LOG.info("aW: %s" % str(time.time())) - request = self.manager.create_request(caller, self.params['item'], self.params['site'], self.params['site_orig'], self.params['group'], self.params['n']) - + if 'cache' not in self.params: + LOG.info("Create request") + request = self.manager.create_request(caller, self.params['item'], self.params['site'], self.params['site_orig'], self.params['group'], self.params['n']) + else: + # We want to allow the requester to just place the request info in a cache table that dynamo will act on by itself + LOG.info("Creating caching request") + request = self.manager.create_cached_request(caller, self.params['item'][0], " ".join(self.params['site_orig']), self.params['group'], self.params['n']) else: existing.request_count += 1 existing.last_request = int(time.time()) @@ -116,7 +112,10 @@ def run(self, caller, request, inventory): LOG.error('Error in manager.unlock()') # requests is a single-element dictionary - return [request.to_dict()] + if 'cache' in self.params: + return [request] + else: + return [request.to_dict()] class PollCopyRequest(CopyRequestBase): diff --git a/lib/web/modules/request/mixin.py b/lib/web/modules/request/mixin.py index 86262405..b0715f2d 100644 --- a/lib/web/modules/request/mixin.py +++ b/lib/web/modules/request/mixin.py @@ -1,7 +1,6 @@ import re import fnmatch import logging -import time from dynamo.web.exceptions import MissingParameter, ExtraParameter, IllFormedRequest, InvalidRequest from dynamo.web.modules._common import yesno @@ -28,8 +27,6 @@ def parse_input(self, request, inventory, allowed_fields, required_fields = tupl LOG.info("Completed updating input.") # Check we have the right request fields - - LOG.info("A: %s" % str(time.time())) input_fields = set(request.keys()) allowed_fields = set(allowed_fields) @@ -43,8 
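
On the wire, the cache branch above is triggered by the mere presence of a cache parameter (parse_input only tests 'cache' in request; the value is ignored). A hedged client-side example — the host, path, and lack of authentication are assumptions, and Python 2's urllib/urllib2 are used to match the codebase:

    import json
    import urllib
    import urllib2

    params = {'item': '/A/B/RECO', 'site': 'T2_US_MIT', 'group': 'AnalysisOps', 'n': 1, 'cache': 'y'}
    url = 'https://dynamo.example.com/request/copy?' + urllib.urlencode(params)

    # A cached request comes back as the raw dict from create_cached_request,
    # not as Request.to_dict().
    print(json.load(urllib2.urlopen(url)))
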
+40,6 @@ def parse_input(self, request, inventory, allowed_fields, required_fields = tupl
 
         # Pick up the values and cast them to correct types
 
-        LOG.info("B: %s" % str(time.time()))
-
         for key in ['request_id', 'n']:
             if key not in request:
                 continue
@@ -83,7 +78,6 @@ def parse_input(self, request, inventory, allowed_fields, required_fields = tupl
         # The only reason for this would be to make the registry not dependent on specific inventory store technology.
 
         if 'item' in self.params:
-            print "Found item."
             for item in self.params['item']:
                 if item in inventory.datasets:
                     # OK this is a known dataset
@@ -97,7 +91,6 @@ def parse_input(self, request, inventory, allowed_fields, required_fields = tupl
                 try:
                     inventory.datasets[dataset_name].find_block(block_name, must_find = True)
                 except:
-                    print 'Invalid block name %s' % item
                     raise InvalidRequest('Invalid block name %s' % item)
 
         if 'site' in self.params:
@@ -108,8 +101,6 @@ def parse_input(self, request, inventory, allowed_fields, required_fields = tupl
                 # Wildcard allowed
                 if '*' in site or '?' in site or '[' in site:
-                    LOG.info("C: %s" % str(time.time()))
-
                     self.params['site'].remove(site)
 
                     pattern = re.compile(fnmatch.translate(site))
@@ -125,8 +116,6 @@ def parse_input(self, request, inventory, allowed_fields, required_fields = tupl
             if len(self.params['site']) == 0:
                 self.params.pop('site')
 
-        LOG.info("D: %s" % str(time.time()))
-
         if 'group' in self.params:
             try:
                 inventory.groups[self.params['group']]
@@ -138,7 +127,10 @@ def parse_input(self, request, inventory, allowed_fields, required_fields = tupl
             if status not in ('new', 'activated', 'completed', 'rejected', 'cancelled'):
                 raise InvalidRequest('Invalid status value %s' % status)
 
-        LOG.info("Printing all request parameterss")
+        if 'cache' in request:
+            self.params['cache'] = True
+
+        LOG.info("Printing all request parameters")
         LOG.info(self.params)
 
     def make_constraints(self, by_id = False):
diff --git a/lib/web/modules/transfers/history.py b/lib/web/modules/transfers/history.py
index 80bee08e..5352a465 100644
--- a/lib/web/modules/transfers/history.py
+++ b/lib/web/modules/transfers/history.py
@@ -156,7 +156,7 @@ def _time_constants(self,tmin,tmax,nbins):
         if nbins>0:
             dt = delta_t/nbins
 
-            if abs(dt-604800) < 1: 
+            if abs(dt-604800) < 1:
                 unit = ' / week';
             elif abs(dt-86400.)
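
The wildcard handling kept above (any of '*', '?', '[' triggers expansion) rests on fnmatch.translate, which compiles a shell-style glob into a regular expression that is then matched against every site name in the inventory. A runnable sketch with invented site names:

    import re
    import fnmatch

    site_names = ['T2_US_MIT', 'T2_US_Nebraska', 'T1_US_FNAL']

    # parse_input expands a glob into the concrete sites known to the inventory.
    pattern = re.compile(fnmatch.translate('T2_US_*'))
    print([name for name in site_names if pattern.match(name)])
    # ['T2_US_MIT', 'T2_US_Nebraska']
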
< 1: unit = ' / day'; diff --git a/lib/web/server.py b/lib/web/server.py index 6b6069a5..3f9a0067 100644 --- a/lib/web/server.py +++ b/lib/web/server.py @@ -280,8 +280,12 @@ def main(self, environ, start_response): # replace content with the json string start = time.time() if self.callback is not None: + LOG.info("Callback is not None") + LOG.info(json_data) content = '%s(%s)' % (self.callback, json.dumps(json_data)) else: + LOG.info("Callback is None") + LOG.info(json_data) content = json.dumps(json_data) root_logger.info('Make JSON: %s seconds', time.time() - start) From 1382ca1a303cd86954617bfa4826c84f94633d75 Mon Sep 17 00:00:00 2001 From: Daniel Abercrombie Date: Tue, 29 Jan 2019 15:32:44 -0500 Subject: [PATCH 3/4] Take out files that are in dynamo-cms --- lib/core/components/impl/mysqlstore.py | 26 ++----------- lib/dataformat/_namespace.py | 49 ++++++------------------- lib/web/modules/inventory/_customize.py | 18 +-------- 3 files changed, 18 insertions(+), 75 deletions(-) diff --git a/lib/core/components/impl/mysqlstore.py b/lib/core/components/impl/mysqlstore.py index 6ecb0ccd..7f671a41 100644 --- a/lib/core/components/impl/mysqlstore.py +++ b/lib/core/components/impl/mysqlstore.py @@ -2,13 +2,10 @@ import logging import fnmatch import hashlib -import os from dynamo.core.components.persistency import InventoryStore from dynamo.utils.interface.mysql import MySQL from dynamo.dataformat import Configuration, Partition, Dataset, Block, File, Site, SitePartition, Group, DatasetReplica, BlockReplica -from dynamo.policy.condition import Condition -from dynamo.policy.variables import site_variables LOG = logging.getLogger(__name__) @@ -20,16 +17,6 @@ def __init__(self, config): self._mysql = MySQL(config.db_params) - config_path = os.getenv('DYNAMO_SERVER_CONFIG', '/etc/dynamo/fom_config.json') - - self.fom_config = Configuration(config_path) - self.fom_conditions = [] - - for condition_text, module, conf in self.fom_config.rlfsm.transfer: - if condition_text is not None: # default - condition = Condition(condition_text, site_variables) - self.fom_conditions.append((condition, module, conf)) - def close(self): self._mysql.close() @@ -497,9 +484,9 @@ def _save_sites(self, sites): #override self._mysql.query('CREATE TABLE `sites_tmp` LIKE `sites`') - fields = ('id', 'name', 'host', 'storage_type', 'status') + fields = ('id', 'name', 'host', 'storage_type', 'backend', 'status') mapping = lambda site: (site.id, site.name, site.host, Site.storage_type_name(site.storage_type), \ - Site.status_name(site.status)) + site.backend, Site.status_name(site.status)) num = self._mysql.insert_many('sites_tmp', fields, mapping, sites, do_update = False) @@ -759,10 +746,6 @@ def _yield_sites(self, sites_tmp = None): #override sid = site_id ) - for cond, module, conf in self.fom_conditions: - if cond.match(site): - site.x509proxy = conf.x509proxy - all_chains = {} for protocol, chain_id, idx, lfn, pfn in self._mysql.xquery(mapping_sql, site_id): try: @@ -1136,7 +1119,6 @@ def delete_blockreplica(self, block_replica): #override return sql = 'DELETE FROM `block_replicas` WHERE `block_id` = %s AND `site_id` = %s' - self._mysql.query(sql, block_id, site_id) sql = 'DELETE FROM `block_replica_files` WHERE `block_id` = %s AND `site_id` = %s' @@ -1250,8 +1232,8 @@ def delete_partition(self, partition): #override self._mysql.query(sql, partition.name) def save_site(self, site): #override - fields = ('name', 'host', 'storage_type', 'status') - self._mysql.insert_update('sites', fields, site.name, site.host, 
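
The callback branch instrumented in server.py above is plain JSONP: when the request names a callback, the JSON payload is wrapped in a function call so that a script tag on another origin can consume it. A minimal self-contained sketch of the same dispatch:

    import json

    def render(json_data, callback=None):
        # With a callback, emit callback({...}); otherwise emit bare JSON.
        if callback is not None:
            return '%s(%s)' % (callback, json.dumps(json_data))
        return json.dumps(json_data)

    print(render({'result': 'OK'}))                     # {"result": "OK"}
    print(render({'result': 'OK'}, callback='handle'))  # handle({"result": "OK"})
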
site.storage_type, site.status) + fields = ('name', 'host', 'storage_type', 'backend', 'status') + self._mysql.insert_update('sites', fields, site.name, site.host, site.storage_type, site.backend, site.status) site_id = self._mysql.last_insert_id if site_id != 0: diff --git a/lib/dataformat/_namespace.py b/lib/dataformat/_namespace.py index b84f2583..5a7387a1 100644 --- a/lib/dataformat/_namespace.py +++ b/lib/dataformat/_namespace.py @@ -1,43 +1,24 @@ # Namespace-specific rules for e.g. object name conversions -import re - from exceptions import ObjectError def Dataset_format_software_version(value): - if type(value) is str: - formatted = eval(value) - elif type(value) is not tuple: - # some iterable - formatted = tuple(value) - else: - formatted = value - - if type(formatted) is not tuple or len(formatted) != 4: - raise ObjectError('Invalid software version %s' % repr(value)) - - return formatted + return value def Block_to_internal_name(name_str): - # block name format: [8]-[4]-[4]-[4]-[12] where [n] is an n-digit hex. - try: - return long(name_str.replace('-', ''), 16) - except ValueError: - raise ObjectError('Invalid block name %s' % name_str) + return name_str def Block_to_real_name(name): - full_string = hex(name).replace('0x', '')[:-1] # last character is 'L' - if len(full_string) < 32: - full_string = '0' * (32 - len(full_string)) + full_string - - return full_string[:8] + '-' + full_string[8:12] + '-' + full_string[12:16] + '-' + full_string[16:20] + '-' + full_string[20:] + return name def Block_to_full_name(dataset_name, block_real_name): return dataset_name + '#' + block_real_name def Block_from_full_name(full_name): - # return dataset name, block internal name - + """ + @param full_name Full name of the block + @return (dataset name, block internal name) + """ delim = full_name.find('#') if delim == -1: raise ObjectError('Invalid block name %s' % full_name) @@ -46,30 +27,24 @@ def Block_from_full_name(full_name): def customize_dataset(Dataset): # Enumerator for dataset type. - # Starting from 1 to play better with MySQL - Dataset._data_types = ['unknown', 'align', 'calib', 'cosmic', 'data', 'lumi', 'mc', 'raw', 'test'] + # Starting from 1 to play better with MySQL enums + Dataset._data_types = ['unknown', 'production', 'test'] for name, val in zip(Dataset._data_types, range(1, len(Dataset._data_types) + 1)): # e.g. 
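
The block-name conversions being moved out to dynamo-cms here store a CMS block name (five dash-separated hex groups, 8-4-4-4-12) as a single integer. A version-agnostic paraphrase of the removed round trip — the original relied on Python 2's long and on hex() appending a trailing 'L':

    name_str = '0123abcd-0123-abcd-0123-0123456789ab'

    # to_internal_name: strip the dashes, read the 32 hex digits as an integer
    internal = int(name_str.replace('-', ''), 16)

    # to_real_name: print back to 32 zero-padded hex digits, re-insert dashes
    full = '%032x' % internal
    real = '-'.join([full[:8], full[8:12], full[12:16], full[16:20], full[20:]])

    assert real == name_str
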
Dataset.TYPE_UNKNOWN = 1 setattr(Dataset, 'TYPE_' + name.upper(), val) - Dataset.SoftwareVersion.field_names = ('cycle', 'major', 'minor', 'suffix') + Dataset.SoftwareVersion.field_names = ('version',) Dataset.format_software_version = staticmethod(Dataset_format_software_version) - Dataset.name_pattern = re.compile('/[^/]+/[^/]+/[^/]+') - def customize_block(Block): Block.to_internal_name = staticmethod(Block_to_internal_name) Block.to_real_name = staticmethod(Block_to_real_name) Block.to_full_name = staticmethod(Block_to_full_name) Block.from_full_name = staticmethod(Block_from_full_name) - hex_chars = '[0-9a-fA-F]' - Block.name_pattern = re.compile('{h}{{8}}-{h}{{4}}-{h}{{4}}-{h}{{4}}-{h}{{12}}'.format(h = hex_chars)) - def customize_file(File): - File.checksum_algorithms = ('crc32', 'adler32') + pass def customize_blockreplica(BlockReplica): - BlockReplica._use_file_ids = True - + pass diff --git a/lib/web/modules/inventory/_customize.py b/lib/web/modules/inventory/_customize.py index 80542b09..35e75995 100644 --- a/lib/web/modules/inventory/_customize.py +++ b/lib/web/modules/inventory/_customize.py @@ -1,16 +1,2 @@ -import collections -from dynamo.dataformat import Dataset, Site, Group - -def campaign_name(dataset): - sd = dataset.name[dataset.name.find('/', 1) + 1:dataset.name.rfind('/')] - return sd[:sd.find('-')] - -def customize_stats(categories): - categories.categories = collections.OrderedDict([ - ('campaign', ('Production campaign', Dataset, campaign_name)), - ('data_tier', ('Data tier', Dataset, lambda d: d.name[d.name.rfind('/') + 1:])), - ('dataset_status', ('Dataset status', Dataset, lambda d: Dataset.status_name(d.status))), - ('dataset', ('Dataset name', Dataset, lambda d: d.name)), - ('site', ('Site name', Site, lambda s: s.name)), - ('group', ('Group name', Group, lambda g: g.name)) - ]) +def customize_stats(InventoryStatCategories): + pass From d73ff868998320f05f0713d9151ed51090eb9817 Mon Sep 17 00:00:00 2001 From: Daniel Abercrombie Date: Thu, 21 Feb 2019 15:32:47 -0500 Subject: [PATCH 4/4] Another update --- lib/detox/conditions.py | 7 ------ lib/fileop/impl/fts.py | 2 -- lib/fileop/rlfsm.py | 44 ++++----------------------------- lib/request/common.py | 2 +- lib/web/modules/request/copy.py | 25 ++++++++++++------- lib/web/server.py | 4 +++ 6 files changed, 26 insertions(+), 58 deletions(-) diff --git a/lib/detox/conditions.py b/lib/detox/conditions.py index ca558e73..51affa23 100644 --- a/lib/detox/conditions.py +++ b/lib/detox/conditions.py @@ -1,16 +1,9 @@ -import logging - from dynamo.policy.condition import Condition from dynamo.policy.variables import site_variables, replica_variables -from dynamo.policy.predicates import Predicate - -LOG = logging.getLogger(__name__) - class ReplicaCondition(Condition): def __init__(self, text): Condition.__init__(self, text, replica_variables) - self.text = text def get_matching_blocks(self, replica): """If this is a block-level condition, return the list of matching block replicas.""" diff --git a/lib/fileop/impl/fts.py b/lib/fileop/impl/fts.py index e194954b..de3326fb 100644 --- a/lib/fileop/impl/fts.py +++ b/lib/fileop/impl/fts.py @@ -403,8 +403,6 @@ def _submit_job(self, job, optype, batch_id, pfn_to_tid): LOG.error('Failed to submit %s to FTS: Exception %s (%s)', optype, exc_type.__name__, str(exc)) return False - LOG.info('FTS job id: %s', job_id) - # list of file-level operations (one-to-one with pfn) try: if optype == 'transfer' or optype == 'staging': diff --git a/lib/fileop/rlfsm.py b/lib/fileop/rlfsm.py index 
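
customize_dataset above builds the TYPE_* enumerators dynamically: each name in _data_types becomes a class attribute numbered from 1 so the values line up with a MySQL enum column. A self-contained rendering of the pattern:

    class Dataset(object):
        _data_types = ['unknown', 'production', 'test']

    # Number the names from 1 and attach them as TYPE_* class attributes.
    for val, name in enumerate(Dataset._data_types, 1):
        setattr(Dataset, 'TYPE_' + name.upper(), val)

    print('%d %d %d' % (Dataset.TYPE_UNKNOWN, Dataset.TYPE_PRODUCTION, Dataset.TYPE_TEST))
    # 1 2 3
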
9c34d24f..951d585e 100644 --- a/lib/fileop/rlfsm.py +++ b/lib/fileop/rlfsm.py @@ -657,15 +657,6 @@ def get_subscriptions(self, inventory, op = None, status = None): if replica.site.storage_type == Site.TYPE_DISK: disk_sources.append(replica.site) elif replica.site.storage_type == Site.TYPE_MSS: - if 'DESY' in dest_replica.site.name: - LOG.info('Here comes the bride') - LOG.info(replica.site) - LOG.info(destination) - LOG.info(row) - #self.cancel_subscription(destination, lfile) - #self.subscribe_file(destination, lfile) - #skip_rest = True - #break tape_sources.append(replica.site) if skip_rest: continue @@ -732,8 +723,6 @@ def get_subscriptions(self, inventory, op = None, status = None): if len(all_failed) != 0: msg += ', %d held with reason "all_failed"' % len(all_failed) - LOG.info(msg) - if not self._read_only: self.db.execute_many('UPDATE `file_subscriptions` SET `status` = \'done\', `last_update` = NOW()', 'id', to_done) self.db.execute_many('UPDATE `file_subscriptions` SET `status` = \'held\', `hold_reason` = \'no_source\', `last_update` = NOW()', 'id', no_source) @@ -910,7 +899,6 @@ def _update_status(self, optype, inventory): history_table_name = 'file_transfers' history_site_fields = ('source_id', 'destination_id') else: - LOG.info("WTFFFFFFFF UUU") history_table_name = 'file_deletions' history_site_fields = ('site_id',) @@ -954,12 +942,6 @@ def _update_status(self, optype, inventory): for condition, query in self.transfer_queries: - - LOG.info("xxxxxxxxxxxxxxxx") - LOG.info(condition) - LOG.info(query) - LOG.info("xxxxxxxxxxxxxxxx") - results = query.get_transfer_status(batch_id) if len(results) != 0: break @@ -972,20 +954,12 @@ def _update_status(self, optype, inventory): batch_complete = True - if len(results) == 0: - LOG.info('did I just fail? 
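
The loop being slimmed down in _update_status asks each registered (condition, query) pair for the batch's transfer status and stops at the first backend that answers; an empty answer from every backend marks the batch complete. A stub-based sketch of that control flow (the stub class is illustrative, not the real FileQuery interface):

    def poll_transfer_status(transfer_queries, batch_id):
        # First backend with a non-empty result wins, mirroring the break above.
        for condition, query in transfer_queries:
            results = query.get_transfer_status(batch_id)
            if len(results) != 0:
                return results
        return []

    class StubQuery(object):
        def __init__(self, results):
            self._results = results
        def get_transfer_status(self, batch_id):
            return self._results

    queries = [(None, StubQuery([])), (None, StubQuery([('task1', 'done')]))]
    print(poll_transfer_status(queries, 42))  # [('task1', 'done')]
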
' + str(batch_id)) - - - LOG.info("yyyyyyyyyy") - LOG.info(len(results)) - LOG.info("yyyyyyyyyy") for task_id, status, exitcode, message, start_time, finish_time in results: # start_time and finish_time can be None LOG.debug('%s result: %d %s %d %s %s', optype, task_id, FileQuery.status_name(status), exitcode, start_time, finish_time) if status == FileQuery.STAT_DONE: - LOG.info('%s RESSSSULT: %d %s %d %s %s', optype, task_id, FileQuery.status_name(status), exitcode, start_time, finish_time) num_success += 1 elif status == FileQuery.STAT_FAILED: num_failure += 1 @@ -1044,26 +1018,18 @@ def _update_status(self, optype, inventory): if self._read_only: history_id = 0 else: - #LOG.info("A:") - #LOG.info(history_table_name) - #LOG.info("B:") - #LOG.info(history_fields) - #LOG.info("C:") - #LOG.info(values) - + #LOG.error(values) #values_tmp = set() #for v in values: # try: - # tmp = v.replace(u"\u2018", "'").replace(u"\u2019", "'") + # tmp = v.replace(u"\u2019", "'") # values_tmp.add(tmp) # except: - # values_tmp.add(v) + # values_tmp.add(v) + #LOG.error(history_fields) + #LOG.error(values_tmp) #values = values_tmp - - LOG.info(values) - - history_id = self.history_db.db.insert_get_id(history_table_name, history_fields, values) if optype == 'transfer': diff --git a/lib/request/common.py b/lib/request/common.py index b419f971..c0e74af7 100644 --- a/lib/request/common.py +++ b/lib/request/common.py @@ -3,7 +3,7 @@ from dynamo.utils.interface.mysql import MySQL from dynamo.history.history import HistoryDatabase -from dynamo.registry.registry import RegistryDatabase#, CacheDatabase +from dynamo.registry.registry import RegistryDatabase, CacheDatabase import dynamo.dataformat as df from dynamo.dataformat.request import Request, RequestAction diff --git a/lib/web/modules/request/copy.py b/lib/web/modules/request/copy.py index a1501c97..3933875f 100644 --- a/lib/web/modules/request/copy.py +++ b/lib/web/modules/request/copy.py @@ -35,6 +35,7 @@ def run(self, caller, request, inventory): self.parse_input(request, inventory, ('request_id', 'item', 'site', 'group', 'n', 'cache')) self.manager.lock() + LOG.info("aV1: %s" % str(time.time())) try: existing = None @@ -54,6 +55,8 @@ def run(self, caller, request, inventory): raise InvalidRequest('Request %d cannot be updated any more' % request_id) else: + LOG.info("aV2: %s" % str(time.time())) + # create a new request if 'item' not in self.params: raise MissingParameter('item') @@ -64,17 +67,19 @@ def run(self, caller, request, inventory): else: self.params['site'] = list(self.default_sites) - constraints = self.make_constraints(by_id = False) + if 'cache' not in self.params: + # This only has to be done if we do not want to stupidly dump things into the cache table + constraints = self.make_constraints(by_id = False) - constraints['statuses'] = [Request.ST_NEW, Request.ST_ACTIVATED] - existing_requests = self.manager.get_requests(**constraints) + constraints['statuses'] = [Request.ST_NEW, Request.ST_ACTIVATED] + existing_requests = self.manager.get_requests(**constraints) - for request_id in sorted(existing_requests.iterkeys()): - if existing_requests[request_id].status == Request.ST_NEW: - existing = existing_requests[request_id] - break - elif existing_requests[request_id].status == Request.ST_ACTIVATED: - existing = existing_requests[request_id] + for request_id in sorted(existing_requests.iterkeys()): + if existing_requests[request_id].status == Request.ST_NEW: + existing = existing_requests[request_id] + break + elif 
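
The commented-out block retained above exists to strip the Unicode right single quote (U+2019) that the history tables' latin1 columns cannot store. In spirit it amounts to the following sketch; the bare except of the original is narrowed here to AttributeError, which is what non-string values raise:

    def sanitize(value):
        try:
            return value.replace(u'\u2018', "'").replace(u'\u2019', "'")
        except AttributeError:
            # non-string values (ids, timestamps) pass through untouched
            return value

    print(sanitize(u'user\u2019s request'))  # user's request
    print(sanitize(12345))                   # 12345
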
existing_requests[request_id].status == Request.ST_ACTIVATED: + existing = existing_requests[request_id] if existing is None: if 'n' not in self.params: @@ -85,10 +90,12 @@ def run(self, caller, request, inventory): if 'cache' not in self.params: LOG.info("Create request") + LOG.info("aV3: %s" % str(time.time())) request = self.manager.create_request(caller, self.params['item'], self.params['site'], self.params['site_orig'], self.params['group'], self.params['n']) else: # We want to allow the requester to just place the request info in a cache table that dynamo will act on by itself LOG.info("Creating caching request") + LOG.info("aV4: %s" % str(time.time())) request = self.manager.create_cached_request(caller, self.params['item'][0], " ".join(self.params['site_orig']), self.params['group'], self.params['n']) else: existing.request_count += 1 diff --git a/lib/web/server.py b/lib/web/server.py index 3f9a0067..99ede4c7 100644 --- a/lib/web/server.py +++ b/lib/web/server.py @@ -465,6 +465,8 @@ def _main(self, environ): # Even though our default content type is URL form, we check if this is a JSON try: + LOG.info("Printing post_data") + LOG.info(post_data) json_data = json.loads(post_data) except: if content_type == 'application/json': @@ -478,6 +480,8 @@ def _main(self, environ): if content_type == 'application/x-www-form-urlencoded': try: + LOG.info("Printing post_data 2") + LOG.info(post_data) post_request = parse_qs(post_data) except: self.code = 400
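
The logging added above brackets the server's two-stage body parsing: the POST payload is first tried as JSON regardless of the declared content type, and only falls back to URL-encoded form parsing when the content type says so. A condensed, runnable version of that dispatch, with Python 2 imports to match the codebase:

    import json
    from urlparse import parse_qs  # urllib.parse in Python 3

    def parse_post(post_data, content_type):
        try:
            return json.loads(post_data)
        except ValueError:
            if content_type == 'application/json':
                raise  # declared JSON but does not parse -> error (code 400 above)
        if content_type == 'application/x-www-form-urlencoded':
            return parse_qs(post_data)
        return None

    print(parse_post('{"item": ["/A/B#1"]}', 'application/json'))
    print(parse_post('item=%2FA%2FB%231&n=1', 'application/x-www-form-urlencoded'))
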