Add MySQL driver

Change-Id: Ia10be91f8985d4e2159437ad94096a690ee84dc3
This commit is contained in:
Julien Danjou 2014-11-14 12:39:19 +01:00
parent 33f5d3f497
commit 8deae0f9f7
6 changed files with 178 additions and 0 deletions

37
setup-mysql-env.sh Executable file
View File

@ -0,0 +1,37 @@
#!/bin/bash
set -x -e
. functions.sh
clean_exit () {
local error_code="$?"
kill $(jobs -p)
rm -rf ${MYSQL_DATA}
return $error_code
}
wait_for_line () {
while read line
do
echo "$line" | grep -q "$1" && break
done < "$2"
# Read the fifo for ever otherwise process would block
cat "$2" >/dev/null &
}
trap "clean_exit" EXIT
# On systems like Fedora here's where mysqld can be found
export PATH=$PATH:/usr/libexec
# Start MySQL process for tests
MYSQL_DATA=`mktemp -d /tmp/tooz-mysql-XXXXX`
mkfifo ${MYSQL_DATA}/out
mysqld --datadir=${MYSQL_DATA} --pid-file=${MYSQL_DATA}/mysql.pid --socket=${MYSQL_DATA}/mysql.socket --skip-networking --skip-grant-tables &> ${MYSQL_DATA}/out &
# Wait for MySQL to start listening to connections
wait_for_line "mysqld: ready for connections." ${MYSQL_DATA}/out
mysql -S ${MYSQL_DATA}/mysql.socket -e 'CREATE DATABASE test;'
export TOOZ_TEST_MYSQL_URL="mysql://root@localhost/test?unix_socket=${MYSQL_DATA}/mysql.socket"
# Yield execution to venv command
$*

View File

@ -31,6 +31,7 @@ tooz.backends =
ipc = tooz.drivers.ipc:IPCDriver
redis = tooz.drivers.redis:RedisDriver
postgresql = tooz.drivers.pgsql:PostgresDriver
mysql = tooz.drivers.mysql:MySQLDriver
[build_sphinx]
all_files = 1

View File

@ -10,3 +10,4 @@ testscenarios>=0.4
coverage>=3.6
sysv_ipc>=0.6.8
psycopg2
pymysql

126
tooz/drivers/mysql.py Normal file
View File

@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-
#
# Copyright © 2014 eNovance
#
# Author: Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pymysql
import tooz
from tooz import coordination
from tooz.drivers import _retry
from tooz import locking
from tooz import utils
class MySQLLock(locking.Lock):
"""A MySQL based lock."""
def __init__(self, name, connection):
super(MySQLLock, self).__init__(name)
self._conn = connection
def acquire(self, blocking=True):
if blocking is False:
try:
cur = self._conn.cursor()
cur.execute("SELECT GET_LOCK(%s, 0);", self.name)
# Can return NULL on error
if cur.fetchone()[0] is 1:
return True
return False
except pymysql.MySQLError as e:
raise coordination.ToozError(utils.exception_message(e))
else:
def _acquire():
try:
cur = self._conn.cursor()
cur.execute("SELECT GET_LOCK(%s, 0);", self.name)
if cur.fetchone()[0] is 1:
return True
except pymysql.MySQLError as e:
raise coordination.ToozError(utils.exception_message(e))
raise _retry.Retry
kwargs = _retry.RETRYING_KWARGS.copy()
if blocking is not True:
kwargs['stop_max_delay'] = blocking
return _retry.Retrying(**kwargs).call(_acquire)
def release(self):
try:
cur = self._conn.cursor()
cur.execute("SELECT RELEASE_LOCK(%s);", self.name)
return cur.fetchone()[0]
except pymysql.MySQLError as e:
raise coordination.ToozError(utils.exception_message(e))
class MySQLDriver(coordination.CoordinationDriver):
def __init__(self, member_id, parsed_url, options):
"""Initialize the MySQL driver."""
super(MySQLDriver, self).__init__()
self._host = parsed_url.netloc
self._port = parsed_url.port
self._dbname = parsed_url.path[1:]
self._username = parsed_url.username
self._password = parsed_url.password
self._unix_socket = options.get("unix_socket", [None])[-1]
def _start(self):
try:
if self._unix_socket:
self._conn = pymysql.Connect(unix_socket=self._unix_socket,
port=self._port,
user=self._username,
passwd=self._password,
database=self._dbname)
else:
self._conn = pymysql.Connect(host=self._host,
port=self._port,
user=self._username,
passwd=self._password,
database=self._dbname)
except pymysql.err.OperationalError as e:
raise coordination.ToozConnectionError(utils.exception_message(e))
def _stop(self):
self._conn.close()
def get_lock(self, name):
return MySQLLock(name, self._conn)
@staticmethod
def watch_join_group(group_id, callback):
raise tooz.NotImplemented
@staticmethod
def unwatch_join_group(group_id, callback):
raise tooz.NotImplemented
@staticmethod
def watch_leave_group(group_id, callback):
raise tooz.NotImplemented
@staticmethod
def unwatch_leave_group(group_id, callback):
raise tooz.NotImplemented
@staticmethod
def watch_elected_as_leader(group_id, callback):
raise tooz.NotImplemented
@staticmethod
def unwatch_elected_as_leader(group_id, callback):
raise tooz.NotImplemented

View File

@ -35,6 +35,8 @@ class TestAPI(testscenarios.TestWithScenarios,
('ipc', {'url': 'ipc://'}),
('redis', {'url': 'redis://localhost:6379?timeout=5'}),
('postgresql', {'url': os.getenv("TOOZ_TEST_PGSQL_URL")}),
('mysql', {'url': os.getenv("TOOZ_TEST_MYSQL_URL"),
'bad_url': 'mysql://localhost:1'}),
]
# Only certain drivers have the tested support for timeouts that we test
@ -78,6 +80,14 @@ class TestAPI(testscenarios.TestWithScenarios,
self._coord.stop()
super(TestAPI, self).tearDown()
def test_connection_error(self):
if not hasattr(self, "bad_url"):
raise testcase.TestSkipped("No bad URL provided")
coord = tooz.coordination.get_coordinator(self.bad_url,
self.member_id)
self.assertRaises(tooz.coordination.ToozConnectionError,
coord.start)
def test_stop_first(self):
c = tooz.coordination.get_coordinator(self.url,
self.member_id)

View File

@ -82,6 +82,9 @@ deps = {[testenv:py34]deps}
basepython = python3.4
commands = {toxinidir}/setup-postgresql-env.sh python setup.py testr --slowest --testr-args="{posargs}"
[testenv:py27-mysql]
commands = {toxinidir}/setup-mysql-env.sh python setup.py testr --slowest --testr-args="{posargs}"
[testenv:cover]
commands = python setup.py testr --slowest --coverage --testr-args="{posargs}"