Consolidated the start instance logic in the two API classes into a single method. This also cleans up a number of small discrepencies between the two.

This commit is contained in:
Eric Day
2010-11-24 14:52:10 -08:00
parent 07ee9639a1
commit 1188dd95fb
8 changed files with 211 additions and 238 deletions

View File

@@ -39,7 +39,7 @@ from nova import flags
from nova import quota
from nova import rpc
from nova import utils
from nova.compute.instance_types import INSTANCE_TYPES
from nova.compute import instance_types
from nova.api import cloud
from nova.image.s3 import S3ImageService
@@ -50,11 +50,6 @@ flags.DECLARE('storage_availability_zone', 'nova.volume.manager')
InvalidInputException = exception.InvalidInputException
class QuotaError(exception.ApiError):
"""Quota Exceeeded"""
pass
def _gen_key(context, user_id, key_name):
"""Generate a key
@@ -127,7 +122,7 @@ class CloudController(object):
for instance in db.instance_get_all_by_project(context, project_id):
if instance['fixed_ip']:
line = '%s slots=%d' % (instance['fixed_ip']['address'],
INSTANCE_TYPES[instance['instance_type']]['vcpus'])
instance['vcpus'])
key = str(instance['key_name'])
if key in result:
result[key].append(line)
@@ -260,7 +255,7 @@ class CloudController(object):
return True
def describe_security_groups(self, context, group_name=None, **kwargs):
self._ensure_default_security_group(context)
self.compute_manager.ensure_default_security_group(context)
if context.user.is_admin():
groups = db.security_group_get_all(context)
else:
@@ -358,7 +353,7 @@ class CloudController(object):
return False
def revoke_security_group_ingress(self, context, group_name, **kwargs):
self._ensure_default_security_group(context)
self.compute_manager.ensure_default_security_group(context)
security_group = db.security_group_get_by_name(context,
context.project_id,
group_name)
@@ -383,7 +378,7 @@ class CloudController(object):
# for these operations, so support for newer API versions
# is sketchy.
def authorize_security_group_ingress(self, context, group_name, **kwargs):
self._ensure_default_security_group(context)
self.compute_manager.ensure_default_security_group(context)
security_group = db.security_group_get_by_name(context,
context.project_id,
group_name)
@@ -419,7 +414,7 @@ class CloudController(object):
return source_project_id
def create_security_group(self, context, group_name, group_description):
self._ensure_default_security_group(context)
self.compute_manager.ensure_default_security_group(context)
if db.security_group_exists(context, context.project_id, group_name):
raise exception.ApiError('group %s already exists' % group_name)
@@ -505,9 +500,8 @@ class CloudController(object):
if quota.allowed_volumes(context, 1, size) < 1:
logging.warn("Quota exceeeded for %s, tried to create %sG volume",
context.project_id, size)
raise QuotaError("Volume quota exceeded. You cannot "
"create a volume of size %s" %
size)
raise quota.QuotaError("Volume quota exceeded. You cannot "
"create a volume of size %s" % size)
vol = {}
vol['size'] = size
vol['user_id'] = context.user.id
@@ -699,8 +693,8 @@ class CloudController(object):
if quota.allowed_floating_ips(context, 1) < 1:
logging.warn("Quota exceeeded for %s, tried to allocate address",
context.project_id)
raise QuotaError("Address quota exceeded. You cannot "
"allocate any more addresses")
raise quota.QuotaError("Address quota exceeded. You cannot "
"allocate any more addresses")
network_topic = self._get_network_topic(context)
public_ip = rpc.call(context,
network_topic,
@@ -752,137 +746,25 @@ class CloudController(object):
"args": {"network_id": network_ref['id']}})
return db.queue_get_for(context, FLAGS.network_topic, host)
def _ensure_default_security_group(self, context):
try:
db.security_group_get_by_name(context,
context.project_id,
'default')
except exception.NotFound:
values = {'name': 'default',
'description': 'default',
'user_id': context.user.id,
'project_id': context.project_id}
group = db.security_group_create(context, values)
def run_instances(self, context, **kwargs):
instance_type = kwargs.get('instance_type', 'm1.small')
if instance_type not in INSTANCE_TYPES:
raise exception.ApiError("Unknown instance type: %s",
instance_type)
# check quota
max_instances = int(kwargs.get('max_count', 1))
min_instances = int(kwargs.get('min_count', max_instances))
num_instances = quota.allowed_instances(context,
max_instances,
instance_type)
if num_instances < min_instances:
logging.warn("Quota exceeeded for %s, tried to run %s instances",
context.project_id, min_instances)
raise QuotaError("Instance quota exceeded. You can only "
"run %s more instances of this type." %
num_instances, "InstanceLimitExceeded")
# make sure user can access the image
# vpn image is private so it doesn't show up on lists
vpn = kwargs['image_id'] == FLAGS.vpn_image_id
if not vpn:
image = self.image_service.show(context, kwargs['image_id'])
# FIXME(ja): if image is vpn, this breaks
# get defaults from imagestore
image_id = image['imageId']
kernel_id = image.get('kernelId', FLAGS.default_kernel)
ramdisk_id = image.get('ramdiskId', FLAGS.default_ramdisk)
# API parameters overrides of defaults
kernel_id = kwargs.get('kernel_id', kernel_id)
ramdisk_id = kwargs.get('ramdisk_id', ramdisk_id)
# make sure we have access to kernel and ramdisk
self.image_service.show(context, kernel_id)
self.image_service.show(context, ramdisk_id)
logging.debug("Going to run %s instances...", num_instances)
launch_time = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
key_data = None
if 'key_name' in kwargs:
key_pair_ref = db.key_pair_get(context,
context.user.id,
kwargs['key_name'])
key_data = key_pair_ref['public_key']
security_group_arg = kwargs.get('security_group', ["default"])
if not type(security_group_arg) is list:
security_group_arg = [security_group_arg]
security_groups = []
self._ensure_default_security_group(context)
for security_group_name in security_group_arg:
group = db.security_group_get_by_name(context,
context.project_id,
security_group_name)
security_groups.append(group['id'])
reservation_id = utils.generate_uid('r')
base_options = {}
base_options['state_description'] = 'scheduling'
base_options['image_id'] = image_id
base_options['kernel_id'] = kernel_id
base_options['ramdisk_id'] = ramdisk_id
base_options['reservation_id'] = reservation_id
base_options['key_data'] = key_data
base_options['key_name'] = kwargs.get('key_name', None)
base_options['user_id'] = context.user.id
base_options['project_id'] = context.project_id
base_options['user_data'] = kwargs.get('user_data', '')
base_options['display_name'] = kwargs.get('display_name')
base_options['display_description'] = kwargs.get('display_description')
type_data = INSTANCE_TYPES[instance_type]
base_options['instance_type'] = instance_type
base_options['memory_mb'] = type_data['memory_mb']
base_options['vcpus'] = type_data['vcpus']
base_options['local_gb'] = type_data['local_gb']
elevated = context.elevated()
for num in range(num_instances):
instance_ref = self.compute_manager.create_instance(context,
security_groups,
mac_address=utils.generate_mac(),
launch_index=num,
**base_options)
inst_id = instance_ref['id']
internal_id = instance_ref['internal_id']
ec2_id = internal_id_to_ec2_id(internal_id)
self.compute_manager.update_instance(context,
inst_id,
hostname=ec2_id)
# TODO(vish): This probably should be done in the scheduler
# or in compute as a call. The network should be
# allocated after the host is assigned and setup
# can happen at the same time.
address = self.network_manager.allocate_fixed_ip(context,
inst_id,
vpn)
network_topic = self._get_network_topic(context)
rpc.cast(elevated,
network_topic,
{"method": "setup_fixed_ip",
"args": {"address": address}})
rpc.cast(context,
FLAGS.scheduler_topic,
{"method": "run_instance",
"args": {"topic": FLAGS.compute_topic,
"instance_id": inst_id}})
logging.debug("Casting to scheduler for %s/%s's instance %s" %
(context.project.name, context.user.name, inst_id))
return self._format_run_instances(context, reservation_id)
max_count = int(kwargs.get('max_count', 1))
instances = self.compute_manager.create_instances(context,
instance_types.get_by_type(kwargs.get('instance_type', None)),
self.image_service,
kwargs['image_id'],
self._get_network_topic(context),
min_count=int(kwargs.get('min_count', max_count)),
max_count=max_count,
kernel_id=kwargs.get('kernel_id'),
ramdisk_id=kwargs.get('ramdisk_id'),
name=kwargs.get('display_name'),
description=kwargs.get('display_description'),
user_data=kwargs.get('user_data', ''),
key_name=kwargs.get('key_name'),
security_group=kwargs.get('security_group'),
generate_hostname=internal_id_to_ec2_id)
return self._format_run_instances(context,
instances[0]['reservation_id'])
def terminate_instances(self, context, instance_id, **kwargs):
"""Terminate each instance in instance_id, which is a list of ec2 ids.

View File

@@ -140,22 +140,23 @@ class Controller(wsgi.Controller):
def create(self, req):
""" Creates a new server for a given user """
env = self._deserialize(req.body, req)
if not env:
return faults.Fault(exc.HTTPUnprocessableEntity())
#try:
inst = self._build_server_instance(req, env)
#except Exception, e:
# return faults.Fault(exc.HTTPUnprocessableEntity())
user_id = req.environ['nova.context']['user']['id']
rpc.cast(context.RequestContext(user_id, user_id),
FLAGS.compute_topic,
{"method": "run_instance",
"args": {"instance_id": inst['id']}})
return _entity_inst(inst)
ctxt = context.RequestContext(user_id, user_id)
key_pair = self.db_driver.key_pair_get_all_by_user(None, user_id)[0]
instances = self.compute_manager.create_instances(ctxt,
instance_types.get_by_flavor_id(env['server']['flavorId']),
utils.import_object(FLAGS.image_service),
env['server']['imageId'],
self._get_network_topic(ctxt),
name=env['server']['name'],
description=env['server']['name'],
key_name=key_pair['name'],
key_data=key_pair['public_key'])
return _entity_inst(instances[0])
def update(self, req, id):
""" Updates the server name or password """
@@ -191,79 +192,6 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPUnprocessableEntity())
cloud.reboot(id)
def _build_server_instance(self, req, env):
"""Build instance data structure and save it to the data store."""
ltime = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
inst = {}
user_id = req.environ['nova.context']['user']['id']
ctxt = context.RequestContext(user_id, user_id)
flavor_id = env['server']['flavorId']
instance_type, flavor = [(k, v) for k, v in
instance_types.INSTANCE_TYPES.iteritems()
if v['flavorid'] == flavor_id][0]
image_id = env['server']['imageId']
img_service = utils.import_object(FLAGS.image_service)
image = img_service.show(image_id)
if not image:
raise Exception("Image not found")
inst['server_name'] = env['server']['name']
inst['image_id'] = image_id
inst['user_id'] = user_id
inst['launch_time'] = ltime
inst['mac_address'] = utils.generate_mac()
inst['project_id'] = user_id
inst['state_description'] = 'scheduling'
inst['kernel_id'] = image.get('kernelId', FLAGS.default_kernel)
inst['ramdisk_id'] = image.get('ramdiskId', FLAGS.default_ramdisk)
inst['reservation_id'] = utils.generate_uid('r')
inst['display_name'] = env['server']['name']
inst['display_description'] = env['server']['name']
#TODO(dietz) this may be ill advised
key_pair_ref = self.db_driver.key_pair_get_all_by_user(
None, user_id)[0]
inst['key_data'] = key_pair_ref['public_key']
inst['key_name'] = key_pair_ref['name']
#TODO(dietz) stolen from ec2 api, see TODO there
inst['security_group'] = 'default'
# Flavor related attributes
inst['instance_type'] = instance_type
inst['memory_mb'] = flavor['memory_mb']
inst['vcpus'] = flavor['vcpus']
inst['local_gb'] = flavor['local_gb']
inst['mac_address'] = utils.generate_mac()
inst['launch_index'] = 0
ref = self.compute_manager.create_instance(ctxt, **inst)
inst['id'] = ref['internal_id']
inst['hostname'] = str(ref['internal_id'])
self.compute_manager.update_instance(ctxt, inst['id'], **inst)
address = self.network_manager.allocate_fixed_ip(ctxt,
inst['id'])
# TODO(vish): This probably should be done in the scheduler
# network is setup when host is assigned
network_topic = self._get_network_topic(ctxt)
rpc.call(ctxt,
network_topic,
{"method": "setup_fixed_ip",
"args": {"address": address}})
return inst
def _get_network_topic(self, context):
"""Retrieves the network host for a project"""
network_ref = self.network_manager.get_network(context)

View File

@@ -21,9 +21,29 @@
The built-in instance properties.
"""
from nova import flags
FLAGS = flags.FLAGS
INSTANCE_TYPES = {
'm1.tiny': dict(memory_mb=512, vcpus=1, local_gb=0, flavorid=1),
'm1.small': dict(memory_mb=2048, vcpus=1, local_gb=20, flavorid=2),
'm1.medium': dict(memory_mb=4096, vcpus=2, local_gb=40, flavorid=3),
'm1.large': dict(memory_mb=8192, vcpus=4, local_gb=80, flavorid=4),
'm1.xlarge': dict(memory_mb=16384, vcpus=8, local_gb=160, flavorid=5)}
def get_by_type(instance_type):
"""Build instance data structure and save it to the data store."""
if instance_type is None:
return FLAGS.default_instance_type
if instance_type not in INSTANCE_TYPES:
raise exception.ApiError("Unknown instance type: %s",
instance_type)
return instance_type
def get_by_flavor_id(flavor_id):
for instance_type, details in INSTANCE_TYPES.iteritems():
if details['flavorid'] == flavor_id:
return instance_type
return FLAGS.default_instance_type

View File

@@ -36,13 +36,18 @@ termination.
import datetime
import logging
import time
from twisted.internet import defer
from nova import db
from nova import exception
from nova import flags
from nova import manager
from nova import quota
from nova import rpc
from nova import utils
from nova.compute import instance_types
from nova.compute import power_state
@@ -53,6 +58,11 @@ flags.DEFINE_string('compute_driver', 'nova.virt.connection.get_connection',
'Driver to use for volume creation')
def generate_default_hostname(internal_id):
"""Default function to generate a hostname given an instance reference."""
return str(internal_id)
class ComputeManager(manager.Manager):
"""Manages the running instances from creation to destruction."""
@@ -84,6 +94,126 @@ class ComputeManager(manager.Manager):
"""This call passes stright through to the virtualization driver."""
yield self.driver.refresh_security_group(security_group_id)
# TODO(eday): network_topic arg should go away once we push network
# allocation into the scheduler or compute worker.
def create_instances(self, context, instance_type, image_service, image_id,
network_topic, min_count=1, max_count=1,
kernel_id=None, ramdisk_id=None, name='',
description='', user_data='', key_name=None,
key_data=None, security_group='default',
generate_hostname=generate_default_hostname):
"""Create the number of instances requested if quote and
other arguments check out ok."""
num_instances = quota.allowed_instances(context, max_count,
instance_type)
if num_instances < min_count:
logging.warn("Quota exceeeded for %s, tried to run %s instances",
context.project_id, min_count)
raise quota.QuotaError("Instance quota exceeded. You can only "
"run %s more instances of this type." %
num_instances, "InstanceLimitExceeded")
is_vpn = image_id == FLAGS.vpn_image_id
if not is_vpn:
image = image_service.show(context, image_id)
if not image:
raise Exception("Image not found")
if kernel_id is None:
kernel_id = image.get('kernelId', FLAGS.default_kernel)
if ramdisk_id is None:
ramdisk_id = image.get('ramdiskId', FLAGS.default_ramdisk)
# Make sure we have access to kernel and ramdisk
image_service.show(context, kernel_id)
image_service.show(context, ramdisk_id)
if security_group is None:
security_group = ['default']
if not type(security_group) is list:
security_group = [security_group]
security_groups = []
self.ensure_default_security_group(context)
for security_group_name in security_group:
group = db.security_group_get_by_name(context,
context.project_id,
security_group_name)
security_groups.append(group['id'])
if key_data is None and key_name:
key_pair = db.key_pair_get(context, context.user_id, key_name)
key_data = key_pair['public_key']
type_data = instance_types.INSTANCE_TYPES[instance_type]
base_options = {
'reservation_id': utils.generate_uid('r'),
'server_name': name,
'image_id': image_id,
'kernel_id': kernel_id,
'ramdisk_id': ramdisk_id,
'state_description': 'scheduling',
'user_id': context.user_id,
'project_id': context.project_id,
'launch_time': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
'instance_type': instance_type,
'memory_mb': type_data['memory_mb'],
'vcpus': type_data['vcpus'],
'local_gb': type_data['local_gb'],
'display_name': name,
'display_description': description,
'key_name': key_name,
'key_data': key_data}
elevated = context.elevated()
instances = []
logging.debug("Going to run %s instances...", num_instances)
for num in range(num_instances):
instance = dict(mac_address=utils.generate_mac(),
launch_index=num,
**base_options)
instance_ref = self.create_instance(context, security_groups,
**instance)
instance_id = instance_ref['id']
internal_id = instance_ref['internal_id']
hostname = generate_hostname(internal_id)
self.update_instance(context, instance_id, hostname=hostname)
instances.append(dict(id=instance_id, internal_id=internal_id,
hostname=hostname, **instance))
# TODO(vish): This probably should be done in the scheduler
# or in compute as a call. The network should be
# allocated after the host is assigned and setup
# can happen at the same time.
address = self.network_manager.allocate_fixed_ip(context,
instance_id,
is_vpn)
rpc.cast(elevated,
network_topic,
{"method": "setup_fixed_ip",
"args": {"address": address}})
logging.debug("Casting to scheduler for %s/%s's instance %s" %
(context.project_id, context.user_id, instance_id))
rpc.cast(context,
FLAGS.scheduler_topic,
{"method": "run_instance",
"args": {"topic": FLAGS.compute_topic,
"instance_id": instance_id}})
return instances
def ensure_default_security_group(self, context):
try:
db.security_group_get_by_name(context, context.project_id,
'default')
except exception.NotFound:
values = {'name': 'default',
'description': 'default',
'user_id': context.user_id,
'project_id': context.project_id}
group = db.security_group_create(context, values)
def create_instance(self, context, security_groups=None, **kwargs):
"""Creates the instance in the datastore and returns the
new instance as a mapping

View File

@@ -94,3 +94,8 @@ def allowed_floating_ips(context, num_floating_ips):
quota = get_quota(context, project_id)
allowed_floating_ips = quota['floating_ips'] - used_floating_ips
return min(num_floating_ips, allowed_floating_ips)
class QuotaError(exception.ApiError):
"""Quota Exceeeded"""
pass

View File

@@ -73,7 +73,7 @@ def stub_out_key_pair_funcs(stubs):
def stub_out_image_service(stubs):
def fake_image_show(meh, id):
def fake_image_show(meh, context, id):
return dict(kernelId=1, ramdiskId=1)
stubs.Set(nova.image.local.LocalImageService, 'show', fake_image_show)

View File

@@ -43,6 +43,10 @@ def return_servers(context, user_id=1):
return [stub_instance(i, user_id) for i in xrange(5)]
def return_security_group(context, instance_id, security_group_id):
pass
def stub_instance(id, user_id=1):
return Instance(id=id, state=0, image_id=10, server_name='server%s' % id,
user_id=user_id)
@@ -63,6 +67,8 @@ class ServersTest(unittest.TestCase):
return_server)
self.stubs.Set(nova.db.api, 'instance_get_all_by_user',
return_servers)
self.stubs.Set(nova.db.api, 'instance_add_security_group',
return_security_group)
def tearDown(self):
self.stubs.UnsetAll()

View File

@@ -94,11 +94,12 @@ class QuotaTestCase(test.TrialTestCase):
for i in range(FLAGS.quota_instances):
instance_id = self._create_instance()
instance_ids.append(instance_id)
self.assertRaises(cloud.QuotaError, self.cloud.run_instances,
self.assertRaises(quota.QuotaError, self.cloud.run_instances,
self.context,
min_count=1,
max_count=1,
instance_type='m1.small')
instance_type='m1.small',
image_id='fake')
for instance_id in instance_ids:
db.instance_destroy(self.context, instance_id)
@@ -106,11 +107,12 @@ class QuotaTestCase(test.TrialTestCase):
instance_ids = []
instance_id = self._create_instance(cores=4)
instance_ids.append(instance_id)
self.assertRaises(cloud.QuotaError, self.cloud.run_instances,
self.assertRaises(quota.QuotaError, self.cloud.run_instances,
self.context,
min_count=1,
max_count=1,
instance_type='m1.small')
instance_type='m1.small',
image_id='fake')
for instance_id in instance_ids:
db.instance_destroy(self.context, instance_id)
@@ -119,7 +121,7 @@ class QuotaTestCase(test.TrialTestCase):
for i in range(FLAGS.quota_volumes):
volume_id = self._create_volume()
volume_ids.append(volume_id)
self.assertRaises(cloud.QuotaError, self.cloud.create_volume,
self.assertRaises(quota.QuotaError, self.cloud.create_volume,
self.context,
size=10)
for volume_id in volume_ids:
@@ -129,7 +131,7 @@ class QuotaTestCase(test.TrialTestCase):
volume_ids = []
volume_id = self._create_volume(size=20)
volume_ids.append(volume_id)
self.assertRaises(cloud.QuotaError,
self.assertRaises(quota.QuotaError,
self.cloud.create_volume,
self.context,
size=10)
@@ -146,6 +148,6 @@ class QuotaTestCase(test.TrialTestCase):
# make an rpc.call, the test just finishes with OK. It
# appears to be something in the magic inline callbacks
# that is breaking.
self.assertRaises(cloud.QuotaError, self.cloud.allocate_address,
self.assertRaises(quota.QuotaError, self.cloud.allocate_address,
self.context)
db.floating_ip_destroy(context.get_admin_context(), address)