Source code for dynamo_consistency.create

"""
Module that contains all of the threading functions needed to create a
:py:class:`dynamo_consistency.datatypes.DirectoryInfo` object.

:author: Daniel Abercrombie <dabercro@mit.edu>
"""


import os
import time
import logging
import threading
import multiprocessing
import collections

from Queue import Empty

from . import config
from . import messaging
from .datatypes import DirectoryInfo


LOG = logging.getLogger(__name__)


class ListingThread(threading.Thread):
    """
    A thread that does the listing

    :param int number: A unique number of the thread created.
                       This is used to generate a thread name and to fetch a logger.
    :param multiprocessing.Pipe recv: One end of a pipe for communicating with the master thread.
                                      Receives message here.
    :param multiprocessing.Queue send: A queue to send message to the master thread.
    :param function filler: The function that takes a path as an argument and returns:

                            - A bool saying whether the listing was successful or not
                            - A list of tuples of sub-directories and their mod times
                            - A list of tuples files inside, their size, and their mod times
    """

    out_queue = multiprocessing.Queue()
    """
    A queue where all of the listing outputs are placed.
    A master thread is expected to read from and clear this.
    """

    # Internal lock guarding every access to _in_deque
    _in_lock = threading.Lock()
    # Pending listing jobs shared by all threads. Jobs are appended and popped
    # at the same (right) end, so the most recently added directory goes first.
    _in_deque = collections.deque()

    def __init__(self, number, recv, send, filler):
        name = 'ListingThread--%i' % number
        super(ListingThread, self).__init__(name=name)

        self.log = logging.getLogger(name)
        self.number = number
        self.recv = recv
        self.send = send
        self.filler = filler
        self.checker = messaging.Checker()

    @staticmethod
    def put_first_dir(location, directory):
        """
        Place the first set of parameters for the :py:class:`ListingThread` objects
        to start from. Should not be called once the threads are started.

        :param str location: This is the beginning of the path where we will find ``first_dir``.
                             For example, to find the first directory ``mc``,
                             we also have to say where it is.
                             For using CMS LFNs, location would be ``/store``
                             (where ``mc`` is inside).
        :param str directory: Name of the first directory to run over inside of location
        :returns: The name that the first DirectoryInfo object should be.
                  This is just the first directory in the directory parameter.
        :rtype: str
        """

        # Jobs on the deque are tuples of
        # (full path of the next listing to do,
        #  name of sub-node to place the listing (blank for first level),
        #  list of previous directories, list of previous files (for retries),
        #  list of queue numbers that have failed so far)

        # Everything before the first '/' names the DirectoryInfo;
        # whatever follows is the relative path the listing starts from
        dirinfo_name, _, first_relative = directory.partition('/')

        LOG.debug('Taking %s, %s -> Starting from: %s, relative path: %s',
                  location, directory, dirinfo_name, first_relative)

        with ListingThread._in_lock:
            ListingThread._in_deque.append(
                (os.path.join(location, dirinfo_name), first_relative, [], [], []))

        return dirinfo_name

    def run(self):
        """
        Runs the listing thread.

        Jobs are taken from the shared deque until it is empty. The thread then
        reports ``('A', 0)`` on ``send`` and blocks on ``recv`` until the master
        answers. It stops on ``'Close'``, when the master end of the pipe is
        closed, or when ``self.checker.isrunning()`` becomes false.
        """

        self.log.debug('Running queue: %i', self.number)

        running = True

        while running and self.checker.isrunning():
            job = self._take_job()
            if job is None:
                running = self._wait_for_master()
            else:
                self._list_job(*job)

        self.recv.close()

    def _take_job(self):
        """
        Pop the next job from the shared deque.

        :returns: The job tuple, or ``None`` if the deque is empty
        :rtype: tuple or None
        """

        # Only the pop itself is allowed to signal "empty", and the lock is
        # released on every path out of this block
        with ListingThread._in_lock:
            try:
                job = ListingThread._in_deque.pop()
            except IndexError:
                return None

            # NOTE(review): the job is put back *and* still handled by this
            # thread. Nothing in this class ever fills the failed list, so
            # this branch looks inert -- confirm before relying on it.
            if self.number in job[4]:
                self.log.warning('Got previously failed call, putting back and sleeping')
                ListingThread._in_deque.append(job)

        return job

    def _list_job(self, location, name, prev_dirs, prev_files, failed_list):
        """
        List one directory, publish the result to :py:attr:`out_queue`,
        queue its sub-directories, and notify the master with ``('O', timestamp)``.

        :param str location: Path that ``name`` is relative to
        :param str name: Relative path of the directory to list
        :param list prev_dirs: Directories to merge in when the listing fails
        :param list prev_files: Files to merge in when the listing fails
        :param list failed_list: Thread numbers recorded as failed for this job
        """

        if self.number in failed_list:
            time.sleep(10)

        self.log.debug('Getting directory with (%s, %s, %s)',
                       location, name, failed_list)

        # Call filler
        full_path = os.path.join(location, name)
        self.log.debug('Full path is %s', full_path)

        try:
            okay, directories, files = self.filler(full_path)
        except Exception:   # pylint: disable=broad-except
            # A crashing filler is treated as a failed listing,
            # but the traceback is kept in the log
            self.log.exception('Listing of %s raised an exception', full_path)
            okay, directories, files = False, [], []

        self.log.debug('Got from filler: Good? %s, %i directories, %i files',
                       okay, len(directories), len(files))

        # If not okay, add _unlisted_ flag
        if not okay:
            directories = list(set(directories + prev_dirs))
            files = list(set(files + prev_files))
            self.log.debug('Full dirs, and files: %s, %s', directories, files)
            self.log.error('Giving up directory %s', full_path)
            # _unlisted_ is used as a flag to tell our comparer something went wrong
            files.append(('_unlisted_', 0, 0))

        # Send results to master queue
        ListingThread.out_queue.put((name, directories, files, len(failed_list)))

        # Queue every sub-directory as a fresh job
        with ListingThread._in_lock:
            ListingThread._in_deque.extend(
                (location, os.path.join(name, directory), [], [], [])
                for directory, _ in directories)

        # Tell master that a job finished,
        # so it can build the final object
        self.send.put(('O', time.time()))
        self.log.debug('Finished one job with (%s, %s)', location, name)

    def _wait_for_master(self):
        """
        Report an empty input deque and wait for the master's answer.

        :returns: ``True`` if the thread should check the deque again,
                  ``False`` if it should stop
        :rtype: bool
        """

        # Report empty
        self.log.debug('Worker finished input queue')
        self.send.put(('A', 0))

        # Check for main process
        try:
            message = self.recv.recv()
        except EOFError:
            # The other end of the pipe was closed without a 'Close' message
            self.log.debug('Pipe closed by master, stopping')
            return False

        self.log.debug('Message from master: %s', message)

        # If permission, close
        if message == 'Close':
            return False

        self.log.debug('Worker going back to check queue')
        return True
# This function is a mess and can likely be simplified
def _drain_backlog(conn, inbox):
    """
    Skip over the messages one worker queued up while the master was busy.

    Stops as soon as the worker reports an empty input queue (``'A'``, which is
    answered with ``'Work'``), a message stamped after this call started is
    seen, or nothing arrives for one second.

    :param conn: Master end of the pipe to the worker
    :param inbox: The queue that the worker sends its messages on
    """

    now = time.time()

    while True:
        try:
            message, timestamp = inbox.get(True, 1)
        except Empty:
            return

        if message == 'A':
            LOG.debug('Found end to pipe.')
            conn.send('Work')
            return

        if timestamp > now:
            return


def create_dirinfo(  # pylint: disable=too-complex, too-many-locals, too-many-branches, too-many-statements
        location, first_dir, filler, object_params=None, callback=None):
    """ Create the directory information

    :param str location: This is the beginning of the path where we will find ``first_dir``.
                         For example, to find the first directory ``mc``,
                         we also have to say where it is.
                         For using CMS LFNs, location would be ``/store``
                         (where ``mc`` is inside).
                         This is a path.
    :param str first_dir: The name of the first directory that is inside the path of ``location``.
                          This should not be a path,
                          but the name of the directory to list recursively.
    :param filler: This is either a function that lists the directory contents given just
                   a path of ``os.path.join(location, first_dir)``, or it is a constructor
                   that does the same thing with a member function called ``list``.
                   If ``filler`` is an object constructor, the parameters for the
                   object creation must be passed through the parameter ``object_params``.
                   Both listings must return the following tuple:

                   - A bool saying whether the listing was successful or not
                   - A list of tuples of sub-directories and their mod times
                   - A list of tuples files inside, their size, and their mod times
    :type filler: function or constructor
    :param list object_params: This only needs to be set when filler is an object constructor.
                               Each element in the list is a tuple of arguments to pass
                               to the constructor. One thread is started per element.
    :param function callback: A function that is called every time master thread has
                              finished checking the child threads.
                              This can happen very many times at large sites.
                              The function is called with the main DirectoryTree
                              as its argument
    :returns: A :py:class:`DirectoryInfo` object containing everything the directory
              listings from ``os.path.join(location, first_dir)`` with name ``first_dir``.
    :rtype: DirectoryInfo
    :raises messaging.Killed: When a site has been stopped
    """

    LOG.debug('Called create_dirinfo(%s, %s, %s, %s)',
              location, first_dir, filler, object_params)

    # Determine the number of threads
    if object_params is not None:
        n_threads = len(object_params)
    else:
        n_threads = config.config_dict()['NumThreads'] or multiprocessing.cpu_count()

    LOG.info('Listing directory %s with %i threads',
             os.path.join(location, first_dir), n_threads)

    # master_conns[i] and send_to_master[i] both belong to threads[i]
    master_conns = []
    send_to_master = []

    dirinfo_name = ListingThread.put_first_dir(location, first_dir)

    # Spawn the listing threads
    threads = []
    for i_thread in range(n_threads):
        master_end, worker_end = multiprocessing.Pipe()
        inbox = multiprocessing.Queue()

        master_conns.append(master_end)
        send_to_master.append(inbox)

        # With object_params, each thread gets its own filler object
        if object_params is not None:
            lister = filler(*(object_params[i_thread])).list
        else:
            lister = filler

        thread = ListingThread(number=i_thread, recv=worker_end,
                               send=inbox, filler=lister)
        thread.start()
        threads.append(thread)

    # Build the DirectoryInfo
    building = True

    # If started in a nested directory, just build the first one
    dir_info = DirectoryInfo(dirinfo_name)

    checker = messaging.Checker()

    while building and checker.isrunning():
        try:
            # Get the info from the queue
            name, directories, files, _ = ListingThread.out_queue.get(True, 0.25)

            # Create the nodes and files
            built = dir_info.get_node(name)
            built.add_files(files)

            # Set correct node mtime for directories
            for directory, mtime in directories:
                built.get_node(directory).mtime = mtime

        except Empty:
            # When empty, check on the status of the workers
            LOG.debug('Empty queue for building.')
            LOG.info('Number of files so far built: %8i nodes: %8i',
                     dir_info.get_num_files(), dir_info.count_nodes())

            # Process the dir_info with some callback
            if callback:
                LOG.debug('Checking callback')
                callback(dir_info)

            # Ends only if all threads are done at the beginning of this check
            threads_done = 0

            for index, (conn, inbox) in enumerate(zip(master_conns, send_to_master)):
                LOG.debug('Waiting for thread %i', index)
                message, _ = inbox.get(True)
                LOG.debug('Recieved message %s', message)

                # Count the number of threads saying their finished at the beginning
                if message == 'A':
                    threads_done += 1
                    LOG.info('Threads saying done: %i', threads_done)
                    # Send back to work, just in case not all threads are done
                    conn.send('Work')

                elif message == 'O':
                    # This thread wasn't finished at the beginning, so threads_done
                    # will not reach n_threads if the master reaches this point in the code
                    LOG.debug('Found one job, about to cycle')
                    # Cycle through timestamps so that we do not have a backlog
                    _drain_backlog(conn, inbox)

                else:
                    LOG.error('Weird message from pipe')

            # Check if all the threads were finished
            if threads_done == n_threads:
                LOG.debug('Done building')
                # Break out of loop of checking
                building = False

    LOG.debug('Closing all connections')

    killed = not checker.isrunning()

    # Tell connections to close. When the site was stopped, no 'Close'
    # message is sent; the master end of each pipe is just closed.
    for conn in master_conns:
        if not killed:
            conn.send('Close')
        conn.close()

    LOG.debug('Waiting for threads')

    # Wait for threads to join
    for thread in threads:
        thread.join()

    if killed:
        raise messaging.Killed('Site %s was stopped' % checker.site)

    return dir_info