Add SSL functional test

Change-Id: Ia78a5ba94349830b5e7e9a48dd8eee081fe458ea
This commit is contained in:
James E. Blair 2014-12-19 17:50:07 -08:00
parent 865be4cec8
commit ac52675061
2 changed files with 80 additions and 11 deletions

View File

@ -13,8 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from OpenSSL import crypto
import fixtures
import testscenarios
import gear
from gear import tests
@ -30,26 +35,85 @@ def iterate_timeout(max_seconds, purpose):
class TestFunctional(tests.BaseTestCase):
scenarios = [
('no_ssl', dict(ssl=False)),
('ssl', dict(ssl=True)),
]
def setUp(self):
super(TestFunctional, self).setUp()
self.server = gear.Server(0)
if self.ssl:
self.tmp_root = self.useFixture(fixtures.TempDir()).path
root_subject, root_key = self.create_cert('root')
self.create_cert('server', root_subject, root_key)
self.create_cert('client', root_subject, root_key)
self.create_cert('worker', root_subject, root_key)
self.server = gear.Server(
0,
os.path.join(self.tmp_root, 'server.key'),
os.path.join(self.tmp_root, 'server.crt'),
os.path.join(self.tmp_root, 'root.crt'))
self.client = gear.Client('client')
self.worker = gear.Worker('worker')
self.client.addServer('127.0.0.1', self.server.port,
os.path.join(self.tmp_root, 'client.key'),
os.path.join(self.tmp_root, 'client.crt'),
os.path.join(self.tmp_root, 'root.crt'))
self.worker.addServer('127.0.0.1', self.server.port,
os.path.join(self.tmp_root, 'worker.key'),
os.path.join(self.tmp_root, 'worker.crt'),
os.path.join(self.tmp_root, 'root.crt'))
else:
self.server = gear.Server(0)
self.client = gear.Client('client')
self.worker = gear.Worker('worker')
self.client.addServer('127.0.0.1', self.server.port)
self.worker.addServer('127.0.0.1', self.server.port)
self.client.waitForServer()
self.worker.waitForServer()
def create_cert(self, cn, issuer=None, signing_key=None):
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 1024)
cert = crypto.X509()
subject = cert.get_subject()
subject.C = "US"
subject.ST = "State"
subject.L = "Locality"
subject.O = "Org"
subject.OU = "Org Unit"
subject.CN = cn
cert.set_serial_number(1)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(3600)
cert.set_pubkey(key)
if issuer:
cert.set_issuer(issuer)
else:
cert.set_issuer(subject)
if signing_key:
cert.sign(signing_key, 'sha1')
else:
cert.sign(key, 'sha1')
open(os.path.join(self.tmp_root, '%s.crt' % cn), 'w').write(
crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
open(os.path.join(self.tmp_root, '%s.key' % cn), 'w').write(
crypto.dump_privatekey(crypto.FILETYPE_PEM, key))
return (subject, key)
def test_job(self):
client = gear.Client('testclient')
client.addServer('127.0.0.1', self.server.port)
client.waitForServer()
worker = gear.Worker('testworker')
worker.addServer('127.0.0.1', self.server.port)
worker.waitForServer()
worker.registerFunction('test')
self.worker.registerFunction('test')
for jobcount in range(2):
job = gear.Job('test', 'testdata')
client.submitJob(job)
self.client.submitJob(job)
self.assertNotEqual(job.handle, None)
workerjob = worker.getJob()
workerjob = self.worker.getJob()
self.assertEqual(workerjob.handle, job.handle)
self.assertEqual(workerjob.arguments, 'testdata')
workerjob.sendWorkData('workdata')
@ -60,3 +124,7 @@ class TestFunctional(tests.BaseTestCase):
break
self.assertTrue(job.complete)
self.assertEqual(job.data, ['workdata'])
def load_tests(loader, in_tests, pattern):
return testscenarios.load_tests_apply_scenarios(loader, in_tests, pattern)

View File

@ -4,6 +4,7 @@ coverage>=3.6
discover
fixtures>=0.3.12
python-subunit
pyOpenSSL
statsd>=1.0.0,<3.0
sphinx>=1.1.2,<1.2
testrepository>=0.0.13