Source code for dynamo_consistency.history

"""
Handles the invalidation of files through a separate read-write process
"""

import os
import sqlite3
import logging
from datetime import datetime

from . import config
from . import lock


# Module-level logger
LOG = logging.getLogger(__name__)
# Row ID (in the ``runs`` table) of the consistency run in progress.
# Set by start_run(), reset to None by finish_run(); a falsy value means
# no run is in progress (see _current_siteid()).
RUN = 0


class LockedConn(object):
    """
    Similar to :py:class:`dynamo_consistency.summary.LockedConn`
    We want to handle the history database here though.

    Acquires the ``history`` lock, then opens ``consistency.db`` in the
    ``db`` var directory. If the database file does not exist yet, it is
    created from ``report_schema.sql``. The lock is held until
    :py:meth:`close` is called. The object can also be used as a context
    manager, which rolls back on an exception and always closes.
    """

    def __init__(self):
        self.lock = lock.acquire('history')
        opened = False
        try:
            self._open()
            opened = True
        finally:
            # Never keep holding the history lock for a connection that
            # failed to open; nobody would be able to release it.
            if not opened:
                lock.release(self.lock)

    def _open(self):
        """Connect to the database, creating the schema for a new file."""
        dbname = os.path.join(config.vardir('db'), 'consistency.db')
        new = not os.path.exists(dbname)
        self.conn = sqlite3.connect(dbname)
        self.curs = self.conn.cursor()
        if new:
            initialized = False
            try:
                schema = os.path.join(os.path.dirname(__file__),
                                      'report_schema.sql')
                with open(schema, 'r') as script_file:
                    self.curs.executescript(script_file.read())
                initialized = True
            finally:
                if not initialized:
                    # A half-initialised file would make the next caller
                    # think the schema exists, so remove it entirely.
                    self.conn.close()
                    os.remove(dbname)

    def close(self):
        """Commit and close the connection, then release the lock"""
        try:
            self.conn.commit()
        finally:
            # Release everything even if the commit itself fails
            try:
                self.conn.close()
            finally:
                lock.release(self.lock)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Discard partial work from a failed block before committing/closing
        if exc_type is not None:
            self.conn.rollback()
        self.close()
        return False
def _connect():
    """
    Open a locked connection to the consistency database.

    Building a :py:class:`LockedConn` acquires the ``history`` lock and,
    when the database file does not exist yet, creates the tables from
    ``report_schema.sql``.

    .. note::

       The caller must call ``close()`` on the returned connection object,
       which commits and releases the lock.

    :returns: The locked connection wrapper and its cursor
    :rtype: LockedConn, sqlite3.Cursor
    """
    locked = LockedConn()
    cursor = locked.curs
    return locked, cursor
def start_run():
    """
    Called in :py:func:`dynamo_consistency.main.main` to register
    the start of a consistency run.

    Makes sure ``config.SITE`` has a row in ``sites``, inserts a new row
    into ``runs`` for it, and stores that run's rowid in :py:data:`RUN`.
    If any query fails, the changes are rolled back, the history lock is
    still released, and :py:data:`RUN` is left untouched.
    """
    global RUN  # pylint: disable=global-statement

    conn, curs = _connect()
    try:
        curs.execute('INSERT OR IGNORE INTO sites (name) VALUES (?)',
                     (config.SITE, ))
        curs.execute(
            """
            INSERT INTO runs (site)
            SELECT rowid FROM sites WHERE name = ?
            """, (config.SITE, ))
        # The newest run for this site is the one just inserted
        curs.execute(
            """
            SELECT runs.rowid FROM runs
            LEFT JOIN sites ON sites.rowid = runs.site
            WHERE sites.name = ?
            ORDER BY runs.rowid DESC LIMIT 1
            """, (config.SITE, ))
        run_id = curs.fetchone()[0]
    except Exception:
        conn.conn.rollback()
        raise
    finally:
        conn.close()

    # Only publish the run once it has been committed
    RUN = run_id
def finish_run():
    """
    Called in :py:func:`dynamo_consistency.main.main` to register
    the end of a consistency run.

    Sets the ``finished`` column of the current run (:py:data:`RUN`) to the
    current local time, then resets :py:data:`RUN` to ``None``. On failure
    the update is rolled back and the history lock is still released.
    """
    global RUN  # pylint: disable=global-statement

    conn, curs = _connect()
    try:
        curs.execute(
            """
            UPDATE runs
            SET finished = DATETIME('NOW', 'LOCALTIME')
            WHERE rowid = ?
            """, (RUN, ))
    except Exception:
        conn.conn.rollback()
        raise
    finally:
        conn.close()

    RUN = None
def _insert_directories(curs, files): """ Inserts the directories into the proper table, given a cursor. :param sqlite3.Cursor curs: a cursor object to make the query :param list files: List of file, info dict tuples """ directories = set([os.path.dirname(name) for name, _ in files]) curs.executemany('INSERT OR IGNORE INTO directories (name) VALUES (?)', [(dirname, ) for dirname in directories]) def _current_siteid(curs): """ Get the site ID from the database :param sqlite3.Cursor curs: A cursor from the database to read :returns: The ID of the site :rtype: int """ if not RUN: return None curs.execute('SELECT rowid FROM sites WHERE name = ?', (config.SITE, )) return curs.fetchone()[0] def _report_files(table, files): """ Reports to either invalid table or orphan table. Moves old files to the history table if they haven't been acted on yet. :param str table: Which table to use :param list files: Tuples of name, info dict of files to report """ conn, curs = _connect() _insert_directories(curs, files) siteid = _current_siteid(curs) # Copy into history curs.execute( """ INSERT INTO {table}_history (site, run, directory, name, size, mtime, entered, acted) SELECT site, run, directory, {table}.name, size, mtime, entered, 0 FROM {table} WHERE {table}.site = ? AND {table}.run != ? """.format(table=table), (siteid, RUN)) # Remove old entries curs.execute( """ DELETE FROM {table} WHERE {table}.site = ? AND {table}.run != ? """.format(table=table), (siteid, RUN)) curs.executemany( """ INSERT INTO {table} (site, run, directory, name, size, mtime) VALUES (?, ?, (SELECT rowid FROM directories WHERE name = ?), ?, ?, ?) """.format(table=table), [(siteid, RUN, os.path.dirname(name), os.path.basename(name), info['size'], datetime.fromtimestamp(info['mtime'])) for name, info in files]) conn.close() def _get_files(table, site, acting): """ Get list of files for a site from a given table. 
:param str table: Table to read from :param str site: Name of a site to get files for :param bool acting: Whether or not the caller is acting on the files :returns: The LFNs from the table :rtype: list """ conn, curs = _connect() curs.execute( """ SELECT directories.name, {table}.name FROM {table} LEFT JOIN sites ON sites.rowid = {table}.site LEFT JOIN directories ON directories.rowid = {table}.directory WHERE sites.name = ? ORDER BY directories.name, {table}.name """.format(table=table), (site, )) output = list([os.path.join(directory, out) for directory, out in curs.fetchall()]) if acting: curs.execute( """ INSERT INTO {table}_history (site, run, directory, name, size, mtime, entered, acted) SELECT site, run, directory, {table}.name, size, mtime, entered, DATETIME('NOW', 'LOCALTIME') FROM {table} LEFT JOIN sites ON sites.rowid = {table}.site WHERE sites.name = ? """.format(table=table), (site, ) ) curs.execute( """ DELETE FROM {table} WHERE site IN ( SELECT rowid FROM sites WHERE sites.name = ? ) """.format(table=table), (site, )) conn.close() return output
def report_missing(missing):
    """
    Record missing files for the current run in the ``invalid`` table.

    Unacted entries from earlier runs of this site are moved to the
    history table first.

    :param list missing: (file name, info dict) tuples for each missing file
    """
    target_table = 'invalid'
    _report_files(target_table, missing)
def missing_files(site, acting=False):
    """
    Read the missing files recorded for a site in the consistency database.

    When ``acting`` is true, the returned entries are moved into the history
    table with ``acted`` set to the current local time.

    :param str site: Name of the site to look up
    :param bool acting: Whether the caller is acting on the returned list
    :returns: LFNs of the missing files
    :rtype: list
    """
    lfns = _get_files('invalid', site, acting)
    return lfns
def report_orphan(orphan):
    """
    Record orphan files for the current run in the ``orphans`` table.

    Any file that is currently listed as missing for ``config.SITE`` is
    skipped, so a file is never reported as both missing and orphan.

    :param list orphan: (file name, info dict) tuples for each orphan file
    """
    already_missing = set(missing_files(config.SITE))

    kept = []
    for entry in orphan:
        if entry[0] in already_missing:
            continue
        kept.append(entry)

    _report_files('orphans', kept)
def orphan_files(site, acting=False):
    """
    Read the orphan files recorded for a site in the consistency database.

    When ``acting`` is true, the returned entries are moved into the history
    table with ``acted`` set to the current local time.

    :param str site: Name of the site to look up
    :param bool acting: Whether the caller is acting on the returned list
    :returns: LFNs of the orphan files
    :rtype: list
    """
    lfns = _get_files('orphans', site, acting)
    return lfns
def report_unmerged(unmerged):
    """
    Stores a list of deletable unmerged files in the ``unmerged`` table.

    Unacted entries from earlier runs of this site are moved to
    ``unmerged_history`` first.

    :param list unmerged: A list of tuples, where each tuple is a name, info dict pair
    """
    _report_files('unmerged', unmerged)
def unmerged_files(site, acting=False):
    """
    Read the deletable unmerged files recorded for a site.

    When ``acting`` is true, the returned entries are moved into the history
    table with ``acted`` set to the current local time.

    :param str site: Name of the site to look up
    :param bool acting: Whether the caller is acting on the returned list
    :returns: LFNs in unmerged that can be deleted
    :rtype: list
    """
    lfns = _get_files('unmerged', site, acting)
    return lfns
def report_empty(directories):
    """
    Adds empty directories to the history database.

    Entries for this site left over from earlier runs are copied into
    ``empty_directories_history`` (with ``acted`` = 0) and removed, then
    each directory is inserted for the current run. A directory that
    violates a table constraint is logged and skipped. Any other failure
    rolls everything back; the history lock is always released.

    :param list directories: A list of (directory name, mtime in seconds) tuples
    """
    table = 'empty_directories'
    conn, curs = _connect()
    try:
        siteid = _current_siteid(curs)

        # Copy into history
        curs.execute(
            """
            INSERT INTO {table}_history
            (site, run, name, mtime, entered, acted)
            SELECT site, run, {table}.name, {table}.mtime, entered, 0
            FROM {table}
            WHERE {table}.site = ? AND {table}.run != ?
            """.format(table=table),
            (siteid, RUN))

        # Remove old entries
        curs.execute(
            """
            DELETE FROM {table}
            WHERE {table}.site = ? AND {table}.run != ?
            """.format(table=table),
            (siteid, RUN))

        for name, mtime in directories:
            LOG.debug('Trying to insert empty directory %s', name)
            try:
                curs.execute(
                    """
                    INSERT INTO {table}
                    (site, run, name, mtime)
                    VALUES (?, ?, ?, ?)
                    """.format(table=table),
                    (siteid, RUN, name, datetime.fromtimestamp(mtime)))
            except sqlite3.IntegrityError:
                # %s, not %i: siteid and RUN may be None when no run is active
                LOG.error('Could not insert %s into %s (id %s, run %s)',
                          name, config.SITE, siteid, RUN)
    except Exception:
        conn.conn.rollback()
        raise
    finally:
        conn.close()
def empty_directories(site, acting=False):
    """
    Get the list of empty directories recorded for a site.

    If acting on them, the entries are copied into
    ``empty_directories_history`` (with ``acted`` set to the current local
    time) and removed from ``empty_directories``. On failure those changes
    are rolled back and the history lock is still released.

    :param str site: Name of a site to get empty directories for
    :param bool acting: Whether or not the caller is acting on the list
    :returns: The directory list, in reverse lexical order
    :rtype: list
    """
    table = 'empty_directories'
    conn, curs = _connect()
    try:
        # Reverse lexical order puts every subdirectory ahead of its parent
        # (a child path extends its parent's path), presumably so callers can
        # remove directories bottom-up.
        curs.execute(
            """
            SELECT {table}.name FROM {table}
            LEFT JOIN sites ON sites.rowid = {table}.site
            WHERE sites.name = ?
            ORDER BY {table}.name DESC
            """.format(table=table),
            (site, ))

        output = [row[0] for row in curs.fetchall()]

        if acting:
            curs.execute(
                """
                INSERT INTO {table}_history
                (site, run, name, mtime, entered, acted)
                SELECT site, run, {table}.name, {table}.mtime, entered,
                DATETIME('NOW', 'LOCALTIME')
                FROM {table}
                LEFT JOIN sites ON sites.rowid = {table}.site
                WHERE sites.name = ?
                """.format(table=table),
                (site, ))

            curs.execute(
                """
                DELETE FROM {table}
                WHERE site IN (
                  SELECT rowid FROM sites
                  WHERE sites.name = ?
                )
                """.format(table=table),
                (site, ))
    except Exception:
        conn.conn.rollback()
        raise
    finally:
        conn.close()

    return output