formatted closer to PEP8, also added some optimizations.
This commit is contained in:
parent
bc1ae13bce
commit
ca1e7ee524
@ -2,272 +2,329 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import re
|
||||
import time
|
||||
from time import time, mktime
|
||||
from datetime import datetime, date
|
||||
from dateutil.relativedelta import *
|
||||
from dateutil.relativedelta import relativedelta
|
||||
|
||||
class croniter:
|
||||
RANGES = (
|
||||
(0, 59),
|
||||
(0, 23),
|
||||
(1, 31),
|
||||
(1, 12),
|
||||
(0, 6),
|
||||
(0, 59)
|
||||
)
|
||||
DAYS = (
|
||||
31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31
|
||||
)
|
||||
search_re = re.compile(r'^([^-]+)-([^-/]+)(/(.*))?$')
|
||||
only_int_re = re.compile(r'^\d+$')
|
||||
any_int_re = re.compile(r'^\d+')
|
||||
star_or_int_re = re.compile(r'^(\d+|\*)$')
|
||||
|
||||
ALPHACONV = (
|
||||
{ },
|
||||
{ },
|
||||
{ },
|
||||
{ 'jan':1, 'feb':2, 'mar':3, 'apr':4, 'may':5, 'jun':6,
|
||||
'jul':7, 'aug':8, 'sep':9, 'oct':10, 'nov':11, 'dec':12 },
|
||||
{ 'sun':0, 'mon':1, 'tue':2, 'wed':3, 'thu':4, 'fri':5, 'sat':6 },
|
||||
{ }
|
||||
)
|
||||
__all__ = ('croniter',)
|
||||
|
||||
LOWMAP = (
|
||||
{},
|
||||
{},
|
||||
{0: 1},
|
||||
{0: 1},
|
||||
{7: 0},
|
||||
{},
|
||||
)
|
||||
|
||||
def __init__(self, expr_format, start_time=time.time()):
|
||||
if isinstance(start_time, datetime):
|
||||
start_time = time.mktime(start_time.timetuple())
|
||||
self.cur = start_time
|
||||
self.exprs = expr_format.split()
|
||||
if len(self.exprs) != 5 and len(self.exprs) != 6:
|
||||
raise ValueError("Exactly 5 or 6 columns has to be specified for iterator expression")
|
||||
expanded = []
|
||||
for i, expr in enumerate(self.exprs):
|
||||
e_list = expr.split(',')
|
||||
res = []
|
||||
while len(e_list) > 0:
|
||||
e = e_list.pop()
|
||||
t = re.sub(r'^\*(/.+)$', r'%d-%d\1' % (self.RANGES[i][0], self.RANGES[i][1]), str(e))
|
||||
m = re.search(r'^([^-]+)-([^-/]+)(/(.*))?$', t)
|
||||
if m:
|
||||
(low, high, step) = m.group(1), m.group(2), m.group(4) or 1
|
||||
if not re.search(r'^\d+', low):
|
||||
low = self.ALPHACONV[i][low.lower()]
|
||||
if not re.search(r'^\d+', high):
|
||||
high = self.ALPHACONV[i][high.lower()]
|
||||
if not low or not high or int(low) > int(high) or not re.search(r'^\d+$', str(step)):
|
||||
raise ValueError("[%s] is not acceptable" % expr_format)
|
||||
for j in range(int(low), int(high)+1):
|
||||
if j % int(step) == 0:
|
||||
e_list.append(j)
|
||||
|
||||
class croniter(object):
|
||||
RANGES = (
|
||||
(0, 59),
|
||||
(0, 23),
|
||||
(1, 31),
|
||||
(1, 12),
|
||||
(0, 6),
|
||||
(0, 59)
|
||||
)
|
||||
DAYS = (
|
||||
31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31
|
||||
)
|
||||
|
||||
ALPHACONV = (
|
||||
{ },
|
||||
{ },
|
||||
{ },
|
||||
{ 'jan':1, 'feb':2, 'mar':3, 'apr':4, 'may':5, 'jun':6,
|
||||
'jul':7, 'aug':8, 'sep':9, 'oct':10, 'nov':11, 'dec':12 },
|
||||
{ 'sun':0, 'mon':1, 'tue':2, 'wed':3, 'thu':4, 'fri':5, 'sat':6 },
|
||||
{ }
|
||||
)
|
||||
|
||||
LOWMAP = (
|
||||
{},
|
||||
{},
|
||||
{0: 1},
|
||||
{0: 1},
|
||||
{7: 0},
|
||||
{},
|
||||
)
|
||||
|
||||
bad_length = 'Exactly 5 or 6 columns has to be specified for iterator' \
|
||||
'expression.'
|
||||
|
||||
def __init__(self, expr_format, start_time=time()):
|
||||
if isinstance(start_time, datetime):
|
||||
start_time = mktime(start_time.timetuple())
|
||||
|
||||
self.cur = start_time
|
||||
self.exprs = expr_format.split()
|
||||
|
||||
if len(self.exprs) != 5 and len(self.exprs) != 6:
|
||||
raise ValueError(self.bad_length)
|
||||
|
||||
expanded = []
|
||||
|
||||
for i, expr in enumerate(self.exprs):
|
||||
e_list = expr.split(',')
|
||||
res = []
|
||||
|
||||
while len(e_list) > 0:
|
||||
e = e_list.pop()
|
||||
t = re.sub(r'^\*(/.+)$', r'%d-%d\1' % (self.RANGES[i][0],
|
||||
self.RANGES[i][1]),
|
||||
str(e))
|
||||
m = search_re.search(t)
|
||||
|
||||
if m:
|
||||
(low, high, step) = m.group(1), m.group(2), m.group(4) or 1
|
||||
|
||||
if not any_int_re.search(low):
|
||||
low = self.ALPHACONV[i][low.lower()]
|
||||
|
||||
if not any_int_re.search(high):
|
||||
high = self.ALPHACONV[i][high.lower()]
|
||||
|
||||
if (not low or not high or int(low) > int(high)
|
||||
or not only_int_re.search(str(step))):
|
||||
raise ValueError("[%s] is not acceptable" %expr_format)
|
||||
|
||||
for j in xrange(int(low), int(high)+1):
|
||||
if j % int(step) == 0:
|
||||
e_list.append(j)
|
||||
else:
|
||||
if not star_or_int_re.search(t):
|
||||
t = self.ALPHACONV[i][t.lower()]
|
||||
|
||||
try:
|
||||
t = int(t)
|
||||
except:
|
||||
pass
|
||||
|
||||
if t in self.LOWMAP[i]:
|
||||
t = self.LOWMAP[i][t]
|
||||
|
||||
if t != '*' and (int(t) < self.RANGES[i][0] or
|
||||
int(t) > self.RANGES[i][1]):
|
||||
raise ValueError("[%s] is not acceptable, out of range" % expr_format)
|
||||
|
||||
res.append(t)
|
||||
|
||||
res.sort()
|
||||
expanded.append(['*'] if (len(res) == 1 and res[0] == '*') else res)
|
||||
self.expanded = expanded
|
||||
|
||||
def get_next(self, ret_type=float):
|
||||
return self._get_next(ret_type, is_prev=False)
|
||||
|
||||
def get_prev(self, ret_type=float):
|
||||
return self._get_next(ret_type, is_prev=True)
|
||||
|
||||
def _get_next(self, ret_type=float, is_prev=False):
|
||||
expanded = self.expanded[:]
|
||||
|
||||
if ret_type not in (float, datetime):
|
||||
raise TypeError("Invalid ret_type, only 'float' or 'datetime' " \
|
||||
"is acceptable.")
|
||||
|
||||
if expanded[2][0] != '*' and expanded[4][0] != '*':
|
||||
bak = expanded[4]
|
||||
expanded[4] = ['*']
|
||||
t1 = self._calc(self.cur, expanded, is_prev)
|
||||
expanded[4] = bak
|
||||
expanded[2] = ['*']
|
||||
|
||||
t2 = self._calc(self.cur, expanded, is_prev)
|
||||
if not is_prev:
|
||||
result = t1 if t1 < t2 else t2
|
||||
else:
|
||||
result = t1 if t1 > t2 else t2
|
||||
else:
|
||||
if not re.search(r'^(\d+|\*)$', t):
|
||||
t = self.ALPHACONV[i][t.lower()]
|
||||
try: t = int(t)
|
||||
except: pass
|
||||
if t in self.LOWMAP[i]:
|
||||
t = self.LOWMAP[i][t]
|
||||
if t != '*' and (int(t) < self.RANGES[i][0] or int(t) > self.RANGES[i][1]):
|
||||
raise ValueError("[%s] is not acceptable, out of range" % expr_format)
|
||||
try:
|
||||
res.append(int(t))
|
||||
except:
|
||||
res.append(t)
|
||||
res.sort()
|
||||
expanded.append(['*'] if (len(res) == 1 and res[0] == '*') else res)
|
||||
self.expanded = expanded
|
||||
result = self._calc(self.cur, expanded, is_prev)
|
||||
self.cur = result
|
||||
|
||||
def get_next(self, ret_type=float):
|
||||
return self._get_next(ret_type, is_prev=False)
|
||||
if ret_type == datetime:
|
||||
result = datetime.fromtimestamp(result)
|
||||
return result
|
||||
|
||||
def _calc(self, now, expanded, is_prev):
|
||||
nearest_method = self._get_prev_nearest if is_prev \
|
||||
else self._get_next_nearest
|
||||
|
||||
def get_prev(self, ret_type=float):
|
||||
return self._get_next(ret_type, is_prev=True)
|
||||
nearest_diff_method = self._get_prev_nearest_diff if is_prev \
|
||||
else self._get_next_nearest_diff
|
||||
|
||||
def _get_next(self, ret_type=float, is_prev=False):
|
||||
expanded = self.expanded[:]
|
||||
if ret_type not in [float, datetime]:
|
||||
raise TypeError("invalid ret_type, only 'float' or 'datetime' is acceptable")
|
||||
sign = -1 if is_prev else 1
|
||||
offset = len(expanded) == 6 and 1 or 60
|
||||
dst = now = datetime.fromtimestamp(now + sign * offset)
|
||||
|
||||
if expanded[2][0] != '*' and expanded[4][0] != '*':
|
||||
bak = expanded[4]
|
||||
expanded[4] = ['*']
|
||||
t1 = self._calc(self.cur, expanded, is_prev)
|
||||
expanded[4] = bak
|
||||
expanded[2] = ['*']
|
||||
t2 = self._calc(self.cur, expanded, is_prev)
|
||||
if not is_prev:
|
||||
result = t1 if t1 < t2 else t2
|
||||
else:
|
||||
result = t1 if t1 > t2 else t2
|
||||
else:
|
||||
result = self._calc(self.cur, expanded, is_prev)
|
||||
self.cur = result
|
||||
if ret_type == datetime:
|
||||
result = datetime.fromtimestamp(result)
|
||||
return result
|
||||
|
||||
def _calc(self, now, expanded, is_prev):
|
||||
nearest_method = self._get_prev_nearest if is_prev else self._get_next_nearest
|
||||
nearest_diff_method = self._get_prev_nearest_diff if is_prev else self._get_next_nearest_diff
|
||||
day, month, year = dst.day, dst.month, dst.year
|
||||
current_year = now.year
|
||||
|
||||
sign = -1 if is_prev else 1
|
||||
offset = len(expanded) == 6 and 1 or 60
|
||||
dst = now = datetime.fromtimestamp(now + sign * offset)
|
||||
while abs(dst.year - now.year) <= 1:
|
||||
# check month
|
||||
if expanded[3][0] != '*':
|
||||
diff_month = nearest_diff_method(dst.month, expanded[3], 12)
|
||||
days = self.DAYS[dst.month - 1]
|
||||
if dst.month == 2 and self.is_leap(dst.year) == True:
|
||||
days += 1
|
||||
reset_day = days if is_prev else 1
|
||||
if diff_month != None and diff_month != 0:
|
||||
dst += relativedelta(months=diff_month, day=reset_day, hour=0, minute=0, second=0)
|
||||
continue
|
||||
# check day of month
|
||||
if expanded[2][0] != '*':
|
||||
days = self.DAYS[dst.month - 1]
|
||||
if dst.month == 2 and self.is_leap(dst.year) == True:
|
||||
days += 1
|
||||
diff_day = nearest_diff_method(dst.day, expanded[2], days)
|
||||
if diff_day != None and diff_day != 0:
|
||||
dst += relativedelta(days=diff_day, hour=0, minute=0, second=0)
|
||||
continue
|
||||
# check day of week
|
||||
if expanded[4][0] != '*':
|
||||
diff_day_of_week = nearest_diff_method(dst.isoweekday() % 7, expanded[4], 7)
|
||||
if diff_day_of_week != None and diff_day_of_week != 0:
|
||||
dst += relativedelta(days=diff_day_of_week, hour=0, minute=0, second=0)
|
||||
continue
|
||||
# check hour
|
||||
if expanded[1][0] != '*':
|
||||
diff_hour = nearest_diff_method(dst.hour, expanded[1], 24)
|
||||
if diff_hour != None and diff_hour != 0:
|
||||
dst += relativedelta(hours = diff_hour, minute=0, second=0)
|
||||
continue
|
||||
# check minute
|
||||
if expanded[0][0] != '*':
|
||||
diff_min = nearest_diff_method(dst.minute, expanded[0], 60)
|
||||
if diff_min != None and diff_min != 0:
|
||||
dst += relativedelta(minutes = diff_min, second=0)
|
||||
continue
|
||||
# check second
|
||||
if len(expanded) == 6:
|
||||
if expanded[5][0] != '*':
|
||||
diff_sec = nearest_diff_method(dst.second, expanded[5], 60)
|
||||
if diff_sec != None and diff_sec != 0:
|
||||
dst += relativedelta(seconds = diff_sec)
|
||||
continue
|
||||
else:
|
||||
dst += relativedelta(second = 0)
|
||||
return time.mktime(dst.timetuple())
|
||||
raise "failed to find prev date"
|
||||
DAYS = self.DAYS
|
||||
|
||||
def _get_next_nearest(self, x, to_check):
|
||||
small = [item for item in to_check if item < x]
|
||||
large = [item for item in to_check if item >= x]
|
||||
large.extend(small)
|
||||
return large[0]
|
||||
while abs(year - current_year) <= 1:
|
||||
# check month
|
||||
if expanded[3][0] != '*':
|
||||
diff_month = nearest_diff_method(month, expanded[3], 12)
|
||||
days = DAYS[month - 1]
|
||||
|
||||
def _get_prev_nearest(self, x, to_check):
|
||||
small = [item for item in to_check if item <= x]
|
||||
large = [item for item in to_check if item > x]
|
||||
small.reverse()
|
||||
large.reverse()
|
||||
small.extend(large)
|
||||
return small[0]
|
||||
if month == 2 and self.is_leap(year) == True:
|
||||
days += 1
|
||||
|
||||
def _get_next_nearest_diff(self, x, to_check, range_val):
|
||||
for i, d in enumerate(to_check):
|
||||
if d >= x:
|
||||
return d - x
|
||||
return to_check[0] - x + range_val
|
||||
reset_day = days if is_prev else 1
|
||||
|
||||
def _get_prev_nearest_diff(self, x, to_check, range_val):
|
||||
candidates = to_check[:]
|
||||
candidates.reverse()
|
||||
for d in candidates:
|
||||
if d <= x:
|
||||
return d - x
|
||||
return (candidates[0]) - x - range_val
|
||||
if diff_month != None and diff_month != 0:
|
||||
dst += relativedelta(months=diff_month, day=reset_day,
|
||||
hour=0, minute=0, second=0)
|
||||
continue
|
||||
|
||||
def is_leap(self, year):
|
||||
if year % 400 == 0 or (year % 4 == 0 and year % 100 != 0):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
# check day of month
|
||||
if expanded[2][0] != '*':
|
||||
days = DAYS[month - 1]
|
||||
|
||||
if month == 2 and self.is_leap(year) == True:
|
||||
days += 1
|
||||
|
||||
diff_day = nearest_diff_method(dst.day, expanded[2], days)
|
||||
|
||||
if diff_day != None and diff_day != 0:
|
||||
dst += relativedelta(days=diff_day, hour=0, minute=0, second=0)
|
||||
continue
|
||||
|
||||
# check day of week
|
||||
if expanded[4][0] != '*':
|
||||
diff_day_of_week = nearest_diff_method(dst.isoweekday() % 7, expanded[4], 7)
|
||||
|
||||
if diff_day_of_week != None and diff_day_of_week != 0:
|
||||
dst += relativedelta(days=diff_day_of_week, hour=0, minute=0, second=0)
|
||||
continue
|
||||
|
||||
# check hour
|
||||
if expanded[1][0] != '*':
|
||||
diff_hour = nearest_diff_method(dst.hour, expanded[1], 24)
|
||||
if diff_hour != None and diff_hour != 0:
|
||||
dst += relativedelta(hours = diff_hour, minute=0, second=0)
|
||||
continue
|
||||
|
||||
# check minute
|
||||
if expanded[0][0] != '*':
|
||||
diff_min = nearest_diff_method(dst.minute, expanded[0], 60)
|
||||
if diff_min != None and diff_min != 0:
|
||||
dst += relativedelta(minutes = diff_min, second=0)
|
||||
continue
|
||||
|
||||
# check second
|
||||
if len(expanded) == 6:
|
||||
if expanded[5][0] != '*':
|
||||
diff_sec = nearest_diff_method(dst.second, expanded[5], 60)
|
||||
if diff_sec != None and diff_sec != 0:
|
||||
dst += relativedelta(seconds = diff_sec)
|
||||
continue
|
||||
else:
|
||||
dst += relativedelta(second = 0)
|
||||
|
||||
return mktime(dst.timetuple())
|
||||
|
||||
raise "failed to find prev date"
|
||||
|
||||
def _get_next_nearest(self, x, to_check):
|
||||
small = [item for item in to_check if item < x]
|
||||
large = [item for item in to_check if item >= x]
|
||||
large.extend(small)
|
||||
return large[0]
|
||||
|
||||
def _get_prev_nearest(self, x, to_check):
|
||||
small = [item for item in to_check if item <= x]
|
||||
large = [item for item in to_check if item > x]
|
||||
small.reverse()
|
||||
large.reverse()
|
||||
small.extend(large)
|
||||
return small[0]
|
||||
|
||||
def _get_next_nearest_diff(self, x, to_check, range_val):
|
||||
for i, d in enumerate(to_check):
|
||||
if d >= x:
|
||||
return d - x
|
||||
return to_check[0] - x + range_val
|
||||
|
||||
def _get_prev_nearest_diff(self, x, to_check, range_val):
|
||||
candidates = to_check[:]
|
||||
candidates.reverse()
|
||||
for d in candidates:
|
||||
if d <= x:
|
||||
return d - x
|
||||
return (candidates[0]) - x - range_val
|
||||
|
||||
def is_leap(self, year):
|
||||
if year % 400 == 0 or (year % 4 == 0 and year % 100 != 0):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
if __name__ == '__main__':
|
||||
base = datetime(2010, 1, 25)
|
||||
itr = croniter('0 0 * * sun,mon', base)
|
||||
print itr.get_next(datetime)
|
||||
print itr.get_next(datetime)
|
||||
|
||||
base = datetime(2010, 1, 25)
|
||||
itr = croniter('0 0 1 * 3', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
n2 = itr.get_next(datetime)
|
||||
print n1
|
||||
print n2
|
||||
print "#" * 10
|
||||
|
||||
base = datetime(2010, 1, 25)
|
||||
itr = croniter('0 0 1 * 3', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
n2 = itr.get_next(datetime)
|
||||
|
||||
base = datetime(2010, 2, 24, 12, 9)
|
||||
itr = croniter('0 0 */3 * *', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
n2 = itr.get_next(datetime)
|
||||
print n1
|
||||
print n2
|
||||
base = datetime(1997, 2, 27)
|
||||
itr = croniter('0 0 * * *', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
n2 = itr.get_next(datetime)
|
||||
print n1
|
||||
print n2
|
||||
base2 = datetime(2000, 2, 27)
|
||||
itr2 = croniter('0 0 * * *', base2)
|
||||
n3 = itr2.get_next(datetime)
|
||||
print n3
|
||||
n4 = itr2.get_next(datetime)
|
||||
print n4
|
||||
base = datetime(2010, 1, 25)
|
||||
itr = croniter('0 0 * * sun,mon', base)
|
||||
print itr.get_next(datetime)
|
||||
print itr.get_next(datetime)
|
||||
|
||||
base = datetime(2010, 1, 25)
|
||||
itr = croniter('0 0 1 * 3', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
n2 = itr.get_next(datetime)
|
||||
print n1
|
||||
print n2
|
||||
print "#" * 10
|
||||
|
||||
base = datetime(2010, 1, 25)
|
||||
itr = croniter('0 0 1 * 3', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
n2 = itr.get_next(datetime)
|
||||
|
||||
base = datetime(2010, 2, 24, 12, 9)
|
||||
itr = croniter('0 0 */3 * *', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
n2 = itr.get_next(datetime)
|
||||
print n1
|
||||
print n2
|
||||
base = datetime(1997, 2, 27)
|
||||
itr = croniter('0 0 * * *', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
n2 = itr.get_next(datetime)
|
||||
print n1
|
||||
print n2
|
||||
base2 = datetime(2000, 2, 27)
|
||||
itr2 = croniter('0 0 * * *', base2)
|
||||
n3 = itr2.get_next(datetime)
|
||||
print n3
|
||||
n4 = itr2.get_next(datetime)
|
||||
print n4
|
||||
|
||||
base3 = datetime(2010, 8, 8, 14, 2)
|
||||
itr3 = croniter('5-15 * * * *', base3)
|
||||
for i in range(20):
|
||||
print itr3.get_next(datetime)
|
||||
base3 = datetime(2010, 8, 8, 14, 2)
|
||||
itr3 = croniter('5-15 * * * *', base3)
|
||||
for i in range(20):
|
||||
print itr3.get_next(datetime)
|
||||
|
||||
print "#" * 10
|
||||
base = datetime(2010, 8, 1, 0, 0)
|
||||
itr4 = croniter('0 9 * * mon,tue,wed,thu,fri', base)
|
||||
for i in range(10):
|
||||
print itr4.get_next(datetime)
|
||||
print "#" * 10
|
||||
base = datetime(2010, 8, 1, 0, 0)
|
||||
itr4 = croniter('0 9 * * mon,tue,wed,thu,fri', base)
|
||||
for i in range(10):
|
||||
print itr4.get_next(datetime)
|
||||
|
||||
base = datetime(2010, 1, 25)
|
||||
itr = croniter('0 0 1 * *', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
print n1
|
||||
base = datetime(2010, 1, 25)
|
||||
itr = croniter('0 0 1 * *', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
print n1
|
||||
|
||||
print '#' * 30
|
||||
base = datetime(2010, 8, 25)
|
||||
itr = croniter('0 0 * * *', base)
|
||||
print itr.get_prev(datetime)
|
||||
for i in range(10):
|
||||
print '#' * 30
|
||||
base = datetime(2010, 8, 25)
|
||||
itr = croniter('0 0 * * *', base)
|
||||
print itr.get_prev(datetime)
|
||||
for i in range(10):
|
||||
print itr.get_prev(datetime)
|
||||
print '#' * 30
|
||||
base = datetime(2010, 8, 25)
|
||||
itr = croniter('0 0 1 * *', base)
|
||||
print itr.get_prev(datetime)
|
||||
print itr.get_prev(datetime)
|
||||
print itr.get_prev(datetime)
|
||||
print '#' * 30
|
||||
base = datetime(2010, 8, 25)
|
||||
itr = croniter('0 0 1 * *', base)
|
||||
print itr.get_prev(datetime)
|
||||
print itr.get_prev(datetime)
|
||||
print itr.get_prev(datetime)
|
||||
|
||||
base = datetime(2010, 8, 25, 15, 56)
|
||||
itr = croniter('0 0 * * sat,sun', base)
|
||||
print itr.get_prev(datetime)
|
||||
base = datetime(2010, 8, 25, 15, 56)
|
||||
itr = croniter('0 0 * * sat,sun', base)
|
||||
print itr.get_prev(datetime)
|
||||
|
213
croniter/speed_test.py
Normal file
213
croniter/speed_test.py
Normal file
@ -0,0 +1,213 @@
|
||||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import time
|
||||
from datetime import datetime, date
|
||||
from croniter import croniter
|
||||
|
||||
class timerTest(object):
|
||||
def __init__(self):
|
||||
self.tests = tuple(getattr(self, m) for m in dir(self) if m.lower().startswith('test'))
|
||||
|
||||
def run(self):
|
||||
for test in self.tests:
|
||||
test()
|
||||
|
||||
|
||||
class CroniterTest(timerTest):
|
||||
def testMinute(self):
|
||||
# minute asterisk
|
||||
base = datetime(2010, 1, 23, 12, 18)
|
||||
itr = croniter('*/1 * * * *', base)
|
||||
n1 = itr.get_next(datetime) # 19
|
||||
base.year == n1.year
|
||||
base.month == n1.month
|
||||
base.day == n1.day
|
||||
base.hour == n1.hour
|
||||
base.minute == n1.minute - 1
|
||||
for i in range(39): # ~ 58
|
||||
itr.get_next()
|
||||
|
||||
n2 = itr.get_next(datetime)
|
||||
n2.minute == 59
|
||||
|
||||
n3 = itr.get_next(datetime)
|
||||
|
||||
n3.minute == 0
|
||||
n3.hour == 13
|
||||
|
||||
itr = croniter('*/5 * * * *', base)
|
||||
n4 = itr.get_next(datetime)
|
||||
n4.minute == 20
|
||||
for i in range(6):
|
||||
itr.get_next()
|
||||
n5 = itr.get_next(datetime)
|
||||
n5.minute == 55
|
||||
|
||||
n6 = itr.get_next(datetime)
|
||||
n6.minute == 0
|
||||
n6.hour == 13
|
||||
|
||||
def testHour(self):
|
||||
base = datetime(2010, 1, 24, 12, 2)
|
||||
itr = croniter('0 */3 * * *', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
|
||||
n1.hour == 15
|
||||
n1.minute == 0
|
||||
|
||||
for i in range(2):
|
||||
itr.get_next()
|
||||
|
||||
n2 = itr.get_next(datetime)
|
||||
n2.hour == 0
|
||||
n2.day == 25
|
||||
|
||||
def testDay(self):
|
||||
base = datetime(2010, 2, 24, 12, 9)
|
||||
itr = croniter('0 0 */3 * *', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
n1.day == 27
|
||||
n2 = itr.get_next(datetime)
|
||||
n2.day == 3
|
||||
|
||||
# test leap year
|
||||
base = datetime(1996, 2, 27)
|
||||
itr = croniter('0 0 * * *', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
n1.day == 28
|
||||
n1.month == 2
|
||||
n2 = itr.get_next(datetime)
|
||||
n2.day == 29
|
||||
n2.month == 2
|
||||
|
||||
base2 = datetime(2000, 2, 27)
|
||||
itr2 = croniter('0 0 * * *', base2)
|
||||
n3 = itr2.get_next(datetime)
|
||||
n3.day == 28
|
||||
n3.month == 2
|
||||
n4 = itr2.get_next(datetime)
|
||||
n4.day == 29
|
||||
n4.month == 2
|
||||
|
||||
def testWeekDay(self):
|
||||
base = datetime(2010, 2, 25)
|
||||
itr = croniter('0 0 * * sat', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
n1.isoweekday() == 6
|
||||
n1.day == 27
|
||||
|
||||
n2 = itr.get_next(datetime)
|
||||
n2.isoweekday() == 6
|
||||
n2.day == 6
|
||||
n2.month == 3
|
||||
|
||||
base = datetime(2010, 1, 25)
|
||||
itr = croniter('0 0 1 * wed', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
n1.month == 1
|
||||
n1.day == 27
|
||||
n1.year == 2010
|
||||
|
||||
n2 = itr.get_next(datetime)
|
||||
n2.month == 2
|
||||
n2.day == 1
|
||||
n2.year == 2010
|
||||
|
||||
n3 = itr.get_next(datetime)
|
||||
n3.month == 2
|
||||
n3.day == 3
|
||||
n3.year == 2010
|
||||
|
||||
def testMonth(self):
|
||||
base = datetime(2010, 1, 25)
|
||||
itr = croniter('0 0 1 * *', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
|
||||
n1.month == 2
|
||||
n1.day == 1
|
||||
|
||||
n2 = itr.get_next(datetime)
|
||||
n2.month == 3
|
||||
n2.day == 1
|
||||
|
||||
for i in range(8):
|
||||
itr.get_next()
|
||||
|
||||
n3 = itr.get_next(datetime)
|
||||
n3.month == 12
|
||||
n3.year == 2010
|
||||
|
||||
n4 = itr.get_next(datetime)
|
||||
n4.month == 1
|
||||
n4.year == 2011
|
||||
|
||||
|
||||
def testPrevMinute(self):
|
||||
base = datetime(2010, 8, 25, 15, 56)
|
||||
itr = croniter('*/1 * * * *', base)
|
||||
prev = itr.get_prev(datetime)
|
||||
base.year == prev.year
|
||||
base.month == prev.month
|
||||
base.day == prev.day
|
||||
base.hour == prev.hour
|
||||
base.minute, prev.minute+1
|
||||
|
||||
base = datetime(2010, 8, 25, 15, 0)
|
||||
itr = croniter('*/1 * * * *', base)
|
||||
prev = itr.get_prev(datetime)
|
||||
base.year == prev.year
|
||||
base.month == prev.month
|
||||
base.day == prev.day
|
||||
base.hour == prev.hour+1
|
||||
59 == prev.minute
|
||||
|
||||
base = datetime(2010, 8, 25, 0, 0)
|
||||
itr = croniter('*/1 * * * *', base)
|
||||
prev = itr.get_prev(datetime)
|
||||
base.year == prev.year
|
||||
base.month == prev.month
|
||||
base.day == prev.day+1
|
||||
23 == prev.hour
|
||||
59 == prev.minute
|
||||
|
||||
def testPrevWeekDay(self):
|
||||
base = datetime(2010, 8, 25, 15, 56)
|
||||
itr = croniter('0 0 * * sat,sun', base)
|
||||
prev1 = itr.get_prev(datetime)
|
||||
prev1.year == base.year
|
||||
prev1.month == base.month
|
||||
prev1.day == 22
|
||||
prev1.hour == 0
|
||||
prev1.minute == 0
|
||||
|
||||
prev2 = itr.get_prev(datetime)
|
||||
prev2.year == base.year
|
||||
prev2.month == base.month
|
||||
prev2.day == 21
|
||||
prev2.hour == 0
|
||||
prev2.minute == 0
|
||||
|
||||
prev3 = itr.get_prev(datetime)
|
||||
prev3.year == base.year
|
||||
prev3.month == base.month
|
||||
prev3.day == 15
|
||||
prev3.hour == 0
|
||||
prev3.minute == 0
|
||||
|
||||
def testISOWeekday(self):
|
||||
base = datetime(2010, 2, 25)
|
||||
itr = croniter('0 0 * * 7', base)
|
||||
n1 = itr.get_next(datetime)
|
||||
n1.isoweekday() == 7
|
||||
n1.day == 28
|
||||
|
||||
n2 = itr.get_next(datetime)
|
||||
n2.isoweekday() == 7
|
||||
n2.day == 7
|
||||
n2.month == 3
|
||||
|
||||
if __name__ == '__main__':
|
||||
from timeit import Timer
|
||||
t = Timer('c=CroniterTest();c.run()', 'from __main__ import CroniterTest')
|
||||
print t.timeit(200)
|
Loading…
Reference in New Issue
Block a user