Accept role list from either X-Roles or X-Role
Accept the list of roles from either the newer X-Roles header or the deprecated X-Role header. This is useful for interoperability with a software proxy in front of Nova API that performs token authentication and might use the older header. Change-Id: I47e33233edf596dd14d07b6be16b030fd6bc352d
This commit is contained in:
		
				
					committed by
					
						
						Brian Elliott
					
				
			
			
				
	
			
			
			
						parent
						
							458ee2eac0
						
					
				
				
					commit
					bc0ba55ae6
				
			@@ -77,8 +77,9 @@ class NovaKeystoneContext(wsgi.Middleware):
 | 
			
		||||
        if user_id is None:
 | 
			
		||||
            LOG.debug("Neither X_USER_ID nor X_USER found in request")
 | 
			
		||||
            return webob.exc.HTTPUnauthorized()
 | 
			
		||||
        # get the roles
 | 
			
		||||
        roles = [r.strip() for r in req.headers.get('X_ROLE', '').split(',')]
 | 
			
		||||
 | 
			
		||||
        roles = self._get_roles(req)
 | 
			
		||||
 | 
			
		||||
        if 'X_TENANT_ID' in req.headers:
 | 
			
		||||
            # This is the new header since Keystone went to ID/Name
 | 
			
		||||
            project_id = req.headers['X_TENANT_ID']
 | 
			
		||||
@@ -117,3 +118,16 @@ class NovaKeystoneContext(wsgi.Middleware):
 | 
			
		||||
 | 
			
		||||
        req.environ['nova.context'] = ctx
 | 
			
		||||
        return self.application
 | 
			
		||||
 | 
			
		||||
    def _get_roles(self, req):
 | 
			
		||||
        """Get the list of roles"""
 | 
			
		||||
 | 
			
		||||
        if 'X_ROLES' in req.headers:
 | 
			
		||||
            roles = req.headers.get('X_ROLES', '')
 | 
			
		||||
        else:
 | 
			
		||||
            # Fallback to deprecated role header:
 | 
			
		||||
            roles = req.headers.get('X_ROLE', '')
 | 
			
		||||
            if roles:
 | 
			
		||||
                LOG.warn(_("Sourcing roles from deprecated X-Role HTTP "
 | 
			
		||||
                           "header"))
 | 
			
		||||
        return [r.strip() for r in roles.split(',')]
 | 
			
		||||
 
 | 
			
		||||
@@ -64,3 +64,61 @@ class TestNovaKeystoneContextMiddleware(test.TestCase):
 | 
			
		||||
        self.request.headers['X_SERVICE_CATALOG'] = "bad json"
 | 
			
		||||
        response = self.request.get_response(self.middleware)
 | 
			
		||||
        self.assertEqual(response.status, '500 Internal Server Error')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestKeystoneMiddlewareRoles(test.TestCase):
 | 
			
		||||
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        super(TestKeystoneMiddlewareRoles, self).setUp()
 | 
			
		||||
 | 
			
		||||
        @webob.dec.wsgify()
 | 
			
		||||
        def role_check_app(req):
 | 
			
		||||
            context = req.environ['nova.context']
 | 
			
		||||
 | 
			
		||||
            if "knight" in context.roles and "bad" not in context.roles:
 | 
			
		||||
                return webob.Response(status=_("200 Role Match"))
 | 
			
		||||
            elif context.roles == ['']:
 | 
			
		||||
                return webob.Response(status=_("200 No Roles"))
 | 
			
		||||
            else:
 | 
			
		||||
                raise Exception(context.roles)
 | 
			
		||||
                raise webob.exc.HTTPBadRequest(_("unexpected role header"))
 | 
			
		||||
 | 
			
		||||
        self.middleware = nova.api.auth.NovaKeystoneContext(role_check_app)
 | 
			
		||||
        self.request = webob.Request.blank('/')
 | 
			
		||||
        self.request.headers['X_USER'] = 'testuser'
 | 
			
		||||
        self.request.headers['X_TENANT_ID'] = 'testtenantid'
 | 
			
		||||
        self.request.headers['X_AUTH_TOKEN'] = 'testauthtoken'
 | 
			
		||||
        self.request.headers['X_SERVICE_CATALOG'] = json.dumps({})
 | 
			
		||||
 | 
			
		||||
        self.roles = "pawn, knight, rook"
 | 
			
		||||
 | 
			
		||||
    def test_roles(self):
 | 
			
		||||
        """Test that the newer style role header takes precedence"""
 | 
			
		||||
        self.request.headers['X_ROLES'] = 'pawn,knight,rook'
 | 
			
		||||
        self.request.headers['X_ROLE'] = 'bad'
 | 
			
		||||
 | 
			
		||||
        response = self.request.get_response(self.middleware)
 | 
			
		||||
        self.assertEqual(response.status, '200 Role Match')
 | 
			
		||||
 | 
			
		||||
    def test_roles_empty(self):
 | 
			
		||||
        self.request.headers['X_ROLES'] = ''
 | 
			
		||||
        response = self.request.get_response(self.middleware)
 | 
			
		||||
        self.assertEqual(response.status, '200 No Roles')
 | 
			
		||||
 | 
			
		||||
    def test_deprecated_role(self):
 | 
			
		||||
        """Test fallback to older role header"""
 | 
			
		||||
        self.request.headers['X_ROLE'] = 'pawn,knight,rook'
 | 
			
		||||
 | 
			
		||||
        response = self.request.get_response(self.middleware)
 | 
			
		||||
        self.assertEqual(response.status, '200 Role Match')
 | 
			
		||||
 | 
			
		||||
    def test_role_empty(self):
 | 
			
		||||
        self.request.headers['X_ROLE'] = ''
 | 
			
		||||
        response = self.request.get_response(self.middleware)
 | 
			
		||||
        self.assertEqual(response.status, '200 No Roles')
 | 
			
		||||
 | 
			
		||||
    def test_no_role_headers(self):
 | 
			
		||||
        """Test with no role headers set"""
 | 
			
		||||
 | 
			
		||||
        response = self.request.get_response(self.middleware)
 | 
			
		||||
        self.assertEqual(response.status, '200 No Roles')
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user