Merge "remove unsupported ec2 extensions"

This commit is contained in:
Jenkins
2012-02-01 19:08:03 +00:00
committed by Gerrit Code Review
10 changed files with 10 additions and 1226 deletions

View File

@@ -89,11 +89,11 @@ from nova import rpc
from nova import utils
from nova import version
from nova import vsa
from nova.api.ec2 import admin
from nova.api.ec2 import ec2utils
from nova.auth import manager
from nova.cloudpipe import pipelib
from nova.compute import instance_types
from nova.compute import vm_states
from nova.db import migration
from nova.volume import volume_types
@@ -214,7 +214,11 @@ class VpnCommands(object):
'vpn_public_port': int(port)})
def _vpn_for(self, context, project_id):
return admin.AdminController()._vpn_for(context, project_id)
"""Get the VPN instance for a project ID."""
for instance in db.instance_get_all_by_project(context, project_id):
if (instance['image_id'] == str(FLAGS.vpn_image_id)
and not instance['vm_state'] in [vm_states.DELETED]):
return instance
class ShellCommands(object):

View File

@@ -129,15 +129,6 @@ The :mod:`nova.api.ec2` Module
:undoc-members:
:show-inheritance:
The :mod:`admin` Module
~~~~~~~~~~~~~~~~~~~~~~~
.. automodule:: nova.api.ec2.admin
:noindex:
:members:
:undoc-members:
:show-inheritance:
The :mod:`apirequest` Module
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@@ -33,7 +33,6 @@ paste.app_factory = nova.api.metadata.handler:MetadataRequestHandler.factory
[composite:ec2]
use = egg:Paste#urlmap
/services/Cloud: ec2cloud
/services/Admin: ec2admin
[pipeline:ec2cloud]
pipeline = ec2faultwrap logrequest ec2noauth cloudrequest authorizer validator ec2executor
@@ -42,13 +41,6 @@ pipeline = ec2faultwrap logrequest ec2noauth cloudrequest authorizer validator e
# NOTE(vish): use the following pipeline for keystone auth
# pipeline = ec2faultwrap logrequest totoken authtoken keystonecontext cloudrequest authorizer validator ec2executor
[pipeline:ec2admin]
pipeline = ec2faultwrap logrequest ec2noauth adminrequest authorizer ec2executor
# NOTE(vish): use the following pipeline for deprecated auth
# pipeline = ec2faultwrap logrequest authenticate adminrequest authorizer ec2executor
# NOTE(vish): use the following pipeline for keystone auth
# pipeline = ec2faultwrap logrequest totoken authtoken keystonecontext adminrequest authorizer ec2executor
[filter:ec2faultwrap]
paste.filter_factory = nova.api.ec2:FaultWrapper.factory
@@ -71,10 +63,6 @@ paste.filter_factory = nova.api.ec2:Authenticate.factory
controller = nova.api.ec2.cloud.CloudController
paste.filter_factory = nova.api.ec2:Requestify.factory
[filter:adminrequest]
controller = nova.api.ec2.admin.AdminController
paste.filter_factory = nova.api.ec2:Requestify.factory
[filter:authorizer]
paste.filter_factory = nova.api.ec2:Authorizer.factory

View File

@@ -1,418 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Admin API controller, exposed through http via the api worker.
"""
import base64
import urllib
import netaddr
from nova.api.ec2 import ec2utils
from nova.auth import manager
from nova import compute
from nova.compute import instance_types
from nova.compute import vm_states
from nova import db
from nova import exception
from nova import flags
from nova import log as logging
from nova import utils
FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.api.ec2.admin')
def user_dict(user, base64_file=None):
"""Convert the user object to a result dict"""
if user:
return {
'username': user.id,
'accesskey': user.access,
'secretkey': user.secret,
'file': base64_file}
else:
return {}
def project_dict(project):
"""Convert the project object to a result dict"""
if project:
return {
'projectname': project.id,
'project_manager_id': project.project_manager_id,
'description': project.description}
else:
return {}
def host_dict(host, compute_service, instances, volume_service, volumes, now):
"""Convert a host model object to a result dict"""
rv = {'hostname': host, 'instance_count': len(instances),
'volume_count': len(volumes)}
if compute_service:
latest = compute_service['updated_at'] or compute_service['created_at']
delta = now - latest
if delta.seconds <= FLAGS.service_down_time:
rv['compute'] = 'up'
else:
rv['compute'] = 'down'
if volume_service:
latest = volume_service['updated_at'] or volume_service['created_at']
delta = now - latest
if delta.seconds <= FLAGS.service_down_time:
rv['volume'] = 'up'
else:
rv['volume'] = 'down'
return rv
def instance_dict(inst):
return {'name': inst['name'],
'memory_mb': inst['memory_mb'],
'vcpus': inst['vcpus'],
'disk_gb': inst['root_gb'],
'flavor_id': inst['flavorid']}
def vpn_dict(project, vpn_instance):
rv = {'project_id': project.id,
'public_ip': project.vpn_ip,
'public_port': project.vpn_port}
if vpn_instance:
rv['instance_id'] = ec2utils.id_to_ec2_id(vpn_instance['id'])
rv['created_at'] = utils.isotime(vpn_instance['created_at'])
address = vpn_instance.get('fixed_ip', None)
if address:
rv['internal_ip'] = address['address']
if project.vpn_ip and project.vpn_port:
if utils.vpn_ping(project.vpn_ip, project.vpn_port):
rv['state'] = 'running'
else:
rv['state'] = 'down'
else:
rv['state'] = 'down - invalid project vpn config'
else:
rv['state'] = 'pending'
return rv
class AdminController(object):
"""
API Controller for users, hosts, nodes, and workers.
"""
def __str__(self):
return 'AdminController'
def __init__(self):
self.compute_api = compute.API()
def describe_instance_types(self, context, **_kwargs):
"""Returns all active instance types data (vcpus, memory, etc.)"""
inst_types = instance_types.get_all_types()
inst_type_dicts = [instance_dict(i) for i in inst_types.values()]
return {'instanceTypeSet': inst_type_dicts}
def describe_user(self, _context, name, **_kwargs):
"""Returns user data, including access and secret keys."""
return user_dict(manager.AuthManager().get_user(name))
def describe_users(self, _context, **_kwargs):
"""Returns all users - should be changed to deal with a list."""
return {'userSet':
[user_dict(u) for u in manager.AuthManager().get_users()]}
def register_user(self, context, name, **_kwargs):
"""Creates a new user, and returns generated credentials."""
LOG.audit(_("Creating new user: %s"), name, context=context)
return user_dict(manager.AuthManager().create_user(name))
def deregister_user(self, context, name, **_kwargs):
"""Deletes a single user (NOT undoable.)
Should throw an exception if the user has instances,
volumes, or buckets remaining.
"""
LOG.audit(_("Deleting user: %s"), name, context=context)
manager.AuthManager().delete_user(name)
return True
def describe_roles(self, context, project_roles=True, **kwargs):
"""Returns a list of allowed roles."""
roles = manager.AuthManager().get_roles(project_roles)
return {'roles': [{'role': r} for r in roles]}
def describe_user_roles(self, context, user, project=None, **kwargs):
"""Returns a list of roles for the given user.
Omitting project will return any global roles that the user has.
Specifying project will return only project specific roles.
"""
roles = manager.AuthManager().get_user_roles(user, project=project)
return {'roles': [{'role': r} for r in roles]}
def modify_user_role(self, context, user, role, project=None,
operation='add', **kwargs):
"""Add or remove a role for a user and project."""
if operation == 'add':
if project:
msg = _("Adding role %(role)s to user %(user)s"
" for project %(project)s") % locals()
LOG.audit(msg, context=context)
else:
msg = _("Adding sitewide role %(role)s to"
" user %(user)s") % locals()
LOG.audit(msg, context=context)
manager.AuthManager().add_role(user, role, project)
elif operation == 'remove':
if project:
msg = _("Removing role %(role)s from user %(user)s"
" for project %(project)s") % locals()
LOG.audit(msg, context=context)
else:
msg = _("Removing sitewide role %(role)s"
" from user %(user)s") % locals()
LOG.audit(msg, context=context)
manager.AuthManager().remove_role(user, role, project)
else:
raise exception.ApiError(_('operation must be add or remove'))
return True
def generate_x509_for_user(self, context, name, project=None, **kwargs):
"""Generates and returns an x509 certificate for a single user.
Is usually called from a client that will wrap this with
access and secret key info, and return a zip file.
"""
if project is None:
project = name
project = manager.AuthManager().get_project(project)
user = manager.AuthManager().get_user(name)
msg = _("Getting x509 for user: %(name)s"
" on project: %(project)s") % locals()
LOG.audit(msg, context=context)
return user_dict(user, base64.b64encode(project.get_credentials(user)))
def describe_project(self, context, name, **kwargs):
"""Returns project data, including member ids."""
return project_dict(manager.AuthManager().get_project(name))
def describe_projects(self, context, user=None, **kwargs):
"""Returns all projects - should be changed to deal with a list."""
return {'projectSet':
[project_dict(u) for u in
manager.AuthManager().get_projects(user=user)]}
def register_project(self, context, name, manager_user, description=None,
member_users=None, **kwargs):
"""Creates a new project"""
msg = _("Create project %(name)s managed by"
" %(manager_user)s") % locals()
LOG.audit(msg, context=context)
return project_dict(
manager.AuthManager().create_project(
name,
manager_user,
description=None,
member_users=None))
def modify_project(self, context, name, manager_user, description=None,
**kwargs):
"""Modifies a project"""
msg = _("Modify project: %(name)s managed by"
" %(manager_user)s") % locals()
LOG.audit(msg, context=context)
manager.AuthManager().modify_project(name,
manager_user=manager_user,
description=description)
return True
def deregister_project(self, context, name):
"""Permanently deletes a project."""
LOG.audit(_("Delete project: %s"), name, context=context)
manager.AuthManager().delete_project(name)
return True
def describe_project_members(self, context, name, **kwargs):
project = manager.AuthManager().get_project(name)
result = {
'members': [{'member': m} for m in project.member_ids]}
return result
def modify_project_member(self, context, user, project, operation,
**kwargs):
"""Add or remove a user from a project."""
if operation == 'add':
msg = _("Adding user %(user)s to project %(project)s") % locals()
LOG.audit(msg, context=context)
manager.AuthManager().add_to_project(user, project)
elif operation == 'remove':
msg = _("Removing user %(user)s from"
" project %(project)s") % locals()
LOG.audit(msg, context=context)
manager.AuthManager().remove_from_project(user, project)
else:
raise exception.ApiError(_('operation must be add or remove'))
return True
def _vpn_for(self, context, project_id):
"""Get the VPN instance for a project ID."""
for instance in db.instance_get_all_by_project(context, project_id):
if (instance['image_id'] == str(FLAGS.vpn_image_id)
and not instance['vm_state'] in [vm_states.DELETED]):
return instance
def start_vpn(self, context, project):
instance = self._vpn_for(context, project)
if not instance:
# NOTE(vish) import delayed because of __init__.py
from nova.cloudpipe import pipelib
pipe = pipelib.CloudPipe()
proj = manager.AuthManager().get_project(project)
user_id = proj.project_manager_id
try:
pipe.launch_vpn_instance(project, user_id)
except db.NoMoreNetworks:
raise exception.ApiError("Unable to claim IP for VPN instance"
", ensure it isn't running, and try "
"again in a few minutes")
instance = self._vpn_for(context, project)
return {'instance_id': ec2utils.id_to_ec2_id(instance['id'])}
def describe_vpns(self, context):
vpns = []
for project in manager.AuthManager().get_projects():
instance = self._vpn_for(context, project.id)
vpns.append(vpn_dict(project, instance))
return {'items': vpns}
# FIXME(vish): these host commands don't work yet, perhaps some of the
# required data can be retrieved from service objects?
def describe_hosts(self, context, **_kwargs):
"""Returns status info for all nodes. Includes:
* Hostname
* Compute (up, down, None)
* Instance count
* Volume (up, down, None)
* Volume Count
"""
services = db.service_get_all(context, False)
now = utils.utcnow()
hosts = []
rv = []
for host in [service['host'] for service in services]:
if not host in hosts:
hosts.append(host)
for host in hosts:
compute = [s for s in services if s['host'] == host \
and s['binary'] == 'nova-compute']
if compute:
compute = compute[0]
instances = db.instance_get_all_by_host(context, host)
volume = [s for s in services if s['host'] == host \
and s['binary'] == 'nova-volume']
if volume:
volume = volume[0]
volumes = db.volume_get_all_by_host(context, host)
rv.append(host_dict(host, compute, instances, volume, volumes,
now))
return {'hosts': rv}
def _provider_fw_rule_exists(self, context, rule):
# TODO(todd): we call this repeatedly, can we filter by protocol?
for old_rule in db.provider_fw_rule_get_all(context):
if all([rule[k] == old_rule[k] for k in ('cidr', 'from_port',
'to_port', 'protocol')]):
return True
return False
def block_external_addresses(self, context, cidr):
"""Add provider-level firewall rules to block incoming traffic."""
LOG.audit(_('Blocking traffic to all projects incoming from %s'),
cidr, context=context)
cidr = urllib.unquote(cidr).decode()
failed = {'status': 'Failed', 'message': ' 0 rules added'}
if not utils.is_valid_cidr(cidr):
msg = 'Improper input. Please provide a valid cidr: ' \
'e.g. 121.12.10.11/24.'
failed['message'] = msg + failed['message']
return failed
#Normalizing cidr. e.g. '20.20.20.11/24' -> '20.20.20.0/24', so that
#db values stay in sync with filters' values (e.g. in iptables)
cidr = str(netaddr.IPNetwork(cidr).cidr)
rule = {'cidr': cidr}
tcp_rule = rule.copy()
tcp_rule.update({'protocol': 'tcp', 'from_port': 1, 'to_port': 65535})
udp_rule = rule.copy()
udp_rule.update({'protocol': 'udp', 'from_port': 1, 'to_port': 65535})
icmp_rule = rule.copy()
icmp_rule.update({'protocol': 'icmp', 'from_port': -1,
'to_port': None})
rules_added = 0
if not self._provider_fw_rule_exists(context, tcp_rule):
db.provider_fw_rule_create(context, tcp_rule)
rules_added += 1
if not self._provider_fw_rule_exists(context, udp_rule):
db.provider_fw_rule_create(context, udp_rule)
rules_added += 1
if not self._provider_fw_rule_exists(context, icmp_rule):
db.provider_fw_rule_create(context, icmp_rule)
rules_added += 1
if not rules_added:
msg = 'Duplicate Rule.'
failed['message'] = msg + failed['message']
return failed
self.compute_api.trigger_provider_fw_rules_refresh(context)
return {'status': 'OK', 'message': 'Added %s rules' % rules_added}
def describe_external_address_blocks(self, context):
blocks = db.provider_fw_rule_get_all(context)
# NOTE(todd): use a set since we have icmp/udp/tcp rules with same cidr
blocks = set([b.cidr for b in blocks])
blocks = [{'cidr': b} for b in blocks]
return {'externalIpBlockInfo':
list(sorted(blocks, key=lambda k: k['cidr']))}
def remove_external_address_block(self, context, cidr):
LOG.audit(_('Removing ip block from %s'), cidr, context=context)
cidr = urllib.unquote(cidr).decode()
# Catch the exception and LOG for improper or malicious inputs.
# Also return a proper status and message in that case
failed = {'status': 'Failed', 'message': ' 0 rules deleted'}
if not utils.is_valid_cidr(cidr):
msg = 'Improper input. Please provide a valid cidr: ' \
'e.g. 121.12.10.11/24.'
failed['message'] = msg + failed['message']
return failed
#Normalizing cidr. e.g. '20.20.20.11/24' -> '20.20.20.0/24', so that
#db values stay in sync with filters' values (e.g. in iptables)
cidr = str(netaddr.IPNetwork(cidr).cidr)
rules = db.provider_fw_rule_get_all_by_cidr(context, cidr)
if not rules:
msg = 'No such CIDR currently blocked.'
failed['message'] = msg + failed['message']
return failed
else:
for rule in rules:
db.provider_fw_rule_destroy(context, rule['id'])
self.compute_api.trigger_provider_fw_rules_refresh(context)
return {'status': 'OK', 'message': 'Deleted %s rules' % len(rules)}

View File

@@ -24,7 +24,6 @@ import datetime
# TODO(termie): replace minidom with etree
from xml.dom import minidom
from nova.api.ec2 import admin
from nova.api.ec2 import ec2utils
from nova import exception
from nova import flags
@@ -57,14 +56,6 @@ class APIRequest(object):
def invoke(self, context):
try:
# Raise NotImplemented exception for Admin specific request if
# admin flag is set to false in nova.conf
if (isinstance(self.controller, admin.AdminController)
and (not FLAGS.allow_ec2_admin_api)):
## Raise InvalidRequest exception for EC2 Admin interface ##
LOG.exception("Unsupported API request")
raise exception.InvalidRequest()
method = getattr(self.controller,
ec2utils.camelcase_to_underscore(self.action))
except AttributeError:

View File

@@ -23,10 +23,7 @@ datastore.
"""
import base64
import os
import re
import shutil
import tempfile
import time
import urllib
@@ -253,7 +250,6 @@ class CloudController(object):
'zoneState': 'available'}]}
services = db.service_get_all(context, False)
now = utils.utcnow()
hosts = []
for host in [service['host'] for service in services]:
if not host in hosts:
@@ -323,9 +319,6 @@ class CloudController(object):
s['ownerId'] = snapshot['project_id']
s['volumeSize'] = snapshot['volume_size']
s['description'] = snapshot['display_description']
s['display_name'] = snapshot['display_name']
s['display_description'] = snapshot['display_description']
return s
def create_snapshot(self, context, volume_id, **kwargs):
@@ -336,8 +329,8 @@ class CloudController(object):
snapshot = self.volume_api.create_snapshot(
context,
volume,
kwargs.get('display_name'),
kwargs.get('display_description'))
None,
kwargs.get('description'))
return self._format_snapshot(context, snapshot)
def delete_snapshot(self, context, snapshot_id, **kwargs):
@@ -790,15 +783,6 @@ class CloudController(object):
"Timestamp": now,
"output": base64.b64encode(output)}
def get_ajax_console(self, context, instance_id, **kwargs):
"""Web based ajax terminal for vm.
This is an extension to the normal ec2_api"""
ec2_id = instance_id[0]
instance_id = ec2utils.ec2_id_to_id(ec2_id)
instance = self.compute_api.get(context, instance_id)
return self.compute_api.get_ajax_console(context, instance)
def describe_volumes(self, context, volume_id=None, **kwargs):
if volume_id:
volumes = []
@@ -846,8 +830,6 @@ class CloudController(object):
else:
v['snapshotId'] = None
v['display_name'] = volume['display_name']
v['display_description'] = volume['display_description']
return v
def create_volume(self, context, **kwargs):
@@ -863,8 +845,8 @@ class CloudController(object):
volume = self.volume_api.create(context,
size,
kwargs.get('display_name'),
kwargs.get('display_description'),
None,
None,
snapshot)
# TODO(vish): Instance should be None at db layer instead of
# trying to lazy load, but for now we turn it into
@@ -877,18 +859,6 @@ class CloudController(object):
self.volume_api.delete(context, volume)
return True
def update_volume(self, context, volume_id, **kwargs):
volume_id = ec2utils.ec2_id_to_id(volume_id)
updatable_fields = ['display_name', 'display_description']
changes = {}
for field in updatable_fields:
if field in kwargs:
changes[field] = kwargs[field]
if changes:
volume = self.volume_api.get(context, volume_id)
self.volume_api.update(context, volume, fields=changes)
return True
def attach_volume(self, context, volume_id, instance_id, device, **kwargs):
volume_id = ec2utils.ec2_id_to_id(volume_id)
instance_id = ec2utils.ec2_id_to_id(instance_id)
@@ -1157,8 +1127,6 @@ class CloudController(object):
self._format_instance_type(instance, i)
i['launchTime'] = instance['created_at']
i['amiLaunchIndex'] = instance['launch_index']
i['displayName'] = instance['display_name']
i['displayDescription'] = instance['display_description']
self._format_instance_root_device_name(instance, i)
self._format_instance_bdm(context, instance_id,
i['rootDeviceName'], i)
@@ -1265,8 +1233,6 @@ class CloudController(object):
max_count=max_count,
kernel_id=kwargs.get('kernel_id'),
ramdisk_id=kwargs.get('ramdisk_id'),
display_name=kwargs.get('display_name'),
display_description=kwargs.get('display_description'),
key_name=kwargs.get('key_name'),
user_data=kwargs.get('user_data'),
security_group=kwargs.get('security_group'),
@@ -1318,35 +1284,6 @@ class CloudController(object):
self.compute_api.start(context, instance)
return True
def rescue_instance(self, context, instance_id, **kwargs):
"""This is an extension to the normal ec2_api"""
LOG.debug(_("Going to rescue instance %s") % instance_id)
_instance_id = ec2utils.ec2_id_to_id(instance_id)
instance = self.compute_api.get(context, _instance_id)
self.compute_api.rescue(context, instance)
return True
def unrescue_instance(self, context, instance_id, **kwargs):
"""This is an extension to the normal ec2_api"""
LOG.debug(_("Going to unrescue instance %s") % instance_id)
_instance_id = ec2utils.ec2_id_to_id(instance_id)
instance = self.compute_api.get(context, _instance_id)
self.compute_api.unrescue(context, instance)
return True
def update_instance(self, context, instance_id, **kwargs):
"""This is an extension to the normal ec2_api"""
updatable_fields = ['display_name', 'display_description']
changes = {}
for field in updatable_fields:
if field in kwargs:
changes[field] = kwargs[field]
if changes:
instance_id = ec2utils.ec2_id_to_id(instance_id)
instance = self.compute_api.get(context, instance_id)
self.compute_api.update(context, instance, **changes)
return True
def _get_image(self, context, ec2_id):
try:
internal_id = ec2utils.ec2_id_to_id(ec2_id)
@@ -1391,7 +1328,6 @@ class CloudController(object):
i['imageLocation'] = image['properties'].get('image_location')
i['imageState'] = self._get_image_state(image)
i['displayName'] = name
i['description'] = image.get('description')
display_mapping = {'aki': 'kernel',
'ari': 'ramdisk',

View File

@@ -520,9 +520,6 @@ global_opts = [
cfg.IntOpt('zombie_instance_updated_at_window',
default=172800,
help='Number of seconds zombie instances are cleaned up.'),
cfg.BoolOpt('allow_ec2_admin_api',
default=False,
help='Enable/Disable EC2 Admin API'),
cfg.IntOpt('service_down_time',
default=60,
help='maximum time since last check-in for up service'),

View File

@@ -1,503 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for api.ec2.admin"""
import datetime
from nova import context
from nova import db
from nova import exception
from nova import flags
from nova import test
from nova import utils
from nova.api.ec2 import admin
from nova.api.ec2 import ec2utils
from nova.cloudpipe import pipelib
from nova.compute import vm_states
class AdminTestCase(test.TestCase):
def setUp(self):
super(AdminTestCase, self).setUp()
self.stubs.Set(utils, 'vpn_ping',
lambda address, port: address == '127.0.0.1')
def test_user_dict(self):
user = type('User', (object,),
{'id': 'bob', 'access': 'foo', 'secret': 'bar'})
expected_user_dict = {'username': 'bob',
'accesskey': 'foo',
'secretkey': 'bar',
'file': 'filename'}
self.assertEqual(expected_user_dict, admin.user_dict(user, 'filename'))
def test_user_dict_no_file(self):
user = type('User', (object,),
{'id': 'bob', 'access': 'foo', 'secret': 'bar'})
expected_user_dict = {'username': 'bob',
'accesskey': 'foo',
'secretkey': 'bar',
'file': None}
self.assertEqual(expected_user_dict, admin.user_dict(user))
def test_user_dict_no_user(self):
self.assertEqual({}, admin.user_dict(None))
def test_project_dict(self):
project = type('Project', (object,), {'id': 'project',
'project_manager_id': 'foo',
'description': 'bar'})
expected_project_dict = {'projectname': 'project',
'project_manager_id': 'foo',
'description': 'bar'}
self.assertEqual(expected_project_dict, admin.project_dict(project))
def test_project_dict_no_project(self):
self.assertEqual({}, admin.project_dict(None))
def test_host_dict_using_updated_at(self):
# instances and volumes only used for count
instances = range(2)
volumes = range(3)
now = datetime.datetime.now()
updated_at = now - datetime.timedelta(seconds=10)
compute_service = {'updated_at': updated_at}
volume_service = {'updated_at': updated_at}
expected_host_dict = {'hostname': 'server',
'instance_count': 2,
'volume_count': 3,
'compute': 'up',
'volume': 'up'}
self.assertEqual(expected_host_dict,
admin.host_dict('server', compute_service, instances,
volume_service, volumes, now))
def test_host_dict_service_down_using_created_at(self):
# instances and volumes only used for count
instances = range(2)
volumes = range(3)
# service_down_time is 60 by defualt so we set to 70 to simulate
# services been down
now = datetime.datetime.now()
created_at = now - datetime.timedelta(seconds=70)
compute_service = {'created_at': created_at, 'updated_at': None}
volume_service = {'created_at': created_at, 'updated_at': None}
expected_host_dict = {'hostname': 'server',
'instance_count': 2,
'volume_count': 3,
'compute': 'down',
'volume': 'down'}
self.assertEqual(expected_host_dict,
admin.host_dict('server', compute_service, instances,
volume_service, volumes, now))
def test_instance_dict(self):
inst = {'name': 'this_inst',
'memory_mb': 1024,
'vcpus': 2,
'root_gb': 500,
'flavorid': 1}
expected_inst_dict = {'name': 'this_inst',
'memory_mb': 1024,
'vcpus': 2,
'disk_gb': 500,
'flavor_id': 1}
self.assertEqual(expected_inst_dict, admin.instance_dict(inst))
def test_vpn_dict_state_running(self):
isonow = datetime.datetime.utcnow()
vpn_instance = {'id': 1,
'created_at': isonow,
'fixed_ip': {'address': '127.0.0.1'}}
project = type('Project', (object,), {'id': 'project',
'vpn_ip': '127.0.0.1',
'vpn_port': 1234})
# Returns state running for 127.0.0.1 - look at class setup
expected_vpn_dict = {'project_id': 'project',
'public_ip': '127.0.0.1',
'public_port': 1234,
'internal_ip': '127.0.0.1',
'instance_id':
ec2utils.id_to_ec2_id(1),
'created_at': utils.isotime(isonow),
'state': 'running'}
self.assertEqual(expected_vpn_dict,
admin.vpn_dict(project, vpn_instance))
def test_vpn_dict_state_down(self):
isonow = datetime.datetime.utcnow()
vpn_instance = {'id': 1,
'created_at': isonow,
'fixed_ip': {'address': '127.0.0.1'}}
project = type('Project', (object,), {'id': 'project',
'vpn_ip': '127.0.0.2',
'vpn_port': 1234})
# Returns state down for 127.0.0.2 - look at class setup
vpn_dict = admin.vpn_dict(project, vpn_instance)
self.assertEqual('down', vpn_dict['state'])
def test_vpn_dict_invalid_project_vpn_config(self):
isonow = datetime.datetime.utcnow()
vpn_instance = {'id': 1,
'created_at': isonow,
'fixed_ip': {'address': '127.0.0.1'}}
# Inline project object - vpn_port of None to make it invalid
project = type('Project', (object,), {'id': 'project',
'vpn_ip': '127.0.0.2',
'vpn_port': None})
# Returns state down for 127.0.0.2 - look at class setup
vpn_dict = admin.vpn_dict(project, vpn_instance)
self.assertEqual('down - invalid project vpn config',
vpn_dict['state'])
def test_vpn_dict_non_vpn_instance(self):
project = type('Project', (object,), {'id': 'project',
'vpn_ip': '127.0.0.1',
'vpn_port': '1234'})
expected_vpn_dict = {'project_id': 'project',
'public_ip': '127.0.0.1',
'public_port': '1234',
'state': 'pending'}
self.assertEqual(expected_vpn_dict, admin.vpn_dict(project, None))
class AdminControllerTestCase(test.TestCase):
@classmethod
def setUpClass(cls):
cls._c = context.get_admin_context()
cls._ac = admin.AdminController()
def test_admin_controller_to_str(self):
self.assertEqual('AdminController', str(admin.AdminController()))
def test_describe_instance_types(self):
insts = self._ac.describe_instance_types(self._c)['instanceTypeSet']
for inst_name in ('m1.medium', 'm1.large', 'm1.tiny', 'm1.xlarge',
'm1.small',):
self.assertIn(inst_name, [i['name'] for i in insts])
def test_register_user(self):
registered_user = self._ac.register_user(self._c, 'bob')
self.assertEqual('bob', registered_user['username'])
def test_describe_user(self):
self._ac.register_user(self._c, 'bob')
self.assertEqual('bob',
self._ac.describe_user(self._c, 'bob')['username'])
def test_describe_users(self):
self._ac.register_user(self._c, 'bob')
users = self._ac.describe_users(self._c)
self.assertIn('userSet', users)
self.assertEqual('bob', users['userSet'][0]['username'])
def test_deregister_user(self):
self._ac.register_user(self._c, 'bob')
self._ac.deregister_user(self._c, 'bob')
self.assertRaises(exception.UserNotFound,
self._ac.describe_user,
self._c, 'bob')
def test_register_project(self):
self._ac.register_user(self._c, 'bob')
self.assertEqual('bobs_project',
self._ac.register_project(self._c,
'bobs_project',
'bob')['projectname'])
def test_describe_projects(self):
self._ac.register_user(self._c, 'bob')
self._ac.register_project(self._c, 'bobs_project', 'bob')
projects = self._ac.describe_projects(self._c)
self.assertIn('projectSet', projects)
self.assertEqual('bobs_project',
projects['projectSet'][0]['projectname'])
def test_deregister_project(self):
self._ac.register_user(self._c, 'bob')
self._ac.register_project(self._c, 'bobs_project', 'bob')
self._ac.deregister_project(self._c, 'bobs_project')
self.assertRaises(exception.ProjectNotFound,
self._ac.describe_project,
self._c, 'bobs_project')
def test_describe_project_members(self):
self._ac.register_user(self._c, 'bob')
self._ac.register_project(self._c, 'bobs_project', 'bob')
members = self._ac.describe_project_members(self._c, 'bobs_project')
self.assertIn('members', members)
self.assertEqual('bob', members['members'][0]['member'])
def test_modify_project(self):
self._ac.register_user(self._c, 'bob')
self._ac.register_project(self._c, 'bobs_project', 'bob')
self._ac.modify_project(self._c, 'bobs_project', 'bob',
description='I like cake')
project = self._ac.describe_project(self._c, 'bobs_project')
self.assertEqual('I like cake', project['description'])
def test_modify_project_member_add(self):
self._ac.register_user(self._c, 'bob')
self._ac.register_user(self._c, 'mary')
self._ac.register_project(self._c, 'bobs_project', 'bob')
self._ac.modify_project_member(self._c, 'mary', 'bobs_project', 'add')
members = self._ac.describe_project_members(self._c, 'bobs_project')
self.assertIn('mary', [m['member'] for m in members['members']])
def test_modify_project_member_remove(self):
self._ac.register_user(self._c, 'bob')
self._ac.register_project(self._c, 'bobs_project', 'bob')
self._ac.modify_project_member(self._c, 'bob', 'bobs_project',
'remove')
members = self._ac.describe_project_members(self._c, 'bobs_project')
self.assertNotIn('bob', [m['member'] for m in members['members']])
def test_modify_project_member_invalid_operation(self):
self._ac.register_user(self._c, 'bob')
self._ac.register_project(self._c, 'bobs_project', 'bob')
self.assertRaises(exception.ApiError,
self._ac.modify_project_member,
self._c, 'bob', 'bobs_project', 'invalid_operation')
def test_describe_roles(self):
self._ac.register_user(self._c, 'bob')
self._ac.register_project(self._c, 'bobs_project', 'bob')
roles = self._ac.describe_roles(self._c, 'bobs_project')
# Default roles ('sysadmin', 'netadmin', 'developer') should be in here
roles = [r['role'] for r in roles['roles']]
for role in ('sysadmin', 'netadmin', 'developer'):
self.assertIn('sysadmin', roles)
def test_modify_user_role_add(self):
self._ac.register_user(self._c, 'bob')
self._ac.register_project(self._c, 'bobs_project', 'bob')
self._ac.modify_user_role(self._c, 'bob', 'itsec')
user_roles = self._ac.describe_user_roles(self._c, 'bob')
self.assertIn('itsec', [r['role'] for r in user_roles['roles']])
def test_modify_user_role_project_add(self):
self._ac.register_user(self._c, 'bob')
self._ac.register_project(self._c, 'bobs_project', 'bob')
self._ac.modify_user_role(self._c, 'bob', 'developer', 'bobs_project')
user_roles = self._ac.describe_user_roles(self._c, 'bob',
'bobs_project')
self.assertIn('developer', [r['role'] for r in user_roles['roles']])
def test_modify_user_role_remove(self):
self._ac.register_user(self._c, 'bob')
self._ac.register_project(self._c, 'bobs_project', 'bob')
self._ac.modify_user_role(self._c, 'bob', 'itsec')
self._ac.modify_user_role(self._c, 'bob', 'itsec', operation='remove')
user_roles = self._ac.describe_user_roles(self._c, 'bob')
self.assertNotIn('itsec', [r['role'] for r in user_roles['roles']])
def test_modify_user_role_project_remove(self):
self._ac.register_user(self._c, 'bob')
self._ac.register_project(self._c, 'bobs_project', 'bob')
self._ac.modify_user_role(self._c, 'bob', 'developer', 'bobs_project')
self._ac.modify_user_role(self._c, 'bob', 'developer', 'bobs_project',
'remove')
user_roles = self._ac.describe_user_roles(self._c, 'bob',
'bobs_project')
self.assertNotIn('developer', [r['role'] for r in user_roles['roles']])
def test_modify_user_role_invalid(self):
self.assertRaises(exception.ApiError,
self._ac.modify_user_role,
self._c, 'bob', 'itsec',
operation='invalid_operation')
def test_describe_hosts_compute(self):
db.service_create(self._c, {'host': 'host1',
'binary': "nova-compute",
'topic': 'compute',
'report_count': 0,
'availability_zone': "zone1"})
hosts = self._ac.describe_hosts(self._c)['hosts']
self.assertEqual('host1', hosts[0]['hostname'])
def test_describe_hosts_volume(self):
db.service_create(self._c, {'host': 'volume1',
'binary': "nova-volume",
'topic': 'volume',
'report_count': 0,
'availability_zone': "zone1"})
hosts = self._ac.describe_hosts(self._c)['hosts']
self.assertEqual('volume1', hosts[0]['hostname'])
def test_block_external_addresses_validate_output_for_valid_input(self):
result = self._ac.block_external_addresses(self._c, '192.168.100.1/24')
self.assertEqual('OK', result['status'])
self.assertEqual('Added 3 rules', result['message'])
def test_block_external_addresses_validate_output_for_invalid_input(self):
result = self._ac.block_external_addresses(self._c, '12.10.10.256/24')
self.assertEqual('Failed', result['status'])
value = '0 rules added' in result['message']
self.assertEqual(value, True)
def test_block_external_addresses_already_existent_rule(self):
self._ac.block_external_addresses(self._c, '192.168.100.0/24')
result = self._ac.block_external_addresses(self._c, '192.168.100.0/24')
self.assertEqual('Failed', result['status'])
value = '0 rules added' in result['message']
self.assertEqual(value, True)
def test_describe_external_address_blocks_normalized_output(self):
self._ac.block_external_addresses(self._c, '192.168.100.11/24')
self.assertEqual(
{'externalIpBlockInfo': [{'cidr': u'192.168.100.0/24'}]},
self._ac.describe_external_address_blocks(self._c))
def test_describe_external_address_blocks_many_inputs(self):
self._ac.block_external_addresses(self._c, '192.168.100.11/24')
self._ac.block_external_addresses(self._c, '12.12.12.10/24')
self._ac.block_external_addresses(self._c, '18.18.18.0/24')
output1 = {'cidr': u'192.168.100.0/24'}
output2 = {'cidr': u'12.12.12.0/24'}
output3 = {'cidr': u'18.18.18.0/24'}
result = self._ac.describe_external_address_blocks(self._c)
result = sorted(result['externalIpBlockInfo'])
output = sorted([output1, output2, output3])
self.assertEqual(result, output)
def test_remove_external_address_block_existent_rule(self):
self._ac.block_external_addresses(self._c, '192.168.100.1/24')
result = self._ac.remove_external_address_block(self._c,
'192.168.100.1/24')
self.assertEqual('OK', result['status'])
self.assertEqual('Deleted 3 rules', result['message'])
result = self._ac.describe_external_address_blocks(self._c)
self.assertEqual([], result['externalIpBlockInfo'])
def test_remove_external_address_block_non_existent_rule(self):
result = self._ac.remove_external_address_block(self._c,
'192.168.100.1/24')
self.assertEqual('Failed', result['status'])
value = '0 rules deleted' in result['message']
self.assertEqual(value, True)
result = self._ac.describe_external_address_blocks(self._c)
self.assertEqual([], result['externalIpBlockInfo'])
def test_remove_external_address_block_invalid_input(self):
result = self._ac.remove_external_address_block(self._c,
'192.168.100/24')
self.assertEqual('Failed', result['status'])
value = '0 rules deleted' in result['message']
self.assertEqual(value, True)
result = self._ac.describe_external_address_blocks(self._c)
self.assertEqual([], result['externalIpBlockInfo'])
def test_start_vpn(self):
def fake_launch_vpn_instance(self, *args):
pass
def get_fake_instance_func():
first_call = [True]
def fake_instance_get_all_by_project(self, *args):
if first_call[0]:
first_call[0] = False
return []
else:
return [{'id': 1,
'user_id': 'bob',
'image_id': str(flags.FLAGS.vpn_image_id),
'project_id': 'bobs_project',
'instance_type_id': '1',
'os_type': 'linux',
'architecture': 'x86-64',
'state_description': 'running',
'vm_state': vm_states.ACTIVE,
'image_ref': '3'}]
return fake_instance_get_all_by_project
self.stubs.Set(pipelib.CloudPipe, 'launch_vpn_instance',
fake_launch_vpn_instance)
self.stubs.Set(db, 'instance_get_all_by_project',
get_fake_instance_func())
self._ac.register_user(self._c, 'bob')
self._ac.register_project(self._c, 'bobs_project', 'bob')
self.assertEqual('i-00000001',
self._ac.start_vpn(self._c, 'bobs_project')['instance_id'])
def test_describe_vpns(self):
def fake_instance_get_all_by_project(self, *args):
now = datetime.datetime.now()
created_at = now - datetime.timedelta(seconds=70)
return [{'id': 1,
'user_id': 'bob',
'image_id': str(flags.FLAGS.vpn_image_id),
'project_id': 'bobs_project',
'instance_type_id': '1',
'os_type': 'linux',
'architecture': 'x86-64',
'state_description': 'running',
'created_at': created_at,
'vm_state': vm_states.ACTIVE,
'image_ref': '3'}]
self.stubs.Set(db, 'instance_get_all_by_project',
fake_instance_get_all_by_project)
self._ac.register_user(self._c, 'bob')
self._ac.register_project(self._c, 'bobs_project', 'bob')
vpns = self._ac.describe_vpns(self._c)
self.assertIn('items', vpns)
item = vpns['items'][0]
self.assertEqual('i-00000001', item['instance_id'])
self.assertEqual(None, item['public_port'])
self.assertEqual(None, item['public_ip'])
self.assertEqual('down - invalid project vpn config', item['state'])
self.assertEqual(u'bobs_project', item['project_id'])

View File

@@ -28,7 +28,6 @@ from M2Crypto import RSA
from nova.api.ec2 import cloud
from nova.api.ec2 import ec2utils
from nova.api.ec2 import inst_state
from nova.common import cfg
from nova.compute import power_state
from nova.compute import vm_states
from nova import context
@@ -38,28 +37,13 @@ from nova import exception
from nova import flags
from nova.image import fake
from nova import log as logging
from nova import manager
from nova import rpc
from nova import test
from nova import utils
LOG = logging.getLogger('nova.tests.cloud')
ajax_proxy_manager_opt = \
cfg.StrOpt('ajax_proxy_manager',
default='nova.tests.api.ec2.test_cloud.AjaxProxyManager',
help='')
FLAGS = flags.FLAGS
FLAGS.add_option(ajax_proxy_manager_opt)
class AjaxProxyManager(manager.SchedulerDependentManager):
"""Fake ajax proxy service, so that an 'rpc.call' will work."""
@staticmethod
def authorize_ajax_console(context, **kwargs):
return None
def get_fake_cache():
@@ -115,7 +99,6 @@ class CloudTestCase(test.TestCase):
self.scheduter = self.start_service('scheduler')
self.network = self.start_service('network')
self.volume = self.start_service('volume')
self.ajax_proxy = self.start_service('ajax_proxy')
self.image_service = utils.import_object(FLAGS.image_service)
self.user_id = 'fake'
@@ -1183,16 +1166,6 @@ class CloudTestCase(test.TestCase):
# for unit tests.
rv = self.cloud.terminate_instances(self.context, [instance_id])
def test_ajax_console(self):
instance_id = self._run_instance(image_id='ami-1')
output = self.cloud.get_ajax_console(context=self.context,
instance_id=[instance_id])
self.assertEquals(output['url'],
'%s/?token=FAKETOKEN' % FLAGS.ajax_console_proxy_url)
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
rv = self.cloud.terminate_instances(self.context, [instance_id])
def test_key_generation(self):
result = self._create_key('test')
private_key = result['private_key']
@@ -1263,7 +1236,6 @@ class CloudTestCase(test.TestCase):
result = run_instances(self.context, **kwargs)
instance = result['instancesSet'][0]
self.assertEqual(instance['imageId'], 'ami-00000001')
self.assertEqual(instance['displayName'], 'Server 1')
self.assertEqual(instance['instanceId'], 'i-00000001')
self.assertEqual(instance['instanceState']['name'], 'running')
self.assertEqual(instance['instanceType'], 'm1.small')
@@ -1360,44 +1332,6 @@ class CloudTestCase(test.TestCase):
result = run_instances(self.context, **kwargs)
self.assertEqual(len(result['instancesSet']), 1)
def test_update_of_instance_display_fields(self):
inst = db.instance_create(self.context, {})
ec2_id = ec2utils.id_to_ec2_id(inst['id'])
self.cloud.update_instance(self.context, ec2_id,
display_name='c00l 1m4g3')
inst = db.instance_get(self.context, inst['id'])
self.assertEqual('c00l 1m4g3', inst['display_name'])
db.instance_destroy(self.context, inst['id'])
def test_update_of_instance_wont_update_private_fields(self):
inst = db.instance_create(self.context, {})
host = inst['host']
ec2_id = ec2utils.id_to_ec2_id(inst['id'])
self.cloud.update_instance(self.context, ec2_id,
display_name='c00l 1m4g3',
host='otherhost')
inst = db.instance_get(self.context, inst['id'])
self.assertEqual(host, inst['host'])
db.instance_destroy(self.context, inst['id'])
def test_update_of_volume_display_fields(self):
vol = db.volume_create(self.context, {})
self.cloud.update_volume(self.context,
ec2utils.id_to_ec2_vol_id(vol['id']),
display_name='c00l v0lum3')
vol = db.volume_get(self.context, vol['id'])
self.assertEqual('c00l v0lum3', vol['display_name'])
db.volume_destroy(self.context, vol['id'])
def test_update_of_volume_wont_update_private_fields(self):
vol = db.volume_create(self.context, {})
self.cloud.update_volume(self.context,
ec2utils.id_to_ec2_vol_id(vol['id']),
mountpoint='/not/here')
vol = db.volume_get(self.context, vol['id'])
self.assertEqual(None, vol['mountpoint'])
db.volume_destroy(self.context, vol['id'])
def _restart_compute_service(self, periodic_interval=None):
"""restart compute service. NOTE: fake driver forgets all instances."""
self.compute.kill()
@@ -1407,50 +1341,6 @@ class CloudTestCase(test.TestCase):
else:
self.compute = self.start_service('compute')
def test_rescue_instances(self):
kwargs = {'image_id': 'ami-1',
'instance_type': FLAGS.default_instance_type,
'max_count': 1, }
instance_id = self._run_instance(**kwargs)
result = self.cloud.stop_instances(self.context, [instance_id])
self.assertTrue(result)
result = self.cloud.rescue_instance(self.context, instance_id)
self.assertTrue(result)
expected = {'instancesSet': [
{'instanceId': 'i-00000001',
'previousState': {'code': 16,
'name': 'rescue'},
'shutdownState': {'code': 48,
'name': 'terminated'}}]}
result = self.cloud.terminate_instances(self.context, [instance_id])
self.assertEqual(result, expected)
self._restart_compute_service()
def test_unrescue_instances(self):
kwargs = {'image_id': 'ami-1',
'instance_type': FLAGS.default_instance_type,
'max_count': 1, }
instance_id = self._run_instance(**kwargs)
result = self.cloud.rescue_instance(self.context, instance_id)
self.assertTrue(result)
result = self.cloud.unrescue_instance(self.context, instance_id)
self.assertTrue(result)
expected = {'instancesSet': [
{'instanceId': 'i-00000001',
'previousState': {'code': 16,
'name': 'running'},
'shutdownState': {'code': 48,
'name': 'terminated'}}]}
result = self.cloud.terminate_instances(self.context, [instance_id])
self.assertEqual(result, expected)
self._restart_compute_service()
def test_stop_start_instance(self):
"""Makes sure stop/start instance works"""
# enforce periodic tasks run in short time to avoid wait for 60s.

View File

@@ -1,92 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import context
from nova import db
from nova import flags
from nova import log as logging
from nova import rpc
from nova import test
from nova import utils
from nova.api.ec2 import admin
from nova.image import fake
FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.tests.adminapi')
class AdminApiTestCase(test.TestCase):
def setUp(self):
super(AdminApiTestCase, self).setUp()
self.flags(connection_type='fake')
# set up our cloud
self.api = admin.AdminController()
# set up services
self.compute = self.start_service('compute')
self.scheduter = self.start_service('scheduler')
self.network = self.start_service('network')
self.volume = self.start_service('volume')
self.image_service = utils.import_object(FLAGS.image_service)
self.user_id = 'admin'
self.project_id = 'admin'
self.context = context.RequestContext(self.user_id,
self.project_id,
is_admin=True)
def fake_show(meh, context, id):
return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1,
'type': 'machine', 'image_state': 'available'}}
self.stubs.Set(fake._FakeImageService, 'show', fake_show)
self.stubs.Set(fake._FakeImageService, 'show_by_name', fake_show)
# NOTE(comstud): Make 'cast' behave like a 'call' which will
# ensure that operations complete
self.stubs.Set(rpc, 'cast', rpc.call)
def test_block_external_ips(self):
"""Make sure provider firewall rules are created."""
result = self.api.block_external_addresses(self.context, '1.1.1.1/32')
self.api.remove_external_address_block(self.context, '1.1.1.1/32')
self.assertEqual('OK', result['status'])
self.assertEqual('Added 3 rules', result['message'])
def test_list_blocked_ips(self):
"""Make sure we can see the external blocks that exist."""
self.api.block_external_addresses(self.context, '1.1.1.2/32')
result = self.api.describe_external_address_blocks(self.context)
num = len(db.provider_fw_rule_get_all(self.context))
self.api.remove_external_address_block(self.context, '1.1.1.2/32')
# we only list IP, not tcp/udp/icmp rules
self.assertEqual(num / 3, len(result['externalIpBlockInfo']))
def test_remove_ip_block(self):
"""Remove ip blocks."""
result = self.api.block_external_addresses(self.context, '1.1.1.3/32')
self.assertEqual('OK', result['status'])
num0 = len(db.provider_fw_rule_get_all(self.context))
result = self.api.remove_external_address_block(self.context,
'1.1.1.3/32')
self.assertEqual('OK', result['status'])
self.assertEqual('Deleted 3 rules', result['message'])
num1 = len(db.provider_fw_rule_get_all(self.context))
self.assert_(num1 < num0)