 0cad208658
			
		
	
	0cad208658
	
	
	
		
			
			* Set the minimum version of python to be 2.6, since we don't need to support anything older anymore. * As the first of a series of related cleanups, drop our custom json module (since json is in the stdlib since 2.6).
		
			
				
	
	
		
			125 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			125 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/python2.4
 | |
| #
 | |
| # Copyright 2014 Google Inc.
 | |
| #
 | |
| # Licensed under the Apache License, Version 2.0 (the "License");
 | |
| # you may not use this file except in compliance with the License.
 | |
| # You may obtain a copy of the License at
 | |
| #
 | |
| #      http://www.apache.org/licenses/LICENSE-2.0
 | |
| #
 | |
| # Unless required by applicable law or agreed to in writing, software
 | |
| # distributed under the License is distributed on an "AS IS" BASIS,
 | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| # See the License for the specific language governing permissions and
 | |
| # limitations under the License.
 | |
| 
 | |
| 
 | |
| """Oauth2client tests.
 | |
| 
 | |
| Unit tests for service account credentials implemented using RSA.
 | |
| """
 | |
| 
 | |
| import json
 | |
| import os
 | |
| import rsa
 | |
| import time
 | |
| import unittest
 | |
| 
 | |
| from http_mock import HttpMockSequence
 | |
| from oauth2client.service_account import _ServiceAccountCredentials
 | |
| 
 | |
| 
 | |
| def datafile(filename):
 | |
|   # TODO(orestica): Refactor this using pkgutil.get_data
 | |
|   f = open(os.path.join(os.path.dirname(__file__), 'data', filename), 'r')
 | |
|   data = f.read()
 | |
|   f.close()
 | |
|   return data
 | |
| 
 | |
| 
 | |
| class ServiceAccountCredentialsTests(unittest.TestCase):
 | |
|   def setUp(self):
 | |
|     self.service_account_id = '123'
 | |
|     self.service_account_email = 'dummy@google.com'
 | |
|     self.private_key_id = 'ABCDEF'
 | |
|     self.private_key = datafile('pem_from_pkcs12.pem')
 | |
|     self.scopes = ['dummy_scope']
 | |
|     self.credentials = _ServiceAccountCredentials(self.service_account_id,
 | |
|                                                   self.service_account_email,
 | |
|                                                   self.private_key_id,
 | |
|                                                   self.private_key,
 | |
|                                                   [])
 | |
| 
 | |
|   def test_sign_blob(self):
 | |
|     private_key_id, signature = self.credentials.sign_blob('Google')
 | |
|     self.assertEqual( self.private_key_id, private_key_id)
 | |
| 
 | |
|     pub_key = rsa.PublicKey.load_pkcs1_openssl_pem(
 | |
|         datafile('publickey_openssl.pem'))
 | |
| 
 | |
|     self.assertTrue(rsa.pkcs1.verify('Google', signature, pub_key))
 | |
| 
 | |
|     try:
 | |
|       rsa.pkcs1.verify('Orest', signature, pub_key)
 | |
|       self.fail('Verification should have failed!')
 | |
|     except rsa.pkcs1.VerificationError:
 | |
|       pass  # Expected
 | |
| 
 | |
|     try:
 | |
|       rsa.pkcs1.verify('Google', 'bad signature', pub_key)
 | |
|       self.fail('Verification should have failed!')
 | |
|     except rsa.pkcs1.VerificationError:
 | |
|       pass  # Expected
 | |
| 
 | |
|   def test_service_account_email(self):
 | |
|     self.assertEqual(self.service_account_email,
 | |
|                      self.credentials.service_account_email)
 | |
| 
 | |
|   def test_create_scoped_required_without_scopes(self):
 | |
|     self.assertTrue(self.credentials.create_scoped_required())
 | |
| 
 | |
|   def test_create_scoped_required_with_scopes(self):
 | |
|     self.credentials = _ServiceAccountCredentials(self.service_account_id,
 | |
|                                                   self.service_account_email,
 | |
|                                                   self.private_key_id,
 | |
|                                                   self.private_key,
 | |
|                                                   self.scopes)
 | |
|     self.assertFalse(self.credentials.create_scoped_required())
 | |
| 
 | |
|   def test_create_scoped(self):
 | |
|     new_credentials = self.credentials.create_scoped(self.scopes)
 | |
|     self.assertNotEqual(self.credentials, new_credentials)
 | |
|     self.assertTrue(isinstance(new_credentials, _ServiceAccountCredentials))
 | |
|     self.assertEqual('dummy_scope', new_credentials._scopes)
 | |
| 
 | |
|   def test_access_token(self):
 | |
|     S = 2  # number of seconds in which the token expires
 | |
|     token_response_first = {'access_token': 'first_token', 'expires_in': S}
 | |
|     token_response_second = {'access_token': 'second_token', 'expires_in': S}
 | |
|     http = HttpMockSequence([
 | |
|         ({'status': '200'}, json.dumps(token_response_first)),
 | |
|         ({'status': '200'}, json.dumps(token_response_second)),
 | |
|     ])
 | |
| 
 | |
|     token = self.credentials.get_access_token(http=http)
 | |
|     self.assertEqual('first_token', token.access_token)
 | |
|     self.assertEqual(S - 1, token.expires_in)
 | |
|     self.assertFalse(self.credentials.access_token_expired)
 | |
|     self.assertEqual(token_response_first, self.credentials.token_response)
 | |
| 
 | |
|     token = self.credentials.get_access_token(http=http)
 | |
|     self.assertEqual('first_token', token.access_token)
 | |
|     self.assertEqual(S - 1, token.expires_in)
 | |
|     self.assertFalse(self.credentials.access_token_expired)
 | |
|     self.assertEqual(token_response_first, self.credentials.token_response)
 | |
| 
 | |
|     time.sleep(S)
 | |
|     self.assertTrue(self.credentials.access_token_expired)
 | |
| 
 | |
|     token = self.credentials.get_access_token(http=http)
 | |
|     self.assertEqual('second_token', token.access_token)
 | |
|     self.assertEqual(S - 1, token.expires_in)
 | |
|     self.assertFalse(self.credentials.access_token_expired)
 | |
|     self.assertEqual(token_response_second, self.credentials.token_response)
 |