Moves the ip allocation requests to the from the api host into calls to the network host made from the compute host.

This commit is contained in:
Vishvananda Ishaya
2010-12-22 22:04:30 +00:00
committed by Tarmac
8 changed files with 106 additions and 94 deletions

View File

@@ -25,6 +25,10 @@ from carrot.backends import base
from eventlet import greenthread
EXCHANGES = {}
QUEUES = {}
class Message(base.BaseMessage):
pass
@@ -68,81 +72,63 @@ class Queue(object):
return self._queue.get()
class Backend(object):
""" Singleton backend for testing """
class __impl(base.BaseBackend):
def __init__(self, *args, **kwargs):
#super(__impl, self).__init__(*args, **kwargs)
self._exchanges = {}
self._queues = {}
class Backend(base.BaseBackend):
def queue_declare(self, queue, **kwargs):
global QUEUES
if queue not in QUEUES:
logging.debug(_('Declaring queue %s'), queue)
QUEUES[queue] = Queue(queue)
def _reset_all(self):
self._exchanges = {}
self._queues = {}
def exchange_declare(self, exchange, type, *args, **kwargs):
global EXCHANGES
if exchange not in EXCHANGES:
logging.debug(_('Declaring exchange %s'), exchange)
EXCHANGES[exchange] = Exchange(exchange, type)
def queue_declare(self, queue, **kwargs):
if queue not in self._queues:
logging.debug(_('Declaring queue %s'), queue)
self._queues[queue] = Queue(queue)
def queue_bind(self, queue, exchange, routing_key, **kwargs):
global EXCHANGES
global QUEUES
logging.debug(_('Binding %s to %s with key %s'),
queue, exchange, routing_key)
EXCHANGES[exchange].bind(QUEUES[queue].push, routing_key)
def exchange_declare(self, exchange, type, *args, **kwargs):
if exchange not in self._exchanges:
logging.debug(_('Declaring exchange %s'), exchange)
self._exchanges[exchange] = Exchange(exchange, type)
def declare_consumer(self, queue, callback, *args, **kwargs):
self.current_queue = queue
self.current_callback = callback
def queue_bind(self, queue, exchange, routing_key, **kwargs):
logging.debug(_('Binding %s to %s with key %s'),
queue, exchange, routing_key)
self._exchanges[exchange].bind(self._queues[queue].push,
routing_key)
def consume(self, limit=None):
while True:
item = self.get(self.current_queue)
if item:
self.current_callback(item)
raise StopIteration()
greenthread.sleep(0)
def declare_consumer(self, queue, callback, *args, **kwargs):
self.current_queue = queue
self.current_callback = callback
def get(self, queue, no_ack=False):
global QUEUES
if not queue in QUEUES or not QUEUES[queue].size():
return None
(message_data, content_type, content_encoding) = QUEUES[queue].pop()
message = Message(backend=self, body=message_data,
content_type=content_type,
content_encoding=content_encoding)
message.result = True
logging.debug(_('Getting from %s: %s'), queue, message)
return message
def consume(self, *args, **kwargs):
while True:
item = self.get(self.current_queue)
if item:
self.current_callback(item)
raise StopIteration()
greenthread.sleep(0)
def prepare_message(self, message_data, delivery_mode,
content_type, content_encoding, **kwargs):
"""Prepare message for sending."""
return (message_data, content_type, content_encoding)
def get(self, queue, no_ack=False):
if not queue in self._queues or not self._queues[queue].size():
return None
(message_data, content_type, content_encoding) = \
self._queues[queue].pop()
message = Message(backend=self, body=message_data,
content_type=content_type,
content_encoding=content_encoding)
message.result = True
logging.debug(_('Getting from %s: %s'), queue, message)
return message
def prepare_message(self, message_data, delivery_mode,
content_type, content_encoding, **kwargs):
"""Prepare message for sending."""
return (message_data, content_type, content_encoding)
def publish(self, message, exchange, routing_key, **kwargs):
if exchange in self._exchanges:
self._exchanges[exchange].publish(
message, routing_key=routing_key)
__instance = None
def __init__(self, *args, **kwargs):
if Backend.__instance is None:
Backend.__instance = Backend.__impl(*args, **kwargs)
self.__dict__['_Backend__instance'] = Backend.__instance
def __getattr__(self, attr):
return getattr(self.__instance, attr)
def __setattr__(self, attr, value):
return setattr(self.__instance, attr, value)
def publish(self, message, exchange, routing_key, **kwargs):
global EXCHANGES
if exchange in EXCHANGES:
EXCHANGES[exchange].publish(message, routing_key=routing_key)
def reset_all():
Backend()._reset_all()
global EXCHANGES
global QUEUES
EXCHANGES = {}
QUEUES = {}

View File

@@ -245,7 +245,7 @@ def msg_reply(msg_id, reply=None, failure=None):
logging.error(_("Returning exception %s to caller"), message)
logging.error(tb)
failure = (failure[0].__name__, str(failure[1]), tb)
conn = Connection.instance()
conn = Connection.instance(True)
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
publisher.send({'result': reply, 'failure': failure})

View File

@@ -22,20 +22,18 @@ import logging
from M2Crypto import BIO
from M2Crypto import RSA
import os
import StringIO
import tempfile
import time
from eventlet import greenthread
from xml.etree import ElementTree
from nova import context
from nova import crypto
from nova import db
from nova import flags
from nova import rpc
from nova import service
from nova import test
from nova import utils
from nova.auth import manager
from nova.compute import power_state
from nova.api.ec2 import cloud
@@ -54,7 +52,8 @@ os.makedirs(IMAGES_PATH)
class CloudTestCase(test.TestCase):
def setUp(self):
super(CloudTestCase, self).setUp()
self.flags(connection_type='fake', images_path=IMAGES_PATH)
self.flags(connection_type='fake',
images_path=IMAGES_PATH)
self.conn = rpc.Connection.instance()
logging.getLogger().setLevel(logging.DEBUG)
@@ -62,27 +61,23 @@ class CloudTestCase(test.TestCase):
# set up our cloud
self.cloud = cloud.CloudController()
# set up a service
self.compute = utils.import_object(FLAGS.compute_manager)
self.compute_consumer = rpc.AdapterConsumer(connection=self.conn,
topic=FLAGS.compute_topic,
proxy=self.compute)
self.compute_consumer.attach_to_eventlet()
self.network = utils.import_object(FLAGS.network_manager)
self.network_consumer = rpc.AdapterConsumer(connection=self.conn,
topic=FLAGS.network_topic,
proxy=self.network)
self.network_consumer.attach_to_eventlet()
# set up services
self.compute = service.Service.create(binary='nova-compute')
self.compute.start()
self.network = service.Service.create(binary='nova-network')
self.network.start()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('admin', 'admin', 'admin', True)
self.project = self.manager.create_project('proj', 'admin', 'proj')
self.context = context.RequestContext(user=self.user,
project=self.project)
project=self.project)
def tearDown(self):
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
self.compute.kill()
self.network.kill()
super(CloudTestCase, self).tearDown()
def _create_key(self, name):
@@ -109,12 +104,13 @@ class CloudTestCase(test.TestCase):
{'address': address,
'host': FLAGS.host})
self.cloud.allocate_address(self.context)
inst = db.instance_create(self.context, {})
inst = db.instance_create(self.context, {'host': FLAGS.host})
fixed = self.network.allocate_fixed_ip(self.context, inst['id'])
ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id'])
self.cloud.associate_address(self.context,
instance_id=ec2_id,
public_ip=address)
greenthread.sleep(0.3)
self.cloud.disassociate_address(self.context,
public_ip=address)
self.cloud.release_address(self.context,

View File

@@ -41,6 +41,7 @@ class ComputeTestCase(test.TestCase):
logging.getLogger().setLevel(logging.DEBUG)
super(ComputeTestCase, self).setUp()
self.flags(connection_type='fake',
stub_network=True,
network_manager='nova.network.manager.FlatManager')
self.compute = utils.import_object(FLAGS.compute_manager)
self.compute_api = compute_api.ComputeAPI()

View File

@@ -26,6 +26,7 @@ from nova import context
from nova import db
from nova import exception
from nova import flags
from nova import service
from nova import test
from nova import utils
from nova.auth import manager
@@ -40,6 +41,7 @@ class NetworkTestCase(test.TestCase):
# NOTE(vish): if you change these flags, make sure to change the
# flags in the corresponding section in nova-dhcpbridge
self.flags(connection_type='fake',
fake_call=True,
fake_network=True,
network_size=16,
num_networks=5)
@@ -56,16 +58,13 @@ class NetworkTestCase(test.TestCase):
# create the necessary network data for the project
user_context = context.RequestContext(project=self.projects[i],
user=self.user)
network_ref = self.network.get_network(user_context)
self.network.set_network_host(context.get_admin_context(),
network_ref['id'])
host = self.network.get_network_host(user_context.elevated())
instance_ref = self._create_instance(0)
self.instance_id = instance_ref['id']
instance_ref = self._create_instance(1)
self.instance2_id = instance_ref['id']
def tearDown(self):
super(NetworkTestCase, self).tearDown()
# TODO(termie): this should really be instantiating clean datastores
# in between runs, one failure kills all the tests
db.instance_destroy(context.get_admin_context(), self.instance_id)
@@ -73,6 +72,7 @@ class NetworkTestCase(test.TestCase):
for project in self.projects:
self.manager.delete_project(project)
self.manager.delete_user(self.user)
super(NetworkTestCase, self).tearDown()
def _create_instance(self, project_num, mac=None):
if not mac:

View File

@@ -33,7 +33,7 @@ class RpcTestCase(test.TestCase):
"""Test cases for rpc"""
def setUp(self):
super(RpcTestCase, self).setUp()
self.conn = rpc.Connection.instance()
self.conn = rpc.Connection.instance(True)
self.receiver = TestReceiver()
self.consumer = rpc.AdapterConsumer(connection=self.conn,
topic='test',
@@ -79,6 +79,33 @@ class RpcTestCase(test.TestCase):
except rpc.RemoteError as exc:
self.assertEqual(int(exc.value), value)
def test_nested_calls(self):
"""Test that we can do an rpc.call inside another call"""
class Nested(object):
@staticmethod
def echo(context, queue, value):
"""Calls echo in the passed queue"""
logging.debug("Nested received %s, %s", queue, value)
ret = rpc.call(context,
queue,
{"method": "echo",
"args": {"value": value}})
logging.debug("Nested return %s", ret)
return value
nested = Nested()
conn = rpc.Connection.instance(True)
consumer = rpc.AdapterConsumer(connection=conn,
topic='nested',
proxy=nested)
consumer.attach_to_eventlet()
value = 42
result = rpc.call(self.context,
'nested', {"method": "echo",
"args": {"queue": "test",
"value": value}})
self.assertEqual(value, result)
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call

View File

@@ -78,6 +78,7 @@ class SimpleDriverTestCase(test.TestCase):
def setUp(self):
super(SimpleDriverTestCase, self).setUp()
self.flags(connection_type='fake',
stub_network=True,
max_cores=4,
max_gigabytes=4,
network_manager='nova.network.manager.FlatManager',

View File

@@ -33,6 +33,7 @@ flags.DECLARE('instances_path', 'nova.compute.manager')
class LibvirtConnTestCase(test.TestCase):
def setUp(self):
super(LibvirtConnTestCase, self).setUp()
self.flags(fake_call=True)
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
@@ -88,9 +89,9 @@ class LibvirtConnTestCase(test.TestCase):
user_context = context.RequestContext(project=self.project,
user=self.user)
instance_ref = db.instance_create(user_context, instance)
network_ref = self.network.get_network(user_context)
self.network.set_network_host(context.get_admin_context(),
network_ref['id'])
host = self.network.get_network_host(user_context.elevated())
network_ref = db.project_get_network(context.get_admin_context(),
self.project.id)
fixed_ip = {'address': self.test_ip,
'network_id': network_ref['id']}