Support PostgreSQL

Change-Id: I7b2870fb93025d9de3dad18d14fa27d6da53c6f0
This commit is contained in:
Lingxian Kong 2020-09-11 22:30:33 +12:00
parent 4624bc56c8
commit fa57416207
12 changed files with 813 additions and 354 deletions

View File

@ -13,4 +13,5 @@ six>=1.10.0 # MIT
tempest>=17.1.0 # Apache-2.0
tenacity>=5.1.1 # Apache-2.0
SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT
PyMySQL>=0.7.6 # MIT License
PyMySQL>=0.7.6 # MIT License
psycopg2-binary>=2.6.2 # LGPL/ZPL

View File

@ -42,6 +42,17 @@ DatabaseGroup = [
'enabled_datastores',
default=['mysql']
),
cfg.DictOpt(
'default_datastore_versions',
default={'mysql': '5.7.29'},
help='The default datastore versions used to create instance',
),
cfg.DictOpt(
'pre_upgrade_datastore_versions',
default={},
help='The datastore versions used to create instances that need to be '
'upgrade.',
),
cfg.IntOpt('database_build_timeout',
default=1800,
help='Timeout in seconds to wait for a database instance to '
@ -84,17 +95,6 @@ DatabaseGroup = [
default="lvmdriver-1",
help="The Cinder volume type used for creating database instance."
),
cfg.DictOpt(
'default_datastore_versions',
default={'mysql': '5.7.29'},
help='The default datastore versions used to create instance',
),
cfg.DictOpt(
'pre_upgrade_datastore_versions',
default={},
help='The datastore versions used to create instances that need to be '
'upgrade.',
),
cfg.BoolOpt(
'remove_swift_account',
default=True,

View File

@ -37,6 +37,9 @@ class BaseTroveTest(test.BaseTestCase):
instance = None
instance_id = None
instance_ip = None
password = ""
create_user = True
enable_root = False
@classmethod
def get_resource_name(cls, resource_type):
@ -193,13 +196,16 @@ class BaseTroveTest(test.BaseTestCase):
# network ID.
cls._create_network()
instance = cls.create_instance()
instance = cls.create_instance(create_user=cls.create_user)
cls.instance_id = instance['id']
cls.wait_for_instance_status(cls.instance_id)
cls.instance = cls.client.get_resource(
"instances", cls.instance_id)['instance']
cls.instance_ip = cls.get_instance_ip(cls.instance)
if cls.enable_root:
cls.password = cls.get_root_pass(cls.instance_id)
def assert_single_item(self, items, **props):
return self.assert_multiple_items(items, 1, **props)[0]
@ -244,7 +250,7 @@ class BaseTroveTest(test.BaseTestCase):
def create_instance(cls, name=None, datastore_version=None,
database=constants.DB_NAME, username=constants.DB_USER,
password=constants.DB_PASS, backup_id=None,
replica_of=None):
replica_of=None, create_user=True):
"""Create database instance.
Creating database instance is time-consuming, so we define this method
@ -298,20 +304,23 @@ class BaseTroveTest(test.BaseTestCase):
"type": CONF.database.volume_type
},
"nics": [{"net-id": cls.private_network}],
"databases": [{"name": database}],
"users": [
{
"name": username,
"password": password,
"databases": [{"name": database}]
}
],
"access": {"is_public": True}
}
}
if backup_id:
body['instance'].update(
{'restorePoint': {'backupRef': backup_id}})
if create_user:
body['instance'].update({
'databases': [{"name": database}],
"users": [
{
"name": username,
"password": password,
"databases": [{"name": database}]
}
]
})
res = cls.client.create_resource("instances", body)
cls.addClassResourceCleanup(cls.wait_for_instance_status,
@ -321,6 +330,16 @@ class BaseTroveTest(test.BaseTestCase):
return res["instance"]
@classmethod
def restart_instance(cls, instance_id):
"""Restart database service and wait until it's healthy."""
cls.client.create_resource(
f"instances/{instance_id}/action",
{"restart": {}},
expected_status_code=202,
need_response=False)
cls.wait_for_instance_status(instance_id)
@classmethod
def wait_for_instance_status(cls, id,
expected_status=["HEALTHY", "ACTIVE"],
@ -403,7 +422,7 @@ class BaseTroveTest(test.BaseTestCase):
return v4_ip
def get_databases(self, instance_id):
def get_databases(self, instance_id, **kwargs):
url = f'instances/{instance_id}/databases'
ret = self.client.list_resources(url)
return ret['databases']
@ -493,3 +512,8 @@ class BaseTroveTest(test.BaseTestCase):
message = '({caller}) {message}'.format(caller=caller,
message=message)
raise exceptions.TimeoutException(message)
@classmethod
def get_root_pass(cls, instance_id):
resp = cls.client.create_resource(f"instances/{instance_id}/root", {})
return resp['user']['password']

View File

@ -13,28 +13,13 @@
# limitations under the License.
from oslo_log import log as logging
from tempest import config
from tempest.lib import decorators
import testtools
from trove_tempest_plugin.tests import base as trove_base
from trove_tempest_plugin.tests import constants
from trove_tempest_plugin.tests import utils
LOG = logging.getLogger(__name__)
CONF = config.CONF
def get_db_version(ip, username=constants.DB_USER, password=constants.DB_PASS):
LOG.info('Trying to access the database %s', ip)
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306'
db_client = utils.SQLClient(db_url)
cmd = "SELECT @@GLOBAL.innodb_version;"
ret = db_client.execute(cmd)
return ret.first()[0]
class TestInstanceActionsBase(trove_base.BaseTroveTest):
@classmethod
def init_db(cls, *args, **kwargs):
@ -52,18 +37,18 @@ class TestInstanceActionsBase(trove_base.BaseTroveTest):
def verify_data_after_rebuild(self, *args, **kwargs):
pass
def get_db_version(self):
pass
@classmethod
def resource_setup(cls):
super(TestInstanceActionsBase, cls).resource_setup()
# Initialize database
cls.init_db(cls.instance_ip, constants.DB_USER, constants.DB_PASS,
constants.DB_NAME)
LOG.info(f"Initializing data on {cls.instance_ip}")
cls.init_db(cls.instance_ip)
@decorators.idempotent_id("be6dd514-27d6-11ea-a56a-98f2b3cc23a0")
@testtools.skipUnless(CONF.database.pre_upgrade_datastore_versions,
'Datastore upgrade is disabled.')
def test_instance_upgrade(self):
def instance_upgrade_test(self):
cur_version = self.instance['datastore']['version']
cfg_versions = CONF.database.pre_upgrade_datastore_versions
ds_version = cfg_versions.get(self.datastore)
@ -77,17 +62,18 @@ class TestInstanceActionsBase(trove_base.BaseTroveTest):
LOG.info(f'Creating instance {name} with datastore version '
f'{ds_version} for upgrade')
instance = self.create_instance(name=name,
datastore_version=ds_version)
datastore_version=ds_version,
create_user=self.create_user)
self.wait_for_instance_status(instance['id'])
instance = self.client.get_resource(
"instances", instance['id'])['instance']
instance_ip = self.get_instance_ip(instance)
# Insert data before upgrading
self.init_db(instance_ip, constants.DB_USER, constants.DB_PASS,
constants.DB_NAME)
self.insert_data_upgrade(instance_ip, constants.DB_USER,
constants.DB_PASS, constants.DB_NAME)
LOG.info(f"Initializing data on {instance_ip} before upgrade")
self.init_db(instance_ip)
LOG.info(f"Inserting data on {instance_ip} before upgrade")
self.insert_data_upgrade(instance_ip)
new_version = cur_version
LOG.info(f"Upgrading instance {instance['id']} using datastore "
@ -95,11 +81,13 @@ class TestInstanceActionsBase(trove_base.BaseTroveTest):
body = {"instance": {"datastore_version": new_version}}
self.client.patch_resource('instances', instance['id'], body)
self.wait_for_instance_status(instance['id'])
actual = get_db_version(instance_ip)
LOG.info(f"Getting database version on {instance_ip}")
actual = self.get_db_version(instance_ip)
self.assertEqual(new_version, actual)
self.verify_data_upgrade(instance_ip, constants.DB_USER,
constants.DB_PASS, constants.DB_NAME)
LOG.info(f"Verifying data on {instance_ip} after upgrade")
self.verify_data_upgrade(instance_ip)
# Delete the new instance explicitly to avoid too many instances
# during the test.
@ -107,8 +95,7 @@ class TestInstanceActionsBase(trove_base.BaseTroveTest):
expected_status="DELETED",
need_delete=True)
@decorators.idempotent_id("27914e82-b061-11ea-b87c-00224d6b7bc1")
def test_resize(self):
def resize_test(self):
# Resize flavor
LOG.info(f"Resizing flavor to {CONF.database.resize_flavor_id} for "
f"instance {self.instance_id}")
@ -156,12 +143,9 @@ class TestInstanceActionsBase(trove_base.BaseTroveTest):
ret = self.client.get_resource('instances', self.instance_id)
self.assertEqual(2, ret['instance']['volume']['size'])
@decorators.idempotent_id("8d4d675c-d829-11ea-b87c-00224d6b7bc1")
@testtools.skipUnless(CONF.database.rebuild_image_id,
'Image for rebuild not configured.')
def test_rebuild(self):
self.insert_data_before_rebuild(self.instance_ip, constants.DB_USER,
constants.DB_PASS, constants.DB_NAME)
def rebuild_test(self):
LOG.info(f"Inserting data on {self.instance_ip} before rebuilding")
self.insert_data_before_rebuild(self.instance_ip)
LOG.info(f"Rebuilding instance {self.instance_id} with image "
f"{CONF.database.rebuild_image_id}")
@ -176,5 +160,5 @@ class TestInstanceActionsBase(trove_base.BaseTroveTest):
need_response=False)
self.wait_for_instance_status(self.instance_id)
self.verify_data_after_rebuild(self.instance_ip, constants.DB_USER,
constants.DB_PASS, constants.DB_NAME)
LOG.info(f"Verifying data on {self.instance_ip} after rebuilding")
self.verify_data_after_rebuild(self.instance_ip)

View File

@ -13,10 +13,8 @@
# limitations under the License.
from oslo_log import log as logging
from tempest import config
from tempest.lib import decorators
from trove_tempest_plugin.tests import base as trove_base
from trove_tempest_plugin.tests import constants
LOG = logging.getLogger(__name__)
CONF = config.CONF
@ -48,17 +46,18 @@ class TestBackupBase(trove_base.BaseTroveTest):
cls.addClassResourceCleanup(cls.delete_swift_account)
# Insert some data to the current db instance
cls.insert_data(cls.instance_ip, constants.DB_USER, constants.DB_PASS,
constants.DB_NAME)
LOG.info(f"Inserting data on {cls.instance_ip} before creating full"
f"backup")
cls.insert_data(cls.instance_ip)
# Create a backup that is shared within this test class.
LOG.info(f"Creating full backup for instance {cls.instance_id}")
name = cls.get_resource_name("backup")
backup = cls.create_backup(cls.instance_id, name)
cls.wait_for_backup_status(backup['id'])
cls.backup = cls.client.get_resource("backups", backup['id'])['backup']
@decorators.idempotent_id("bdff1ae0-ad6c-11ea-b87c-00224d6b7bc1")
def test_backup_full(self):
def backup_full_test(self):
# Restore from backup
LOG.info(f'Creating a new instance using the backup '
f'{self.backup["id"]}')
@ -66,18 +65,22 @@ class TestBackupBase(trove_base.BaseTroveTest):
restore_instance = self.create_instance(
name,
datastore_version=self.backup['datastore']['version'],
backup_id=self.backup['id']
backup_id=self.backup['id'],
create_user=self.create_user
)
self.wait_for_instance_status(
restore_instance['id'],
timeout=CONF.database.database_restore_timeout)
if self.enable_root:
self.root_password = self.get_root_pass(restore_instance['id'])
restore_instance = self.client.get_resource(
"instances", restore_instance['id'])['instance']
restore_instance_ip = self.get_instance_ip(restore_instance)
self.verify_data(restore_instance_ip, constants.DB_USER,
constants.DB_PASS, constants.DB_NAME)
LOG.info(f"Verifying data on restored instance {restore_instance_ip}")
self.verify_data(restore_instance_ip)
# Delete the new instance explicitly to avoid too many instances
# during the test.
@ -85,11 +88,11 @@ class TestBackupBase(trove_base.BaseTroveTest):
expected_status="DELETED",
need_delete=True)
@decorators.idempotent_id("f8f985c2-ae02-11ea-b87c-00224d6b7bc1")
def test_backup_incremental(self):
def backup_incremental_test(self):
# Insert some data
self.insert_data_inc(self.instance_ip, constants.DB_USER,
constants.DB_PASS, constants.DB_NAME)
LOG.info(f"Inserting data on {self.instance_ip} before creating "
f"incremental backup")
self.insert_data_inc(self.instance_ip)
# Create a second backup
LOG.info(f"Creating an incremental backup based on "
@ -108,18 +111,24 @@ class TestBackupBase(trove_base.BaseTroveTest):
restore_instance = self.create_instance(
name,
datastore_version=backup_inc['datastore']['version'],
backup_id=backup_inc['id']
backup_id=backup_inc['id'],
create_user=self.create_user
)
self.wait_for_instance_status(
restore_instance['id'],
timeout=CONF.database.database_restore_timeout)
if self.enable_root:
self.root_password = self.get_root_pass(restore_instance['id'])
restore_instance = self.client.get_resource(
"instances", restore_instance['id'])['instance']
restore_instance_ip = self.get_instance_ip(restore_instance)
self.verify_data_inc(restore_instance_ip, constants.DB_USER,
constants.DB_PASS, constants.DB_NAME)
LOG.info(f"Verifying data on {restore_instance_ip}"
f"({restore_instance['id']}) after restoring incremental "
f"backup")
self.verify_data_inc(restore_instance_ip)
# Delete the new instance explicitly to avoid too many instances
# during the test.

View File

@ -24,28 +24,129 @@ CONF = config.CONF
LOG = logging.getLogger(__name__)
class TestInstanceBasicMySQLBase(trove_base.BaseTroveTest):
class TestInstanceBasicBase(trove_base.BaseTroveTest):
def get_config_value(self, ip, option, **kwargs):
pass
def configuration_test(self, create_values, update_values,
need_restart=False):
"""Test configuration.
The create_values and update_values are both dict with one key, the
value should be in type int.
"""
# Create new configuration
config_name = 'test_config'
key = list(create_values.keys())[0]
value = list(create_values.values())[0]
create_config = {
"configuration": {
"datastore": {
"type": self.datastore,
"version": self.instance['datastore']['version']
},
"values": create_values,
"name": config_name
}
}
LOG.info(f"Creating new configuration {config_name}")
config = self.client.create_resource('configurations', create_config)
config_id = config['configuration']['id']
self.addCleanup(self.client.delete_resource, 'configurations',
config_id, ignore_notfound=True)
self.assertEqual(0, config['configuration']['instance_count'])
ret = self.client.list_resources(
f"configurations/{config_id}/instances")
self.assertEqual(0, len(ret['instances']))
# Attach the configuration to the existing instance
attach_config = {
"instance": {
"configuration": config_id
}
}
LOG.info(f"Attaching config {config_id} to instance "
f"{self.instance_id}")
self.client.put_resource(f'instances/{self.instance_id}',
attach_config)
if need_restart:
LOG.info(f"Restarting instance {self.instance_id}")
self.restart_instance(self.instance_id)
ret = self.client.list_resources(
f"configurations/{config_id}/instances")
self.assertEqual(1, len(ret['instances']))
self.assertEqual(self.instance_id, ret['instances'][0]['id'])
# Get new config option value
LOG.info(f"Getting config value for {key} on {self.instance_ip}")
cur_value = self.get_config_value(self.instance_ip, key)
self.assertEqual(value, cur_value)
# Update configuration
new_key = list(update_values.keys())[0]
new_value = list(update_values.values())[0]
patch_config = {
"configuration": {
"values": update_values
}
}
LOG.info(f"Updating config {config_id}")
self.client.patch_resource('configurations', config_id, patch_config,
expected_status_code=200)
if need_restart:
LOG.info(f"Restarting instance {self.instance_id}")
self.restart_instance(self.instance_id)
LOG.info(f"Getting config value for {new_key} on {self.instance_ip}")
cur_value = self.get_config_value(self.instance_ip, new_key)
self.assertEqual(new_value, cur_value)
# Detach the configuration from the instance
LOG.info(f"Detaching from instance {self.instance_id}")
detach_config = {
"instance": {
"configuration": None
}
}
self.client.put_resource(f'instances/{self.instance_id}',
detach_config)
if need_restart:
LOG.info(f"Restarting instance {self.instance_id}")
self.restart_instance(self.instance_id)
ret = self.client.list_resources(
f"configurations/{config_id}/instances")
self.assertEqual(0, len(ret['instances']))
# Get new config option value
LOG.info(f"Getting config value for {new_key} on {self.instance_ip}")
cur_value = self.get_config_value(self.instance_ip, new_key)
self.assertNotEqual(value, cur_value)
self.assertNotEqual(new_value, cur_value)
class TestInstanceBasicMySQLBase(TestInstanceBasicBase):
def _access_db(self, ip, username=constants.DB_USER,
password=constants.DB_PASS, database=constants.DB_NAME):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
LOG.info(f'Trying to access the database {db_url}')
db_client = utils.SQLClient(db_url)
cmd = "SELECT 1;"
db_client.execute(cmd)
with utils.SQLClient(db_url) as db_client:
cmd = "SELECT 1;"
db_client.mysql_execute(cmd)
def get_config_value(self, ip, option, username=constants.DB_USER,
password=constants.DB_PASS):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306'
LOG.info(f'Trying to get option value for {option} from database '
f'{db_url}')
db_client = utils.SQLClient(db_url)
cmd = f"show variables where Variable_name in ('{option}');"
ret = db_client.execute(cmd)
rows = ret.fetchall()
with utils.SQLClient(db_url) as db_client:
cmd = f"show variables where Variable_name in ('{option}');"
ret = db_client.mysql_execute(cmd)
rows = ret.fetchall()
self.assertEqual(1, len(rows))
return rows[0][1]
return int(rows[0][1])
@decorators.idempotent_id("40cf38ce-cfbf-11e9-8760-1458d058cfb2")
def test_database_access(self):
@ -57,6 +158,7 @@ class TestInstanceBasicMySQLBase(trove_base.BaseTroveTest):
user_names = [user['name'] for user in users]
self.assertIn(constants.DB_USER, user_names)
LOG.info(f"Accessing database on {self.instance_ip}")
self._access_db(self.instance_ip)
@decorators.idempotent_id("c5a9dcda-af5b-11ea-b87c-00224d6b7bc1")
@ -124,6 +226,8 @@ class TestInstanceBasicMySQLBase(trove_base.BaseTroveTest):
self.assertIn(user2, cur_user_names)
# user1 should have access to db1
LOG.info(f"Accessing database on {self.instance_ip}, user: {user1}, "
f"db: {db1}")
self._access_db(self.instance_ip, user1, constants.DB_PASS, db1)
# user2 should not have access to db2
self.assertRaises(exceptions.TempestException, self._access_db,
@ -145,6 +249,8 @@ class TestInstanceBasicMySQLBase(trove_base.BaseTroveTest):
user2_dbs = [db['name'] for db in user2_dbs['databases']]
self.assertIn(db2, user2_dbs)
# Now user2 should have access to db2
LOG.info(f"Accessing database on {self.instance_ip}, user: {user2}, "
f"db: {db2}")
self._access_db(self.instance_ip, user2, constants.DB_PASS, db2)
LOG.info(f"Revoking user {user2} access to database {db2}")
@ -172,82 +278,6 @@ class TestInstanceBasicMySQLBase(trove_base.BaseTroveTest):
@decorators.idempotent_id("ce8277b0-af7c-11ea-b87c-00224d6b7bc1")
def test_configuration(self):
# Create new configuration
config_name = 'test_config'
new_value = 555
create_config = {
"configuration": {
"datastore": {
"type": self.datastore,
"version": self.instance['datastore']['version']
},
"values": {
"max_connections": new_value
},
"name": config_name
}
}
LOG.info(f"Creating new configuration {config_name}")
config = self.client.create_resource('configurations', create_config)
config_id = config['configuration']['id']
self.addCleanup(self.client.delete_resource, 'configurations',
config_id, ignore_notfound=True)
self.assertEqual(0, config['configuration']['instance_count'])
ret = self.client.list_resources(
f"configurations/{config_id}/instances")
self.assertEqual(0, len(ret['instances']))
# Attach the configuration to the existing instance
attach_config = {
"instance": {
"configuration": config_id
}
}
LOG.info(f"Attaching config {config_id} to instance "
f"{self.instance_id}")
self.client.put_resource(f'instances/{self.instance_id}',
attach_config)
ret = self.client.list_resources(
f"configurations/{config_id}/instances")
self.assertEqual(1, len(ret['instances']))
self.assertEqual(self.instance_id, ret['instances'][0]['id'])
# Get new config option value
cur_value = self.get_config_value(self.instance_ip, 'max_connections')
self.assertEqual(new_value, int(cur_value))
# Update configuration
updated_value = 666
patch_config = {
"configuration": {
"values": {
"max_connections": updated_value
}
}
}
LOG.info(f"Updating config {config_id}")
self.client.patch_resource('configurations', config_id, patch_config,
expected_status_code=200)
cur_value = self.get_config_value(self.instance_ip, 'max_connections')
self.assertEqual(updated_value, int(cur_value))
# Detach the configuration from the instance
detach_config = {
"instance": {
"configuration": ""
}
}
self.client.put_resource(f'instances/{self.instance_id}',
detach_config)
ret = self.client.list_resources(
f"configurations/{config_id}/instances")
self.assertEqual(0, len(ret['instances']))
# Get new config option value
cur_value = self.get_config_value(self.instance_ip, 'max_connections')
self.assertNotEqual(new_value, int(cur_value))
self.assertNotEqual(updated_value, int(cur_value))
create_values = {"max_connections": 555}
update_values = {"max_connections": 666}
self.configuration_test(create_values, update_values)

View File

@ -15,80 +15,40 @@ import time
from oslo_log import log as logging
from tempest import config
from tempest.lib import decorators
from trove_tempest_plugin.tests import base as trove_base
from trove_tempest_plugin.tests import constants
from trove_tempest_plugin.tests import utils
CONF = config.CONF
LOG = logging.getLogger(__name__)
class TestReplicationBase(trove_base.BaseTroveTest):
def insert_data_replication(self, ip, username, password, database):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
LOG.info(f"Inserting data for replication, db_url: {db_url}")
db_client = utils.SQLClient(db_url)
def insert_data_replication(self, *args, **kwargs):
pass
cmds = [
"CREATE TABLE Persons (ID int, String varchar(255));",
"insert into Persons VALUES (1, 'replication');"
]
db_client.execute(cmds)
def verify_data_replication(self, *args, **kwargs):
pass
def verify_data_replication(self, ip, username, password, database):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
LOG.info(f"Verifying data for replication, db_url: {db_url}")
db_client = utils.SQLClient(db_url)
cmd = "select * from Persons;"
ret = db_client.execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
self.assertEqual(1, len(rows))
def insert_data_after_promote(self, *args, **kwargs):
pass
result = []
for index in range(len(rows)):
result.append(dict(zip(keys, rows[index])))
expected = {'ID': 1, 'String': 'replication'}
self.assert_single_item(result, **expected)
def verify_data_after_promote(self, *args, **kwargs):
pass
def insert_data_after_promote(self, ip, username, password, database):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
LOG.info(f"Inserting data after promotion, db_url: {db_url}")
db_client = utils.SQLClient(db_url)
def create_database(self, name, **kwargs):
pass
cmds = [
"insert into Persons VALUES (2, 'promote');"
]
db_client.execute(cmds)
def verify_data_after_promote(self, ip, username, password, database):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
LOG.info(f"Verifying data after promotion, db_url: {db_url}")
db_client = utils.SQLClient(db_url)
cmd = "select * from Persons;"
ret = db_client.execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
self.assertGreater(len(rows), 1)
result = []
for index in range(len(rows)):
result.append(dict(zip(keys, rows[index])))
expected = {'ID': 2, 'String': 'promote'}
self.assert_single_item(result, **expected)
@decorators.idempotent_id("280d09c6-b027-11ea-b87c-00224d6b7bc1")
def test_replication(self):
def replication_test(self):
# Insert data for primary
self.insert_data_replication(self.instance_ip, constants.DB_USER,
constants.DB_PASS, constants.DB_NAME)
LOG.info(f"Inserting data before creating replicas on "
f"{self.instance_ip}")
self.insert_data_replication(self.instance_ip)
# Create replica1
LOG.info(f"Creating replica1 for instance {self.instance_id}")
name = self.get_resource_name("replica-01")
replica1 = self.create_instance(name, replica_of=self.instance_id)
replica1 = self.create_instance(name, replica_of=self.instance_id,
create_user=self.create_user)
replica1_id = replica1['id']
self.addCleanup(self.wait_for_instance_status, replica1_id,
need_delete=True, expected_status='DELETED')
@ -114,28 +74,35 @@ class TestReplicationBase(trove_base.BaseTroveTest):
# Verify databases created in replica
time.sleep(5)
primary_dbs = self.get_databases(self.instance_id)
replica_dbs = self.get_databases(replica1_id)
LOG.info(f"Getting databases on primary {self.instance_ip}"
f"({self.instance_id}) and replica {replica1_ip}"
f"({replica1_id})")
primary_dbs = self.get_databases(self.instance_id, ip=self.instance_ip)
replica_dbs = self.get_databases(replica1_id, ip=replica1_ip)
self.assertEqual(len(primary_dbs), len(replica_dbs))
# Create a new database in primary and verify in replica
LOG.info(f"Creating database in instance {self.instance_id}")
create_db = {"databases": [{"name": 'db_for_replication'}]}
self.client.create_resource(f"instances/{self.instance_id}/databases",
create_db, expected_status_code=202,
need_response=False)
db_name = 'db_for_replication'
self.create_database(db_name, ip=self.instance_ip)
time.sleep(5)
new_primary_dbs = self.get_databases(self.instance_id)
new_replica1_dbs = self.get_databases(replica1_id)
LOG.info(f"Getting databases on primary {self.instance_ip}"
f"({self.instance_id}) and replica {replica1_ip}"
f"({replica1_id})")
new_primary_dbs = self.get_databases(self.instance_id,
ip=self.instance_ip)
new_replica1_dbs = self.get_databases(replica1_id, ip=replica1_ip)
self.assertEqual(len(new_primary_dbs), len(new_replica1_dbs))
self.assertGreater(len(new_replica1_dbs), len(replica_dbs))
new_db_names = [db['name'] for db in new_replica1_dbs]
self.assertIn('db_for_replication', new_db_names)
self.assertIn(db_name, new_db_names)
# Create replica2
LOG.info(f"Creating replica2 for instance {self.instance_id}")
name = self.get_resource_name("replica-02")
replica2 = self.create_instance(name, replica_of=self.instance_id)
replica2 = self.create_instance(name, replica_of=self.instance_id,
create_user=self.create_user)
replica2_id = replica2['id']
self.addCleanup(self.wait_for_instance_status, replica2_id,
need_delete=True, expected_status='DELETED')
@ -157,15 +124,15 @@ class TestReplicationBase(trove_base.BaseTroveTest):
# Verify databases synced to replica2
time.sleep(5)
replica2_dbs = self.get_databases(replica2_id)
LOG.info(f"Getting databases on replica {replica2_ip}({replica2_id})")
replica2_dbs = self.get_databases(replica2_id, ip=replica2_ip)
replica2_db_names = [db['name'] for db in replica2_dbs]
self.assertIn('db_for_replication', replica2_db_names)
self.assertIn(db_name, replica2_db_names)
# Verify data synchronization on replica1 and replica2
self.verify_data_replication(replica1_ip, constants.DB_USER,
constants.DB_PASS, constants.DB_NAME)
self.verify_data_replication(replica2_ip, constants.DB_USER,
constants.DB_PASS, constants.DB_NAME)
LOG.info(f"Verifying data on replicas {replica1_ip} and {replica2_ip}")
self.verify_data_replication(replica1_ip)
self.verify_data_replication(replica2_ip)
# Volume resize to primary
LOG.info(f"Resizing volume for primary {self.instance_id} to 2G")
@ -218,13 +185,13 @@ class TestReplicationBase(trove_base.BaseTroveTest):
self.assertEqual(replica1_id, ret['instance']['replica_of']['id'])
# Insert data to new primary and verify in replicas
self.insert_data_after_promote(replica1_ip, constants.DB_USER,
constants.DB_PASS, constants.DB_NAME)
LOG.info(f"Inserting data on new primary {replica1_ip}")
self.insert_data_after_promote(replica1_ip)
time.sleep(5)
self.verify_data_after_promote(self.instance_ip, constants.DB_USER,
constants.DB_PASS, constants.DB_NAME)
self.verify_data_after_promote(replica2_ip, constants.DB_USER,
constants.DB_PASS, constants.DB_NAME)
LOG.info(f"Verifying data on new replicas {self.instance_ip} and "
f"{replica2_ip}")
self.verify_data_after_promote(self.instance_ip)
self.verify_data_after_promote(replica2_ip)
# Detach original primary from the replication cluster
LOG.info(f"Detaching replica {self.instance_id} from the replication "
@ -234,8 +201,8 @@ class TestReplicationBase(trove_base.BaseTroveTest):
"replica_of": ""
}
}
self.client.patch_resource('instances', self.instance_id,
detach_replica)
self.client.put_resource(f'/instances/{self.instance_id}',
detach_replica)
self.wait_for_instance_status(self.instance_id)
# Verify original primary

View File

@ -11,65 +11,62 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from tempest.lib import decorators
from trove_tempest_plugin.tests import constants
from trove_tempest_plugin.tests.scenario import base_backup
from trove_tempest_plugin.tests import utils
LOG = logging.getLogger(__name__)
class TestBackupMySQL(base_backup.TestBackupBase):
datastore = 'mysql'
@classmethod
def insert_data(cls, ip, username, password, database, **kwargs):
LOG.info(f"Inserting data to database {database} on {ip}")
def insert_data(cls, ip, username=constants.DB_USER,
password=constants.DB_PASS, database=constants.DB_NAME,
**kwargs):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
db_client = utils.SQLClient(db_url)
cmds = [
"CREATE TABLE Persons (ID int, String varchar(255));",
"insert into Persons VALUES (1, 'Lingxian Kong');",
]
db_client.execute(cmds)
with utils.SQLClient(db_url) as db_client:
cmds = [
"CREATE TABLE Persons (ID int, String varchar(255));",
"insert into Persons VALUES (1, 'Lingxian Kong');",
]
db_client.mysql_execute(cmds)
@classmethod
def insert_data_inc(cls, ip, username, password, database, **kwargs):
LOG.info(f"Inserting data to database {database} on {ip} for "
f"incremental backup")
def insert_data_inc(cls, ip, username=constants.DB_USER,
password=constants.DB_PASS, database=constants.DB_NAME,
**kwargs):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
db_client = utils.SQLClient(db_url)
with utils.SQLClient(db_url) as db_client:
cmds = [
"insert into Persons VALUES (99, 'OpenStack');"
]
db_client.mysql_execute(cmds)
cmds = [
"insert into Persons VALUES (99, 'OpenStack');"
]
db_client.execute(cmds)
def verify_data(self, ip, username, password, database):
def verify_data(self, ip, username=constants.DB_USER,
password=constants.DB_PASS, database=constants.DB_NAME):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
db_client = utils.SQLClient(db_url)
cmd = "select * from Persons;"
ret = db_client.execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
with utils.SQLClient(db_url) as db_client:
cmd = "select * from Persons;"
ret = db_client.mysql_execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
self.assertEqual(1, len(rows))
result = dict(zip(keys, rows[0]))
expected = {'ID': 1, 'String': 'Lingxian Kong'}
self.assertEqual(expected, result)
def verify_data_inc(self, ip, username, password, database):
def verify_data_inc(self, ip, username=constants.DB_USER,
password=constants.DB_PASS,
database=constants.DB_NAME):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
db_client = utils.SQLClient(db_url)
cmd = "select * from Persons;"
ret = db_client.execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
with utils.SQLClient(db_url) as db_client:
cmd = "select * from Persons;"
ret = db_client.mysql_execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
self.assertEqual(2, len(rows))
actual = []
@ -81,3 +78,90 @@ class TestBackupMySQL(base_backup.TestBackupBase):
{'ID': 99, 'String': 'OpenStack'},
]
self.assertEqual(expected, actual)
@decorators.idempotent_id("b90626ae-f412-11ea-a950-00224d6b7bc1")
def test_backup_full(self):
self.backup_full_test()
@decorators.idempotent_id("f8f985c2-ae02-11ea-b87c-00224d6b7bc1")
def test_backup_incremental(self):
self.backup_incremental_test()
class TestBackupPostgreSQL(base_backup.TestBackupBase):
datastore = 'postgresql'
create_user = False
enable_root = True
root_password = ""
@classmethod
def insert_data(cls, ip):
db_url = (f'postgresql+psycopg2://root:{cls.password}@'
f'{ip}:5432/postgres')
with utils.SQLClient(db_url) as db_client:
cmd = "CREATE DATABASE testdb;"
db_client.pgsql_execute(cmd)
db_url = (f'postgresql+psycopg2://root:{cls.password}@'
f'{ip}:5432/testdb')
with utils.SQLClient(db_url) as db_client:
cmds = [
"CREATE TABLE persons (id INT PRIMARY KEY NOT NULL, "
"string VARCHAR(255));",
"INSERT INTO persons (id,string) VALUES (1, 'Lingxian Kong');",
]
db_client.pgsql_execute(cmds)
@classmethod
def insert_data_inc(cls, ip):
db_url = (f'postgresql+psycopg2://root:{cls.password}@'
f'{ip}:5432/testdb')
with utils.SQLClient(db_url) as db_client:
cmds = [
"INSERT INTO persons (id,string) VALUES (99, 'OpenStack');"
]
db_client.pgsql_execute(cmds)
def verify_data(self, ip):
db_url = (f'postgresql+psycopg2://root:{self.root_password}@'
f'{ip}:5432/testdb')
with utils.SQLClient(db_url) as db_client:
cmd = "select * from persons;"
ret = db_client.pgsql_execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
self.assertEqual(1, len(rows))
result = dict(zip(keys, rows[0]))
expected = {'id': 1, 'string': 'Lingxian Kong'}
self.assertEqual(expected, result)
def verify_data_inc(self, ip, username=constants.DB_USER,
password=constants.DB_PASS,
database=constants.DB_NAME):
db_url = (f'postgresql+psycopg2://root:{self.root_password}@'
f'{ip}:5432/testdb')
with utils.SQLClient(db_url) as db_client:
cmd = "select * from persons;"
ret = db_client.pgsql_execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
self.assertEqual(2, len(rows))
actual = []
for index in range(2):
actual.append(dict(zip(keys, rows[index])))
expected = [
{'id': 1, 'string': 'Lingxian Kong'},
{'id': 99, 'string': 'OpenStack'},
]
self.assertEqual(expected, actual)
@decorators.idempotent_id("e8339fce-f412-11ea-a950-00224d6b7bc1")
def test_backup_full(self):
self.backup_full_test()
@decorators.idempotent_id("ec387400-f412-11ea-a950-00224d6b7bc1")
def test_backup_incremental(self):
self.backup_incremental_test()

View File

@ -11,49 +11,49 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from tempest import config
from tempest.lib import decorators
import testtools
from trove_tempest_plugin.tests import constants
from trove_tempest_plugin.tests.scenario import base_actions
from trove_tempest_plugin.tests import utils
LOG = logging.getLogger(__name__)
CONF = config.CONF
class TestInstanceActionsMySQL(base_actions.TestInstanceActionsBase):
datastore = 'mysql'
class InstanceActionsMySQLBase(base_actions.TestInstanceActionsBase):
@classmethod
def init_db(cls, ip, username, password, database):
LOG.info(f"Initializing database {database} on {ip}")
def init_db(cls, ip, username=constants.DB_USER,
password=constants.DB_PASS, database=constants.DB_NAME):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
db_client = utils.SQLClient(db_url)
cmds = [
"CREATE TABLE Persons (ID int, String varchar(255));",
]
db_client.execute(cmds)
def insert_data_upgrade(self, ip, username, password, database):
LOG.info(f"Inserting data to database {database} on {ip} for "
f"datastore upgrade")
with utils.SQLClient(db_url) as db_client:
cmds = [
"CREATE TABLE Persons (ID int, String varchar(255));",
]
db_client.mysql_execute(cmds)
def insert_data_upgrade(self, ip,
username=constants.DB_USER,
password=constants.DB_PASS,
database=constants.DB_NAME):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
db_client = utils.SQLClient(db_url)
with utils.SQLClient(db_url) as db_client:
cmds = [
"insert into Persons VALUES (99, 'Upgrade');"
]
db_client.mysql_execute(cmds)
cmds = [
"insert into Persons VALUES (99, 'Upgrade');"
]
db_client.execute(cmds)
def verify_data_upgrade(self, ip, username, password, database):
def verify_data_upgrade(self, ip,
username=constants.DB_USER,
password=constants.DB_PASS,
database=constants.DB_NAME):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
db_client = utils.SQLClient(db_url)
cmd = "select * from Persons;"
ret = db_client.execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
with utils.SQLClient(db_url) as db_client:
cmd = "select * from Persons;"
ret = db_client.mysql_execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
self.assertGreaterEqual(len(rows), 1)
result = []
@ -62,36 +62,177 @@ class TestInstanceActionsMySQL(base_actions.TestInstanceActionsBase):
expected = {'ID': 99, 'String': 'Upgrade'}
self.assert_single_item(result, **expected)
def insert_data_before_rebuild(self, ip, username, password, database):
LOG.info(f"Inserting data to database {database} on {ip} "
f"before rebuilding instance")
def insert_data_before_rebuild(self, ip,
username=constants.DB_USER,
password=constants.DB_PASS,
database=constants.DB_NAME):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
db_client = utils.SQLClient(db_url)
cmds = [
"CREATE TABLE Rebuild (ID int, String varchar(255));",
"insert into Rebuild VALUES (1, 'rebuild-data');"
]
db_client.execute(cmds)
def verify_data_after_rebuild(self, ip, username, password, database):
LOG.info(f"Verifying data in database {database} on {ip} "
f"after rebuilding instance")
with utils.SQLClient(db_url) as db_client:
cmds = [
"CREATE TABLE Rebuild (ID int, String varchar(255));",
"insert into Rebuild VALUES (1, 'rebuild-data');"
]
db_client.mysql_execute(cmds)
def verify_data_after_rebuild(self, ip,
username=constants.DB_USER,
password=constants.DB_PASS,
database=constants.DB_NAME):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
db_client = utils.SQLClient(db_url)
cmd = "select * from Rebuild;"
ret = db_client.execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
with utils.SQLClient(db_url) as db_client:
cmd = "select * from Rebuild;"
ret = db_client.mysql_execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
self.assertEqual(1, len(rows))
actual = dict(zip(keys, rows[0]))
expected = {'ID': 1, 'String': 'rebuild-data'}
self.assertEqual(expected, actual)
def get_db_version(self, ip, username=constants.DB_USER,
password=constants.DB_PASS):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306'
with utils.SQLClient(db_url) as db_client:
cmd = "SELECT @@GLOBAL.innodb_version;"
ret = db_client.mysql_execute(cmd)
return ret.first()[0]
class TestInstanceActionsMariaDB(TestInstanceActionsMySQL):
class TestInstanceActionsMySQL(InstanceActionsMySQLBase):
datastore = 'mysql'
@decorators.idempotent_id("be6dd514-27d6-11ea-a56a-98f2b3cc23a0")
@testtools.skipUnless(CONF.database.pre_upgrade_datastore_versions,
'Datastore upgrade is disabled.')
def test_instance_upgrade(self):
self.instance_upgrade_test()
@decorators.idempotent_id("27914e82-b061-11ea-b87c-00224d6b7bc1")
def test_resize(self):
self.resize_test()
@decorators.idempotent_id("8d4d675c-d829-11ea-b87c-00224d6b7bc1")
@testtools.skipUnless(CONF.database.rebuild_image_id,
'Image for rebuild not configured.')
def test_rebuild(self):
self.rebuild_test()
class TestInstanceActionsMariaDB(InstanceActionsMySQLBase):
datastore = 'mariadb'
@decorators.idempotent_id("f7a0fef6-f413-11ea-a950-00224d6b7bc1")
@testtools.skipUnless(CONF.database.pre_upgrade_datastore_versions,
'Datastore upgrade is disabled.')
def test_instance_upgrade(self):
self.instance_upgrade_test()
@decorators.idempotent_id("fb89d402-f413-11ea-a950-00224d6b7bc1")
def test_resize(self):
self.resize_test()
@decorators.idempotent_id("ff34768e-f413-11ea-a950-00224d6b7bc1")
@testtools.skipUnless(CONF.database.rebuild_image_id,
'Image for rebuild not configured.')
def test_rebuild(self):
self.rebuild_test()
class TestInstanceActionsPostgreSQL(base_actions.TestInstanceActionsBase):
datastore = 'postgresql'
create_user = False
enable_root = True
@classmethod
def init_db(cls, ip):
db_url = (f'postgresql+psycopg2://root:{cls.password}@'
f'{ip}:5432/postgres')
with utils.SQLClient(db_url) as db_client:
cmd = "CREATE DATABASE testdb;"
db_client.pgsql_execute(cmd)
db_url = (f'postgresql+psycopg2://root:{cls.password}@'
f'{ip}:5432/testdb')
with utils.SQLClient(db_url) as db_client:
cmds = [
"CREATE TABLE persons (id INT PRIMARY KEY NOT NULL, "
"string VARCHAR(255));",
]
db_client.pgsql_execute(cmds)
def insert_data_upgrade(self, ip):
db_url = (f'postgresql+psycopg2://root:{self.password}@'
f'{ip}:5432/testdb')
with utils.SQLClient(db_url) as db_client:
cmds = [
"insert into Persons VALUES (99, 'Upgrade');"
]
db_client.pgsql_execute(cmds)
def verify_data_upgrade(self, ip):
db_url = (f'postgresql+psycopg2://root:{self.password}@'
f'{ip}:5432/testdb')
with utils.SQLClient(db_url) as db_client:
cmd = "select * from Persons;"
ret = db_client.pgsql_execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
self.assertGreaterEqual(len(rows), 1)
result = []
for index in range(len(rows)):
result.append(dict(zip(keys, rows[index])))
expected = {'id': 99, 'string': 'Upgrade'}
self.assert_single_item(result, **expected)
def insert_data_before_rebuild(self, ip):
db_url = (f'postgresql+psycopg2://root:{self.password}@'
f'{ip}:5432/testdb')
with utils.SQLClient(db_url) as db_client:
cmds = [
"CREATE TABLE Rebuild (ID int, String varchar(255));",
"insert into Rebuild VALUES (1, 'rebuild-data');"
]
db_client.pgsql_execute(cmds)
def verify_data_after_rebuild(self, ip):
db_url = (f'postgresql+psycopg2://root:{self.password}@'
f'{ip}:5432/testdb')
with utils.SQLClient(db_url) as db_client:
cmd = "select * from Rebuild;"
ret = db_client.pgsql_execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
self.assertEqual(1, len(rows))
actual = dict(zip(keys, rows[0]))
expected = {'id': 1, 'string': 'rebuild-data'}
self.assertEqual(expected, actual)
def get_db_version(self, ip, username=constants.DB_USER,
password=constants.DB_PASS):
db_url = (f'postgresql+psycopg2://root:{self.password}@'
f'{ip}:5432/postgres')
with utils.SQLClient(db_url) as db_client:
cmd = "SHOW server_version;"
ret = db_client.pgsql_execute(cmd)
version = ret.first()[0]
return version.split(' ')[0]
@decorators.idempotent_id("97f1e7ca-f415-11ea-a950-00224d6b7bc1")
@testtools.skipUnless(CONF.database.pre_upgrade_datastore_versions,
'Datastore upgrade is disabled.')
def test_instance_upgrade(self):
self.instance_upgrade_test()
@decorators.idempotent_id("9b940c00-f415-11ea-a950-00224d6b7bc1")
def test_resize(self):
self.resize_test()
@decorators.idempotent_id("9ec5dd54-f415-11ea-a950-00224d6b7bc1")
@testtools.skipUnless(CONF.database.rebuild_image_id,
'Image for rebuild not configured.')
def test_rebuild(self):
self.rebuild_test()

View File

@ -11,12 +11,43 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from tempest.lib import decorators
from trove_tempest_plugin.tests.scenario import base_basic
from trove_tempest_plugin.tests import utils
LOG = logging.getLogger(__name__)
class TestInstanceBasicMySQL(base_basic.TestInstanceBasicMySQLBase):
datastore = 'mysql'
class TestInstanceBasicMariaDB(TestInstanceBasicMySQL):
class TestInstanceBasicMariaDB(base_basic.TestInstanceBasicMySQLBase):
datastore = 'mariadb'
class TestInstanceBasicPostgreSQL(base_basic.TestInstanceBasicBase):
datastore = 'postgresql'
create_user = False
enable_root = True
def get_config_value(self, ip, option):
db_url = (f'postgresql+psycopg2://root:{self.password}@'
f'{ip}:5432/postgres')
with utils.SQLClient(db_url) as db_client:
cmd = f"SELECT setting FROM pg_settings WHERE name='{option}';"
ret = db_client.pgsql_execute(cmd)
rows = ret.fetchall()
self.assertEqual(1, len(rows))
return int(rows[0][0])
@decorators.idempotent_id("b6c03cb6-f40f-11ea-a950-00224d6b7bc1")
def test_configuration(self):
# Default is 100
create_values = {"max_connections": 101}
update_values = {"max_connections": 102}
self.configuration_test(create_values, update_values,
need_restart=True)

View File

@ -11,8 +11,175 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tempest.lib import decorators
from trove_tempest_plugin.tests import constants
from trove_tempest_plugin.tests.scenario import base_replication
from trove_tempest_plugin.tests import utils
class TestReplicationMySQL(base_replication.TestReplicationBase):
datastore = 'mysql'
def insert_data_replication(self, ip,
username=constants.DB_USER,
password=constants.DB_PASS,
database=constants.DB_NAME):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
with utils.SQLClient(db_url) as db_client:
cmds = [
"CREATE TABLE Persons (ID int, String varchar(255));",
"insert into Persons VALUES (1, 'replication');"
]
db_client.mysql_execute(cmds)
def verify_data_replication(self, ip,
username=constants.DB_USER,
password=constants.DB_PASS,
database=constants.DB_NAME):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
with utils.SQLClient(db_url) as db_client:
cmd = "select * from Persons;"
ret = db_client.mysql_execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
self.assertEqual(1, len(rows))
result = []
for index in range(len(rows)):
result.append(dict(zip(keys, rows[index])))
expected = {'ID': 1, 'String': 'replication'}
self.assert_single_item(result, **expected)
def insert_data_after_promote(self, ip,
username=constants.DB_USER,
password=constants.DB_PASS,
database=constants.DB_NAME):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
with utils.SQLClient(db_url) as db_client:
cmds = [
"insert into Persons VALUES (2, 'promote');"
]
db_client.mysql_execute(cmds)
def verify_data_after_promote(self, ip,
username=constants.DB_USER,
password=constants.DB_PASS,
database=constants.DB_NAME):
db_url = f'mysql+pymysql://{username}:{password}@{ip}:3306/{database}'
with utils.SQLClient(db_url) as db_client:
cmd = "select * from Persons;"
ret = db_client.mysql_execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
self.assertGreater(len(rows), 1)
result = []
for index in range(len(rows)):
result.append(dict(zip(keys, rows[index])))
expected = {'ID': 2, 'String': 'promote'}
self.assert_single_item(result, **expected)
def create_database(self, name, **kwargs):
create_db = {"databases": [{"name": name}]}
self.client.create_resource(f"instances/{self.instance_id}/databases",
create_db, expected_status_code=202,
need_response=False)
@decorators.idempotent_id("280d09c6-b027-11ea-b87c-00224d6b7bc1")
def test_replication(self):
self.replication_test()
class TestReplicationPostgreSQL(base_replication.TestReplicationBase):
datastore = 'postgresql'
create_user = False
enable_root = True
def insert_data_replication(self, ip):
db_url = (f'postgresql+psycopg2://root:{self.password}@'
f'{ip}:5432/postgres')
with utils.SQLClient(db_url) as db_client:
cmd = "CREATE DATABASE testdb;"
db_client.pgsql_execute(cmd)
db_url = (f'postgresql+psycopg2://root:{self.password}@'
f'{ip}:5432/testdb')
with utils.SQLClient(db_url) as db_client:
cmds = [
"CREATE TABLE Persons (ID int, String varchar(255));",
"insert into Persons VALUES (1, 'replication');"
]
db_client.pgsql_execute(cmds)
def verify_data_replication(self, ip):
db_url = (f'postgresql+psycopg2://root:{self.password}@'
f'{ip}:5432/testdb')
with utils.SQLClient(db_url) as db_client:
cmd = "select * from Persons;"
ret = db_client.pgsql_execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
self.assertEqual(1, len(rows))
result = []
for index in range(len(rows)):
result.append(dict(zip(keys, rows[index])))
expected = {'id': 1, 'string': 'replication'}
self.assert_single_item(result, **expected)
def insert_data_after_promote(self, ip):
db_url = (f'postgresql+psycopg2://root:{self.password}@'
f'{ip}:5432/testdb')
with utils.SQLClient(db_url) as db_client:
cmds = [
"insert into Persons VALUES (2, 'promote');"
]
db_client.pgsql_execute(cmds)
def verify_data_after_promote(self, ip):
db_url = (f'postgresql+psycopg2://root:{self.password}@'
f'{ip}:5432/testdb')
with utils.SQLClient(db_url) as db_client:
cmd = "select * from Persons;"
ret = db_client.pgsql_execute(cmd)
keys = ret.keys()
rows = ret.fetchall()
self.assertGreater(len(rows), 1)
result = []
for index in range(len(rows)):
result.append(dict(zip(keys, rows[index])))
expected = {'id': 2, 'string': 'promote'}
self.assert_single_item(result, **expected)
def get_databases(self, instance_id, ip="", **kwargs):
db_url = (f'postgresql+psycopg2://root:{self.password}@'
f'{ip}:5432/postgres')
with utils.SQLClient(db_url) as db_client:
cmd = "SELECT datname FROM pg_catalog.pg_database WHERE " \
"(datistemplate ISNULL OR datistemplate = false);"
ret = db_client.pgsql_execute(cmd)
rows = ret.fetchall()
dbs = []
for row in rows:
dbs.append({'name': row[0]})
return dbs
def create_database(self, name, ip=""):
db_url = (f'postgresql+psycopg2://root:{self.password}@'
f'{ip}:5432/postgres')
with utils.SQLClient(db_url) as db_client:
cmd = f"CREATE DATABASE {name};"
db_client.pgsql_execute(cmd)
@decorators.idempotent_id("2f37f064-f418-11ea-a950-00224d6b7bc1")
def test_replication(self):
self.replication_test()

View File

@ -14,6 +14,7 @@
import time
from oslo_log import log as logging
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import sqlalchemy
from tempest.lib import exceptions
@ -56,22 +57,42 @@ def init_engine(db_url):
class SQLClient(object):
def __init__(self, url):
self.engine = init_engine(url)
def __init__(self, conn_str):
self.engine = init_engine(conn_str)
def execute(self, cmds, **kwargs):
def conn_execute(self, conn, cmds):
if isinstance(cmds, str):
result = conn.execute(cmds)
# Returns a ResultProxy
# https://docs.sqlalchemy.org/en/13/core/connections.html#sqlalchemy.engine.ResultProxy
return result
for cmd in cmds:
conn.execute(cmd)
def pgsql_execute(self, cmds, **kwargs):
try:
with self.engine.begin() as conn:
if isinstance(cmds, str):
result = conn.execute(cmds)
# Returns a ResultProxy
# https://docs.sqlalchemy.org/en/13/core/connections.html#sqlalchemy.engine.ResultProxy
return result
for cmd in cmds:
conn.execute(cmd)
with self.engine.connect() as conn:
conn.connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
return self.conn_execute(conn, cmds)
except Exception as e:
raise exceptions.TempestException(
'Failed to execute database command %s, error: %s' %
(cmds, str(e))
)
def mysql_execute(self, cmds, **kwargs):
try:
with self.engine.begin() as conn:
return self.conn_execute(conn, cmds)
except Exception as e:
raise exceptions.TempestException(
'Failed to execute database command %s, error: %s' %
(cmds, str(e))
)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.engine.dispose()