Coordinate distributed systems.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

mysql.py 6.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. # -*- coding: utf-8 -*-
  2. #
  3. # Copyright © 2014 eNovance
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  6. # not use this file except in compliance with the License. You may obtain
  7. # a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  13. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  14. # License for the specific language governing permissions and limitations
  15. # under the License.
  16. import logging
  17. from oslo_utils import encodeutils
  18. import pymysql
  19. import tooz
  20. from tooz import _retry
  21. from tooz import coordination
  22. from tooz import locking
  23. from tooz import utils
  24. LOG = logging.getLogger(__name__)
  25. class MySQLLock(locking.Lock):
  26. """A MySQL based lock."""
  27. MYSQL_DEFAULT_PORT = 3306
  28. def __init__(self, name, parsed_url, options):
  29. super(MySQLLock, self).__init__(name)
  30. self.acquired = False
  31. self._conn = MySQLDriver.get_connection(parsed_url, options, True)
  32. def acquire(self, blocking=True, shared=False):
  33. if shared:
  34. raise tooz.NotImplemented
  35. @_retry.retry(stop_max_delay=blocking)
  36. def _lock():
  37. # NOTE(sileht): mysql-server (<5.7.5) allows only one lock per
  38. # connection at a time:
  39. # select GET_LOCK("a", 0);
  40. # select GET_LOCK("b", 0); <-- this release lock "a" ...
  41. # Or
  42. # select GET_LOCK("a", 0);
  43. # select GET_LOCK("a", 0); release and lock again "a"
  44. #
  45. # So, we track locally the lock status with self.acquired
  46. if self.acquired is True:
  47. if blocking:
  48. raise _retry.TryAgain
  49. return False
  50. try:
  51. if not self._conn.open:
  52. self._conn.connect()
  53. with self._conn as cur:
  54. cur.execute("SELECT GET_LOCK(%s, 0);", self.name)
  55. # Can return NULL on error
  56. if cur.fetchone()[0] is 1:
  57. self.acquired = True
  58. return True
  59. except pymysql.MySQLError as e:
  60. utils.raise_with_cause(
  61. tooz.ToozError,
  62. encodeutils.exception_to_unicode(e),
  63. cause=e)
  64. if blocking:
  65. raise _retry.TryAgain
  66. self._conn.close()
  67. return False
  68. try:
  69. return _lock()
  70. except Exception:
  71. # Close the connection if we tried too much and finally failed, or
  72. # anything else bad happened.
  73. self._conn.close()
  74. raise
  75. def release(self):
  76. if not self.acquired:
  77. return False
  78. try:
  79. with self._conn as cur:
  80. cur.execute("SELECT RELEASE_LOCK(%s);", self.name)
  81. cur.fetchone()
  82. self.acquired = False
  83. self._conn.close()
  84. return True
  85. except pymysql.MySQLError as e:
  86. utils.raise_with_cause(tooz.ToozError,
  87. encodeutils.exception_to_unicode(e),
  88. cause=e)
  89. def __del__(self):
  90. if self.acquired:
  91. LOG.warning("unreleased lock %s garbage collected", self.name)
  92. class MySQLDriver(coordination.CoordinationDriver):
  93. """A `MySQL`_ based driver.
  94. This driver users `MySQL`_ database tables to
  95. provide the coordination driver semantics and required API(s). It **is**
  96. missing some functionality but in the future these not implemented API(s)
  97. will be filled in.
  98. .. _MySQL: http://dev.mysql.com/
  99. """
  100. CHARACTERISTICS = (
  101. coordination.Characteristics.NON_TIMEOUT_BASED,
  102. coordination.Characteristics.DISTRIBUTED_ACROSS_THREADS,
  103. coordination.Characteristics.DISTRIBUTED_ACROSS_PROCESSES,
  104. coordination.Characteristics.DISTRIBUTED_ACROSS_HOSTS,
  105. )
  106. """
  107. Tuple of :py:class:`~tooz.coordination.Characteristics` introspectable
  108. enum member(s) that can be used to interogate how this driver works.
  109. """
  110. def __init__(self, member_id, parsed_url, options):
  111. """Initialize the MySQL driver."""
  112. super(MySQLDriver, self).__init__(member_id)
  113. self._parsed_url = parsed_url
  114. self._options = utils.collapse(options)
  115. def _start(self):
  116. self._conn = MySQLDriver.get_connection(self._parsed_url,
  117. self._options)
  118. def _stop(self):
  119. self._conn.close()
  120. def get_lock(self, name):
  121. return MySQLLock(name, self._parsed_url, self._options)
  122. @staticmethod
  123. def watch_join_group(group_id, callback):
  124. raise tooz.NotImplemented
  125. @staticmethod
  126. def unwatch_join_group(group_id, callback):
  127. raise tooz.NotImplemented
  128. @staticmethod
  129. def watch_leave_group(group_id, callback):
  130. raise tooz.NotImplemented
  131. @staticmethod
  132. def unwatch_leave_group(group_id, callback):
  133. raise tooz.NotImplemented
  134. @staticmethod
  135. def watch_elected_as_leader(group_id, callback):
  136. raise tooz.NotImplemented
  137. @staticmethod
  138. def unwatch_elected_as_leader(group_id, callback):
  139. raise tooz.NotImplemented
  140. @staticmethod
  141. def get_connection(parsed_url, options, defer_connect=False):
  142. host = parsed_url.hostname
  143. port = parsed_url.port or MySQLLock.MYSQL_DEFAULT_PORT
  144. dbname = parsed_url.path[1:]
  145. username = parsed_url.username
  146. password = parsed_url.password
  147. unix_socket = options.get("unix_socket")
  148. try:
  149. if unix_socket:
  150. return pymysql.Connect(unix_socket=unix_socket,
  151. port=port,
  152. user=username,
  153. passwd=password,
  154. database=dbname,
  155. defer_connect=defer_connect)
  156. else:
  157. return pymysql.Connect(host=host,
  158. port=port,
  159. user=username,
  160. passwd=password,
  161. database=dbname,
  162. defer_connect=defer_connect)
  163. except (pymysql.err.OperationalError, pymysql.err.InternalError) as e:
  164. utils.raise_with_cause(coordination.ToozConnectionError,
  165. encodeutils.exception_to_unicode(e),
  166. cause=e)