Mailgun driver unit tests

Change-Id: Ib08627351e3c945c636544fb3d2ddcaad687502f
This commit is contained in:
Isaac Mungai 2016-07-14 12:51:26 -04:00
parent 9d6bfd188e
commit 4452a54061
2 changed files with 147 additions and 45 deletions

View File

@ -57,7 +57,59 @@ class TestDriver(base.TestCase):
def test_init(self):
self.assertTrue(self.mailgun_notification_driver is not None)
def test_service_contoller(self):
def test_service_controller(self):
self.assertTrue(self.mailgun_notification_driver.services_controller
is not None)
self.assertTrue(self.mailgun_notification_driver.retry_send == 5)
class TestDriverInit(base.TestCase):
def setUp(self):
super(TestDriverInit, self).setUp()
def test_invalid_from_address(self):
notification_options = [
cfg.StrOpt('mailgun_api_key', default='123'),
cfg.IntOpt('retry_send', default=5),
cfg.StrOpt('mailgun_request_url', default='http://123.com/{0}'),
cfg.StrOpt('sand_box', default='123.com'),
cfg.StrOpt('from_address', default='invalid.email.address'),
cfg.ListOpt('recipients', default=['recipient@gmail.com']),
cfg.StrOpt('notification_subject',
default='Poppy SSL Certificate Provisioned')
]
with mock.patch.object(
driver, 'MAIL_NOTIFICATION_OPTIONS', new=notification_options):
self.conf = cfg.ConfigOpts()
self.assertRaises(
ValueError,
driver.MailNotificationDriver,
self.conf
)
def test_invalid_recipients(self):
notification_options = [
cfg.StrOpt('mailgun_api_key', default='123'),
cfg.IntOpt('retry_send', default=5),
cfg.StrOpt('mailgun_request_url', default='http://123.com/{0}'),
cfg.StrOpt('sand_box', default='123.com'),
cfg.StrOpt('from_address', default='noreply@poppycdn.org'),
cfg.ListOpt('recipients', default=['invalid.email.address']),
cfg.StrOpt('notification_subject',
default='Poppy SSL Certificate Provisioned')
]
with mock.patch.object(
driver, 'MAIL_NOTIFICATION_OPTIONS', new=notification_options):
self.conf = cfg.ConfigOpts()
self.assertRaises(
ValueError,
driver.MailNotificationDriver,
self.conf
)

View File

@ -74,6 +74,10 @@ class CassandraStorageDriverTests(base.TestCase):
def setUp(self):
super(CassandraStorageDriverTests, self).setUp()
cluster_patcher = mock.patch('cassandra.cluster.Cluster')
self.mock_cluster = cluster_patcher.start()
self.addCleanup(cluster_patcher.stop)
conf = cfg.ConfigOpts()
conf.register_opt(
cfg.StrOpt(
@ -82,6 +86,7 @@ class CassandraStorageDriverTests(base.TestCase):
help='datacenter where the C* cluster hosted'))
conf.register_opts(CASSANDRA_OPTIONS,
group=driver.CASSANDRA_GROUP)
self.conf = conf
self.cassandra_driver = driver.CassandraStorageDriver(conf)
migrations_patcher = mock.patch(
@ -103,15 +108,14 @@ class CassandraStorageDriverTests(base.TestCase):
cfg.migrations_consistency_level = 'LOCAL_QUORUM'
cfg.load_balance_strategy = 'RoundRobinPolicy'
with mock.patch('cassandra.cluster.Cluster') as mock_cluster:
driver._connection(cfg, None)
driver._connection(cfg, None)
kwargs = mock_cluster.call_args[1]
# ssl_options may or may not be provided to the Cluster constructor
# depending on the implementation, but if it is provided, ensure it
# has been set to None
if 'ssl_options' in kwargs:
self.assertIsNone(kwargs['ssl_options'])
kwargs = self.mock_cluster.call_args[1]
# ssl_options may or may not be provided to the Cluster constructor
# depending on the implementation, but if it is provided, ensure it
# has been set to None
if 'ssl_options' in kwargs:
self.assertIsNone(kwargs['ssl_options'])
def test_ssl_enabled(self):
cfg = mock.Mock()
@ -120,14 +124,13 @@ class CassandraStorageDriverTests(base.TestCase):
cfg.migrations_consistency_level = 'LOCAL_QUORUM'
cfg.load_balance_strategy = 'RoundRobinPolicy'
with mock.patch('cassandra.cluster.Cluster') as mock_cluster:
driver._connection(cfg, None)
driver._connection(cfg, None)
kwargs = mock_cluster.call_args[1]
self.assertTrue('ssl_options' in kwargs)
ssl_options = kwargs['ssl_options']
self.assertEqual(cfg.ssl_ca_certs, ssl_options['ca_certs'])
self.assertEqual(ssl.PROTOCOL_TLSv1, ssl_options['ssl_version'])
kwargs = self.mock_cluster.call_args[1]
self.assertTrue('ssl_options' in kwargs)
ssl_options = kwargs['ssl_options']
self.assertEqual(cfg.ssl_ca_certs, ssl_options['ca_certs'])
self.assertEqual(ssl.PROTOCOL_TLSv1, ssl_options['ssl_version'])
def test_auth_enabled(self):
cfg = mock.Mock()
@ -136,16 +139,15 @@ class CassandraStorageDriverTests(base.TestCase):
cfg.auth_enabled = True
cfg.cluster = ['localhost']
with mock.patch("cassandra.cluster.Cluster") as mock_cluster:
cluster_instance = mock.Mock()
mock_cluster.return_value = cluster_instance
cluster_instance = mock.Mock()
self.mock_cluster.return_value = cluster_instance
mock_session = mock.Mock()
cluster_instance.connect.return_value = mock_session
mock_session = mock.Mock()
cluster_instance.connect.return_value = mock_session
session = driver._connection(cfg, "ORD")
session = driver._connection(cfg, "ORD")
self.assertEqual(mock_session, session)
self.assertEqual(mock_session, session)
def test_create_dc_aware_policy(self):
cfg = mock.Mock()
@ -153,16 +155,15 @@ class CassandraStorageDriverTests(base.TestCase):
cfg.load_balance_strategy = "DCAwareRoundRobinPolicy"
cfg.cluster = ['localhost']
with mock.patch("cassandra.cluster.Cluster") as mock_cluster:
cluster_instance = mock.Mock()
mock_cluster.return_value = cluster_instance
cluster_instance = mock.Mock()
self.mock_cluster.return_value = cluster_instance
mock_session = mock.Mock()
cluster_instance.connect.return_value = mock_session
mock_session = mock.Mock()
cluster_instance.connect.return_value = mock_session
session = driver._connection(cfg, "ORD")
session = driver._connection(cfg, "ORD")
self.assertEqual(mock_session, session)
self.assertEqual(mock_session, session)
def test_consistency_level(self):
self.assertEqual(self.cassandra_driver.consistency_level,
@ -194,27 +195,33 @@ class CassandraStorageDriverTests(base.TestCase):
mocked.
'''
with mock.patch('cassandra.cluster.Cluster') as mock_cluster:
cluster_instance = mock.Mock()
mock_cluster.return_value = cluster_instance
cluster_instance = mock.Mock()
self.mock_cluster.return_value = cluster_instance
mock_session = mock.Mock()
cluster_instance.connect.return_value = mock_session
mock_session = mock.Mock()
cluster_instance.connect.return_value = mock_session
self.cassandra_driver.delete_namespace('test')
self.assertTrue(mock_session.execute.called)
mock_session.execute.assert_called_with('DROP KEYSPACE test')
self.cassandra_driver.delete_namespace('test')
self.assertTrue(mock_session.execute.called)
mock_session.execute.assert_called_with('DROP KEYSPACE test')
def test_is_alive_no_connection(self):
def test_is_alive_negative(self):
"""No connection test for checking the health of Cassandra."""
self.skipTest('Too slow, need to mock exception')
self.cassandra_driver.database.execute.side_effect = (
Exception("Mock -- DB execute() failed!")
)
self.cassandra_driver.session = None
self.assertFalse(self.cassandra_driver.is_alive())
def test_is_alive_positive(self):
"""No connection test for checking the health of Cassandra."""
self.cassandra_driver.session = None
self.assertTrue(self.cassandra_driver.is_alive())
def test_is_alive_with_exception(self):
"""Broken connection test for checking the health of Cassandra."""
self.skipTest('Too slow, need to mock exception')
self.cassandra_driver.session = None
self.cassandra_driver.connect = mock.Mock()
@ -226,7 +233,6 @@ class CassandraStorageDriverTests(base.TestCase):
def test_is_alive(self):
"""Happy path test for checking the health of Cassandra."""
self.skipTest('Too slow, need to mock exception')
self.cassandra_driver.session = None
self.cassandra_driver.connect = mock.Mock()
@ -267,7 +273,51 @@ class CassandraStorageDriverTests(base.TestCase):
isinstance(sc, flavors.FlavorsController),
True)
@mock.patch('cassandra.cluster.Cluster')
def test_database(self, mock_cluster):
def test_database(self):
self.cassandra_driver.database
mock_cluster.return_value.connect.assert_called_with()
self.mock_cluster.return_value.connect.assert_called_with()
def test_connection_create_key_space(self):
self.conf.default_consistency_level = 'ALL'
cluster_instance = mock.Mock()
self.mock_cluster.return_value = cluster_instance
mock_session = mock.Mock()
mock_session.set_keyspace.side_effect = (
cassandra.InvalidRequest
)
cluster_instance.connect.return_value = mock_session
self.assertRaises(
cassandra.InvalidRequest,
driver._connection,
self.conf[driver.CASSANDRA_GROUP],
'ORD',
keyspace='keyspace'
)
def test_change_config_group(self):
old_cassandra_conf = self.cassandra_driver.cassandra_conf
new_opts = [
cfg.ListOpt('test', default='test')
]
self.cassandra_driver.change_config_group(
new_opts, 'test_group'
)
self.assertFalse(
old_cassandra_conf ==
self.cassandra_driver.cassandra_conf
)
def test_close_connection_no_lock(self):
self.cassandra_driver.connect()
with mock.patch.object(
self.cassandra_driver, 'lock'
) as mock_lock:
mock_lock.acquire.return_value = False
self.cassandra_driver.close_connection()
self.assertFalse(mock_lock.return_value.release.called)