Merge "Create a Scheduler client library"

This commit is contained in:
Jenkins 2014-08-31 04:31:01 +00:00 committed by Gerrit Code Review
commit 999a192e90
5 changed files with 162 additions and 4 deletions

View File

@ -18,6 +18,7 @@ Track resources like memory and disk for a compute host. Provides the
scheduler with useful information about availability through the ComputeNode
model.
"""
import copy
from oslo.config import cfg
@ -38,6 +39,7 @@ from nova.openstack.common import jsonutils
from nova.openstack.common import log as logging
from nova.pci import pci_manager
from nova import rpc
from nova.scheduler import client as scheduler_client
from nova import utils
resource_tracker_opts = [
@ -83,6 +85,7 @@ class ResourceTracker(object):
ext_resources.ResourceHandler(CONF.compute_resources)
self.notifier = rpc.get_notifier()
self.old_resources = {}
self.scheduler_client = scheduler_client.SchedulerClient()
@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
def instance_claim(self, context, instance_ref, limits=None):
@ -397,6 +400,8 @@ class ResourceTracker(object):
self.compute_node = self.conductor_api.compute_node_create(context,
values)
# NOTE(sbauza): We don't want to miss the first creation event
self._update_resource_stats(context, values)
def _get_service(self, context):
try:
@ -463,12 +468,12 @@ class ResourceTracker(object):
def _resource_change(self, resources):
"""Check to see if any resouces have changed."""
if cmp(resources, self.old_resources) != 0:
self.old_resources = resources
self.old_resources = copy.deepcopy(resources)
return True
return False
def _update(self, context, values):
"""Persist the compute node updates to the DB."""
"""Update partial stats locally and populate them to Scheduler."""
self._write_ext_resources(values)
# NOTE(pmurray): the stats field is stored as a json string. The
# json conversion will be done automatically by the ComputeNode object
@ -479,11 +484,20 @@ class ResourceTracker(object):
return
if "service" in self.compute_node:
del self.compute_node['service']
self.compute_node = self.conductor_api.compute_node_update(
context, self.compute_node, values)
# NOTE(sbauza): Now the DB update is asynchronous, we need to locally
# update the values
self.compute_node.update(values)
# Persist the stats to the Scheduler
self._update_resource_stats(context, values)
if self.pci_tracker:
self.pci_tracker.save(context)
def _update_resource_stats(self, context, values):
stats = values.copy()
stats['id'] = self.compute_node['id']
self.scheduler_client.update_resource_stats(
context, (self.host, self.nodename), stats)
def _update_usage(self, resources, usage, sign=1):
mem_usage = usage['memory_mb']

View File

@ -842,6 +842,11 @@ class ComputeHostNotFound(HostNotFound):
msg_fmt = _("Compute host %(host)s could not be found.")
class ComputeHostNotCreated(HostNotFound):
msg_fmt = _("Compute host %(name)s needs to be created first"
" before updating.")
class HostBinaryNotFound(NotFound):
msg_fmt = _("Could not find binary %(binary)s on host %(host)s.")

52
nova/scheduler/client.py Normal file
View File

@ -0,0 +1,52 @@
# Copyright (c) 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import conductor
from nova import exception
from nova.i18n import _LI
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class SchedulerClient(object):
"""Client library for placing calls to the scheduler."""
def __init__(self):
self.conductor_api = conductor.API()
def update_resource_stats(self, context, name, stats):
"""Creates or updates stats for the desired service.
:param context: local context
:param name: name of resource to update
:type name: immutable (str or tuple)
:param stats: updated stats to send to scheduler
:type stats: dict
"""
if 'id' in stats:
compute_node_id = stats['id']
updates = stats.copy()
del updates['id']
else:
raise exception.ComputeHostNotCreated(name=str(name))
self.conductor_api.compute_node_update(context,
{'id': compute_node_id},
updates)
LOG.info(_LI('Compute_service record updated for '
'%s') % str(name))

View File

@ -408,6 +408,7 @@ class MissingComputeNodeTestCase(BaseTestCase):
self._fake_service_get_by_compute_host)
self.stubs.Set(db, 'compute_node_create',
self._fake_create_compute_node)
self.tracker.scheduler_client.update_resource_stats = mock.Mock()
def _fake_create_compute_node(self, context, values):
self.created = True
@ -554,6 +555,39 @@ class TrackerTestCase(BaseTrackerTestCase):
jsonutils.loads(self.tracker.compute_node['pci_stats']))
class SchedulerClientTrackerTestCase(BaseTrackerTestCase):
def setUp(self):
super(SchedulerClientTrackerTestCase, self).setUp()
self.tracker.scheduler_client.update_resource_stats = mock.Mock()
def test_create_resource(self):
self.tracker._write_ext_resources = mock.Mock()
self.tracker.conductor_api.compute_node_create = mock.Mock(
return_value=dict(id=1))
values = {'stats': {}, 'foo': 'bar', 'baz_count': 0}
self.tracker._create(self.context, values)
expected = {'stats': '{}', 'foo': 'bar', 'baz_count': 0,
'id': 1}
self.tracker.scheduler_client.update_resource_stats.\
assert_called_once_with(self.context,
("fakehost", "fakenode"),
expected)
def test_update_resource(self):
self.tracker._write_ext_resources = mock.Mock()
values = {'stats': {}, 'foo': 'bar', 'baz_count': 0}
self.tracker._update(self.context, values)
expected = {'stats': '{}', 'foo': 'bar', 'baz_count': 0,
'id': 1}
self.tracker.scheduler_client.update_resource_stats.\
assert_called_once_with(self.context,
("fakehost", "fakenode"),
expected)
class TrackerPciStatsTestCase(BaseTrackerTestCase):
def test_update_compute_node(self):

View File

@ -0,0 +1,53 @@
# Copyright (c) 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from nova.conductor import api as conductor_api
from nova import context
from nova import exception
from nova.scheduler import client as scheduler_client
from nova import test
"""Tests for Scheduler Client."""
class SchedulerClientTestCase(test.TestCase):
def setUp(self):
super(SchedulerClientTestCase, self).setUp()
self.context = context.get_admin_context()
self.flags(use_local=True, group='conductor')
self.client = scheduler_client.SchedulerClient()
def test_constructor(self):
self.assertIsNotNone(self.client.conductor_api)
@mock.patch.object(conductor_api.LocalAPI, 'compute_node_update')
def test_update_compute_node_works(self, mock_cn_update):
stats = {"id": 1, "foo": "bar"}
self.client.update_resource_stats(self.context,
('fakehost', 'fakenode'),
stats)
mock_cn_update.assert_called_once_with(self.context,
{"id": 1},
{"foo": "bar"})
def test_update_compute_node_raises(self):
stats = {"foo": "bar"}
self.assertRaises(exception.ComputeHostNotCreated,
self.client.update_resource_stats,
self.context, ('fakehost', 'fakenode'), stats)