Introduce a serializer
When the engine is started, start a serializer thread. The crontab for this is picked up from the engine cfg. The serializer maintains a queue of upcoming audits, and the engine should pick off audits as their scheduled time comes up. This will save us from running one thread per registered audit script, having just as many threads as there are currently scheculed audits. Repair scripts stay the way they are. implements blueprint backend-abstraction Change-Id: If17477f9939c69b454ba389f2f0f8cce47b94cca
This commit is contained in:
parent
38520d41d8
commit
b2ea4abbdb
|
@ -15,11 +15,12 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import datetime
|
||||
import logging
|
||||
import operator
|
||||
import os
|
||||
|
||||
from concurrent import futures as cf
|
||||
import croniter
|
||||
from kombu import Exchange
|
||||
from kombu import Queue
|
||||
|
@ -39,19 +40,21 @@ class Engine(object):
|
|||
self.max_workers = 8
|
||||
self.audit_type = 'audit'
|
||||
self.repair_type = 'repair'
|
||||
self.entropy_exchange = Exchange('entropy_exchage', type='fanout')
|
||||
self.entropy_exchange = Exchange('entropy_exchange', type='fanout')
|
||||
self.known_queues = []
|
||||
# engine variables
|
||||
self.name = name
|
||||
self.audit_cfg = cfg_data['audit_cfg']
|
||||
self.repair_cfg = cfg_data['repair_cfg']
|
||||
self.serializer_schedule = cfg_data['serializer_schedule']
|
||||
# TODO(praneshp): Assuming cfg files are in 1 dir. Change later
|
||||
self.cfg_dir = os.path.dirname(self.audit_cfg)
|
||||
self.log_file = cfg_data['log_file']
|
||||
self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
|
||||
self.executor = cf.ThreadPoolExecutor(max_workers=self.max_workers)
|
||||
self.running_audits = []
|
||||
self.running_repairs = []
|
||||
self.futures = []
|
||||
self.run_queue = []
|
||||
LOG.info('Created engine obj %s', self.name)
|
||||
|
||||
# TODO(praneshp): Move to utils?
|
||||
|
@ -71,17 +74,60 @@ class Engine(object):
|
|||
self.start_scheduler()
|
||||
|
||||
def start_scheduler(self):
|
||||
# Start watchdog thread, which will detect any new audit/react scripts
|
||||
# TODO(praneshp): Look into how to do this with threadpoolexecutor?
|
||||
watchdog_thread = self.start_watchdog(self.cfg_dir) # noqa
|
||||
serializer = self.executor.submit(self.start_serializer)
|
||||
self.futures.append(serializer)
|
||||
|
||||
# Start react and audit scripts.
|
||||
# Start react scripts.
|
||||
self.futures.append(self.start_scripts('repair'))
|
||||
self.futures.append(self.start_scripts('audit'))
|
||||
watchdog_thread.join()
|
||||
|
||||
# TODO(praneshp): For now, only addition of scripts. Take care of
|
||||
# deletion later
|
||||
scheduler = self.executor.submit(self.schedule)
|
||||
self.futures.append(scheduler)
|
||||
|
||||
def schedule(self):
|
||||
pass
|
||||
|
||||
def start_serializer(self):
|
||||
schedule = self.serializer_schedule
|
||||
now = datetime.datetime.now()
|
||||
cron = croniter.croniter(schedule, now)
|
||||
next_iteration = cron.get_next(datetime.datetime)
|
||||
while True:
|
||||
LOG.info('It is %s, next serializer at %s', now, next_iteration)
|
||||
pause.until(next_iteration)
|
||||
now = datetime.datetime.now()
|
||||
next_iteration = cron.get_next(datetime.datetime)
|
||||
self.run_serializer(next_iteration, now)
|
||||
|
||||
def run_serializer(self, next_iteration, current_time):
|
||||
LOG.info("Running serializer for %s at %s", self.name, current_time)
|
||||
audits = utils.load_yaml(self.audit_cfg)
|
||||
schedules = {}
|
||||
try:
|
||||
for audit_name in audits:
|
||||
conf = audits[audit_name]['cfg']
|
||||
audit_config = dict(utils.load_yaml(conf))
|
||||
schedules[audit_name] = audit_config['schedule']
|
||||
|
||||
new_additions = []
|
||||
|
||||
for key in schedules.keys():
|
||||
sched = schedules[key]
|
||||
now = datetime.datetime.now()
|
||||
cron = croniter.croniter(sched, now)
|
||||
while True:
|
||||
next_call = cron.get_next(datetime.datetime)
|
||||
if next_call > next_iteration:
|
||||
break
|
||||
new_additions.append({'time': next_call, 'name': key})
|
||||
|
||||
new_additions.sort(key=operator.itemgetter('time'))
|
||||
self.run_queue += new_additions
|
||||
LOG.info("Run queue till %s is %s", next_iteration, self.run_queue)
|
||||
except Exception:
|
||||
LOG.exception("Could not run serializer for %s at %s",
|
||||
self.name, current_time)
|
||||
|
||||
# TODO(praneshp): For now, only addition of scripts. Handle deletion later
|
||||
def audit_modified(self):
|
||||
LOG.info('Audit configuration changed')
|
||||
self.futures.append(self.start_scripts('audit'))
|
||||
|
|
Loading…
Reference in New Issue