Merge "Move compute node operations to conductor"
This commit is contained in:
commit
52fe25a528
@ -25,7 +25,6 @@ from nova.compute import task_states
|
||||
from nova.compute import vm_states
|
||||
from nova import conductor
|
||||
from nova import context
|
||||
from nova import db
|
||||
from nova import exception
|
||||
from nova.openstack.common import cfg
|
||||
from nova.openstack.common import importutils
|
||||
@ -304,8 +303,8 @@ class ResourceTracker(object):
|
||||
def _create(self, context, values):
|
||||
"""Create the compute node in the DB."""
|
||||
# initialize load stats from existing instances:
|
||||
compute_node = db.compute_node_create(context, values)
|
||||
self.compute_node = dict(compute_node)
|
||||
self.compute_node = self.conductor_api.compute_node_create(context,
|
||||
values)
|
||||
|
||||
def _get_service(self, context):
|
||||
try:
|
||||
@ -349,9 +348,10 @@ class ResourceTracker(object):
|
||||
|
||||
def _update(self, context, values, prune_stats=False):
|
||||
"""Persist the compute node updates to the DB."""
|
||||
compute_node = db.compute_node_update(context,
|
||||
self.compute_node['id'], values, prune_stats)
|
||||
self.compute_node = dict(compute_node)
|
||||
if "service" in self.compute_node:
|
||||
del self.compute_node['service']
|
||||
self.compute_node = self.conductor_api.compute_node_update(
|
||||
context, self.compute_node, values, prune_stats)
|
||||
|
||||
def confirm_resize(self, context, migration, status='confirmed'):
|
||||
"""Cleanup usage for a confirmed resize."""
|
||||
|
@ -278,6 +278,13 @@ class LocalAPI(object):
|
||||
def service_destroy(self, context, service_id):
|
||||
return self._manager.service_destroy(context, service_id)
|
||||
|
||||
def compute_node_create(self, context, values):
|
||||
return self._manager.compute_node_create(context, values)
|
||||
|
||||
def compute_node_update(self, context, node, values, prune_stats=False):
|
||||
return self._manager.compute_node_update(context, node, values,
|
||||
prune_stats)
|
||||
|
||||
|
||||
class API(object):
|
||||
"""Conductor API that does updates via RPC to the ConductorManager."""
|
||||
@ -534,3 +541,10 @@ class API(object):
|
||||
|
||||
def service_destroy(self, context, service_id):
|
||||
return self.conductor_rpcapi.service_destroy(context, service_id)
|
||||
|
||||
def compute_node_create(self, context, values):
|
||||
return self.conductor_rpcapi.compute_node_create(context, values)
|
||||
|
||||
def compute_node_update(self, context, node, values, prune_stats=False):
|
||||
return self.conductor_rpcapi.compute_node_update(context, node,
|
||||
values, prune_stats)
|
||||
|
@ -43,7 +43,7 @@ datetime_fields = ['launched_at', 'terminated_at']
|
||||
class ConductorManager(manager.SchedulerDependentManager):
|
||||
"""Mission: TBD."""
|
||||
|
||||
RPC_API_VERSION = '1.32'
|
||||
RPC_API_VERSION = '1.33'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(ConductorManager, self).__init__(service_name='conductor',
|
||||
@ -301,3 +301,12 @@ class ConductorManager(manager.SchedulerDependentManager):
|
||||
@rpc_common.client_exceptions(exception.ServiceNotFound)
|
||||
def service_destroy(self, context, service_id):
|
||||
self.db.service_destroy(context, service_id)
|
||||
|
||||
def compute_node_create(self, context, values):
|
||||
result = self.db.compute_node_create(context, values)
|
||||
return jsonutils.to_primitive(result)
|
||||
|
||||
def compute_node_update(self, context, node, values, prune_stats=False):
|
||||
result = self.db.compute_node_update(context, node['id'], values,
|
||||
prune_stats)
|
||||
return jsonutils.to_primitive(result)
|
||||
|
@ -65,6 +65,7 @@ class ConductorAPI(nova.openstack.common.rpc.proxy.RpcProxy):
|
||||
1.30 - Added migration_create
|
||||
1.31 - Added migration_get_in_progress_by_host_and_node
|
||||
1.32 - Added optional node to instance_get_all_by_host
|
||||
1.33 - Added compute_node_create and compute_node_update
|
||||
"""
|
||||
|
||||
BASE_RPC_API_VERSION = '1.0'
|
||||
@ -305,3 +306,13 @@ class ConductorAPI(nova.openstack.common.rpc.proxy.RpcProxy):
|
||||
def service_destroy(self, context, service_id):
|
||||
msg = self.make_msg('service_destroy', service_id=service_id)
|
||||
return self.call(context, msg, version='1.29')
|
||||
|
||||
def compute_node_create(self, context, values):
|
||||
msg = self.make_msg('compute_node_create', values=values)
|
||||
return self.call(context, msg, version='1.33')
|
||||
|
||||
def compute_node_update(self, context, node, values, prune_stats=False):
|
||||
node_p = jsonutils.to_primitive(node)
|
||||
msg = self.make_msg('compute_node_update', node=node_p, values=values,
|
||||
prune_stats=prune_stats)
|
||||
return self.call(context, msg, version='1.33')
|
||||
|
@ -253,6 +253,12 @@ def exact_filter(query, model, filters, legal_keys):
|
||||
return query
|
||||
|
||||
|
||||
def convert_datetimes(values, *datetime_keys):
|
||||
for key in values:
|
||||
if key in datetime_keys and isinstance(values[key], basestring):
|
||||
values[key] = timeutils.parse_strtime(values[key])
|
||||
return values
|
||||
|
||||
###################
|
||||
|
||||
|
||||
@ -497,6 +503,7 @@ def compute_node_create(context, values):
|
||||
"""Creates a new ComputeNode and populates the capacity fields
|
||||
with the most recent data."""
|
||||
_prep_stats_dict(values)
|
||||
convert_datetimes(values, 'created_at', 'deleted_at', 'updated_at')
|
||||
|
||||
compute_node_ref = models.ComputeNode()
|
||||
compute_node_ref.update(values)
|
||||
@ -545,9 +552,10 @@ def compute_node_update(context, compute_id, values, prune_stats=False):
|
||||
stats = values.pop('stats', {})
|
||||
|
||||
session = get_session()
|
||||
with session.begin(subtransactions=True):
|
||||
with session.begin():
|
||||
_update_stats(context, stats, compute_id, session, prune_stats)
|
||||
compute_ref = _compute_node_get(context, compute_id, session=session)
|
||||
convert_datetimes(values, 'created_at', 'deleted_at', 'updated_at')
|
||||
compute_ref.update(values)
|
||||
return compute_ref
|
||||
|
||||
|
@ -398,6 +398,25 @@ class _BaseTestCase(object):
|
||||
result = self.conductor.ping(self.context, 'foo')
|
||||
self.assertEqual(result, {'service': 'conductor', 'arg': 'foo'})
|
||||
|
||||
def test_compute_node_create(self):
|
||||
self.mox.StubOutWithMock(db, 'compute_node_create')
|
||||
db.compute_node_create(self.context, 'fake-values').AndReturn(
|
||||
'fake-result')
|
||||
self.mox.ReplayAll()
|
||||
result = self.conductor.compute_node_create(self.context,
|
||||
'fake-values')
|
||||
self.assertEqual(result, 'fake-result')
|
||||
|
||||
def test_compute_node_update(self):
|
||||
node = {'id': 'fake-id'}
|
||||
self.mox.StubOutWithMock(db, 'compute_node_update')
|
||||
db.compute_node_update(self.context, node['id'], 'fake-values',
|
||||
False).AndReturn('fake-result')
|
||||
self.mox.ReplayAll()
|
||||
result = self.conductor.compute_node_update(self.context, node,
|
||||
'fake-values', False)
|
||||
self.assertEqual(result, 'fake-result')
|
||||
|
||||
|
||||
class ConductorTestCase(_BaseTestCase, test.TestCase):
|
||||
"""Conductor Manager Tests."""
|
||||
|
Loading…
Reference in New Issue
Block a user