diff --git a/setup-mysql-env.sh b/setup-mysql-env.sh new file mode 100755 index 00000000..2a8b9bf6 --- /dev/null +++ b/setup-mysql-env.sh @@ -0,0 +1,37 @@ +#!/bin/bash +set -x -e + +. functions.sh + +clean_exit () { + local error_code="$?" + kill $(jobs -p) + rm -rf ${MYSQL_DATA} + return $error_code +} + +wait_for_line () { + while read line + do + echo "$line" | grep -q "$1" && break + done < "$2" + # Read the fifo for ever otherwise process would block + cat "$2" >/dev/null & +} + +trap "clean_exit" EXIT + +# On systems like Fedora here's where mysqld can be found +export PATH=$PATH:/usr/libexec + +# Start MySQL process for tests +MYSQL_DATA=`mktemp -d /tmp/tooz-mysql-XXXXX` +mkfifo ${MYSQL_DATA}/out +mysqld --datadir=${MYSQL_DATA} --pid-file=${MYSQL_DATA}/mysql.pid --socket=${MYSQL_DATA}/mysql.socket --skip-networking --skip-grant-tables &> ${MYSQL_DATA}/out & +# Wait for MySQL to start listening to connections +wait_for_line "mysqld: ready for connections." ${MYSQL_DATA}/out +mysql -S ${MYSQL_DATA}/mysql.socket -e 'CREATE DATABASE test;' +export TOOZ_TEST_MYSQL_URL="mysql://root@localhost/test?unix_socket=${MYSQL_DATA}/mysql.socket" + +# Yield execution to venv command +$* diff --git a/setup.cfg b/setup.cfg index a1661087..5221a3b9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -31,6 +31,7 @@ tooz.backends = ipc = tooz.drivers.ipc:IPCDriver redis = tooz.drivers.redis:RedisDriver postgresql = tooz.drivers.pgsql:PostgresDriver + mysql = tooz.drivers.mysql:MySQLDriver [build_sphinx] all_files = 1 diff --git a/test-requirements.txt b/test-requirements.txt index 97c53f68..a7b8d480 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -10,3 +10,4 @@ testscenarios>=0.4 coverage>=3.6 sysv_ipc>=0.6.8 psycopg2 +pymysql diff --git a/tooz/drivers/mysql.py b/tooz/drivers/mysql.py new file mode 100644 index 00000000..c517a0aa --- /dev/null +++ b/tooz/drivers/mysql.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +# +# Copyright © 2014 eNovance +# +# Author: Julien Danjou +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import pymysql + +import tooz +from tooz import coordination +from tooz.drivers import _retry +from tooz import locking +from tooz import utils + + +class MySQLLock(locking.Lock): + """A MySQL based lock.""" + + def __init__(self, name, connection): + super(MySQLLock, self).__init__(name) + self._conn = connection + + def acquire(self, blocking=True): + if blocking is False: + try: + cur = self._conn.cursor() + cur.execute("SELECT GET_LOCK(%s, 0);", self.name) + # Can return NULL on error + if cur.fetchone()[0] is 1: + return True + return False + except pymysql.MySQLError as e: + raise coordination.ToozError(utils.exception_message(e)) + else: + def _acquire(): + try: + cur = self._conn.cursor() + cur.execute("SELECT GET_LOCK(%s, 0);", self.name) + if cur.fetchone()[0] is 1: + return True + except pymysql.MySQLError as e: + raise coordination.ToozError(utils.exception_message(e)) + raise _retry.Retry + kwargs = _retry.RETRYING_KWARGS.copy() + if blocking is not True: + kwargs['stop_max_delay'] = blocking + return _retry.Retrying(**kwargs).call(_acquire) + + def release(self): + try: + cur = self._conn.cursor() + cur.execute("SELECT RELEASE_LOCK(%s);", self.name) + return cur.fetchone()[0] + except pymysql.MySQLError as e: + raise coordination.ToozError(utils.exception_message(e)) + + +class MySQLDriver(coordination.CoordinationDriver): + + def __init__(self, member_id, parsed_url, options): + """Initialize the MySQL driver.""" + super(MySQLDriver, self).__init__() + self._host = parsed_url.netloc + self._port = parsed_url.port + self._dbname = parsed_url.path[1:] + self._username = parsed_url.username + self._password = parsed_url.password + self._unix_socket = options.get("unix_socket", [None])[-1] + + def _start(self): + try: + if self._unix_socket: + self._conn = pymysql.Connect(unix_socket=self._unix_socket, + port=self._port, + user=self._username, + passwd=self._password, + database=self._dbname) + else: + self._conn = pymysql.Connect(host=self._host, + port=self._port, + user=self._username, + passwd=self._password, + database=self._dbname) + except pymysql.err.OperationalError as e: + raise coordination.ToozConnectionError(utils.exception_message(e)) + + def _stop(self): + self._conn.close() + + def get_lock(self, name): + return MySQLLock(name, self._conn) + + @staticmethod + def watch_join_group(group_id, callback): + raise tooz.NotImplemented + + @staticmethod + def unwatch_join_group(group_id, callback): + raise tooz.NotImplemented + + @staticmethod + def watch_leave_group(group_id, callback): + raise tooz.NotImplemented + + @staticmethod + def unwatch_leave_group(group_id, callback): + raise tooz.NotImplemented + + @staticmethod + def watch_elected_as_leader(group_id, callback): + raise tooz.NotImplemented + + @staticmethod + def unwatch_elected_as_leader(group_id, callback): + raise tooz.NotImplemented diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py index ace3093e..2f464e5a 100644 --- a/tooz/tests/test_coordination.py +++ b/tooz/tests/test_coordination.py @@ -35,6 +35,8 @@ class TestAPI(testscenarios.TestWithScenarios, ('ipc', {'url': 'ipc://'}), ('redis', {'url': 'redis://localhost:6379?timeout=5'}), ('postgresql', {'url': os.getenv("TOOZ_TEST_PGSQL_URL")}), + ('mysql', {'url': os.getenv("TOOZ_TEST_MYSQL_URL"), + 'bad_url': 'mysql://localhost:1'}), ] # Only certain drivers have the tested support for timeouts that we test @@ -78,6 +80,14 @@ class TestAPI(testscenarios.TestWithScenarios, self._coord.stop() super(TestAPI, self).tearDown() + def test_connection_error(self): + if not hasattr(self, "bad_url"): + raise testcase.TestSkipped("No bad URL provided") + coord = tooz.coordination.get_coordinator(self.bad_url, + self.member_id) + self.assertRaises(tooz.coordination.ToozConnectionError, + coord.start) + def test_stop_first(self): c = tooz.coordination.get_coordinator(self.url, self.member_id) diff --git a/tox.ini b/tox.ini index 757f3da6..1c503148 100644 --- a/tox.ini +++ b/tox.ini @@ -82,6 +82,9 @@ deps = {[testenv:py34]deps} basepython = python3.4 commands = {toxinidir}/setup-postgresql-env.sh python setup.py testr --slowest --testr-args="{posargs}" +[testenv:py27-mysql] +commands = {toxinidir}/setup-mysql-env.sh python setup.py testr --slowest --testr-args="{posargs}" + [testenv:cover] commands = python setup.py testr --slowest --coverage --testr-args="{posargs}"