Move queue_get_for() from db to rpc.

Part of blueprint common-rpc.

The function queue_get_for() is a utility function used by various
consumers of the rpc API.  This function lived in the db API, but never
ended up using anything from the database.  This patch moves it into the
rpc API so that it can be used by other users of rpc once it moves into
openstack-common.

Change-Id: If92675beecff5471b416a929c161b810e3c71939
This commit is contained in:
Russell Bryant
2012-05-29 16:35:35 -04:00
parent 56295a8d67
commit 592d693346
4 changed files with 58 additions and 53 deletions

View File

@@ -1152,7 +1152,7 @@ class VolumeCommands(object):
return
rpc.cast(ctxt,
db.queue_get_for(ctxt, FLAGS.volume_topic, host),
rpc.queue_get_for(ctxt, FLAGS.volume_topic, host),
{"method": "delete_volume",
"args": {"volume_id": volume['id']}})
@@ -1170,7 +1170,7 @@ class VolumeCommands(object):
instance = db.instance_get(ctxt, volume['instance_id'])
host = instance['host']
rpc.cast(ctxt,
db.queue_get_for(ctxt, FLAGS.compute_topic, host),
rpc.queue_get_for(ctxt, FLAGS.compute_topic, host),
{"method": "attach_volume",
"args": {"instance_id": instance['id'],
"volume_id": volume['id'],

View File

@@ -226,6 +226,11 @@ def fanout_cast_to_server(context, server_params, topic, msg):
topic, msg)
def queue_get_for(context, topic, host):
"""Get a queue name for a given topic + host."""
return '%s.%s' % (topic, host)
_RPCIMPL = None

View File

@@ -62,8 +62,8 @@ def cast_to_volume_host(context, host, method, update_db=True, **kwargs):
db.volume_update(context, volume_id,
{'host': host, 'scheduled_at': now})
rpc.cast(context,
db.queue_get_for(context, 'volume', host),
{"method": method, "args": kwargs})
rpc.queue_get_for(context, 'volume', host),
{"method": method, "args": kwargs})
LOG.debug(_("Casted '%(method)s' to volume '%(host)s'") % locals())
@@ -79,8 +79,8 @@ def cast_to_compute_host(context, host, method, update_db=True, **kwargs):
db.instance_update(context, instance_uuid,
{'host': host, 'scheduled_at': now})
rpc.cast(context,
db.queue_get_for(context, 'compute', host),
{"method": method, "args": kwargs})
rpc.queue_get_for(context, 'compute', host),
{"method": method, "args": kwargs})
LOG.debug(_("Casted '%(method)s' to compute '%(host)s'") % locals())
@@ -88,8 +88,8 @@ def cast_to_network_host(context, host, method, update_db=False, **kwargs):
"""Cast request to a network host queue"""
rpc.cast(context,
db.queue_get_for(context, 'network', host),
{"method": method, "args": kwargs})
rpc.queue_get_for(context, 'network', host),
{"method": method, "args": kwargs})
LOG.debug(_("Casted '%(method)s' to network '%(host)s'") % locals())
@@ -106,8 +106,8 @@ def cast_to_host(context, topic, host, method, update_db=True, **kwargs):
func(context, host, method, update_db=update_db, **kwargs)
else:
rpc.cast(context,
db.queue_get_for(context, topic, host),
{"method": method, "args": kwargs})
rpc.queue_get_for(context, topic, host),
{"method": method, "args": kwargs})
LOG.debug(_("Casted '%(method)s' to %(topic)s '%(host)s'")
% locals())
@@ -355,7 +355,7 @@ class Scheduler(object):
# Checking cpuinfo.
try:
rpc.call(context,
db.queue_get_for(context, FLAGS.compute_topic, dest),
rpc.queue_get_for(context, FLAGS.compute_topic, dest),
{"method": 'compare_cpu',
"args": {'cpu_info': oservice_ref['cpu_info']}})
@@ -443,7 +443,7 @@ class Scheduler(object):
available = available_gb * (1024 ** 3)
# Getting necessary disk size
topic = db.queue_get_for(context, FLAGS.compute_topic,
topic = rpc.queue_get_for(context, FLAGS.compute_topic,
instance_ref['host'])
ret = rpc.call(context, topic,
{"method": 'get_instance_disk_info',
@@ -492,8 +492,8 @@ class Scheduler(object):
"""
src = instance_ref['host']
dst_t = db.queue_get_for(context, FLAGS.compute_topic, dest)
src_t = db.queue_get_for(context, FLAGS.compute_topic, src)
dst_t = rpc.queue_get_for(context, FLAGS.compute_topic, dest)
src_t = rpc.queue_get_for(context, FLAGS.compute_topic, src)
filename = rpc.call(context, dst_t,
{"method": 'create_shared_storage_test_file'})

View File

@@ -476,7 +476,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
self.mox.StubOutWithMock(self.driver, '_get_compute_info')
self.mox.StubOutWithMock(db, 'instance_get_all_by_host')
self.mox.StubOutWithMock(db, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
self.mox.StubOutWithMock(db, 'instance_update_and_get_original')
@@ -504,7 +504,7 @@ class SchedulerTestCase(test.TestCase):
# assert_compute_node_has_enough_disk()
self.driver._get_compute_info(self.context, dest,
'disk_available_least').AndReturn(1025)
db.queue_get_for(self.context, FLAGS.compute_topic,
rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue1')
rpc.call(self.context, 'src_queue1',
{'method': 'get_instance_disk_info',
@@ -512,9 +512,9 @@ class SchedulerTestCase(test.TestCase):
json.dumps([{'disk_size': 1024 * (1024 ** 3)}]))
# Common checks (shared storage ok, same hypervisor,e tc)
db.queue_get_for(self.context, FLAGS.compute_topic,
rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
db.queue_get_for(self.context, FLAGS.compute_topic,
rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -535,7 +535,7 @@ class SchedulerTestCase(test.TestCase):
[{'compute_node': [{'hypervisor_type': 'xen',
'hypervisor_version': 1,
'cpu_info': 'fake_cpu_info'}]}])
db.queue_get_for(self.context, FLAGS.compute_topic,
rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
rpc.call(self.context, 'dest_queue',
{'method': 'compare_cpu',
@@ -696,7 +696,7 @@ class SchedulerTestCase(test.TestCase):
'assert_compute_node_has_enough_memory')
self.mox.StubOutWithMock(self.driver, '_get_compute_info')
self.mox.StubOutWithMock(db, 'instance_get_all_by_host')
self.mox.StubOutWithMock(db, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
dest = 'fake_host2'
@@ -717,7 +717,7 @@ class SchedulerTestCase(test.TestCase):
# Not enough disk
self.driver._get_compute_info(self.context, dest,
'disk_available_least').AndReturn(1023)
db.queue_get_for(self.context, FLAGS.compute_topic,
rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
rpc.call(self.context, 'src_queue',
{'method': 'get_instance_disk_info',
@@ -737,7 +737,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
self.mox.StubOutWithMock(db, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
@@ -751,9 +751,9 @@ class SchedulerTestCase(test.TestCase):
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
db.queue_get_for(self.context, FLAGS.compute_topic,
rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
db.queue_get_for(self.context, FLAGS.compute_topic,
rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -779,7 +779,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
self.mox.StubOutWithMock(db, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
@@ -793,9 +793,9 @@ class SchedulerTestCase(test.TestCase):
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
db.queue_get_for(self.context, FLAGS.compute_topic,
rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
db.queue_get_for(self.context, FLAGS.compute_topic,
rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -819,7 +819,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
self.mox.StubOutWithMock(db, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
@@ -834,9 +834,9 @@ class SchedulerTestCase(test.TestCase):
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
db.queue_get_for(self.context, FLAGS.compute_topic,
rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
db.queue_get_for(self.context, FLAGS.compute_topic,
rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -868,7 +868,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
self.mox.StubOutWithMock(db, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
@@ -883,9 +883,9 @@ class SchedulerTestCase(test.TestCase):
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
db.queue_get_for(self.context, FLAGS.compute_topic,
rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
db.queue_get_for(self.context, FLAGS.compute_topic,
rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -916,7 +916,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
self.mox.StubOutWithMock(db, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
@@ -931,9 +931,9 @@ class SchedulerTestCase(test.TestCase):
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
db.queue_get_for(self.context, FLAGS.compute_topic,
rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
db.queue_get_for(self.context, FLAGS.compute_topic,
rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
tmp_filename = 'test-filename'
rpc.call(self.context, 'dest_queue',
@@ -953,7 +953,7 @@ class SchedulerTestCase(test.TestCase):
[{'compute_node': [{'hypervisor_type': 'xen',
'hypervisor_version': 1,
'cpu_info': 'fake_cpu_info'}]}])
db.queue_get_for(self.context, FLAGS.compute_topic,
rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
rpc.call(self.context, 'dest_queue',
{'method': 'compare_cpu',
@@ -1018,13 +1018,13 @@ class SchedulerDriverModuleTestCase(test.TestCase):
self.mox.StubOutWithMock(utils, 'utcnow')
self.mox.StubOutWithMock(db, 'volume_update')
self.mox.StubOutWithMock(db, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
utils.utcnow().AndReturn('fake-now')
db.volume_update(self.context, 31337,
{'host': host, 'scheduled_at': 'fake-now'})
db.queue_get_for(self.context, 'volume', host).AndReturn(queue)
rpc.queue_get_for(self.context, 'volume', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1039,10 +1039,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
self.mox.StubOutWithMock(db, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
db.queue_get_for(self.context, 'volume', host).AndReturn(queue)
rpc.queue_get_for(self.context, 'volume', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1057,10 +1057,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
self.mox.StubOutWithMock(db, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
db.queue_get_for(self.context, 'volume', host).AndReturn(queue)
rpc.queue_get_for(self.context, 'volume', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1078,13 +1078,13 @@ class SchedulerDriverModuleTestCase(test.TestCase):
self.mox.StubOutWithMock(utils, 'utcnow')
self.mox.StubOutWithMock(db, 'instance_update')
self.mox.StubOutWithMock(db, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
utils.utcnow().AndReturn('fake-now')
db.instance_update(self.context, 31337,
{'host': host, 'scheduled_at': 'fake-now'})
db.queue_get_for(self.context, 'compute', host).AndReturn(queue)
rpc.queue_get_for(self.context, 'compute', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1099,10 +1099,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
self.mox.StubOutWithMock(db, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
db.queue_get_for(self.context, 'compute', host).AndReturn(queue)
rpc.queue_get_for(self.context, 'compute', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1117,10 +1117,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
self.mox.StubOutWithMock(db, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
db.queue_get_for(self.context, 'compute', host).AndReturn(queue)
rpc.queue_get_for(self.context, 'compute', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1135,10 +1135,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
fake_kwargs = {'extra_arg': 'meow'}
queue = 'fake_queue'
self.mox.StubOutWithMock(db, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
db.queue_get_for(self.context, 'network', host).AndReturn(queue)
rpc.queue_get_for(self.context, 'network', host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})
@@ -1193,10 +1193,10 @@ class SchedulerDriverModuleTestCase(test.TestCase):
topic = 'unknown'
queue = 'fake_queue'
self.mox.StubOutWithMock(db, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'cast')
db.queue_get_for(self.context, topic, host).AndReturn(queue)
rpc.queue_get_for(self.context, topic, host).AndReturn(queue)
rpc.cast(self.context, queue,
{'method': method,
'args': fake_kwargs})