Create volume-from-snapshot, fix AWS volume names

This commit is contained in:
Sachin Manpathak 2016-11-18 14:09:02 -05:00
parent 5b6816f479
commit db31845d4c
2 changed files with 35 additions and 3 deletions

View File

@ -19,6 +19,8 @@ from cinder.exception import APITimeout, NotFound, VolumeNotFound
from cinder.volume.drivers.aws import ebs from cinder.volume.drivers.aws import ebs
from moto import mock_ec2 from moto import mock_ec2
import boto
class EBSVolumeTestCase(test.TestCase): class EBSVolumeTestCase(test.TestCase):
@mock_ec2 @mock_ec2
@ -56,7 +58,7 @@ class EBSVolumeTestCase(test.TestCase):
ss['project_id'] = kwargs.get('project_id', 'aws_proj_700') ss['project_id'] = kwargs.get('project_id', 'aws_proj_700')
ss['created_at'] = kwargs.get('create_at', created_at) ss['created_at'] = kwargs.get('create_at', created_at)
ss['volume'] = kwargs.get('volume', self._stub_volume()) ss['volume'] = kwargs.get('volume', self._stub_volume())
ss['display_name'] = kwargs.get('display_name', 'snapshot_007')
return ss return ss
@mock_ec2 @mock_ec2
@ -115,3 +117,16 @@ class EBSVolumeTestCase(test.TestCase):
ss = self._stub_snapshot() ss = self._stub_snapshot()
self._driver.create_volume(ss['volume']) self._driver.create_volume(ss['volume'])
self.assertRaises(APITimeout, self._driver.create_snapshot, ss) self.assertRaises(APITimeout, self._driver.create_snapshot, ss)
@mock_ec2
def test_volume_from_snapshot(self):
snapshot = self._stub_snapshot()
volume = self._stub_volume()
self._driver.create_volume(volume)
self._driver.create_snapshot(snapshot)
self.assertIsNone(self._driver.create_volume_from_snapshot(volume, snapshot))
@mock_ec2
def test_volume_from_non_existing_snapshot(self):
self.assertRaises(NotFound, self._driver.create_volume_from_snapshot,
self._stub_volume(), self._stub_snapshot())

View File

@ -116,7 +116,8 @@ class EBSDriver(BaseVD):
self._conn.create_tags([ebs_vol.id], {'project_id': volume['project_id'], self._conn.create_tags([ebs_vol.id], {'project_id': volume['project_id'],
'uuid': volume['id'], 'uuid': volume['id'],
'is_clone': False, 'is_clone': False,
'created_at': volume['created_at']}) 'created_at': volume['created_at'],
'Name': volume['display_name']})
def _find(self, obj_id, find_func): def _find(self, obj_id, find_func):
ebs_objs = find_func(filters={'tag:uuid': obj_id}) ebs_objs = find_func(filters={'tag:uuid': obj_id})
@ -196,7 +197,8 @@ class EBSDriver(BaseVD):
self._conn.create_tags([ebs_snap.id], {'project_id': snapshot['project_id'], self._conn.create_tags([ebs_snap.id], {'project_id': snapshot['project_id'],
'uuid': snapshot['id'], 'uuid': snapshot['id'],
'is_clone': True, 'is_clone': True,
'created_at': snapshot['created_at']}) 'created_at': snapshot['created_at'],
'Name': snapshot['display_name']})
def delete_snapshot(self, snapshot): def delete_snapshot(self, snapshot):
try: try:
@ -206,6 +208,21 @@ class EBSDriver(BaseVD):
return return
self._conn.delete_snapshot(ebs_ss.id) self._conn.delete_snapshot(ebs_ss.id)
def create_volume_from_snapshot(self, volume, snapshot):
try:
ebs_ss = self._find(snapshot['id'], self._conn.get_all_snapshots)
except NotFound:
LOG.error(_LE('Snapshot %s was not found'), snapshot['id'])
raise
ebs_vol = ebs_ss.create_volume(self._zone)
if self._wait_for_create(ebs_vol.id, 'available') is False:
raise APITimeout(service='EC2')
self._conn.create_tags([ebs_vol.id], {'project_id': volume['project_id'],
'uuid': volume['id'],
'is_clone': False,
'created_at': volume['created_at'],
'Name': volume['display_name']})
def copy_image_to_volume(self, context, volume, image_service, image_id): def copy_image_to_volume(self, context, volume, image_service, image_id):
raise NotImplemented() raise NotImplemented()