OpenStack Image Management (Glance)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

test_auth.py 38KB


  1. # Copyright 2011 OpenStack Foundation
  2. # Copyright 2013 IBM Corp.
  3. # All Rights Reserved.
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  6. # not use this file except in compliance with the License. You may obtain
  7. # a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  13. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  14. # License for the specific language governing permissions and limitations
  15. # under the License.
  16. from oslo_serialization import jsonutils
  17. from six.moves import http_client as http
  18. import webob
  19. from glance.api import authorization
  20. from glance.common import auth
  21. from glance.common import exception
  22. from glance.common import timeutils
  23. import glance.domain
  24. from glance.tests.unit import utils as unittest_utils
  25. from glance.tests import utils
# Tenant identifiers used by the tests in this module as request-context
# tenants and as image/task owners.
TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
TENANT2 = '2c014f32-55eb-467d-8fcb-4bd706012f81'
# Identifiers used by the immutability tests in this module: UUID1 is the
# image_id of the image fixture, UUID2 the value tests try to assign as a
# replacement image/task id.
UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d'
UUID2 = 'a85abd86-55b3-4d5b-b0b4-5d0a6e6042fc'
  30. class FakeResponse(object):
  31. """
  32. Simple class that masks the inconsistency between
  33. webob.Response.status_int and httplib.Response.status
  34. """
  35. def __init__(self, resp):
  36. self.resp = resp
  37. def __getitem__(self, key):
  38. return self.resp.headers.get(key)
  39. @property
  40. def status(self):
  41. return self.resp.status_int
  42. class V2Token(object):
  43. def __init__(self):
  44. self.tok = self.base_token
  45. def add_service_no_type(self):
  46. catalog = self.tok['access']['serviceCatalog']
  47. service_type = {"name": "glance_no_type"}
  48. catalog.append(service_type)
  49. service = catalog[-1]
  50. service['endpoints'] = [self.base_endpoint]
  51. def add_service(self, s_type, region_list=None):
  52. if region_list is None:
  53. region_list = []
  54. catalog = self.tok['access']['serviceCatalog']
  55. service_type = {"type": s_type, "name": "glance"}
  56. catalog.append(service_type)
  57. service = catalog[-1]
  58. endpoint_list = []
  59. if not region_list:
  60. endpoint_list.append(self.base_endpoint)
  61. else:
  62. for region in region_list:
  63. endpoint = self.base_endpoint
  64. endpoint['region'] = region
  65. endpoint_list.append(endpoint)
  66. service['endpoints'] = endpoint_list
  67. @property
  68. def token(self):
  69. return self.tok
  70. @property
  71. def base_endpoint(self):
  72. return {
  73. "adminURL": "http://localhost:9292",
  74. "internalURL": "http://localhost:9292",
  75. "publicURL": "http://localhost:9292"
  76. }
  77. @property
  78. def base_token(self):
  79. return {
  80. "access": {
  81. "token": {
  82. "expires": "2010-11-23T16:40:53.321584",
  83. "id": "5c7f8799-2e54-43e4-851b-31f81871b6c",
  84. "tenant": {"id": "1", "name": "tenant-ok"}
  85. },
  86. "serviceCatalog": [
  87. ],
  88. "user": {
  89. "id": "2",
  90. "roles": [{
  91. "tenantId": "1",
  92. "id": "1",
  93. "name": "Admin"
  94. }],
  95. "name": "joeadmin"
  96. }
  97. }
  98. }
  99. class TestKeystoneAuthPlugin(utils.BaseTestCase):
  100. """Test that the Keystone auth plugin works properly"""
  101. def setUp(self):
  102. super(TestKeystoneAuthPlugin, self).setUp()
  103. def test_get_plugin_from_strategy_keystone(self):
  104. strategy = auth.get_plugin_from_strategy('keystone')
  105. self.assertIsInstance(strategy, auth.KeystoneStrategy)
  106. self.assertTrue(strategy.configure_via_auth)
  107. def test_get_plugin_from_strategy_keystone_configure_via_auth_false(self):
  108. strategy = auth.get_plugin_from_strategy('keystone',
  109. configure_via_auth=False)
  110. self.assertIsInstance(strategy, auth.KeystoneStrategy)
  111. self.assertFalse(strategy.configure_via_auth)
  112. def test_required_creds(self):
  113. """
  114. Test that plugin created without required
  115. credential pieces raises an exception
  116. """
  117. bad_creds = [
  118. {}, # missing everything
  119. {
  120. 'username': 'user1',
  121. 'strategy': 'keystone',
  122. 'password': 'pass'
  123. }, # missing auth_url
  124. {
  125. 'password': 'pass',
  126. 'strategy': 'keystone',
  127. 'auth_url': 'http://localhost/v1'
  128. }, # missing username
  129. {
  130. 'username': 'user1',
  131. 'strategy': 'keystone',
  132. 'auth_url': 'http://localhost/v1'
  133. }, # missing password
  134. {
  135. 'username': 'user1',
  136. 'password': 'pass',
  137. 'auth_url': 'http://localhost/v1'
  138. }, # missing strategy
  139. {
  140. 'username': 'user1',
  141. 'password': 'pass',
  142. 'strategy': 'keystone',
  143. 'auth_url': 'http://localhost/v2.0/'
  144. }, # v2.0: missing tenant
  145. {
  146. 'username': None,
  147. 'password': 'pass',
  148. 'auth_url': 'http://localhost/v2.0/'
  149. }, # None parameter
  150. {
  151. 'username': 'user1',
  152. 'password': 'pass',
  153. 'auth_url': 'http://localhost/v2.0/',
  154. 'tenant': None
  155. } # None tenant
  156. ]
  157. for creds in bad_creds:
  158. try:
  159. plugin = auth.KeystoneStrategy(creds)
  160. plugin.authenticate()
  161. self.fail("Failed to raise correct exception when supplying "
  162. "bad credentials: %r" % creds)
  163. except exception.MissingCredentialError:
  164. continue # Expected
  165. def test_invalid_auth_url_v1(self):
  166. """
  167. Test that a 400 during authenticate raises exception.AuthBadRequest
  168. """
  169. def fake_do_request(*args, **kwargs):
  170. resp = webob.Response()
  171. resp.status = http.BAD_REQUEST
  172. return FakeResponse(resp), ""
  173. self.mock_object(auth.KeystoneStrategy, '_do_request', fake_do_request)
  174. bad_creds = {
  175. 'username': 'user1',
  176. 'auth_url': 'http://localhost/badauthurl/',
  177. 'password': 'pass',
  178. 'strategy': 'keystone',
  179. 'region': 'RegionOne'
  180. }
  181. plugin = auth.KeystoneStrategy(bad_creds)
  182. self.assertRaises(exception.AuthBadRequest, plugin.authenticate)
  183. def test_invalid_auth_url_v2(self):
  184. """
  185. Test that a 400 during authenticate raises exception.AuthBadRequest
  186. """
  187. def fake_do_request(*args, **kwargs):
  188. resp = webob.Response()
  189. resp.status = http.BAD_REQUEST
  190. return FakeResponse(resp), ""
  191. self.mock_object(auth.KeystoneStrategy, '_do_request', fake_do_request)
  192. bad_creds = {
  193. 'username': 'user1',
  194. 'auth_url': 'http://localhost/badauthurl/v2.0/',
  195. 'password': 'pass',
  196. 'tenant': 'tenant1',
  197. 'strategy': 'keystone',
  198. 'region': 'RegionOne'
  199. }
  200. plugin = auth.KeystoneStrategy(bad_creds)
  201. self.assertRaises(exception.AuthBadRequest, plugin.authenticate)
  202. def test_v1_auth(self):
  203. """Test v1 auth code paths"""
  204. def fake_do_request(cls, url, method, headers=None, body=None):
  205. if url.find("2.0") != -1:
  206. self.fail("Invalid v1.0 token path (%s)" % url)
  207. headers = headers or {}
  208. resp = webob.Response()
  209. if (headers.get('X-Auth-User') != 'user1' or
  210. headers.get('X-Auth-Key') != 'pass'):
  211. resp.status = http.UNAUTHORIZED
  212. else:
  213. resp.status = http.OK
  214. resp.headers.update({"x-image-management-url": "example.com"})
  215. return FakeResponse(resp), ""
  216. self.mock_object(auth.KeystoneStrategy, '_do_request', fake_do_request)
  217. unauthorized_creds = [
  218. {
  219. 'username': 'wronguser',
  220. 'auth_url': 'http://localhost/badauthurl/',
  221. 'strategy': 'keystone',
  222. 'region': 'RegionOne',
  223. 'password': 'pass'
  224. }, # wrong username
  225. {
  226. 'username': 'user1',
  227. 'auth_url': 'http://localhost/badauthurl/',
  228. 'strategy': 'keystone',
  229. 'region': 'RegionOne',
  230. 'password': 'badpass'
  231. }, # bad password...
  232. ]
  233. for creds in unauthorized_creds:
  234. try:
  235. plugin = auth.KeystoneStrategy(creds)
  236. plugin.authenticate()
  237. self.fail("Failed to raise NotAuthenticated when supplying "
  238. "bad credentials: %r" % creds)
  239. except exception.NotAuthenticated:
  240. continue # Expected
  241. no_strategy_creds = {
  242. 'username': 'user1',
  243. 'auth_url': 'http://localhost/redirect/',
  244. 'password': 'pass',
  245. 'region': 'RegionOne'
  246. }
  247. try:
  248. plugin = auth.KeystoneStrategy(no_strategy_creds)
  249. plugin.authenticate()
  250. self.fail("Failed to raise MissingCredentialError when "
  251. "supplying no strategy: %r" % no_strategy_creds)
  252. except exception.MissingCredentialError:
  253. pass # Expected
  254. good_creds = [
  255. {
  256. 'username': 'user1',
  257. 'auth_url': 'http://localhost/redirect/',
  258. 'password': 'pass',
  259. 'strategy': 'keystone',
  260. 'region': 'RegionOne'
  261. }
  262. ]
  263. for creds in good_creds:
  264. plugin = auth.KeystoneStrategy(creds)
  265. self.assertIsNone(plugin.authenticate())
  266. self.assertEqual("example.com", plugin.management_url)
  267. # Assert it does not update management_url via auth response
  268. for creds in good_creds:
  269. plugin = auth.KeystoneStrategy(creds, configure_via_auth=False)
  270. self.assertIsNone(plugin.authenticate())
  271. self.assertIsNone(plugin.management_url)
  272. def test_v2_auth(self):
  273. """Test v2 auth code paths"""
  274. mock_token = None
  275. def fake_do_request(cls, url, method, headers=None, body=None):
  276. if (not url.rstrip('/').endswith('v2.0/tokens') or
  277. url.count("2.0") != 1):
  278. self.fail("Invalid v2.0 token path (%s)" % url)
  279. creds = jsonutils.loads(body)['auth']
  280. username = creds['passwordCredentials']['username']
  281. password = creds['passwordCredentials']['password']
  282. tenant = creds['tenantName']
  283. resp = webob.Response()
  284. if (username != 'user1' or password != 'pass' or
  285. tenant != 'tenant-ok'):
  286. resp.status = http.UNAUTHORIZED
  287. else:
  288. resp.status = http.OK
  289. body = mock_token.token
  290. return FakeResponse(resp), jsonutils.dumps(body)
  291. mock_token = V2Token()
  292. mock_token.add_service('image', ['RegionOne'])
  293. self.mock_object(auth.KeystoneStrategy, '_do_request', fake_do_request)
  294. unauthorized_creds = [
  295. {
  296. 'username': 'wronguser',
  297. 'auth_url': 'http://localhost/v2.0',
  298. 'password': 'pass',
  299. 'tenant': 'tenant-ok',
  300. 'strategy': 'keystone',
  301. 'region': 'RegionOne'
  302. }, # wrong username
  303. {
  304. 'username': 'user1',
  305. 'auth_url': 'http://localhost/v2.0',
  306. 'password': 'badpass',
  307. 'tenant': 'tenant-ok',
  308. 'strategy': 'keystone',
  309. 'region': 'RegionOne'
  310. }, # bad password...
  311. {
  312. 'username': 'user1',
  313. 'auth_url': 'http://localhost/v2.0',
  314. 'password': 'pass',
  315. 'tenant': 'carterhayes',
  316. 'strategy': 'keystone',
  317. 'region': 'RegionOne'
  318. }, # bad tenant...
  319. ]
  320. for creds in unauthorized_creds:
  321. try:
  322. plugin = auth.KeystoneStrategy(creds)
  323. plugin.authenticate()
  324. self.fail("Failed to raise NotAuthenticated when supplying "
  325. "bad credentials: %r" % creds)
  326. except exception.NotAuthenticated:
  327. continue # Expected
  328. no_region_creds = {
  329. 'username': 'user1',
  330. 'tenant': 'tenant-ok',
  331. 'auth_url': 'http://localhost/redirect/v2.0/',
  332. 'password': 'pass',
  333. 'strategy': 'keystone'
  334. }
  335. plugin = auth.KeystoneStrategy(no_region_creds)
  336. self.assertIsNone(plugin.authenticate())
  337. self.assertEqual('http://localhost:9292', plugin.management_url)
  338. # Add another image service, with a different region
  339. mock_token.add_service('image', ['RegionTwo'])
  340. try:
  341. plugin = auth.KeystoneStrategy(no_region_creds)
  342. plugin.authenticate()
  343. self.fail("Failed to raise RegionAmbiguity when no region present "
  344. "and multiple regions exist: %r" % no_region_creds)
  345. except exception.RegionAmbiguity:
  346. pass # Expected
  347. wrong_region_creds = {
  348. 'username': 'user1',
  349. 'tenant': 'tenant-ok',
  350. 'auth_url': 'http://localhost/redirect/v2.0/',
  351. 'password': 'pass',
  352. 'strategy': 'keystone',
  353. 'region': 'NonExistentRegion'
  354. }
  355. try:
  356. plugin = auth.KeystoneStrategy(wrong_region_creds)
  357. plugin.authenticate()
  358. self.fail("Failed to raise NoServiceEndpoint when supplying "
  359. "wrong region: %r" % wrong_region_creds)
  360. except exception.NoServiceEndpoint:
  361. pass # Expected
  362. no_strategy_creds = {
  363. 'username': 'user1',
  364. 'tenant': 'tenant-ok',
  365. 'auth_url': 'http://localhost/redirect/v2.0/',
  366. 'password': 'pass',
  367. 'region': 'RegionOne'
  368. }
  369. try:
  370. plugin = auth.KeystoneStrategy(no_strategy_creds)
  371. plugin.authenticate()
  372. self.fail("Failed to raise MissingCredentialError when "
  373. "supplying no strategy: %r" % no_strategy_creds)
  374. except exception.MissingCredentialError:
  375. pass # Expected
  376. bad_strategy_creds = {
  377. 'username': 'user1',
  378. 'tenant': 'tenant-ok',
  379. 'auth_url': 'http://localhost/redirect/v2.0/',
  380. 'password': 'pass',
  381. 'region': 'RegionOne',
  382. 'strategy': 'keypebble'
  383. }
  384. try:
  385. plugin = auth.KeystoneStrategy(bad_strategy_creds)
  386. plugin.authenticate()
  387. self.fail("Failed to raise BadAuthStrategy when supplying "
  388. "bad auth strategy: %r" % bad_strategy_creds)
  389. except exception.BadAuthStrategy:
  390. pass # Expected
  391. mock_token = V2Token()
  392. mock_token.add_service('image', ['RegionOne', 'RegionTwo'])
  393. good_creds = [
  394. {
  395. 'username': 'user1',
  396. 'auth_url': 'http://localhost/v2.0/',
  397. 'password': 'pass',
  398. 'tenant': 'tenant-ok',
  399. 'strategy': 'keystone',
  400. 'region': 'RegionOne'
  401. }, # auth_url with trailing '/'
  402. {
  403. 'username': 'user1',
  404. 'auth_url': 'http://localhost/v2.0',
  405. 'password': 'pass',
  406. 'tenant': 'tenant-ok',
  407. 'strategy': 'keystone',
  408. 'region': 'RegionOne'
  409. }, # auth_url without trailing '/'
  410. {
  411. 'username': 'user1',
  412. 'auth_url': 'http://localhost/v2.0',
  413. 'password': 'pass',
  414. 'tenant': 'tenant-ok',
  415. 'strategy': 'keystone',
  416. 'region': 'RegionTwo'
  417. } # Second region
  418. ]
  419. for creds in good_creds:
  420. plugin = auth.KeystoneStrategy(creds)
  421. self.assertIsNone(plugin.authenticate())
  422. self.assertEqual('http://localhost:9292', plugin.management_url)
  423. ambiguous_region_creds = {
  424. 'username': 'user1',
  425. 'auth_url': 'http://localhost/v2.0/',
  426. 'password': 'pass',
  427. 'tenant': 'tenant-ok',
  428. 'strategy': 'keystone',
  429. 'region': 'RegionOne'
  430. }
  431. mock_token = V2Token()
  432. # Add two identical services
  433. mock_token.add_service('image', ['RegionOne'])
  434. mock_token.add_service('image', ['RegionOne'])
  435. try:
  436. plugin = auth.KeystoneStrategy(ambiguous_region_creds)
  437. plugin.authenticate()
  438. self.fail("Failed to raise RegionAmbiguity when "
  439. "non-unique regions exist: %r" % ambiguous_region_creds)
  440. except exception.RegionAmbiguity:
  441. pass
  442. mock_token = V2Token()
  443. mock_token.add_service('bad-image', ['RegionOne'])
  444. good_creds = {
  445. 'username': 'user1',
  446. 'auth_url': 'http://localhost/v2.0/',
  447. 'password': 'pass',
  448. 'tenant': 'tenant-ok',
  449. 'strategy': 'keystone',
  450. 'region': 'RegionOne'
  451. }
  452. try:
  453. plugin = auth.KeystoneStrategy(good_creds)
  454. plugin.authenticate()
  455. self.fail("Failed to raise NoServiceEndpoint when bad service "
  456. "type encountered")
  457. except exception.NoServiceEndpoint:
  458. pass
  459. mock_token = V2Token()
  460. mock_token.add_service_no_type()
  461. try:
  462. plugin = auth.KeystoneStrategy(good_creds)
  463. plugin.authenticate()
  464. self.fail("Failed to raise NoServiceEndpoint when bad service "
  465. "type encountered")
  466. except exception.NoServiceEndpoint:
  467. pass
  468. try:
  469. plugin = auth.KeystoneStrategy(good_creds,
  470. configure_via_auth=False)
  471. plugin.authenticate()
  472. except exception.NoServiceEndpoint:
  473. self.fail("NoServiceEndpoint was raised when authenticate "
  474. "should not check for endpoint.")
  475. class TestEndpoints(utils.BaseTestCase):
  476. def setUp(self):
  477. super(TestEndpoints, self).setUp()
  478. self.service_catalog = [
  479. {
  480. 'endpoint_links': [],
  481. 'endpoints': [
  482. {
  483. 'adminURL': 'http://localhost:8080/',
  484. 'region': 'RegionOne',
  485. 'internalURL': 'http://internalURL/',
  486. 'publicURL': 'http://publicURL/',
  487. },
  488. ],
  489. 'type': 'object-store',
  490. 'name': 'Object Storage Service',
  491. }
  492. ]
  493. def test_get_endpoint_with_custom_server_type(self):
  494. endpoint = auth.get_endpoint(self.service_catalog,
  495. service_type='object-store')
  496. self.assertEqual('http://publicURL/', endpoint)
  497. def test_get_endpoint_with_custom_endpoint_type(self):
  498. endpoint = auth.get_endpoint(self.service_catalog,
  499. service_type='object-store',
  500. endpoint_type='internalURL')
  501. self.assertEqual('http://internalURL/', endpoint)
  502. def test_get_endpoint_raises_with_invalid_service_type(self):
  503. self.assertRaises(exception.NoServiceEndpoint,
  504. auth.get_endpoint,
  505. self.service_catalog,
  506. service_type='foo')
  507. def test_get_endpoint_raises_with_invalid_endpoint_type(self):
  508. self.assertRaises(exception.NoServiceEndpoint,
  509. auth.get_endpoint,
  510. self.service_catalog,
  511. service_type='object-store',
  512. endpoint_type='foo')
  513. def test_get_endpoint_raises_with_invalid_endpoint_region(self):
  514. self.assertRaises(exception.NoServiceEndpoint,
  515. auth.get_endpoint,
  516. self.service_catalog,
  517. service_type='object-store',
  518. endpoint_region='foo',
  519. endpoint_type='internalURL')
  520. class TestImageMutability(utils.BaseTestCase):
  521. def setUp(self):
  522. super(TestImageMutability, self).setUp()
  523. self.image_factory = glance.domain.ImageFactory()
  524. def _is_mutable(self, tenant, owner, is_admin=False):
  525. context = glance.context.RequestContext(tenant=tenant,
  526. is_admin=is_admin)
  527. image = self.image_factory.new_image(owner=owner)
  528. return authorization.is_image_mutable(context, image)
  529. def test_admin_everything_mutable(self):
  530. self.assertTrue(self._is_mutable(None, None, is_admin=True))
  531. self.assertTrue(self._is_mutable(None, TENANT1, is_admin=True))
  532. self.assertTrue(self._is_mutable(TENANT1, None, is_admin=True))
  533. self.assertTrue(self._is_mutable(TENANT1, TENANT1, is_admin=True))
  534. self.assertTrue(self._is_mutable(TENANT1, TENANT2, is_admin=True))
  535. def test_no_tenant_nothing_mutable(self):
  536. self.assertFalse(self._is_mutable(None, None))
  537. self.assertFalse(self._is_mutable(None, TENANT1))
  538. def test_regular_user(self):
  539. self.assertFalse(self._is_mutable(TENANT1, None))
  540. self.assertFalse(self._is_mutable(TENANT1, TENANT2))
  541. self.assertTrue(self._is_mutable(TENANT1, TENANT1))
  542. class TestImmutableImage(utils.BaseTestCase):
  543. def setUp(self):
  544. super(TestImmutableImage, self).setUp()
  545. image_factory = glance.domain.ImageFactory()
  546. self.context = glance.context.RequestContext(tenant=TENANT1)
  547. image = image_factory.new_image(
  548. image_id=UUID1,
  549. name='Marvin',
  550. owner=TENANT1,
  551. disk_format='raw',
  552. container_format='bare',
  553. extra_properties={'foo': 'bar'},
  554. tags=['ping', 'pong'],
  555. )
  556. self.image = authorization.ImmutableImageProxy(image, self.context)
  557. def _test_change(self, attr, value):
  558. self.assertRaises(exception.Forbidden,
  559. setattr, self.image, attr, value)
  560. self.assertRaises(exception.Forbidden,
  561. delattr, self.image, attr)
  562. def test_change_id(self):
  563. self._test_change('image_id', UUID2)
  564. def test_change_name(self):
  565. self._test_change('name', 'Freddie')
  566. def test_change_owner(self):
  567. self._test_change('owner', TENANT2)
  568. def test_change_min_disk(self):
  569. self._test_change('min_disk', 100)
  570. def test_change_min_ram(self):
  571. self._test_change('min_ram', 1024)
  572. def test_change_disk_format(self):
  573. self._test_change('disk_format', 'vhd')
  574. def test_change_container_format(self):
  575. self._test_change('container_format', 'ova')
  576. def test_change_visibility(self):
  577. self._test_change('visibility', 'public')
  578. def test_change_status(self):
  579. self._test_change('status', 'active')
  580. def test_change_created_at(self):
  581. self._test_change('created_at', timeutils.utcnow())
  582. def test_change_updated_at(self):
  583. self._test_change('updated_at', timeutils.utcnow())
  584. def test_change_locations(self):
  585. self._test_change('locations', ['http://a/b/c'])
  586. self.assertRaises(exception.Forbidden,
  587. self.image.locations.append, 'http://a/b/c')
  588. self.assertRaises(exception.Forbidden,
  589. self.image.locations.extend, ['http://a/b/c'])
  590. self.assertRaises(exception.Forbidden,
  591. self.image.locations.insert, 'foo')
  592. self.assertRaises(exception.Forbidden,
  593. self.image.locations.pop)
  594. self.assertRaises(exception.Forbidden,
  595. self.image.locations.remove, 'foo')
  596. self.assertRaises(exception.Forbidden,
  597. self.image.locations.reverse)
  598. self.assertRaises(exception.Forbidden,
  599. self.image.locations.sort)
  600. self.assertRaises(exception.Forbidden,
  601. self.image.locations.__delitem__, 0)
  602. self.assertRaises(exception.Forbidden,
  603. self.image.locations.__delslice__, 0, 2)
  604. self.assertRaises(exception.Forbidden,
  605. self.image.locations.__setitem__, 0, 'foo')
  606. self.assertRaises(exception.Forbidden,
  607. self.image.locations.__setslice__,
  608. 0, 2, ['foo', 'bar'])
  609. self.assertRaises(exception.Forbidden,
  610. self.image.locations.__iadd__, 'foo')
  611. self.assertRaises(exception.Forbidden,
  612. self.image.locations.__imul__, 2)
  613. def test_change_size(self):
  614. self._test_change('size', 32)
  615. def test_change_tags(self):
  616. self.assertRaises(exception.Forbidden,
  617. delattr, self.image, 'tags')
  618. self.assertRaises(exception.Forbidden,
  619. setattr, self.image, 'tags', ['king', 'kong'])
  620. self.assertRaises(exception.Forbidden, self.image.tags.pop)
  621. self.assertRaises(exception.Forbidden, self.image.tags.clear)
  622. self.assertRaises(exception.Forbidden, self.image.tags.add, 'king')
  623. self.assertRaises(exception.Forbidden, self.image.tags.remove, 'ping')
  624. self.assertRaises(exception.Forbidden,
  625. self.image.tags.update, set(['king', 'kong']))
  626. self.assertRaises(exception.Forbidden,
  627. self.image.tags.intersection_update, set([]))
  628. self.assertRaises(exception.Forbidden,
  629. self.image.tags.difference_update, set([]))
  630. self.assertRaises(exception.Forbidden,
  631. self.image.tags.symmetric_difference_update,
  632. set([]))
  633. def test_change_properties(self):
  634. self.assertRaises(exception.Forbidden,
  635. delattr, self.image, 'extra_properties')
  636. self.assertRaises(exception.Forbidden,
  637. setattr, self.image, 'extra_properties', {})
  638. self.assertRaises(exception.Forbidden,
  639. self.image.extra_properties.__delitem__, 'foo')
  640. self.assertRaises(exception.Forbidden,
  641. self.image.extra_properties.__setitem__, 'foo', 'b')
  642. self.assertRaises(exception.Forbidden,
  643. self.image.extra_properties.__setitem__, 'z', 'j')
  644. self.assertRaises(exception.Forbidden,
  645. self.image.extra_properties.pop)
  646. self.assertRaises(exception.Forbidden,
  647. self.image.extra_properties.popitem)
  648. self.assertRaises(exception.Forbidden,
  649. self.image.extra_properties.setdefault, 'p', 'j')
  650. self.assertRaises(exception.Forbidden,
  651. self.image.extra_properties.update, {})
  652. def test_delete(self):
  653. self.assertRaises(exception.Forbidden, self.image.delete)
  654. def test_set_data(self):
  655. self.assertRaises(exception.Forbidden,
  656. self.image.set_data, 'blah', 4)
  657. def test_deactivate_image(self):
  658. self.assertRaises(exception.Forbidden, self.image.deactivate)
  659. def test_reactivate_image(self):
  660. self.assertRaises(exception.Forbidden, self.image.reactivate)
  661. def test_get_data(self):
  662. class FakeImage(object):
  663. def get_data(self):
  664. return 'tiddlywinks'
  665. image = glance.api.authorization.ImmutableImageProxy(
  666. FakeImage(), self.context)
  667. self.assertEqual('tiddlywinks', image.get_data())
  668. class TestImageFactoryProxy(utils.BaseTestCase):
  669. def setUp(self):
  670. super(TestImageFactoryProxy, self).setUp()
  671. factory = glance.domain.ImageFactory()
  672. self.context = glance.context.RequestContext(tenant=TENANT1)
  673. self.image_factory = authorization.ImageFactoryProxy(factory,
  674. self.context)
  675. def test_default_owner_is_set(self):
  676. image = self.image_factory.new_image()
  677. self.assertEqual(TENANT1, image.owner)
  678. def test_wrong_owner_cannot_be_set(self):
  679. self.assertRaises(exception.Forbidden,
  680. self.image_factory.new_image, owner=TENANT2)
  681. def test_cannot_set_owner_to_none(self):
  682. self.assertRaises(exception.Forbidden,
  683. self.image_factory.new_image, owner=None)
  684. def test_admin_can_set_any_owner(self):
  685. self.context.is_admin = True
  686. image = self.image_factory.new_image(owner=TENANT2)
  687. self.assertEqual(TENANT2, image.owner)
  688. def test_admin_can_set_owner_to_none(self):
  689. self.context.is_admin = True
  690. image = self.image_factory.new_image(owner=None)
  691. self.assertIsNone(image.owner)
  692. def test_admin_still_gets_default_tenant(self):
  693. self.context.is_admin = True
  694. image = self.image_factory.new_image()
  695. self.assertEqual(TENANT1, image.owner)
  696. class TestImageRepoProxy(utils.BaseTestCase):
  697. class ImageRepoStub(object):
  698. def __init__(self, fixtures):
  699. self.fixtures = fixtures
  700. def get(self, image_id):
  701. for f in self.fixtures:
  702. if f.image_id == image_id:
  703. return f
  704. else:
  705. raise ValueError(image_id)
  706. def list(self, *args, **kwargs):
  707. return self.fixtures
  708. def setUp(self):
  709. super(TestImageRepoProxy, self).setUp()
  710. image_factory = glance.domain.ImageFactory()
  711. self.fixtures = [
  712. image_factory.new_image(owner=TENANT1),
  713. image_factory.new_image(owner=TENANT2, visibility='public'),
  714. image_factory.new_image(owner=TENANT2),
  715. ]
  716. self.context = glance.context.RequestContext(tenant=TENANT1)
  717. image_repo = self.ImageRepoStub(self.fixtures)
  718. self.image_repo = authorization.ImageRepoProxy(image_repo,
  719. self.context)
  720. def test_get_mutable_image(self):
  721. image = self.image_repo.get(self.fixtures[0].image_id)
  722. self.assertEqual(image.image_id, self.fixtures[0].image_id)
  723. def test_get_immutable_image(self):
  724. image = self.image_repo.get(self.fixtures[1].image_id)
  725. self.assertRaises(exception.Forbidden,
  726. setattr, image, 'name', 'Vince')
  727. def test_list(self):
  728. images = self.image_repo.list()
  729. self.assertEqual(images[0].image_id, self.fixtures[0].image_id)
  730. self.assertRaises(exception.Forbidden,
  731. setattr, images[1], 'name', 'Wally')
  732. self.assertRaises(exception.Forbidden,
  733. setattr, images[2], 'name', 'Calvin')
  734. class TestImmutableTask(utils.BaseTestCase):
  735. def setUp(self):
  736. super(TestImmutableTask, self).setUp()
  737. task_factory = glance.domain.TaskFactory()
  738. self.context = glance.context.RequestContext(tenant=TENANT2)
  739. task_type = 'import'
  740. owner = TENANT2
  741. task = task_factory.new_task(task_type, owner)
  742. self.task = authorization.ImmutableTaskProxy(task)
  743. def _test_change(self, attr, value):
  744. self.assertRaises(
  745. exception.Forbidden,
  746. setattr,
  747. self.task,
  748. attr,
  749. value
  750. )
  751. self.assertRaises(
  752. exception.Forbidden,
  753. delattr,
  754. self.task,
  755. attr
  756. )
  757. def test_change_id(self):
  758. self._test_change('task_id', UUID2)
  759. def test_change_type(self):
  760. self._test_change('type', 'fake')
  761. def test_change_status(self):
  762. self._test_change('status', 'success')
  763. def test_change_owner(self):
  764. self._test_change('owner', 'fake')
  765. def test_change_expires_at(self):
  766. self._test_change('expires_at', 'fake')
  767. def test_change_created_at(self):
  768. self._test_change('created_at', 'fake')
  769. def test_change_updated_at(self):
  770. self._test_change('updated_at', 'fake')
  771. def test_begin_processing(self):
  772. self.assertRaises(
  773. exception.Forbidden,
  774. self.task.begin_processing
  775. )
  776. def test_succeed(self):
  777. self.assertRaises(
  778. exception.Forbidden,
  779. self.task.succeed,
  780. 'result'
  781. )
  782. def test_fail(self):
  783. self.assertRaises(
  784. exception.Forbidden,
  785. self.task.fail,
  786. 'message'
  787. )
  788. class TestImmutableTaskStub(utils.BaseTestCase):
  789. def setUp(self):
  790. super(TestImmutableTaskStub, self).setUp()
  791. task_factory = glance.domain.TaskFactory()
  792. self.context = glance.context.RequestContext(tenant=TENANT2)
  793. task_type = 'import'
  794. owner = TENANT2
  795. task = task_factory.new_task(task_type, owner)
  796. self.task = authorization.ImmutableTaskStubProxy(task)
  797. def _test_change(self, attr, value):
  798. self.assertRaises(
  799. exception.Forbidden,
  800. setattr,
  801. self.task,
  802. attr,
  803. value
  804. )
  805. self.assertRaises(
  806. exception.Forbidden,
  807. delattr,
  808. self.task,
  809. attr
  810. )
  811. def test_change_id(self):
  812. self._test_change('task_id', UUID2)
  813. def test_change_type(self):
  814. self._test_change('type', 'fake')
  815. def test_change_status(self):
  816. self._test_change('status', 'success')
  817. def test_change_owner(self):
  818. self._test_change('owner', 'fake')
  819. def test_change_expires_at(self):
  820. self._test_change('expires_at', 'fake')
  821. def test_change_created_at(self):
  822. self._test_change('created_at', 'fake')
  823. def test_change_updated_at(self):
  824. self._test_change('updated_at', 'fake')
  825. class TestTaskFactoryProxy(utils.BaseTestCase):
  826. def setUp(self):
  827. super(TestTaskFactoryProxy, self).setUp()
  828. factory = glance.domain.TaskFactory()
  829. self.context = glance.context.RequestContext(tenant=TENANT1)
  830. self.context_owner_is_none = glance.context.RequestContext()
  831. self.task_factory = authorization.TaskFactoryProxy(
  832. factory,
  833. self.context
  834. )
  835. self.task_type = 'import'
  836. self.task_input = '{"loc": "fake"}'
  837. self.owner = 'foo'
  838. self.request1 = unittest_utils.get_fake_request(tenant=TENANT1)
  839. self.request2 = unittest_utils.get_fake_request(tenant=TENANT2)
  840. def test_task_create_default_owner(self):
  841. owner = self.request1.context.owner
  842. task = self.task_factory.new_task(task_type=self.task_type,
  843. owner=owner)
  844. self.assertEqual(TENANT1, task.owner)
  845. def test_task_create_wrong_owner(self):
  846. self.assertRaises(exception.Forbidden,
  847. self.task_factory.new_task,
  848. task_type=self.task_type,
  849. task_input=self.task_input,
  850. owner=self.owner)
  851. def test_task_create_owner_as_None(self):
  852. self.assertRaises(exception.Forbidden,
  853. self.task_factory.new_task,
  854. task_type=self.task_type,
  855. task_input=self.task_input,
  856. owner=None)
  857. def test_task_create_admin_context_owner_as_None(self):
  858. self.context.is_admin = True
  859. self.assertRaises(exception.Forbidden,
  860. self.task_factory.new_task,
  861. task_type=self.task_type,
  862. task_input=self.task_input,
  863. owner=None)
  864. class TestTaskRepoProxy(utils.BaseTestCase):
  865. class TaskRepoStub(object):
  866. def __init__(self, fixtures):
  867. self.fixtures = fixtures
  868. def get(self, task_id):
  869. for f in self.fixtures:
  870. if f.task_id == task_id:
  871. return f
  872. else:
  873. raise ValueError(task_id)
  874. class TaskStubRepoStub(object):
  875. def __init__(self, fixtures):
  876. self.fixtures = fixtures
  877. def list(self, *args, **kwargs):
  878. return self.fixtures
  879. def setUp(self):
  880. super(TestTaskRepoProxy, self).setUp()
  881. task_factory = glance.domain.TaskFactory()
  882. task_type = 'import'
  883. owner = None
  884. self.fixtures = [
  885. task_factory.new_task(task_type, owner),
  886. task_factory.new_task(task_type, owner),
  887. task_factory.new_task(task_type, owner),
  888. ]
  889. self.context = glance.context.RequestContext(tenant=TENANT1)
  890. task_repo = self.TaskRepoStub(self.fixtures)
  891. task_stub_repo = self.TaskStubRepoStub(self.fixtures)
  892. self.task_repo = authorization.TaskRepoProxy(
  893. task_repo,
  894. self.context
  895. )
  896. self.task_stub_repo = authorization.TaskStubRepoProxy(
  897. task_stub_repo,
  898. self.context
  899. )
  900. def test_get_mutable_task(self):
  901. task = self.task_repo.get(self.fixtures[0].task_id)
  902. self.assertEqual(task.task_id, self.fixtures[0].task_id)
  903. def test_get_immutable_task(self):
  904. task_id = self.fixtures[1].task_id
  905. task = self.task_repo.get(task_id)
  906. self.assertRaises(exception.Forbidden,
  907. setattr, task, 'input', 'foo')
  908. def test_list(self):
  909. tasks = self.task_stub_repo.list()
  910. self.assertEqual(tasks[0].task_id, self.fixtures[0].task_id)
  911. self.assertRaises(exception.Forbidden,
  912. setattr,
  913. tasks[1],
  914. 'owner',
  915. 'foo')
  916. self.assertRaises(exception.Forbidden,
  917. setattr,
  918. tasks[2],
  919. 'owner',
  920. 'foo')