Move compute node operations to conductor
This adds the following operations to conductor:

  compute_node_create()
  compute_node_update()

It also makes resource_tracker use them instead of making direct calls
to the database. Further, it introduces a convert_datetimes() helper to
the db api, which can be used internally to convert ISO time strings
back into datetimes, since that is the form in which they arrive over
the RPC wire. This should be used in other places in the future to fix
similar situations and avoid one-off conversions in random places.

This removes the last direct database call from resource_tracker, and
thus the db import.

Related to blueprint no-db-compute

Change-Id: I49c0bb30e7dd46d6f68550ebb644381961a0867f
commit 99cb17da81
parent 01f204efdf
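For orientation before the diff, here is a minimal, nova-free sketch of the call path this change establishes: the resource tracker no longer holds a db handle and instead hands compute-node writes to a conductor API object, which is the only layer that touches the database. All names below are illustrative stand-ins, not nova's real classes.

```python
# Illustrative sketch only: stand-in names, not nova's actual code.

class FakeDatabase(object):
    """Stands in for the db layer that only the conductor may touch now."""
    def compute_node_update(self, node_id, values):
        return dict(values, id=node_id)


class FakeConductorAPI(object):
    """Stands in for the conductor API: the tracker's single entry point."""
    def __init__(self):
        self._db = FakeDatabase()

    def compute_node_update(self, context, node, values):
        return self._db.compute_node_update(node['id'], values)


class FakeResourceTracker(object):
    """After this change the tracker holds no db handle at all."""
    def __init__(self):
        self.conductor_api = FakeConductorAPI()
        self.compute_node = {'id': 1}

    def _update(self, context, values):
        self.compute_node = self.conductor_api.compute_node_update(
            context, self.compute_node, values)


tracker = FakeResourceTracker()
tracker._update(None, {'vcpus_used': 2})
print(tracker.compute_node)   # {'vcpus_used': 2, 'id': 1}
```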
@@ -25,7 +25,6 @@ from nova.compute import task_states
 from nova.compute import vm_states
 from nova import conductor
 from nova import context
-from nova import db
 from nova import exception
 from nova.openstack.common import cfg
 from nova.openstack.common import importutils
@@ -304,8 +303,8 @@ class ResourceTracker(object):
     def _create(self, context, values):
         """Create the compute node in the DB."""
         # initialize load stats from existing instances:
-        compute_node = db.compute_node_create(context, values)
-        self.compute_node = dict(compute_node)
+        self.compute_node = self.conductor_api.compute_node_create(context,
+                                                                    values)
 
     def _get_service(self, context):
         try:
@@ -349,9 +348,10 @@ class ResourceTracker(object):
 
     def _update(self, context, values, prune_stats=False):
         """Persist the compute node updates to the DB."""
-        compute_node = db.compute_node_update(context,
-                self.compute_node['id'], values, prune_stats)
-        self.compute_node = dict(compute_node)
+        if "service" in self.compute_node:
+            del self.compute_node['service']
+        self.compute_node = self.conductor_api.compute_node_update(
+            context, self.compute_node, values, prune_stats)
 
     def confirm_resize(self, context, migration, status='confirmed'):
         """Cleanup usage for a confirmed resize."""
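One detail in the _update() hunk above: the tracker now drops the 'service' key before handing its node dict to the conductor. The diff does not say why, but a plausible reading is that the dict must now be reducible to JSON-friendly primitives for the RPC path, and a nested service record is not. A nova-free illustration of that constraint, with plain json standing in for the real serializer:

```python
import json
from datetime import datetime


class FakeServiceRecord(object):
    """Stand-in for a nested ORM row that is not JSON-serializable."""
    def __init__(self):
        self.created_at = datetime(2013, 1, 15)


node = {'id': 1, 'vcpus': 8, 'service': FakeServiceRecord()}

try:
    json.dumps(node)
except TypeError:
    # Drop the non-primitive key, much as the tracker now does; the
    # remaining dict can then cross the RPC boundary.
    node.pop('service')
    print(json.dumps(node))
```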
@@ -278,6 +278,13 @@ class LocalAPI(object):
     def service_destroy(self, context, service_id):
         return self._manager.service_destroy(context, service_id)
 
+    def compute_node_create(self, context, values):
+        return self._manager.compute_node_create(context, values)
+
+    def compute_node_update(self, context, node, values, prune_stats=False):
+        return self._manager.compute_node_update(context, node, values,
+                                                  prune_stats)
+
 
 class API(object):
     """Conductor API that does updates via RPC to the ConductorManager."""
@@ -534,3 +541,10 @@ class API(object):
 
     def service_destroy(self, context, service_id):
         return self.conductor_rpcapi.service_destroy(context, service_id)
+
+    def compute_node_create(self, context, values):
+        return self.conductor_rpcapi.compute_node_create(context, values)
+
+    def compute_node_update(self, context, node, values, prune_stats=False):
+        return self.conductor_rpcapi.compute_node_update(context, node,
+                                                          values, prune_stats)
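The two hunks above add the same pair of methods to both conductor front ends: LocalAPI, which calls the conductor manager in-process, and API, which forwards the call over RPC via conductor_rpcapi. A rough, nova-free sketch of that split, with made-up names:

```python
# Illustrative stand-ins only; the real classes live in nova's conductor.

class FakeManager(object):
    """Plays the role of ConductorManager: the layer that hits the db."""
    def compute_node_create(self, context, values):
        return dict(values, id=1)


class LocalAPISketch(object):
    """In-process flavor: delegate straight to the manager object."""
    def __init__(self):
        self._manager = FakeManager()

    def compute_node_create(self, context, values):
        return self._manager.compute_node_create(context, values)


class RpcAPISketch(object):
    """RPC flavor: in real nova this serializes the call and sends it to a
    remote conductor service; here the 'remote' manager is just local."""
    def __init__(self):
        self._remote_manager = FakeManager()

    def compute_node_create(self, context, values):
        # real code builds a message and calls it with version='1.33'
        return self._remote_manager.compute_node_create(context, values)


print(LocalAPISketch().compute_node_create(None, {'vcpus': 4}))
print(RpcAPISketch().compute_node_create(None, {'vcpus': 4}))
```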
@@ -43,7 +43,7 @@ datetime_fields = ['launched_at', 'terminated_at']
 class ConductorManager(manager.SchedulerDependentManager):
     """Mission: TBD."""
 
-    RPC_API_VERSION = '1.32'
+    RPC_API_VERSION = '1.33'
 
     def __init__(self, *args, **kwargs):
         super(ConductorManager, self).__init__(service_name='conductor',
@@ -301,3 +301,12 @@ class ConductorManager(manager.SchedulerDependentManager):
     @rpc_common.client_exceptions(exception.ServiceNotFound)
     def service_destroy(self, context, service_id):
         self.db.service_destroy(context, service_id)
+
+    def compute_node_create(self, context, values):
+        result = self.db.compute_node_create(context, values)
+        return jsonutils.to_primitive(result)
+
+    def compute_node_update(self, context, node, values, prune_stats=False):
+        result = self.db.compute_node_update(context, node['id'], values,
+                                             prune_stats)
+        return jsonutils.to_primitive(result)
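In the manager, both new methods run the database result through jsonutils.to_primitive() before returning it, since RPC replies must consist of plain JSON types; among other things that turns datetimes into ISO strings, which is exactly what the convert_datetimes() helper later reverses on the way back into the db layer. A rough approximation of just the datetime part (the real helper covers far more cases):

```python
from datetime import datetime


def to_primitive_sketch(value):
    """Tiny stand-in for jsonutils.to_primitive; illustrative only."""
    if isinstance(value, dict):
        return dict((k, to_primitive_sketch(v)) for k, v in value.items())
    if isinstance(value, datetime):
        return value.isoformat()      # datetimes travel as ISO 8601 strings
    return value


row = {'id': 7, 'updated_at': datetime(2013, 1, 15, 12, 0, 0)}
print(to_primitive_sketch(row))
# {'id': 7, 'updated_at': '2013-01-15T12:00:00'}
```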
@@ -65,6 +65,7 @@ class ConductorAPI(nova.openstack.common.rpc.proxy.RpcProxy):
     1.30 - Added migration_create
     1.31 - Added migration_get_in_progress_by_host_and_node
     1.32 - Added optional node to instance_get_all_by_host
+    1.33 - Added compute_node_create and compute_node_update
     """
 
     BASE_RPC_API_VERSION = '1.0'
@@ -305,3 +306,13 @@ class ConductorAPI(nova.openstack.common.rpc.proxy.RpcProxy):
     def service_destroy(self, context, service_id):
         msg = self.make_msg('service_destroy', service_id=service_id)
         return self.call(context, msg, version='1.29')
+
+    def compute_node_create(self, context, values):
+        msg = self.make_msg('compute_node_create', values=values)
+        return self.call(context, msg, version='1.33')
+
+    def compute_node_update(self, context, node, values, prune_stats=False):
+        node_p = jsonutils.to_primitive(node)
+        msg = self.make_msg('compute_node_update', node=node_p, values=values,
+                            prune_stats=prune_stats)
+        return self.call(context, msg, version='1.33')
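On the RPC client side, each new method is pinned to the version that introduced it (version='1.33', matching the RPC_API_VERSION bump and the 1.33 history entry above), and compute_node_update() converts the node to primitives before putting it on the wire. A minimal sketch of the version-pinning idea, using made-up names rather than nova's real rpc proxy:

```python
# Made-up proxy; sketches "pin each call to the version that added it".

class VersionedProxySketch(object):
    def __init__(self, server_version):
        self.server_version = server_version   # what the conductor speaks

    def call(self, method, version, **kwargs):
        need = tuple(int(p) for p in version.split('.'))
        have = tuple(int(p) for p in self.server_version.split('.'))
        if need > have:
            raise RuntimeError('%s needs RPC %s, server only offers %s'
                               % (method, version, self.server_version))
        return {'method': method, 'args': kwargs}


proxy = VersionedProxySketch(server_version='1.33')
print(proxy.call('compute_node_create', version='1.33', values={'vcpus': 4}))

try:
    proxy.call('hypothetical_new_call', version='1.34')
except RuntimeError as e:
    print(e)   # an older conductor would reject a too-new call
```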
@@ -253,6 +253,12 @@ def exact_filter(query, model, filters, legal_keys):
     return query
 
 
+def convert_datetimes(values, *datetime_keys):
+    for key in values:
+        if key in datetime_keys and isinstance(values[key], basestring):
+            values[key] = timeutils.parse_strtime(values[key])
+    return values
+
 ###################
 
 
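convert_datetimes() is the receiving half of the round trip: values that crossed RPC carry ISO time strings where the database layer expects real datetime objects. The real helper calls timeutils.parse_strtime(); the stdlib rework below assumes a '%Y-%m-%dT%H:%M:%S.%f' wire format, so treat the exact format as illustrative:

```python
from datetime import datetime


def convert_datetimes_sketch(values, *datetime_keys):
    """Nova-free rework of the helper above; the wire format is assumed."""
    for key in values:
        if key in datetime_keys and isinstance(values[key], str):
            values[key] = datetime.strptime(values[key],
                                            '%Y-%m-%dT%H:%M:%S.%f')
    return values


wire_values = {'vcpus': 8, 'updated_at': '2013-01-15T12:00:00.000000'}
print(convert_datetimes_sketch(wire_values, 'created_at', 'updated_at'))
# {'vcpus': 8, 'updated_at': datetime.datetime(2013, 1, 15, 12, 0)}
```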
@@ -497,6 +503,7 @@ def compute_node_create(context, values):
     """Creates a new ComputeNode and populates the capacity fields
     with the most recent data."""
     _prep_stats_dict(values)
+    convert_datetimes(values, 'created_at', 'deleted_at', 'updated_at')
 
     compute_node_ref = models.ComputeNode()
     compute_node_ref.update(values)
@@ -545,9 +552,10 @@ def compute_node_update(context, compute_id, values, prune_stats=False):
     stats = values.pop('stats', {})
 
     session = get_session()
-    with session.begin(subtransactions=True):
+    with session.begin():
         _update_stats(context, stats, compute_id, session, prune_stats)
         compute_ref = _compute_node_get(context, compute_id, session=session)
+        convert_datetimes(values, 'created_at', 'deleted_at', 'updated_at')
         compute_ref.update(values)
     return compute_ref
 
@@ -398,6 +398,25 @@ class _BaseTestCase(object):
         result = self.conductor.ping(self.context, 'foo')
         self.assertEqual(result, {'service': 'conductor', 'arg': 'foo'})
 
+    def test_compute_node_create(self):
+        self.mox.StubOutWithMock(db, 'compute_node_create')
+        db.compute_node_create(self.context, 'fake-values').AndReturn(
+            'fake-result')
+        self.mox.ReplayAll()
+        result = self.conductor.compute_node_create(self.context,
+                                                    'fake-values')
+        self.assertEqual(result, 'fake-result')
+
+    def test_compute_node_update(self):
+        node = {'id': 'fake-id'}
+        self.mox.StubOutWithMock(db, 'compute_node_update')
+        db.compute_node_update(self.context, node['id'], 'fake-values',
+                               False).AndReturn('fake-result')
+        self.mox.ReplayAll()
+        result = self.conductor.compute_node_update(self.context, node,
+                                                    'fake-values', False)
+        self.assertEqual(result, 'fake-result')
+
 
 class ConductorTestCase(_BaseTestCase, test.TestCase):
     """Conductor Manager Tests."""
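The new tests use mox (StubOutWithMock / ReplayAll), which was nova's mocking library at the time. For readers more familiar with the stdlib, roughly the same expectation for the create case could be expressed with unittest.mock as below; this is an illustrative translation, not part of the change, and the conductor class here is a stand-in so the example runs without nova:

```python
import unittest
from unittest import mock


class FakeConductorManager(object):
    """Minimal stand-in so the example is self-contained."""
    def __init__(self, db):
        self.db = db

    def compute_node_create(self, context, values):
        return self.db.compute_node_create(context, values)


class ComputeNodeCreateTest(unittest.TestCase):
    def test_compute_node_create(self):
        db = mock.Mock()
        db.compute_node_create.return_value = 'fake-result'
        conductor = FakeConductorManager(db)

        result = conductor.compute_node_create('ctxt', 'fake-values')

        db.compute_node_create.assert_called_once_with('ctxt', 'fake-values')
        self.assertEqual(result, 'fake-result')


if __name__ == '__main__':
    unittest.main()
```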