Source code for dynamo_consistency.datatypes

"""
Module defines the datatypes that are used for storage and comparison.
There is also a powerful create_dirinfo function that takes a filler function
or object and uses the multiprocessing module to recursively list directories
in parallel.

:author: Daniel Abercrombie <dabercro@mit.edu>
"""


import os
import time
import hashlib
import cPickle
import logging

from . import config


LOG = logging.getLogger(__name__)


class NotEmpty(Exception):
    """
    Raised when something tries to delete a directory that is not empty
    from a :py:class:`DirectoryInfo`
    """
class BadPath(Exception):
    """
    Raised when a path handed to one of the methods of a
    :py:class:`DirectoryInfo` does not make sense for that node
    """
class DirectoryInfo(object):
    """
    Stores all of the information of the contents of a directory

    :param str name: The name of the directory
    :param list directories: If this is set, the infos in the list
                             are merged into a master :py:class:`DirectoryInfo`.
    :param list files: List of tuples containing information about files
                       in the directory.
    """

    # Number of days that a file or directory must be older than before it is
    # compared. Filled from the ``IgnoreAge`` configuration value on first use.
    ignore_age = None

    __slots__ = ('directories', 'timestamp', 'name', 'hash',
                 'files', 'mtime', 'can_compare')

    def __init__(self, name='', directories=None, files=None):
        if DirectoryInfo.ignore_age is None:
            DirectoryInfo.ignore_age = float(config.config_dict()['IgnoreAge'])

        self.directories = directories or []
        self.timestamp = time.time()
        self.name = name
        self.hash = None
        # Is only None until filled for the first time.
        # If still None for some reason during comparison, errors will be thrown
        self.files = None
        self.mtime = None
        self.can_compare = False

        if directories is not None or files is not None:
            self.add_files(files)

    @staticmethod
    def _ignore_seconds():
        """
        :returns: The class-wide ``ignore_age`` (in days) converted to seconds
        :rtype: float
        """
        return DirectoryInfo.ignore_age * 24 * 3600

    def get_files(self, min_age=0, path=''):
        """
        Get the list of files that are older than some age

        :param int min_age: The minimum age, in seconds, of files to list
        :param str path: The path to this file. Used for recursive calls
        :returns: List of full file paths
        :rtype: list
        """
        here = os.path.join(path, self.name)

        # Only list old files, and never the '_unlisted_' placeholder
        output = [os.path.join(here, fil['name']) for fil in self.files
                  if (self.timestamp - fil['mtime']) > min_age and
                  fil['name'] != '_unlisted_']

        for directory in self.directories:
            output.extend(directory.get_files(min_age, here))

        return output

    def add_files(self, files):
        """
        Set the files for this :py:class:`DirectoryInfo` node

        :param list files: The tuples of file information.
                           Each element consists of file name, size, and mod time.
                           An optional fourth element is stored as the block.
        :returns: self for chaining calls
        :rtype: :py:class:`DirectoryInfo`
        """
        # This is where we know that the directory has been properly filled
        if self.files is None:
            self.files = []

        # Names already stored in this node; a set keeps the check below cheap
        existing_names = set(fi['name'] for fi in self.files)

        for file_info in sorted(files or []):
            if file_info[0] in existing_names:
                continue

            name, size, mtime = file_info[:3]
            block = file_info[3] if len(file_info) > 3 else ''

            self.files.append({
                'name': name,
                'size': long(size),
                'mtime': mtime,
                'block': block,
                'hash': hashlib.sha1(
                    '%s %i' % (name, size)    # We are not comparing mtime for now
                    ).hexdigest(),
                'can_compare': bool(mtime + self._ignore_seconds() < self.timestamp and
                                    name != '_unlisted_')
                })

        self.files.sort(key=lambda x: x['name'])

        return self

    def add_file_list(self, file_infos):
        """
        Add a list of tuples containing file_name, file_size to the node.
        This is most useful when you get a list of files from some other source
        and want to easily convert that list into a :py:func:`DirectoryInfo`

        :param list file_infos: The list of files
                                (full path, size in bytes[, timestamp])
        """
        files = []
        directory = ''

        for file_info in file_infos:
            name, size = file_info[:2]
            timestamp = file_info[2] if len(file_info) > 2 else 0

            # Directory of this file relative to this node
            new_dir = os.path.dirname(name[len(self.name):].lstrip('/'))

            if directory == new_dir:
                # If in the old directory, append to the list of files
                files.append((os.path.basename(name), size, timestamp))
            else:
                # When changing directories, append the files gathered in the last directory
                self.get_node(directory).add_files(files)
                # Get the new directory name
                directory = new_dir
                # Reset the files list
                files = [(os.path.basename(name), size, timestamp)]

        # Add data from the last directory
        self.get_node(directory).add_files(files)

    def setup_hash(self):
        """
        Set the hashes for this :py:class:`DirectoryInfo`
        """
        # A node that was never filled cannot be hashed
        if self.files is None:
            return

        hasher = hashlib.sha1()

        # Sort the sub-directories and files
        self.directories.sort(key=lambda x: x.name)
        self.files.sort(key=lambda x: x['name'])

        hasher.update(self.name)

        for directory in self.directories:
            # Recursively make the hash for each subdirectory first
            directory.setup_hash()
            # Can compare if a subdirectory asks for it
            self.can_compare = self.can_compare or directory.can_compare

            # Ignore newer directories or any others that don't want to be compared
            if directory.can_compare:
                hasher.update('%s %s' % (directory.name, directory.hash))

        for file_info in self.files:
            if file_info['can_compare']:
                # Add files that can be compared, and set self to be compared
                self.can_compare = True
                hasher.update('%s %s' % (file_info['name'], file_info['hash']))

        # Add empty directories that are not too new to comparison
        if not (self.directories or self.files) and self.mtime and \
                self.mtime + self._ignore_seconds() < self.timestamp:
            self.can_compare = True

        # Calculate hash
        self.hash = hasher.hexdigest()

    def save(self, file_name):
        """
        Save this :py:class:`DirectoryInfo` in a file.

        :param str file_name: is the location to save the file
        """
        # The highest pickle protocol is a binary format, so write in binary mode
        with open(file_name, 'wb') as outfile:
            cPickle.dump(self, outfile, protocol=cPickle.HIGHEST_PROTOCOL)

    def display(self, path=''):
        """
        Print out the contents of this :py:class:`DirectoryInfo`

        :param str path: The full path to this :py:class:`DirectoryInfo` instance
        """
        print(self.displays(path))

    def displays(self, path=''):
        """
        Get the string to print out the contents of this :py:class:`DirectoryInfo`.

        :param str path: The full path to this :py:class:`DirectoryInfo` instance
        :returns: The display string
        :rtype: str
        """
        # This is in a separate function for unit test assertion errors, which likes strings
        if not path:
            path = self.name

        lines = ['compare: %i mtime: %s my hash: %s path: %s' %
                 (int(self.can_compare), str(self.mtime), self.hash, path)]

        lines.extend(
            'mtime: %i size: %i my hash:%s name: %s' %
            (file_info['mtime'], file_info['size'], file_info['hash'], file_info['name'])
            for file_info in self.files)

        # Recursively get displays for sub-directories
        lines.extend(directory.displays(os.path.join(path, directory.name))
                     for directory in self.directories)

        return '\n'.join(lines)

    def get_node(self, path, make_new=True):
        """
        Get the node that corresponds to the path given.
        If the node does not exist yet, and ``make_new`` is True, the node is created.

        :param str path: Path to the desired node from current node.
                         If the path does not exist yet, empty nodes will be created.
        :param bool make_new: Create new nodes if none exists at path or not
        :returns: A node with the proper path, unless make_new is False
                  and the node doesn't exist
        :rtype: DirectoryInfo or None
        """
        # If no path left, this is the node
        if not path:
            return self

        # First path element is the child to find, the rest is passed down to it
        child_name, _, remaining = path.partition('/')

        # Search for if directory exists
        for directory in self.directories:
            if child_name == directory.name:
                return directory.get_node(remaining, make_new)

        if not make_new:
            return None

        # If we're making a new directory, then this should have non-None self.files
        if self.files is None:
            self.files = []

        new_dir = DirectoryInfo(child_name)
        self.directories.append(new_dir)
        return new_dir.get_node(remaining, make_new)

    def get_directory_size(self):
        """
        Report the total size used by this directory and its subdirectories.

        :returns: Size of files in directory, in bytes
        :rtype: int
        """
        return sum([di.get_directory_size() for di in self.directories],
                   sum([fi['size'] for fi in self.files]))

    def get_unlisted(self, path=''):
        """
        Get the directories in this tree that hold an ``'_unlisted_'`` placeholder file.

        :param str path: Path to prepend to the name, used in recursive calls
        :returns: List of directories that were unlisted
        :rtype: list
        """
        here = os.path.join(path, self.name)

        output = []
        for directory in self.directories:
            output.extend(directory.get_unlisted(here))

        if '_unlisted_' in [fil['name'] for fil in self.files]:
            output.append(here)

        return output

    def get_num_files(self, unlisted=False, place_new=False):
        """
        Report the total number of files stored.

        :param bool unlisted: If true, return number of unlisted directories,
                              Otherwise return only successfully listed files
        :param bool place_new: If true, pretend there's one more file inside
                               any new directory or if files is None.
                               This prevents listing of empty directories to include
                               directories that should not actually be deleted.
        :returns: The number of files in the directory tree structure
        :rtype: int
        """
        if self.files is None:
            return int(place_new)

        num_files = len([fi for fi in self.files
                         if (fi['name'] == '_unlisted_') == unlisted])

        for directory in self.directories:
            num_files += directory.get_num_files(unlisted, place_new)

        # Count a pretend file for nodes that are not comparable or not old enough
        if place_new and (not self.can_compare or self.mtime is None or
                          self.mtime + self._ignore_seconds() > self.timestamp):
            num_files += 1

        return num_files

    def _grab_first(self, levels=100):
        """
        Used for debugging. Grabs the subdirectories by the first in the list.

        :param int levels: is the number of levels of directories to bypass
        :returns: The proper :py:class:`DirectoryInfo` level
        :rtype: DirectoryInfo
        """
        output = self
        remaining = levels

        # Stop at the requested depth or at the first node without subdirectories
        while remaining > 0 and output.directories:
            output = output.directories[0]
            remaining -= 1

        return output

    def compare(self, other, path='', check=None):
        # pylint: disable=too-complex, too-many-branches
        """
        Does one way comparison with a different tree

        :param DirectoryInfo other: The directory tree to compare this one to
        :param str path: Is the path to get to this location so far
        :param check: An optional function that double checks a file name. If the
                      checking function returns ``True`` for a file name,
                      the file will not be included in the output.
        :type check: function
        :returns: Tuple of list of files and directories that are present and not
                  in the other tree and the size of the files that corresponds to
        :rtype: list, list, long
        """
        extra_files = []
        extra_dirs = []
        extra_size = long(0)

        # Nothing can be said about a directory that was not fully listed
        if '_unlisted_' in [fi['name'] for fi in self.files]:
            return extra_files, extra_dirs, extra_size

        here = os.path.join(path, self.name)

        if other:
            # If there is a match in the hash, then the nodes are effectively identical
            # Otherwise, do these recursive comparisons
            LOG.debug('Hashes: %s -- %s, can compare: %i -- %i',
                      self.hash, other.hash, self.can_compare, other.can_compare)

            if self.hash != other.hash and other.can_compare:
                for directory in self.directories:
                    # Ignore not comparable directories (usually new ones)
                    if not directory.can_compare:
                        continue

                    # Recursive check of extra files and directories here
                    new_other = other.get_node(directory.name, False)
                    more_files, more_dirs, more_size = \
                        directory.compare(new_other, here, check)

                    extra_size += more_size
                    extra_files.extend(more_files)

                    if new_other:
                        extra_dirs.extend(more_dirs)
                    elif '_unlisted_' not in [fi['name'] for fi in other.files]:
                        # If the subdirectory does not exist, and '_unlisted_' not thrown
                        # mark that whole directory as being extra.
                        # At the moment this is redundant with all the files,
                        # but gives a good place to prune file system directories
                        # after files have been deleted
                        extra_dirs.append(os.path.join(here, directory.name))

                comparable = [fi for fi in self.files if fi['can_compare']]

                if comparable:
                    # Index the other node once instead of rescanning it for every file.
                    # Say all files are fine in a directory that is even
                    # partially '_unlisted_'
                    other_unlisted = '_unlisted_' in [fi['name'] for fi in other.files]
                    other_hashes = set(fi['hash'] for fi in other.files)

                    for file_info in comparable:
                        # See if each file exists and has the correct hash
                        if other_unlisted or file_info['hash'] in other_hashes:
                            continue

                        full_name = os.path.join(here, file_info['name'])
                        if check is None or not check(full_name):
                            extra_size += file_info['size']
                            extra_files.append(full_name)

        else:
            # If no other node to compare, all files are extra (not in the other tree)
            LOG.debug('Nothing to compare, files: %s', self.files)
            LOG.debug('Nothing to compare, directories: %s',
                      [(di.name, di.can_compare) for di in self.directories])

            for file_info in [fi for fi in self.files if fi['can_compare']]:
                full_name = os.path.join(here, file_info['name'])
                if check is None or not check(full_name):
                    extra_files.append(full_name)
                    extra_size += file_info['size']

            # All directories are extra too
            for directory in [di for di in self.directories if di.can_compare]:
                more_files, _, more_size = directory.compare(None, here, check)
                extra_size += more_size
                extra_files.extend(more_files)

        return extra_files, extra_dirs, extra_size

    def count_nodes(self, empty=False):
        """
        :param bool empty: If True, only return the number of empty nodes
        :returns: The total number of nodes in this Directory Info.
                  This corresponds to approximately the number
                  of listing requests required to build the data.
        :rtype: int
        """
        # Nodes that were never filled are not counted at all
        if self.files is None or (empty and self.get_num_files() != 0):
            count_this = 0
        else:
            count_this = 1

        return sum([directory.count_nodes(empty) for directory in self.directories],
                   count_this)

    def empty_nodes_set(self):
        """
        This function recursively builds the entire list of empty directories
        that can be deleted

        :returns: The set of empty directories to delete
        :rtype: set
        """
        output = set()

        # Count direct subdirectories that are removed
        count_sub = 0

        for directory in self.directories:
            # Add all the elements from the other set
            for sub in directory.empty_nodes_set():
                # A result without a separator is the direct subdirectory itself
                if '/' not in sub:
                    count_sub += 1

                output.add(os.path.join(self.name, sub))

        # This node can go too if it has nothing in it, has a known mtime,
        # and every direct subdirectory is being removed
        if not (self.get_num_files(place_new=True) or self.mtime is None) and \
                count_sub == len(self.directories):
            output.add(self.name)

        return output

    def empty_nodes_list(self):
        """
        This function should be used to get the nodes to delete in the proper
        order for non-recursive deletion

        :returns: The list of empty directories to delete in the order to delete
        :rtype: list
        """
        # Don't want to recursively sort, so we send this to a helpful set function
        return sorted(self.empty_nodes_set(), reverse=True)

    def listdir(self, *args, **kwargs):
        """
        Get the list of directory names within a :py:class:`DirectoryInfo`.
        Adding an argument will display the contents of the next directory.
        For example, if ``dir.listdir()`` returns::

            0: data
            1: mc

        ``dir.listdir(1)`` then lists the contents of ``mc`` and
        ``dir.listdir(1, 0)`` lists the contents of the first subdirectory in ``mc``.

        :param args: Is a list of indices to list the subdirectories
        :param kwargs: Supports 'printing' which is set to a bool. Defaults as True.
        :returns: The :py:class:`DirectoryInfo` that is being listed
        :rtype: DirectoryInfo
        """
        printing = kwargs.get('printing', True)

        # Print the contents of a directory picked next, and return that DirectoryInfo
        if args:
            return self.directories[args[0]].listdir(*args[1:], printing=printing)

        # If we got to the last directory of the args, print the files contained
        if printing:
            print('\nDirectories:')

            # Get the formatting width for printing the directory names
            if self.directories:
                width = max([len(di.name) for di in self.directories]) + 2
            else:
                width = 0

            row = '%3i: %-{0}s Hash: %s Num Files: %7i Dirs Unlisted: %7i'.format(width)

            # Print information for each directory
            for index, directory in enumerate(self.directories):
                print(row % (index, directory.name, directory.hash,
                             directory.get_num_files(), directory.get_num_files(True)))

            if self.files:
                print('Files:')
                for file_info in self.files:
                    print(file_info)

        return self

    def get_file(self, file_name):
        """
        Get the file dictionary based off the name.
        The tree is not modified by the lookup.

        :param str file_name: The LFN of the file
        :returns: Dictionary of file information, or None if the file
                  or its directory is not in the tree
        :rtype: dict
        :raises BadPath: if the file_name does not start with ``self.name``
        """
        LOG.debug('Getting file info for: %s', file_name)

        if not file_name.startswith(self.name):
            raise BadPath('self.name is %s, file_name is %s' %
                          (self.name, file_name))

        # Skip the separator that follows the name, unless the name
        # is empty or already ends with one
        offset = len(self.name)
        if self.name and not self.name.endswith('/'):
            offset += 1

        exploded_name = file_name[offset:].split('/')
        desired_name = exploded_name[-1]

        # Only look, so that asking about a missing file does not create directories
        node = self.get_node('/'.join(exploded_name[:-1]), make_new=False)
        if node is None or node.files is None:
            return None

        for file_info in node.files:
            if file_info['name'] == desired_name:
                return file_info

        return None

    def remove_node(self, path_name):
        """
        Remove an empty node from the DirectoryInfo

        :param str path_name: The path to the node, including the
                              ``self.name`` at the beginning
        :returns: self for chaining
        :rtype: :py:class:`DirectoryInfo`
        :raises NotEmpty: if the directory is not empty or ``self.files`` is None
        :raises BadPath: if the path_name does not start with the ``self.name``
        """
        LOG.debug('Would like to remove %s', path_name)

        if not path_name.startswith(self.name):
            raise BadPath('self.name is %s, path_name is %s' %
                          (self.name, path_name))

        exploded_name = path_name[len(self.name) + 1:].split('/')
        # Note that get_node creates the parent here if it is not in the tree yet
        parent = self.get_node('/'.join(exploded_name[:-1]))

        # If the directory doesn't exist, node is None and the checks below
        # fail with an AttributeError
        node = parent.get_node(exploded_name[-1], make_new=False)

        if node.files:
            raise NotEmpty('This directory has files %s' % node.files)

        if node.directories:
            raise NotEmpty('This directory contains subdirectories %s' %
                           [d.name for d in node.directories])

        if node.files is None:
            raise NotEmpty('The files list is still None')

        if node.mtime + self._ignore_seconds() > node.timestamp:
            raise NotEmpty('This directory is not old enough?')

        parent.directories.remove(node)

        return self
def get_info(file_name):
    """
    Get the :py:class:`DirectoryInfo` from a file.

    :param str file_name: is the location of the saved information
    :returns: Saved info
    :rtype: DirectoryInfo
    """
    # The unpickled tree does not go through __init__,
    # so make sure the class-wide age setting is loaded here
    if DirectoryInfo.ignore_age is None:
        DirectoryInfo.ignore_age = float(config.config_dict()['IgnoreAge'])

    # NOTE: unpickling can execute arbitrary code, so only load trusted files.
    # The pickle is a binary format, and the context manager closes the file
    # even if loading fails.
    with open(file_name, 'rb') as infile:
        return cPickle.load(infile)
def compare(inventory, listing, output_base=None, orphan_check=None, missing_check=None):
    """
    Compare two different trees and output the differences into an ASCII file

    :param DirectoryInfo inventory: The tree of files that should be at a site
    :param DirectoryInfo listing: The tree of files that are listed remotely
    :param str output_base: If set, the missing files are written to
                            ``<output_base>_missing.txt`` and the orphan files
                            to ``<output_base>_orphan.txt``, one name per line.
    :param function orphan_check: A function that double checks each expected orphan.
                                  The function takes as an input, an LFN.
                                  If the function returns true,
                                  the LFN will not be listed as an orphan.
    :param function missing_check: A function checks each expected missing file
                                   The function takes as an input, an LFN.
                                   If the function returns true,
                                   the LFN will not be listed as missing.
    :returns: The list of missing files, the size of the missing files,
              the list of orphan files, and the size of the orphan files
    :rtype: tuple
    """
    LOG.info('About to perform comparison. Results will be in files starting with %s',
             output_base)

    # Files in the inventory that are not in the listing are missing
    LOG.debug('Double checking missing with %s', missing_check)
    missing, _, m_size = inventory.compare(listing, check=missing_check)
    LOG.info('There are %i missing files', len(missing))
    LOG.info('Size: %i', m_size)

    # Files in the listing that are not in the inventory are orphans
    LOG.debug('Double checking orphans with %s', orphan_check)
    orphan, _, o_size = listing.compare(inventory, check=orphan_check)
    LOG.info('There are %i orphan files', len(orphan))
    LOG.info('Size: %i', o_size)

    if output_base:
        with open('%s_missing.txt' % output_base, 'w') as missing_file:
            missing_file.writelines(line + '\n' for line in missing)

        with open('%s_orphan.txt' % output_base, 'w') as orphan_file:
            orphan_file.writelines(line + '\n' for line in orphan)

    return missing, m_size, orphan, o_size