# -*- coding: utf-8 -*-

# vim: tabstop=4 shiftwidth=4 softtabstop=4

#    Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import collections
import datetime
import imp
import logging
import operator
import os
import time

from concurrent import futures as cf
import croniter
from kombu import Exchange
from kombu import Queue
import pause
import six
from stevedore import driver

from entropy import exceptions
from entropy import utils

LOG = logging.getLogger(__name__)

# Seconds to sleep between polls of an empty run queue, so that the
# scheduler thread does not spin at full CPU while waiting for audits.
_QUEUE_POLL_INTERVAL = 0.1


class Engine(object):
    """Schedules audit scripts and starts repair (react) scripts.

    Audits are read from the configured backend, expanded into concrete run
    times by a periodic "serializer", and executed on a thread pool. Repair
    scripts are started once and restarted when the repair configuration
    file changes.
    """

    def __init__(self, name, **cfg_data):
        """Create an engine.

        :param name: engine name; also used as the kombu queue name.
        :param cfg_data: engine configuration. The keys read here are
            'audit_cfg', 'repair_cfg', 'serializer_schedule',
            'engine_timeout', 'backend' and 'log_file'; 'log_format' is
            read by set_logger().
        """
        utils.reset_logger(logging.getLogger())
        Engine.set_logger(**cfg_data)
        # constants
        # TODO(praneshp): Hardcode for now, could/should be cmdline input
        self._engine_cfg_data = cfg_data
        self.max_workers = 8
        self.audit_type = 'audit'
        self.repair_type = 'repair'
        self.entropy_exchange = Exchange('entropy_exchange', type='fanout')
        self.known_queues = []
        # engine variables
        self.name = name
        self.audit_cfg = cfg_data['audit_cfg']
        self.repair_cfg = cfg_data['repair_cfg']
        self.serializer_schedule = cfg_data['serializer_schedule']
        self.engine_timeout = cfg_data['engine_timeout']
        # TODO(praneshp): Assuming cfg files are in 1 dir. Change later
        self._backend = cfg_data['backend']
        self._backend_driver = self.get_backend(self._backend,
                                                self._engine_cfg_data)
        self.cfg_dir = os.path.dirname(self.audit_cfg)
        self.log_file = cfg_data['log_file']
        self.executor = cf.ThreadPoolExecutor(max_workers=self.max_workers)
        self.running_audits = []
        self.running_repairs = []
        self.futures = []
        self.run_queue = collections.deque()
        # Private variables
        self._watchdog_event_fn = {self.repair_cfg: self.repair_modified}
        # Private variables to keep track of repair scripts.
        self._repairs = []
        self._known_routing_keys = set()
        LOG.info('Created engine obj %s', self.name)

    # TODO(praneshp): Move to utils?
    @staticmethod
    def set_logger(**cfg_data):
        """Point the module logger at the configured log file.

        Reads 'log_file' and 'log_format' from cfg_data, replaces any
        existing handlers on LOG with a single INFO-level file handler and
        disables propagation to ancestor loggers.
        """
        # Set the logger
        LOG.handlers = []
        log_to_file = logging.FileHandler(cfg_data['log_file'])
        log_to_file.setLevel(logging.INFO)
        log_format = logging.Formatter(cfg_data['log_format'])
        log_to_file.setFormatter(log_format)
        LOG.addHandler(log_to_file)
        LOG.propagate = False

    @staticmethod
    def get_backend(backend, cfg_data):
        """Load the named driver from the 'entropy.backend' namespace.

        :param backend: name of the stevedore entry point to load.
        :param cfg_data: passed as the single positional argument when the
            driver is instantiated.
        :returns: the instantiated driver object.
        """
        backend = driver.DriverManager(
            namespace='entropy.backend',
            name=backend,
            invoke_on_load=True,
            invoke_args=(cfg_data,),
        )
        return backend.driver

    @staticmethod
    def _load_module(script_path):
        """Import and return the module stored at script_path.

        The module is registered under the name script_path, as the inline
        code this helper replaces did. imp.find_module() may return an open
        file object that the caller is responsible for closing; it is
        closed here whether or not loading succeeds.
        """
        search_path, module_name = utils.get_filename_and_path(script_path)
        found = imp.find_module(module_name, [search_path])
        LOG.debug('Found these modules: %s', found)
        module_file = found[0]
        try:
            return imp.load_module(script_path, *found)
        finally:
            if module_file is not None:
                module_file.close()

    def run(self):
        """Log the start-up and hand control to start_scheduler()."""
        LOG.info('Starting Scheduler for %s', self.name)
        self.start_scheduler()

    def start_scheduler(self):
        """Start the serializer, react scripts, scheduler and watchdog.

        Blocks by joining the watchdog thread returned by start_watchdog().
        """
        serializer = self.executor.submit(self.start_serializer)
        self.futures.append(serializer)

        # Start react scripts.
        self.futures.extend(self.start_react_scripts())

        scheduler = self.executor.submit(self.schedule)
        self.futures.append(scheduler)

        # watchdog
        watchdog_thread = self.start_watchdog(self.cfg_dir)
        watchdog_thread.join()

    def schedule(self):
        """Loop forever, running each batch of audits as it becomes due."""
        while True:
            (next_time, next_jobs) = self.wait_next(self.engine_timeout)
            # NOTE(praneshp): here, call a function that will wait till next
            # time and call next_jobs,
            if next_jobs:
                self.setup_audit(next_time, next_jobs)

    def wait_next(self, timeout=None):
        """Wait for queued audits and pop those sharing the earliest time.

        :param timeout: seconds to wait for the run queue to become
            non-empty; None waits indefinitely.
        :returns: (time, jobs) where jobs are the consecutive entries at the
            head of the run queue that share the first entry's 'time';
            (None, []) on timeout or on any unexpected error.
        """
        watch = None
        if timeout is not None:
            watch = utils.StopWatch(duration=float(timeout))
            watch.start()

        try:
            while not self.run_queue:
                if watch and watch.expired():
                    raise exceptions.TimeoutException(
                        "Died after waiting for audits to arrive for %s" %
                        watch.elapsed())
                # Yield the CPU instead of busy-waiting on an empty queue.
                time.sleep(_QUEUE_POLL_INTERVAL)

            # Grab all the jobs for the next time.
            next_jobs = [self.run_queue.popleft()]
            next_time = next_jobs[0]['time']
            while self.run_queue and self.run_queue[0]['time'] == next_time:
                next_jobs.append(self.run_queue.popleft())
            return next_time, next_jobs
        except exceptions.TimeoutException as te:
            # Log the exception itself; the .message attribute does not
            # exist on Python 3 exceptions.
            LOG.info("%s", te)
            return None, []
        except Exception:
            LOG.exception("Something went wrong")
            return None, []

    def start_serializer(self):
        """Run the serializer forever on the configured cron schedule."""
        schedule = self.serializer_schedule
        now = datetime.datetime.now()
        cron = croniter.croniter(schedule, now)
        next_iteration = cron.get_next(datetime.datetime)
        while True:
            LOG.info('It is %s, next serializer at %s', now, next_iteration)
            pause.until(next_iteration)
            now = datetime.datetime.now()
            next_iteration = cron.get_next(datetime.datetime)
            self.run_serializer(next_iteration, now)

    def run_serializer(self, next_iteration, current_time):
        """Queue every audit run due up to the next serializer iteration.

        :param next_iteration: datetime of the following serializer run;
            audit runs scheduled later than this are not queued.
        :param current_time: datetime at which this run started; used for
            logging only.
        """
        LOG.info("Running serializer for %s at %s", self.name, current_time)
        audits = self._backend_driver.get_audits()
        if not audits:
            return
        try:
            schedules = {}
            for audit_name in audits:
                audit_cfg = self._backend_driver.audit_cfg_from_name(
                    audit_name)
                schedules[audit_name] = audit_cfg['schedule']

            new_additions = []
            for audit_name, sched in six.iteritems(schedules):
                now = datetime.datetime.now()
                cron = croniter.croniter(sched, now)
                while True:
                    next_call = cron.get_next(datetime.datetime)
                    if next_call > next_iteration:
                        break
                    new_additions.append({'time': next_call,
                                          'name': audit_name})

            new_additions.sort(key=operator.itemgetter('time'))
            self.run_queue.extend(new_additions)
            LOG.info("Run queue till %s is %s", next_iteration,
                     self.run_queue)
            LOG.info("Repair scripts at %s: %s", next_iteration,
                     self._repairs)
        except Exception:
            LOG.exception("Could not run serializer for %s at %s",
                          self.name, current_time)

    def repair_modified(self):
        """Watchdog callback: start any repair script not yet running."""
        LOG.info('Repair configuration changed')
        self.futures.extend(self.start_react_scripts())

    def start_watchdog(self, dir_to_watch):
        """Watch dir_to_watch for changes.

        :returns: whatever utils.watch_dir_for_change() returns;
            start_scheduler() calls join() on it.
        """
        LOG.debug('Watchdog mapping is: %s', self._watchdog_event_fn)
        return utils.watch_dir_for_change(dir_to_watch,
                                          self._watchdog_event_fn)

    def setup_audit(self, execution_time, audit_list):
        """Wait until execution_time, then submit every audit in audit_list.

        :param execution_time: datetime at which the audits should start.
        :param audit_list: dicts with a 'name' key identifying each audit.
        """
        try:
            pause.until(execution_time)
            LOG.info("Time: %s, Starting %s", execution_time, audit_list)
            audit_futures = []
            for audit in audit_list:
                audit_name = audit['name']
                audit_cfg = self._backend_driver.audit_cfg_from_name(
                    audit_name)
                future = self.executor.submit(self.run_audit,
                                              audit_name=audit_name,
                                              **audit_cfg)
                audit_futures.append(future)
            if audit_futures:
                self.futures.extend(audit_futures)
        except Exception:
            LOG.exception("Could not run all audits in %s at %s",
                          audit_list, execution_time)

    def start_react_scripts(self):
        """Start every configured repair script that is not yet running.

        :returns: list of futures for the scripts started by this call.
        """
        repairs = self._backend_driver.get_repairs()
        futures = []
        if repairs:
            for script in repairs:
                if script in self.running_repairs:
                    continue
                future = self.setup_react(script, **repairs[script])
                if future is not None:
                    futures.append(future)
        LOG.info('Running repair scripts %s',
                 ', '.join(self.running_repairs))
        return futures

    def setup_react(self, script, **script_args):
        """Load one repair script and submit its main() to the executor.

        :param script: name of the repair script in the backend.
        :param script_args: must contain 'cfg', which is passed to the
            script as 'conf'.
        :returns: the future running the script, or None if setup failed
            (the failure is logged).
        """
        LOG.info('Setting up reactor %s', script)
        try:
            # Pick out relevant info
            data = self._backend_driver.repair_cfg_from_name(script)
            imported_module = self._load_module(data['script'])

            # create any queues this react script wants, add it to a list
            # of known queues
            message_queue = Queue(self.name,
                                  self.entropy_exchange,
                                  data['routing_key'])
            if message_queue not in self.known_queues:
                self.known_queues.append(message_queue)
            self._known_routing_keys.add(data['routing_key'])

            # Copy, so the mapping handed out by the backend is not mutated.
            kwargs = dict(data)
            kwargs['name'] = script
            kwargs['conf'] = script_args['cfg']
            kwargs['exchange'] = self.entropy_exchange
            kwargs['message_queue'] = message_queue

            future = self.executor.submit(imported_module.main, **kwargs)
        except Exception:
            LOG.exception("Could not setup %s", script)
            return None

        # Only record the script as running once it was actually submitted,
        # so a script that failed to load is retried on the next call.
        self.running_repairs.append(script)
        self._repairs.append(future)
        return future

    def run_audit(self, audit_name, **audit_cfg):
        """Load an audit module and have it send its message.

        :param audit_name: name of the audit; passed to the module as
            'name'.
        :param audit_cfg: audit configuration; must contain 'module' and
            the 'mq_host', 'mq_port', 'mq_user' and 'mq_password' keys.

        Any failure is logged rather than raised.
        """
        try:
            # general stuff for the audit module
            # TODO(praneshp): later, fix to send only one copy of mq_args
            mq_args = {'mq_host': audit_cfg['mq_host'],
                       'mq_port': audit_cfg['mq_port'],
                       'mq_user': audit_cfg['mq_user'],
                       'mq_password': audit_cfg['mq_password']}
            audit_cfg['mq_args'] = mq_args
            audit_cfg['exchange'] = self.entropy_exchange
            audit_cfg['name'] = audit_name

            # Put a message on the mq
            # TODO(praneshp): this should be the path with register-audit
            imported_module = self._load_module(audit_cfg['module'])
            audit_obj = imported_module.Audit(**audit_cfg)
            audit_obj.send_message(**audit_cfg)
        except Exception:
            LOG.exception('Could not run audit %s', audit_name)