Broke parts of compute manager out into compute.api to separate what gets run on the API side vs the worker side.

This commit is contained in:
Eric Day 2010-12-01 09:24:39 -08:00
parent 7d771bf9c5
commit 6956057ac4
7 changed files with 262 additions and 188 deletions

View File

@ -39,6 +39,7 @@ from nova import flags
from nova import quota
from nova import rpc
from nova import utils
from nova.compute import api as compute_api
from nova.compute import instance_types
from nova.api import cloud
from nova.image.s3 import S3ImageService
@ -94,7 +95,7 @@ class CloudController(object):
"""
def __init__(self):
self.network_manager = utils.import_object(FLAGS.network_manager)
self.compute_manager = utils.import_object(FLAGS.compute_manager)
self.compute_api = compute_api.ComputeAPI()
self.image_service = S3ImageService()
self.setup()
@ -255,7 +256,7 @@ class CloudController(object):
return True
def describe_security_groups(self, context, group_name=None, **kwargs):
self.compute_manager.ensure_default_security_group(context)
self.compute_api.ensure_default_security_group(context)
if context.user.is_admin():
groups = db.security_group_get_all(context)
else:
@ -353,7 +354,7 @@ class CloudController(object):
return False
def revoke_security_group_ingress(self, context, group_name, **kwargs):
self.compute_manager.ensure_default_security_group(context)
self.compute_api.ensure_default_security_group(context)
security_group = db.security_group_get_by_name(context,
context.project_id,
group_name)
@ -378,7 +379,7 @@ class CloudController(object):
# for these operations, so support for newer API versions
# is sketchy.
def authorize_security_group_ingress(self, context, group_name, **kwargs):
self.compute_manager.ensure_default_security_group(context)
self.compute_api.ensure_default_security_group(context)
security_group = db.security_group_get_by_name(context,
context.project_id,
group_name)
@ -414,7 +415,7 @@ class CloudController(object):
return source_project_id
def create_security_group(self, context, group_name, group_description):
self.compute_manager.ensure_default_security_group(context)
self.compute_api.ensure_default_security_group(context)
if db.security_group_exists(context, context.project_id, group_name):
raise exception.ApiError('group %s already exists' % group_name)
@ -748,7 +749,7 @@ class CloudController(object):
def run_instances(self, context, **kwargs):
max_count = int(kwargs.get('max_count', 1))
instances = self.compute_manager.create_instances(context,
instances = self.compute_api.create_instances(context,
instance_types.get_by_type(kwargs.get('instance_type', None)),
self.image_service,
kwargs['image_id'],
@ -789,7 +790,7 @@ class CloudController(object):
id_str)
continue
now = datetime.datetime.utcnow()
self.compute_manager.update_instance(context,
self.compute_api.update_instance(context,
instance_ref['id'],
state_description='terminating',
state=0,

View File

@ -27,6 +27,7 @@ from nova import wsgi
from nova import context
from nova.api import cloud
from nova.api.openstack import faults
from nova.compute import api as compute_api
from nova.compute import instance_types
from nova.compute import power_state
import nova.api.openstack
@ -95,7 +96,7 @@ class Controller(wsgi.Controller):
db_driver = FLAGS.db_driver
self.db_driver = utils.import_object(db_driver)
self.network_manager = utils.import_object(FLAGS.network_manager)
self.compute_manager = utils.import_object(FLAGS.compute_manager)
self.compute_api = compute_api.ComputeAPI()
super(Controller, self).__init__()
def index(self, req):
@ -147,7 +148,7 @@ class Controller(wsgi.Controller):
user_id = req.environ['nova.context']['user']['id']
ctxt = context.RequestContext(user_id, user_id)
key_pair = self.db_driver.key_pair_get_all_by_user(None, user_id)[0]
instances = self.compute_manager.create_instances(ctxt,
instances = self.compute_api.create_instances(ctxt,
instance_types.get_by_flavor_id(env['server']['flavorId']),
utils.import_object(FLAGS.image_service),
env['server']['imageId'],

207
nova/compute/api.py Normal file
View File

@ -0,0 +1,207 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Handles all API requests relating to instances (guest vms).
"""
import logging
import time
from nova import db
from nova import exception
from nova import flags
from nova import quota
from nova import rpc
from nova import utils
from nova.compute import instance_types
from nova.db import base
FLAGS = flags.FLAGS
def generate_default_hostname(internal_id):
"""Default function to generate a hostname given an instance reference."""
return str(internal_id)
class ComputeAPI(base.Base):
"""API for interacting with the compute manager."""
def __init__(self, **kwargs):
self.network_manager = utils.import_object(FLAGS.network_manager)
super(ComputeAPI, self).__init__(**kwargs)
# TODO(eday): network_topic arg should go away once we push network
# allocation into the scheduler or compute worker.
def create_instances(self, context, instance_type, image_service, image_id,
network_topic, min_count=1, max_count=1,
kernel_id=None, ramdisk_id=None, name='',
description='', user_data='', key_name=None,
key_data=None, security_group='default',
generate_hostname=generate_default_hostname):
"""Create the number of instances requested if quote and
other arguments check out ok."""
num_instances = quota.allowed_instances(context, max_count,
instance_type)
if num_instances < min_count:
logging.warn("Quota exceeeded for %s, tried to run %s instances",
context.project_id, min_count)
raise quota.QuotaError("Instance quota exceeded. You can only "
"run %s more instances of this type." %
num_instances, "InstanceLimitExceeded")
is_vpn = image_id == FLAGS.vpn_image_id
if not is_vpn:
image = image_service.show(context, image_id)
if kernel_id is None:
kernel_id = image.get('kernelId', FLAGS.default_kernel)
if ramdisk_id is None:
ramdisk_id = image.get('ramdiskId', FLAGS.default_ramdisk)
# Make sure we have access to kernel and ramdisk
image_service.show(context, kernel_id)
image_service.show(context, ramdisk_id)
if security_group is None:
security_group = ['default']
if not type(security_group) is list:
security_group = [security_group]
security_groups = []
self.ensure_default_security_group(context)
for security_group_name in security_group:
group = db.security_group_get_by_name(context,
context.project_id,
security_group_name)
security_groups.append(group['id'])
if key_data is None and key_name:
key_pair = db.key_pair_get(context, context.user_id, key_name)
key_data = key_pair['public_key']
type_data = instance_types.INSTANCE_TYPES[instance_type]
base_options = {
'reservation_id': utils.generate_uid('r'),
'server_name': name,
'image_id': image_id,
'kernel_id': kernel_id,
'ramdisk_id': ramdisk_id,
'state_description': 'scheduling',
'user_id': context.user_id,
'project_id': context.project_id,
'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
'instance_type': instance_type,
'memory_mb': type_data['memory_mb'],
'vcpus': type_data['vcpus'],
'local_gb': type_data['local_gb'],
'display_name': name,
'display_description': description,
'key_name': key_name,
'key_data': key_data}
elevated = context.elevated()
instances = []
logging.debug("Going to run %s instances...", num_instances)
for num in range(num_instances):
instance = dict(mac_address=utils.generate_mac(),
launch_index=num,
**base_options)
instance_ref = self.create_instance(context, security_groups,
**instance)
instance_id = instance_ref['id']
internal_id = instance_ref['internal_id']
hostname = generate_hostname(internal_id)
self.update_instance(context, instance_id, hostname=hostname)
instances.append(dict(id=instance_id, internal_id=internal_id,
hostname=hostname, **instance))
# TODO(vish): This probably should be done in the scheduler
# or in compute as a call. The network should be
# allocated after the host is assigned and setup
# can happen at the same time.
address = self.network_manager.allocate_fixed_ip(context,
instance_id,
is_vpn)
rpc.cast(elevated,
network_topic,
{"method": "setup_fixed_ip",
"args": {"address": address}})
logging.debug("Casting to scheduler for %s/%s's instance %s" %
(context.project_id, context.user_id, instance_id))
rpc.cast(context,
FLAGS.scheduler_topic,
{"method": "run_instance",
"args": {"topic": FLAGS.compute_topic,
"instance_id": instance_id}})
return instances
def ensure_default_security_group(self, context):
try:
db.security_group_get_by_name(context, context.project_id,
'default')
except exception.NotFound:
values = {'name': 'default',
'description': 'default',
'user_id': context.user_id,
'project_id': context.project_id}
group = db.security_group_create(context, values)
def create_instance(self, context, security_groups=None, **kwargs):
"""Creates the instance in the datastore and returns the
new instance as a mapping
:param context: The security context
:param security_groups: list of security group ids to
attach to the instance
:param kwargs: All additional keyword args are treated
as data fields of the instance to be
created
:retval Returns a mapping of the instance information
that has just been created
"""
instance_ref = self.db.instance_create(context, kwargs)
inst_id = instance_ref['id']
elevated = context.elevated()
if not security_groups:
security_groups = []
for security_group_id in security_groups:
self.db.instance_add_security_group(elevated,
inst_id,
security_group_id)
return instance_ref
def update_instance(self, context, instance_id, **kwargs):
"""Updates the instance in the datastore.
:param context: The security context
:param instance_id: ID of the instance to update
:param kwargs: All additional keyword args are treated
as data fields of the instance to be
updated
:retval None
"""
self.db.instance_update(context, instance_id, kwargs)

View File

@ -36,7 +36,6 @@ termination.
import datetime
import logging
import time
from twisted.internet import defer
@ -44,13 +43,9 @@ from nova import db
from nova import exception
from nova import flags
from nova import manager
from nova import quota
from nova import rpc
from nova import utils
from nova.compute import instance_types
from nova.compute import power_state
FLAGS = flags.FLAGS
flags.DEFINE_string('instances_path', utils.abspath('../instances'),
'where instances are stored on disk')
@ -58,11 +53,6 @@ flags.DEFINE_string('compute_driver', 'nova.virt.connection.get_connection',
'Driver to use for volume creation')
def generate_default_hostname(internal_id):
"""Default function to generate a hostname given an instance reference."""
return str(internal_id)
class ComputeManager(manager.Manager):
"""Manages the running instances from creation to destruction."""
@ -94,165 +84,6 @@ class ComputeManager(manager.Manager):
"""This call passes stright through to the virtualization driver."""
yield self.driver.refresh_security_group(security_group_id)
# TODO(eday): network_topic arg should go away once we push network
# allocation into the scheduler or compute worker.
def create_instances(self, context, instance_type, image_service, image_id,
network_topic, min_count=1, max_count=1,
kernel_id=None, ramdisk_id=None, name='',
description='', user_data='', key_name=None,
key_data=None, security_group='default',
generate_hostname=generate_default_hostname):
"""Create the number of instances requested if quote and
other arguments check out ok."""
num_instances = quota.allowed_instances(context, max_count,
instance_type)
if num_instances < min_count:
logging.warn("Quota exceeeded for %s, tried to run %s instances",
context.project_id, min_count)
raise quota.QuotaError("Instance quota exceeded. You can only "
"run %s more instances of this type." %
num_instances, "InstanceLimitExceeded")
is_vpn = image_id == FLAGS.vpn_image_id
if not is_vpn:
image = image_service.show(context, image_id)
if kernel_id is None:
kernel_id = image.get('kernelId', FLAGS.default_kernel)
if ramdisk_id is None:
ramdisk_id = image.get('ramdiskId', FLAGS.default_ramdisk)
# Make sure we have access to kernel and ramdisk
image_service.show(context, kernel_id)
image_service.show(context, ramdisk_id)
if security_group is None:
security_group = ['default']
if not type(security_group) is list:
security_group = [security_group]
security_groups = []
self.ensure_default_security_group(context)
for security_group_name in security_group:
group = db.security_group_get_by_name(context,
context.project_id,
security_group_name)
security_groups.append(group['id'])
if key_data is None and key_name:
key_pair = db.key_pair_get(context, context.user_id, key_name)
key_data = key_pair['public_key']
type_data = instance_types.INSTANCE_TYPES[instance_type]
base_options = {
'reservation_id': utils.generate_uid('r'),
'server_name': name,
'image_id': image_id,
'kernel_id': kernel_id,
'ramdisk_id': ramdisk_id,
'state_description': 'scheduling',
'user_id': context.user_id,
'project_id': context.project_id,
'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
'instance_type': instance_type,
'memory_mb': type_data['memory_mb'],
'vcpus': type_data['vcpus'],
'local_gb': type_data['local_gb'],
'display_name': name,
'display_description': description,
'key_name': key_name,
'key_data': key_data}
elevated = context.elevated()
instances = []
logging.debug("Going to run %s instances...", num_instances)
for num in range(num_instances):
instance = dict(mac_address=utils.generate_mac(),
launch_index=num,
**base_options)
instance_ref = self.create_instance(context, security_groups,
**instance)
instance_id = instance_ref['id']
internal_id = instance_ref['internal_id']
hostname = generate_hostname(internal_id)
self.update_instance(context, instance_id, hostname=hostname)
instances.append(dict(id=instance_id, internal_id=internal_id,
hostname=hostname, **instance))
# TODO(vish): This probably should be done in the scheduler
# or in compute as a call. The network should be
# allocated after the host is assigned and setup
# can happen at the same time.
address = self.network_manager.allocate_fixed_ip(context,
instance_id,
is_vpn)
rpc.cast(elevated,
network_topic,
{"method": "setup_fixed_ip",
"args": {"address": address}})
logging.debug("Casting to scheduler for %s/%s's instance %s" %
(context.project_id, context.user_id, instance_id))
rpc.cast(context,
FLAGS.scheduler_topic,
{"method": "run_instance",
"args": {"topic": FLAGS.compute_topic,
"instance_id": instance_id}})
return instances
def ensure_default_security_group(self, context):
try:
db.security_group_get_by_name(context, context.project_id,
'default')
except exception.NotFound:
values = {'name': 'default',
'description': 'default',
'user_id': context.user_id,
'project_id': context.project_id}
group = db.security_group_create(context, values)
def create_instance(self, context, security_groups=None, **kwargs):
"""Creates the instance in the datastore and returns the
new instance as a mapping
:param context: The security context
:param security_groups: list of security group ids to
attach to the instance
:param kwargs: All additional keyword args are treated
as data fields of the instance to be
created
:retval Returns a mapping of the instance information
that has just been created
"""
instance_ref = self.db.instance_create(context, kwargs)
inst_id = instance_ref['id']
elevated = context.elevated()
if not security_groups:
security_groups = []
for security_group_id in security_groups:
self.db.instance_add_security_group(elevated,
inst_id,
security_group_id)
return instance_ref
def update_instance(self, context, instance_id, **kwargs):
"""Updates the instance in the datastore.
:param context: The security context
:param instance_id: ID of the instance to update
:param kwargs: All additional keyword args are treated
as data fields of the instance to be
updated
:retval None
"""
self.db.instance_update(context, instance_id, kwargs)
@defer.inlineCallbacks
@exception.wrap_exception
def run_instance(self, context, instance_id, **_kwargs):

36
nova/db/base.py Normal file
View File

@ -0,0 +1,36 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Base class for classes that need modular database access.
"""
from nova import utils
from nova import flags
FLAGS = flags.FLAGS
flags.DEFINE_string('db_driver', 'nova.db.api',
'driver to use for database access')
class Base(object):
"""DB driver is injected in the init method"""
def __init__(self, db_driver=None):
if not db_driver:
db_driver = FLAGS.db_driver
self.db = utils.import_object(db_driver) # pylint: disable-msg=C0103

View File

@ -53,23 +53,19 @@ This module provides Manager, a base class for managers.
from nova import utils
from nova import flags
from nova.db import base
from twisted.internet import defer
FLAGS = flags.FLAGS
flags.DEFINE_string('db_driver', 'nova.db.api',
'driver to use for volume creation')
class Manager(object):
"""DB driver is injected in the init method"""
class Manager(base.Base):
def __init__(self, host=None, db_driver=None):
if not host:
host = FLAGS.host
self.host = host
if not db_driver:
db_driver = FLAGS.db_driver
self.db = utils.import_object(db_driver) # pylint: disable-msg=C0103
super(Manager, self).__init__(db_driver)
@defer.inlineCallbacks
def periodic_tasks(self, context=None):

View File

@ -31,6 +31,7 @@ from nova import flags
from nova import test
from nova import utils
from nova.auth import manager
from nova.compute import api as compute_api
FLAGS = flags.FLAGS
@ -43,6 +44,7 @@ class ComputeTestCase(test.TrialTestCase):
self.flags(connection_type='fake',
network_manager='nova.network.manager.FlatManager')
self.compute = utils.import_object(FLAGS.compute_manager)
self.compute_api = compute_api.ComputeAPI()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake')
self.project = self.manager.create_project('fake', 'fake', 'fake')
@ -76,9 +78,9 @@ class ComputeTestCase(test.TrialTestCase):
'user_id': self.user.id,
'project_id': self.project.id}
group = db.security_group_create(self.context, values)
ref = self.compute.create_instance(self.context,
security_groups=[group['id']],
**inst)
ref = self.compute_api.create_instance(self.context,
security_groups=[group['id']],
**inst)
# reload to get groups
instance_ref = db.instance_get(self.context, ref['id'])
try: