Merge "Add MySQL driver"
This commit is contained in:
commit
2d0be7c9ff
37
setup-mysql-env.sh
Executable file
37
setup-mysql-env.sh
Executable file
@ -0,0 +1,37 @@
|
||||
#!/bin/bash
|
||||
set -x -e
|
||||
|
||||
. functions.sh
|
||||
|
||||
clean_exit () {
|
||||
local error_code="$?"
|
||||
kill $(jobs -p)
|
||||
rm -rf ${MYSQL_DATA}
|
||||
return $error_code
|
||||
}
|
||||
|
||||
wait_for_line () {
|
||||
while read line
|
||||
do
|
||||
echo "$line" | grep -q "$1" && break
|
||||
done < "$2"
|
||||
# Read the fifo for ever otherwise process would block
|
||||
cat "$2" >/dev/null &
|
||||
}
|
||||
|
||||
trap "clean_exit" EXIT
|
||||
|
||||
# On systems like Fedora here's where mysqld can be found
|
||||
export PATH=$PATH:/usr/libexec
|
||||
|
||||
# Start MySQL process for tests
|
||||
MYSQL_DATA=`mktemp -d /tmp/tooz-mysql-XXXXX`
|
||||
mkfifo ${MYSQL_DATA}/out
|
||||
mysqld --datadir=${MYSQL_DATA} --pid-file=${MYSQL_DATA}/mysql.pid --socket=${MYSQL_DATA}/mysql.socket --skip-networking --skip-grant-tables &> ${MYSQL_DATA}/out &
|
||||
# Wait for MySQL to start listening to connections
|
||||
wait_for_line "mysqld: ready for connections." ${MYSQL_DATA}/out
|
||||
mysql -S ${MYSQL_DATA}/mysql.socket -e 'CREATE DATABASE test;'
|
||||
export TOOZ_TEST_MYSQL_URL="mysql://root@localhost/test?unix_socket=${MYSQL_DATA}/mysql.socket"
|
||||
|
||||
# Yield execution to venv command
|
||||
$*
|
@ -31,6 +31,7 @@ tooz.backends =
|
||||
ipc = tooz.drivers.ipc:IPCDriver
|
||||
redis = tooz.drivers.redis:RedisDriver
|
||||
postgresql = tooz.drivers.pgsql:PostgresDriver
|
||||
mysql = tooz.drivers.mysql:MySQLDriver
|
||||
|
||||
[build_sphinx]
|
||||
all_files = 1
|
||||
|
@ -10,3 +10,4 @@ testscenarios>=0.4
|
||||
coverage>=3.6
|
||||
sysv_ipc>=0.6.8
|
||||
psycopg2
|
||||
pymysql
|
||||
|
126
tooz/drivers/mysql.py
Normal file
126
tooz/drivers/mysql.py
Normal file
@ -0,0 +1,126 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright © 2014 eNovance
|
||||
#
|
||||
# Author: Julien Danjou <julien@danjou.info>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import pymysql
|
||||
|
||||
import tooz
|
||||
from tooz import coordination
|
||||
from tooz.drivers import _retry
|
||||
from tooz import locking
|
||||
from tooz import utils
|
||||
|
||||
|
||||
class MySQLLock(locking.Lock):
|
||||
"""A MySQL based lock."""
|
||||
|
||||
def __init__(self, name, connection):
|
||||
super(MySQLLock, self).__init__(name)
|
||||
self._conn = connection
|
||||
|
||||
def acquire(self, blocking=True):
|
||||
if blocking is False:
|
||||
try:
|
||||
cur = self._conn.cursor()
|
||||
cur.execute("SELECT GET_LOCK(%s, 0);", self.name)
|
||||
# Can return NULL on error
|
||||
if cur.fetchone()[0] is 1:
|
||||
return True
|
||||
return False
|
||||
except pymysql.MySQLError as e:
|
||||
raise coordination.ToozError(utils.exception_message(e))
|
||||
else:
|
||||
def _acquire():
|
||||
try:
|
||||
cur = self._conn.cursor()
|
||||
cur.execute("SELECT GET_LOCK(%s, 0);", self.name)
|
||||
if cur.fetchone()[0] is 1:
|
||||
return True
|
||||
except pymysql.MySQLError as e:
|
||||
raise coordination.ToozError(utils.exception_message(e))
|
||||
raise _retry.Retry
|
||||
kwargs = _retry.RETRYING_KWARGS.copy()
|
||||
if blocking is not True:
|
||||
kwargs['stop_max_delay'] = blocking
|
||||
return _retry.Retrying(**kwargs).call(_acquire)
|
||||
|
||||
def release(self):
|
||||
try:
|
||||
cur = self._conn.cursor()
|
||||
cur.execute("SELECT RELEASE_LOCK(%s);", self.name)
|
||||
return cur.fetchone()[0]
|
||||
except pymysql.MySQLError as e:
|
||||
raise coordination.ToozError(utils.exception_message(e))
|
||||
|
||||
|
||||
class MySQLDriver(coordination.CoordinationDriver):
|
||||
|
||||
def __init__(self, member_id, parsed_url, options):
|
||||
"""Initialize the MySQL driver."""
|
||||
super(MySQLDriver, self).__init__()
|
||||
self._host = parsed_url.netloc
|
||||
self._port = parsed_url.port
|
||||
self._dbname = parsed_url.path[1:]
|
||||
self._username = parsed_url.username
|
||||
self._password = parsed_url.password
|
||||
self._unix_socket = options.get("unix_socket", [None])[-1]
|
||||
|
||||
def _start(self):
|
||||
try:
|
||||
if self._unix_socket:
|
||||
self._conn = pymysql.Connect(unix_socket=self._unix_socket,
|
||||
port=self._port,
|
||||
user=self._username,
|
||||
passwd=self._password,
|
||||
database=self._dbname)
|
||||
else:
|
||||
self._conn = pymysql.Connect(host=self._host,
|
||||
port=self._port,
|
||||
user=self._username,
|
||||
passwd=self._password,
|
||||
database=self._dbname)
|
||||
except pymysql.err.OperationalError as e:
|
||||
raise coordination.ToozConnectionError(utils.exception_message(e))
|
||||
|
||||
def _stop(self):
|
||||
self._conn.close()
|
||||
|
||||
def get_lock(self, name):
|
||||
return MySQLLock(name, self._conn)
|
||||
|
||||
@staticmethod
|
||||
def watch_join_group(group_id, callback):
|
||||
raise tooz.NotImplemented
|
||||
|
||||
@staticmethod
|
||||
def unwatch_join_group(group_id, callback):
|
||||
raise tooz.NotImplemented
|
||||
|
||||
@staticmethod
|
||||
def watch_leave_group(group_id, callback):
|
||||
raise tooz.NotImplemented
|
||||
|
||||
@staticmethod
|
||||
def unwatch_leave_group(group_id, callback):
|
||||
raise tooz.NotImplemented
|
||||
|
||||
@staticmethod
|
||||
def watch_elected_as_leader(group_id, callback):
|
||||
raise tooz.NotImplemented
|
||||
|
||||
@staticmethod
|
||||
def unwatch_elected_as_leader(group_id, callback):
|
||||
raise tooz.NotImplemented
|
@ -35,6 +35,8 @@ class TestAPI(testscenarios.TestWithScenarios,
|
||||
('ipc', {'url': 'ipc://'}),
|
||||
('redis', {'url': 'redis://localhost:6379?timeout=5'}),
|
||||
('postgresql', {'url': os.getenv("TOOZ_TEST_PGSQL_URL")}),
|
||||
('mysql', {'url': os.getenv("TOOZ_TEST_MYSQL_URL"),
|
||||
'bad_url': 'mysql://localhost:1'}),
|
||||
]
|
||||
|
||||
# Only certain drivers have the tested support for timeouts that we test
|
||||
@ -78,6 +80,14 @@ class TestAPI(testscenarios.TestWithScenarios,
|
||||
self._coord.stop()
|
||||
super(TestAPI, self).tearDown()
|
||||
|
||||
def test_connection_error(self):
|
||||
if not hasattr(self, "bad_url"):
|
||||
raise testcase.TestSkipped("No bad URL provided")
|
||||
coord = tooz.coordination.get_coordinator(self.bad_url,
|
||||
self.member_id)
|
||||
self.assertRaises(tooz.coordination.ToozConnectionError,
|
||||
coord.start)
|
||||
|
||||
def test_stop_first(self):
|
||||
c = tooz.coordination.get_coordinator(self.url,
|
||||
self.member_id)
|
||||
|
3
tox.ini
3
tox.ini
@ -82,6 +82,9 @@ deps = {[testenv:py34]deps}
|
||||
basepython = python3.4
|
||||
commands = {toxinidir}/setup-postgresql-env.sh python setup.py testr --slowest --testr-args="{posargs}"
|
||||
|
||||
[testenv:py27-mysql]
|
||||
commands = {toxinidir}/setup-mysql-env.sh python setup.py testr --slowest --testr-args="{posargs}"
|
||||
|
||||
[testenv:cover]
|
||||
commands = python setup.py testr --slowest --coverage --testr-args="{posargs}"
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user