Source code for dynamo_consistency.getsitecontents

# pylint: disable=import-error

"""
Tool to get the files located at a site.

.. Warning::

   Must be used on a machine with XRootD python module installed.

:author: Daniel Abercrombie <dabercro@mit.edu> \n
         Max Goncharov <maxi@mit.edu>
"""

import re
import logging
import time
import subprocess
from datetime import datetime

import timeout_decorator

import XRootD.client
from common.interface.mysql import MySQL

from . import config
from . import datatypes
from . import cache_tree


# Module-level logger; used by get_site_tree, while each Lister creates its own logger
LOG = logging.getLogger(__name__)


class Lister(object):
    """
    The prototype of the listing facility

    :param int thread_num: This optional parameter is only used to
                           create a separate logger for this object
    :param str site: Used for reading the correct configuration
    """

    def __init__(self, thread_num, site):
        config_dict = config.config_dict()
        self.log = logging.getLogger(
            __name__ if thread_num is None else '%s--thread%i' % (__name__, thread_num))
        # Any path containing one of these substrings is skipped by ``list``
        self.ignore_list = config_dict.get('IgnoreDirectories', [])
        # Site-specific string placed in front of every path before listing
        self.store_prefix = config_dict.get('PathPrefix', {}).get(site, '')
        # Total number of listing attempts per path (first attempt plus retries)
        self.tries = config_dict.get('Retries', 0) + 1
        # Number of times this object may still try the ``/cms`` prefix fallback
        self.fallback_tries = 5

    def ls_directory(self, path):
        """
        Prototype function that lists the directories

        :param str path: The full path, starting with ``/store/``,
                         of the directory to list.
        """
        pass

    def reconnect(self):
        """
        A prototype for reconnecting to the remote servers
        """
        pass

    def list(self, path, retries=0):
        """
        Return the directory contents at the given path.
        The ``list`` member is expected of every object passed to :py:mod:`datatypes`.

        The configured path prefix and a trailing slash are added to ``path``
        only for the call to :py:func:`ls_directory`. Retries are always made
        with the caller's original ``path``, so the prefix is applied exactly
        once per attempt.

        :param str path: The full path, starting with ``/store/``,
                         of the directory to list.
        :param int retries: Number of attempts so far
        :returns: A bool indicating the success, a list of directories, and a list of files.
                  The list of directories consists of tuples of (directory name, mod time).
                  The list of files consists of tuples of (file name, size, mod time).
                  The modification times are in seconds from epoch and the file size is in bytes.
        :rtype: bool, list, list
        """

        # Skip over paths that include part of the list of ignored directories
        for pattern in self.ignore_list:
            if pattern in path:
                self.log.warning('Ignoring %s because of ignored pattern %s', path, pattern)
                return False, [], []

        if retries >= self.tries:
            self.log.error('Giving up on %s due to too many retries', path)
            return False, [], []

        if retries:
            self.reconnect()

        # FileSystem only works with ending slashes for some sites (not all, but let's be safe).
        # The decorated path is kept in its own variable: feeding it back into
        # a retry would prepend the prefix a second time.
        full_path = self.store_prefix + path + ('' if path.endswith('/') else '/')

        okay, directories, files = False, [], []

        try:
            okay, directories, files = self.ls_directory(full_path)

            if not okay and self.fallback_tries and \
                    not self.store_prefix and len(path.strip('/').split('/')) < 4:
                self.fallback_tries -= 1
                # Try to fall back on /cms
                self.store_prefix = '/cms'
                self.log.warning('Trying to fall back to using suffix %s', self.store_prefix)
                # Passing ``self.tries - 1`` allows exactly one attempt with the new prefix
                okay, directories, files = self.list(path, self.tries - 1)

                if not okay:
                    self.log.warning('Fallback did not work, reverting')
                    self.store_prefix = ''

        except timeout_decorator.TimeoutError:
            self.log.warning('Directory %s timed out.', full_path)
            okay = False

        if not okay:
            okay, directories, files = self.list(path, retries + 1)

        return okay, directories, files
def ct_timestamp(line):
    """
    Takes a time string from gfal and extracts the time since epoch

    Only the date is used; a time of day in the last field is discarded, so
    the result always corresponds to local midnight of that day.

    When the last field is a time of day instead of a year, the current year
    is assumed. If that would place the date in the future, the previous
    year is used instead, since a listed modification date cannot be later
    than now.

    :param str line: The line from the gfal-ls call including month, day, and year
                     in some format with lots of hyphens
    :returns: Timestamp's time since epoch
    :rtype: int
    """

    fields = line.split('-')
    month = time.strptime(fields[0], '%b').tm_mon

    def to_epoch():
        """Convert the month, day and year fields into seconds since epoch"""
        datestr = str(month) + '-' + str(fields[1]) + '-' + str(fields[2])
        return int(time.mktime(time.strptime(datestr, '%m-%d-%Y')))

    # A colon means the listing printed HH:MM in place of the year
    year_missing = ':' in fields[-1]
    if year_missing:
        fields[-1] = datetime.now().year

    epoch = to_epoch()

    if year_missing and epoch > time.time():
        # e.g. an entry from December that is listed in January
        fields[-1] = datetime.now().year - 1
        epoch = to_epoch()

    return epoch
class GFalLister(Lister):
    """
    An object to list a site through ``gfal-ls`` calls

    :param str site: The site to list; used to look up the backend in the
                     ``sites`` table of the ``dynamo`` database
    :param int thread_num: This optional parameter is only used to
                           create a separate logger for this object
    """

    def __init__(self, site, thread_num=None):
        super(GFalLister, self).__init__(thread_num, site)
        mysql_reg = MySQL(config_file='/etc/my.cnf', db='dynamo', config_group='mysql-dynamo')
        try:
            self.backend = mysql_reg.query('SELECT backend FROM sites WHERE name=%s', site)[0]
        finally:
            # Release the connection even when the lookup fails
            mysql_reg.close()

    def ls_directory(self, path):
        """
        Gets the contents of a path

        :param str path: The full path, starting with ``/store/``,
                         of the directory to list.
        :returns: A bool indicating the success, a list of directories, and a list of files.
        :rtype: bool, list, list
        """
        directories = []
        files = []
        full_path = self.backend + '/' + str(path)

        # The command is passed as an argument list without a shell, so that
        # characters in the remote path are never interpreted as shell syntax.
        try:
            process = subprocess.Popen(['gfal-ls', '-l', full_path],
                                       stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                       bufsize=4096)
        except OSError as error:
            # Without a shell, a missing executable raises instead of giving a
            # non-zero exit status; report it as a failed listing as before.
            self.log.error(error)
            return False, directories, files

        strout, error = process.communicate()
        if process.returncode != 0:
            self.log.error(error)
            return False, directories, files

        for line in strout.split('\n'):
            fields = line.strip().split()
            if len(fields) < 1:
                continue

            # Parsing is deliberately strict: an unexpected line raises rather
            # than being dropped from the listing.
            item_name = fields[-1]
            item_size = int(fields[4])
            tstamp = ct_timestamp(fields[5] + '-' + fields[6] + '-' + fields[7])
            if fields[0].startswith('d'):
                directories.append((item_name, tstamp))
            else:
                files.append((item_name, item_size, tstamp))

        return True, directories, files
class XRootDSubShell(Lister):
    """
    Very similar to the :py:class:`XRootDLister`, but sends the listing
    commands to an ``xrdfs`` subshell that is kept open through
    :py:mod:`subprocess` pipes.

    :param str site: The site that this connection is to.
    :param str door: The URL of the door that ``xrdfs`` is started with
    :param int thread_num: This optional parameter is only used to
                           create a separate logger for this object
    """

    # Expected format of a listing line:
    #   (d)rwx (YYYY-MM-DD HH:MM:SS) (size) (name)
    _ENTRY_PATTERN = re.compile(
        r'(d|\-).{3}\s(\d{4}\-\d{2}\-\d{2}\s\d{2}:\d{2}:\d{2})\s*(\d*)\s([^\s]*)')
    # A bracketed number in any other line is treated as an error report
    _ERROR_PATTERN = re.compile(r'.*\[\d+\].*')

    def __init__(self, site, door, thread_num=None):
        super(XRootDSubShell, self).__init__(thread_num, site)
        self.shell = subprocess.Popen(['xrdfs', door],
                                      stdin=subprocess.PIPE,
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.STDOUT)
        self.uri = door

    def __del__(self):
        # ``shell`` does not exist if the constructor failed before creating it
        shell = getattr(self, 'shell', None)
        if shell is not None:
            shell.communicate('quit')

    @timeout_decorator.timeout(config.config_dict()['Timeout'])
    def ls_directory(self, path):
        """
        Gets the contents of a path by writing ``ls -l`` to the subshell

        :param str path: The full path, starting with ``/store/``,
                         of the directory to list.
        :returns: A bool indicating the success, a list of directories, and a list of files.
        :rtype: bool, list, list
        """
        self.log.debug('Directly listing %s with %s', path, self.uri)

        self.shell.stdin.write('ls -l %s\n' % path)
        self.shell.stdin.write('\n')  # send one extra [ENTER] to make an empty prompt appear
        self.shell.stdin.flush()

        self.shell.stdout.readline().strip()  # Read out the original prompt line

        directories = []
        files = []
        okay = True

        while True:
            raw_line = self.shell.stdout.readline()
            if not raw_line:
                # readline only returns an empty string at end of file, which
                # means the subshell is gone. Without this check the loop
                # would spin on empty reads until the timeout fires.
                self.log.error('Subshell at %s closed its output while listing %s',
                               self.uri, path)
                okay = False
                break

            line = raw_line.strip()
            self.log.debug(line)

            if line.endswith('>'):
                # stop reading at the empty prompt (otherwise readline hangs)
                break

            # First check that the line matches the expected format
            match = self._ENTRY_PATTERN.match(line)
            if match:
                # Get the timestamp
                mtime = time.mktime(time.strptime(match.group(2), '%Y-%m-%d %H:%M:%S'))
                # Get the relative name
                name = match.group(4).split('/')[-1]

                if match.group(1) == 'd':
                    # Directory
                    directories.append((name, mtime))
                else:
                    # File
                    files.append((name, int(match.group(3)), mtime))

            elif self._ERROR_PATTERN.match(line):
                # Otherwise, we probably have an error on our hands
                okay = False
                self.log.error(line)

        return okay, directories, files
class XRootDLister(Lister):
    """
    Lists directories through an ``XRootD.client.FileSystem`` connection
    that is opened once, at construction, to the given door.

    :param str site: The site that this connection is to.
    :param str door: The URL of the door that the connection is made to
    :param int thread_num: This optional parameter is only used to
                           create a separate logger for this object
    """

    def __init__(self, site, door, thread_num=None):
        super(XRootDLister, self).__init__(thread_num, site)
        self.conn = XRootD.client.FileSystem(door)
        self.log.info('Connection created at %s', door)

    @timeout_decorator.timeout(config.config_dict()['Timeout'])
    def ls_directory(self, path):
        """
        Gets the contents of the connected door at a given path

        :param str path: The full path, starting with ``/store/``,
                         of the directory to list.
        :returns: A bool indicating the success, a list of directories, and a list of files.
        :rtype: bool, list, list
        """
        self.log.debug('Using door at %s to list directory %s', self.conn.url, path)

        # http://xrootd.org/doc/python/xrootd-python-0.1.0/modules/client/filesystem.html#XRootD.client.FileSystem.dirlist
        status, listing = self.conn.dirlist(
            path, flags=XRootD.client.flags.DirListFlags.STAT)

        self.log.debug('For %s, directory listing good: %s', path, bool(listing))

        directories = []
        files = []

        # Entries are sorted into the two output lists whenever a listing came
        # back, independent of the returned status
        entries = listing.dirlist if listing else []
        for entry in entries:
            info = entry.statinfo
            if info.flags & XRootD.client.flags.StatInfoFlags.IS_DIR:
                directories.append((entry.name.lstrip('/'), info.modtime))
            else:
                files.append((entry.name.lstrip('/'), info.size, info.modtime))

        okay = bool(status.ok)

        # A bad status is reported along with the raw listing
        if not okay:
            self.log.warning('While listing %s: %s', path, status.message)
            self.log.debug('Directory List: %s', listing)
            self.log.debug('Okay: %i', okay)

        self.log.debug('From %s returning status %i with %i directories and %i files.',
                       path, okay, len(directories), len(files))

        return okay, directories, files
@cache_tree('ListAge', 'remotelisting')
def get_site_tree(site, callback=None):
    """
    Build the directory listing information for a site.

    Sites whose access method is ``SRM`` are listed with
    :py:class:`GFalLister` objects. All other sites are listed over XRootD,
    with one lister per thread, each given one door from the redirector list.

    :param str site: The site name
    :param function callback: The callback function to pass to
                              :py:func:`datatypes.create_dirinfo`
    :returns: The site directory listing information
    :rtype: dynamo_consistency.datatypes.DirectoryInfo
    """

    config_dict = config.config_dict()
    access = config_dict.get('AccessMethod', {})
    method = access.get(site)

    if method == 'SRM':
        gfal_threads = int(config_dict.get('GFALThreads'))
        LOG.info('threads = %i', gfal_threads)

        srm_dirs = []
        for directory in config.config_dict().get('DirectoryList', []):
            # One (site, thread number) argument tuple per lister thread
            lister_args = [(site, index) for index in xrange(gfal_threads)]
            srm_dirs.append(
                datatypes.create_dirinfo('/store', directory, GFalLister,
                                         lister_args, callback))

        return datatypes.DirectoryInfo(name='/store', directories=srm_dirs)

    # Everything else goes through the doors reported for the site
    num_threads = int(config_dict.get('NumThreads'))
    balancer, door_list = config.get_redirector(site)
    LOG.debug('Full redirector list: %s', door_list)

    use_balancer = site in config_dict.get('UseLoadBalancer', []) or \
        (balancer and not door_list)
    if use_balancer:
        # A single thread talks to the load balancer only
        num_threads = 1
        door_list = [balancer]

    if not door_list:
        LOG.error('No doors found. Returning emtpy tree')
        return datatypes.DirectoryInfo(name='/store')

    # Repeat the doors until there is at least one per thread
    while len(door_list) < num_threads:
        if len(door_list) % 2 == 0:
            # For an even count, the appended copy is rotated by one door
            door_list.extend(door_list[1:] + door_list[:1])
        else:
            door_list.extend(door_list)

    # Strip off the extra threads
    door_list = door_list[:num_threads]

    lister_class = XRootDSubShell if method == 'directx' else XRootDLister

    # One DirectoryInfo per configured directory; every lister is constructed
    # from a (site, door, thread number) tuple
    xrd_dirs = []
    for directory in config_dict.get('DirectoryList', []):
        lister_args = [(site, door, thread_num)
                       for thread_num, door in enumerate(door_list)]
        xrd_dirs.append(
            datatypes.create_dirinfo('/store/', directory, lister_class,
                                     lister_args, callback))

    return datatypes.DirectoryInfo(name='/store', directories=xrd_dirs)