Source code for filetracker.cachecleaner

from __future__ import absolute_import
from argparse import ArgumentParser
import collections
import datetime
import glob
import logging
import time

from filetracker.client import Client


logger = logging.getLogger(__name__)


class CacheCleaner(object):
    """Tool for periodically cleaning the cache of the file tracker.

    Designed to work as a daemon. The cache cleaner is started by calling
    the :func:`run` method. It supports multiple instances of :class:`Client`.

    Configuration is passed as constructor parameters:

    :param int cache_size_limit: soft limit for the total logical size of
        cached files
    :param iterable glob_cache_dirs: list of paths to
        :class:`filetracker.Client` cache directories, given as glob
        expressions
    :param datetime.timedelta scan_interval: interval specifying how often
        to scan the disk and optionally clean the cache
    :param float percent_cleaning_level: percentage of ``cache_size_limit``
        worth of the newest cache files that should be kept (not deleted)
        when cleaning the cache

    The cache cleaner runs the following algorithm:

    1. Ask each client (specified in the constructor by its cache directory)
       to list all stored files. This is the *file index*.
    2. Analyze the file index - check whether the cache cleaner should clean
       the cache, and if so, which files to delete.
    3. Clean the cache if necessary.
    4. Wait for the time specified in the constructor.
    5. Go to step 1.

    Files are deleted from the oldest to the newest, according to
    modification time. If two files have the same modification time,
    the larger one is preferred for deletion.
    """

    FileIndexEntry = collections.namedtuple('FileIndexEntry',
                                            ['file_info', 'client'])
    """Entry for the file index.

    Associates :class:`DataStore.FileInfoEntry` with the
    :class:`filetracker.Client` which owns the given file.

    Fields:

    * ``file_info`` instance of :class:`DataStore.FileInfoEntry`
    * ``client`` instance of :class:`filetracker.Client` which owns the
      given file
    """

    def __init__(self,
                 cache_size_limit,
                 glob_cache_dirs,
                 scan_interval=datetime.timedelta(minutes=10),
                 percent_cleaning_level=50.0):
        assert glob_cache_dirs
        self.glob_cache_dirs = glob_cache_dirs
        self.clients = []
        self.file_index = []
        self.scan_interval = scan_interval
        self.cache_size_limit = cache_size_limit
        self.cleaning_level = cache_size_limit * percent_cleaning_level / 100.

    def run(self):
        """Starts cleaning the cache in an infinite loop."""
        logger.info("Starting daemon.")
        while True:
            try:
                self._scan_disk()
                do_cleaning, delete_from_index = self._analyze_file_index()
                if do_cleaning:
                    self._clean_cache(delete_from_index)
            except Exception:
                logger.exception("An exception occurred:")
            sleeping_until_time = datetime.datetime.now() + self.scan_interval
            logger.info("Sleeping until %s.", sleeping_until_time)
            time.sleep(self.scan_interval.total_seconds())

    def _scan_disk(self):
        logger.info("Scanning disk...")
        dirs = []
        for glob_cache_dir in self.glob_cache_dirs:
            dirs.extend(glob.glob(glob_cache_dir))
        logger.debug("Glob expressions have expanded to %s", dirs)

        self.clients = [Client(cache_dir=dir, remote_store=None)
                        for dir in dirs]
        self.file_index = []
        for client in self.clients:
            for file_info in client.list_local_files():
                self.file_index.append(
                    CacheCleaner.FileIndexEntry(file_info=file_info,
                                                client=client))

    def _analyze_file_index(self):
        logger.info("Analyzing cache...")
        # Sort newest first; for equal mtime, larger files end up closer to
        # the deletable tail of the index.
        self.file_index.sort(
            key=lambda x: (-x.file_info.mtime, x.file_info.size))

        cache_size = 0
        delete_from_index = None
        do_cleaning = False
        for i, entry in enumerate(self.file_index):
            cache_size += entry.file_info.size
            if (delete_from_index is None
                    and cache_size >= self.cleaning_level):
                delete_from_index = i
            if cache_size >= self.cache_size_limit:
                do_cleaning = True

        logger.info("Analysis done. Cache size: %s.",
                    format_size_with_unit(cache_size))
        if not do_cleaning:
            logger.info("Decided not to perform cache cleaning.")
        return do_cleaning, delete_from_index

    def _clean_cache(self, delete_from_index):
        # Assumption: run right after _analyze_file_index, so the index is
        # still sorted newest first.
        logger.info("Performing cache cleaning...")
        deleted_files_cnt = 0
        deleted_bytes = 0
        assert (self.file_index[0].file_info.mtime
                >= self.file_index[-1].file_info.mtime)
        for entry in self.file_index[delete_from_index:]:
            logger.debug("Deleting file: %s from store located at: %s",
                         entry.file_info.name, entry.client.local_store.dir)
            try:
                entry.client.delete_file(entry.file_info.name)
                deleted_files_cnt += 1
                deleted_bytes += entry.file_info.size
            except Exception as e:
                logger.warning(
                    "Deleting file %s failed with the following "
                    "exception: %s",
                    entry.file_info.name, e, exc_info=True)
        del self.file_index[delete_from_index:]
        logger.info("Cleaning done. Deleted %d files, total %s.",
                    deleted_files_cnt, format_size_with_unit(deleted_bytes))
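

# A minimal usage sketch (not part of the module): assuming a cache under a
# hypothetical path, the cleaner can also be driven programmatically instead
# of through main() below, e.g.:
#
#     import datetime
#     from filetracker.cachecleaner import CacheCleaner
#
#     cleaner = CacheCleaner(
#         cache_size_limit=2 * 2**30,                    # 2 GiB soft limit
#         glob_cache_dirs=['/tmp/filetracker-cache*'],   # hypothetical path
#         scan_interval=datetime.timedelta(minutes=30),
#         percent_cleaning_level=50.0)
#     cleaner.run()   # blocks forever: scan, optionally clean, sleep, repeat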


def main():
    usage = "usage: %(prog)s [options]"
    parser = ArgumentParser(usage=usage)
    parser.add_argument('-c', '--cache-dirs', dest='glob_cache_dirs',
                        nargs='+', default=[Client.DEFAULT_CACHE_DIR],
                        help="Paths to the local cache directories, specified "
                             "as glob expressions. If not specified, uses the "
                             "default File Tracker directory: %(default)s")
    parser.add_argument('-s', '--cache-size-limit', dest='cache_size_limit',
                        help="Soft limit for the cache size. Must be given "
                             "with the following units: B, K, M, G, T, which "
                             "can be chained. Example: 1G512M. "
                             "Note: K=2**10.")
    parser.add_argument('-i', '--scan-interval', dest='scan_interval',
                        default='1h',
                        help="How often to perform cache scanning. Must be "
                             "given with the following units: s, m, h, d, "
                             "which can be chained. Example: 1h30m "
                             "[Default: %(default)s]")
    parser.add_argument('-p', '--percent-cleaning-level',
                        dest='percent_cleaning_level', type=float,
                        default=50.0,
                        help="Percent of the cache size limit that should "
                             "*NOT* be deleted when cleaning the cache. The "
                             "newest files will remain. "
                             "[Default: %(default).1f]")
    logging_group = parser.add_mutually_exclusive_group()
    logging_group.add_argument('-d', '--debug', dest='debug', default=False,
                               action='store_true',
                               help="Enables debug logging.")
    logging_group.add_argument('-q', '--quiet', dest='quiet', default=False,
                               action='store_true',
                               help="Disables logging.")

    args = parser.parse_args()
    if not args.cache_size_limit:
        parser.error("Missing --cache-size-limit option. "
                     "Try --help for more details.")

    level = logging.INFO
    if args.debug:
        level = logging.DEBUG
    if args.quiet:
        level = None
    logging.basicConfig(
        format="%(asctime)-23s %(name)s %(levelname)s: %(message)s",
        level=level)

    try:
        scan_interval = parse_time_delta(args.scan_interval)
        cache_size_limit = parse_size(args.cache_size_limit)
    except ValueError as exception:
        parser.error(exception)

    daemon = CacheCleaner(
        cache_size_limit=cache_size_limit,
        glob_cache_dirs=args.glob_cache_dirs,
        scan_interval=scan_interval,
        percent_cleaning_level=args.percent_cleaning_level)
    daemon.run()


_time_units = dict(s=1, m=60, h=60 * 60, d=24 * 60 * 60)
_size_units = dict(B=1, K=2**10, M=2**20, G=2**30, T=2**40)


def parse_time_delta(text):
    seconds = parse_units(text, _time_units)
    return datetime.timedelta(seconds=seconds)


def parse_size(text):
    return parse_units(text, _size_units)


def format_size_with_unit(number):
    return format_with_unit(number, _size_units)


def parse_units(text, units):
    result = 0
    value = ''
    for ch in list(str(text).strip()):
        if ch.isdigit():
            value += ch
        elif ch not in units:
            raise ValueError("Unknown unit \"{}\" in: {}".format(ch, text))
        else:
            unit = units[ch]
            if not value:
                raise ValueError(
                    "Unit without numeric value: {}".format(text))
            result += unit * int(value)
            value = ''
    if value:
        raise ValueError("Numeric value without unit: {}".format(text))
    return result


def format_with_unit(number, size_units):
    units = sorted(list(size_units.items()), key=lambda x: -x[1])
    for unit, size in units:
        if number >= size:
            return "{amount:.3f}{unit}".format(amount=float(number) / size,
                                               unit=unit)
    return "{amount:.3f}{unit}".format(amount=number, unit=units[-1][0])


if __name__ == '__main__':
    main()
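

# Illustrative examples (not part of the module) of the unit helpers above,
# given the _size_units and _time_units tables defined in this file:
#
#     parse_size('1G512M')              == 1 * 2**30 + 512 * 2**20
#     parse_time_delta('1h30m')         == datetime.timedelta(hours=1,
#                                                             minutes=30)
#     format_size_with_unit(3 * 2**20)  # -> '3.000M'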