diff --git a/lib/core/components/impl/socketappserver.py b/lib/core/components/impl/socketappserver.py index 2d299551..789f9ad4 100644 --- a/lib/core/components/impl/socketappserver.py +++ b/lib/core/components/impl/socketappserver.py @@ -250,7 +250,7 @@ def _process_application(self, conn, addr): dn = '' for rdn in user_cert_data['subject']: - dn += '/' + '+'.join('%s=%s' % (DN_TRANSLATION[key], value) for key, value in rdn if key in DN_TRANSLATION) + dn += '/' + '+'.join('%s=%s' % (DN_TRANSLATION[key], value) for key, value in rdn) user_info = master.identify_user(dn = dn, check_trunc = True) diff --git a/lib/dataformat/site.py b/lib/dataformat/site.py index 22a194c6..dbd2f86a 100644 --- a/lib/dataformat/site.py +++ b/lib/dataformat/site.py @@ -8,7 +8,7 @@ class Site(object): """Represents a site. Owns lists of dataset and block replicas, which are organized into partitions.""" __slots__ = ['_name', 'id', 'host', 'storage_type', 'backend', 'status', 'filename_mapping', - '_dataset_replicas', 'partitions'] + '_dataset_replicas', 'partitions', 'x509proxy'] _storage_types = ['disk', 'mss', 'buffer', 'unknown'] TYPE_DISK, TYPE_MSS, TYPE_BUFFER, TYPE_UNKNOWN = range(1, len(_storage_types) + 1) @@ -90,7 +90,7 @@ def map(self, lfn): return None - def __init__(self, name, host = '', storage_type = TYPE_DISK, backend = '', status = STAT_UNKNOWN, filename_mapping = {}, sid = 0): + def __init__(self, name, host = '', storage_type = TYPE_DISK, backend = '', status = STAT_UNKNOWN, filename_mapping = {}, x509proxy = None, sid = 0): self._name = name self.host = host self.storage_type = Site.storage_type_val(storage_type) @@ -107,18 +107,21 @@ def __init__(self, name, host = '', storage_type = TYPE_DISK, backend = '', stat self.partitions = {} # {Partition: SitePartition} + self.x509proxy = x509proxy + def __str__(self): - return 'Site %s (host=%s, storage_type=%s, backend=%s, status=%s, id=%d)' % \ - (self._name, self.host, Site.storage_type_name(self.storage_type), self.backend, Site.status_name(self.status), self.id) + return 'Site %s (host=%s, storage_type=%s, backend=%s, status=%s, x509=%s, id=%d)' % \ + (self._name, self.host, Site.storage_type_name(self.storage_type), self.backend, Site.status_name(self.status), self.x509proxy, self.id) def __repr__(self): - return 'Site(%s,%s,\'%s\',%s,\'%s\',%s,%d)' % \ - (repr(self._name), repr(self.host), Site.storage_type_name(self.storage_type), repr(self.backend), Site.status_name(self.status), repr(self.filename_mapping), self.id) + return 'Site(%s,%s,\'%s\',%s,\'%s\',%s,%s,%d)' % \ + (repr(self._name), repr(self.host), Site.storage_type_name(self.storage_type), repr(self.backend), Site.status_name(self.status), repr(self.filename_mapping), repr(self.x509proxy), self.id) def __eq__(self, other): return self is other or \ (self._name == other._name and self.host == other.host and self.storage_type == other.storage_type and \ - self.backend == other.backend and self.status == other.status and self.filename_mapping == other.filename_mapping) + self.backend == other.backend and self.status == other.status and \ + self.filename_mapping == other.filename_mapping and self.x509proxy == other.x509proxy) def __ne__(self, other): return not self.__eq__(other) @@ -134,6 +137,8 @@ def copy(self, other): for protocol, mapping in other.filename_mapping.iteritems(): self.filename_mapping[protocol] = Site.FileNameMapping(mapping._chains) + self.x509proxy = other.x509proxy + def embed_into(self, inventory, check = False): updated = False diff --git 
a/lib/dealer/plugins/popularity.py b/lib/dealer/plugins/popularity.py
index 689b902d..b2804d7b 100644
--- a/lib/dealer/plugins/popularity.py
+++ b/lib/dealer/plugins/popularity.py
@@ -2,6 +2,8 @@
 import math
 from dynamo.dataformat import Dataset
+from dynamo.policy.variables import replica_variables
+from dynamo.policy.condition import Condition
 
 from base import BaseHandler, DealerRequest
 
 LOG = logging.getLogger(__name__)
@@ -19,6 +21,10 @@ def __init__(self, config):
         self.max_dataset_size = config.max_dataset_size * 1.e+12
         self.max_replication = config.max_replication
         self.request_to_replica_threshold = config.request_to_replica_threshold
+        try:
+            self.condition = Condition(config.condition, replica_variables)
+        except:
+            self.condition = None
 
         self._datasets = []
 
@@ -36,6 +42,7 @@ def get_requests(self, inventory, policy): # override
             LOG.debug('Dataset %s request weight %f', dataset.name, request_weight)
 
             dataset_in_source_groups = False
+
             for dr in dataset.replicas:
                 for br in dr.block_replicas:
                     if br.group.name in self.source_groups:
@@ -46,6 +53,9 @@ def get_requests(self, inventory, policy): # override
             if not dataset_in_source_groups:
                 continue
 
+            if self.condition is not None and not self.condition.match(dataset):
+                continue
+
             if request_weight <= 0.:
                 continue
 
diff --git a/lib/detox/conditions.py b/lib/detox/conditions.py
index 14e55c71..51affa23 100644
--- a/lib/detox/conditions.py
+++ b/lib/detox/conditions.py
@@ -4,7 +4,7 @@ class ReplicaCondition(Condition):
     def __init__(self, text):
         Condition.__init__(self, text, replica_variables)
-
+        
     def get_matching_blocks(self, replica):
         """If this is a block-level condition, return the list of matching block replicas."""
 
@@ -18,3 +18,5 @@ def get_matching_blocks(self, replica):
 class SiteCondition(Condition):
     def __init__(self, text):
         Condition.__init__(self, text, site_variables)
+
+
diff --git a/lib/detox/detoxpolicy.py b/lib/detox/detoxpolicy.py
index 01111b54..efdbfac6 100644
--- a/lib/detox/detoxpolicy.py
+++ b/lib/detox/detoxpolicy.py
@@ -83,9 +83,11 @@ def evaluate(self, replica):
         if self.condition.match(replica):
             self.has_match = True
+
             if issubclass(self.decision.action_cls, BlockAction):
                 # block-level
                 matching_block_replicas = self.condition.get_matching_blocks(replica)
+
                 if len(matching_block_replicas) == len(replica.block_replicas):
                     # but all blocks matched - return dataset level
                     action = self.decision.action_cls.dataset_level(self)
@@ -246,3 +248,4 @@ def evaluate(self, replica):
         replica.block_replicas.update(block_replicas_tmp)
 
         return actions
+
diff --git a/lib/fileop/history.py b/lib/fileop/history.py
index 070e80e8..71ec7d82 100644
--- a/lib/fileop/history.py
+++ b/lib/fileop/history.py
@@ -30,41 +30,6 @@ def histogram_binning(tmin,tmax):
 
     return (nbins,dt)
 
-#class OperationFilter:
-#    """
-#    Allows to generate a sql filter string to be applied to the query of historic file
-#    operations tables.
-#    """
-#    def __init__(self,source_filter="",mss_filter=True,period='24h',upto='0h',exit_code='0'):
-#        self.source_filter = source_filter
-#        self.mss_filter = mss_filter
-#        self.period = period
-#        self.upto = upto
-#        self.exit_code = exit_code
-#
-#    def generate_filter_string(self):
-#
-#        where_sql = ""
-#
-#        return where_sql
-#
-#class DeletionFilter(OperationFilter):
-#    """
-#    Allows to generate a sql filter string to be applied to the query of historic file
-#    deletions tables.
-# """ -# def __init__(self,source_filter="",mss_filter=True,period='24h',upto='0h',exit_code='0'): -# OperationFilter.__init__(self,source_filter="",mss_filter=True,period='24h',upto='0h',exit_code='0') -# -#class TransferFilter(OperationFilter): -# """ -# Allows to generate a sql filter string to be applied to the query of historic file -# transfer tables. -# """ -# def __init__(self,source_filter="",destination_filter="",mss_filter=True,period='24h',upto='0h',exit_code='0'): -# OperationFilter.__init__(self,source_filter="",mss_filter=True,period='24h',upto='0h',exit_code='0') -# self.destintation_filter = destination_filter - class Sites: """ Defines the sites. diff --git a/lib/fileop/impl/fts.py b/lib/fileop/impl/fts.py index a1088f86..de3326fb 100644 --- a/lib/fileop/impl/fts.py +++ b/lib/fileop/impl/fts.py @@ -49,6 +49,7 @@ def __init__(self, config): # Proxy to be forwarded to FTS self.x509proxy = config.get('x509proxy', None) + self.x509proxy_orig = config.get('x509proxy', None) # Bookkeeping device self.db = MySQL(config.db_params) @@ -67,7 +68,15 @@ def num_pending_transfers(self): #override file_states = ['SUBMITTED', 'READY', 'ACTIVE', 'STAGING', 'STARTED'] jobs = self._ftscall('list_jobs', state_in = ['SUBMITTED', 'ACTIVE', 'STAGING']) + from random import shuffle + shuffle(jobs) + + total_count = 0 for job in jobs: + total_count = total_count + 1 +# if total_count > 50: +# LOG.info('-- in fts: total_count > ' + str(total_count)) +# break job_info = self._ftscall('get_job_status', job['job_id'], list_files = True) for file_info in job_info['files']: if file_info['file_state'] in file_states: @@ -87,7 +96,15 @@ def num_pending_deletions(self): #override file_states = ['SUBMITTED', 'READY', 'ACTIVE'] jobs = self._ftscall('list_jobs', state_in = ['SUBMITTED', 'ACTIVE']) + from random import shuffle + shuffle(jobs) + + total_count = 0 for job in jobs: + total_count = total_count + 1 +# if total_count > 1: +# LOG.info('-- in fts: total_count > ' + str(total_count)) +# break job_info = self._ftscall('get_job_status', job['job_id'], list_files = True) for file_info in job_info['dm']: if file_info['file_state'] in file_states: @@ -127,6 +144,20 @@ def start_transfers(self, batch_id, batch_tasks): #override dest_pfn = sub.destination.to_pfn(lfn, 'gfal2') source_pfn = task.source.to_pfn(lfn, 'gfal2') + self.x509proxy = self.x509proxy_orig + #if sub.destination.x509proxy is not None: + self.x509proxy = sub.destination.x509proxy + + if task.source.storage_type == Site.TYPE_MSS: + self.x509proxy = task.source.x509proxy + + #LOG.info("CCCCCCCCCCC") + #LOG.info("File is: %s" % sub.file.lfn) + #LOG.info("Destination is: %s" % sub.destination.name) + #LOG.info("Source is: %s" % task.source.name) + #LOG.info("x509 is: %s" % self.x509proxy) + #LOG.info("CCCCCCCCCCC") + if dest_pfn is None or source_pfn is None: # either gfal2 is not supported or lfn could not be mapped LOG.warning('Could not obtain PFN for %s at %s or %s', lfn, sub.destination.name, task.source.name) @@ -173,7 +204,8 @@ def start_transfers(self, batch_id, batch_tasks): #override if len(transfers) != 0: LOG.debug('Submit new transfer job for %d files', len(transfers)) - job = fts3.new_job(transfers, retry = self.fts_retry, overwrite = True, verify_checksum = verify_checksum, metadata = self.metadata_string) + job = fts3.new_job(transfers, retry = self.fts_retry, overwrite = True, + verify_checksum = verify_checksum, metadata = self.metadata_string) success = self._submit_job(job, 'transfer', batch_id, dict((pfn, task.id) for 
pfn, task in t_pfn_to_task.iteritems())) for transfer in transfers: @@ -317,6 +349,7 @@ def _ftscallurl(self, url): return self._do_ftscall(url = url) def _do_ftscall(self, binding = None, url = None): + if self._context is None: # request_class = Request -> use "requests"-based https call (instead of default PyCURL, # which may not be able to handle proxy certificates depending on the cURL installation) @@ -337,10 +370,12 @@ def _do_ftscall(self, binding = None, url = None): LOG.debug('FTS: %s', reqstring) wait_time = 1. + for attempt in xrange(10): try: if binding is not None: method, args, kwd = binding + return getattr(fts3, method)(context, *args, **kwd) else: return json.loads(context.get(url)) @@ -368,8 +403,6 @@ def _submit_job(self, job, optype, batch_id, pfn_to_tid): LOG.error('Failed to submit %s to FTS: Exception %s (%s)', optype, exc_type.__name__, str(exc)) return False - LOG.debug('FTS job id: %s', job_id) - # list of file-level operations (one-to-one with pfn) try: if optype == 'transfer' or optype == 'staging': @@ -456,6 +489,7 @@ def _get_status(self, batch_id, optype): result = self._ftscall('get_job_status', job_id = job_id, list_files = True) except: LOG.error('Failed to get job status for FTS job %s', job_id) + LOG.error(optype) continue if optype == 'transfer' or optype == 'staging': diff --git a/lib/fileop/rlfsm.py b/lib/fileop/rlfsm.py index 36d9e247..951d585e 100644 --- a/lib/fileop/rlfsm.py +++ b/lib/fileop/rlfsm.py @@ -5,6 +5,8 @@ import datetime import threading import logging +import datetime +import time from dynamo.fileop.base import FileQuery from dynamo.fileop.transfer import FileTransferOperation, FileTransferQuery @@ -45,6 +47,14 @@ def __init__(self, subscription, source): self.subscription = subscription self.source = source + def __str__(self): + s = '' + s += self.source.name + s += '->' + s += self.subscription.destination.name + s += ' ' + self.subscription.file.lfn + return s + class Desubscription(object): __slots__ = ['id', 'status', 'file', 'site'] @@ -88,7 +98,7 @@ def __init__(self, config = None): condition = Condition(condition_text, site_variables) self.transfer_operations.append((condition, FileTransferOperation.get_instance(module, conf))) - + if 'transfer_query' in config: self.transfer_queries = [] for condition_text, module, conf in config.transfer_query: @@ -189,7 +199,7 @@ def transfer_files(self, inventory): @param inventory The inventory. """ - + self._cleanup() LOG.debug('Clearing cancelled transfer tasks.') @@ -201,7 +211,7 @@ def transfer_files(self, inventory): return LOG.debug('Fetching subscription status from the file operation agent.') - self._update_status('transfer') + self._update_status('transfer', inventory) if self.cycle_stop.is_set(): return @@ -279,7 +289,7 @@ def issue_tasks(op, my_tasks): num_success = 0 num_failure = 0 num_batches = 0 - + for condition, op in self.transfer_operations: if condition is None: default_op = op @@ -310,6 +320,7 @@ def issue_tasks(op, my_tasks): num_success += ns num_failure += nf + if num_success + num_failure != 0: LOG.info('Issued transfer tasks: %d success, %d failure. 
%d batches.', num_success, num_failure, num_batches) else: @@ -339,7 +350,7 @@ def delete_files(self, inventory): return LOG.debug('Fetching deletion status from the file operation agent.') - completed = self._update_status('deletion') + completed = self._update_status('deletion', inventory) LOG.debug('Recording candidates for empty directories.') self._set_dirclean_candidates(completed, inventory) @@ -533,7 +544,8 @@ def get_subscriptions(self, inventory, op = None, status = None): subscriptions = [] - get_all = 'SELECT u.`id`, u.`status`, u.`delete`, f.`block_id`, f.`name`, s.`name`, u.`hold_reason` FROM `file_subscriptions` AS u' + get_all = 'SELECT u.`id`, u.`status`, u.`delete`, f.`block_id`, f.`name`, s.`name`, u.`hold_reason`, u.`created`' + get_all += ' FROM `file_subscriptions` AS u' get_all += ' INNER JOIN `files` AS f ON f.`id` = u.`file_id`' get_all += ' INNER JOIN `sites` AS s ON s.`id` = u.`site_id`' @@ -564,8 +576,10 @@ def get_subscriptions(self, inventory, op = None, status = None): COPY = 0 DELETE = 1 + now_time = int(time.time()) + for row in self.db.query(get_all): - sub_id, st, optype, block_id, file_name, site_name, hold_reason = row + sub_id, st, optype, block_id, file_name, site_name, hold_reason, created = row if site_name != _destination_name: _destination_name = site_name @@ -583,7 +597,8 @@ def get_subscriptions(self, inventory, op = None, status = None): if block_id != _block_id: lfile = inventory.find_file(file_name) if lfile is None: - # Dataset, block, or file was deleted from the inventory earlier in this process (deletion not reflected in the inventory store yet) + # Dataset, block, or file was deleted from the inventory earlier in this process + #(deletion not reflected in the inventory store yet) continue _block_id = block_id @@ -593,7 +608,8 @@ def get_subscriptions(self, inventory, op = None, status = None): else: lfile = block.find_file(file_name) if lfile is None: - # Dataset, block, or file was deleted from the inventory earlier in this process (deletion not reflected in the inventory store yet) + # Dataset, block, or file was deleted from the inventory earlier in this process + #(deletion not reflected in the inventory store yet) continue if dest_replica is None and st != 'cancelled': @@ -615,6 +631,13 @@ def get_subscriptions(self, inventory, op = None, status = None): tape_sources = None failed_sources = None + + #create_time = int( time.mktime(datetime.datetime(created).timetuple()) ) + #if (now_time-create_time) > 7*(60*60*24): + # LOG.info('---- very old request-----') + LOG.info(row) + + if st not in ('done', 'held', 'cancelled'): if dest_replica.has_file(lfile): LOG.debug('%s already exists at %s', file_name, site_name) @@ -625,6 +648,7 @@ def get_subscriptions(self, inventory, op = None, status = None): else: disk_sources = [] tape_sources = [] + skip_rest = False for replica in block.replicas: if replica.site == destination or replica.site.status != Site.STAT_READY: continue @@ -634,9 +658,11 @@ def get_subscriptions(self, inventory, op = None, status = None): disk_sources.append(replica.site) elif replica.site.storage_type == Site.TYPE_MSS: tape_sources.append(replica.site) + if skip_rest: + continue if len(disk_sources) + len(tape_sources) == 0: - LOG.warning('Transfer of %s to %s has no source.', file_name, site_name) + LOG.info('Transfer of %s to %s has no source.', file_name, site_name) no_source.append(sub_id) st = 'held' @@ -666,6 +692,8 @@ def get_subscriptions(self, inventory, op = None, status = None): # This site failed for a 
recoverable reason
                            break
                    else:
+                        LOG.info('Number of disk sources: '+ str(len(disk_sources)))
+                        LOG.info('Number of tape sources: '+ str(len(tape_sources)))
                        # last failure from all sites due to irrecoverable errors
                        LOG.warning('Transfer of %s to %s failed from all sites.', file_name, site_name)
                        all_failed.append(sub_id)
@@ -695,8 +723,6 @@ def get_subscriptions(self, inventory, op = None, status = None):
         if len(all_failed) != 0:
             msg += ', %d held with reason "all_failed"' % len(all_failed)
 
-        LOG.info(msg)
-
         if not self._read_only:
             self.db.execute_many('UPDATE `file_subscriptions` SET `status` = \'done\', `last_update` = NOW()', 'id', to_done)
             self.db.execute_many('UPDATE `file_subscriptions` SET `status` = \'held\', `hold_reason` = \'no_source\', `last_update` = NOW()', 'id', no_source)
@@ -736,7 +762,7 @@ def release_subscription(self, subscription):
             return
 
         self.db.query('DELETE FROM `failed_transfers` WHERE `subscription_id` = %s', subscription.id)
-        self.db.query('UPDATE `file_subscriptions` SET `status` = \'retry\' WHERE `id` = %s', subscription.id)
+        self.db.query('UPDATE `file_subscriptions` SET `status` = \'new\' WHERE `id` = %s', subscription.id)
 
     def _run_cycle(self, inventory):
         while True:
@@ -852,7 +878,7 @@ def _get_cancelled_tasks(self, optype):
         sql += ' WHERE u.`status` = \'cancelled\' AND u.`delete` = %d' % delete
 
         return self.db.query(sql)
 
-    def _update_status(self, optype):
+    def _update_status(self, optype, inventory):
        if optype == 'transfer':
            site_columns = 'ss.`name`, sd.`name`'
            site_joins = ' INNER JOIN `sites` AS ss ON ss.`id` = q.`source_id`'
@@ -900,21 +926,33 @@
        # Collect completed tasks
 
+        total_counter = 0
+
        for batch_id in self.db.query('SELECT `id` FROM `{op}_batches`'.format(op = optype)):
            results = []
+
            if optype == 'transfer':
-                for _, query in self.transfer_queries:
+
+                total_counter = total_counter + 1
+
+#                if total_counter > 200:
+#                    LOG.info('-- getting out transfer_status as counter > ' + str(total_counter))
+#                    break
+
+
+                for condition, query in self.transfer_queries:
                    results = query.get_transfer_status(batch_id)
                    if len(results) != 0:
                        break
            else:
-                for _, query in self.deletion_queries:
+                for condition, query in self.deletion_queries:
                    results = query.get_deletion_status(batch_id)
                    if len(results) != 0:
                        break
+
            batch_complete = True
 
            for task_id, status, exitcode, message, start_time, finish_time in results:
@@ -980,6 +1018,18 @@
                if self._read_only:
                    history_id = 0
                else:
+                    #LOG.error(values)
+                    #values_tmp = set()
+                    #for v in values:
+                    #    try:
+                    #        tmp = v.replace(u"\u2019", "'")
+                    #        values_tmp.add(tmp)
+                    #    except:
+                    #        values_tmp.add(v)
+                    #LOG.error(history_fields)
+                    #LOG.error(values_tmp)
+                    #values = values_tmp
+
                    history_id = self.history_db.db.insert_get_id(history_table_name, history_fields, values)
 
                if optype == 'transfer':
diff --git a/lib/policy/condition.py b/lib/policy/condition.py
index 49390d34..b27d68c8 100644
--- a/lib/policy/condition.py
+++ b/lib/policy/condition.py
@@ -1,5 +1,11 @@
+import logging
+import subprocess
+import time
+
 from dynamo.policy.predicates import Predicate
 
+LOG = logging.getLogger(__name__)
+
 class Condition(object):
     """AND-chained Predicates."""
 
@@ -10,6 +14,20 @@ def __init__(self, text, variables):
 
         pred_strs = map(str.strip, text.split(' and '))
 
+        self.time_condition = None
+
+        tmp = ''
+        if ' until ' in pred_strs[-1]:
+            tmp = pred_strs[-1].split(' until ')
+            tmp[-1] = 'until ' + tmp[-1]
+        elif ' from ' in pred_strs[-1]:
+            tmp = pred_strs[-1].split(' from ')
+            tmp[-1] = 'from ' + tmp[-1]
+        if tmp != '':
+            pred_strs[-1] = tmp[0]
+            self.time_condition = tmp[-1]
+
+        # parsing the individual components
         for pred_str in pred_strs:
             words = pred_str.split()
 
@@ -42,6 +60,18 @@ def __repr__(self):
         return 'Condition(\'%s\')' % self.text
 
     def match(self, obj):
+        if self.time_condition is not None:
+            if 'until' in self.time_condition:
+                proc = subprocess.Popen(['date', '-d', self.time_condition.split('until ')[1], '+%s'], stdout = subprocess.PIPE, stderr = subprocess.PIPE)
+                unixt, err = proc.communicate()
+                if time.time() > float(unixt):
+                    return False
+            else: # from
+                proc = subprocess.Popen(['date', '-d', self.time_condition.split('from ')[1], '+%s'], stdout = subprocess.PIPE, stderr = subprocess.PIPE)
+                unixt, err = proc.communicate()
+                if time.time() < float(unixt):
+                    return False
+
         for predicate in self.predicates:
             if not predicate(obj):
                 return False
diff --git a/lib/policy/predicates.py b/lib/policy/predicates.py
index 47ff3b7a..ceb11d60 100644
--- a/lib/policy/predicates.py
+++ b/lib/policy/predicates.py
@@ -26,7 +26,6 @@ def get(variable, op = '', rhs_expr = ''):
             if rhs_expr == '':
                 raise InvalidOperator(op)
             return SetElementExpr.get(variable, op, rhs_expr)
-
         else:
             raise InvalidOperator(op)
 
diff --git a/lib/policy/variables.py b/lib/policy/variables.py
index 2dc7d91b..f296dae6 100644
--- a/lib/policy/variables.py
+++ b/lib/policy/variables.py
@@ -241,6 +241,10 @@ def __init__(self):
         BlockReplicaAttr.__init__(self, Attr.BOOL_TYPE)
 
     def _get(self, replica):
+        if not replica.is_complete():
+            # We don't want to delete incomplete blocks anyway
+            return False
+
         for rep in replica.block.replicas:
             if rep.site.storage_type == Site.TYPE_MSS and rep.is_complete():
                 return True
diff --git a/lib/registry/registry.py b/lib/registry/registry.py
index 444ff5ca..27475051 100644
--- a/lib/registry/registry.py
+++ b/lib/registry/registry.py
@@ -96,3 +96,28 @@ def unlock_app(self, app, user, service = None):
         self.db.query('ALTER TABLE `activity_lock` AUTO_INCREMENT = 1')
 
         self.db.unlock_tables()
+
+
+
+class CacheDatabase(RegistryDatabase):
+    """
+    Similar to HistoryDatabase, this is just one abstraction layer that doesn't really hide the
+    backend technology for the registry. We still have the benefit of being able to use default parameters to initialize the registry database handle.
+ """ + + # default configuration + _config = Configuration() + + @staticmethod + def set_default(config): + CacheDatabase._config = Configuration(config) + + def __init__(self, config = None): + if config is None: + config = CacheDatabase._config + + self.db = MySQL(config.db_params) + + self.set_read_only(config.get('read_only', False)) + diff --git a/lib/request/common.py b/lib/request/common.py index ed86b064..c0e74af7 100644 --- a/lib/request/common.py +++ b/lib/request/common.py @@ -1,8 +1,9 @@ import logging +import time from dynamo.utils.interface.mysql import MySQL from dynamo.history.history import HistoryDatabase -from dynamo.registry.registry import RegistryDatabase +from dynamo.registry.registry import RegistryDatabase, CacheDatabase import dynamo.dataformat as df from dynamo.dataformat.request import Request, RequestAction @@ -33,10 +34,12 @@ def __init__(self, optype, config = None): self.registry = RegistryDatabase(config.get('registry', None)) self.history = HistoryDatabase(config.get('history', None)) + #self.cache = CacheDatabase(config.get('cache', None)) # we'll be using temporary tables self.registry.db.reuse_connection = True self.history.db.reuse_connection = True + #self.cache.db.reuse_connection = True self.optype = optype @@ -119,6 +122,9 @@ def _make_temp_registry_tables(self, items, sites): if items is not None: self.registry.db.insert_many('items_tmp', ('item',), MySQL.make_tuple, items, db = self.registry.db.scratch_db) + + LOG.info(sites) + if sites is not None: self.registry.db.insert_many('sites_tmp', ('site',), MySQL.make_tuple, sites, db = self.registry.db.scratch_db) @@ -128,6 +134,7 @@ def _make_temp_registry_tables(self, items, sites): ] self.registry.db.create_tmp_table('ids_tmp', columns) + sql = 'INSERT INTO `{db}`.`ids_tmp`' sql += ' SELECT r.`id` FROM `{op}_requests` AS r WHERE' sql += ' 0 NOT IN (SELECT (`site` IN (SELECT `site` FROM `{op}_request_sites` AS s WHERE s.`request_id` = r.`id`)) FROM `{db}`.`sites_tmp`)' diff --git a/lib/request/copy.py b/lib/request/copy.py index 7937a079..b99cab01 100644 --- a/lib/request/copy.py +++ b/lib/request/copy.py @@ -11,13 +11,16 @@ class CopyRequestManager(RequestManager): def __init__(self, config = None): RequestManager.__init__(self, 'copy', config) + LOG.info("Initializing CopyRequestManager with config:") + LOG.info(config) def lock(self): #override # Caller of this function is responsible for unlocking # Non-aliased locks are for insert & update statements later tables = [ ('copy_requests', 'r'), 'copy_requests', ('active_copies', 'a'), 'active_copies', - ('copy_request_items', 'i'), 'copy_request_items', ('copy_request_sites', 's'), 'copy_request_sites' + ('copy_request_items', 'i'), 'copy_request_items', ('copy_request_sites', 's'), 'copy_request_sites', + ('cached_copy_requests','c'), 'cached_copy_requests' ] if not self._read_only: @@ -34,6 +37,7 @@ def get_requests(self, request_id = None, statuses = None, users = None, items = sql += ' ORDER BY r.`id`' _rid = 0 + for rid, group, n, status, first_request, last_request, count, user, dn, a_item, a_site, a_status, a_update in self.registry.db.xquery(sql): if rid != _rid: _rid = rid @@ -118,11 +122,13 @@ def create_request(self, caller, items, sites, sites_original, group, ncopies): # Make an entry in registry columns = ('group', 'num_copies', 'user', 'dn', 'first_request_time', 'last_request_time') values = (group, ncopies, caller.name, caller.dn, MySQL.bare('FROM_UNIXTIME(%d)' % now), MySQL.bare('FROM_UNIXTIME(%d)' % now)) + LOG.info(values) 
request_id = self.registry.db.insert_get_id('copy_requests', columns, values)
 
         mapping = lambda site: (request_id, site)
         self.registry.db.insert_many('copy_request_sites', ('request_id', 'site'), mapping, sites)
         mapping = lambda item: (request_id, item)
+
         self.registry.db.insert_many('copy_request_items', ('request_id', 'item'), mapping, items)
 
         # Make an entry in history
@@ -144,6 +150,22 @@ def create_request(self, caller, items, sites, sites_original, group, ncopies):
 
         return self.get_requests(request_id = request_id)[request_id]
 
+    def create_cached_request(self, caller, item, sites_original, group, ncopies):
+        now = int(time.time())
+
+        # Make an entry in registry
+        columns = ('item', 'sites', 'group', 'num_copies', 'user', 'dn', 'request_time', 'status')
+        values = (item, sites_original, group, ncopies, caller.name, caller.dn, MySQL.bare('FROM_UNIXTIME(%d)' % now), 'new')
+        LOG.info(values)
+        cached_request_id = self.registry.db.insert_get_id('cached_copy_requests', columns, values)
+
+        return_dict = {}
+        return_dict['request_id'] = cached_request_id
+        return_dict['item'] = item
+        return_dict['sites'] = sites_original
+
+        return return_dict
+
     def update_request(self, request):
         if self._read_only:
             return
diff --git a/lib/source/impl/staticsiteinfo.py b/lib/source/impl/staticsiteinfo.py
index d1b99b92..14fbc315 100644
--- a/lib/source/impl/staticsiteinfo.py
+++ b/lib/source/impl/staticsiteinfo.py
@@ -1,6 +1,9 @@
 from dynamo.source.siteinfo import SiteInfoSource
 from dynamo.dataformat import Configuration, Site
+import logging
+LOG = logging.getLogger(__name__)
+
 class StaticSiteInfoSource(SiteInfoSource):
     """
     Site information source fully specified by the static configuration.
@@ -11,7 +14,21 @@ def __init__(self, config):
 
         self.config = Configuration(config.sites)
 
-    def get_site(self, name): #override
+        LOG.info('-----------in constructor of SiteInfoSource-------')
+        LOG.info(self.config)
+
+#        config_path = os.getenv('DYNAMO_SERVER_CONFIG', '/etc/dynamo/fom_config.json')
+#        self.fom_config = Configuration(config_path)
+#        self.fom_conditions = []
+
+#        for condition_text, module, conf in self.fom_config.rlfsm.transfer:
+#            if condition_text is not None: # default
+#                condition = Condition(condition_text, site_variables)
+#                self.fom_conditions.append((condition, module, conf))
+
+
+
+    def get_site(self, name, inventory): #override
         try:
             site_config = self.config[name]
         except KeyError:
@@ -20,13 +37,26 @@ def get_site(self, name): #override
         storage_type = Site.storage_type_val(site_config.storage_type)
         backend = site_config.backend
 
-        return Site(name, host = site_config.host, storage_type = storage_type, backend = backend)
+        LOG.info('--------in get_site-------------')
+        LOG.info(name)
+
+        site_obj = Site(name, host = site_config.host, storage_type = storage_type, backend = backend)
+        if name in inventory.sites:
+            old_site_obj = inventory.sites[name]
+            LOG.info('found ' + name)
+            LOG.info(old_site_obj.x509proxy)
+            site_obj.x509proxy = old_site_obj.x509proxy
+
+        #for cond, module, conf in self.fom_conditions:
+        #    if cond.match(site_obj):
+        #        site_obj.x509proxy = conf.x509proxy
+
+        return site_obj
 
-    def get_site_list(self): #override
+    def get_site_list(self, inventory): #override
         site_list = []
 
         for name in self.config.keys():
-            site_list.append(self.get_site(name))
+            site_list.append(self.get_site(name, inventory))
 
         return site_list
diff --git a/lib/source/siteinfo.py b/lib/source/siteinfo.py
index 04c357c2..2c7e984d 100644
--- a/lib/source/siteinfo.py
+++ b/lib/source/siteinfo.py
@@ -46,14 +46,14 @@
def __init__(self, config): else: self.exclude = None - def get_site(self, name): + def get_site(self, name, inventory): """ @param name Name of the site @return A Site object with full info, or None if the site is not found. """ raise NotImplementedError('get_site') - def get_site_list(self): + def get_site_list(self, inventory): """ @return List of unlinked Site objects """ diff --git a/lib/utils/parallel.py b/lib/utils/parallel.py index a931b225..5a4ff121 100644 --- a/lib/utils/parallel.py +++ b/lib/utils/parallel.py @@ -1,10 +1,14 @@ +import logging import time import multiprocessing import threading import Queue +import sys from dynamo.dataformat import Configuration +LOG = logging.getLogger(__name__) + class FunctionWrapper(object): def __init__(self, function, start_sem, done_sem): self.function = function diff --git a/lib/web/modules/inventory/__init__.py b/lib/web/modules/inventory/__init__.py index a5b799a8..834ebc97 100644 --- a/lib/web/modules/inventory/__init__.py +++ b/lib/web/modules/inventory/__init__.py @@ -1,21 +1,13 @@ -from . import datasets -from . import groups -from . import sites -from . import stats -from . import inject -from . import delete -from . import blockreplicas -from . import subscriptions -from . import requestlist -from . import lfn2pfn -from . import nodes -from . import data -from . import transferrequests +import datasets +import groups +import stats +import inject +import delete +import blockreplicas, requestlist, subscriptions, lfn2pfn, nodes, data, transferrequests export_data = {} export_data.update(datasets.export_data) export_data.update(groups.export_data) -export_data.update(sites.export_data) export_data.update(stats.export_data) export_data.update(inject.export_data) export_data.update(delete.export_data) @@ -27,5 +19,6 @@ export_data.update(data.export_data) export_data.update(transferrequests.export_data) + export_web = {} export_web.update(stats.export_web) diff --git a/lib/web/modules/inventory/transferrequests.py b/lib/web/modules/inventory/transferrequests.py index 4dab3053..5cdd57c7 100644 --- a/lib/web/modules/inventory/transferrequests.py +++ b/lib/web/modules/inventory/transferrequests.py @@ -40,8 +40,6 @@ def run(self, caller, request, inventory): break req_hash = self.copy_manager.get_requests(request_id=req_id) - LOG.info(req_id) - LOG.info(req_hash) if req_id not in req_hash: return {'request':[]} diff --git a/lib/web/modules/request/copy.py b/lib/web/modules/request/copy.py index 861590b3..3933875f 100644 --- a/lib/web/modules/request/copy.py +++ b/lib/web/modules/request/copy.py @@ -32,9 +32,10 @@ def __init__(self, config): self.default_sites = config['request']['copy'].get('default_sites', []) def run(self, caller, request, inventory): - self.parse_input(request, inventory, ('request_id', 'item', 'site', 'group', 'n')) + self.parse_input(request, inventory, ('request_id', 'item', 'site', 'group', 'n', 'cache')) self.manager.lock() + LOG.info("aV1: %s" % str(time.time())) try: existing = None @@ -54,6 +55,8 @@ def run(self, caller, request, inventory): raise InvalidRequest('Request %d cannot be updated any more' % request_id) else: + LOG.info("aV2: %s" % str(time.time())) + # create a new request if 'item' not in self.params: raise MissingParameter('item') @@ -64,16 +67,19 @@ def run(self, caller, request, inventory): else: self.params['site'] = list(self.default_sites) - constraints = self.make_constraints(by_id = False) - constraints['statuses'] = [Request.ST_NEW, Request.ST_ACTIVATED] - existing_requests = 
self.manager.get_requests(**constraints) + if 'cache' not in self.params: + # This only has to be done if we do not want to stupidly dump things into the cache table + constraints = self.make_constraints(by_id = False) - for request_id in sorted(existing_requests.iterkeys()): - if existing_requests[request_id].status == Request.ST_NEW: - existing = existing_requests[request_id] - break - elif existing_requests[request_id].status == Request.ST_ACTIVATED: - existing = existing_requests[request_id] + constraints['statuses'] = [Request.ST_NEW, Request.ST_ACTIVATED] + existing_requests = self.manager.get_requests(**constraints) + + for request_id in sorted(existing_requests.iterkeys()): + if existing_requests[request_id].status == Request.ST_NEW: + existing = existing_requests[request_id] + break + elif existing_requests[request_id].status == Request.ST_ACTIVATED: + existing = existing_requests[request_id] if existing is None: if 'n' not in self.params: @@ -81,9 +87,16 @@ def run(self, caller, request, inventory): if 'group' not in self.params: self.params['group'] = self.default_group - - request = self.manager.create_request(caller, self.params['item'], self.params['site'], self.params['site_orig'], self.params['group'], self.params['n']) - + + if 'cache' not in self.params: + LOG.info("Create request") + LOG.info("aV3: %s" % str(time.time())) + request = self.manager.create_request(caller, self.params['item'], self.params['site'], self.params['site_orig'], self.params['group'], self.params['n']) + else: + # We want to allow the requester to just place the request info in a cache table that dynamo will act on by itself + LOG.info("Creating caching request") + LOG.info("aV4: %s" % str(time.time())) + request = self.manager.create_cached_request(caller, self.params['item'][0], " ".join(self.params['site_orig']), self.params['group'], self.params['n']) else: existing.request_count += 1 existing.last_request = int(time.time()) @@ -106,7 +119,10 @@ def run(self, caller, request, inventory): LOG.error('Error in manager.unlock()') # requests is a single-element dictionary - return [request.to_dict()] + if 'cache' in self.params: + return [request] + else: + return [request.to_dict()] class PollCopyRequest(CopyRequestBase): @@ -117,6 +133,10 @@ def run(self, caller, request, inventory): self.parse_input(request, inventory, ('request_id', 'item', 'site', 'status', 'user')) constraints = self.make_constraints(by_id = False) + + LOG.info("PollCopy constraints:") + LOG.info(constraints) + existing_requests = self.manager.get_requests(**constraints) if 'item' in self.params and 'site' in self.params and \ diff --git a/lib/web/modules/request/mixin.py b/lib/web/modules/request/mixin.py index 8860bc30..b0715f2d 100644 --- a/lib/web/modules/request/mixin.py +++ b/lib/web/modules/request/mixin.py @@ -1,20 +1,30 @@ import re import fnmatch +import logging from dynamo.web.exceptions import MissingParameter, ExtraParameter, IllFormedRequest, InvalidRequest from dynamo.web.modules._common import yesno from dynamo.utils.interface.mysql import MySQL import dynamo.dataformat as df +LOG = logging.getLogger(__name__) + class ParseInputMixin(object): def __init__(self, config): # Parsed and formatted HTTP queries self.params = {} def parse_input(self, request, inventory, allowed_fields, required_fields = tuple()): + if self.input_data is not None: + LOG.info("Input data:") + LOG.info(self.input_data) + else: + LOG.info("No input data:") # JSON could have been uploaded if self.input_data is not None: + 
LOG.info("Updating input:") request.update(self.input_data) + LOG.info("Completed updating input.") # Check we have the right request fields @@ -117,6 +127,12 @@ def parse_input(self, request, inventory, allowed_fields, required_fields = tupl if status not in ('new', 'activated', 'completed', 'rejected', 'cancelled'): raise InvalidRequest('Invalid status value %s' % status) + if 'cache' in request: + self.params['cache'] = True + + LOG.info("Printing all request parameterssss") + LOG.info(self.params) + def make_constraints(self, by_id = False): constraints = {} if 'request_id' in self.params: diff --git a/lib/web/modules/transfers/history.py b/lib/web/modules/transfers/history.py index d64e971c..5352a465 100644 --- a/lib/web/modules/transfers/history.py +++ b/lib/web/modules/transfers/history.py @@ -10,9 +10,9 @@ LOG = logging.getLogger(__name__) class FileTransferHistory(WebModule): - def __init__(self, config): WebModule.__init__(self, config) + self.history = HistoryDatabase() def run(self, caller, request, inventory): @@ -51,7 +51,7 @@ def run(self, caller, request, inventory): # calculate the time limits to consider past_min = self._get_date_before_end(datetime.datetime.now(),upto) - tmax = int(past_min.strftime('%s')) # epochseconds: careful max/min in t and past invert + tmax = int(past_min.strftime('%s')) # epochseconds: careful max/min in t and past inverts past_max = self._get_date_before_end(past_min,period) tmin = int(past_max.strftime('%s')) # epochseconds @@ -66,7 +66,8 @@ def run(self, caller, request, inventory): # parse and extract the plotting data (timeseries guarantees an empty dictionary as data) start = time.time() - (min_value,max_value,avg_value,cur_value,data) = transfers.timeseries(graph,entity,tmin,tmax) + (min_value,max_value,avg_value,cur_value,data) = \ + transfers.timeseries(graph,entity,tmin,tmax) elapsed_processing = time.time() - start LOG.info('Parsed data: %7.3f sec', elapsed_processing) @@ -77,6 +78,7 @@ def run(self, caller, request, inventory): yaxis_label = 'Transfered Volume [GB]' summary_string = "Min: %.3f GB, Max: %.3f GB, Avg: %.3f GB, Last: %.3f GB" \ %(min_value,max_value,avg_value,cur_value) + if graph[0] == 'r': # cumulative volume yaxis_label = 'Transfered Rate [GB/sec]' summary_string = "Min: %.3f GB/s, Max: %.3f GB/s, Avg: %.3f GB/s, Last: %.3f GB/s" \ @@ -88,15 +90,14 @@ def run(self, caller, request, inventory): yaxis_label = 'Number of Transfers' summary_string = "Min: %.0f, Max: %.0f, Avg: %.0f, Last: %.0f" \ %(min_value,max_value,avg_value,cur_value) - - # add the unit per bin yaxis_label += unit # add text graphics information to the plot data[0]['yaxis_label'] = yaxis_label data[0]['title'] = 'Dynamo Transfers (%s by %s)'%(graph,entity) data[0]['subtitle'] = 'Time period: %s -- %s'%(str(past_max).split('.')[0],str(past_min).split('.')[0]) - data[0]['timing_string'] = 'db:%.2fs, processing:%.2fs'%(elapsed_db,elapsed_processing) + data[0]['timing_string'] = \ + 'db:%.2fs, processing:%.2fs'%(elapsed_db,elapsed_processing) data[0]['summary_string'] = summary_string return data diff --git a/lib/web/server.py b/lib/web/server.py index c80c1b77..99ede4c7 100644 --- a/lib/web/server.py +++ b/lib/web/server.py @@ -280,8 +280,12 @@ def main(self, environ, start_response): # replace content with the json string start = time.time() if self.callback is not None: + LOG.info("Callback is not None") + LOG.info(json_data) content = '%s(%s)' % (self.callback, json.dumps(json_data)) else: + LOG.info("Callback is None") + LOG.info(json_data) content 
= json.dumps(json_data) root_logger.info('Make JSON: %s seconds', time.time() - start) @@ -461,6 +465,8 @@ def _main(self, environ): # Even though our default content type is URL form, we check if this is a JSON try: + LOG.info("Printing post_data") + LOG.info(post_data) json_data = json.loads(post_data) except: if content_type == 'application/json': @@ -474,6 +480,8 @@ def _main(self, environ): if content_type == 'application/x-www-form-urlencoded': try: + LOG.info("Printing post_data 2") + LOG.info(post_data) post_request = parse_qs(post_data) except: self.code = 400 @@ -551,9 +559,12 @@ def _internal_server_error(self): response += 'Traceback (most recent call last):\n' response += ''.join(traceback.format_tb(tb)) + '\n' response += '%s: %s\n' % (exc_type.__name__, str(exc)) - return response else: - return 'Internal server error! (' + exc_type.__name__ + ': ' + str(exc) + ')\n' + response = 'Internal server error! (' + exc_type.__name__ + ': ' + str(exc) + ')\n' + + LOG.error(response) + + return response class DummyInventory(object): """
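
For reference, a minimal standalone sketch of the proxy-selection order that the start_transfers hunk in lib/fileop/impl/fts.py above implements: start from the configured default, let the destination site's proxy take over, and prefer the source site's proxy when the source is tape (MSS). The helper name pick_proxy, the string storage type, and the free-standing form are illustrative only; they are not part of the patch, which mutates self.x509proxy in place.

import logging

LOG = logging.getLogger(__name__)

def pick_proxy(default_proxy, source, destination):
    # source and destination are assumed to be Site-like objects with
    # name, storage_type and x509proxy attributes, as in dataformat/site.py.
    proxy = default_proxy
    # The hunk above overrides with the destination site's proxy unconditionally
    # (the guard on destination.x509proxy is commented out there).
    proxy = destination.x509proxy
    # Staging from tape is done with the source site's credentials.
    if source.storage_type == 'mss':
        proxy = source.x509proxy
    LOG.debug('Using proxy %s for %s -> %s', proxy, source.name, destination.name)
    return proxy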
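
The 'until'/'from' handling added to Condition in lib/policy/condition.py shells out to GNU date to turn a free-form expression into an epoch time and compares it with the current time. Below is a self-contained sketch of that evaluation, assuming the time clause has already been split off the predicate text as in __init__ above; the helper name time_window_allows is illustrative and not part of the patch.

import subprocess
import time

def time_window_allows(time_condition):
    # time_condition is e.g. "until 2019-01-31" or "from next monday",
    # or None when the condition text carried no time clause.
    if time_condition is None:
        return True

    keyword, _, expr = time_condition.partition(' ')
    # Let GNU date parse the free-form expression and print epoch seconds.
    proc = subprocess.Popen(['date', '-d', expr, '+%s'],
                            stdout = subprocess.PIPE, stderr = subprocess.PIPE)
    out, err = proc.communicate()
    if proc.returncode != 0:
        # Unparseable expression: do not block the condition.
        return True

    boundary = float(out)
    if keyword == 'until':
        return time.time() <= boundary
    else: # from
        return time.time() >= boundary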