Refactor Django helpers (#546)

* Move all Django code into contrib.django_util.
* Add DjangORMStorage. This is backwards compatible with old django_orm storage.
* Add new functionality to decorators and views that integrate with Django ORM-based storage.
* Fix issue where Django storage always creates new Models.
* Move tests into Django package and have them create an actual test app to create test databases etc.
* Remove FlowField.
This commit is contained in:
Bill Prin
2016-07-26 12:04:48 -07:00
committed by Jon Wayne Parrott
parent 3ba3c60b3e
commit 25165adbc1
26 changed files with 1526 additions and 996 deletions

View File

@@ -12,7 +12,7 @@ import sys
# settings module and load it. This assumes django has been installed
# (but it must be for the docs to build), so if it has not already
# been installed run `pip install -r docs/requirements.txt`.
os.environ['DJANGO_SETTINGS_MODULE'] = 'tests.contrib.test_django_settings'
os.environ['DJANGO_SETTINGS_MODULE'] = 'tests.contrib.django_util.settings'
import django
import mock
from pkg_resources import get_distribution

View File

@@ -1,7 +0,0 @@
oauth2client.contrib.django_orm module
======================================
.. automodule:: oauth2client.contrib.django_orm
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,7 @@
oauth2client.contrib.django_util.models module
==============================================
.. automodule:: oauth2client.contrib.django_util.models
:members:
:undoc-members:
:show-inheritance:

View File

@@ -8,6 +8,7 @@ Submodules
oauth2client.contrib.django_util.apps
oauth2client.contrib.django_util.decorators
oauth2client.contrib.django_util.models
oauth2client.contrib.django_util.signals
oauth2client.contrib.django_util.site
oauth2client.contrib.django_util.storage

View File

@@ -16,7 +16,6 @@ Submodules
oauth2client.contrib.appengine
oauth2client.contrib.devshell
oauth2client.contrib.dictionary_storage
oauth2client.contrib.django_orm
oauth2client.contrib.flask_util
oauth2client.contrib.gce
oauth2client.contrib.keyring_storage

View File

@@ -1,182 +0,0 @@
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""OAuth 2.0 utilities for Django.
Utilities for using OAuth 2.0 in conjunction with
the Django datastore.
Only Django versions 1.8+ are supported.
"""
import base64
import pickle
from django.db import models
from django.utils.encoding import smart_bytes, smart_text
import oauth2client
from oauth2client.client import Storage as BaseStorage
__author__ = 'jcgregorio@google.com (Joe Gregorio)'
class CredentialsField(models.Field):
def __init__(self, *args, **kwargs):
if 'null' not in kwargs:
kwargs['null'] = True
super(CredentialsField, self).__init__(*args, **kwargs)
def get_internal_type(self):
return 'TextField'
def from_db_value(self, value, expression, connection, context):
return self.to_python(value)
def to_python(self, value):
if value is None:
return None
if isinstance(value, oauth2client.client.Credentials):
return value
return pickle.loads(base64.b64decode(smart_bytes(value)))
def get_prep_value(self, value):
if value is None:
return None
return smart_text(base64.b64encode(pickle.dumps(value)))
def value_to_string(self, obj):
"""Convert the field value from the provided model to a string.
Used during model serialization.
Args:
obj: db.Model, model object
Returns:
string, the serialized field value
"""
value = self._get_val_from_obj(obj)
return self.get_prep_value(value)
class FlowField(models.Field):
def __init__(self, *args, **kwargs):
if 'null' not in kwargs:
kwargs['null'] = True
super(FlowField, self).__init__(*args, **kwargs)
def get_internal_type(self):
return 'TextField'
def from_db_value(self, value, expression, connection, context):
return self.to_python(value)
def to_python(self, value):
if value is None:
return None
if isinstance(value, oauth2client.client.Flow):
return value
return pickle.loads(base64.b64decode(value))
def get_prep_value(self, value):
if value is None:
return None
return smart_text(base64.b64encode(pickle.dumps(value)))
def value_to_string(self, obj):
"""Convert the field value from the provided model to a string.
Used during model serialization.
Args:
obj: db.Model, model object
Returns:
string, the serialized field value
"""
value = self._get_val_from_obj(obj)
return self.get_prep_value(value)
class Storage(BaseStorage):
"""Store and retrieve a single credential to and from the Django datastore.
This Storage helper presumes the Credentials
have been stored as a CredentialsField
on a db model class.
"""
def __init__(self, model_class, key_name, key_value, property_name):
"""Constructor for Storage.
Args:
model: db.Model, model class
key_name: string, key name for the entity that has the credentials
key_value: string, key value for the entity that has the
credentials
property_name: string, name of the property that is an
CredentialsProperty
"""
super(Storage, self).__init__()
self.model_class = model_class
self.key_name = key_name
self.key_value = key_value
self.property_name = property_name
def locked_get(self):
"""Retrieve stored credential.
Returns:
oauth2client.Credentials
"""
credential = None
query = {self.key_name: self.key_value}
entities = self.model_class.objects.filter(**query)
if len(entities) > 0:
credential = getattr(entities[0], self.property_name)
if credential and hasattr(credential, 'set_store'):
credential.set_store(self)
return credential
def locked_put(self, credentials, overwrite=False):
"""Write a Credentials to the Django datastore.
Args:
credentials: Credentials, the credentials to store.
overwrite: Boolean, indicates whether you would like these
credentials to overwrite any existing stored
credentials.
"""
args = {self.key_name: self.key_value}
if overwrite:
(entity,
unused_is_new) = self.model_class.objects.get_or_create(**args)
else:
entity = self.model_class(**args)
setattr(entity, self.property_name, credentials)
entity.save()
def locked_delete(self):
"""Delete Credentials from the datastore."""
query = {self.key_name: self.key_value}
self.model_class.objects.filter(**query).delete()

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for the Django web framework
"""Utilities for the Django web framework.
Provides Django views and helpers the make using the OAuth2 web server
flow easier. It includes an ``oauth_required`` decorator to automatically
@@ -20,10 +20,20 @@ ensure that user credentials are available, and an ``oauth_enabled`` decorator
to check if the user has authorized, and helper shortcuts to create the
authorization URL otherwise.
There are two basic use cases supported. The first is using Google OAuth as the
primary form of authentication, which is the simpler approach recommended
for applications without their own user system.
The second use case is adding Google OAuth credentials to an
existing Django model containing a Django user field. Most of the
configuration is the same, except for `GOOGLE_OAUTH_MODEL_STORAGE` in
settings.py. See "Adding Credentials To An Existing Django User System" for
usage differences.
Only Django versions 1.8+ are supported.
Configuration
=============
===============
To configure, you'll need a set of OAuth2 web application credentials from
`Google Developer's Console <https://console.developers.google.com/project/_/apiui/credential>`.
@@ -36,9 +46,13 @@ Add the helper to your INSTALLED_APPS:
INSTALLED_APPS = (
# other apps
"django.contrib.sessions.middleware"
"oauth2client.contrib.django_util"
)
This helper also requires the Django Session Middleware, so
``django.contrib.sessions.middleware`` should be in INSTALLED_APPS as well.
Add the client secrets created earlier to the settings. You can either
specify the path to the credentials file in JSON format
@@ -106,6 +120,8 @@ request.oauth
http=request.oauth.http,
developerKey=API_KEY)
events = service.events().list(calendarId='primary').execute()['items']
return HttpResponse("email: {0} , calendar: {1}".format(
email,str(events)))
return HttpResponse(
"email: {0} , calendar: {1}".format(email, str(events)))
@@ -166,8 +182,49 @@ oauth2_authorized signal:
oauth2_authorized.connect(test_callback)
Adding Credentials To An Existing Django User System
=====================================================
As an alternative to storing the credentials in the session, the helper
can be configured to store the fields on a Django model. This might be useful
if you need to use the credentials outside the context of a user request. It
also prevents the need for a logged in user to repeat the OAuth flow when
starting a new session.
To use, change ``settings.py``
.. code-block:: python
:caption: settings.py
:name: storage_model_config
GOOGLE_OAUTH2_STORAGE_MODEL = {
'model': 'path.to.model.MyModel',
'user_property': 'user_id',
'credentials_property': 'credential'
}
Where ``path.to.model`` class is the fully qualified name of a
``django.db.model`` class containing a ``django.contrib.auth.models.User``
field with the name specified by `user_property` and a
:class:`oauth2client.contrib.django_util.models.CredentialsField` with the name
specified by `credentials_property`. For the sample configuration given,
our model would look like
.. code-block:: python
:caption: models.py
:name: storage_model_model
from django.contrib.auth.models import User
from oauth2client.contrib.django_util.models import CredentialsField
class MyModel(models.Model):
# ... other fields here ...
user = models.OneToOneField(User)
credential = CredentialsField()
"""
import importlib
import django.conf
from django.core import exceptions
from django.core import urlresolvers
@@ -175,6 +232,7 @@ import httplib2
from six.moves.urllib import parse
from oauth2client import clientsecrets
from oauth2client.contrib import dictionary_storage
from oauth2client.contrib.django_util import storage
GOOGLE_OAUTH2_DEFAULT_SCOPES = ('email',)
@@ -182,7 +240,15 @@ GOOGLE_OAUTH2_REQUEST_ATTRIBUTE = 'oauth'
def _load_client_secrets(filename):
"""Loads client secrets from the given filename."""
"""Loads client secrets from the given filename.
Args:
filename: The name of the file containing the JSON secret key.
Returns:
A 2-tuple, the first item containing the client id, and the second
item containing a client secret.
"""
client_type, client_info = clientsecrets.loadfile(filename)
if client_type != clientsecrets.TYPE_WEB:
@@ -193,8 +259,16 @@ def _load_client_secrets(filename):
def _get_oauth2_client_id_and_secret(settings_instance):
"""Initializes client id and client secret based on the settings"""
secret_json = getattr(django.conf.settings,
"""Initializes client id and client secret based on the settings.
Args:
settings_instance: An instance of ``django.conf.settings``.
Returns:
A 2-tuple, the first item is the client id and the second
item is the client secret.
"""
secret_json = getattr(settings_instance,
'GOOGLE_OAUTH2_CLIENT_SECRETS_JSON', None)
if secret_json is not None:
return _load_client_secrets(secret_json)
@@ -212,6 +286,33 @@ def _get_oauth2_client_id_and_secret(settings_instance):
"GOOGLE_OAUTH2_CLIENT_SECRET in settings.py")
def _get_storage_model():
"""This configures whether the credentials will be stored in the session
or the Django ORM based on the settings. By default, the credentials
will be stored in the session, unless `GOOGLE_OAUTH2_STORAGE_MODEL`
is found in the settings. Usually, the ORM storage is used to integrate
credentials into an existing Django user system.
Returns:
A tuple containing three strings, or None. If
``GOOGLE_OAUTH2_STORAGE_MODEL`` is configured, the tuple
will contain the fully qualifed path of the `django.db.model`,
the name of the ``django.contrib.auth.models.User`` field on the
model, and the name of the
:class:`oauth2client.contrib.django_util.models.CredentialsField`
field on the model. If Django ORM storage is not configured,
this function returns None.
"""
storage_model_settings = getattr(django.conf.settings,
'GOOGLE_OAUTH2_STORAGE_MODEL', None)
if storage_model_settings is not None:
return (storage_model_settings['model'],
storage_model_settings['user_property'],
storage_model_settings['credentials_property'])
else:
return None, None, None
class OAuth2Settings(object):
"""Initializes Django OAuth2 Helper Settings
@@ -221,11 +322,11 @@ class OAuth2Settings(object):
Attributes:
scopes: A list of OAuth2 scopes that the decorators and views will use
as defaults
as defaults.
request_prefix: The name of the attribute that the decorators use to
attach the UserOAuth2 object to the Django request object.
client_id: The OAuth2 Client ID
client_secret: The OAuth2 Client Secret
client_id: The OAuth2 Client ID.
client_secret: The OAuth2 Client Secret.
"""
def __init__(self, settings_instance):
@@ -238,75 +339,139 @@ class OAuth2Settings(object):
_get_oauth2_client_id_and_secret(settings_instance)
if ('django.contrib.sessions.middleware.SessionMiddleware'
not in settings_instance.MIDDLEWARE_CLASSES):
not in settings_instance.MIDDLEWARE_CLASSES):
raise exceptions.ImproperlyConfigured(
"The Google OAuth2 Helper requires session middleware to "
"be installed. Edit your MIDDLEWARE_CLASSES setting"
" to include 'django.contrib.sessions.middleware."
"SessionMiddleware'.")
'The Google OAuth2 Helper requires session middleware to '
'be installed. Edit your MIDDLEWARE_CLASSES setting'
' to include \'django.contrib.sessions.middleware.'
'SessionMiddleware\'.')
(self.storage_model, self.storage_model_user_property,
self.storage_model_credentials_property) = _get_storage_model()
oauth2_settings = OAuth2Settings(django.conf.settings)
_CREDENTIALS_KEY = 'google_oauth2_credentials'
def get_storage(request):
""" Gets a Credentials storage object provided by the Django OAuth2 Helper
object.
Args:
request: Reference to the current request object.
Returns:
An :class:`oauth2.client.Storage` object.
"""
storage_model = oauth2_settings.storage_model
user_property = oauth2_settings.storage_model_user_property
credentials_property = oauth2_settings.storage_model_credentials_property
if storage_model:
module_name, class_name = storage_model.rsplit('.', 1)
module = importlib.import_module(module_name)
storage_model_class = getattr(module, class_name)
return storage.DjangoORMStorage(storage_model_class,
user_property,
request.user,
credentials_property)
else:
# use session
return dictionary_storage.DictionaryStorage(
request.session, key=_CREDENTIALS_KEY)
def _redirect_with_params(url_name, *args, **kwargs):
"""Helper method to create a redirect response that uses GET URL
parameters."""
"""Helper method to create a redirect response with URL params.
This builds a redirect string that converts kwargs into a
query string.
Args:
url_name: The name of the url to redirect to.
kwargs: the query string param and their values to build.
Returns:
A properly formatted redirect string.
"""
url = urlresolvers.reverse(url_name, args=args)
params = parse.urlencode(kwargs, True)
return "{0}?{1}".format(url, params)
def _credentials_from_request(request):
"""Gets the authorized credentials for this flow, if they exist."""
# ORM storage requires a logged in user
if (oauth2_settings.storage_model is None or
request.user.is_authenticated()):
return get_storage(request).get()
else:
return None
class UserOAuth2(object):
"""Class to create oauth2 objects on Django request objects containing
credentials and helper methods.
"""
def __init__(self, request, scopes=None, return_url=None):
"""Initialize the Oauth2 Object
:param request: Django request object
:param scopes: Scopes desired for this OAuth2 flow
:param return_url: URL to return to after authorization is complete
:return:
"""Initialize the Oauth2 Object.
Args:
request: Django request object.
scopes: Scopes desired for this OAuth2 flow.
return_url: The url to return to after the OAuth flow is complete,
defaults to the request's current URL path.
"""
self.request = request
self.return_url = return_url or request.get_full_path()
self.scopes = set(oauth2_settings.scopes)
if scopes:
self.scopes |= set(scopes)
# make sure previously requested custom scopes are maintained
# in future authorizations
credentials = storage.get_storage(self.request).get()
if credentials:
self.scopes |= credentials.scopes
self._scopes = set(oauth2_settings.scopes) | set(scopes)
else:
self._scopes = set(oauth2_settings.scopes)
def get_authorize_redirect(self):
"""Creates a URl to start the OAuth2 authorization flow"""
"""Creates a URl to start the OAuth2 authorization flow."""
get_params = {
'return_url': self.return_url,
'scopes': self.scopes
'scopes': self._get_scopes()
}
return _redirect_with_params('google_oauth:authorize',
**get_params)
return _redirect_with_params('google_oauth:authorize', **get_params)
def has_credentials(self):
"""Returns True if there are valid credentials for the current user
and required scopes."""
return (self.credentials and not self.credentials.invalid and
self.credentials.has_scopes(self.scopes))
credentials = _credentials_from_request(self.request)
return (credentials and not credentials.invalid and
credentials.has_scopes(self._get_scopes()))
def _get_scopes(self):
"""Returns the scopes associated with this object, kept up to
date for incremental auth."""
if _credentials_from_request(self.request):
return (self._scopes |
_credentials_from_request(self.request).scopes)
else:
return self._scopes
@property
def scopes(self):
"""Returns the scopes associated with this OAuth2 object."""
# make sure previously requested custom scopes are maintained
# in future authorizations
return self._get_scopes()
@property
def credentials(self):
"""Gets the authorized credentials for this flow, if they exist"""
return storage.get_storage(self.request).get()
"""Gets the authorized credentials for this flow, if they exist."""
return _credentials_from_request(self.request)
@property
def http(self):
"""Helper method to create an HTTP client authorized with OAuth2
credentials"""
credentials."""
if self.has_credentials():
return self.credentials.authorize(httplib2.Http())
return None

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Application Config For Django OAuth2 Helper
"""Application Config For Django OAuth2 Helper.
Django 1.7+ provides an
[applications](https://docs.djangoproject.com/en/1.8/ref/applications/)

View File

@@ -12,14 +12,29 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Decorators for Django OAuth2 Flow.
Contains two decorators, ``oauth_required`` and ``oauth_enabled``.
``oauth_required`` will ensure that a user has an oauth object containing
credentials associated with the request, and if not, redirect to the
authorization flow.
``oauth_enabled`` will attach the oauth2 object containing credentials if it
exists. If it doesn't, the view will still render, but helper methods will be
attached to start the oauth2 flow.
"""
from django import shortcuts
import django.conf
from six import wraps
from six.moves.urllib import parse
from oauth2client.contrib import django_util
def oauth_required(decorated_function=None, scopes=None, **decorator_kwargs):
""" Decorator to require OAuth2 credentials for a view
""" Decorator to require OAuth2 credentials for a view.
.. code-block:: python
@@ -40,19 +55,28 @@ def oauth_required(decorated_function=None, scopes=None, **decorator_kwargs):
return HttpResponse(
"email: {0}, calendar: {1}".format(email, str(events)))
:param decorated_function: View function to decorate, must have the Django
request object as the first argument
:param scopes: Scopes to require, will default
:param decorator_kwargs: Can include ``return_url`` to specify the URL to
return to after OAuth2 authorization is complete
:return: An OAuth2 Authorize view if credentials are not found or if the
credentials are missing the required scopes. Otherwise,
the decorated view.
"""
Args:
decorated_function: View function to decorate, must have the Django
request object as the first argument.
scopes: Scopes to require, will default.
decorator_kwargs: Can include ``return_url`` to specify the URL to
return to after OAuth2 authorization is complete.
Returns:
An OAuth2 Authorize view if credentials are not found or if the
credentials are missing the required scopes. Otherwise,
the decorated view.
"""
def curry_wrapper(wrapped_function):
@wraps(wrapped_function)
def required_wrapper(request, *args, **kwargs):
if not (django_util.oauth2_settings.storage_model is None or
request.user.is_authenticated()):
redirect_str = '{0}?next={1}'.format(
django.conf.settings.LOGIN_URL,
parse.quote(request.path))
return shortcuts.redirect(redirect_str)
return_url = decorator_kwargs.pop('return_url',
request.get_full_path())
user_oauth = django_util.UserOAuth2(request, scopes, return_url)
@@ -87,20 +111,22 @@ def oauth_enabled(decorated_function=None, scopes=None, **decorator_kwargs):
# this could be passed into a view
# request.oauth.http is also initialized
return HttpResponse("User email: {0}".format(
request.oauth.credentials.id_token['email']))
request.oauth.credentials.id_token['email'])
else:
return HttpResponse('Here is an OAuth Authorize link:
<a href="{0}">Authorize</a>'.format(
request.oauth.get_authorize_redirect()))
:param decorated_function: View function to decorate
:param scopes: Scopes to require, will default
:param decorator_kwargs: Can include ``return_url`` to specify the URL to
return to after OAuth2 authorization is complete
:return: The decorated view function
"""
Args:
decorated_function: View function to decorate.
scopes: Scopes to require, will default.
decorator_kwargs: Can include ``return_url`` to specify the URL to
return to after OAuth2 authorization is complete.
Returns:
The decorated view function.
"""
def curry_wrapper(wrapped_function):
@wraps(wrapped_function)
def enabled_wrapper(request, *args, **kwargs):

View File

@@ -0,0 +1,75 @@
# Copyright 2016 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains classes used for the Django ORM storage."""
import base64
import pickle
from django.db import models
from django.utils import encoding
import oauth2client
class CredentialsField(models.Field):
"""Django ORM field for storing OAuth2 Credentials."""
def __init__(self, *args, **kwargs):
if 'null' not in kwargs:
kwargs['null'] = True
super(CredentialsField, self).__init__(*args, **kwargs)
def get_internal_type(self):
return 'BinaryField'
def from_db_value(self, value, expression, connection, context):
"""Overrides ``models.Field`` method. This converts the value
returned from the database to an instance of this class.
"""
return self.to_python(value)
def to_python(self, value):
"""Overrides ``models.Field`` method. This is used to convert
bytes (from serialization etc) to an instance of this class"""
if value is None:
return None
elif isinstance(value, oauth2client.client.Credentials):
return value
else:
return pickle.loads(base64.b64decode(encoding.smart_bytes(value)))
def get_prep_value(self, value):
"""Overrides ``models.Field`` method. This is used to convert
the value from an instances of this class to bytes that can be
inserted into the database.
"""
if value is None:
return None
else:
return encoding.smart_text(base64.b64encode(pickle.dumps(value)))
def value_to_string(self, obj):
"""Convert the field value from the provided model to a string.
Used during model serialization.
Args:
obj: db.Model, model object
Returns:
string, the serialized field value
"""
value = self._get_val_from_obj(obj)
return self.get_prep_value(value)

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
""" Signals for Google OAuth2 Helper
"""Signals for Google OAuth2 Helper.
This module contains signals for Google OAuth2 Helper. Currently it only
contains one, which fires when an OAuth2 authorization flow has completed.

View File

@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains Django URL patterns used for OAuth2 flow."""
from django.conf import urls
from oauth2client.contrib.django_util import views

View File

@@ -12,16 +12,70 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from oauth2client.contrib.dictionary_storage import DictionaryStorage
"""Contains a storage module that stores credentials using the Django ORM."""
_CREDENTIALS_KEY = 'google_oauth2_credentials'
from oauth2client.client import Storage
def get_storage(request):
# TODO(issue 319): Make this pluggable with different storage providers
# https://github.com/google/oauth2client/issues/319
""" Gets a Credentials storage object for the Django OAuth2 Helper object
:param request: Reference to the current request object
:return: A OAuth2Client Storage implementation based on sessions
class DjangoORMStorage(Storage):
"""Store and retrieve a single credential to and from the Django datastore.
This Storage helper presumes the Credentials
have been stored as a CredentialsField
on a db model class.
"""
return DictionaryStorage(request.session, key=_CREDENTIALS_KEY)
def __init__(self, model_class, key_name, key_value, property_name):
"""Constructor for Storage.
Args:
model: string, fully qualified name of db.Model model class.
key_name: string, key name for the entity that has the credentials
key_value: string, key value for the entity that has the
credentials.
property_name: string, name of the property that is an
CredentialsProperty.
"""
super(DjangoORMStorage, self).__init__()
self.model_class = model_class
self.key_name = key_name
self.key_value = key_value
self.property_name = property_name
def locked_get(self):
"""Retrieve stored credential from the Django ORM.
Returns:
oauth2client.Credentials retrieved from the Django ORM, associated
with the ``model``, ``key_value``->``key_name`` pair used to query
for the model, and ``property_name`` identifying the
``CredentialsProperty`` field, all of which are defined in the
constructor for this Storage object.
"""
query = {self.key_name: self.key_value}
entities = self.model_class.objects.filter(**query)
if len(entities) > 0:
credential = getattr(entities[0], self.property_name)
if getattr(credential, 'set_store', None) is not None:
credential.set_store(self)
return credential
else:
return None
def locked_put(self, credentials):
"""Write a Credentials to the Django datastore.
Args:
credentials: Credentials, the credentials to store.
"""
entity, _ = self.model_class.objects.get_or_create(
**{self.key_name: self.key_value})
setattr(entity, self.property_name, credentials)
entity.save()
def locked_delete(self):
"""Delete Credentials from the datastore."""
query = {self.key_name: self.key_value}
self.model_class.objects.filter(**query).delete()

View File

@@ -12,6 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module contains the views used by the OAuth2 flows.
Their are two views used by the OAuth2 flow, the authorize and the callback
view. The authorize view kicks off the three-legged OAuth flow, and the
callback view validates the flow and if successful stores the credentials
in the configured storage."""
import hashlib
import json
import os
@@ -19,19 +26,32 @@ import pickle
from django import http
from django import shortcuts
from django.conf import settings
from django.core import urlresolvers
from django.shortcuts import redirect
from six.moves.urllib import parse
from oauth2client import client
from oauth2client.contrib import django_util
from oauth2client.contrib.django_util import get_storage
from oauth2client.contrib.django_util import signals
from oauth2client.contrib.django_util import storage
_CSRF_KEY = 'google_oauth2_csrf_token'
_FLOW_KEY = 'google_oauth2_flow_{0}'
def _make_flow(request, scopes, return_url=None):
"""Creates a Web Server Flow"""
"""Creates a Web Server Flow
Args:
request: A Django request object.
scopes: the request oauth2 scopes.
return_url: The URL to return to after the flow is complete. Defaults
to the path of the current request.
Returns:
An OAuth2 flow object that has been stored in the session.
"""
# Generate a CSRF token to prevent malicious requests.
csrf_token = hashlib.sha256(os.urandom(1024)).hexdigest()
@@ -57,7 +77,17 @@ def _make_flow(request, scopes, return_url=None):
def _get_flow_for_token(csrf_token, request):
""" Looks up the flow in session to recover information about requested
scopes."""
scopes.
Args:
csrf_token: The token passed in the callback request that should
match the one previously generated and stored in the request on the
initial authorization view.
Returns:
The OAuth2 Flow object associated with this flow based on the
CSRF token.
"""
flow_pickle = request.session.get(_FLOW_KEY.format(csrf_token), None)
return None if flow_pickle is None else pickle.loads(flow_pickle)
@@ -70,8 +100,11 @@ def oauth2_callback(request):
and redirects to the return_url specified in the authorize view and
stored in the session.
:param request: Django request
:return: A redirect response back to the return_url
Args:
request: Django request.
Returns:
A redirect response back to the return_url.
"""
if 'error' in request.GET:
reason = request.GET.get(
@@ -84,13 +117,13 @@ def oauth2_callback(request):
code = request.GET['code']
except KeyError:
return http.HttpResponseBadRequest(
"Request missing state or authorization code")
'Request missing state or authorization code')
try:
server_csrf = request.session[_CSRF_KEY]
except KeyError:
return http.HttpResponseBadRequest(
"No existing session for this flow.")
'No existing session for this flow.')
try:
state = json.loads(encoded_state)
@@ -105,23 +138,24 @@ def oauth2_callback(request):
flow = _get_flow_for_token(client_csrf, request)
if not flow:
return http.HttpResponseBadRequest("Missing Oauth2 flow.")
return http.HttpResponseBadRequest('Missing Oauth2 flow.')
try:
credentials = flow.step2_exchange(code)
except client.FlowExchangeError as exchange_error:
return http.HttpResponseBadRequest(
"An error has occurred: {0}".format(exchange_error))
'An error has occurred: {0}'.format(exchange_error))
storage.get_storage(request).put(credentials)
get_storage(request).put(credentials)
signals.oauth2_authorized.send(sender=signals.oauth2_authorized,
request=request, credentials=credentials)
return shortcuts.redirect(return_url)
def oauth2_authorize(request):
""" View to start the OAuth2 Authorization flow
""" View to start the OAuth2 Authorization flow.
This view starts the OAuth2 authorization flow. If scopes is passed in
as a GET URL parameter, it will authorize those scopes, otherwise the
@@ -129,12 +163,26 @@ def oauth2_authorize(request):
specified as a GET parameter, otherwise the referer header will be
checked, and if that isn't found it will return to the root path.
:param request: The Django request object
:return: A redirect to Google OAuth2 Authorization
Args:
request: The Django request object.
Returns:
A redirect to Google OAuth2 Authorization.
"""
scopes = request.GET.getlist('scopes', django_util.oauth2_settings.scopes)
return_url = request.GET.get('return_url', None)
# Model storage (but not session storage) requires a logged in user
if django_util.oauth2_settings.storage_model:
if not request.user.is_authenticated():
return redirect('{0}?next={1}'.format(
settings.LOGIN_URL, parse.quote(request.get_full_path())))
# This checks for the case where we ended up here because of a logged
# out user but we had credentials for it in the first place
elif get_storage(request).get() is not None:
return redirect(return_url)
scopes = request.GET.getlist('scopes', django_util.oauth2_settings.scopes)
if not return_url:
return_url = request.META.get('HTTP_REFERER', '/')
flow = _make_flow(request=request, scopes=scopes, return_url=return_url)

View File

@@ -0,0 +1,44 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Setups the Django test environment and provides helper classes."""
import django
from django import test
from django.contrib.sessions.backends.file import SessionStore
from django.test.runner import DiscoverRunner
django.setup()
default_app_config = 'tests.contrib.django_util.apps.AppConfig'
class TestWithDjangoEnvironment(test.TestCase):
@classmethod
def setUpClass(cls):
django.setup()
cls.runner = DiscoverRunner()
cls.runner.setup_test_environment()
cls.old_config = cls.runner.setup_databases()
@classmethod
def tearDownClass(cls):
cls.runner.teardown_databases(cls.old_config)
cls.runner.teardown_test_environment()
def setUp(self):
self.factory = test.RequestFactory()
store = SessionStore()
store.save()
self.session = store

View File

@@ -0,0 +1,27 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines a configuration for our test application.
Having a test application enables us to use the Django test database and
other useful features."""
from django.apps import AppConfig
class DjangoOrmTestApp(AppConfig):
"""App Config for Django Helper."""
name = 'tests.contrib.django_util'
verbose_name = "Django Test App"
label = "DjangoORMTestApp"

View File

@@ -0,0 +1,25 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Models used in our tests"""
from django.contrib.auth.models import User
from django.db import models
from oauth2client.contrib.django_util.models import CredentialsField
class CredentialsModel(models.Model):
user_id = models.OneToOneField(User)
credentials = CredentialsField()

View File

@@ -12,8 +12,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides a base Django settings module used by the rest of the tests."""
import os
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'oauth2client.contrib.django_util',
'tests.contrib.django_util.apps.DjangoOrmTestApp',
]
SECRET_KEY = 'this string is not a real django secret key'
DATABASES = {
'default': {
@@ -31,4 +44,4 @@ GOOGLE_OAUTH2_CLIENT_ID = 'client_id2'
GOOGLE_OAUTH2_CLIENT_SECRET = 'hunter2'
GOOGLE_OAUTH2_SCOPES = ('https://www.googleapis.com/auth/cloud-platform',)
ROOT_URLCONF = 'tests.contrib.test_django_util'
ROOT_URLCONF = 'tests.contrib.django_util.test_django_util'

View File

@@ -0,0 +1,246 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the django_util decorators."""
import copy
from django import http
import django.conf
from django.contrib.auth.models import AnonymousUser, User
import mock
from six.moves import http_client
from six.moves import reload_module
from six.moves.urllib import parse
from tests.contrib.django_util import TestWithDjangoEnvironment
import oauth2client.contrib.django_util
from oauth2client.contrib.django_util import decorators
class OAuth2EnabledDecoratorTest(TestWithDjangoEnvironment):
def setUp(self):
super(OAuth2EnabledDecoratorTest, self).setUp()
self.save_settings = copy.deepcopy(django.conf.settings)
# OAuth2 Settings gets configured based on Django settings
# at import time, so in order for us to reload the settings
# we need to reload the module
reload_module(oauth2client.contrib.django_util)
self.user = User.objects.create_user(
username='bill', email='bill@example.com', password='hunter2')
def tearDown(self):
super(OAuth2EnabledDecoratorTest, self).tearDown()
django.conf.settings = copy.deepcopy(self.save_settings)
def test_no_credentials_without_credentials(self):
request = self.factory.get('/test')
request.session = self.session
@decorators.oauth_enabled
def test_view(request):
return http.HttpResponse("test") # pragma: NO COVER
response = test_view(request)
self.assertEqual(response.status_code, http_client.OK)
self.assertIsNotNone(request.oauth)
self.assertFalse(request.oauth.has_credentials())
self.assertIsNone(request.oauth.http)
@mock.patch('oauth2client.contrib.dictionary_storage.OAuth2Credentials')
def test_has_credentials_in_storage(self, OAuth2Credentials):
request = self.factory.get('/test')
request.session = mock.MagicMock()
credentials_mock = mock.Mock(
scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
credentials_mock.has_scopes.return_value = True
credentials_mock.invalid = False
credentials_mock.scopes = set([])
OAuth2Credentials.from_json.return_value = credentials_mock
@decorators.oauth_enabled
def test_view(request):
return http.HttpResponse('test')
response = test_view(request)
self.assertEqual(response.status_code, http_client.OK)
self.assertEqual(response.content, b'test')
self.assertTrue(request.oauth.has_credentials())
self.assertIsNotNone(request.oauth.http)
self.assertSetEqual(
request.oauth.scopes,
set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
@mock.patch('oauth2client.contrib.dictionary_storage.DictionaryStorage')
def test_specified_scopes(self, dictionary_storage_mock):
request = self.factory.get('/test')
request.session = mock.MagicMock()
credentials_mock = mock.Mock(
scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
credentials_mock.has_scopes = True
credentials_mock.is_valid = True
dictionary_storage_mock.get.return_value = credentials_mock
@decorators.oauth_enabled(scopes=['additional-scope'])
def test_view(request):
return http.HttpResponse('hello world') # pragma: NO COVER
response = test_view(request)
self.assertEqual(response.status_code, http_client.OK)
self.assertIsNotNone(request.oauth)
self.assertFalse(request.oauth.has_credentials())
class OAuth2RequiredDecoratorTest(TestWithDjangoEnvironment):
def setUp(self):
super(OAuth2RequiredDecoratorTest, self).setUp()
self.save_settings = copy.deepcopy(django.conf.settings)
reload_module(oauth2client.contrib.django_util)
self.user = User.objects.create_user(
username='bill', email='bill@example.com', password='hunter2')
def tearDown(self):
super(OAuth2RequiredDecoratorTest, self).tearDown()
django.conf.settings = copy.deepcopy(self.save_settings)
def test_redirects_without_credentials(self):
request = self.factory.get('/test')
request.session = self.session
@decorators.oauth_required
def test_view(request):
return http.HttpResponse('test') # pragma: NO COVER
response = test_view(request)
self.assertIsInstance(response, http.HttpResponseRedirect)
self.assertEqual(parse.urlparse(response['Location']).path,
'/oauth2/oauth2authorize/')
self.assertIn(
'return_url=%2Ftest', parse.urlparse(response['Location']).query)
self.assertEqual(response.status_code,
http.HttpResponseRedirect.status_code)
@mock.patch('oauth2client.contrib.django_util.UserOAuth2', autospec=True)
def test_has_credentials_in_storage(self, UserOAuth2):
request = self.factory.get('/test')
request.session = mock.MagicMock()
@decorators.oauth_required
def test_view(request):
return http.HttpResponse("test")
my_user_oauth = mock.MagicMock()
UserOAuth2.return_value = my_user_oauth
my_user_oauth.has_credentials.return_value = True
response = test_view(request)
self.assertEqual(response.status_code, http_client.OK)
self.assertEqual(response.content, b"test")
@mock.patch('oauth2client.contrib.dictionary_storage.OAuth2Credentials')
def test_has_credentials_in_storage_no_scopes(
self, OAuth2Credentials):
request = self.factory.get('/test')
request.session = mock.MagicMock()
credentials_mock = mock.Mock(
scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
credentials_mock.has_scopes.return_value = False
OAuth2Credentials.from_json.return_value = credentials_mock
@decorators.oauth_required
def test_view(request):
return http.HttpResponse("test") # pragma: NO COVER
response = test_view(request)
self.assertEqual(
response.status_code, django.http.HttpResponseRedirect.status_code)
@mock.patch('oauth2client.contrib.dictionary_storage.OAuth2Credentials')
def test_specified_scopes(self, OAuth2Credentials):
request = self.factory.get('/test')
request.session = mock.MagicMock()
credentials_mock = mock.Mock(
scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
credentials_mock.has_scopes = False
OAuth2Credentials.from_json.return_value = credentials_mock
@decorators.oauth_required(scopes=['additional-scope'])
def test_view(request):
return http.HttpResponse("hello world") # pragma: NO COVER
response = test_view(request)
self.assertEqual(
response.status_code, django.http.HttpResponseRedirect.status_code)
class OAuth2RequiredDecoratorStorageModelTest(TestWithDjangoEnvironment):
def setUp(self):
super(OAuth2RequiredDecoratorStorageModelTest, self).setUp()
self.save_settings = copy.deepcopy(django.conf.settings)
STORAGE_MODEL = {
'model': 'tests.contrib.django_util.models.CredentialsModel',
'user_property': 'user_id',
'credentials_property': 'credentials'
}
django.conf.settings.GOOGLE_OAUTH2_STORAGE_MODEL = STORAGE_MODEL
reload_module(oauth2client.contrib.django_util)
self.user = User.objects.create_user(
username='bill', email='bill@example.com', password='hunter2')
def tearDown(self):
super(OAuth2RequiredDecoratorStorageModelTest, self).tearDown()
django.conf.settings = copy.deepcopy(self.save_settings)
def test_redirects_anonymous_to_login(self):
request = self.factory.get('/test')
request.session = self.session
request.user = AnonymousUser()
@decorators.oauth_required
def test_view(request):
return http.HttpResponse("test") # pragma: NO COVER
response = test_view(request)
self.assertIsInstance(response, http.HttpResponseRedirect)
self.assertEqual(parse.urlparse(response['Location']).path,
django.conf.settings.LOGIN_URL)
def test_redirects_user_to_oauth_authorize(self):
request = self.factory.get('/test')
request.session = self.session
request.user = User.objects.create_user(
username='bill3', email='bill@example.com', password='hunter2')
@decorators.oauth_required
def test_view(request):
return http.HttpResponse("test") # pragma: NO COVER
response = test_view(request)
self.assertIsInstance(response, http.HttpResponseRedirect)
self.assertEqual(parse.urlparse(response['Location']).path,
'/oauth2/oauth2authorize/')

View File

@@ -0,0 +1,99 @@
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Django model tests.
Unit tests for models and fields defined by the django_util helper.
"""
import base64
import pickle
from tests.contrib.django_util.models import CredentialsModel
import unittest2
from oauth2client._helpers import _from_bytes
from oauth2client.client import Credentials
from oauth2client.contrib.django_util.models import CredentialsField
class TestCredentialsField(unittest2.TestCase):
def setUp(self):
self.fake_model = CredentialsModel()
self.fake_model_field = self.fake_model._meta.get_field('credentials')
self.field = CredentialsField(null=True)
self.credentials = Credentials()
self.pickle_str = _from_bytes(
base64.b64encode(pickle.dumps(self.credentials)))
def test_field_is_text(self):
self.assertEqual(self.field.get_internal_type(), 'BinaryField')
def test_field_unpickled(self):
self.assertIsInstance(
self.field.to_python(self.pickle_str), Credentials)
def test_field_already_unpickled(self):
self.assertIsInstance(
self.field.to_python(self.credentials), Credentials)
def test_none_field_unpickled(self):
self.assertIsNone(self.field.to_python(None))
def test_from_db_value(self):
value = self.field.from_db_value(
self.pickle_str, None, None, None)
self.assertIsInstance(value, Credentials)
def test_field_unpickled_none(self):
self.assertEqual(self.field.to_python(None), None)
def test_field_pickled(self):
prep_value = self.field.get_db_prep_value(self.credentials,
connection=None)
self.assertEqual(prep_value, self.pickle_str)
def test_field_value_to_string(self):
self.fake_model.credentials = self.credentials
value_str = self.fake_model_field.value_to_string(self.fake_model)
self.assertEqual(value_str, self.pickle_str)
def test_field_value_to_string_none(self):
self.fake_model.credentials = None
value_str = self.fake_model_field.value_to_string(self.fake_model)
self.assertIsNone(value_str)
def test_credentials_without_null(self):
credentials = CredentialsField()
self.assertTrue(credentials.null)
class CredentialWithSetStore(CredentialsField):
def __init__(self):
self.model = CredentialWithSetStore
def set_store(self, storage):
pass # pragma: NO COVER
class FakeCredentialsModelMock(object):
credentials = CredentialWithSetStore()
class FakeCredentialsModelMockNoSet(object):
credentials = CredentialsField()

View File

@@ -0,0 +1,164 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the DjangoORM storage class."""
# Mock a Django environment
import datetime
from django.db import models
import mock
import unittest2
from oauth2client import GOOGLE_TOKEN_URI
from oauth2client.client import OAuth2Credentials
from oauth2client.contrib.django_util.models import CredentialsField
from oauth2client.contrib.django_util.storage import (
DjangoORMStorage as Storage)
class TestStorage(unittest2.TestCase):
def setUp(self):
access_token = 'foo'
client_id = 'some_client_id'
client_secret = 'cOuDdkfjxxnv+'
refresh_token = '1/0/a.df219fjls0'
token_expiry = datetime.datetime.utcnow()
user_agent = 'refresh_checker/1.0'
self.credentials = OAuth2Credentials(
access_token, client_id, client_secret,
refresh_token, token_expiry, GOOGLE_TOKEN_URI,
user_agent)
self.key_name = 'id'
self.key_value = '1'
self.property_name = 'credentials'
def test_constructor(self):
storage = Storage(FakeCredentialsModel, self.key_name,
self.key_value, self.property_name)
self.assertEqual(storage.model_class, FakeCredentialsModel)
self.assertEqual(storage.key_name, self.key_name)
self.assertEqual(storage.key_value, self.key_value)
self.assertEqual(storage.property_name, self.property_name)
@mock.patch('django.db.models')
def test_locked_get(self, djangoModel):
fake_model_with_credentials = FakeCredentialsModelMock()
entities = [
fake_model_with_credentials
]
filter_mock = mock.Mock(return_value=entities)
object_mock = mock.Mock()
object_mock.filter = filter_mock
FakeCredentialsModelMock.objects = object_mock
storage = Storage(FakeCredentialsModelMock, self.key_name,
self.key_value, self.property_name)
credential = storage.locked_get()
self.assertEqual(
credential, fake_model_with_credentials.credentials)
@mock.patch('django.db.models')
def test_locked_get_no_entities(self, djangoModel):
entities = []
filter_mock = mock.Mock(return_value=entities)
object_mock = mock.Mock()
object_mock.filter = filter_mock
FakeCredentialsModelMock.objects = object_mock
storage = Storage(FakeCredentialsModelMock, self.key_name,
self.key_value, self.property_name)
credential = storage.locked_get()
self.assertIsNone(credential)
@mock.patch('django.db.models')
def test_locked_get_no_set_store(self, djangoModel):
fake_model_with_credentials = FakeCredentialsModelMockNoSet()
entities = [
fake_model_with_credentials
]
filter_mock = mock.Mock(return_value=entities)
object_mock = mock.Mock()
object_mock.filter = filter_mock
FakeCredentialsModelMockNoSet.objects = object_mock
storage = Storage(FakeCredentialsModelMockNoSet, self.key_name,
self.key_value, self.property_name)
credential = storage.locked_get()
self.assertEqual(
credential, fake_model_with_credentials.credentials)
@mock.patch('django.db.models')
def test_locked_put(self, djangoModel):
entity_mock = mock.Mock(credentials=None)
objects = mock.Mock(
get_or_create=mock.Mock(return_value=(entity_mock, None)))
FakeCredentialsModelMock.objects = objects
storage = Storage(FakeCredentialsModelMock, self.key_name,
self.key_value, self.property_name)
storage.locked_put(self.credentials)
@mock.patch('django.db.models')
def test_locked_delete(self, djangoModel):
class FakeEntities(object):
def __init__(self):
self.deleted = False
def delete(self):
self.deleted = True
fake_entities = FakeEntities()
entities = fake_entities
filter_mock = mock.Mock(return_value=entities)
object_mock = mock.Mock()
object_mock.filter = filter_mock
FakeCredentialsModelMock.objects = object_mock
storage = Storage(FakeCredentialsModelMock, self.key_name,
self.key_value, self.property_name)
storage.locked_delete()
self.assertTrue(fake_entities.deleted)
class CredentialWithSetStore(CredentialsField):
def __init__(self):
self.model = CredentialWithSetStore
def set_store(self, storage):
pass
class FakeCredentialsModel(models.Model):
credentials = CredentialsField()
class FakeCredentialsModelMock(object):
def __init__(self, *args, **kwargs):
self.model = FakeCredentialsModelMock
self.saved = False
self.deleted = False
credentials = CredentialWithSetStore()
class FakeCredentialsModelMockNoSet(object):
def __init__(self, set_store=False, *args, **kwargs):
self.model = FakeCredentialsModelMock
self.saved = False
self.deleted = False
credentials = CredentialsField()

View File

@@ -0,0 +1,172 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests the initialization logic of django_util."""
import copy
import django.conf
from django.conf.urls import include, url
from django.contrib.auth.models import AnonymousUser
from django.core import exceptions
import mock
from six.moves import reload_module
from tests.contrib.django_util import TestWithDjangoEnvironment
import unittest2
from oauth2client.contrib import django_util
import oauth2client.contrib.django_util
from oauth2client.contrib.django_util import (
_CREDENTIALS_KEY, get_storage, site, UserOAuth2)
urlpatterns = [
url(r'^oauth2/', include(site.urls))
]
class OAuth2SetupTest(unittest2.TestCase):
def setUp(self):
self.save_settings = copy.deepcopy(django.conf.settings)
# OAuth2 Settings gets configured based on Django settings
# at import time, so in order for us to reload the settings
# we need to reload the module
reload_module(oauth2client.contrib.django_util)
def tearDown(self):
django.conf.settings = copy.deepcopy(self.save_settings)
@mock.patch("oauth2client.contrib.django_util.clientsecrets")
def test_settings_initialize(self, clientsecrets):
django.conf.settings.GOOGLE_OAUTH2_CLIENT_SECRETS_JSON = 'file.json'
clientsecrets.loadfile.return_value = (
clientsecrets.TYPE_WEB,
{
'client_id': 'myid',
'client_secret': 'hunter2'
}
)
oauth2_settings = django_util.OAuth2Settings(django.conf.settings)
self.assertTrue(clientsecrets.loadfile.called)
self.assertEqual(oauth2_settings.client_id, 'myid')
self.assertEqual(oauth2_settings.client_secret, 'hunter2')
django.conf.settings.GOOGLE_OAUTH2_CLIENT_SECRETS_JSON = None
@mock.patch("oauth2client.contrib.django_util.clientsecrets")
def test_settings_initialize_invalid_type(self, clientsecrets):
django.conf.settings.GOOGLE_OAUTH2_CLIENT_SECRETS_JSON = 'file.json'
clientsecrets.loadfile.return_value = (
"wrong_type",
{
'client_id': 'myid',
'client_secret': 'hunter2'
}
)
with self.assertRaises(ValueError):
django_util.OAuth2Settings.__init__(
object.__new__(django_util.OAuth2Settings),
django.conf.settings)
@mock.patch("oauth2client.contrib.django_util.clientsecrets")
def test_no_settings(self, clientsecrets):
django.conf.settings.GOOGLE_OAUTH2_CLIENT_SECRETS_JSON = None
django.conf.settings.GOOGLE_OAUTH2_CLIENT_SECRET = None
django.conf.settings.GOOGLE_OAUTH2_CLIENT_ID = None
with self.assertRaises(exceptions.ImproperlyConfigured):
django_util.OAuth2Settings.__init__(
object.__new__(django_util.OAuth2Settings),
django.conf.settings)
@mock.patch("oauth2client.contrib.django_util.clientsecrets")
def test_no_session_middleware(self, clientsecrets):
django.conf.settings.MIDDLEWARE_CLASSES = ()
with self.assertRaises(exceptions.ImproperlyConfigured):
django_util.OAuth2Settings.__init__(
object.__new__(django_util.OAuth2Settings),
django.conf.settings)
def test_storage_model(self):
STORAGE_MODEL = {
'model': 'tests.contrib.django_util.models.CredentialsModel',
'user_property': 'user_id',
'credentials_property': 'credentials'
}
django.conf.settings.GOOGLE_OAUTH2_STORAGE_MODEL = STORAGE_MODEL
oauth2_settings = django_util.OAuth2Settings(django.conf.settings)
self.assertEqual(oauth2_settings.storage_model, STORAGE_MODEL['model'])
self.assertEqual(oauth2_settings.storage_model_user_property,
STORAGE_MODEL['user_property'])
self.assertEqual(oauth2_settings.storage_model_credentials_property,
STORAGE_MODEL['credentials_property'])
class MockObjectWithSession(object):
def __init__(self, session):
self.session = session
class SessionStorageTest(TestWithDjangoEnvironment):
def setUp(self):
super(SessionStorageTest, self).setUp()
self.save_settings = copy.deepcopy(django.conf.settings)
reload_module(oauth2client.contrib.django_util)
def tearDown(self):
super(SessionStorageTest, self).tearDown()
django.conf.settings = copy.deepcopy(self.save_settings)
def test_session_delete(self):
self.session[_CREDENTIALS_KEY] = "test_val"
request = MockObjectWithSession(self.session)
django_storage = get_storage(request)
django_storage.delete()
self.assertIsNone(self.session.get(_CREDENTIALS_KEY))
def test_session_delete_nothing(self):
request = MockObjectWithSession(self.session)
django_storage = get_storage(request)
django_storage.delete()
class TestUserOAuth2Object(TestWithDjangoEnvironment):
def setUp(self):
super(TestUserOAuth2Object, self).setUp()
self.save_settings = copy.deepcopy(django.conf.settings)
STORAGE_MODEL = {
'model': 'tests.contrib.django_util.models.CredentialsModel',
'user_property': 'user_id',
'credentials_property': 'credentials'
}
django.conf.settings.GOOGLE_OAUTH2_STORAGE_MODEL = STORAGE_MODEL
reload_module(oauth2client.contrib.django_util)
def tearDown(self):
super(TestUserOAuth2Object, self).tearDown()
import django.conf
django.conf.settings = copy.deepcopy(self.save_settings)
def test_get_credentials_anon_user(self):
request = self.factory.get('oauth2/oauth2authorize',
data={'return_url': '/return_endpoint'})
request.session = self.session
request.user = AnonymousUser()
oauth2 = UserOAuth2(request)
self.assertIsNone(oauth2.credentials)

View File

@@ -0,0 +1,274 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit test for django_util views"""
import copy
import json
import django
from django import http
import django.conf
from django.contrib.auth.models import AnonymousUser, User
import mock
from six.moves import reload_module
from tests.contrib.django_util import TestWithDjangoEnvironment
from tests.contrib.django_util.models import CredentialsModel
from oauth2client.client import FlowExchangeError, OAuth2WebServerFlow
import oauth2client.contrib.django_util
from oauth2client.contrib.django_util import views
from oauth2client.contrib.django_util.models import CredentialsField
class OAuth2AuthorizeTest(TestWithDjangoEnvironment):
def setUp(self):
super(OAuth2AuthorizeTest, self).setUp()
self.save_settings = copy.deepcopy(django.conf.settings)
reload_module(oauth2client.contrib.django_util)
self.user = User.objects.create_user(
username='bill', email='bill@example.com', password='hunter2')
def tearDown(self):
django.conf.settings = copy.deepcopy(self.save_settings)
def test_authorize_works(self):
request = self.factory.get('oauth2/oauth2authorize')
request.session = self.session
request.user = self.user
response = views.oauth2_authorize(request)
self.assertIsInstance(response, http.HttpResponseRedirect)
def test_authorize_anonymous_user(self):
request = self.factory.get('oauth2/oauth2authorize')
request.session = self.session
request.user = AnonymousUser()
response = views.oauth2_authorize(request)
self.assertIsInstance(response, http.HttpResponseRedirect)
def test_authorize_works_explicit_return_url(self):
request = self.factory.get('oauth2/oauth2authorize',
data={'return_url': '/return_endpoint'})
request.session = self.session
request.user = self.user
response = views.oauth2_authorize(request)
self.assertIsInstance(response, http.HttpResponseRedirect)
class Oauth2AuthorizeStorageModelTest(TestWithDjangoEnvironment):
def setUp(self):
super(Oauth2AuthorizeStorageModelTest, self).setUp()
self.save_settings = copy.deepcopy(django.conf.settings)
STORAGE_MODEL = {
'model': 'tests.contrib.django_util.models.CredentialsModel',
'user_property': 'user_id',
'credentials_property': 'credentials'
}
django.conf.settings.GOOGLE_OAUTH2_STORAGE_MODEL = STORAGE_MODEL
# OAuth2 Settings gets configured based on Django settings
# at import time, so in order for us to reload the settings
# we need to reload the module
reload_module(oauth2client.contrib.django_util)
self.user = User.objects.create_user(
username='bill', email='bill@example.com', password='hunter2')
def tearDown(self):
django.conf.settings = copy.deepcopy(self.save_settings)
def test_authorize_works(self):
request = self.factory.get('oauth2/oauth2authorize')
request.session = self.session
request.user = self.user
response = views.oauth2_authorize(request)
self.assertIsInstance(response, http.HttpResponseRedirect)
# redirects to Google oauth
self.assertIn('accounts.google.com', response.url)
def test_authorize_anonymous_user_redirects_login(self):
request = self.factory.get('oauth2/oauth2authorize')
request.session = self.session
request.user = AnonymousUser()
response = views.oauth2_authorize(request)
self.assertIsInstance(response, http.HttpResponseRedirect)
# redirects to Django login
self.assertIn(django.conf.settings.LOGIN_URL, response.url)
def test_authorize_works_explicit_return_url(self):
request = self.factory.get('oauth2/oauth2authorize',
data={'return_url': '/return_endpoint'})
request.session = self.session
request.user = self.user
response = views.oauth2_authorize(request)
self.assertIsInstance(response, http.HttpResponseRedirect)
def test_authorized_user_not_logged_in_redirects(self):
request = self.factory.get('oauth2/oauth2authorize',
data={'return_url': '/return_endpoint'})
request.session = self.session
authorized_user = User.objects.create_user(
username='bill2', email='bill@example.com', password='hunter2')
credentials = CredentialsField()
CredentialsModel.objects.create(
user_id=authorized_user,
credentials=credentials)
request.user = authorized_user
response = views.oauth2_authorize(request)
self.assertIsInstance(response, http.HttpResponseRedirect)
class Oauth2CallbackTest(TestWithDjangoEnvironment):
def setUp(self):
super(Oauth2CallbackTest, self).setUp()
self.save_settings = copy.deepcopy(django.conf.settings)
reload_module(oauth2client.contrib.django_util)
self.CSRF_TOKEN = 'token'
self.RETURN_URL = 'http://return-url.com'
self.fake_state = {
'csrf_token': self.CSRF_TOKEN,
'return_url': self.RETURN_URL,
'scopes': django.conf.settings.GOOGLE_OAUTH2_SCOPES
}
self.user = User.objects.create_user(
username='bill', email='bill@example.com', password='hunter2')
@mock.patch('oauth2client.contrib.django_util.views.pickle')
def test_callback_works(self, pickle):
request = self.factory.get('oauth2/oauth2callback', data={
'state': json.dumps(self.fake_state),
'code': 123
})
self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN
flow = OAuth2WebServerFlow(
client_id='clientid',
client_secret='clientsecret',
scope=['email'],
state=json.dumps(self.fake_state),
redirect_uri=request.build_absolute_uri("oauth2/oauth2callback"))
name = 'google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)
self.session[name] = pickle.dumps(flow)
flow.step2_exchange = mock.Mock()
pickle.loads.return_value = flow
request.session = self.session
request.user = self.user
response = views.oauth2_callback(request)
self.assertIsInstance(response, http.HttpResponseRedirect)
self.assertEqual(
response.status_code, django.http.HttpResponseRedirect.status_code)
self.assertEqual(response['Location'], self.RETURN_URL)
@mock.patch('oauth2client.contrib.django_util.views.pickle')
def test_callback_handles_bad_flow_exchange(self, pickle):
request = self.factory.get('oauth2/oauth2callback', data={
"state": json.dumps(self.fake_state),
"code": 123
})
self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN
flow = OAuth2WebServerFlow(
client_id='clientid',
client_secret='clientsecret',
scope=['email'],
state=json.dumps(self.fake_state),
redirect_uri=request.build_absolute_uri('oauth2/oauth2callback'))
self.session['google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)] \
= pickle.dumps(flow)
def local_throws(code):
raise FlowExchangeError('test')
flow.step2_exchange = local_throws
pickle.loads.return_value = flow
request.session = self.session
response = views.oauth2_callback(request)
self.assertIsInstance(response, http.HttpResponseBadRequest)
def test_error_returns_bad_request(self):
request = self.factory.get('oauth2/oauth2callback', data={
'error': 'There was an error in your authorization.',
})
response = views.oauth2_callback(request)
self.assertIsInstance(response, http.HttpResponseBadRequest)
self.assertIn(b'Authorization failed', response.content)
def test_no_session(self):
request = self.factory.get('oauth2/oauth2callback', data={
'code': 123,
'state': json.dumps(self.fake_state)
})
request.session = self.session
response = views.oauth2_callback(request)
self.assertIsInstance(response, http.HttpResponseBadRequest)
self.assertEqual(
response.content, b'No existing session for this flow.')
def test_missing_state_returns_bad_request(self):
request = self.factory.get('oauth2/oauth2callback', data={
'code': 123
})
self.session['google_oauth2_csrf_token'] = "token"
request.session = self.session
response = views.oauth2_callback(request)
self.assertIsInstance(response, http.HttpResponseBadRequest)
def test_bad_state(self):
request = self.factory.get('oauth2/oauth2callback', data={
'code': 123,
'state': json.dumps({'wrong': 'state'})
})
self.session['google_oauth2_csrf_token'] = 'token'
request.session = self.session
response = views.oauth2_callback(request)
self.assertIsInstance(response, http.HttpResponseBadRequest)
self.assertEqual(response.content, b'Invalid state parameter.')
def test_bad_csrf(self):
request = self.factory.get('oauth2/oauth2callback', data={
"state": json.dumps(self.fake_state),
"code": 123
})
self.session['google_oauth2_csrf_token'] = 'WRONG TOKEN'
request.session = self.session
response = views.oauth2_callback(request)
self.assertIsInstance(response, http.HttpResponseBadRequest)
self.assertEqual(response.content, b'Invalid CSRF token.')
def test_no_saved_flow(self):
request = self.factory.get('oauth2/oauth2callback', data={
'state': json.dumps(self.fake_state),
'code': 123
})
self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN
self.session['google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)] = None
request.session = self.session
response = views.oauth2_callback(request)
self.assertIsInstance(response, http.HttpResponseBadRequest)
self.assertEqual(response.content, b'Missing Oauth2 flow.')

View File

@@ -1,316 +0,0 @@
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Discovery document tests
Unit tests for objects created from discovery documents.
"""
import base64
import datetime
import os
import pickle
# Mock a Django environment
os.environ['DJANGO_SETTINGS_MODULE'] = 'tests.contrib.test_django_settings'
from django.conf import settings
settings.SECRET_KEY = 'this string is not a real Django SECRET_KEY'
settings.INSTALLED_APPS = ['tests.contrib.test_django_orm']
import django
django.setup()
from django.apps import AppConfig
class DjangoOrmTestApp(AppConfig):
"""App Config for Django Helper."""
name = 'oauth2client.tests.contrib.test_django_orm'
verbose_name = "Django Test App"
from django.db import models
import mock
import unittest2
from oauth2client import GOOGLE_TOKEN_URI
from oauth2client._helpers import _from_bytes
from oauth2client.client import Credentials
from oauth2client.client import Flow
from oauth2client.client import OAuth2Credentials
from oauth2client.contrib.django_orm import CredentialsField
from oauth2client.contrib.django_orm import FlowField
from oauth2client.contrib.django_orm import Storage
__author__ = 'conleyo@google.com (Conley Owens)'
class TestCredentialsField(unittest2.TestCase):
def setUp(self):
self.fake_model = FakeCredentialsModel()
self.fake_model_field = self.fake_model._meta.get_field('credentials')
self.field = CredentialsField(null=True)
self.credentials = Credentials()
self.pickle_str = _from_bytes(
base64.b64encode(pickle.dumps(self.credentials)))
def test_field_is_text(self):
self.assertEquals(self.field.get_internal_type(), 'TextField')
def test_field_unpickled(self):
self.assertTrue(
isinstance(self.field.to_python(self.pickle_str), Credentials))
def test_field_already_unpickled(self):
self.assertIsInstance(
self.field.to_python(self.credentials), Credentials)
def test_none_field_unpickled(self):
self.assertIsNone(self.field.to_python(None))
def test_from_db_value(self):
value = self.field.from_db_value(
self.pickle_str, None, None, None)
self.assertIsInstance(value, Credentials)
def test_field_unpickled_none(self):
self.assertEqual(self.field.to_python(None), None)
def test_field_pickled(self):
prep_value = self.field.get_db_prep_value(self.credentials,
connection=None)
self.assertEqual(prep_value, self.pickle_str)
def test_field_value_to_string(self):
self.fake_model.credentials = self.credentials
value_str = self.fake_model_field.value_to_string(self.fake_model)
self.assertEqual(value_str, self.pickle_str)
def test_field_value_to_string_none(self):
self.fake_model.credentials = None
value_str = self.fake_model_field.value_to_string(self.fake_model)
self.assertEqual(value_str, None)
def test_credentials_without_null(self):
credentials = CredentialsField()
self.assertTrue(credentials.null)
class TestFlowField(unittest2.TestCase):
class FakeFlowModel(models.Model):
flow = FlowField()
def setUp(self):
self.fake_model = self.FakeFlowModel()
self.fake_model_field = self.fake_model._meta.get_field('flow')
self.field = FlowField(null=True)
self.flow = Flow()
self.pickle_str = _from_bytes(
base64.b64encode(pickle.dumps(self.flow)))
def test_field_is_text(self):
self.assertEquals(self.field.get_internal_type(), 'TextField')
def test_field_unpickled(self):
python_val = self.field.to_python(self.pickle_str)
self.assertIsInstance(python_val, Flow)
def test_field_already_unpickled(self):
self.assertTrue(
isinstance(self.field.to_python(self.flow), Flow))
def test_none_field_unpickled(self):
self.assertIsNone(self.field.to_python(None))
def test_from_db_value(self):
python_val = self.field.from_db_value(
self.pickle_str, None, None, None)
self.assertIsInstance(python_val, Flow)
def test_field_pickled(self):
prep_value = self.field.get_db_prep_value(self.flow, connection=None)
self.assertEqual(prep_value, self.pickle_str)
def test_field_value_to_string(self):
self.fake_model.flow = self.flow
value_str = self.fake_model_field.value_to_string(self.fake_model)
self.assertEqual(value_str, self.pickle_str)
def test_field_value_to_string_none(self):
self.fake_model.flow = None
value_str = self.fake_model_field.value_to_string(self.fake_model)
self.assertEqual(value_str, None)
def test_flow_with_null(self):
flow = FlowField()
self.assertTrue(flow.null)
class TestStorage(unittest2.TestCase):
def setUp(self):
access_token = 'foo'
client_id = 'some_client_id'
client_secret = 'cOuDdkfjxxnv+'
refresh_token = '1/0/a.df219fjls0'
token_expiry = datetime.datetime.utcnow()
user_agent = 'refresh_checker/1.0'
self.credentials = OAuth2Credentials(
access_token, client_id, client_secret,
refresh_token, token_expiry, GOOGLE_TOKEN_URI,
user_agent)
self.key_name = 'id'
self.key_value = '1'
self.property_name = 'credentials'
def test_constructor(self):
storage = Storage(FakeCredentialsModel, self.key_name,
self.key_value, self.property_name)
self.assertEqual(storage.model_class, FakeCredentialsModel)
self.assertEqual(storage.key_name, self.key_name)
self.assertEqual(storage.key_value, self.key_value)
self.assertEqual(storage.property_name, self.property_name)
@mock.patch('django.db.models')
def test_locked_get(self, djangoModel):
fake_model_with_credentials = FakeCredentialsModelMock()
entities = [
fake_model_with_credentials
]
filter_mock = mock.Mock(return_value=entities)
object_mock = mock.Mock()
object_mock.filter = filter_mock
FakeCredentialsModelMock.objects = object_mock
storage = Storage(FakeCredentialsModelMock, self.key_name,
self.key_value, self.property_name)
credential = storage.locked_get()
self.assertEqual(
credential, fake_model_with_credentials.credentials)
@mock.patch('django.db.models')
def test_locked_get_no_entities(self, djangoModel):
entities = [
]
filter_mock = mock.Mock(return_value=entities)
object_mock = mock.Mock()
object_mock.filter = filter_mock
FakeCredentialsModelMock.objects = object_mock
storage = Storage(FakeCredentialsModelMock, self.key_name,
self.key_value, self.property_name)
credential = storage.locked_get()
self.assertIsNone(credential)
@mock.patch('django.db.models')
def test_locked_get_no_set_store(self, djangoModel):
fake_model_with_credentials = FakeCredentialsModelMockNoSet()
entities = [
fake_model_with_credentials
]
filter_mock = mock.Mock(return_value=entities)
object_mock = mock.Mock()
object_mock.filter = filter_mock
FakeCredentialsModelMockNoSet.objects = object_mock
storage = Storage(FakeCredentialsModelMockNoSet, self.key_name,
self.key_value, self.property_name)
credential = storage.locked_get()
self.assertEqual(
credential, fake_model_with_credentials.credentials)
@mock.patch('django.db.models')
def test_locked_put(self, djangoModel):
storage = Storage(FakeCredentialsModelMock, self.key_name,
self.key_value, self.property_name)
storage.locked_put(self.credentials)
@mock.patch('django.db.models')
def test_locked_put_with_overwite(self, djangoModel):
get_or_create_mock = mock.Mock()
fake_credentials = FakeCredentialsModelMock()
get_or_create_mock.return_value = (fake_credentials, True)
object_mock = mock.Mock()
object_mock.get_or_create = get_or_create_mock
FakeCredentialsModelMock.objects.get_or_create = get_or_create_mock
storage = Storage(FakeCredentialsModelMock, self.key_name,
self.key_value, self.property_name)
storage.locked_put(self.credentials, True)
self.assertTrue(fake_credentials.saved)
@mock.patch('django.db.models')
def test_locked_delete(self, djangoModel):
class FakeEntities(object):
def __init__(self):
self.deleted = False
def delete(self):
self.deleted = True
fake_entities = FakeEntities()
entities = fake_entities
filter_mock = mock.Mock(return_value=entities)
object_mock = mock.Mock()
object_mock.filter = filter_mock
FakeCredentialsModelMock.objects = object_mock
storage = Storage(FakeCredentialsModelMock, self.key_name,
self.key_value, self.property_name)
storage.locked_delete()
self.assertTrue(fake_entities.deleted)
class CredentialWithSetStore(CredentialsField):
def __init__(self):
self.model = CredentialWithSetStore
def set_store(self, storage):
pass
class FakeCredentialsModel(models.Model):
credentials = CredentialsField()
class FakeCredentialsModelMock(object):
def __init__(self, *args, **kwargs):
self.model = FakeCredentialsModelMock
self.saved = False
self.deleted = False
def save(self):
self.saved = True
credentials = CredentialWithSetStore()
class FakeCredentialsModelMockNoSet(object):
def __init__(self, set_store=False, *args, **kwargs):
self.model = FakeCredentialsModelMock
self.saved = False
self.deleted = False
credentials = CredentialsField()

View File

@@ -1,405 +0,0 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from django import http
from django import test
import django.conf
from django.conf.urls import include, url
from django.core import exceptions
import mock
from six.moves import http_client
from six.moves.urllib import parse
import unittest2
from oauth2client.client import FlowExchangeError, OAuth2WebServerFlow
from oauth2client.contrib import django_util
from oauth2client.contrib.django_util import decorators
from oauth2client.contrib.django_util import site
from oauth2client.contrib.django_util import storage
from oauth2client.contrib.django_util import views
urlpatterns = [
url(r'^oauth2/', include(site.urls))
]
urlpatterns += [url(r'^oauth2/', include(site.urls))]
class OAuth2SetupTest(unittest2.TestCase):
@mock.patch("oauth2client.contrib.django_util.clientsecrets")
def test_settings_initialize(self, clientsecrets):
django.conf.settings.GOOGLE_OAUTH2_CLIENT_SECRETS_JSON = 'file.json'
clientsecrets.loadfile.return_value = (
clientsecrets.TYPE_WEB,
{
'client_id': 'myid',
'client_secret': 'hunter2'
}
)
oauth2_settings = django_util.OAuth2Settings(django.conf.settings)
self.assertTrue(clientsecrets.loadfile.called)
self.assertEqual(oauth2_settings.client_id, 'myid')
self.assertEqual(oauth2_settings.client_secret, 'hunter2')
@mock.patch("oauth2client.contrib.django_util.clientsecrets")
def test_settings_initialize_invalid_type(self, clientsecrets):
django.conf.settings.GOOGLE_OAUTH2_CLIENT_SECRETS_JSON = 'file.json'
clientsecrets.loadfile.return_value = (
"wrong_type",
{
'client_id': 'myid',
'client_secret': 'hunter2'
}
)
with self.assertRaises(ValueError):
django_util.OAuth2Settings.__init__(
object.__new__(django_util.OAuth2Settings),
django.conf.settings)
@mock.patch("oauth2client.contrib.django_util.clientsecrets")
def test_no_settings(self, clientsecrets):
django.conf.settings.GOOGLE_OAUTH2_CLIENT_SECRETS_JSON = None
django.conf.settings.GOOGLE_OAUTH2_CLIENT_SECRET = None
django.conf.settings.GOOGLE_OAUTH2_CLIENT_ID = None
with self.assertRaises(exceptions.ImproperlyConfigured):
django_util.OAuth2Settings.__init__(
object.__new__(django_util.OAuth2Settings),
django.conf.settings)
@mock.patch("oauth2client.contrib.django_util.clientsecrets")
def test_no_session_middleware(self, clientsecrets):
old_classes = django.conf.settings.MIDDLEWARE_CLASSES
django.conf.settings.MIDDLEWARE_CLASSES = ()
with self.assertRaises(exceptions.ImproperlyConfigured):
django_util.OAuth2Settings.__init__(
object.__new__(django_util.OAuth2Settings),
django.conf.settings)
django.conf.settings.MIDDLEWARE_CLASSES = old_classes
class TestWithSession(test.TestCase):
def setUp(self):
self.factory = test.RequestFactory()
from django.contrib.sessions.backends.file import SessionStore
store = SessionStore()
store.save()
self.session = store
class OAuth2EnabledDecoratorTest(TestWithSession):
def test_no_credentials_without_credentials(self):
request = self.factory.get('/test')
request.session = self.session
@decorators.oauth_enabled
def test_view(request):
return http.HttpResponse("test") # pragma: NO COVER
response = test_view(request)
self.assertEquals(response.status_code, http_client.OK)
self.assertIsNotNone(request.oauth)
self.assertFalse(request.oauth.has_credentials())
self.assertIsNone(request.oauth.http)
@mock.patch('oauth2client.contrib.dictionary_storage.OAuth2Credentials')
def test_has_credentials_in_storage(self, OAuth2Credentials):
request = self.factory.get('/test')
request.session = mock.MagicMock()
credentials_mock = mock.Mock(
scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
credentials_mock.has_scopes.return_value = True
credentials_mock.invalid = False
OAuth2Credentials.from_json.return_value = credentials_mock
@decorators.oauth_enabled
def test_view(request):
return http.HttpResponse("test")
response = test_view(request)
self.assertEquals(response.status_code, http_client.OK)
self.assertEquals(response.content, b"test")
self.assertTrue(request.oauth.has_credentials())
self.assertIsNotNone(request.oauth.http)
@mock.patch('oauth2client.contrib.dictionary_storage.OAuth2Credentials')
def test_specified_scopes(self, OAuth2Credentials):
request = self.factory.get('/test')
request.session = mock.MagicMock()
credentials_mock = mock.Mock(
scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
credentials_mock.has_scopes = True
credentials_mock.is_valid = True
OAuth2Credentials.from_json.return_value = credentials_mock
@decorators.oauth_enabled(scopes=['additional-scope'])
def test_view(request):
return http.HttpResponse("hello world") # pragma: NO COVER
response = test_view(request)
self.assertEquals(response.status_code, http_client.OK)
self.assertIsNotNone(request.oauth)
self.assertFalse(request.oauth.has_credentials())
class OAuth2RequiredDecoratorTest(TestWithSession):
def test_redirects_without_credentials(self):
request = self.factory.get('/test')
request.session = self.session
@decorators.oauth_required
def test_view(request):
return http.HttpResponse("test") # pragma: NO COVER
response = test_view(request)
self.assertIsInstance(response, http.HttpResponseRedirect)
self.assertEquals(parse.urlparse(response['Location']).path,
"/oauth2/oauth2authorize/")
self.assertTrue(
"return_url=%2Ftest" in parse.urlparse(response['Location']).query)
self.assertEquals(response.status_code, 302)
@mock.patch('oauth2client.contrib.django_util.UserOAuth2', autospec=True)
def test_has_credentials_in_storage(self, UserOAuth2):
request = self.factory.get('/test')
request.session = mock.MagicMock()
@decorators.oauth_required
def test_view(request):
return http.HttpResponse("test")
my_user_oauth = mock.MagicMock()
UserOAuth2.return_value = my_user_oauth
my_user_oauth.has_credentials.return_value = True
response = test_view(request)
self.assertEquals(response.status_code, http_client.OK)
self.assertEquals(response.content, b"test")
@mock.patch('oauth2client.contrib.dictionary_storage.OAuth2Credentials')
def test_has_credentials_in_storage_no_scopes(self, OAuth2Credentials):
request = self.factory.get('/test')
request.session = mock.MagicMock()
credentials_mock = mock.Mock(
scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
credentials_mock.has_scopes.return_value = False
OAuth2Credentials.from_json.return_value = credentials_mock
@decorators.oauth_required
def test_view(request):
return http.HttpResponse("test") # pragma: NO COVER
response = test_view(request)
self.assertEquals(response.status_code, 302)
@mock.patch('oauth2client.contrib.dictionary_storage.OAuth2Credentials')
def test_specified_scopes(self, OAuth2Credentials):
request = self.factory.get('/test')
request.session = mock.MagicMock()
credentials_mock = mock.Mock(
scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
credentials_mock.has_scopes = False
OAuth2Credentials.from_json.return_value = credentials_mock
@decorators.oauth_required(scopes=['additional-scope'])
def test_view(request):
return http.HttpResponse("hello world") # pragma: NO COVER
response = test_view(request)
self.assertEquals(response.status_code, 302)
class Oauth2AuthorizeTest(TestWithSession):
def test_authorize_works(self):
request = self.factory.get('oauth2/oauth2authorize')
request.session = self.session
response = views.oauth2_authorize(request)
self.assertIsInstance(response, http.HttpResponseRedirect)
def test_authorize_works_explicit_return_url(self):
request = self.factory.get('oauth2/oauth2authorize',
data={'return_url': '/return_endpoint'})
request.session = self.session
response = views.oauth2_authorize(request)
self.assertIsInstance(response, http.HttpResponseRedirect)
class Oauth2CallbackTest(TestWithSession):
def setUp(self):
global mycallback
mycallback = mock.Mock()
super(Oauth2CallbackTest, self).setUp()
self.CSRF_TOKEN = "token"
self.RETURN_URL = "http://return-url.com"
self.fake_state = {
'csrf_token': self.CSRF_TOKEN,
'return_url': self.RETURN_URL,
'scopes': django.conf.settings.GOOGLE_OAUTH2_SCOPES
}
@mock.patch("oauth2client.contrib.django_util.views.pickle")
def test_callback_works(self, pickle):
request = self.factory.get('oauth2/oauth2callback', data={
"state": json.dumps(self.fake_state),
"code": 123
})
self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN
flow = OAuth2WebServerFlow(
client_id='clientid',
client_secret='clientsecret',
scope=['email'],
state=json.dumps(self.fake_state),
redirect_uri=request.build_absolute_uri("oauth2/oauth2callback"))
self.session['google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)] \
= pickle.dumps(flow)
flow.step2_exchange = mock.Mock()
pickle.loads.return_value = flow
request.session = self.session
response = views.oauth2_callback(request)
self.assertIsInstance(response, http.HttpResponseRedirect)
self.assertEquals(response.status_code, 302)
self.assertEquals(response['Location'], self.RETURN_URL)
@mock.patch("oauth2client.contrib.django_util.views.pickle")
def test_callback_handles_bad_flow_exchange(self, pickle):
request = self.factory.get('oauth2/oauth2callback', data={
"state": json.dumps(self.fake_state),
"code": 123
})
self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN
flow = OAuth2WebServerFlow(
client_id='clientid',
client_secret='clientsecret',
scope=['email'],
state=json.dumps(self.fake_state),
redirect_uri=request.build_absolute_uri("oauth2/oauth2callback"))
self.session['google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)]\
= pickle.dumps(flow)
def local_throws(code):
raise FlowExchangeError("test")
flow.step2_exchange = local_throws
pickle.loads.return_value = flow
request.session = self.session
response = views.oauth2_callback(request)
self.assertIsInstance(response, http.HttpResponseBadRequest)
def test_error_returns_bad_request(self):
request = self.factory.get('oauth2/oauth2callback', data={
"error": "There was an error in your authorization.",
})
response = views.oauth2_callback(request)
self.assertIsInstance(response, http.HttpResponseBadRequest)
self.assertTrue(b"Authorization failed" in response.content)
def test_no_session(self):
request = self.factory.get('oauth2/oauth2callback', data={
"code": 123,
"state": json.dumps(self.fake_state)
})
request.session = self.session
response = views.oauth2_callback(request)
self.assertIsInstance(response, http.HttpResponseBadRequest)
self.assertEquals(
response.content, b'No existing session for this flow.')
def test_missing_state_returns_bad_request(self):
request = self.factory.get('oauth2/oauth2callback', data={
"code": 123
})
self.session['google_oauth2_csrf_token'] = "token"
request.session = self.session
response = views.oauth2_callback(request)
self.assertIsInstance(response, http.HttpResponseBadRequest)
def test_bad_state(self):
request = self.factory.get('oauth2/oauth2callback', data={
"code": 123,
"state": json.dumps({"wrong": "state"})
})
self.session['google_oauth2_csrf_token'] = "token"
request.session = self.session
response = views.oauth2_callback(request)
self.assertIsInstance(response, http.HttpResponseBadRequest)
self.assertEquals(response.content, b'Invalid state parameter.')
def test_bad_csrf(self):
request = self.factory.get('oauth2/oauth2callback', data={
"state": json.dumps(self.fake_state),
"code": 123
})
self.session['google_oauth2_csrf_token'] = "WRONG TOKEN"
request.session = self.session
response = views.oauth2_callback(request)
self.assertIsInstance(response, http.HttpResponseBadRequest)
self.assertEquals(response.content, b'Invalid CSRF token.')
def test_no_saved_flow(self):
request = self.factory.get('oauth2/oauth2callback', data={
"state": json.dumps(self.fake_state),
"code": 123
})
self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN
self.session['google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)] = None
request.session = self.session
response = views.oauth2_callback(request)
self.assertIsInstance(response, http.HttpResponseBadRequest)
self.assertEquals(response.content, b'Missing Oauth2 flow.')
class MockObjectWithSession(object):
def __init__(self, session):
self.session = session
class StorageTest(TestWithSession):
def test_session_delete(self):
self.session[storage._CREDENTIALS_KEY] = "test_val"
request = MockObjectWithSession(self.session)
django_storage = storage.get_storage(request)
django_storage.delete()
self.assertIsNone(self.session.get(storage._CREDENTIALS_KEY))
def test_session_delete_nothing(self):
request = MockObjectWithSession(self.session)
django_storage = storage.get_storage(request)
django_storage.delete()

View File

@@ -17,7 +17,7 @@ deps = {[testenv]basedeps}
keyring
setenv =
pypy: with_gmp=no
DJANGO_SETTINGS_MODULE=tests.contrib.test_django_settings
DJANGO_SETTINGS_MODULE=tests.contrib.django_util.settings
commands = nosetests --ignore-files=test_appengine\.py --ignore-files=test__appengine_ndb\.py {posargs}
[coverbase]
@@ -57,11 +57,9 @@ commands =
nosetests \
--ignore-files=test_appengine\.py \
--ignore-files=test__appengine_ndb\.py \
--ignore-files=test_django_orm\.py \
--ignore-files=test_django_settings\.py \
--ignore-files=test_django_util\.py \
--ignore-files=test_keyring_storage\.py \
--exclude-dir=oauth2client/contrib/django_util \
--exclude-dir=tests/contrib/django_util \
{posargs}
deps = {[testenv]basedeps}
nose-exclude
@@ -77,6 +75,7 @@ commands =
--ignore-files=test_django_settings\.py \
--ignore-files=test_django_util\.py \
--exclude-dir=oauth2client/contrib/django_util \
--exclude-dir=tests/contrib/django_util \
{posargs}
deps = {[testenv]basedeps}
keyring