OpenStack Identity (Keystone)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

1114 lines
41 KiB

  1. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  2. # not use this file except in compliance with the License. You may obtain
  3. # a copy of the License at
  4. #
  5. # http://www.apache.org/licenses/LICENSE-2.0
  6. #
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  9. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  10. # License for the specific language governing permissions and limitations
  11. # under the License.
  12. import uuid
  13. from keystone.common import provider_api
  14. import keystone.conf
  15. from keystone.tests.common import auth as common_auth
  16. from keystone.tests import unit
  17. from keystone.tests.unit import base_classes
  18. from keystone.tests.unit import ksfixtures
  19. CONF = keystone.conf.CONF
  20. PROVIDERS = provider_api.ProviderAPIs
  21. class _AssignmentTestUtilities(object):
  22. """Useful utilities for setting up test assignments and assertions."""
  23. def _setup_test_role_assignments(self):
  24. # Utility to create assignments and return important data for
  25. # assertions.
  26. # Since the role doesn't really matter too much, we can just re-use an
  27. # existing role instead of creating a new one.
  28. role_id = self.bootstrapper.reader_role_id
  29. user = PROVIDERS.identity_api.create_user(
  30. unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  31. )
  32. group = PROVIDERS.identity_api.create_group(
  33. unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  34. )
  35. domain = PROVIDERS.resource_api.create_domain(
  36. uuid.uuid4().hex, unit.new_domain_ref()
  37. )
  38. project = PROVIDERS.resource_api.create_project(
  39. uuid.uuid4().hex,
  40. unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
  41. )
  42. # create a user+project role assignment.
  43. PROVIDERS.assignment_api.create_grant(
  44. role_id, user_id=user['id'], project_id=project['id']
  45. )
  46. # create a user+domain role assignment.
  47. PROVIDERS.assignment_api.create_grant(
  48. role_id, user_id=user['id'], domain_id=domain['id']
  49. )
  50. # create a user+system role assignment.
  51. PROVIDERS.assignment_api.create_system_grant_for_user(
  52. user['id'], role_id
  53. )
  54. # create a group+project role assignment.
  55. PROVIDERS.assignment_api.create_grant(
  56. role_id, group_id=group['id'], project_id=project['id']
  57. )
  58. # create a group+domain role assignment.
  59. PROVIDERS.assignment_api.create_grant(
  60. role_id, group_id=group['id'], domain_id=domain['id']
  61. )
  62. # create a group+system role assignment.
  63. PROVIDERS.assignment_api.create_system_grant_for_group(
  64. group['id'], role_id
  65. )
  66. return {
  67. 'user_id': user['id'],
  68. 'group_id': group['id'],
  69. 'domain_id': domain['id'],
  70. 'project_id': project['id'],
  71. 'role_id': role_id,
  72. }
  73. def _extract_role_assignments_from_response_body(self, r):
  74. # Condense the role assignment details into a set of key things we can
  75. # use in assertions.
  76. assignments = []
  77. for assignment in r.json['role_assignments']:
  78. a = {}
  79. if 'project' in assignment['scope']:
  80. a['project_id'] = assignment['scope']['project']['id']
  81. elif 'domain' in assignment['scope']:
  82. a['domain_id'] = assignment['scope']['domain']['id']
  83. elif 'system' in assignment['scope']:
  84. a['system'] = 'all'
  85. if 'user' in assignment:
  86. a['user_id'] = assignment['user']['id']
  87. elif 'group' in assignment:
  88. a['group_id'] = assignment['group']['id']
  89. a['role_id'] = assignment['role']['id']
  90. assignments.append(a)
  91. return assignments
  92. class _SystemUserTests(object):
  93. """Common functionality for system users regardless of default role."""
  94. def test_user_can_list_all_role_assignments_in_the_deployment(self):
  95. assignments = self._setup_test_role_assignments()
  96. # this assignment is created by keystone-manage bootstrap
  97. self.expected.append({
  98. 'user_id': self.bootstrapper.admin_user_id,
  99. 'project_id': self.bootstrapper.project_id,
  100. 'role_id': self.bootstrapper.admin_role_id
  101. })
  102. # this assignment is created by keystone-manage bootstrap
  103. self.expected.append({
  104. 'user_id': self.bootstrapper.admin_user_id,
  105. 'system': 'all',
  106. 'role_id': self.bootstrapper.admin_role_id
  107. })
  108. self.expected.append({
  109. 'user_id': assignments['user_id'],
  110. 'project_id': assignments['project_id'],
  111. 'role_id': assignments['role_id']
  112. })
  113. self.expected.append({
  114. 'user_id': assignments['user_id'],
  115. 'domain_id': assignments['domain_id'],
  116. 'role_id': assignments['role_id']
  117. })
  118. self.expected.append({
  119. 'user_id': assignments['user_id'],
  120. 'system': 'all',
  121. 'role_id': assignments['role_id']
  122. })
  123. self.expected.append({
  124. 'group_id': assignments['group_id'],
  125. 'project_id': assignments['project_id'],
  126. 'role_id': assignments['role_id']
  127. })
  128. self.expected.append({
  129. 'group_id': assignments['group_id'],
  130. 'domain_id': assignments['domain_id'],
  131. 'role_id': assignments['role_id']
  132. })
  133. self.expected.append({
  134. 'group_id': assignments['group_id'],
  135. 'system': 'all',
  136. 'role_id': assignments['role_id']
  137. })
  138. with self.test_client() as c:
  139. r = c.get('/v3/role_assignments', headers=self.headers)
  140. self.assertEqual(
  141. len(self.expected), len(r.json['role_assignments'])
  142. )
  143. actual = self._extract_role_assignments_from_response_body(r)
  144. for assignment in actual:
  145. self.assertIn(assignment, self.expected)
  146. def test_user_can_list_all_role_names_assignments_in_the_deployment(self):
  147. assignments = self._setup_test_role_assignments()
  148. # this assignment is created by keystone-manage bootstrap
  149. self.expected.append({
  150. 'user_id': self.bootstrapper.admin_user_id,
  151. 'project_id': self.bootstrapper.project_id,
  152. 'role_id': self.bootstrapper.admin_role_id
  153. })
  154. # this assignment is created by keystone-manage bootstrap
  155. self.expected.append({
  156. 'user_id': self.bootstrapper.admin_user_id,
  157. 'system': 'all',
  158. 'role_id': self.bootstrapper.admin_role_id
  159. })
  160. self.expected.append({
  161. 'user_id': assignments['user_id'],
  162. 'project_id': assignments['project_id'],
  163. 'role_id': assignments['role_id']
  164. })
  165. self.expected.append({
  166. 'user_id': assignments['user_id'],
  167. 'domain_id': assignments['domain_id'],
  168. 'role_id': assignments['role_id']
  169. })
  170. self.expected.append({
  171. 'user_id': assignments['user_id'],
  172. 'system': 'all',
  173. 'role_id': assignments['role_id']
  174. })
  175. self.expected.append({
  176. 'group_id': assignments['group_id'],
  177. 'project_id': assignments['project_id'],
  178. 'role_id': assignments['role_id']
  179. })
  180. self.expected.append({
  181. 'group_id': assignments['group_id'],
  182. 'domain_id': assignments['domain_id'],
  183. 'role_id': assignments['role_id']
  184. })
  185. self.expected.append({
  186. 'group_id': assignments['group_id'],
  187. 'system': 'all',
  188. 'role_id': assignments['role_id']
  189. })
  190. with self.test_client() as c:
  191. r = c.get(
  192. '/v3/role_assignments?include_names=True', headers=self.headers
  193. )
  194. self.assertEqual(
  195. len(self.expected), len(r.json['role_assignments'])
  196. )
  197. actual = self._extract_role_assignments_from_response_body(r)
  198. for assignment in actual:
  199. self.assertIn(assignment, self.expected)
  200. def test_user_can_filter_role_assignments_by_project(self):
  201. assignments = self._setup_test_role_assignments()
  202. expected = [
  203. {
  204. 'user_id': assignments['user_id'],
  205. 'project_id': assignments['project_id'],
  206. 'role_id': assignments['role_id']
  207. },
  208. {
  209. 'group_id': assignments['group_id'],
  210. 'project_id': assignments['project_id'],
  211. 'role_id': assignments['role_id']
  212. }
  213. ]
  214. project_id = assignments['project_id']
  215. with self.test_client() as c:
  216. r = c.get(
  217. '/v3/role_assignments?scope.project.id=%s' % project_id,
  218. headers=self.headers
  219. )
  220. self.assertEqual(len(expected), len(r.json['role_assignments']))
  221. actual = self._extract_role_assignments_from_response_body(r)
  222. for assignment in actual:
  223. self.assertIn(assignment, expected)
  224. def test_user_can_filter_role_assignments_by_domain(self):
  225. assignments = self._setup_test_role_assignments()
  226. expected = [
  227. {
  228. 'user_id': assignments['user_id'],
  229. 'domain_id': assignments['domain_id'],
  230. 'role_id': assignments['role_id']
  231. },
  232. {
  233. 'group_id': assignments['group_id'],
  234. 'domain_id': assignments['domain_id'],
  235. 'role_id': assignments['role_id']
  236. }
  237. ]
  238. domain_id = assignments['domain_id']
  239. with self.test_client() as c:
  240. r = c.get(
  241. '/v3/role_assignments?scope.domain.id=%s' % domain_id,
  242. headers=self.headers
  243. )
  244. self.assertEqual(len(expected), len(r.json['role_assignments']))
  245. actual = self._extract_role_assignments_from_response_body(r)
  246. for assignment in actual:
  247. self.assertIn(assignment, expected)
  248. def test_user_can_filter_role_assignments_by_system(self):
  249. assignments = self._setup_test_role_assignments()
  250. # this assignment is created by keystone-manage bootstrap
  251. self.expected.append({
  252. 'user_id': self.bootstrapper.admin_user_id,
  253. 'system': 'all',
  254. 'role_id': self.bootstrapper.admin_role_id
  255. })
  256. self.expected.append({
  257. 'user_id': assignments['user_id'],
  258. 'system': 'all',
  259. 'role_id': assignments['role_id']
  260. })
  261. self.expected.append({
  262. 'group_id': assignments['group_id'],
  263. 'system': 'all',
  264. 'role_id': assignments['role_id']
  265. })
  266. with self.test_client() as c:
  267. r = c.get(
  268. '/v3/role_assignments?scope.system=all',
  269. headers=self.headers
  270. )
  271. self.assertEqual(
  272. len(self.expected), len(r.json['role_assignments'])
  273. )
  274. actual = self._extract_role_assignments_from_response_body(r)
  275. for assignment in actual:
  276. self.assertIn(assignment, self.expected)
  277. def test_user_can_filter_role_assignments_by_user(self):
  278. assignments = self._setup_test_role_assignments()
  279. expected = [
  280. # assignment of the user running the test case
  281. {
  282. 'user_id': assignments['user_id'],
  283. 'project_id': assignments['project_id'],
  284. 'role_id': assignments['role_id']
  285. },
  286. {
  287. 'user_id': assignments['user_id'],
  288. 'domain_id': assignments['domain_id'],
  289. 'role_id': assignments['role_id']
  290. },
  291. {
  292. 'user_id': assignments['user_id'],
  293. 'system': 'all',
  294. 'role_id': assignments['role_id']
  295. }
  296. ]
  297. user_id = assignments['user_id']
  298. with self.test_client() as c:
  299. r = c.get(
  300. '/v3/role_assignments?user.id=%s' % user_id,
  301. headers=self.headers
  302. )
  303. self.assertEqual(len(expected), len(r.json['role_assignments']))
  304. actual = self._extract_role_assignments_from_response_body(r)
  305. for assignment in actual:
  306. self.assertIn(assignment, expected)
  307. def test_user_can_filter_role_assignments_by_group(self):
  308. assignments = self._setup_test_role_assignments()
  309. expected = [
  310. {
  311. 'group_id': assignments['group_id'],
  312. 'project_id': assignments['project_id'],
  313. 'role_id': assignments['role_id']
  314. },
  315. {
  316. 'group_id': assignments['group_id'],
  317. 'domain_id': assignments['domain_id'],
  318. 'role_id': assignments['role_id']
  319. },
  320. {
  321. 'group_id': assignments['group_id'],
  322. 'system': 'all',
  323. 'role_id': assignments['role_id']
  324. }
  325. ]
  326. group_id = assignments['group_id']
  327. with self.test_client() as c:
  328. r = c.get(
  329. '/v3/role_assignments?group.id=%s' % group_id,
  330. headers=self.headers
  331. )
  332. self.assertEqual(len(expected), len(r.json['role_assignments']))
  333. actual = self._extract_role_assignments_from_response_body(r)
  334. for assignment in actual:
  335. self.assertIn(assignment, expected)
  336. def test_user_can_filter_role_assignments_by_role(self):
  337. assignments = self._setup_test_role_assignments()
  338. self.expected.append({
  339. 'user_id': assignments['user_id'],
  340. 'project_id': assignments['project_id'],
  341. 'role_id': assignments['role_id']
  342. })
  343. self.expected.append({
  344. 'user_id': assignments['user_id'],
  345. 'domain_id': assignments['domain_id'],
  346. 'role_id': assignments['role_id']
  347. })
  348. self.expected.append({
  349. 'user_id': assignments['user_id'],
  350. 'system': 'all',
  351. 'role_id': assignments['role_id']
  352. })
  353. self.expected.append({
  354. 'group_id': assignments['group_id'],
  355. 'project_id': assignments['project_id'],
  356. 'role_id': assignments['role_id']
  357. })
  358. self.expected.append({
  359. 'group_id': assignments['group_id'],
  360. 'domain_id': assignments['domain_id'],
  361. 'role_id': assignments['role_id']
  362. })
  363. self.expected.append({
  364. 'group_id': assignments['group_id'],
  365. 'system': 'all',
  366. 'role_id': assignments['role_id']
  367. })
  368. role_id = assignments['role_id']
  369. with self.test_client() as c:
  370. r = c.get(
  371. '/v3/role_assignments?role.id=%s&include_names=True' % role_id,
  372. headers=self.headers
  373. )
  374. self.assertEqual(
  375. len(self.expected), len(r.json['role_assignments'])
  376. )
  377. actual = self._extract_role_assignments_from_response_body(r)
  378. for assignment in actual:
  379. self.assertIn(assignment, self.expected)
  380. def test_user_can_filter_role_assignments_by_project_and_role(self):
  381. assignments = self._setup_test_role_assignments()
  382. expected = [
  383. {
  384. 'user_id': assignments['user_id'],
  385. 'project_id': assignments['project_id'],
  386. 'role_id': assignments['role_id']
  387. },
  388. {
  389. 'group_id': assignments['group_id'],
  390. 'project_id': assignments['project_id'],
  391. 'role_id': assignments['role_id']
  392. },
  393. ]
  394. with self.test_client() as c:
  395. qs = (assignments['project_id'], assignments['role_id'])
  396. r = c.get(
  397. '/v3/role_assignments?scope.project.id=%s&role.id=%s' % qs,
  398. headers=self.headers
  399. )
  400. self.assertEqual(len(expected), len(r.json['role_assignments']))
  401. actual = self._extract_role_assignments_from_response_body(r)
  402. for assignment in actual:
  403. self.assertIn(assignment, expected)
  404. def test_user_can_filter_role_assignments_by_domain_and_role(self):
  405. assignments = self._setup_test_role_assignments()
  406. expected = [
  407. {
  408. 'user_id': assignments['user_id'],
  409. 'domain_id': assignments['domain_id'],
  410. 'role_id': assignments['role_id']
  411. },
  412. {
  413. 'group_id': assignments['group_id'],
  414. 'domain_id': assignments['domain_id'],
  415. 'role_id': assignments['role_id']
  416. },
  417. ]
  418. qs = (assignments['domain_id'], assignments['role_id'])
  419. with self.test_client() as c:
  420. r = c.get(
  421. '/v3/role_assignments?scope.domain.id=%s&role.id=%s' % qs,
  422. headers=self.headers
  423. )
  424. self.assertEqual(len(expected), len(r.json['role_assignments']))
  425. actual = self._extract_role_assignments_from_response_body(r)
  426. for assignment in actual:
  427. self.assertIn(assignment, expected)
  428. def test_user_can_filter_role_assignments_by_system_and_role(self):
  429. assignments = self._setup_test_role_assignments()
  430. self.expected.append({
  431. 'user_id': assignments['user_id'],
  432. 'system': 'all',
  433. 'role_id': assignments['role_id']
  434. })
  435. self.expected.append({
  436. 'group_id': assignments['group_id'],
  437. 'system': 'all',
  438. 'role_id': assignments['role_id']
  439. })
  440. role_id = assignments['role_id']
  441. with self.test_client() as c:
  442. r = c.get(
  443. '/v3/role_assignments?scope.system=all&role.id=%s' % role_id,
  444. headers=self.headers
  445. )
  446. self.assertEqual(
  447. len(self.expected), len(r.json['role_assignments'])
  448. )
  449. actual = self._extract_role_assignments_from_response_body(r)
  450. for assignment in actual:
  451. self.assertIn(assignment, self.expected)
  452. def test_user_can_filter_role_assignments_by_user_and_role(self):
  453. assignments = self._setup_test_role_assignments()
  454. expected = [
  455. {
  456. 'user_id': assignments['user_id'],
  457. 'project_id': assignments['project_id'],
  458. 'role_id': assignments['role_id']
  459. },
  460. {
  461. 'user_id': assignments['user_id'],
  462. 'domain_id': assignments['domain_id'],
  463. 'role_id': assignments['role_id']
  464. },
  465. {
  466. 'user_id': assignments['user_id'],
  467. 'system': 'all',
  468. 'role_id': assignments['role_id']
  469. }
  470. ]
  471. qs = (assignments['user_id'], assignments['role_id'])
  472. with self.test_client() as c:
  473. r = c.get(
  474. '/v3/role_assignments?user.id=%s&role.id=%s' % qs,
  475. headers=self.headers
  476. )
  477. self.assertEqual(len(expected), len(r.json['role_assignments']))
  478. actual = self._extract_role_assignments_from_response_body(r)
  479. for assignment in actual:
  480. self.assertIn(assignment, expected)
  481. def test_user_can_filter_role_assignments_by_group_and_role(self):
  482. assignments = self._setup_test_role_assignments()
  483. expected = [
  484. {
  485. 'group_id': assignments['group_id'],
  486. 'project_id': assignments['project_id'],
  487. 'role_id': assignments['role_id']
  488. },
  489. {
  490. 'group_id': assignments['group_id'],
  491. 'domain_id': assignments['domain_id'],
  492. 'role_id': assignments['role_id']
  493. },
  494. {
  495. 'group_id': assignments['group_id'],
  496. 'system': 'all',
  497. 'role_id': assignments['role_id']
  498. }
  499. ]
  500. with self.test_client() as c:
  501. qs = (assignments['group_id'], assignments['role_id'])
  502. r = c.get(
  503. '/v3/role_assignments?group.id=%s&role.id=%s' % qs,
  504. headers=self.headers
  505. )
  506. self.assertEqual(len(expected), len(r.json['role_assignments']))
  507. actual = self._extract_role_assignments_from_response_body(r)
  508. for assignment in actual:
  509. self.assertIn(assignment, expected)
  510. def test_user_can_filter_role_assignments_by_project_and_user(self):
  511. assignments = self._setup_test_role_assignments()
  512. expected = [
  513. {
  514. 'user_id': assignments['user_id'],
  515. 'project_id': assignments['project_id'],
  516. 'role_id': assignments['role_id']
  517. }
  518. ]
  519. qs = (assignments['project_id'], assignments['user_id'])
  520. with self.test_client() as c:
  521. r = c.get(
  522. '/v3/role_assignments?scope.project.id=%s&user.id=%s' % qs,
  523. headers=self.headers
  524. )
  525. self.assertEqual(len(expected), len(r.json['role_assignments']))
  526. actual = self._extract_role_assignments_from_response_body(r)
  527. for assignment in actual:
  528. self.assertIn(assignment, expected)
  529. def test_user_can_filter_role_assignments_by_project_and_group(self):
  530. assignments = self._setup_test_role_assignments()
  531. expected = [
  532. {
  533. 'group_id': assignments['group_id'],
  534. 'project_id': assignments['project_id'],
  535. 'role_id': assignments['role_id']
  536. }
  537. ]
  538. qs = (assignments['project_id'], assignments['group_id'])
  539. with self.test_client() as c:
  540. r = c.get(
  541. '/v3/role_assignments?scope.project.id=%s&group.id=%s' % qs,
  542. headers=self.headers
  543. )
  544. self.assertEqual(len(expected), len(r.json['role_assignments']))
  545. actual = self._extract_role_assignments_from_response_body(r)
  546. for assignment in actual:
  547. self.assertIn(assignment, expected)
  548. def test_user_can_filter_role_assignments_by_domain_and_user(self):
  549. assignments = self._setup_test_role_assignments()
  550. expected = [
  551. {
  552. 'user_id': assignments['user_id'],
  553. 'domain_id': assignments['domain_id'],
  554. 'role_id': assignments['role_id']
  555. }
  556. ]
  557. qs = (assignments['domain_id'], assignments['user_id'])
  558. with self.test_client() as c:
  559. r = c.get(
  560. '/v3/role_assignments?scope.domain.id=%s&user.id=%s' % qs,
  561. headers=self.headers
  562. )
  563. self.assertEqual(len(expected), len(r.json['role_assignments']))
  564. actual = self._extract_role_assignments_from_response_body(r)
  565. for assignment in actual:
  566. self.assertIn(assignment, expected)
  567. def test_user_can_filter_role_assignments_by_domain_and_group(self):
  568. assignments = self._setup_test_role_assignments()
  569. expected = [
  570. {
  571. 'group_id': assignments['group_id'],
  572. 'domain_id': assignments['domain_id'],
  573. 'role_id': assignments['role_id']
  574. }
  575. ]
  576. qs = (assignments['domain_id'], assignments['group_id'])
  577. with self.test_client() as c:
  578. r = c.get(
  579. '/v3/role_assignments?scope.domain.id=%s&group.id=%s' % qs,
  580. headers=self.headers
  581. )
  582. self.assertEqual(len(expected), len(r.json['role_assignments']))
  583. actual = self._extract_role_assignments_from_response_body(r)
  584. for assignment in actual:
  585. self.assertIn(assignment, expected)
  586. class _DomainUserTests(object):
  587. """Common functionality for domain users."""
  588. def _setup_test_role_assignments_for_domain(self):
  589. # Populate role assignment within `self.domain_id` so that we can
  590. # assert users can view assignments within the domain they have
  591. # authorization on
  592. role_id = self.bootstrapper.reader_role_id
  593. user = PROVIDERS.identity_api.create_user(
  594. unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
  595. )
  596. group = PROVIDERS.identity_api.create_group(
  597. unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
  598. )
  599. project = PROVIDERS.resource_api.create_project(
  600. uuid.uuid4().hex,
  601. unit.new_project_ref(domain_id=self.domain_id)
  602. )
  603. # create a user+project role assignment.
  604. PROVIDERS.assignment_api.create_grant(
  605. role_id, user_id=user['id'], project_id=project['id']
  606. )
  607. # create a user+domain role assignment.
  608. PROVIDERS.assignment_api.create_grant(
  609. role_id, user_id=user['id'], domain_id=self.domain_id
  610. )
  611. # create a group+project role assignment.
  612. PROVIDERS.assignment_api.create_grant(
  613. role_id, group_id=group['id'], project_id=project['id']
  614. )
  615. # create a group+domain role assignment.
  616. PROVIDERS.assignment_api.create_grant(
  617. role_id, group_id=group['id'], domain_id=self.domain_id
  618. )
  619. return {
  620. 'user_id': user['id'],
  621. 'group_id': group['id'],
  622. 'project_id': project['id'],
  623. 'role_id': role_id,
  624. }
  625. def test_user_can_list_all_assignments_in_their_domain(self):
  626. self._setup_test_role_assignments()
  627. domain_assignments = self._setup_test_role_assignments_for_domain()
  628. self.expected.append({
  629. 'user_id': domain_assignments['user_id'],
  630. 'domain_id': self.domain_id,
  631. 'role_id': domain_assignments['role_id']
  632. })
  633. self.expected.append({
  634. 'user_id': domain_assignments['user_id'],
  635. 'project_id': domain_assignments['project_id'],
  636. 'role_id': domain_assignments['role_id']
  637. })
  638. self.expected.append({
  639. 'group_id': domain_assignments['group_id'],
  640. 'domain_id': self.domain_id,
  641. 'role_id': domain_assignments['role_id']
  642. })
  643. self.expected.append({
  644. 'group_id': domain_assignments['group_id'],
  645. 'project_id': domain_assignments['project_id'],
  646. 'role_id': domain_assignments['role_id']
  647. })
  648. with self.test_client() as c:
  649. r = c.get('/v3/role_assignments', headers=self.headers)
  650. self.assertEqual(
  651. len(self.expected), len(r.json['role_assignments'])
  652. )
  653. actual = self._extract_role_assignments_from_response_body(r)
  654. for assignment in actual:
  655. self.assertIn(assignment, self.expected)
  656. def test_user_can_filter_role_assignments_by_project_in_domain(self):
  657. self._setup_test_role_assignments()
  658. domain_assignments = self._setup_test_role_assignments_for_domain()
  659. expected = [
  660. {
  661. 'user_id': domain_assignments['user_id'],
  662. 'project_id': domain_assignments['project_id'],
  663. 'role_id': domain_assignments['role_id']
  664. },
  665. {
  666. 'group_id': domain_assignments['group_id'],
  667. 'project_id': domain_assignments['project_id'],
  668. 'role_id': domain_assignments['role_id']
  669. }
  670. ]
  671. project_id = domain_assignments['project_id']
  672. with self.test_client() as c:
  673. r = c.get(
  674. '/v3/role_assignments?scope.project.id=%s' % project_id,
  675. headers=self.headers
  676. )
  677. self.assertEqual(len(expected), len(r.json['role_assignments']))
  678. actual = self._extract_role_assignments_from_response_body(r)
  679. for assignment in actual:
  680. self.assertIn(assignment, expected)
  681. def test_user_can_filter_role_assignments_by_domain(self):
  682. # This shouldn't really provide any more value than just calling GET
  683. # /v3/role_assignments with a domain-scoped token, but we test it
  684. # anyway.
  685. self._setup_test_role_assignments()
  686. domain_assignments = self._setup_test_role_assignments_for_domain()
  687. self.expected.append({
  688. 'user_id': domain_assignments['user_id'],
  689. 'domain_id': self.domain_id,
  690. 'role_id': domain_assignments['role_id']
  691. })
  692. self.expected.append({
  693. 'group_id': domain_assignments['group_id'],
  694. 'domain_id': self.domain_id,
  695. 'role_id': domain_assignments['role_id']
  696. })
  697. with self.test_client() as c:
  698. r = c.get(
  699. '/v3/role_assignments?scope.domain.id=%s' % self.domain_id,
  700. headers=self.headers
  701. )
  702. self.assertEqual(
  703. len(self.expected), len(r.json['role_assignments'])
  704. )
  705. actual = self._extract_role_assignments_from_response_body(r)
  706. for assignment in actual:
  707. self.assertIn(assignment, self.expected)
  708. def test_user_can_filter_role_assignments_by_user_of_domain(self):
  709. self._setup_test_role_assignments()
  710. domain_assignments = self._setup_test_role_assignments_for_domain()
  711. expected = [
  712. {
  713. 'user_id': domain_assignments['user_id'],
  714. 'domain_id': self.domain_id,
  715. 'role_id': domain_assignments['role_id']
  716. },
  717. {
  718. 'user_id': domain_assignments['user_id'],
  719. 'project_id': domain_assignments['project_id'],
  720. 'role_id': domain_assignments['role_id']
  721. }
  722. ]
  723. user_id = domain_assignments['user_id']
  724. with self.test_client() as c:
  725. r = c.get(
  726. '/v3/role_assignments?user.id=%s' % user_id,
  727. headers=self.headers
  728. )
  729. self.assertEqual(len(expected), len(r.json['role_assignments']))
  730. actual = self._extract_role_assignments_from_response_body(r)
  731. for assignment in actual:
  732. self.assertIn(assignment, expected)
  733. def test_user_can_filter_role_assignments_by_group_of_domain(self):
  734. self._setup_test_role_assignments()
  735. domain_assignments = self._setup_test_role_assignments_for_domain()
  736. expected = [
  737. {
  738. 'group_id': domain_assignments['group_id'],
  739. 'domain_id': self.domain_id,
  740. 'role_id': domain_assignments['role_id']
  741. },
  742. {
  743. 'group_id': domain_assignments['group_id'],
  744. 'project_id': domain_assignments['project_id'],
  745. 'role_id': domain_assignments['role_id']
  746. }
  747. ]
  748. group_id = domain_assignments['group_id']
  749. with self.test_client() as c:
  750. r = c.get(
  751. '/v3/role_assignments?group.id=%s' % group_id,
  752. headers=self.headers
  753. )
  754. self.assertEqual(len(expected), len(r.json['role_assignments']))
  755. actual = self._extract_role_assignments_from_response_body(r)
  756. for assignment in actual:
  757. self.assertIn(assignment, expected)
  758. def test_user_cannot_filter_role_assignments_by_system(self):
  759. self._setup_test_role_assignments()
  760. self._setup_test_role_assignments_for_domain()
  761. with self.test_client() as c:
  762. r = c.get(
  763. '/v3/role_assignments?scope.system=all',
  764. headers=self.headers
  765. )
  766. self.assertEqual(0, len(r.json['role_assignments']))
  767. def test_user_cannot_filter_role_assignments_by_other_domain(self):
  768. assignments = self._setup_test_role_assignments()
  769. domain = assignments['domain_id']
  770. with self.test_client() as c:
  771. r = c.get(
  772. '/v3/role_assignments?scope.domain.id=%s' % domain,
  773. headers=self.headers
  774. )
  775. self.assertEqual([], r.json['role_assignments'])
  776. def test_user_cannot_filter_role_assignments_by_other_domain_project(self):
  777. assignments = self._setup_test_role_assignments()
  778. self._setup_test_role_assignments_for_domain()
  779. # This project is in an entirely separate domain that this user doesn't
  780. # have authorization to access, so they should only see an empty list
  781. project_id = assignments['project_id']
  782. with self.test_client() as c:
  783. r = c.get(
  784. '/v3/role_assignments?scope.project.id=%s' % project_id,
  785. headers=self.headers
  786. )
  787. self.assertEqual(0, len(r.json['role_assignments']))
  788. def test_user_cannot_filter_role_assignments_by_other_domain_user(self):
  789. assignments = self._setup_test_role_assignments()
  790. self._setup_test_role_assignments_for_domain()
  791. # This user doesn't have any role assignments on self.domain_id, so the
  792. # domain user of self.domain_id should only see an empty list of role
  793. # assignments.
  794. user_id = assignments['user_id']
  795. with self.test_client() as c:
  796. r = c.get(
  797. '/v3/role_assignments?user.id=%s' % user_id,
  798. headers=self.headers
  799. )
  800. self.assertEqual(0, len(r.json['role_assignments']))
  801. def test_user_cannot_filter_role_assignments_by_other_domain_group(self):
  802. assignments = self._setup_test_role_assignments()
  803. self._setup_test_role_assignments_for_domain()
  804. # This group doesn't have any role assignments on self.domain_id, so
  805. # the domain user of self.domain_id should only see an empty list of
  806. # role assignments.
  807. group_id = assignments['group_id']
  808. with self.test_client() as c:
  809. r = c.get(
  810. '/v3/role_assignments?group.id=%s' % group_id,
  811. headers=self.headers,
  812. )
  813. self.assertEqual(0, len(r.json['role_assignments']))
  814. class SystemReaderTests(base_classes.TestCaseWithBootstrap,
  815. common_auth.AuthTestMixin,
  816. _AssignmentTestUtilities,
  817. _SystemUserTests):
  818. def setUp(self):
  819. super(SystemReaderTests, self).setUp()
  820. self.loadapp()
  821. self.useFixture(ksfixtures.Policy(self.config_fixture))
  822. self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  823. system_reader = unit.new_user_ref(
  824. domain_id=CONF.identity.default_domain_id
  825. )
  826. self.user_id = PROVIDERS.identity_api.create_user(
  827. system_reader
  828. )['id']
  829. PROVIDERS.assignment_api.create_system_grant_for_user(
  830. self.user_id, self.bootstrapper.reader_role_id
  831. )
  832. self.expected = [
  833. # assignment of the user running the test case
  834. {
  835. 'user_id': self.user_id,
  836. 'system': 'all',
  837. 'role_id': self.bootstrapper.reader_role_id
  838. }
  839. ]
  840. auth = self.build_authentication_request(
  841. user_id=self.user_id, password=system_reader['password'],
  842. system=True
  843. )
  844. # Grab a token using the persona we're testing and prepare headers
  845. # for requests we'll be making in the tests.
  846. with self.test_client() as c:
  847. r = c.post('/v3/auth/tokens', json=auth)
  848. self.token_id = r.headers['X-Subject-Token']
  849. self.headers = {'X-Auth-Token': self.token_id}
  850. class SystemMemberTests(base_classes.TestCaseWithBootstrap,
  851. common_auth.AuthTestMixin,
  852. _AssignmentTestUtilities,
  853. _SystemUserTests):
  854. def setUp(self):
  855. super(SystemMemberTests, self).setUp()
  856. self.loadapp()
  857. self.useFixture(ksfixtures.Policy(self.config_fixture))
  858. self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  859. system_member = unit.new_user_ref(
  860. domain_id=CONF.identity.default_domain_id
  861. )
  862. self.user_id = PROVIDERS.identity_api.create_user(
  863. system_member
  864. )['id']
  865. PROVIDERS.assignment_api.create_system_grant_for_user(
  866. self.user_id, self.bootstrapper.member_role_id
  867. )
  868. self.expected = [
  869. # assignment of the user running the test case
  870. {
  871. 'user_id': self.user_id,
  872. 'system': 'all',
  873. 'role_id': self.bootstrapper.member_role_id
  874. }
  875. ]
  876. auth = self.build_authentication_request(
  877. user_id=self.user_id, password=system_member['password'],
  878. system=True
  879. )
  880. # Grab a token using the persona we're testing and prepare headers
  881. # for requests we'll be making in the tests.
  882. with self.test_client() as c:
  883. r = c.post('/v3/auth/tokens', json=auth)
  884. self.token_id = r.headers['X-Subject-Token']
  885. self.headers = {'X-Auth-Token': self.token_id}
  886. class SystemAdminTests(base_classes.TestCaseWithBootstrap,
  887. common_auth.AuthTestMixin,
  888. _AssignmentTestUtilities,
  889. _SystemUserTests):
  890. def setUp(self):
  891. super(SystemAdminTests, self).setUp()
  892. self.loadapp()
  893. self.useFixture(ksfixtures.Policy(self.config_fixture))
  894. self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  895. self.user_id = self.bootstrapper.admin_user_id
  896. self.expected = []
  897. auth = self.build_authentication_request(
  898. user_id=self.user_id, password=self.bootstrapper.admin_password,
  899. system=True
  900. )
  901. # Grab a token using the persona we're testing and prepare headers
  902. # for requests we'll be making in the tests.
  903. with self.test_client() as c:
  904. r = c.post('/v3/auth/tokens', json=auth)
  905. self.token_id = r.headers['X-Subject-Token']
  906. self.headers = {'X-Auth-Token': self.token_id}
  907. class DomainReaderTests(base_classes.TestCaseWithBootstrap,
  908. common_auth.AuthTestMixin,
  909. _AssignmentTestUtilities,
  910. _DomainUserTests):
  911. def setUp(self):
  912. super(DomainReaderTests, self).setUp()
  913. self.loadapp()
  914. self.useFixture(ksfixtures.Policy(self.config_fixture))
  915. self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  916. domain = PROVIDERS.resource_api.create_domain(
  917. uuid.uuid4().hex, unit.new_domain_ref()
  918. )
  919. self.domain_id = domain['id']
  920. domain_reader = unit.new_user_ref(domain_id=self.domain_id)
  921. self.user_id = PROVIDERS.identity_api.create_user(domain_reader)['id']
  922. PROVIDERS.assignment_api.create_grant(
  923. self.bootstrapper.reader_role_id, user_id=self.user_id,
  924. domain_id=self.domain_id
  925. )
  926. self.expected = [
  927. # assignment of the user running the test case
  928. {
  929. 'user_id': self.user_id,
  930. 'domain_id': self.domain_id,
  931. 'role_id': self.bootstrapper.reader_role_id
  932. }]
  933. auth = self.build_authentication_request(
  934. user_id=self.user_id, password=domain_reader['password'],
  935. domain_id=self.domain_id,
  936. )
  937. # Grab a token using the persona we're testing and prepare headers
  938. # for requests we'll be making in the tests.
  939. with self.test_client() as c:
  940. r = c.post('/v3/auth/tokens', json=auth)
  941. self.token_id = r.headers['X-Subject-Token']
  942. self.headers = {'X-Auth-Token': self.token_id}
  943. class DomainMemberTests(base_classes.TestCaseWithBootstrap,
  944. common_auth.AuthTestMixin,
  945. _AssignmentTestUtilities,
  946. _DomainUserTests):
  947. def setUp(self):
  948. super(DomainMemberTests, self).setUp()
  949. self.loadapp()
  950. self.useFixture(ksfixtures.Policy(self.config_fixture))
  951. self.config_fixture.config(group='oslo_policy', enforce_scope=True)
  952. domain = PROVIDERS.resource_api.create_domain(
  953. uuid.uuid4().hex, unit.new_domain_ref()
  954. )
  955. self.domain_id = domain['id']
  956. domain_user = unit.new_user_ref(domain_id=self.domain_id)
  957. self.user_id = PROVIDERS.identity_api.create_user(domain_user)['id']
  958. PROVIDERS.assignment_api.create_grant(
  959. self.bootstrapper.member_role_id, user_id=self.user_id,
  960. domain_id=self.domain_id
  961. )
  962. self.expected = [
  963. # assignment of the user running the test case
  964. {
  965. 'user_id': self.user_id,
  966. 'domain_id': self.domain_id,
  967. 'role_id': self.bootstrapper.member_role_id
  968. }]
  969. auth = self.build_authentication_request(
  970. user_id=self.user_id, password=domain_user['password'],
  971. domain_id=self.domain_id
  972. )
  973. # Grab a token using the persona we're testing and prepare headers
  974. # for requests we'll be making in the tests.
  975. with self.test_client() as c:
  976. r = c.post('/v3/auth/tokens', json=auth)
  977. self.token_id = r.headers['X-Subject-Token']
  978. self.headers = {'X-Auth-Token': self.token_id}