Task manager refactoring done

Manager API class removed and rpc fake impl used now for integration
testing

Change-Id: If20004f1b69e41e8243a3f58eec184f1f457affe
Implements: blueprint trove-task-manager-client-refactoring
This commit is contained in:
Dmitriy Ukhlov 2013-09-20 21:50:08 +03:00
parent b81f8c0682
commit 9f55a06e09
28 changed files with 176 additions and 302 deletions

View File

@ -37,10 +37,10 @@ if os.path.exists(os.path.join(possible_topdir, 'trove', '__init__.py')):
sys.path.insert(0, possible_topdir)
from trove.common import cfg
from trove.common import rpc
from trove.common.rpc import service as rpc_service
from oslo.config import cfg as openstack_cfg
from trove.openstack.common import log as logging
from trove.openstack.common import service
from trove.openstack.common import service as openstack_service
from trove.db import get_db_api
CONF = cfg.CONF
@ -58,8 +58,8 @@ if __name__ == '__main__':
if not manager:
msg = "Manager not found for service type " + CONF.service_type
raise RuntimeError(msg)
server = rpc.RpcService(manager=manager, host=CONF.guest_id)
launcher = service.launch(server)
server = rpc_service.RpcService(manager=manager, host=CONF.guest_id)
launcher = openstack_service.launch(server)
launcher.wait()
except RuntimeError as error:
import traceback

View File

@ -37,10 +37,10 @@ if os.path.exists(os.path.join(possible_topdir, 'trove', '__init__.py')):
sys.path.insert(0, possible_topdir)
from trove.common import cfg
from trove.common import rpc
from trove.common.rpc import service as rpc_service
from oslo.config import cfg as openstack_cfg
from trove.openstack.common import log as logging
from trove.openstack.common import service
from trove.openstack.common import service as openstack_service
from trove.db import get_db_api
CONF = cfg.CONF
@ -52,9 +52,9 @@ if __name__ == '__main__':
try:
get_db_api().configure_db(CONF)
server = rpc.RpcService(manager=CONF.taskmanager_manager,
topic="mgmt-taskmanager")
launcher = service.launch(server)
server = rpc_service.RpcService(manager=CONF.taskmanager_manager,
topic="mgmt-taskmanager")
launcher = openstack_service.launch(server)
launcher.wait()
except RuntimeError as error:
import traceback

View File

@ -55,14 +55,14 @@ if __name__ == '__main__':
if not debug_utils.enabled():
eventlet.monkey_patch(thread=True)
from trove.common import rpc
from trove.openstack.common import service
from trove.common.rpc import service as rpc_service
from trove.openstack.common import service as openstack_service
from trove.db import get_db_api
try:
get_db_api().configure_db(CONF)
server = rpc.RpcService(manager=CONF.taskmanager_manager)
launcher = service.launch(server)
server = rpc_service.RpcService(manager=CONF.taskmanager_manager)
launcher = openstack_service.launch(server)
launcher.wait()
except RuntimeError as error:
import traceback

View File

@ -2,7 +2,6 @@
"report_directory":"rdli-test-report",
"start_services": false,
"simulate_events":true,
"test_mgmt":false,
"use_local_ovz":false,
"use_venv":false,

View File

@ -1,19 +1,21 @@
[DEFAULT]
# Fake out the remote implementations
remote_implementation = fake
remote_nova_client = trove.tests.fakes.nova.fake_create_nova_client
remote_guest_client = trove.tests.fakes.guestagent.fake_create_guest_client
remote_swift_client = trove.tests.fakes.swift.fake_create_swift_client
remote_cinder_client = trove.tests.fakes.nova.fake_create_cinder_client
# Fake out the RPC implementation
rpc_backend = trove.common.rpc.impl_fake
# This will remove some of the verbose logging when trying to diagnose tox issues
#default_log_levels=routes.middleware=ERROR,trove.common.auth=WARN
fake_mode_events = eventlet
log_file = rdtest.log
use_stderr = False
# Show more verbose log output (sets INFO log level output)
verbose = True

View File

@ -9,8 +9,11 @@ from trove.openstack.common import log as logging
from trove.tests.config import CONFIG
from wsgi_intercept.httplib2_intercept import install as wsgi_install
import proboscis
from eventlet import greenthread
import wsgi_intercept
from trove.openstack.common.rpc import service as rpc_service
import eventlet
eventlet.monkey_patch(thread=False)
CONF = cfg.CONF
@ -37,11 +40,15 @@ def initialize_trove(config_file):
cfg.CONF(args=[],
project='trove',
default_config_files=[config_file])
CONF.use_stderr = False
CONF.log_file = 'rdtest.log'
logging.setup(None)
CONF.bind_port = 8779
CONF.fake_mode_events = 'simulated'
topic = CONF.taskmanager_queue
from trove.taskmanager import manager
manager_impl = manager.Manager()
taskman_service = rpc_service.Service(None, topic=topic,
manager=manager_impl)
taskman_service.start()
return pastedeploy.paste_deploy_app(config_file, 'trove', {})
@ -76,19 +83,6 @@ def initialize_fakes(app):
CONF.bind_port,
wsgi_interceptor)
# Finally, engage in some truly evil monkey business. We want
# to change anything which spawns threads with eventlet to instead simply
# put those functions on a queue in memory. Then, we swap out any functions
# which might try to take a nap to instead call functions that go through
# this queue and call the functions that would normally run in separate
# threads.
import eventlet
from trove.tests.fakes.common import event_simulator_sleep
eventlet.sleep = event_simulator_sleep
greenthread.sleep = event_simulator_sleep
import time
time.sleep = event_simulator_sleep
def parse_args_for_test_config():
for index in range(len(sys.argv)):
@ -99,17 +93,10 @@ def parse_args_for_test_config():
return arg[14:]
return 'etc/tests/localhost.test.conf'
def replace_poll_until():
    """Monkey-patch trove.common.utils.poll_until with the test variant.

    Imports are deferred so the patch happens at call time, after test
    configuration has been loaded.
    """
    from trove.common import utils as rd_utils
    from trove.tests import util as test_utils
    rd_utils.poll_until = test_utils.poll_until
if __name__ == "__main__":
try:
wsgi_install()
add_support_for_localization()
replace_poll_until()
# Load Trove app
# Paste file needs absolute path
config_file = os.path.realpath('etc/trove/trove.conf.test')
@ -120,6 +107,7 @@ if __name__ == "__main__":
# Swap out WSGI, httplib, and several sleep functions
# with test doubles.
initialize_fakes(app)
# Initialize the test configuration.
test_config_file = parse_args_for_test_config()
CONFIG.load_from_file(test_config_file)

View File

@ -48,9 +48,6 @@ common_opts = [
default=True,
help='File name for the paste.deploy config for trove-api'),
cfg.ListOpt('admin_roles', default=['admin']),
cfg.StrOpt('remote_implementation',
default="real",
help='Remote implementation for using fake integration code'),
cfg.StrOpt('nova_compute_url', default='http://localhost:8774/v2'),
cfg.StrOpt('cinder_url', default='http://localhost:8776/v2'),
cfg.StrOpt('heat_url', default='http://localhost:8004/v1'),
@ -107,7 +104,6 @@ common_opts = [
cfg.StrOpt('taskmanager_queue', default='taskmanager'),
cfg.BoolOpt('use_nova_server_volume', default=False),
cfg.BoolOpt('use_heat', default=False),
cfg.StrOpt('fake_mode_events', default='simulated'),
cfg.StrOpt('device_path', default='/dev/vdb'),
cfg.StrOpt('mount_point', default='/var/lib/mysql'),
cfg.StrOpt('service_type', default='mysql'),

View File

@ -1,48 +0,0 @@
# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from trove.openstack.common import log as logging
from trove.openstack.common import rpc
from trove.common import cfg
from trove.common import exception
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# TODO(hub_cap): upgrade this to use rpc.proxy.RpcProxy
class ManagerAPI(object):
    """Extend this API for interacting with the common methods of managers"""

    def __init__(self, context):
        # Request context propagated into every RPC cast.
        self.context = context

    def _cast(self, method_name, **kwargs):
        # Dispatch to the fake (no-op) or real RPC implementation based on
        # the ``remote_implementation`` config option.
        if CONF.remote_implementation == "fake":
            self._fake_cast(method_name, **kwargs)
        else:
            self._real_cast(method_name, **kwargs)

    def _real_cast(self, method_name, **kwargs):
        # Fire-and-forget cast to the manager's queue; any failure is
        # re-raised wrapped as a TaskManagerError.
        # NOTE(review): _get_routing_key is not defined here — subclasses
        # are expected to provide it.
        try:
            rpc.cast(self.context, self._get_routing_key(),
                     {"method": method_name, "args": kwargs})
        except Exception as e:
            LOG.error(e)
            raise exception.TaskManagerError(original_message=str(e))

    def _fake_cast(self, method_name, **kwargs):
        # Default fake behavior is a no-op; subclasses may override.
        pass

View File

@ -15,16 +15,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""RPC helper for launching a rpc service."""
import inspect
import os
import kombu
from trove.openstack.common import importutils
from trove.openstack.common import loopingcall
from trove.openstack.common import rpc as openstack_rpc
from trove.openstack.common.rpc import service as rpc_service
from trove.common import cfg
CONF = cfg.CONF
@ -39,24 +35,3 @@ def delete_queue(context, topic):
auto_delete=False, exclusive=False,
durable=durable)
queue.delete()
class RpcService(rpc_service.Service):
def __init__(self, host=None, binary=None, topic=None, manager=None):
host = host or CONF.host
binary = binary or os.path.basename(inspect.stack()[-1][1])
topic = topic or binary.rpartition('trove-')[2]
self.manager_impl = importutils.import_object(manager)
self.report_interval = CONF.report_interval
super(RpcService, self).__init__(host, topic,
manager=self.manager_impl)
def start(self):
super(RpcService, self).start()
# TODO(hub-cap): Currently the context is none... do we _need_ it here?
pulse = loopingcall.LoopingCall(self.manager_impl.run_periodic_tasks,
context=None)
pulse.start(interval=self.report_interval,
initial_delay=self.report_interval)
pulse.wait()

View File

@ -0,0 +1,32 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
Standard openstack.common.rpc.impl_fake with nonblocking cast
"""

# Import eventlet explicitly: relying on the name leaking through the
# star import below is fragile (it disappears if impl_fake defines
# __all__ or stops importing eventlet itself).
import eventlet

from trove.openstack.common.rpc.impl_fake import *

# Keep a handle on the blocking implementation before shadowing ``cast``.
original_cast = cast


def non_blocking_cast(*args, **kwargs):
    """Run the original fake cast in a green thread.

    The stock impl_fake ``cast`` executes synchronously; spawning it
    instead mimics the asynchronous semantics of a real RPC cast.
    """
    eventlet.spawn_n(original_cast, *args, **kwargs)


# Consumers of this module now get the non-blocking variant.
cast = non_blocking_cast

View File

@ -0,0 +1,30 @@
import inspect
import os
from trove.openstack.common import importutils
from trove.openstack.common import loopingcall
from trove.openstack.common.rpc import service as rpc_service
from trove.common import cfg
CONF = cfg.CONF
class RpcService(rpc_service.Service):
    """RPC service wrapper with Trove defaults.

    Derives host/binary/topic defaults from config and the launching
    script's name, and drives the manager's periodic tasks after start.
    """

    def __init__(self, host=None, binary=None, topic=None, manager=None):
        host = host or CONF.host
        # Default the binary name from the outermost stack frame's file,
        # i.e. the script that launched this process.
        binary = binary or os.path.basename(inspect.stack()[-1][1])
        # e.g. binary "trove-taskmanager" yields topic "taskmanager".
        topic = topic or binary.rpartition('trove-')[2]
        # ``manager`` is a dotted class path; instantiate it here.
        self.manager_impl = importutils.import_object(manager)
        self.report_interval = CONF.report_interval
        super(RpcService, self).__init__(host, topic,
                                         manager=self.manager_impl)

    def start(self):
        super(RpcService, self).start()
        # TODO(hub-cap): Currently the context is none... do we _need_ it here?
        pulse = loopingcall.LoopingCall(self.manager_impl.run_periodic_tasks,
                                        context=None)
        pulse.start(interval=self.report_interval,
                    initial_delay=self.report_interval)
        # Blocks until the periodic loop is stopped.
        pulse.wait()

View File

@ -19,25 +19,24 @@ Routes all the requests to the task manager.
"""
import traceback
import sys
from trove.common import cfg
from trove.common.manager import ManagerAPI
from trove.openstack.common.rpc import proxy
from trove.openstack.common import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
RPC_API_VERSION = "1.0"
#todo(hub-cap): find a better way to deal w/ the fakes. I'm sure we can
# use a fake impl to deal w/ this and switch it out in the configs.
# The ManagerAPI is only used here and should eventually be removed when
# we have a better way to handle fake casts (see rpc fake_impl)
class API(ManagerAPI):
class API(proxy.RpcProxy):
"""API for interacting with the task manager."""
def __init__(self, context):
self.context = context
super(API, self).__init__(self._get_routing_key(),
RPC_API_VERSION)
def _transform_obj(self, obj_ref):
# Turn the object into a dictionary and remove the mgr
if "__dict__" in dir(obj_ref):
@ -48,20 +47,6 @@ class API(ManagerAPI):
return obj_dict
raise ValueError("Could not transform %s" % obj_ref)
def _fake_cast(self, method_name, **kwargs):
from trove.tests.fakes.common import get_event_spawer
from trove.taskmanager.manager import Manager
method = getattr(Manager(), method_name)
def func():
try:
method(self.context, **kwargs)
except Exception:
LOG.exception("Error running async task")
raise
get_event_spawer()(0, func)
def _get_routing_key(self):
"""Create the routing key for the taskmanager"""
return CONF.taskmanager_queue
@ -69,51 +54,66 @@ class API(ManagerAPI):
def resize_volume(self, new_size, instance_id):
LOG.debug("Making async call to resize volume for instance: %s"
% instance_id)
self._cast("resize_volume", new_size=new_size, instance_id=instance_id)
self.cast(self.context, self.make_msg("resize_volume",
new_size=new_size,
instance_id=instance_id))
def resize_flavor(self, instance_id, old_flavor, new_flavor):
LOG.debug("Making async call to resize flavor for instance: %s" %
instance_id)
self._cast("resize_flavor", instance_id=instance_id,
old_flavor=self._transform_obj(old_flavor),
new_flavor=self._transform_obj(new_flavor))
self.cast(self.context,
self.make_msg("resize_flavor",
instance_id=instance_id,
old_flavor=self._transform_obj(old_flavor),
new_flavor=self._transform_obj(new_flavor)))
def reboot(self, instance_id):
LOG.debug("Making async call to reboot instance: %s" % instance_id)
self._cast("reboot", instance_id=instance_id)
self.cast(self.context,
self.make_msg("reboot", instance_id=instance_id))
def restart(self, instance_id):
LOG.debug("Making async call to restart instance: %s" % instance_id)
self._cast("restart", instance_id=instance_id)
self.cast(self.context,
self.make_msg("restart", instance_id=instance_id))
def migrate(self, instance_id, host):
LOG.debug("Making async call to migrate instance: %s" % instance_id)
self._cast("migrate", instance_id=instance_id, host=host)
self.cast(self.context,
self.make_msg("migrate", instance_id=instance_id, host=host))
def delete_instance(self, instance_id):
LOG.debug("Making async call to delete instance: %s" % instance_id)
self._cast("delete_instance", instance_id=instance_id)
self.cast(self.context,
self.make_msg("delete_instance", instance_id=instance_id))
def create_backup(self, backup_id, instance_id):
LOG.debug("Making async call to create a backup for instance: %s" %
instance_id)
self._cast("create_backup",
backup_id=backup_id,
instance_id=instance_id)
self.cast(self.context, self.make_msg("create_backup",
backup_id=backup_id,
instance_id=instance_id))
def delete_backup(self, backup_id):
LOG.debug("Making async call to delete backup: %s" % backup_id)
self._cast("delete_backup", backup_id=backup_id)
self.cast(self.context, self.make_msg("delete_backup",
backup_id=backup_id))
def create_instance(self, instance_id, name, flavor,
image_id, databases, users, service_type,
volume_size, security_groups, backup_id=None,
availability_zone=None, root_password=None):
LOG.debug("Making async call to create instance %s " % instance_id)
self._cast("create_instance", instance_id=instance_id, name=name,
flavor=self._transform_obj(flavor), image_id=image_id,
databases=databases, users=users,
service_type=service_type, volume_size=volume_size,
security_groups=security_groups, backup_id=backup_id,
availability_zone=availability_zone,
root_password=root_password)
self.cast(self.context,
self.make_msg("create_instance",
instance_id=instance_id, name=name,
flavor=self._transform_obj(flavor),
image_id=image_id,
databases=databases,
users=users,
service_type=service_type,
volume_size=volume_size,
security_groups=security_groups,
backup_id=backup_id,
availability_zone=availability_zone,
root_password=root_password))

View File

@ -31,7 +31,6 @@ from trove.common.remote import create_nova_client
from trove.common.remote import create_heat_client
from trove.common.remote import create_cinder_client
from swiftclient.client import ClientException
from trove.common.utils import poll_until
from trove.instance import models as inst_models
from trove.instance.models import BuiltInstance
from trove.instance.models import FreshInstance
@ -461,8 +460,8 @@ class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
{'instance': self.id, 'status': server.status})
raise TroveError(status=server.status)
poll_until(get_server, ip_is_available,
sleep_time=1, time_out=DNS_TIME_OUT)
utils.poll_until(get_server, ip_is_available,
sleep_time=1, time_out=DNS_TIME_OUT)
server = self.nova_client.servers.get(
self.db_info.compute_instance_id)
LOG.info("Creating dns entry...")
@ -525,10 +524,10 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
return True
try:
poll_until(server_is_finished, sleep_time=2,
time_out=CONF.server_delete_time_out)
except PollTimeOut as e:
LOG.error("Timout during nova server delete", e)
utils.poll_until(server_is_finished, sleep_time=2,
time_out=CONF.server_delete_time_out)
except PollTimeOut:
LOG.exception("Timout during nova server delete.")
self.send_usage_event('delete',
deleted_at=timeutils.isotime(deleted_at),
server=old_server)

View File

@ -20,7 +20,7 @@ from proboscis import test
from proboscis import SkipTest
from proboscis.decorators import time_out
import troveclient
from trove.tests.util import poll_until
from trove.common.utils import poll_until
from trove.tests.util import test_config
from trove.tests.util import create_dbaas_client
from trove.tests.util.users import Requirements

View File

@ -62,7 +62,7 @@ from trove.tests.util import iso_time
from trove.tests.util import process
from trove.tests.util.users import Requirements
from trove.tests.util import string_in_list
from trove.tests.util import poll_until
from trove.common.utils import poll_until
from trove.tests.util.check import AttrCheck
from trove.tests.util.check import TypeCheck

View File

@ -33,7 +33,7 @@ from trove.tests.api.instances import assert_unprocessable
from trove.tests.api.instances import VOLUME_SUPPORT
from trove.tests.api.instances import EPHEMERAL_SUPPORT
from trove.tests.util.server_connection import create_server_connection
from trove.tests.util import poll_until
from trove.common.utils import poll_until
from trove.tests.config import CONFIG
from trove.tests.util import LocalSqlClient
from trove.tests.util import iso_time

View File

@ -7,7 +7,7 @@ from proboscis.decorators import time_out
from troveclient import exceptions
from trove.tests.util import create_dbaas_client
from trove.tests.util import poll_until
from trove.common.utils import poll_until
from trove.tests.util import test_config
from trove.tests.util.users import Requirements
from trove.tests.api.instances import instance_info

View File

@ -28,7 +28,7 @@ from datetime import datetime
from troveclient import exceptions
from trove.tests import util
from trove.tests.util import create_client
from trove.tests.util import poll_until
from trove.common.utils import poll_until
from trove.tests.util import test_config
from trove.tests.api.instances import VOLUME_SUPPORT
from trove.tests.api.instances import EPHEMERAL_SUPPORT

View File

@ -26,7 +26,7 @@ from trove import tests
from trove.tests.api.instances import instance_info
from trove.tests.util import test_config
from trove.tests.util import create_dbaas_client
from trove.tests.util import poll_until
from trove.common.utils import poll_until
from trove.tests.config import CONFIG
from trove.tests.util.users import Requirements
from trove.tests.api.instances import existing_instance

View File

@ -34,7 +34,7 @@ from trove.tests.api.instances import CreateInstance
from trove.tests.api.instances import instance_info
from trove.tests.api.instances import GROUP_START
from trove.tests.api.instances import GROUP_TEST
from trove.tests.util import poll_until
from trove.common.utils import poll_until
GROUP = "dbaas.api.mgmt.instances"

View File

@ -11,7 +11,7 @@ from trove.tests.api.instances import VOLUME_SUPPORT
from trove.tests.util.users import Requirements
from trove.tests.util import create_dbaas_client
import trove.tests.util as tests_utils
from trove.tests.util import poll_until
from trove.common.utils import poll_until
@test(groups=["dbaas.api.mgmt.malformed_json"])

View File

@ -24,7 +24,6 @@ environments if we choose to.
import json
import os
from collections import Mapping
from trove.tests.fakes.common import event_simulator_sleep
#TODO(tim.simpson): I feel like this class already exists somewhere in core
@ -81,8 +80,6 @@ class TestConfig(object):
"known_bugs": {},
"in_proc_server": True,
"report_directory": os.environ.get("REPORT_DIRECTORY", None),
"sleep_mode": "simulated",
"simulate_events": False,
"trove_volume_support": True,
"trove_max_volumes_per_user": 100,
"usage_endpoint": USAGE_ENDPOINT,
@ -90,21 +87,6 @@ class TestConfig(object):
}
self._frozen_values = FrozenDict(self._values)
self._users = None
self._dawdler = None
@property
def dawdler(self):
"""Equivalent (in theory) to time.sleep.
Calling this in place of sleep allows the tests to run faster in
fake mode.
"""
if not self._dawdler:
if self.sleep_mode == "simulated":
self._dawdler = event_simulator_sleep
else:
self._dawdler = greenthread.sleep
return self._dawdler
def get(self, name, default_value):
return self.values.get(name, default_value)

View File

@ -17,8 +17,6 @@
"""Common code to help in faking the models."""
import time
from novaclient import exceptions as nova_exceptions
from trove.common import cfg
from trove.openstack.common import log as logging
@ -31,54 +29,3 @@ LOG = logging.getLogger(__name__)
def authorize(context):
if not context.is_admin:
raise nova_exceptions.Forbidden(403, "Forbidden")
def get_event_spawer():
    """Return the event-spawning callable selected by configuration.

    ``fake_mode_events == "simulated"`` queues events for the simulator;
    any other value schedules them on real eventlet green threads.
    """
    if CONF.fake_mode_events == "simulated":
        return event_simulator
    else:
        return eventlet_spawner


# Events queued by event_simulator; drained by event_simulator_sleep.
pending_events = []
# Re-entrancy depth of event_simulator_sleep; finished events are purged
# only when the outermost call returns.
sleep_entrance_count = 0


def eventlet_spawner(time_from_now_in_seconds, func):
    """Uses eventlet to spawn events."""
    import eventlet
    eventlet.spawn_after(time_from_now_in_seconds, func)


def event_simulator(time_from_now_in_seconds, func):
    """Fakes events without doing any actual waiting."""
    pending_events.append({"time": time_from_now_in_seconds, "func": func})


def event_simulator_sleep(time_to_sleep):
    """Simulates waiting for an event."""
    global sleep_entrance_count
    sleep_entrance_count += 1
    time_to_sleep = float(time_to_sleep)
    global pending_events
    while time_to_sleep > 0:
        # Each simulated tick advances every pending event by half a second.
        itr_sleep = 0.5
        for i in range(len(pending_events)):
            event = pending_events[i]
            event["time"] = event["time"] - itr_sleep
            if event["func"] is not None and event["time"] < 0:
                # Call event, but first delete it so this function can be
                # reentrant.
                func = event["func"]
                event["func"] = None
                try:
                    func()
                except Exception:
                    LOG.exception("Simulated event error.")
        time_to_sleep -= itr_sleep
    sleep_entrance_count -= 1
    if sleep_entrance_count < 1:
        # Clear out old events
        pending_events = [event for event in pending_events
                          if event["func"] is not None]

View File

@ -19,7 +19,7 @@ from trove.openstack.common import log as logging
import time
import re
from trove.tests.fakes.common import get_event_spawer
import eventlet
from trove.common import exception as rd_exception
from trove.common import instance as rd_instance
from trove.tests.util import unquote_user_host
@ -36,7 +36,6 @@ class FakeGuest(object):
self.dbs = {}
self.root_was_enabled = False
self.version = 1
self.event_spawn = get_event_spawer()
self.grants = {}
# Our default admin user.
@ -228,7 +227,7 @@ class FakeGuest(object):
status.status = rd_instance.ServiceStatuses.RUNNING
status.save()
AgentHeartBeat.create(instance_id=self.id)
self.event_spawn(1.0, update_db)
eventlet.spawn_after(1.0, update_db)
def _set_status(self, new_status='RUNNING'):
from trove.instance.models import InstanceServiceStatus
@ -305,7 +304,7 @@ class FakeGuest(object):
backup.state = BackupState.COMPLETED
backup.location = 'http://localhost/path/to/backup'
backup.save()
self.event_spawn(1.0, finish_create_backup)
eventlet.spawn_after(1.0, finish_create_backup)
def get_or_create(id):

View File

@ -22,7 +22,6 @@ from trove.common import instance as rd_instance
from trove.common.utils import poll_until
from trove.openstack.common import log as logging
from trove.tests.fakes.common import authorize
from trove.tests.fakes.common import get_event_spawer
import eventlet
import uuid
@ -106,7 +105,6 @@ class FakeServer(object):
self.image_id = image_id
self.flavor_ref = flavor_ref
self.old_flavor_ref = None
self.event_spawn = get_event_spawer()
self._current_status = "BUILD"
self.volumes = volumes
# This is used by "RdServers". Its easier to compute the
@ -149,7 +147,7 @@ class FakeServer(object):
self.parent.schedule_simulate_running_server(self.id, 1.5)
self._current_status = "REBOOT"
self.event_spawn(1, set_to_active)
eventlet.spawn_after(1, set_to_active)
def delete(self):
self.schedule_status = []
@ -188,7 +186,7 @@ class FakeServer(object):
def set_to_active():
self.parent.schedule_simulate_running_server(self.id, 1.5)
self.event_spawn(1, set_to_active)
eventlet.spawn_after(1, set_to_active)
def change_host():
self.old_host = self.host
@ -207,21 +205,21 @@ class FakeServer(object):
# A resize MIGHT change the host, but a migrate
# deliberately does.
LOG.debug("Migrating fake instance.")
self.event_spawn(0.75, change_host)
eventlet.spawn_after(0.75, change_host)
else:
LOG.debug("Resizing fake instance.")
self.old_flavor_ref = self.flavor_ref
flavor = self.parent.flavors.get(new_flavor_id)
self.flavor_ref = flavor.links[0]['href']
self.event_spawn(1, set_to_confirm_mode)
eventlet.spawn_after(1, set_to_confirm_mode)
self.event_spawn(0.8, set_flavor)
eventlet.spawn_after(0.8, set_flavor)
def schedule_status(self, new_status, time_from_now):
"""Makes a new status take effect at the given time."""
def set_status():
self._current_status = new_status
self.event_spawn(time_from_now, set_status)
eventlet.spawn_after(time_from_now, set_status)
@property
def status(self):
@ -255,7 +253,6 @@ class FakeServers(object):
self.context = context
self.db = FAKE_SERVERS_DB
self.flavors = flavors
self.event_spawn = get_event_spawer()
def can_see(self, id):
"""Can this FakeServers, with its context, see some resource?"""
@ -331,7 +328,7 @@ class FakeServers(object):
def delete_server():
LOG.info("Simulated event ended, deleting server %s." % id)
del self.db[id]
self.event_spawn(time_from_now, delete_server)
eventlet.spawn_after(time_from_now, delete_server)
def schedule_simulate_running_server(self, id, time_from_now):
from trove.instance.models import DBInstance
@ -343,7 +340,7 @@ class FakeServers(object):
status = InstanceServiceStatus.find_by(instance_id=instance.id)
status.status = rd_instance.ServiceStatuses.RUNNING
status.save()
self.event_spawn(time_from_now, set_server_running)
eventlet.spawn_after(time_from_now, set_server_running)
class FakeRdServer(object):
@ -401,7 +398,6 @@ class FakeVolume(object):
self.size = size
self.name = name
self.description = description
self.event_spawn = get_event_spawer()
self._current_status = "BUILD"
# For some reason we grab this thing from device then call it mount
# point.
@ -429,7 +425,7 @@ class FakeVolume(object):
"""Makes a new status take effect at the given time."""
def set_status():
self._current_status = new_status
self.event_spawn(time_from_now, set_status)
eventlet.spawn_after(time_from_now, set_status)
def set_attachment(self, server_id):
"""Fake method we've added to set attachments. Idempotent."""
@ -462,7 +458,6 @@ class FakeVolumes(object):
def __init__(self, context):
self.context = context
self.db = FAKE_VOLUMES_DB
self.event_spawn = get_event_spawer()
def can_see(self, id):
"""Can this FakeVolumes, with its context, see some resource?"""
@ -507,7 +502,7 @@ class FakeVolumes(object):
def finish_resize():
volume.size = new_size
self.event_spawn(1.0, finish_resize)
eventlet.spawn_after(1.0, finish_resize)
def detach(self, volume_id):
volume = self.get(volume_id)
@ -517,7 +512,7 @@ class FakeVolumes(object):
def finish_detach():
volume._current_status = "available"
self.event_spawn(1.0, finish_detach)
eventlet.spawn_after(1.0, finish_detach)
class FakeAccount(object):
@ -545,7 +540,6 @@ class FakeAccounts(object):
self.context = context
self.db = FAKE_SERVERS_DB
self.servers = servers
self.event_spawn = get_event_spawer()
def _belongs_to_tenant(self, tenant, id):
server = self.db[id]

View File

@ -546,7 +546,7 @@ class WsgiLimiterTest(BaseLimitTestSuite):
self.assertEqual(delay, None)
delay = self._request("GET", "/delayed")
self.assertEqual(delay, '60.00')
self.assertAlmostEqual(float(delay), 60, 1)
def test_response_to_delays_usernames(self):
delay = self._request("GET", "/delayed", "user1")
@ -556,10 +556,10 @@ class WsgiLimiterTest(BaseLimitTestSuite):
self.assertEqual(delay, None)
delay = self._request("GET", "/delayed", "user1")
self.assertEqual(delay, '60.00')
self.assertAlmostEqual(float(delay), 60, 1)
delay = self._request("GET", "/delayed", "user2")
self.assertEqual(delay, '60.00')
self.assertAlmostEqual(float(delay), 60, 1)
class FakeHttplibSocket(object):
@ -681,10 +681,9 @@ class WsgiLimiterProxyTest(BaseLimitTestSuite):
delay, error = self.proxy.check_for_delay("GET", "/delayed")
error = error.strip()
expected = ("60.00", "403 Forbidden\n\nOnly 1 GET request(s) can be "
"made to /delayed every minute.")
self.assertEqual((delay, error), expected)
self.assertAlmostEqual(float(delay), 60, 1)
self.assertEqual(error, "403 Forbidden\n\nOnly 1 GET request(s) can be"
" made to /delayed every minute.")
def tearDown(self):
# restore original HTTPConnection object

View File

@ -223,31 +223,10 @@ def iso_time(time_string):
pass
return '%sZ' % ts
if CONFIG.simulate_events:
# Without eventlet, this just calls time.sleep.
def poll_until(retriever, condition=lambda value: value,
sleep_time=1, time_out=None):
"""Retrieves object until it passes condition, then returns it.
If time_out_limit is passed in, PollTimeOut will be raised once that
amount of time is eclipsed.
"""
start_time = time.time()
def check_timeout():
if time_out is not None and time.time() > start_time + time_out:
raise PollTimeOut
while True:
obj = retriever()
if condition(obj):
return
check_timeout()
time.sleep(sleep_time)
else:
from trove.common.utils import poll_until
# TODO(dukhlov): Still required by trove integration
# Should be removed after trove integration fix
# https://bugs.launchpad.net/trove-integration/+bug/1228306
from trove.common.utils import poll_until
def mysql_connection():

View File

@ -36,7 +36,8 @@ class UsageVerifier(object):
pass
def check_message(self, resource_id, event_type, **attrs):
messages = self.get_messages(resource_id)
messages = utils.poll_until(lambda: self.get_messages(resource_id),
lambda x: len(x) > 0, time_out=30)
found = None
for message in messages:
if message['event_type'] == event_type: