OpenStack Image Management (Glance)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

test_auth.py 38KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095
  1. # Copyright 2011 OpenStack Foundation
  2. # Copyright 2013 IBM Corp.
  3. # All Rights Reserved.
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  6. # not use this file except in compliance with the License. You may obtain
  7. # a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  13. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  14. # License for the specific language governing permissions and limitations
  15. # under the License.
  16. from oslo_serialization import jsonutils
  17. from oslotest import moxstubout
  18. from six.moves import http_client as http
  19. import webob
  20. from glance.api import authorization
  21. from glance.common import auth
  22. from glance.common import exception
  23. from glance.common import timeutils
  24. import glance.domain
  25. from glance.tests.unit import utils as unittest_utils
  26. from glance.tests import utils
  27. TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
  28. TENANT2 = '2c014f32-55eb-467d-8fcb-4bd706012f81'
  29. UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d'
  30. UUID2 = 'a85abd86-55b3-4d5b-b0b4-5d0a6e6042fc'
  31. class FakeResponse(object):
  32. """
  33. Simple class that masks the inconsistency between
  34. webob.Response.status_int and httplib.Response.status
  35. """
  36. def __init__(self, resp):
  37. self.resp = resp
  38. def __getitem__(self, key):
  39. return self.resp.headers.get(key)
  40. @property
  41. def status(self):
  42. return self.resp.status_int
  43. class V2Token(object):
  44. def __init__(self):
  45. self.tok = self.base_token
  46. def add_service_no_type(self):
  47. catalog = self.tok['access']['serviceCatalog']
  48. service_type = {"name": "glance_no_type"}
  49. catalog.append(service_type)
  50. service = catalog[-1]
  51. service['endpoints'] = [self.base_endpoint]
  52. def add_service(self, s_type, region_list=None):
  53. if region_list is None:
  54. region_list = []
  55. catalog = self.tok['access']['serviceCatalog']
  56. service_type = {"type": s_type, "name": "glance"}
  57. catalog.append(service_type)
  58. service = catalog[-1]
  59. endpoint_list = []
  60. if not region_list:
  61. endpoint_list.append(self.base_endpoint)
  62. else:
  63. for region in region_list:
  64. endpoint = self.base_endpoint
  65. endpoint['region'] = region
  66. endpoint_list.append(endpoint)
  67. service['endpoints'] = endpoint_list
  68. @property
  69. def token(self):
  70. return self.tok
  71. @property
  72. def base_endpoint(self):
  73. return {
  74. "adminURL": "http://localhost:9292",
  75. "internalURL": "http://localhost:9292",
  76. "publicURL": "http://localhost:9292"
  77. }
  78. @property
  79. def base_token(self):
  80. return {
  81. "access": {
  82. "token": {
  83. "expires": "2010-11-23T16:40:53.321584",
  84. "id": "5c7f8799-2e54-43e4-851b-31f81871b6c",
  85. "tenant": {"id": "1", "name": "tenant-ok"}
  86. },
  87. "serviceCatalog": [
  88. ],
  89. "user": {
  90. "id": "2",
  91. "roles": [{
  92. "tenantId": "1",
  93. "id": "1",
  94. "name": "Admin"
  95. }],
  96. "name": "joeadmin"
  97. }
  98. }
  99. }
  100. class TestKeystoneAuthPlugin(utils.BaseTestCase):
  101. """Test that the Keystone auth plugin works properly"""
  102. def setUp(self):
  103. super(TestKeystoneAuthPlugin, self).setUp()
  104. mox_fixture = self.useFixture(moxstubout.MoxStubout())
  105. self.stubs = mox_fixture.stubs
  106. def test_get_plugin_from_strategy_keystone(self):
  107. strategy = auth.get_plugin_from_strategy('keystone')
  108. self.assertIsInstance(strategy, auth.KeystoneStrategy)
  109. self.assertTrue(strategy.configure_via_auth)
  110. def test_get_plugin_from_strategy_keystone_configure_via_auth_false(self):
  111. strategy = auth.get_plugin_from_strategy('keystone',
  112. configure_via_auth=False)
  113. self.assertIsInstance(strategy, auth.KeystoneStrategy)
  114. self.assertFalse(strategy.configure_via_auth)
  115. def test_required_creds(self):
  116. """
  117. Test that plugin created without required
  118. credential pieces raises an exception
  119. """
  120. bad_creds = [
  121. {}, # missing everything
  122. {
  123. 'username': 'user1',
  124. 'strategy': 'keystone',
  125. 'password': 'pass'
  126. }, # missing auth_url
  127. {
  128. 'password': 'pass',
  129. 'strategy': 'keystone',
  130. 'auth_url': 'http://localhost/v1'
  131. }, # missing username
  132. {
  133. 'username': 'user1',
  134. 'strategy': 'keystone',
  135. 'auth_url': 'http://localhost/v1'
  136. }, # missing password
  137. {
  138. 'username': 'user1',
  139. 'password': 'pass',
  140. 'auth_url': 'http://localhost/v1'
  141. }, # missing strategy
  142. {
  143. 'username': 'user1',
  144. 'password': 'pass',
  145. 'strategy': 'keystone',
  146. 'auth_url': 'http://localhost/v2.0/'
  147. }, # v2.0: missing tenant
  148. {
  149. 'username': None,
  150. 'password': 'pass',
  151. 'auth_url': 'http://localhost/v2.0/'
  152. }, # None parameter
  153. {
  154. 'username': 'user1',
  155. 'password': 'pass',
  156. 'auth_url': 'http://localhost/v2.0/',
  157. 'tenant': None
  158. } # None tenant
  159. ]
  160. for creds in bad_creds:
  161. try:
  162. plugin = auth.KeystoneStrategy(creds)
  163. plugin.authenticate()
  164. self.fail("Failed to raise correct exception when supplying "
  165. "bad credentials: %r" % creds)
  166. except exception.MissingCredentialError:
  167. continue # Expected
  168. def test_invalid_auth_url_v1(self):
  169. """
  170. Test that a 400 during authenticate raises exception.AuthBadRequest
  171. """
  172. def fake_do_request(*args, **kwargs):
  173. resp = webob.Response()
  174. resp.status = http.BAD_REQUEST
  175. return FakeResponse(resp), ""
  176. self.stubs.Set(auth.KeystoneStrategy, '_do_request', fake_do_request)
  177. bad_creds = {
  178. 'username': 'user1',
  179. 'auth_url': 'http://localhost/badauthurl/',
  180. 'password': 'pass',
  181. 'strategy': 'keystone',
  182. 'region': 'RegionOne'
  183. }
  184. plugin = auth.KeystoneStrategy(bad_creds)
  185. self.assertRaises(exception.AuthBadRequest, plugin.authenticate)
  186. def test_invalid_auth_url_v2(self):
  187. """
  188. Test that a 400 during authenticate raises exception.AuthBadRequest
  189. """
  190. def fake_do_request(*args, **kwargs):
  191. resp = webob.Response()
  192. resp.status = http.BAD_REQUEST
  193. return FakeResponse(resp), ""
  194. self.stubs.Set(auth.KeystoneStrategy, '_do_request', fake_do_request)
  195. bad_creds = {
  196. 'username': 'user1',
  197. 'auth_url': 'http://localhost/badauthurl/v2.0/',
  198. 'password': 'pass',
  199. 'tenant': 'tenant1',
  200. 'strategy': 'keystone',
  201. 'region': 'RegionOne'
  202. }
  203. plugin = auth.KeystoneStrategy(bad_creds)
  204. self.assertRaises(exception.AuthBadRequest, plugin.authenticate)
  205. def test_v1_auth(self):
  206. """Test v1 auth code paths"""
  207. def fake_do_request(cls, url, method, headers=None, body=None):
  208. if url.find("2.0") != -1:
  209. self.fail("Invalid v1.0 token path (%s)" % url)
  210. headers = headers or {}
  211. resp = webob.Response()
  212. if (headers.get('X-Auth-User') != 'user1' or
  213. headers.get('X-Auth-Key') != 'pass'):
  214. resp.status = http.UNAUTHORIZED
  215. else:
  216. resp.status = http.OK
  217. resp.headers.update({"x-image-management-url": "example.com"})
  218. return FakeResponse(resp), ""
  219. self.stubs.Set(auth.KeystoneStrategy, '_do_request', fake_do_request)
  220. unauthorized_creds = [
  221. {
  222. 'username': 'wronguser',
  223. 'auth_url': 'http://localhost/badauthurl/',
  224. 'strategy': 'keystone',
  225. 'region': 'RegionOne',
  226. 'password': 'pass'
  227. }, # wrong username
  228. {
  229. 'username': 'user1',
  230. 'auth_url': 'http://localhost/badauthurl/',
  231. 'strategy': 'keystone',
  232. 'region': 'RegionOne',
  233. 'password': 'badpass'
  234. }, # bad password...
  235. ]
  236. for creds in unauthorized_creds:
  237. try:
  238. plugin = auth.KeystoneStrategy(creds)
  239. plugin.authenticate()
  240. self.fail("Failed to raise NotAuthenticated when supplying "
  241. "bad credentials: %r" % creds)
  242. except exception.NotAuthenticated:
  243. continue # Expected
  244. no_strategy_creds = {
  245. 'username': 'user1',
  246. 'auth_url': 'http://localhost/redirect/',
  247. 'password': 'pass',
  248. 'region': 'RegionOne'
  249. }
  250. try:
  251. plugin = auth.KeystoneStrategy(no_strategy_creds)
  252. plugin.authenticate()
  253. self.fail("Failed to raise MissingCredentialError when "
  254. "supplying no strategy: %r" % no_strategy_creds)
  255. except exception.MissingCredentialError:
  256. pass # Expected
  257. good_creds = [
  258. {
  259. 'username': 'user1',
  260. 'auth_url': 'http://localhost/redirect/',
  261. 'password': 'pass',
  262. 'strategy': 'keystone',
  263. 'region': 'RegionOne'
  264. }
  265. ]
  266. for creds in good_creds:
  267. plugin = auth.KeystoneStrategy(creds)
  268. self.assertIsNone(plugin.authenticate())
  269. self.assertEqual("example.com", plugin.management_url)
  270. # Assert it does not update management_url via auth response
  271. for creds in good_creds:
  272. plugin = auth.KeystoneStrategy(creds, configure_via_auth=False)
  273. self.assertIsNone(plugin.authenticate())
  274. self.assertIsNone(plugin.management_url)
  275. def test_v2_auth(self):
  276. """Test v2 auth code paths"""
  277. mock_token = None
  278. def fake_do_request(cls, url, method, headers=None, body=None):
  279. if (not url.rstrip('/').endswith('v2.0/tokens') or
  280. url.count("2.0") != 1):
  281. self.fail("Invalid v2.0 token path (%s)" % url)
  282. creds = jsonutils.loads(body)['auth']
  283. username = creds['passwordCredentials']['username']
  284. password = creds['passwordCredentials']['password']
  285. tenant = creds['tenantName']
  286. resp = webob.Response()
  287. if (username != 'user1' or password != 'pass' or
  288. tenant != 'tenant-ok'):
  289. resp.status = http.UNAUTHORIZED
  290. else:
  291. resp.status = http.OK
  292. body = mock_token.token
  293. return FakeResponse(resp), jsonutils.dumps(body)
  294. mock_token = V2Token()
  295. mock_token.add_service('image', ['RegionOne'])
  296. self.stubs.Set(auth.KeystoneStrategy, '_do_request', fake_do_request)
  297. unauthorized_creds = [
  298. {
  299. 'username': 'wronguser',
  300. 'auth_url': 'http://localhost/v2.0',
  301. 'password': 'pass',
  302. 'tenant': 'tenant-ok',
  303. 'strategy': 'keystone',
  304. 'region': 'RegionOne'
  305. }, # wrong username
  306. {
  307. 'username': 'user1',
  308. 'auth_url': 'http://localhost/v2.0',
  309. 'password': 'badpass',
  310. 'tenant': 'tenant-ok',
  311. 'strategy': 'keystone',
  312. 'region': 'RegionOne'
  313. }, # bad password...
  314. {
  315. 'username': 'user1',
  316. 'auth_url': 'http://localhost/v2.0',
  317. 'password': 'pass',
  318. 'tenant': 'carterhayes',
  319. 'strategy': 'keystone',
  320. 'region': 'RegionOne'
  321. }, # bad tenant...
  322. ]
  323. for creds in unauthorized_creds:
  324. try:
  325. plugin = auth.KeystoneStrategy(creds)
  326. plugin.authenticate()
  327. self.fail("Failed to raise NotAuthenticated when supplying "
  328. "bad credentials: %r" % creds)
  329. except exception.NotAuthenticated:
  330. continue # Expected
  331. no_region_creds = {
  332. 'username': 'user1',
  333. 'tenant': 'tenant-ok',
  334. 'auth_url': 'http://localhost/redirect/v2.0/',
  335. 'password': 'pass',
  336. 'strategy': 'keystone'
  337. }
  338. plugin = auth.KeystoneStrategy(no_region_creds)
  339. self.assertIsNone(plugin.authenticate())
  340. self.assertEqual('http://localhost:9292', plugin.management_url)
  341. # Add another image service, with a different region
  342. mock_token.add_service('image', ['RegionTwo'])
  343. try:
  344. plugin = auth.KeystoneStrategy(no_region_creds)
  345. plugin.authenticate()
  346. self.fail("Failed to raise RegionAmbiguity when no region present "
  347. "and multiple regions exist: %r" % no_region_creds)
  348. except exception.RegionAmbiguity:
  349. pass # Expected
  350. wrong_region_creds = {
  351. 'username': 'user1',
  352. 'tenant': 'tenant-ok',
  353. 'auth_url': 'http://localhost/redirect/v2.0/',
  354. 'password': 'pass',
  355. 'strategy': 'keystone',
  356. 'region': 'NonExistentRegion'
  357. }
  358. try:
  359. plugin = auth.KeystoneStrategy(wrong_region_creds)
  360. plugin.authenticate()
  361. self.fail("Failed to raise NoServiceEndpoint when supplying "
  362. "wrong region: %r" % wrong_region_creds)
  363. except exception.NoServiceEndpoint:
  364. pass # Expected
  365. no_strategy_creds = {
  366. 'username': 'user1',
  367. 'tenant': 'tenant-ok',
  368. 'auth_url': 'http://localhost/redirect/v2.0/',
  369. 'password': 'pass',
  370. 'region': 'RegionOne'
  371. }
  372. try:
  373. plugin = auth.KeystoneStrategy(no_strategy_creds)
  374. plugin.authenticate()
  375. self.fail("Failed to raise MissingCredentialError when "
  376. "supplying no strategy: %r" % no_strategy_creds)
  377. except exception.MissingCredentialError:
  378. pass # Expected
  379. bad_strategy_creds = {
  380. 'username': 'user1',
  381. 'tenant': 'tenant-ok',
  382. 'auth_url': 'http://localhost/redirect/v2.0/',
  383. 'password': 'pass',
  384. 'region': 'RegionOne',
  385. 'strategy': 'keypebble'
  386. }
  387. try:
  388. plugin = auth.KeystoneStrategy(bad_strategy_creds)
  389. plugin.authenticate()
  390. self.fail("Failed to raise BadAuthStrategy when supplying "
  391. "bad auth strategy: %r" % bad_strategy_creds)
  392. except exception.BadAuthStrategy:
  393. pass # Expected
  394. mock_token = V2Token()
  395. mock_token.add_service('image', ['RegionOne', 'RegionTwo'])
  396. good_creds = [
  397. {
  398. 'username': 'user1',
  399. 'auth_url': 'http://localhost/v2.0/',
  400. 'password': 'pass',
  401. 'tenant': 'tenant-ok',
  402. 'strategy': 'keystone',
  403. 'region': 'RegionOne'
  404. }, # auth_url with trailing '/'
  405. {
  406. 'username': 'user1',
  407. 'auth_url': 'http://localhost/v2.0',
  408. 'password': 'pass',
  409. 'tenant': 'tenant-ok',
  410. 'strategy': 'keystone',
  411. 'region': 'RegionOne'
  412. }, # auth_url without trailing '/'
  413. {
  414. 'username': 'user1',
  415. 'auth_url': 'http://localhost/v2.0',
  416. 'password': 'pass',
  417. 'tenant': 'tenant-ok',
  418. 'strategy': 'keystone',
  419. 'region': 'RegionTwo'
  420. } # Second region
  421. ]
  422. for creds in good_creds:
  423. plugin = auth.KeystoneStrategy(creds)
  424. self.assertIsNone(plugin.authenticate())
  425. self.assertEqual('http://localhost:9292', plugin.management_url)
  426. ambiguous_region_creds = {
  427. 'username': 'user1',
  428. 'auth_url': 'http://localhost/v2.0/',
  429. 'password': 'pass',
  430. 'tenant': 'tenant-ok',
  431. 'strategy': 'keystone',
  432. 'region': 'RegionOne'
  433. }
  434. mock_token = V2Token()
  435. # Add two identical services
  436. mock_token.add_service('image', ['RegionOne'])
  437. mock_token.add_service('image', ['RegionOne'])
  438. try:
  439. plugin = auth.KeystoneStrategy(ambiguous_region_creds)
  440. plugin.authenticate()
  441. self.fail("Failed to raise RegionAmbiguity when "
  442. "non-unique regions exist: %r" % ambiguous_region_creds)
  443. except exception.RegionAmbiguity:
  444. pass
  445. mock_token = V2Token()
  446. mock_token.add_service('bad-image', ['RegionOne'])
  447. good_creds = {
  448. 'username': 'user1',
  449. 'auth_url': 'http://localhost/v2.0/',
  450. 'password': 'pass',
  451. 'tenant': 'tenant-ok',
  452. 'strategy': 'keystone',
  453. 'region': 'RegionOne'
  454. }
  455. try:
  456. plugin = auth.KeystoneStrategy(good_creds)
  457. plugin.authenticate()
  458. self.fail("Failed to raise NoServiceEndpoint when bad service "
  459. "type encountered")
  460. except exception.NoServiceEndpoint:
  461. pass
  462. mock_token = V2Token()
  463. mock_token.add_service_no_type()
  464. try:
  465. plugin = auth.KeystoneStrategy(good_creds)
  466. plugin.authenticate()
  467. self.fail("Failed to raise NoServiceEndpoint when bad service "
  468. "type encountered")
  469. except exception.NoServiceEndpoint:
  470. pass
  471. try:
  472. plugin = auth.KeystoneStrategy(good_creds,
  473. configure_via_auth=False)
  474. plugin.authenticate()
  475. except exception.NoServiceEndpoint:
  476. self.fail("NoServiceEndpoint was raised when authenticate "
  477. "should not check for endpoint.")
  478. class TestEndpoints(utils.BaseTestCase):
  479. def setUp(self):
  480. super(TestEndpoints, self).setUp()
  481. self.service_catalog = [
  482. {
  483. 'endpoint_links': [],
  484. 'endpoints': [
  485. {
  486. 'adminURL': 'http://localhost:8080/',
  487. 'region': 'RegionOne',
  488. 'internalURL': 'http://internalURL/',
  489. 'publicURL': 'http://publicURL/',
  490. },
  491. ],
  492. 'type': 'object-store',
  493. 'name': 'Object Storage Service',
  494. }
  495. ]
  496. def test_get_endpoint_with_custom_server_type(self):
  497. endpoint = auth.get_endpoint(self.service_catalog,
  498. service_type='object-store')
  499. self.assertEqual('http://publicURL/', endpoint)
  500. def test_get_endpoint_with_custom_endpoint_type(self):
  501. endpoint = auth.get_endpoint(self.service_catalog,
  502. service_type='object-store',
  503. endpoint_type='internalURL')
  504. self.assertEqual('http://internalURL/', endpoint)
  505. def test_get_endpoint_raises_with_invalid_service_type(self):
  506. self.assertRaises(exception.NoServiceEndpoint,
  507. auth.get_endpoint,
  508. self.service_catalog,
  509. service_type='foo')
  510. def test_get_endpoint_raises_with_invalid_endpoint_type(self):
  511. self.assertRaises(exception.NoServiceEndpoint,
  512. auth.get_endpoint,
  513. self.service_catalog,
  514. service_type='object-store',
  515. endpoint_type='foo')
  516. def test_get_endpoint_raises_with_invalid_endpoint_region(self):
  517. self.assertRaises(exception.NoServiceEndpoint,
  518. auth.get_endpoint,
  519. self.service_catalog,
  520. service_type='object-store',
  521. endpoint_region='foo',
  522. endpoint_type='internalURL')
  523. class TestImageMutability(utils.BaseTestCase):
  524. def setUp(self):
  525. super(TestImageMutability, self).setUp()
  526. self.image_factory = glance.domain.ImageFactory()
  527. def _is_mutable(self, tenant, owner, is_admin=False):
  528. context = glance.context.RequestContext(tenant=tenant,
  529. is_admin=is_admin)
  530. image = self.image_factory.new_image(owner=owner)
  531. return authorization.is_image_mutable(context, image)
  532. def test_admin_everything_mutable(self):
  533. self.assertTrue(self._is_mutable(None, None, is_admin=True))
  534. self.assertTrue(self._is_mutable(None, TENANT1, is_admin=True))
  535. self.assertTrue(self._is_mutable(TENANT1, None, is_admin=True))
  536. self.assertTrue(self._is_mutable(TENANT1, TENANT1, is_admin=True))
  537. self.assertTrue(self._is_mutable(TENANT1, TENANT2, is_admin=True))
  538. def test_no_tenant_nothing_mutable(self):
  539. self.assertFalse(self._is_mutable(None, None))
  540. self.assertFalse(self._is_mutable(None, TENANT1))
  541. def test_regular_user(self):
  542. self.assertFalse(self._is_mutable(TENANT1, None))
  543. self.assertFalse(self._is_mutable(TENANT1, TENANT2))
  544. self.assertTrue(self._is_mutable(TENANT1, TENANT1))
  545. class TestImmutableImage(utils.BaseTestCase):
  546. def setUp(self):
  547. super(TestImmutableImage, self).setUp()
  548. image_factory = glance.domain.ImageFactory()
  549. self.context = glance.context.RequestContext(tenant=TENANT1)
  550. image = image_factory.new_image(
  551. image_id=UUID1,
  552. name='Marvin',
  553. owner=TENANT1,
  554. disk_format='raw',
  555. container_format='bare',
  556. extra_properties={'foo': 'bar'},
  557. tags=['ping', 'pong'],
  558. )
  559. self.image = authorization.ImmutableImageProxy(image, self.context)
  560. def _test_change(self, attr, value):
  561. self.assertRaises(exception.Forbidden,
  562. setattr, self.image, attr, value)
  563. self.assertRaises(exception.Forbidden,
  564. delattr, self.image, attr)
  565. def test_change_id(self):
  566. self._test_change('image_id', UUID2)
  567. def test_change_name(self):
  568. self._test_change('name', 'Freddie')
  569. def test_change_owner(self):
  570. self._test_change('owner', TENANT2)
  571. def test_change_min_disk(self):
  572. self._test_change('min_disk', 100)
  573. def test_change_min_ram(self):
  574. self._test_change('min_ram', 1024)
  575. def test_change_disk_format(self):
  576. self._test_change('disk_format', 'vhd')
  577. def test_change_container_format(self):
  578. self._test_change('container_format', 'ova')
  579. def test_change_visibility(self):
  580. self._test_change('visibility', 'public')
  581. def test_change_status(self):
  582. self._test_change('status', 'active')
  583. def test_change_created_at(self):
  584. self._test_change('created_at', timeutils.utcnow())
  585. def test_change_updated_at(self):
  586. self._test_change('updated_at', timeutils.utcnow())
  587. def test_change_locations(self):
  588. self._test_change('locations', ['http://a/b/c'])
  589. self.assertRaises(exception.Forbidden,
  590. self.image.locations.append, 'http://a/b/c')
  591. self.assertRaises(exception.Forbidden,
  592. self.image.locations.extend, ['http://a/b/c'])
  593. self.assertRaises(exception.Forbidden,
  594. self.image.locations.insert, 'foo')
  595. self.assertRaises(exception.Forbidden,
  596. self.image.locations.pop)
  597. self.assertRaises(exception.Forbidden,
  598. self.image.locations.remove, 'foo')
  599. self.assertRaises(exception.Forbidden,
  600. self.image.locations.reverse)
  601. self.assertRaises(exception.Forbidden,
  602. self.image.locations.sort)
  603. self.assertRaises(exception.Forbidden,
  604. self.image.locations.__delitem__, 0)
  605. self.assertRaises(exception.Forbidden,
  606. self.image.locations.__delslice__, 0, 2)
  607. self.assertRaises(exception.Forbidden,
  608. self.image.locations.__setitem__, 0, 'foo')
  609. self.assertRaises(exception.Forbidden,
  610. self.image.locations.__setslice__,
  611. 0, 2, ['foo', 'bar'])
  612. self.assertRaises(exception.Forbidden,
  613. self.image.locations.__iadd__, 'foo')
  614. self.assertRaises(exception.Forbidden,
  615. self.image.locations.__imul__, 2)
  616. def test_change_size(self):
  617. self._test_change('size', 32)
  618. def test_change_tags(self):
  619. self.assertRaises(exception.Forbidden,
  620. delattr, self.image, 'tags')
  621. self.assertRaises(exception.Forbidden,
  622. setattr, self.image, 'tags', ['king', 'kong'])
  623. self.assertRaises(exception.Forbidden, self.image.tags.pop)
  624. self.assertRaises(exception.Forbidden, self.image.tags.clear)
  625. self.assertRaises(exception.Forbidden, self.image.tags.add, 'king')
  626. self.assertRaises(exception.Forbidden, self.image.tags.remove, 'ping')
  627. self.assertRaises(exception.Forbidden,
  628. self.image.tags.update, set(['king', 'kong']))
  629. self.assertRaises(exception.Forbidden,
  630. self.image.tags.intersection_update, set([]))
  631. self.assertRaises(exception.Forbidden,
  632. self.image.tags.difference_update, set([]))
  633. self.assertRaises(exception.Forbidden,
  634. self.image.tags.symmetric_difference_update,
  635. set([]))
  636. def test_change_properties(self):
  637. self.assertRaises(exception.Forbidden,
  638. delattr, self.image, 'extra_properties')
  639. self.assertRaises(exception.Forbidden,
  640. setattr, self.image, 'extra_properties', {})
  641. self.assertRaises(exception.Forbidden,
  642. self.image.extra_properties.__delitem__, 'foo')
  643. self.assertRaises(exception.Forbidden,
  644. self.image.extra_properties.__setitem__, 'foo', 'b')
  645. self.assertRaises(exception.Forbidden,
  646. self.image.extra_properties.__setitem__, 'z', 'j')
  647. self.assertRaises(exception.Forbidden,
  648. self.image.extra_properties.pop)
  649. self.assertRaises(exception.Forbidden,
  650. self.image.extra_properties.popitem)
  651. self.assertRaises(exception.Forbidden,
  652. self.image.extra_properties.setdefault, 'p', 'j')
  653. self.assertRaises(exception.Forbidden,
  654. self.image.extra_properties.update, {})
  655. def test_delete(self):
  656. self.assertRaises(exception.Forbidden, self.image.delete)
  657. def test_set_data(self):
  658. self.assertRaises(exception.Forbidden,
  659. self.image.set_data, 'blah', 4)
  660. def test_deactivate_image(self):
  661. self.assertRaises(exception.Forbidden, self.image.deactivate)
  662. def test_reactivate_image(self):
  663. self.assertRaises(exception.Forbidden, self.image.reactivate)
  664. def test_get_data(self):
  665. class FakeImage(object):
  666. def get_data(self):
  667. return 'tiddlywinks'
  668. image = glance.api.authorization.ImmutableImageProxy(
  669. FakeImage(), self.context)
  670. self.assertEqual('tiddlywinks', image.get_data())
  671. class TestImageFactoryProxy(utils.BaseTestCase):
  672. def setUp(self):
  673. super(TestImageFactoryProxy, self).setUp()
  674. factory = glance.domain.ImageFactory()
  675. self.context = glance.context.RequestContext(tenant=TENANT1)
  676. self.image_factory = authorization.ImageFactoryProxy(factory,
  677. self.context)
  678. def test_default_owner_is_set(self):
  679. image = self.image_factory.new_image()
  680. self.assertEqual(TENANT1, image.owner)
  681. def test_wrong_owner_cannot_be_set(self):
  682. self.assertRaises(exception.Forbidden,
  683. self.image_factory.new_image, owner=TENANT2)
  684. def test_cannot_set_owner_to_none(self):
  685. self.assertRaises(exception.Forbidden,
  686. self.image_factory.new_image, owner=None)
  687. def test_admin_can_set_any_owner(self):
  688. self.context.is_admin = True
  689. image = self.image_factory.new_image(owner=TENANT2)
  690. self.assertEqual(TENANT2, image.owner)
  691. def test_admin_can_set_owner_to_none(self):
  692. self.context.is_admin = True
  693. image = self.image_factory.new_image(owner=None)
  694. self.assertIsNone(image.owner)
  695. def test_admin_still_gets_default_tenant(self):
  696. self.context.is_admin = True
  697. image = self.image_factory.new_image()
  698. self.assertEqual(TENANT1, image.owner)
  699. class TestImageRepoProxy(utils.BaseTestCase):
  700. class ImageRepoStub(object):
  701. def __init__(self, fixtures):
  702. self.fixtures = fixtures
  703. def get(self, image_id):
  704. for f in self.fixtures:
  705. if f.image_id == image_id:
  706. return f
  707. else:
  708. raise ValueError(image_id)
  709. def list(self, *args, **kwargs):
  710. return self.fixtures
  711. def setUp(self):
  712. super(TestImageRepoProxy, self).setUp()
  713. image_factory = glance.domain.ImageFactory()
  714. self.fixtures = [
  715. image_factory.new_image(owner=TENANT1),
  716. image_factory.new_image(owner=TENANT2, visibility='public'),
  717. image_factory.new_image(owner=TENANT2),
  718. ]
  719. self.context = glance.context.RequestContext(tenant=TENANT1)
  720. image_repo = self.ImageRepoStub(self.fixtures)
  721. self.image_repo = authorization.ImageRepoProxy(image_repo,
  722. self.context)
  723. def test_get_mutable_image(self):
  724. image = self.image_repo.get(self.fixtures[0].image_id)
  725. self.assertEqual(image.image_id, self.fixtures[0].image_id)
  726. def test_get_immutable_image(self):
  727. image = self.image_repo.get(self.fixtures[1].image_id)
  728. self.assertRaises(exception.Forbidden,
  729. setattr, image, 'name', 'Vince')
  730. def test_list(self):
  731. images = self.image_repo.list()
  732. self.assertEqual(images[0].image_id, self.fixtures[0].image_id)
  733. self.assertRaises(exception.Forbidden,
  734. setattr, images[1], 'name', 'Wally')
  735. self.assertRaises(exception.Forbidden,
  736. setattr, images[2], 'name', 'Calvin')
  737. class TestImmutableTask(utils.BaseTestCase):
  738. def setUp(self):
  739. super(TestImmutableTask, self).setUp()
  740. task_factory = glance.domain.TaskFactory()
  741. self.context = glance.context.RequestContext(tenant=TENANT2)
  742. task_type = 'import'
  743. owner = TENANT2
  744. task = task_factory.new_task(task_type, owner)
  745. self.task = authorization.ImmutableTaskProxy(task)
  746. def _test_change(self, attr, value):
  747. self.assertRaises(
  748. exception.Forbidden,
  749. setattr,
  750. self.task,
  751. attr,
  752. value
  753. )
  754. self.assertRaises(
  755. exception.Forbidden,
  756. delattr,
  757. self.task,
  758. attr
  759. )
  760. def test_change_id(self):
  761. self._test_change('task_id', UUID2)
  762. def test_change_type(self):
  763. self._test_change('type', 'fake')
  764. def test_change_status(self):
  765. self._test_change('status', 'success')
  766. def test_change_owner(self):
  767. self._test_change('owner', 'fake')
  768. def test_change_expires_at(self):
  769. self._test_change('expires_at', 'fake')
  770. def test_change_created_at(self):
  771. self._test_change('created_at', 'fake')
  772. def test_change_updated_at(self):
  773. self._test_change('updated_at', 'fake')
  774. def test_begin_processing(self):
  775. self.assertRaises(
  776. exception.Forbidden,
  777. self.task.begin_processing
  778. )
  779. def test_succeed(self):
  780. self.assertRaises(
  781. exception.Forbidden,
  782. self.task.succeed,
  783. 'result'
  784. )
  785. def test_fail(self):
  786. self.assertRaises(
  787. exception.Forbidden,
  788. self.task.fail,
  789. 'message'
  790. )
  791. class TestImmutableTaskStub(utils.BaseTestCase):
  792. def setUp(self):
  793. super(TestImmutableTaskStub, self).setUp()
  794. task_factory = glance.domain.TaskFactory()
  795. self.context = glance.context.RequestContext(tenant=TENANT2)
  796. task_type = 'import'
  797. owner = TENANT2
  798. task = task_factory.new_task(task_type, owner)
  799. self.task = authorization.ImmutableTaskStubProxy(task)
  800. def _test_change(self, attr, value):
  801. self.assertRaises(
  802. exception.Forbidden,
  803. setattr,
  804. self.task,
  805. attr,
  806. value
  807. )
  808. self.assertRaises(
  809. exception.Forbidden,
  810. delattr,
  811. self.task,
  812. attr
  813. )
  814. def test_change_id(self):
  815. self._test_change('task_id', UUID2)
  816. def test_change_type(self):
  817. self._test_change('type', 'fake')
  818. def test_change_status(self):
  819. self._test_change('status', 'success')
  820. def test_change_owner(self):
  821. self._test_change('owner', 'fake')
  822. def test_change_expires_at(self):
  823. self._test_change('expires_at', 'fake')
  824. def test_change_created_at(self):
  825. self._test_change('created_at', 'fake')
  826. def test_change_updated_at(self):
  827. self._test_change('updated_at', 'fake')
  828. class TestTaskFactoryProxy(utils.BaseTestCase):
  829. def setUp(self):
  830. super(TestTaskFactoryProxy, self).setUp()
  831. factory = glance.domain.TaskFactory()
  832. self.context = glance.context.RequestContext(tenant=TENANT1)
  833. self.context_owner_is_none = glance.context.RequestContext()
  834. self.task_factory = authorization.TaskFactoryProxy(
  835. factory,
  836. self.context
  837. )
  838. self.task_type = 'import'
  839. self.task_input = '{"loc": "fake"}'
  840. self.owner = 'foo'
  841. self.request1 = unittest_utils.get_fake_request(tenant=TENANT1)
  842. self.request2 = unittest_utils.get_fake_request(tenant=TENANT2)
  843. def test_task_create_default_owner(self):
  844. owner = self.request1.context.owner
  845. task = self.task_factory.new_task(task_type=self.task_type,
  846. owner=owner)
  847. self.assertEqual(TENANT1, task.owner)
  848. def test_task_create_wrong_owner(self):
  849. self.assertRaises(exception.Forbidden,
  850. self.task_factory.new_task,
  851. task_type=self.task_type,
  852. task_input=self.task_input,
  853. owner=self.owner)
  854. def test_task_create_owner_as_None(self):
  855. self.assertRaises(exception.Forbidden,
  856. self.task_factory.new_task,
  857. task_type=self.task_type,
  858. task_input=self.task_input,
  859. owner=None)
  860. def test_task_create_admin_context_owner_as_None(self):
  861. self.context.is_admin = True
  862. self.assertRaises(exception.Forbidden,
  863. self.task_factory.new_task,
  864. task_type=self.task_type,
  865. task_input=self.task_input,
  866. owner=None)
  867. class TestTaskRepoProxy(utils.BaseTestCase):
  868. class TaskRepoStub(object):
  869. def __init__(self, fixtures):
  870. self.fixtures = fixtures
  871. def get(self, task_id):
  872. for f in self.fixtures:
  873. if f.task_id == task_id:
  874. return f
  875. else:
  876. raise ValueError(task_id)
  877. class TaskStubRepoStub(object):
  878. def __init__(self, fixtures):
  879. self.fixtures = fixtures
  880. def list(self, *args, **kwargs):
  881. return self.fixtures
  882. def setUp(self):
  883. super(TestTaskRepoProxy, self).setUp()
  884. task_factory = glance.domain.TaskFactory()
  885. task_type = 'import'
  886. owner = None
  887. self.fixtures = [
  888. task_factory.new_task(task_type, owner),
  889. task_factory.new_task(task_type, owner),
  890. task_factory.new_task(task_type, owner),
  891. ]
  892. self.context = glance.context.RequestContext(tenant=TENANT1)
  893. task_repo = self.TaskRepoStub(self.fixtures)
  894. task_stub_repo = self.TaskStubRepoStub(self.fixtures)
  895. self.task_repo = authorization.TaskRepoProxy(
  896. task_repo,
  897. self.context
  898. )
  899. self.task_stub_repo = authorization.TaskStubRepoProxy(
  900. task_stub_repo,
  901. self.context
  902. )
  903. def test_get_mutable_task(self):
  904. task = self.task_repo.get(self.fixtures[0].task_id)
  905. self.assertEqual(task.task_id, self.fixtures[0].task_id)
  906. def test_get_immutable_task(self):
  907. task_id = self.fixtures[1].task_id
  908. task = self.task_repo.get(task_id)
  909. self.assertRaises(exception.Forbidden,
  910. setattr, task, 'input', 'foo')
  911. def test_list(self):
  912. tasks = self.task_stub_repo.list()
  913. self.assertEqual(tasks[0].task_id, self.fixtures[0].task_id)
  914. self.assertRaises(exception.Forbidden,
  915. setattr,
  916. tasks[1],
  917. 'owner',
  918. 'foo')
  919. self.assertRaises(exception.Forbidden,
  920. setattr,
  921. tasks[2],
  922. 'owner',
  923. 'foo')