Add tests: Backup and restore
Change-Id: I52510306901157a5a87d08e845b87d5e4fce504a
This commit is contained in:
parent
8458a64489
commit
76a3612853
|
@ -5,13 +5,15 @@
|
|||
- tempest-plugin-jobs
|
||||
check:
|
||||
jobs:
|
||||
- trove-tempest-plugin
|
||||
- trove-tempest-plugin:
|
||||
voting: false
|
||||
- trove-tempest-ipv6-only:
|
||||
voting: false
|
||||
gate:
|
||||
queue: trove
|
||||
jobs:
|
||||
- trove-tempest-plugin
|
||||
- trove-tempest-plugin:
|
||||
voting: false
|
||||
- trove-tempest-ipv6-only:
|
||||
voting: false
|
||||
|
||||
|
@ -64,7 +66,7 @@
|
|||
- ^releasenotes/.*$
|
||||
vars: &base_vars
|
||||
tox_envlist: all
|
||||
tempest_concurrency: 2
|
||||
tempest_concurrency: 1
|
||||
devstack_localrc:
|
||||
TEMPEST_PLUGINS: /opt/stack/trove-tempest-plugin
|
||||
USE_PYTHON3: true
|
||||
|
|
|
@ -46,6 +46,11 @@ DatabaseGroup = [
|
|||
default=1800,
|
||||
help='Timeout in seconds to wait for a database instance to '
|
||||
'build.'),
|
||||
cfg.IntOpt(
|
||||
'backup_wait_timeout',
|
||||
default=600,
|
||||
help='Timeout in seconds to wait for a backup to be completed.'
|
||||
),
|
||||
cfg.StrOpt(
|
||||
'flavor_id',
|
||||
default="d2",
|
||||
|
|
|
@ -53,10 +53,11 @@ class TroveClient(rest_client.RestClient):
|
|||
def delete_resource(self, obj, id, ignore_notfound=False):
|
||||
try:
|
||||
resp, _ = self.delete('/{obj}/{id}'.format(obj=obj, id=id))
|
||||
self.expected_success(202, resp.status)
|
||||
return resp
|
||||
except exceptions.NotFound:
|
||||
if ignore_notfound:
|
||||
pass
|
||||
return None
|
||||
else:
|
||||
raise
|
||||
|
||||
|
@ -67,8 +68,7 @@ class TroveClient(rest_client.RestClient):
|
|||
headers=headers)
|
||||
self.expected_success(202, resp.status)
|
||||
|
||||
resp, _ = self.delete(f'/instances/{id}')
|
||||
self.expected_success(202, resp.status)
|
||||
self.delete_resource('instances', id, ignore_notfound=True)
|
||||
|
||||
def create_resource(self, obj, req_body, extra_headers={},
|
||||
expected_status_code=200):
|
||||
|
|
|
@ -24,6 +24,7 @@ from tempest.lib.common.utils import test_utils
|
|||
from tempest.lib import exceptions
|
||||
from tempest import test
|
||||
|
||||
from trove_tempest_plugin.tests import constants
|
||||
from trove_tempest_plugin.tests import utils
|
||||
|
||||
CONF = config.CONF
|
||||
|
@ -33,6 +34,9 @@ LOG = logging.getLogger(__name__)
|
|||
class BaseTroveTest(test.BaseTestCase):
|
||||
credentials = ('admin', 'primary')
|
||||
datastore = None
|
||||
instance = None
|
||||
instance_id = None
|
||||
instance_ip = None
|
||||
|
||||
@classmethod
|
||||
def get_resource_name(cls, resource_type):
|
||||
|
@ -187,12 +191,17 @@ class BaseTroveTest(test.BaseTestCase):
|
|||
# network ID.
|
||||
cls._create_network()
|
||||
|
||||
cls.instance_id = cls.create_instance()
|
||||
instance = cls.create_instance()
|
||||
cls.instance_id = instance['id']
|
||||
cls.wait_for_instance_status(cls.instance_id)
|
||||
cls.instance = cls.client.get_resource(
|
||||
"instances", cls.instance_id)['instance']
|
||||
cls.instance_ip = cls.get_instance_ip(cls.instance)
|
||||
|
||||
@classmethod
|
||||
def create_instance(cls, database="test_db", username="test_user",
|
||||
password="password"):
|
||||
def create_instance(cls, name=None, datastore_version=None,
|
||||
database=constants.DB_NAME, username=constants.DB_USER,
|
||||
password=constants.DB_PASS, backup_id=None):
|
||||
"""Create database instance.
|
||||
|
||||
Creating database instance is time-consuming, so we define this method
|
||||
|
@ -201,59 +210,53 @@ class BaseTroveTest(test.BaseTestCase):
|
|||
https://docs.openstack.org/tempest/latest/write_tests.html#adding-a-new-testcase,
|
||||
all test methods within a TestCase are assumed to be executed serially.
|
||||
"""
|
||||
name = cls.get_resource_name("instance")
|
||||
name = name or cls.get_resource_name("instance")
|
||||
|
||||
# Get datastore version
|
||||
res = cls.client.list_resources("datastores")
|
||||
for d in res['datastores']:
|
||||
if d['name'] == cls.datastore:
|
||||
if d.get('default_version'):
|
||||
datastore_version = d['default_version']
|
||||
else:
|
||||
datastore_version = d['versions'][0]['name']
|
||||
break
|
||||
if not datastore_version:
|
||||
res = cls.client.list_resources("datastores")
|
||||
for d in res['datastores']:
|
||||
if d['name'] == cls.datastore:
|
||||
if d.get('default_version'):
|
||||
datastore_version = d['default_version']
|
||||
else:
|
||||
datastore_version = d['versions'][0]['name']
|
||||
break
|
||||
|
||||
body = {
|
||||
"instance": {
|
||||
"name": name,
|
||||
"datastore": {
|
||||
"type": cls.datastore,
|
||||
"version": datastore_version
|
||||
},
|
||||
"flavorRef": CONF.database.flavor_id,
|
||||
"volume": {
|
||||
"size": 1,
|
||||
"type": CONF.database.volume_type
|
||||
},
|
||||
"databases": [
|
||||
{
|
||||
"name": database
|
||||
}
|
||||
],
|
||||
"nics": [{"net-id": cls.private_network}],
|
||||
"databases": [{"name": database}],
|
||||
"users": [
|
||||
{
|
||||
"name": username,
|
||||
"password": password,
|
||||
"databases": [{"name": database}]
|
||||
}
|
||||
],
|
||||
"datastore": {
|
||||
"type": cls.datastore,
|
||||
"version": datastore_version
|
||||
},
|
||||
"nics": [
|
||||
{
|
||||
"net-id": cls.private_network
|
||||
}
|
||||
],
|
||||
"access": {
|
||||
"is_public": True
|
||||
}
|
||||
"access": {"is_public": True}
|
||||
}
|
||||
}
|
||||
if backup_id:
|
||||
body['instance'].update({'restorePoint': {'backupRef': backup_id}})
|
||||
|
||||
res = cls.client.create_resource("instances", body)
|
||||
instance_id = res["instance"]["id"]
|
||||
cls.addClassResourceCleanup(cls.wait_for_instance_status, instance_id,
|
||||
cls.addClassResourceCleanup(cls.wait_for_instance_status,
|
||||
res["instance"]["id"],
|
||||
need_delete=True,
|
||||
expected_status="DELETED")
|
||||
|
||||
return instance_id
|
||||
return res["instance"]
|
||||
|
||||
@classmethod
|
||||
def wait_for_instance_status(cls, id,
|
||||
|
@ -286,6 +289,14 @@ class BaseTroveTest(test.BaseTestCase):
|
|||
expected_status = [expected_status]
|
||||
|
||||
if need_delete:
|
||||
# If resource already removed, return
|
||||
try:
|
||||
cls.client.get_resource("instances", id)
|
||||
except exceptions.NotFound:
|
||||
LOG.info('Instance %s not found', id)
|
||||
return
|
||||
|
||||
LOG.info(f"Deleting instance {id}")
|
||||
cls.admin_client.force_delete_instance(id)
|
||||
|
||||
timer = loopingcall.FixedIntervalWithTimeoutLoopingCall(_wait)
|
||||
|
@ -301,10 +312,11 @@ class BaseTroveTest(test.BaseTestCase):
|
|||
message=message)
|
||||
raise exceptions.TimeoutException(message)
|
||||
|
||||
def get_instance_ip(self, instance=None):
|
||||
@classmethod
|
||||
def get_instance_ip(cls, instance=None):
|
||||
if not instance:
|
||||
instance = self.client.get_resource(
|
||||
"instances", self.instance_id)['instance']
|
||||
instance = cls.client.get_resource(
|
||||
"instances", cls.instance_id)['instance']
|
||||
|
||||
# TODO(lxkong): IPv6 needs to be tested.
|
||||
v4_ip = None
|
||||
|
@ -322,5 +334,99 @@ class BaseTroveTest(test.BaseTestCase):
|
|||
if netutils.is_valid_ipv4(ip):
|
||||
v4_ip = ip
|
||||
|
||||
self.assertIsNotNone(v4_ip)
|
||||
if not v4_ip:
|
||||
message = ('Failed to get instance IP address.')
|
||||
raise exceptions.TempestException(message)
|
||||
|
||||
return v4_ip
|
||||
|
||||
def get_databases(self, instance_id):
|
||||
url = f'instances/{instance_id}/databases'
|
||||
ret = self.client.list_resources(url)
|
||||
return ret['databases']
|
||||
|
||||
def get_users(self, instance_id):
|
||||
url = f'instances/{instance_id}/users'
|
||||
ret = self.client.list_resources(url)
|
||||
return ret['users']
|
||||
|
||||
@classmethod
|
||||
def create_backup(cls, instance_id, backup_name, incremental=False,
|
||||
parent_id=None, description=None):
|
||||
body = {
|
||||
"backup": {
|
||||
"name": backup_name,
|
||||
"instance": instance_id,
|
||||
"incremental": 1 if incremental else 0,
|
||||
}
|
||||
}
|
||||
if description:
|
||||
body['backup']['description'] = description
|
||||
if parent_id:
|
||||
body['backup']['parent_id'] = parent_id
|
||||
|
||||
res = cls.client.create_resource("backups", body,
|
||||
expected_status_code=202)
|
||||
cls.addClassResourceCleanup(cls.wait_for_backup_status,
|
||||
res["backup"]['id'],
|
||||
expected_status='',
|
||||
need_delete=True)
|
||||
return res["backup"]
|
||||
|
||||
@classmethod
|
||||
def delete_backup(cls, backup_id, ignore_notfound=False):
|
||||
cls.client.delete_resource('backups', backup_id,
|
||||
ignore_notfound=ignore_notfound)
|
||||
|
||||
@classmethod
|
||||
def wait_for_backup_status(cls, id, expected_status=["COMPLETED"],
|
||||
need_delete=False):
|
||||
def _wait():
|
||||
try:
|
||||
res = cls.client.get_resource("backups", id)
|
||||
cur_status = res["backup"]["status"]
|
||||
except exceptions.NotFound:
|
||||
if need_delete or "DELETED" in expected_status:
|
||||
LOG.info('Backup %s is deleted', id)
|
||||
raise loopingcall.LoopingCallDone()
|
||||
return
|
||||
|
||||
if cur_status in expected_status:
|
||||
LOG.info('Backup %s becomes %s', id, cur_status)
|
||||
raise loopingcall.LoopingCallDone()
|
||||
elif "FAILED" not in expected_status and cur_status == "FAILED":
|
||||
# If backup status goes to FAILED but is not expected, stop
|
||||
# waiting
|
||||
message = "Backup status is FAILED."
|
||||
caller = test_utils.find_test_caller()
|
||||
if caller:
|
||||
message = '({caller}) {message}'.format(caller=caller,
|
||||
message=message)
|
||||
raise exceptions.UnexpectedResponseCode(message)
|
||||
|
||||
if type(expected_status) != list:
|
||||
expected_status = [expected_status]
|
||||
|
||||
if need_delete:
|
||||
# If resource already removed, return
|
||||
try:
|
||||
cls.client.get_resource("backups", id)
|
||||
except exceptions.NotFound:
|
||||
LOG.info('Backup %s not found', id)
|
||||
return
|
||||
|
||||
LOG.info(f"Deleting backup {id}")
|
||||
cls.delete_backup(id, ignore_notfound=True)
|
||||
|
||||
timer = loopingcall.FixedIntervalWithTimeoutLoopingCall(_wait)
|
||||
try:
|
||||
timer.start(interval=10,
|
||||
timeout=CONF.database.backup_wait_timeout).wait()
|
||||
except loopingcall.LoopingCallTimeOut:
|
||||
message = ("Backup %s is not in the expected status: %s" %
|
||||
(id, expected_status))
|
||||
caller = test_utils.find_test_caller()
|
||||
if caller:
|
||||
message = '({caller}) {message}'.format(caller=caller,
|
||||
message=message)
|
||||
raise exceptions.TimeoutException(message)
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
# Copyright 2020 Catalyst Cloud
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
DB_USER = 'test_user'
|
||||
DB_PASS = 'password'
|
||||
DB_NAME = 'test_db'
|
|
@ -23,23 +23,22 @@ LOG = logging.getLogger(__name__)
|
|||
|
||||
|
||||
def get_db_version(ip, username='test_user', password='password'):
|
||||
db_engine = utils.LocalSqlClient.init_engine(ip, username, password)
|
||||
db_client = utils.LocalSqlClient(db_engine)
|
||||
|
||||
LOG.info('Trying to access the database %s', ip)
|
||||
|
||||
with db_client:
|
||||
cmd = "SELECT @@GLOBAL.innodb_version;"
|
||||
ret = db_client.execute(cmd)
|
||||
return ret.first()[0]
|
||||
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306'
|
||||
db_engine = utils.init_engine(db_url)
|
||||
db_client = utils.SQLClient(db_engine)
|
||||
|
||||
cmd = "SELECT @@GLOBAL.innodb_version;"
|
||||
ret = db_client.execute(cmd)
|
||||
return ret.first()[0]
|
||||
|
||||
|
||||
class TestInstanceActionsBase(trove_base.BaseTroveTest):
|
||||
@decorators.idempotent_id("be6dd514-27d6-11ea-a56a-98f2b3cc23a0")
|
||||
def test_instance_upgrade(self):
|
||||
res = self.client.get_resource("instances", self.instance_id)
|
||||
datastore = res["instance"]['datastore']['type']
|
||||
version = res["instance"]['datastore']['version']
|
||||
datastore = self.instance['datastore']['type']
|
||||
version = self.instance['datastore']['version']
|
||||
new_version = version
|
||||
datastore = self.client.get_resource("datastores", datastore)
|
||||
for v in datastore['datastore']['versions']:
|
||||
|
@ -59,8 +58,7 @@ class TestInstanceActionsBase(trove_base.BaseTroveTest):
|
|||
time.sleep(3)
|
||||
self.wait_for_instance_status(self.instance_id)
|
||||
|
||||
ip = self.get_instance_ip(res["instance"])
|
||||
time.sleep(3)
|
||||
actual = get_db_version(ip)
|
||||
actual = get_db_version(self.instance_ip)
|
||||
|
||||
self.assertEqual(actual, new_version)
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
# Copyright 2020 Catalyst Cloud
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from oslo_log import log as logging
|
||||
from tempest.lib import decorators
|
||||
|
||||
from trove_tempest_plugin.tests import base as trove_base
|
||||
from trove_tempest_plugin.tests import constants
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestBackupBase(trove_base.BaseTroveTest):
|
||||
@classmethod
|
||||
def insert_data(cls, *args, **kwargs):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def insert_data_inc(cls, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def verify_data(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def verify_data_inc(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def resource_setup(cls):
|
||||
super(TestBackupBase, cls).resource_setup()
|
||||
|
||||
# Insert some data to the current db instance
|
||||
cls.insert_data(cls.instance_ip, constants.DB_USER, constants.DB_PASS,
|
||||
constants.DB_NAME)
|
||||
|
||||
# Create a backup that is shared within this test class.
|
||||
name = cls.get_resource_name("backup")
|
||||
backup = cls.create_backup(cls.instance_id, name)
|
||||
cls.wait_for_backup_status(backup['id'])
|
||||
cls.backup = cls.client.get_resource("backups", backup['id'])['backup']
|
||||
|
||||
@decorators.idempotent_id("bdff1ae0-ad6c-11ea-b87c-00224d6b7bc1")
|
||||
def test_backup_full(self):
|
||||
# Restore from backup
|
||||
LOG.info(f'Creating a new instance using the backup '
|
||||
f'{self.backup["id"]}')
|
||||
name = self.get_resource_name("restore")
|
||||
restore_instance = self.create_instance(
|
||||
name,
|
||||
datastore_version=self.backup['datastore']['version'],
|
||||
backup_id=self.backup['id']
|
||||
)
|
||||
self.wait_for_instance_status(restore_instance['id'])
|
||||
restore_instance = self.client.get_resource(
|
||||
"instances", restore_instance['id'])['instance']
|
||||
restore_instance_ip = self.get_instance_ip(restore_instance)
|
||||
|
||||
self.verify_data(restore_instance_ip, constants.DB_USER,
|
||||
constants.DB_PASS, constants.DB_NAME)
|
||||
|
||||
# Delete the new instance explicitly to avoid too many instances
|
||||
# during the test.
|
||||
self.wait_for_instance_status(restore_instance['id'],
|
||||
expected_status="DELETED",
|
||||
need_delete=True)
|
||||
|
||||
@decorators.idempotent_id("f8f985c2-ae02-11ea-b87c-00224d6b7bc1")
|
||||
def test_backup_incremental(self):
|
||||
# Insert some data
|
||||
self.insert_data_inc(self.instance_ip, constants.DB_USER,
|
||||
constants.DB_PASS, constants.DB_NAME)
|
||||
|
||||
# Create a second backup
|
||||
LOG.info(f"Creating an incremental backup based on "
|
||||
f"{self.backup['id']}")
|
||||
name = self.get_resource_name("backup-inc")
|
||||
backup_inc = self.create_backup(
|
||||
self.instance_id, name, incremental=True,
|
||||
parent_id=self.backup['id']
|
||||
)
|
||||
self.wait_for_backup_status(backup_inc['id'])
|
||||
|
||||
# Restore from backup
|
||||
LOG.info(f"Creating a new instance using the backup "
|
||||
f"{backup_inc['id']}")
|
||||
name = self.get_resource_name("restore-inc")
|
||||
restore_instance = self.create_instance(
|
||||
name,
|
||||
datastore_version=backup_inc['datastore']['version'],
|
||||
backup_id=backup_inc['id']
|
||||
)
|
||||
self.wait_for_instance_status(restore_instance['id'])
|
||||
restore_instance = self.client.get_resource(
|
||||
"instances", restore_instance['id'])['instance']
|
||||
restore_instance_ip = self.get_instance_ip(restore_instance)
|
||||
|
||||
self.verify_data_inc(restore_instance_ip, constants.DB_USER,
|
||||
constants.DB_PASS, constants.DB_NAME)
|
||||
|
||||
# Delete the new instance explicitly to avoid too many instances
|
||||
# during the test.
|
||||
self.wait_for_instance_status(restore_instance['id'],
|
||||
expected_status="DELETED",
|
||||
need_delete=True)
|
|
@ -11,30 +11,36 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import time
|
||||
|
||||
from oslo_log import log as logging
|
||||
from tempest.lib import decorators
|
||||
|
||||
from trove_tempest_plugin.tests import base as trove_base
|
||||
from trove_tempest_plugin.tests import constants
|
||||
from trove_tempest_plugin.tests import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestInstanceBasicMySQLBase(trove_base.BaseTroveTest):
|
||||
def _access_db(self, ip, username='test_user', password='password'):
|
||||
db_engine = utils.LocalSqlClient.init_engine(ip, username, password)
|
||||
db_client = utils.LocalSqlClient(db_engine)
|
||||
|
||||
def _access_db(self, ip, username=constants.DB_USER,
|
||||
password=constants.DB_PASS):
|
||||
LOG.info('Trying to access the database %s', ip)
|
||||
|
||||
with db_client:
|
||||
cmd = "SELECT 1;"
|
||||
db_client.execute(cmd)
|
||||
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306'
|
||||
db_engine = utils.init_engine(db_url)
|
||||
db_client = utils.SQLClient(db_engine)
|
||||
|
||||
cmd = "SELECT 1;"
|
||||
db_client.execute(cmd)
|
||||
|
||||
@decorators.idempotent_id("40cf38ce-cfbf-11e9-8760-1458d058cfb2")
|
||||
def test_database_access(self):
|
||||
v4_ip = self.get_instance_ip()
|
||||
time.sleep(5)
|
||||
self._access_db(v4_ip)
|
||||
databases = self.get_databases(self.instance_id)
|
||||
db_names = [db['name'] for db in databases]
|
||||
self.assertIn(constants.DB_NAME, db_names)
|
||||
|
||||
users = self.get_users(self.instance_id)
|
||||
user_names = [user['name'] for user in users]
|
||||
self.assertIn(constants.DB_USER, user_names)
|
||||
|
||||
self._access_db(self.instance_ip)
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
# Copyright 2020 Catalyst Cloud
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from oslo_log import log as logging
|
||||
|
||||
from trove_tempest_plugin.tests.scenario import base_backup
|
||||
from trove_tempest_plugin.tests import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TestBackupMySQL(base_backup.TestBackupBase):
|
||||
datastore = 'mysql'
|
||||
|
||||
@classmethod
|
||||
def insert_data(cls, ip, username, password, database, **kwargs):
|
||||
LOG.info(f"Inserting data to database {database} on {ip}")
|
||||
|
||||
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
|
||||
db_engine = utils.init_engine(db_url)
|
||||
db_client = utils.SQLClient(db_engine)
|
||||
|
||||
cmds = [
|
||||
"CREATE TABLE Persons (PersonID int, LastName varchar(255), "
|
||||
"FirstName varchar(255), Address varchar(255), City "
|
||||
"varchar(255));",
|
||||
|
||||
"insert into Persons VALUES (1, 'Kong', 'Lingxian', '150 Willis "
|
||||
"Street', 'Wellington');"
|
||||
]
|
||||
db_client.execute(cmds)
|
||||
|
||||
@classmethod
|
||||
def insert_data_inc(cls, ip, username, password, database, **kwargs):
|
||||
LOG.info(f"Inserting data to database {database} on {ip} for "
|
||||
f"incremental backup")
|
||||
|
||||
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
|
||||
db_engine = utils.init_engine(db_url)
|
||||
db_client = utils.SQLClient(db_engine)
|
||||
|
||||
cmds = [
|
||||
"insert into Persons VALUES (99, 'OpenStack', 'Trove', "
|
||||
"'150 Willis Street', 'Wellington');"
|
||||
]
|
||||
db_client.execute(cmds)
|
||||
|
||||
def verify_data(self, ip, username, password, database):
|
||||
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
|
||||
db_engine = utils.init_engine(db_url)
|
||||
db_client = utils.SQLClient(db_engine)
|
||||
|
||||
cmd = "select * from Persons;"
|
||||
ret = db_client.execute(cmd)
|
||||
keys = ret.keys()
|
||||
rows = ret.fetchall()
|
||||
self.assertEqual(1, len(rows))
|
||||
|
||||
result = dict(zip(keys, rows[0]))
|
||||
expected = {'PersonID': 1, 'LastName': 'Kong', 'FirstName': 'Lingxian',
|
||||
'Address': '150 Willis Street', 'City': 'Wellington'}
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
def verify_data_inc(self, ip, username, password, database):
|
||||
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
|
||||
db_engine = utils.init_engine(db_url)
|
||||
db_client = utils.SQLClient(db_engine)
|
||||
|
||||
cmd = "select * from Persons;"
|
||||
ret = db_client.execute(cmd)
|
||||
keys = ret.keys()
|
||||
rows = ret.fetchall()
|
||||
self.assertEqual(2, len(rows))
|
||||
|
||||
actual = []
|
||||
for index in range(2):
|
||||
actual.append(dict(zip(keys, rows[index])))
|
||||
|
||||
expected = [
|
||||
{
|
||||
'PersonID': 1, 'LastName': 'Kong', 'FirstName': 'Lingxian',
|
||||
'Address': '150 Willis Street', 'City': 'Wellington'
|
||||
},
|
||||
{
|
||||
'PersonID': 99, 'LastName': 'OpenStack', 'FirstName': 'Trove',
|
||||
'Address': '150 Willis Street', 'City': 'Wellington'
|
||||
},
|
||||
]
|
||||
self.assertEqual(expected, actual)
|
|
@ -51,39 +51,27 @@ def wait_for_removal(delete_func, show_func, *args, **kwargs):
|
|||
time.sleep(3)
|
||||
|
||||
|
||||
class LocalSqlClient(object):
|
||||
"""A sqlalchemy wrapper to manage transactions."""
|
||||
def init_engine(db_url):
|
||||
return sqlalchemy.create_engine(db_url)
|
||||
|
||||
|
||||
class SQLClient(object):
|
||||
def __init__(self, engine):
|
||||
self.engine = engine
|
||||
|
||||
def __enter__(self):
|
||||
self.conn = self.engine.connect()
|
||||
self.trans = self.conn.begin()
|
||||
return self.conn
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
if self.trans:
|
||||
if type is not None:
|
||||
self.trans.rollback()
|
||||
else:
|
||||
self.trans.commit()
|
||||
self.conn.close()
|
||||
|
||||
def execute(self, t, **kwargs):
|
||||
def execute(self, cmds, **kwargs):
|
||||
try:
|
||||
return self.conn.execute(t, kwargs)
|
||||
with self.engine.begin() as conn:
|
||||
if isinstance(cmds, str):
|
||||
result = conn.execute(cmds)
|
||||
# Returns a ResultProxy
|
||||
# https://docs.sqlalchemy.org/en/13/core/connections.html#sqlalchemy.engine.ResultProxy
|
||||
return result
|
||||
|
||||
for cmd in cmds:
|
||||
conn.execute(cmd)
|
||||
except Exception as e:
|
||||
self.trans.rollback()
|
||||
self.trans = None
|
||||
raise exceptions.TempestException(
|
||||
'Failed to execute database command %s, error: %s' %
|
||||
(t, str(e))
|
||||
(cmds, str(e))
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def init_engine(host, user, password):
|
||||
return sqlalchemy.create_engine(
|
||||
"mysql+pymysql://%s:%s@%s:3306" % (user, password, host),
|
||||
pool_recycle=1800
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue