Merge "Core modifications for future zones service."

This commit is contained in:
Jenkins 2012-02-16 21:41:43 +00:00 committed by Gerrit Code Review
commit 34d77ac8b1
18 changed files with 428 additions and 57 deletions

View File

@ -16,5 +16,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from nova.compute.api import API
from nova.compute.api import AggregateAPI
# Importing full names to not pollute the namespace and cause possible
# collisions with use of 'from nova.compute import <foo>' elsewhere.
import nova.flags
import nova.utils
API = nova.utils.import_class(nova.flags.FLAGS.compute_api_class)

View File

@ -567,6 +567,20 @@ class API(base.Base):
"requested_networks": requested_networks,
"filter_properties": filter_properties}})
def _check_create_policies(self, context, availability_zone,
requested_networks, block_device_mapping):
"""Check policies for create()."""
target = {'project_id': context.project_id,
'user_id': context.user_id,
'availability_zone': availability_zone}
check_policy(context, 'create', target)
if requested_networks:
check_policy(context, 'create:attach_network', target)
if block_device_mapping:
check_policy(context, 'create:attach_volume', target)
def create(self, context, instance_type,
image_href, kernel_id=None, ramdisk_id=None,
min_count=None, max_count=None,
@ -586,16 +600,9 @@ class API(base.Base):
could be 'None' or a list of instance dicts depending on if
we waited for information from the scheduler or not.
"""
target = {'project_id': context.project_id,
'user_id': context.user_id,
'availability_zone': availability_zone}
check_policy(context, 'create', target)
if requested_networks:
check_policy(context, 'create:attach_network', target)
if block_device_mapping:
check_policy(context, 'create:attach_volume', target)
self._check_create_policies(context, availability_zone,
requested_networks, block_device_mapping)
# We can create the DB entry for the instance here if we're
# only going to create 1 instance.
@ -843,20 +850,28 @@ class API(base.Base):
else:
LOG.warning(_("No host for instance %s, deleting immediately"),
instance["uuid"])
self.db.instance_destroy(context, instance['id'])
try:
self.db.instance_destroy(context, instance['id'])
except exception.InstanceNotFound:
# NOTE(comstud): Race condition. Instance already gone.
pass
def _delete(self, context, instance):
host = instance['host']
if host:
self.update(context,
instance,
task_state=task_states.DELETING,
progress=0)
try:
if host:
self.update(context,
instance,
task_state=task_states.DELETING,
progress=0)
self._cast_compute_message('terminate_instance', context,
instance)
else:
self.db.instance_destroy(context, instance['id'])
self._cast_compute_message('terminate_instance', context,
instance)
else:
self.db.instance_destroy(context, instance['id'])
except exception.InstanceNotFound:
# NOTE(comstud): Race condition. Instance already gone.
pass
# NOTE(jerdfelt): The API implies that only ACTIVE and ERROR are
# allowed but the EC2 API appears to allow from RESCUED and STOPPED

View File

@ -1354,11 +1354,12 @@ def instance_create(context, values):
context - request context object
values - dict containing column values.
"""
values = values.copy()
values['metadata'] = _metadata_refs(values.get('metadata'),
models.InstanceMetadata)
instance_ref = models.Instance()
instance_ref['uuid'] = str(utils.gen_uuid())
if not values.get('uuid'):
values['uuid'] = str(utils.gen_uuid())
instance_ref.update(values)
session = get_session()
@ -1388,7 +1389,13 @@ def instance_data_get_for_project(context, project_id):
def instance_destroy(context, instance_id):
session = get_session()
with session.begin():
instance_ref = instance_get(context, instance_id, session=session)
if utils.is_uuid_like(instance_id):
instance_ref = instance_get_by_uuid(context, instance_id,
session=session)
instance_id = instance_ref['id']
else:
instance_ref = instance_get(context, instance_id,
session=session)
session.query(models.Instance).\
filter_by(id=instance_id).\
update({'deleted': True,
@ -1412,6 +1419,7 @@ def instance_destroy(context, instance_id):
instance_info_cache_delete(context, instance_ref['uuid'],
session=session)
return instance_ref
@require_context
@ -3541,7 +3549,7 @@ def zone_get(context, zone_id):
@require_admin_context
def zone_get_all(context):
return model_query(context, models.Zone, read_deleted="yes").all()
return model_query(context, models.Zone, read_deleted="no").all()
####################

View File

@ -0,0 +1,44 @@
# Copyright 2012 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import *
meta = MetaData()
zones = Table('zones', meta,
Column('id', Integer(), primary_key=True, nullable=False),
)
is_parent = Column('is_parent', Boolean(), default=False)
rpc_host = Column('rpc_host', String(255))
rpc_port = Column('rpc_port', Integer())
rpc_virtual_host = Column('rpc_virtual_host', String(255))
def upgrade(migrate_engine):
meta.bind = migrate_engine
zones.create_column(is_parent)
zones.create_column(rpc_host)
zones.create_column(rpc_port)
zones.create_column(rpc_virtual_host)
def downgrade(migrate_engine):
meta.bind = migrate_engine
zones.drop_column(rpc_virtual_host)
zones.drop_column(rpc_port)
zones.drop_column(rpc_host)
zones.drop_column(is_parent)

View File

@ -0,0 +1,35 @@
BEGIN TRANSACTION;
CREATE TEMPORARY TABLE zones_temp (
created_at DATETIME,
updated_at DATETIME,
deleted_at DATETIME,
deleted BOOLEAN,
id INTEGER NOT NULL,
name VARCHAR(255),
api_url VARVHAR(255),
username VARCHAR(255),
password VARCHAR(255),
weight_offset FLOAT,
weight_scale FLOAT,
PRIMARY KEY (id),
CHECK (deleted IN (0, 1))
);
INSERT INTO zones_temp
SELECT created_at,
updated_at,
deleted_at,
deleted,
id,
name,
api_url,
username,
password,
weight_offset,
weight_scale FROM zones;
DROP TABLE zones;
ALTER TABLE zones_temp RENAME TO zones;
COMMIT;

View File

@ -0,0 +1,31 @@
# Copyright 2012 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import *
meta = MetaData()
def upgrade(migrate_engine):
meta.bind = migrate_engine
instances = Table('instances', meta, autoload=True)
zone_name = Column('zone_name', String(255))
instances.create_column(zone_name)
def downgrade(migrate_engine):
meta.bind = migrate_engine
instances = Table('instances', meta, autoload=True)
zone_name = Column('zone_name', String(255))
instances.drop_column(zone_name)

View File

@ -277,6 +277,9 @@ class Instance(BASE, NovaBase):
# EC2 disable_api_termination
disable_terminate = Column(Boolean(), default=False, nullable=False)
# Openstack zone name
zone_name = Column(String(255))
class InstanceInfoCache(BASE, NovaBase):
"""
@ -876,6 +879,10 @@ class Zone(BASE, NovaBase):
password = Column(String(255))
weight_offset = Column(Float(), default=0.0)
weight_scale = Column(Float(), default=1.0)
is_parent = Column(Boolean())
rpc_host = Column(String(255))
rpc_port = Column(Integer())
rpc_virtual_host = Column(String(255))
class Aggregate(BASE, NovaBase):

View File

@ -441,7 +441,16 @@ global_opts = [
help='Cache glance images locally'),
cfg.BoolOpt('use_cow_images',
default=True,
help='Whether to use cow images')
help='Whether to use cow images'),
cfg.StrOpt('compute_api_class',
default='nova.compute.api.API',
help='The compute API class to use'),
cfg.StrOpt('network_api_class',
default='nova.network.api.API',
help='The network API class to use'),
cfg.StrOpt('volume_api_class',
default='nova.volume.api.API',
help='The volume API class to use'),
]
FLAGS.register_opts(global_opts)

View File

@ -16,4 +16,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from nova.network.api import API
# Importing full names to not pollute the namespace and cause possible
# collisions with use of 'from nova.network import <foo>' elsewhere.
import nova.flags
import nova.utils
API = nova.utils.import_class(nova.flags.FLAGS.network_api_class)

View File

@ -161,6 +161,37 @@ def cleanup():
return _get_impl().cleanup()
def cast_to_server(context, server_params, topic, msg):
"""Invoke a remote method that does not return anything.
:param context: Information that identifies the user that has made this
request.
:param server_params: Connection information
:param topic: The topic to send the notification to.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:returns: None
"""
return _get_impl().cast_to_server(context, server_params, topic, msg)
def fanout_cast_to_server(context, server_params, topic, msg):
"""Broadcast to a remote method invocation with no return.
:param context: Information that identifies the user that has made this
request.
:param server_params: Connection information
:param topic: The topic to send the notification to.
:param msg: This is a dict in the form { "method" : "method_to_invoke",
"args" : dict_of_kwargs }
:returns: None
"""
return _get_impl().fanout_cast_to_server(context, server_params, topic,
msg)
_RPCIMPL = None

View File

@ -73,14 +73,15 @@ class ConnectionContext(rpc_common.Connection):
the pool.
"""
def __init__(self, connection_pool, pooled=True):
def __init__(self, connection_pool, pooled=True, server_params=None):
"""Create a new connection, or get one from the pool"""
self.connection = None
self.connection_pool = connection_pool
if pooled:
self.connection = connection_pool.get()
else:
self.connection = connection_pool.connection_cls()
self.connection = connection_pool.connection_cls(
server_params=server_params)
self.pooled = pooled
def __enter__(self):
@ -353,6 +354,23 @@ def fanout_cast(context, topic, msg, connection_pool):
conn.fanout_send(topic, msg)
def cast_to_server(context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
pack_context(msg, context)
with ConnectionContext(connection_pool, pooled=False,
server_params=server_params) as conn:
conn.topic_send(topic, msg)
def fanout_cast_to_server(context, server_params, topic, msg,
connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
pack_context(msg, context)
with ConnectionContext(connection_pool, pooled=False,
server_params=server_params) as conn:
conn.fanout_send(topic, msg)
def notify(context, topic, msg, connection_pool):
"""Sends a notification event on a topic."""
LOG.debug(_('Sending notification on %s...'), topic)

View File

@ -27,8 +27,6 @@ import kombu.entity
import kombu.messaging
import kombu.connection
from nova import context
from nova import exception
from nova import flags
from nova.rpc import common as rpc_common
from nova.rpc import amqp as rpc_amqp
@ -310,7 +308,7 @@ class NotifyPublisher(TopicPublisher):
class Connection(object):
"""Connection object."""
def __init__(self):
def __init__(self, server_params=None):
self.consumers = []
self.consumer_thread = None
self.max_retries = FLAGS.rabbit_max_retries
@ -323,11 +321,25 @@ class Connection(object):
self.interval_max = 30
self.memory_transport = False
self.params = dict(hostname=FLAGS.rabbit_host,
port=FLAGS.rabbit_port,
userid=FLAGS.rabbit_userid,
password=FLAGS.rabbit_password,
virtual_host=FLAGS.rabbit_virtual_host)
if server_params is None:
server_params = {}
# Keys to translate from server_params to kombu params
server_params_to_kombu_params = {'username': 'userid'}
params = {}
for sp_key, value in server_params.iteritems():
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
params[p_key] = value
params.setdefault('hostname', FLAGS.rabbit_host)
params.setdefault('port', FLAGS.rabbit_port)
params.setdefault('userid', FLAGS.rabbit_userid)
params.setdefault('password', FLAGS.rabbit_password)
params.setdefault('virtual_host', FLAGS.rabbit_virtual_host)
self.params = params
if FLAGS.fake_rabbit:
self.params['transport'] = 'memory'
self.memory_transport = True
@ -588,10 +600,10 @@ class Connection(object):
"""Create a consumer that calls a method in a proxy object"""
if fanout:
self.declare_fanout_consumer(topic,
rpc_amqp.ProxyCallback(proxy, Connection.pool))
rpc_amqp.ProxyCallback(proxy, Connection.pool))
else:
self.declare_topic_consumer(topic,
rpc_amqp.ProxyCallback(proxy, Connection.pool))
rpc_amqp.ProxyCallback(proxy, Connection.pool))
Connection.pool = rpc_amqp.Pool(connection_cls=Connection)
@ -622,6 +634,18 @@ def fanout_cast(context, topic, msg):
return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
def cast_to_server(context, server_params, topic, msg):
"""Sends a message on a topic to a specific server."""
return rpc_amqp.cast_to_server(context, server_params, topic, msg,
Connection.pool)
def fanout_cast_to_server(context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
return rpc_amqp.cast_to_server(context, server_params, topic, msg,
Connection.pool)
def notify(context, topic, msg):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(context, topic, msg, Connection.pool)

View File

@ -272,19 +272,31 @@ class NotifyPublisher(Publisher):
class Connection(object):
"""Connection object."""
def __init__(self):
def __init__(self, server_params=None):
self.session = None
self.consumers = {}
self.consumer_thread = None
self.broker = FLAGS.qpid_hostname + ":" + FLAGS.qpid_port
if server_params is None:
server_params = {}
default_params = dict(hostname=FLAGS.qpid_hostname,
port=FLAGS.qpid_port,
username=FLAGS.qpid_username,
password=FLAGS.qpid_password)
params = server_params
for key in default_params.keys():
params.setdefault(key, default_params[key])
self.broker = params['hostname'] + ":" + str(params['port'])
# Create the connection - this does not open the connection
self.connection = qpid.messaging.Connection(self.broker)
# Check if flags are set and if so set them for the connection
# before we call open
self.connection.username = FLAGS.qpid_username
self.connection.password = FLAGS.qpid_password
self.connection.username = params['username']
self.connection.password = params['password']
self.connection.sasl_mechanisms = FLAGS.qpid_sasl_mechanisms
self.connection.reconnect = FLAGS.qpid_reconnect
self.connection.reconnect_timeout = FLAGS.qpid_reconnect_timeout
@ -474,10 +486,10 @@ class Connection(object):
"""Create a consumer that calls a method in a proxy object"""
if fanout:
consumer = FanoutConsumer(self.session, topic,
rpc_amqp.ProxyCallback(proxy, Connection.pool))
rpc_amqp.ProxyCallback(proxy, Connection.pool))
else:
consumer = TopicConsumer(self.session, topic,
rpc_amqp.ProxyCallback(proxy, Connection.pool))
rpc_amqp.ProxyCallback(proxy, Connection.pool))
self._register_consumer(consumer)
return consumer
@ -510,6 +522,18 @@ def fanout_cast(context, topic, msg):
return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool)
def cast_to_server(context, server_params, topic, msg):
"""Sends a message on a topic to a specific server."""
return rpc_amqp.cast_to_server(context, server_params, topic, msg,
Connection.pool)
def fanout_cast_to_server(context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
return rpc_amqp.fanout_cast_to_server(context, server_params, topic,
msg, Connection.pool)
def notify(context, topic, msg):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(context, topic, msg, Connection.pool)

View File

@ -29,7 +29,6 @@ from nova.api.openstack import xmlutil
from nova import flags
from nova import test
from nova.tests.api.openstack import fakes
from nova import wsgi as base_wsgi
FLAGS = flags.FLAGS

View File

@ -1103,6 +1103,7 @@ class ServersControllerTest(test.TestCase):
'display_name': 'server_test',
}
self.assertEqual(params, filtered_dict)
filtered_dict['uuid'] = id
return filtered_dict
self.stubs.Set(nova.db, 'instance_update', server_update)

View File

@ -19,12 +19,15 @@
Unit Tests for remote procedure calls using kombu
"""
from nova import context
from nova import flags
from nova import log as logging
from nova import test
from nova.rpc import amqp as rpc_amqp
from nova.rpc import impl_kombu
from nova.tests.rpc import common
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
@ -99,6 +102,61 @@ class RpcKombuTestCase(common._BaseRpcTestCase):
self.assertEqual(self.received_message, message)
def test_cast_interface_uses_default_options(self):
"""Test kombu rpc.cast"""
ctxt = context.RequestContext('fake_user', 'fake_project')
class MyConnection(impl_kombu.Connection):
def __init__(myself, *args, **kwargs):
super(MyConnection, myself).__init__(*args, **kwargs)
self.assertEqual(myself.params,
{'hostname': FLAGS.rabbit_host,
'userid': FLAGS.rabbit_userid,
'password': FLAGS.rabbit_password,
'port': FLAGS.rabbit_port,
'virtual_host': FLAGS.rabbit_virtual_host,
'transport': 'memory'})
def topic_send(_context, topic, msg):
pass
MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection)
self.stubs.Set(impl_kombu, 'Connection', MyConnection)
impl_kombu.cast(ctxt, 'fake_topic', {'msg': 'fake'})
def test_cast_to_server_uses_server_params(self):
"""Test kombu rpc.cast"""
ctxt = context.RequestContext('fake_user', 'fake_project')
server_params = {'username': 'fake_username',
'password': 'fake_password',
'hostname': 'fake_hostname',
'port': 31337,
'virtual_host': 'fake_virtual_host'}
class MyConnection(impl_kombu.Connection):
def __init__(myself, *args, **kwargs):
super(MyConnection, myself).__init__(*args, **kwargs)
self.assertEqual(myself.params,
{'hostname': server_params['hostname'],
'userid': server_params['username'],
'password': server_params['password'],
'port': server_params['port'],
'virtual_host': server_params['virtual_host'],
'transport': 'memory'})
def topic_send(_context, topic, msg):
pass
MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection)
self.stubs.Set(impl_kombu, 'Connection', MyConnection)
impl_kombu.cast_to_server(ctxt, server_params,
'fake_topic', {'msg': 'fake'})
@test.skip_test("kombu memory transport seems buggy with fanout queues "
"as this test passes when you use rabbit (fake_rabbit=False)")
def test_fanout_send_receive(self):

View File

@ -24,6 +24,7 @@ import mox
from nova import context
from nova import log as logging
from nova.rpc import amqp as rpc_amqp
from nova import test
try:
@ -80,6 +81,10 @@ class RpcQpidTestCase(test.TestCase):
qpid.messaging.Session = self.orig_session
qpid.messaging.Sender = self.orig_sender
qpid.messaging.Receiver = self.orig_receiver
if impl_qpid:
# Need to reset this in case we changed the connection_cls
# in self._setup_to_server_tests()
impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection
self.mocker.ResetAll()
@ -147,13 +152,15 @@ class RpcQpidTestCase(test.TestCase):
def test_create_consumer_fanout(self):
self._test_create_consumer(fanout=True)
def _test_cast(self, fanout):
def _test_cast(self, fanout, server_params=None):
self.mock_connection = self.mocker.CreateMock(self.orig_connection)
self.mock_session = self.mocker.CreateMock(self.orig_session)
self.mock_sender = self.mocker.CreateMock(self.orig_sender)
self.mock_connection.opened().AndReturn(False)
self.mock_connection.open()
self.mock_connection.session().AndReturn(self.mock_session)
if fanout:
expected_address = ('impl_qpid_test_fanout ; '
@ -166,22 +173,34 @@ class RpcQpidTestCase(test.TestCase):
'"create": "always"}')
self.mock_session.sender(expected_address).AndReturn(self.mock_sender)
self.mock_sender.send(mox.IgnoreArg())
# This is a pooled connection, so instead of closing it, it gets reset,
# which is just creating a new session on the connection.
self.mock_session.close()
self.mock_connection.session().AndReturn(self.mock_session)
if not server_params:
# This is a pooled connection, so instead of closing it, it
# gets reset, which is just creating a new session on the
# connection.
self.mock_session.close()
self.mock_connection.session().AndReturn(self.mock_session)
self.mocker.ReplayAll()
try:
ctx = context.RequestContext("user", "project")
if fanout:
impl_qpid.fanout_cast(ctx, "impl_qpid_test",
{"method": "test_method", "args": {}})
args = [ctx, "impl_qpid_test",
{"method": "test_method", "args": {}}]
if server_params:
args.insert(1, server_params)
if fanout:
method = impl_qpid.fanout_cast_to_server
else:
method = impl_qpid.cast_to_server
else:
impl_qpid.cast(ctx, "impl_qpid_test",
{"method": "test_method", "args": {}})
if fanout:
method = impl_qpid.fanout_cast
else:
method = impl_qpid.cast
method(*args)
self.mocker.VerifyAll()
finally:
@ -198,6 +217,39 @@ class RpcQpidTestCase(test.TestCase):
def test_fanout_cast(self):
self._test_cast(fanout=True)
def _setup_to_server_tests(self, server_params):
class MyConnection(impl_qpid.Connection):
def __init__(myself, *args, **kwargs):
super(MyConnection, myself).__init__(*args, **kwargs)
self.assertEqual(myself.connection.username,
server_params['username'])
self.assertEqual(myself.connection.password,
server_params['password'])
self.assertEqual(myself.broker,
server_params['hostname'] + ':' +
str(server_params['port']))
MyConnection.pool = rpc_amqp.Pool(connection_cls=MyConnection)
self.stubs.Set(impl_qpid, 'Connection', MyConnection)
@test.skip_if(qpid is None, "Test requires qpid")
def test_cast_to_server(self):
server_params = {'username': 'fake_username',
'password': 'fake_password',
'hostname': 'fake_hostname',
'port': 31337}
self._setup_to_server_tests(server_params)
self._test_cast(fanout=False, server_params=server_params)
@test.skip_if(qpid is None, "Test requires qpid")
def test_fanout_cast_to_server(self):
server_params = {'username': 'fake_username',
'password': 'fake_password',
'hostname': 'fake_hostname',
'port': 31337}
self._setup_to_server_tests(server_params)
self._test_cast(fanout=True, server_params=server_params)
def _test_call(self, multi):
self.mock_connection = self.mocker.CreateMock(self.orig_connection)
self.mock_session = self.mocker.CreateMock(self.orig_session)

View File

@ -16,4 +16,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from nova.volume.api import API
# Importing full names to not pollute the namespace and cause possible
# collisions with use of 'from nova.volume import <foo>' elsewhere.
import nova.flags
import nova.utils
API = nova.utils.import_class(nova.flags.FLAGS.volume_api_class)