Files
cinder/cinder/tests/unit/keymgr/test_migration.py
Alan Bishop 189a1096da Migrate fixed_key encryption keys to Barbican
Assist users who are switching from the legacy ConfKeyManager to
Barbican by automatically migrating any existing keys. Key migration
is executed in its own thread spawned on cinder-volume startup. Two
factors are used to determine whether existing keys need to be
migrated.

1) The ConfKeyManager's fixed_key config value is set (not None).
   This indicates volumes may exist that were encrypted using the
   ConfKeyManager.
2) Barbican is the current key manager.

When the both conditions are met, each instance of the cinder-volume
service scans its volumes in the database, looking for volumes using
the ConfKeyManager's all-zeros encryption key ID. If a volume has an
all-zeros key ID, the same secret (derived from the fixed_key) is stored
in Barbican, and all database references to that volume's key ID are
replaced with the new Barbican key ID.

Implements: blueprint migrate-fixed-key-to-barbican
Change-Id: Ic70f45762cf4e426c222415e49b947a328282ca0
2017-12-01 15:30:32 -05:00

218 lines
9.7 KiB
Python

# Copyright 2017 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for encryption key migration."""
import mock
from oslo_config import cfg
from cinder import db
from cinder.keymgr import migration
from cinder import objects
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import utils as tests_utils
from cinder.tests.unit import volume as base
CONF = cfg.CONF
FIXED_KEY_ID = '00000000-0000-0000-0000-000000000000'
class KeyMigrationTestCase(base.BaseVolumeTestCase):
def setUp(self):
super(KeyMigrationTestCase, self).setUp()
self.conf = CONF
self.fixed_key = '1' * 64
self.conf.import_opt(name='fixed_key',
module_str='cinder.keymgr.conf_key_mgr',
group='key_manager')
self.conf.set_override('fixed_key',
self.fixed_key,
group='key_manager')
self.conf.set_override('backend',
'barbican',
group='key_manager')
self.my_vols = []
def tearDown(self):
for vol in objects.VolumeList.get_all(self.context):
self.volume.delete_volume(self.context, vol)
super(KeyMigrationTestCase, self).tearDown()
def create_volume(self, key_id=FIXED_KEY_ID):
vol = tests_utils.create_volume(self.context, host=self.conf.host)
vol_id = self.volume.create_volume(self.context, vol)
if key_id:
db.volume_update(self.context,
vol_id,
{'encryption_key_id': key_id})
self.my_vols = objects.VolumeList.get_all_by_host(self.context,
self.conf.host)
# Return a fully baked Volume object (not the partially baked 'vol'
# and not the DB object).
return next((v for v in self.my_vols if v.id == vol_id))
@mock.patch('cinder.keymgr.migration.KeyMigrator._migrate_keys')
@mock.patch('cinder.keymgr.migration.KeyMigrator._log_migration_status')
def test_no_fixed_key(self,
mock_log_migration_status,
mock_migrate_keys):
self.create_volume()
self.conf.set_override('fixed_key', None, group='key_manager')
migration.migrate_fixed_key(self.my_vols, conf=self.conf)
mock_migrate_keys.assert_not_called()
mock_log_migration_status.assert_not_called()
@mock.patch('cinder.keymgr.migration.KeyMigrator._migrate_keys')
@mock.patch('cinder.keymgr.migration.KeyMigrator._log_migration_status')
def test_using_conf_key_manager(self,
mock_log_migration_status,
mock_migrate_keys):
self.create_volume()
self.conf.set_override('backend',
'some.ConfKeyManager',
group='key_manager')
migration.migrate_fixed_key(self.my_vols, conf=self.conf)
mock_migrate_keys.assert_not_called()
mock_log_migration_status.assert_not_called()
@mock.patch('cinder.keymgr.migration.KeyMigrator._migrate_keys')
@mock.patch('cinder.keymgr.migration.KeyMigrator._log_migration_status')
def test_using_barbican_module_path(self,
mock_log_migration_status,
mock_migrate_keys):
# Verify the long-hand method of specifying the Barbican backend
# is properly parsed.
self.create_volume()
self.conf.set_override(
'backend',
'castellan.key_manager.barbican_key_manager.BarbicanKeyManager',
group='key_manager')
migration.migrate_fixed_key(self.my_vols, conf=self.conf)
mock_migrate_keys.assert_called_once_with(self.my_vols)
mock_log_migration_status.assert_called_once_with()
@mock.patch('cinder.keymgr.migration.KeyMigrator._migrate_keys')
@mock.patch('cinder.keymgr.migration.KeyMigrator._log_migration_status')
def test_using_unsupported_key_manager(self,
mock_log_migration_status,
mock_migrate_keys):
self.create_volume()
self.conf.set_override('backend',
'some.OtherKeyManager',
group='key_manager')
migration.migrate_fixed_key(self.my_vols, conf=self.conf)
mock_migrate_keys.assert_not_called()
mock_log_migration_status.assert_called_once_with()
@mock.patch('cinder.keymgr.migration.KeyMigrator._migrate_keys')
@mock.patch('cinder.keymgr.migration.KeyMigrator._log_migration_status')
def test_no_volumes(self,
mock_log_migration_status,
mock_migrate_keys):
migration.migrate_fixed_key(self.my_vols, conf=self.conf)
mock_migrate_keys.assert_not_called()
mock_log_migration_status.assert_called_once_with()
@mock.patch('cinder.keymgr.migration.KeyMigrator._migrate_volume_key')
@mock.patch('barbicanclient.client.Client')
def test_fail_no_barbican_client(self,
mock_barbican_client,
mock_migrate_volume_key):
self.create_volume()
mock_barbican_client.side_effect = Exception
migration.migrate_fixed_key(self.my_vols, conf=self.conf)
mock_migrate_volume_key.assert_not_called()
@mock.patch('cinder.keymgr.migration.KeyMigrator._migrate_volume_key')
@mock.patch('barbicanclient.client.Client')
def test_fail_too_many_errors(self,
mock_barbican_client,
mock_migrate_volume_key):
for n in range(0, (migration.MAX_KEY_MIGRATION_ERRORS + 3)):
self.create_volume()
mock_migrate_volume_key.side_effect = Exception
migration.migrate_fixed_key(self.my_vols, conf=self.conf)
self.assertEqual(mock_migrate_volume_key.call_count,
(migration.MAX_KEY_MIGRATION_ERRORS + 1))
@mock.patch('cinder.keymgr.migration.KeyMigrator._migrate_keys')
def test_migration_status_more_to_migrate(self,
mock_migrate_keys):
mock_log = self.mock_object(migration, 'LOG')
self.create_volume()
migration.migrate_fixed_key(self.my_vols, conf=self.conf)
# Look for one warning (more to migrate) and no info log messages.
mock_log.info.assert_not_called()
self.assertEqual(mock_log.warning.call_count, 1)
@mock.patch('cinder.keymgr.migration.KeyMigrator._migrate_keys')
def test_migration_status_all_done(self,
mock_migrate_keys):
mock_log = self.mock_object(migration, 'LOG')
self.create_volume(key_id=fake.ENCRYPTION_KEY_ID)
migration.migrate_fixed_key(self.my_vols, conf=self.conf)
# Look for one info (all done) and no warning log messages.
mock_log.warning.assert_not_called()
self.assertEqual(mock_log.info.call_count, 1)
@mock.patch(
'cinder.keymgr.migration.KeyMigrator._update_encryption_key_id')
@mock.patch('barbicanclient.client.Client')
def test_migrate_fixed_key_volumes(self,
mock_barbican_client,
mock_update_encryption_key_id):
# Create two volumes with fixed key ID that needs to be migrated, and
# a couple of volumes with key IDs that don't need to be migrated,
# or no key ID.
vol_1 = self.create_volume()
self.create_volume(key_id=fake.UUID1)
self.create_volume(key_id=None)
vol_2 = self.create_volume()
self.create_volume(key_id=fake.UUID2)
migration.migrate_fixed_key(self.my_vols, conf=self.conf)
calls = [mock.call(vol_1), mock.call(vol_2)]
mock_update_encryption_key_id.assert_has_calls(calls, any_order=True)
self.assertEqual(mock_update_encryption_key_id.call_count, len(calls))
@mock.patch('barbicanclient.client.Client')
def test_update_encryption_key_id(self,
mock_barbican_client):
vol = self.create_volume()
snap_ids = [fake.SNAPSHOT_ID, fake.SNAPSHOT2_ID, fake.SNAPSHOT3_ID]
for snap_id in snap_ids:
tests_utils.create_snapshot(self.context, vol.id, id=snap_id)
# Barbican's secret.store() returns a URI that contains the
# secret's key ID at the end.
secret_ref = 'http://some/path/' + fake.ENCRYPTION_KEY_ID
mock_secret = mock.MagicMock()
mock_secret.store.return_value = secret_ref
mock_barbican_client.return_value.secrets.create.return_value \
= mock_secret
migration.migrate_fixed_key(self.my_vols, conf=self.conf)
vol_db = db.volume_get(self.context, vol.id)
self.assertEqual(fake.ENCRYPTION_KEY_ID, vol_db['encryption_key_id'])
for snap_id in snap_ids:
snap_db = db.snapshot_get(self.context, snap_id)
self.assertEqual(fake.ENCRYPTION_KEY_ID,
snap_db['encryption_key_id'])