OpenStack Telemetry (Ceilometer)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

test_rpc_publisher.py 12KB


  1. # -*- encoding: utf-8 -*-
  2. #
  3. # Copyright © 2012 New Dream Network, LLC (DreamHost)
  4. #
  5. # Author: Doug Hellmann <doug.hellmann@dreamhost.com>
  6. # Julien Danjou <julien@danjou.info>
  7. #
  8. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  9. # not use this file except in compliance with the License. You may obtain
  10. # a copy of the License at
  11. #
  12. # http://www.apache.org/licenses/LICENSE-2.0
  13. #
  14. # Unless required by applicable law or agreed to in writing, software
  15. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  16. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  17. # License for the specific language governing permissions and limitations
  18. # under the License.
  19. """Tests for ceilometer/publisher/rpc.py
  20. """
  21. import datetime
  22. import eventlet
  23. import fixtures
  24. import mock
  25. from ceilometer.openstack.common.fixture import config
  26. from ceilometer.openstack.common import network_utils
  27. from ceilometer.openstack.common import test
  28. from ceilometer.publisher import rpc
  29. from ceilometer import sample
  30. class TestPublish(test.BaseTestCase):
  31. test_data = [
  32. sample.Sample(
  33. name='test',
  34. type=sample.TYPE_CUMULATIVE,
  35. unit='',
  36. volume=1,
  37. user_id='test',
  38. project_id='test',
  39. resource_id='test_run_tasks',
  40. timestamp=datetime.datetime.utcnow().isoformat(),
  41. resource_metadata={'name': 'TestPublish'},
  42. ),
  43. sample.Sample(
  44. name='test',
  45. type=sample.TYPE_CUMULATIVE,
  46. unit='',
  47. volume=1,
  48. user_id='test',
  49. project_id='test',
  50. resource_id='test_run_tasks',
  51. timestamp=datetime.datetime.utcnow().isoformat(),
  52. resource_metadata={'name': 'TestPublish'},
  53. ),
  54. sample.Sample(
  55. name='test2',
  56. type=sample.TYPE_CUMULATIVE,
  57. unit='',
  58. volume=1,
  59. user_id='test',
  60. project_id='test',
  61. resource_id='test_run_tasks',
  62. timestamp=datetime.datetime.utcnow().isoformat(),
  63. resource_metadata={'name': 'TestPublish'},
  64. ),
  65. sample.Sample(
  66. name='test2',
  67. type=sample.TYPE_CUMULATIVE,
  68. unit='',
  69. volume=1,
  70. user_id='test',
  71. project_id='test',
  72. resource_id='test_run_tasks',
  73. timestamp=datetime.datetime.utcnow().isoformat(),
  74. resource_metadata={'name': 'TestPublish'},
  75. ),
  76. sample.Sample(
  77. name='test3',
  78. type=sample.TYPE_CUMULATIVE,
  79. unit='',
  80. volume=1,
  81. user_id='test',
  82. project_id='test',
  83. resource_id='test_run_tasks',
  84. timestamp=datetime.datetime.utcnow().isoformat(),
  85. resource_metadata={'name': 'TestPublish'},
  86. ),
  87. ]
  88. def faux_cast(self, context, topic, msg):
  89. if self.rpc_unreachable:
  90. #note(sileht): Ugly, but when rabbitmq is unreachable
  91. # and rabbitmq_max_retries is not 0
  92. # oslo.rpc do a sys.exit(1), so we do the same
  93. # things here until this is fixed in oslo
  94. raise SystemExit(1)
  95. else:
  96. self.published.append((topic, msg))
  97. def setUp(self):
  98. super(TestPublish, self).setUp()
  99. self.CONF = self.useFixture(config.Config()).conf
  100. self.published = []
  101. self.rpc_unreachable = False
  102. self.useFixture(fixtures.MonkeyPatch(
  103. "ceilometer.openstack.common.rpc.cast",
  104. self.faux_cast))
  105. def test_published(self):
  106. publisher = rpc.RPCPublisher(
  107. network_utils.urlsplit('rpc://'))
  108. publisher.publish_samples(None,
  109. self.test_data)
  110. self.assertEqual(len(self.published), 1)
  111. self.assertEqual(self.published[0][0],
  112. self.CONF.publisher_rpc.metering_topic)
  113. self.assertIsInstance(self.published[0][1]['args']['data'], list)
  114. self.assertEqual(self.published[0][1]['method'],
  115. 'record_metering_data')
  116. def test_publish_target(self):
  117. publisher = rpc.RPCPublisher(
  118. network_utils.urlsplit('rpc://?target=custom_procedure_call'))
  119. publisher.publish_samples(None,
  120. self.test_data)
  121. self.assertEqual(len(self.published), 1)
  122. self.assertEqual(self.published[0][0],
  123. self.CONF.publisher_rpc.metering_topic)
  124. self.assertIsInstance(self.published[0][1]['args']['data'], list)
  125. self.assertEqual(self.published[0][1]['method'],
  126. 'custom_procedure_call')
  127. def test_published_with_per_meter_topic(self):
  128. publisher = rpc.RPCPublisher(
  129. network_utils.urlsplit('rpc://?per_meter_topic=1'))
  130. publisher.publish_samples(None,
  131. self.test_data)
  132. self.assertEqual(len(self.published), 4)
  133. for topic, rpc_call in self.published:
  134. meters = rpc_call['args']['data']
  135. self.assertIsInstance(meters, list)
  136. if topic != self.CONF.publisher_rpc.metering_topic:
  137. self.assertEqual(len(set(meter['counter_name']
  138. for meter in meters)),
  139. 1,
  140. "Meter are published grouped by name")
  141. topics = [topic for topic, meter in self.published]
  142. self.assertIn(self.CONF.publisher_rpc.metering_topic, topics)
  143. self.assertIn(
  144. self.CONF.publisher_rpc.metering_topic + '.' + 'test', topics)
  145. self.assertIn(
  146. self.CONF.publisher_rpc.metering_topic + '.' + 'test2', topics)
  147. self.assertIn(
  148. self.CONF.publisher_rpc.metering_topic + '.' + 'test3', topics)
  149. def test_published_concurrency(self):
  150. """This test the concurrent access to the local queue
  151. of the rpc publisher
  152. """
  153. def faux_cast_go(context, topic, msg):
  154. self.published.append((topic, msg))
  155. def faux_cast_wait(context, topic, msg):
  156. self.useFixture(fixtures.MonkeyPatch(
  157. "ceilometer.openstack.common.rpc.cast",
  158. faux_cast_go))
  159. # Sleep to simulate concurrency and allow other threads to work
  160. eventlet.sleep(0)
  161. self.published.append((topic, msg))
  162. self.useFixture(fixtures.MonkeyPatch(
  163. "ceilometer.openstack.common.rpc.cast",
  164. faux_cast_wait))
  165. publisher = rpc.RPCPublisher(network_utils.urlsplit('rpc://'))
  166. job1 = eventlet.spawn(publisher.publish_samples, None, self.test_data)
  167. job2 = eventlet.spawn(publisher.publish_samples, None, self.test_data)
  168. job1.wait()
  169. job2.wait()
  170. self.assertEqual(publisher.policy, 'default')
  171. self.assertEqual(len(self.published), 2)
  172. self.assertEqual(len(publisher.local_queue), 0)
  173. @mock.patch('ceilometer.publisher.rpc.LOG')
  174. def test_published_with_no_policy(self, mylog):
  175. self.rpc_unreachable = True
  176. publisher = rpc.RPCPublisher(
  177. network_utils.urlsplit('rpc://'))
  178. self.assertTrue(mylog.info.called)
  179. self.assertRaises(
  180. SystemExit,
  181. publisher.publish_samples,
  182. None, self.test_data)
  183. self.assertEqual(publisher.policy, 'default')
  184. self.assertEqual(len(self.published), 0)
  185. self.assertEqual(len(publisher.local_queue), 0)
  186. @mock.patch('ceilometer.publisher.rpc.LOG')
  187. def test_published_with_policy_block(self, mylog):
  188. self.rpc_unreachable = True
  189. publisher = rpc.RPCPublisher(
  190. network_utils.urlsplit('rpc://?policy=default'))
  191. self.assertTrue(mylog.info.called)
  192. self.assertRaises(
  193. SystemExit,
  194. publisher.publish_samples,
  195. None, self.test_data)
  196. self.assertEqual(len(self.published), 0)
  197. self.assertEqual(len(publisher.local_queue), 0)
  198. @mock.patch('ceilometer.publisher.rpc.LOG')
  199. def test_published_with_policy_incorrect(self, mylog):
  200. self.rpc_unreachable = True
  201. publisher = rpc.RPCPublisher(
  202. network_utils.urlsplit('rpc://?policy=notexist'))
  203. self.assertRaises(
  204. SystemExit,
  205. publisher.publish_samples,
  206. None, self.test_data)
  207. self.assertTrue(mylog.warn.called)
  208. self.assertEqual(publisher.policy, 'default')
  209. self.assertEqual(len(self.published), 0)
  210. self.assertEqual(len(publisher.local_queue), 0)
  211. def test_published_with_policy_drop_and_rpc_down(self):
  212. self.rpc_unreachable = True
  213. publisher = rpc.RPCPublisher(
  214. network_utils.urlsplit('rpc://?policy=drop'))
  215. publisher.publish_samples(None,
  216. self.test_data)
  217. self.assertEqual(len(self.published), 0)
  218. self.assertEqual(len(publisher.local_queue), 0)
  219. def test_published_with_policy_queue_and_rpc_down(self):
  220. self.rpc_unreachable = True
  221. publisher = rpc.RPCPublisher(
  222. network_utils.urlsplit('rpc://?policy=queue'))
  223. publisher.publish_samples(None,
  224. self.test_data)
  225. self.assertEqual(len(self.published), 0)
  226. self.assertEqual(len(publisher.local_queue), 1)
  227. def test_published_with_policy_queue_and_rpc_down_up(self):
  228. self.rpc_unreachable = True
  229. publisher = rpc.RPCPublisher(
  230. network_utils.urlsplit('rpc://?policy=queue'))
  231. publisher.publish_samples(None,
  232. self.test_data)
  233. self.assertEqual(len(self.published), 0)
  234. self.assertEqual(len(publisher.local_queue), 1)
  235. self.rpc_unreachable = False
  236. publisher.publish_samples(None,
  237. self.test_data)
  238. self.assertEqual(len(self.published), 2)
  239. self.assertEqual(len(publisher.local_queue), 0)
  240. def test_published_with_policy_sized_queue_and_rpc_down(self):
  241. self.rpc_unreachable = True
  242. publisher = rpc.RPCPublisher(
  243. network_utils.urlsplit('rpc://?policy=queue&max_queue_length=3'))
  244. for i in range(5):
  245. for s in self.test_data:
  246. s.source = 'test-%d' % i
  247. publisher.publish_samples(None,
  248. self.test_data)
  249. self.assertEqual(len(self.published), 0)
  250. self.assertEqual(len(publisher.local_queue), 3)
  251. self.assertEqual(
  252. publisher.local_queue[0][2]['args']['data'][0]['source'],
  253. 'test-2'
  254. )
  255. self.assertEqual(
  256. publisher.local_queue[1][2]['args']['data'][0]['source'],
  257. 'test-3'
  258. )
  259. self.assertEqual(
  260. publisher.local_queue[2][2]['args']['data'][0]['source'],
  261. 'test-4'
  262. )
  263. def test_published_with_policy_default_sized_queue_and_rpc_down(self):
  264. self.rpc_unreachable = True
  265. publisher = rpc.RPCPublisher(
  266. network_utils.urlsplit('rpc://?policy=queue'))
  267. for i in range(2000):
  268. for s in self.test_data:
  269. s.source = 'test-%d' % i
  270. publisher.publish_samples(None,
  271. self.test_data)
  272. self.assertEqual(len(self.published), 0)
  273. self.assertEqual(len(publisher.local_queue), 1024)
  274. self.assertEqual(
  275. publisher.local_queue[0][2]['args']['data'][0]['source'],
  276. 'test-976'
  277. )
  278. self.assertEqual(
  279. publisher.local_queue[1023][2]['args']['data'][0]['source'],
  280. 'test-1999'
  281. )