diff --git a/shoebox/roll_manager.py b/shoebox/roll_manager.py index be038ef..9578a6b 100644 --- a/shoebox/roll_manager.py +++ b/shoebox/roll_manager.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os.path + import archive import utils @@ -26,7 +28,8 @@ class RollManager(object): def _make_filename(self): f = utils.now().strftime(self.filename_template) - return f.replace(" ", "_") + f = f.replace(" ", "_") + return os.path.join(self.directory, f) def get_active_archive(self): if not self.active_archive: diff --git a/test/integration/gen_events.py b/test/integration/gen_events.py new file mode 100644 index 0000000..20b6c04 --- /dev/null +++ b/test/integration/gen_events.py @@ -0,0 +1,309 @@ +# Copyright (c) 2014 Dark Secret Software Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Utilities for Event generation for integration tests. + +Built from work done in https://github.com/SandyWalsh/twobillion + +TODO: Break this out into a standalone library. +""" + +import datetime +import heapq +import uuid as uuidlib +import random +import time + + +COMPUTE_EVENTS = [ + 'compute.instance.finish_resize.*', + 'compute.instance.power_off.*', + 'compute.instance.power_on.*', + 'compute.instance.reboot.*', + 'compute.instance.rebuild.*', + 'compute.instance.resize.confirm.*', + 'compute.instance.resize.prep.*', + 'compute.instance.resize.revert.*', + 'compute.instance.resize.*', + 'compute.instance.shutdown.*', + 'compute.instance.snapshot.*', + 'compute.instance.suspend', + 'compute.instance.resume', + 'compute.instance.exists', + 'compute.instance.update', + 'attach_volume', + 'change_instance_metadata', + 'detach_volume', + 'finish_resize', + 'finish_revert_resize', + 'get_vnc_console', + 'power_on_instance', + 'prep_resize', + 'reboot_instance', + 'rebuild_instance', + 'rescue_instance', + 'reserve_block_device_name', + 'resize_instance', + 'revert_resize', + 'run_instance', + 'set_admin_password', + 'snapshot_instance', + 'start_instance', + 'suspend_instance', + 'terminate_instance', + 'unrescue_instance'] + +SCHEDULER_EVENTS = ['scheduler.run_instance.start', + 'scheduler.run_instance.scheduled', + 'scheduler.run_instance.end'] + +SCHEDULERS = ['scheduler_%02d' % x for x in xrange(3)] + +COMPUTE_NODES = ['compute_%03d' % x for x in xrange(100)] + +API_NODES = ['api.server.%02d' % x for x in xrange(10)] + + +class EventGenerator(object): + def __init__(self, operations_per_minute=1000): + self.instances = {} # { uuid: compute_node } + + # Many actions can be performed concurrently. + # We might start working on instance #1 and then, while that + # effort is still underway, start doing something with instances + # #2, 3 and 4. They need to interleave each other. + # + # An "action", below, is a list of each of the steps necessary + # to perform that operation, but with a time component relative to + # the starting time passed in. + # It's our responsibility to fire off the events when sufficient "time" + # has passed. + # + # The thing we don't want to have to deal with is overlapping commands + # (like instance.delete starting while instance.create is still underway) + # That's too much headache. + + operations_per_second = float(operations_per_minute) / 60.0 + + # An operation will happen every so many milliseconds to + # get our operations/sec. We call this a Tick. + self.millisecond_per_tick = 1000.0 / float(operations_per_second) + print "Operation every %d ms (%.1f/sec)" % (self.millisecond_per_tick, + operations_per_second) + self.next_events = [] # priority queue + self.instances_in_use = set() + + now = datetime.datetime.utcnow() + self.tick = now + datetime.timedelta( + milliseconds=self.millisecond_per_tick) + + def generate(self, now): + self._add_new_sequence(now) + return self._get_ready_events(now) + + def move_to_next_tick(self, now): + return now + datetime.timedelta(milliseconds=self.millisecond_per_tick) + + def _add_new_sequence(self, now): + """Add a new operation to the queue. + This is the entire sequence of events going into + the future. They will be interwoven with other + future events and pumped out in proper (interleaving) + order.""" + if now >= self.tick: + action = self._get_action(now) + for idx, event in enumerate(action): + when = event['when'] + if idx == 0: + uuid = event['uuid'][-4:] + request = event['request_id'][-4:] + if event['is_create']: + print "CREATE:", + if event['is_delete']: + print "DELETE:", + if event['is_update']: + print "UPDATE:", + print "U:%s R:%s" % (uuid, request), + print "(%d of %d)" % (len(self.instances_in_use), \ + len(self.instances)) + # (when, event, is_first_event, is_last_event) + heapq.heappush(self.next_events, + (when, event, idx==0, idx==len(action)-1)) + self.tick = self.move_to_next_tick(now) + return now + + def _get_ready_events(self, now): + """Pump out all the ready events.""" + ready = [] + while True: + if not self.next_events: + return ready + when, event, start, end = self.next_events[0] # peek + if when > now: + return ready + when, event, start, end = heapq.heappop(self.next_events) + uuid = event['uuid'] + request = event['request_id'] + if end: + if event['is_create']: + self.instances_in_use.add(uuid) + elif event['is_delete']: + self.instances_in_use.remove(uuid) + print "%s %40s U:%4s" % (' ' * 20, event['event'], uuid[-4:]) + ready.append(event) + + def _get_action(self, now): + """Get an action sequence. A series of related events + that perform an operation. At this stage all it has + is a request_id.""" + request_id = "req_" + str(uuidlib.uuid4()) + base = {'request_id': request_id} + return self._make_action(now, base) + + def _make_action(self, now, base): + """Start creating records that look like OpenStack events. + + api [-> scheduler] -> compute node. + + instances_in_use is different than instances.keys(): + instances.keys() is the list of all instances, even instances that + don't exist yet, but will be created in the near future. + instance_in_use are the instances in the current timeline. + + While there are no in-use instances, create new ones. + + After that, 10% chance of new instance. Otherwise, + 20% chance it's a delete. The remaining 80% are + instance update operations. + """ + event_chain = [] + + is_create = random.randrange(100) < 10 + is_delete = False + is_update = False + + uuid = str(uuidlib.uuid4()) + compute_node = random.choice(COMPUTE_NODES) + + if not is_create and not self.instances_in_use: + is_create = True + + if not is_create: + temp_uuid = random.choice(list(self.instances_in_use)) + try: + compute_node = self.instances[temp_uuid] + uuid = temp_uuid + + # 20% of the time it's a Delete, otherwise an Update ... + is_delete = random.randrange(100) < 20 + if not is_delete: + is_update = True + except KeyError: + # The instance is in the process of being deleted. + is_create = True + + if not (is_create or is_delete or is_update): + raise Exception("Why?!") + is_create = True + + nbase = {'uuid': uuid, 'is_create': is_create, 'is_delete': is_delete, + 'is_update': is_update} + nbase.update(base) + + # All operations start with an API call ... + api = self._mk_event(now, nbase, API_NODES, + ['compute.instance.update']) + event_chain.extend(api) + + if is_create: + now = self._bump_time(now, 0.5, 3.0) # From api to scheduler + scheduler_node = random.choice(SCHEDULERS) + for e in SCHEDULER_EVENTS: + event_chain.extend(self._event(now, nbase, scheduler_node, e)) + now = self._bump_time(now, 0.1, 0.5) # inside scheduler + + now = self._bump_time(now, 0.5, 3.0) # In Compute node + event_chain.extend(self._event(now, nbase, compute_node, + 'compute.instance.create.*')) + self.instances[uuid] = compute_node + + if is_delete: + event_chain.extend(self._event(now, nbase, compute_node, + 'compute.instance.delete.*')) + del self.instances[uuid] + + if is_update: + event = random.choice(COMPUTE_EVENTS) + event_chain.extend(self._event(now, nbase, compute_node, event)) + + return event_chain + + def _bump_time(self, now, low, high): + """Create a random time in fractional seconds and move now ahead + that amount.""" + secs = low + ((high - low) * random.random()) + return now + datetime.timedelta(seconds=secs) + + def _mk_event(self, now, base, nodes, events): + """Make a single event with random node/events. + If the event name ends in .* we will generate + the corresponding .start and .end events + while we're at it.""" + return self._event(now, base, random.choice(nodes), + random.choice(events)) + + def _event(self, now, base, node, event): + """Make a single event or a pair of events (depending on the + event type)""" + results = [] + if event[-1] == '*': + event = event[0:-1] + extra = {'when': now, 'node': node} + results.append(self._pkg(base, extra, {'event': event + "start"})) + now = self._bump_time(now, 0.25, 60.0 * 15.0) # In compute node + extra = {'when': now, 'node': node} + results.append(self._pkg(base, extra, {'event': event + "end"})) + else: + extra = {'when': now, 'node': node} + results.append(self._pkg(base, extra, {'event': event})) + return results + + def _pkg(self, *args): + """Pack together a bunch of dict's into a single dict.""" + new = {} + for a in args: + new.update(a) + return new + + +if __name__ == '__main__': + + real_time = False + + g = EventGenerator(100) + now = datetime.datetime.utcnow() + start = now + nevents = 0 + while nevents < 100: + e = g.generate(now) + if e: + nevents += len(e) + + if real_time: + now = datetime.datetime.utcnow() + else: + now = g.move_to_next_tick(now) + + print "Elapsed: ", datetime.datetime.utcnow() - start diff --git a/test/integration/test_rolling.py b/test/integration/test_rolling.py new file mode 100644 index 0000000..d7be2c9 --- /dev/null +++ b/test/integration/test_rolling.py @@ -0,0 +1,38 @@ +import datetime +import mock +import os +import shutil +import unittest + + +from shoebox import roll_checker +from shoebox import roll_manager +from shoebox import utils + +import test.integration.gen_events as egen + + +TEMPDIR = "test_temp" + + +class TestSizeRolling(unittest.TestCase): + def setUp(self): + shutil.rmtree(TEMPDIR, ignore_errors=True) + os.mkdir(TEMPDIR) + + def tearDown(self): + shutil.rmtree(TEMPDIR) + + def test_size_rolling(self): + checker = roll_checker.SizeRollChecker(1) + manager = roll_manager.WritingRollManager("test_%c.events", + checker, + TEMPDIR) + g = egen.EventGenerator(6000) + nevents = 0 + now = datetime.datetime.utcnow() + while nevents < 1000: + e = g.generate(now) + if e: + nevents += len(e) + now = g.move_to_next_tick(now) diff --git a/test/test_roll_manager.py b/test/test_roll_manager.py index d2c9409..d1bd4e5 100644 --- a/test/test_roll_manager.py +++ b/test/test_roll_manager.py @@ -18,7 +18,8 @@ class TestRollManager(unittest.TestCase): with mock.patch.object(utils, "now") as dt: dt.return_value = now filename = x._make_filename() - self.assertEqual(filename, "filename_Sat_Feb__1_10:11:12_2014.dat") + self.assertEqual(filename, + "./filename_Sat_Feb__1_10:11:12_2014.dat") class FakeArchive(object):