1357 lines
49 KiB
Python
1357 lines
49 KiB
Python
#!/usr/bin/python2.3
|
|
#
|
|
# vim:set et ts=4 fdc=0 fdn=2 fdl=0:
|
|
#
|
|
# There are no blank lines between blocks because I use folding from:
|
|
# http://www.vim.org/scripts/script.php?script_id=515
|
|
#
|
|
|
|
"""= QWeb Framework =
|
|
|
|
== What is QWeb ? ==
|
|
|
|
QWeb is a python based [http://www.python.org/doc/peps/pep-0333/ WSGI]
|
|
compatible web framework, it provides an infrastructure to quickly build web
|
|
applications consisting of:
|
|
|
|
* A lightweight request handler (QWebRequest)
|
|
* An xml templating engine (QWebXml and QWebHtml)
|
|
* A simple name based controller (qweb_control)
|
|
* A standalone WSGI Server (QWebWSGIServer)
|
|
* A cgi and fastcgi WSGI wrapper (taken from flup)
|
|
* A startup function that starts cgi, fastcgi or standalone according to the
|
|
environment (qweb_autorun).
|
|
|
|
QWeb applications are runnable in standalone mode (from commandline), via
|
|
FastCGI, Regular CGI or by any python WSGI compliant server.
|
|
|
|
QWeb doesn't provide any database access but it integrates nicely with ORMs
|
|
such as SQLObject, SQLAlchemy or plain DB-API.
|
|
|
|
Written by Antony Lesuisse (email al AT udev.org)
|
|
|
|
Homepage: http://antony.lesuisse.org/qweb/trac/
|
|
|
|
Forum: [http://antony.lesuisse.org/qweb/forum/viewforum.php?id=1 Forum]
|
|
|
|
== Quick Start (for Linux, MacOS X and cygwin) ==
|
|
|
|
Make sure you have at least python 2.3 installed and run the following commands:
|
|
|
|
{{{
|
|
$ wget http://antony.lesuisse.org/qweb/files/QWeb-0.7.tar.gz
|
|
$ tar zxvf QWeb-0.7.tar.gz
|
|
$ cd QWeb-0.7/examples/blog
|
|
$ ./blog.py
|
|
}}}
|
|
|
|
And point your browser to http://localhost:8080/
|
|
|
|
You may also try AjaxTerm which uses qweb request handler.
|
|
|
|
== Download ==
|
|
|
|
* Version 0.7:
|
|
* Source [/qweb/files/QWeb-0.7.tar.gz QWeb-0.7.tar.gz]
|
|
* Python 2.3 Egg [/qweb/files/QWeb-0.7-py2.3.egg QWeb-0.7-py2.3.egg]
|
|
* Python 2.4 Egg [/qweb/files/QWeb-0.7-py2.4.egg QWeb-0.7-py2.4.egg]
|
|
|
|
* [/qweb/trac/browser Browse the source repository]
|
|
|
|
== Documentation ==
|
|
|
|
* [/qweb/trac/browser/trunk/README.txt?format=raw Read the included documentation]
|
|
* QwebTemplating
|
|
|
|
== Mailing-list ==
|
|
|
|
* Forum: [http://antony.lesuisse.org/qweb/forum/viewforum.php?id=1 Forum]
|
|
* No mailing-list exists yet, discussion should happen on: [http://mail.python.org/mailman/listinfo/web-sig web-sig] [http://mail.python.org/pipermail/web-sig/ archives]
|
|
|
|
QWeb Components:
|
|
----------------
|
|
|
|
QWeb also feature a simple components api, that enables developers to easily
|
|
produces reusable components.
|
|
|
|
Default qweb components:
|
|
|
|
- qweb_static:
|
|
A qweb component to serve static content from the filesystem or from
|
|
zipfiles.
|
|
|
|
- qweb_dbadmin:
|
|
scaffolding for sqlobject
|
|
|
|
License
|
|
-------
|
|
qweb/fcgi.py which is BSD-like from saddi.com.
|
|
Everything else is put in the public domain.
|
|
|
|
|
|
TODO
|
|
----
|
|
Announce QWeb to python-announce-list@python.org web-sig@python.org
|
|
qweb_core
|
|
rename request methods into
|
|
request_save_files
|
|
response_404
|
|
response_redirect
|
|
response_download
|
|
request callback_generator, callback_function ?
|
|
wsgi callback_server_local
|
|
xml tags explicitly call render_attributes(t_att)?
|
|
priority form-checkbox over t-value (for t-option)
|
|
|
|
"""
|
|
|
|
import BaseHTTPServer,SocketServer,Cookie
|
|
import cgi,datetime,email,email.Message,errno,gzip,os,random,re,socket,sys,tempfile,time,types,urllib,urlparse,xml.dom
|
|
try:
|
|
import cPickle as pickle
|
|
except ImportError:
|
|
import pickle
|
|
try:
|
|
import cStringIO as StringIO
|
|
except ImportError:
|
|
import StringIO
|
|
|
|
#----------------------------------------------------------
|
|
# Qweb Xml t-raw t-esc t-if t-foreach t-set t-call t-trim
|
|
#----------------------------------------------------------
|
|
class QWebEval:
    """Evaluate template expressions against a dictionary of variables.

    The instance is also usable as the mapping operand of ``%`` string
    formatting (see eval_format): every ``%(expr)s`` placeholder is resolved
    through __getitem__.

    WARNING: expressions that are not plain keys of ``data`` are passed to
    eval().  Only use this class with templates from a trusted source.
    """
    def __init__(self,data):
        # data: dict of template variables, also used as eval() globals
        self.data=data
    def __getitem__(self,expr):
        """Return data[expr] when expr is a key, else the result of eval(expr).

        NameError and AttributeError are swallowed silently, any other
        exception is reported on stdout; in both cases None is returned.
        """
        if expr in self.data:
            return self.data[expr]
        r=None
        try:
            r=eval(expr,self.data)
        except (NameError,AttributeError):
            pass
        except Exception:
            e=sys.exc_info()[1]
            # same text the former "print msg,e" statement produced
            sys.stdout.write("qweb: expression error '%s'  %s\n"%(expr,e))
        # eval() inserts __builtins__ into its globals dict, drop it again so
        # the variable dict is left as the caller passed it
        if "__builtins__" in self.data:
            del self.data["__builtins__"]
        return r
    def eval_object(self,expr):
        """Return the raw value of expr."""
        return self[expr]
    def eval_str(self,expr):
        """Return the value of expr as a byte string (unicode is utf8 encoded).

        The special expression "0" returns data[0] unchanged.
        """
        if expr=="0":
            return self.data[0]
        # evaluate once: expr may be an arbitrary expression with side effects
        val=self[expr]
        if isinstance(val,unicode):
            return val.encode("utf8")
        return str(val)
    def eval_format(self,expr):
        """Return expr with its %(...)s placeholders evaluated, or an error text."""
        try:
            return str(expr%self)
        except Exception:
            return "qweb: format error '%s' "%expr
    def eval_bool(self,expr):
        """Return 1 when expr evaluates to a true value, else 0."""
        if self.eval_object(expr):
            return 1
        return 0
|
|
class QWebXml:
    """QWeb Xml templating engine

    The templating engine uses a very simple syntax, "magic" xml attributes,
    to produce any kind of textual output (even non-xml).

    QWebXml:
        the template engine core implements the basic magic attributes:

        t-att t-raw t-esc t-if t-foreach t-set t-call t-trim

    Handlers are discovered by name: every method called render_tag_NAME
    handles the attribute t-NAME and every method called render_att_NAME
    handles attributes whose name (after "t-") starts with NAME; underscores
    in the method name map to dashes in the attribute name.
    """
    def __init__(self,x=None,zipname=None):
        """Create an engine, optionally loading the template x.

        x is anything add_template() accepts; when zipname is given, x is
        the name of a member of that zip archive.
        """
        self.node=xml.dom.Node
        self._t={}
        self._render_tag=self._collect_handlers('render_tag_')
        self._render_att=self._collect_handlers('render_att_')
        if x is not None:
            if zipname is not None:
                import zipfile
                zf=zipfile.ZipFile(zipname,'r')
                try:
                    self.add_template(zf.read(x))
                finally:
                    # the archive used to be left open
                    zf.close()
            else:
                self.add_template(x)
    def _collect_handlers(self,prefix):
        """Map attribute names to the class functions named prefix+NAME."""
        handlers={}
        for attr in dir(self):
            if attr.startswith(prefix):
                name=attr[len(prefix):].replace('_','-')
                handlers[name]=getattr(self.__class__,attr)
        return handlers
    def register_tag(self,tag,func):
        """Register func(self,e,t_att,g_att,v) as the handler of t-<tag>."""
        self._render_tag[tag]=func
    def add_template(self,x):
        """Load the <t t-name="..."> children of a template document.

        x may be a DOM document, an xml string starting with "<?xml", or
        anything xml.dom.minidom.parse() accepts.
        """
        if hasattr(x,'documentElement'):
            dom=x
        else:
            import xml.dom.minidom
            if x.startswith("<?xml"):
                dom=xml.dom.minidom.parseString(x)
            else:
                dom=xml.dom.minidom.parse(x)
        for n in dom.documentElement.childNodes:
            if n.nodeName=="t":
                self._t[str(n.getAttribute("t-name"))]=n
    def get_template(self,name):
        """Return the DOM node of the template called name (KeyError if unknown)."""
        return self._t[name]

    def eval_object(self,expr,v):
        return QWebEval(v).eval_object(expr)
    def eval_str(self,expr,v):
        return QWebEval(v).eval_str(expr)
    def eval_format(self,expr,v):
        return QWebEval(v).eval_format(expr)
    def eval_bool(self,expr,v):
        return QWebEval(v).eval_bool(expr)

    def render(self,tname,v=None,out=None):
        """Render the template tname with the variables v and return a string.

        v is modified in place by t-set; when omitted a fresh dict is used
        for every call (a shared default dict used to leak variables from
        one rendering into the next).  out is unused.
        """
        if v is None:
            v={}
        if tname in self._t:
            return self.render_node(self._t[tname],v)
        return 'qweb: template "%s" not found'%tname
    def render_node(self,e,v):
        """Render one DOM node: text is copied, elements are interpreted."""
        r=""
        if e.nodeType==self.node.TEXT_NODE or e.nodeType==self.node.CDATA_SECTION_NODE:
            r=e.data.encode("utf8")
        elif e.nodeType==self.node.ELEMENT_NODE:
            pre=""
            g_att=""        # rendered output attributes
            t_render=None   # name of the t- attribute that renders the element
            t_att={}        # t- attributes (without the prefix) for the handler
            for (an,av) in e.attributes.items():
                an=str(an)
                # items() yields strings or attribute nodes depending on the
                # DOM implementation
                if hasattr(av,"nodeValue"):
                    av=av.nodeValue
                av=av.encode("utf8")
                if an.startswith("t-"):
                    for i in self._render_att:
                        if an[2:].startswith(i):
                            g_att+=self._render_att[i](self,e,an,av,v)
                            break
                    else:
                        # not an attribute handler: tag handler or plain option
                        if an[2:] in self._render_tag:
                            t_render=an[2:]
                        t_att[an[2:]]=av
                else:
                    g_att+=' %s="%s"'%(an,cgi.escape(av,1))
            if t_render:
                r=self._render_tag[t_render](self,e,t_att,g_att,v)
            else:
                r=self.render_element(e,g_att,v,pre,t_att.get("trim",0))
        return r
    def render_element(self,e,g_att,v,pre="",trim=0):
        """Render the children of e and wrap them in the tag of e.

        <t> elements produce no tag of their own, childless output is
        rendered as a self closing tag.  pre is inserted before the content,
        trim is 0, 'left', 'right' or 'both'.
        """
        g_inner=[]
        for n in e.childNodes:
            g_inner.append(self.render_node(n,v))
        name=str(e.nodeName)
        inner="".join(g_inner)
        if trim==0:
            pass
        elif trim=='left':
            inner=inner.lstrip()
        elif trim=='right':
            inner=inner.rstrip()
        elif trim=='both':
            inner=inner.strip()
        if name=="t":
            return inner
        if len(inner):
            return "<%s%s>%s%s</%s>"%(name,g_att,pre,inner,name)
        return "<%s%s/>"%(name,g_att)

    # Attributes
    def render_att_att(self,e,an,av,v):
        """t-attf-NAME (format), t-att-NAME (expression), t-att ((name,value) expression)."""
        if an.startswith("t-attf-"):
            att,val=an[7:],self.eval_format(av,v)
        elif an.startswith("t-att-"):
            att,val=an[6:],self.eval_str(av,v)
        else:
            att,val=self.eval_object(av,v)
        return ' %s="%s"'%(att,cgi.escape(val,1))

    # Tags
    def render_tag_raw(self,e,t_att,g_att,v):
        return self.eval_str(t_att["raw"],v)
    def render_tag_rawf(self,e,t_att,g_att,v):
        return self.eval_format(t_att["rawf"],v)
    def render_tag_esc(self,e,t_att,g_att,v):
        return cgi.escape(self.eval_str(t_att["esc"],v))
    def render_tag_escf(self,e,t_att,g_att,v):
        return cgi.escape(self.eval_format(t_att["escf"],v))
    def render_tag_foreach(self,e,t_att,g_att,v):
        """Render e once per item of the t-foreach expression.

        With NAME taken from t-as (or the expression, dots replaced by
        underscores) the loop body sees NAME_size, NAME_all, NAME_value,
        NAME_index, NAME_first, NAME_even, NAME_odd, NAME_last, NAME_parity.
        Dict items are merged into the variables, other items are bound to
        NAME.
        """
        expr=t_att["foreach"]
        enum=self.eval_object(expr,v)
        if enum is None:
            return "qweb: t-foreach %s not found."%expr
        var=t_att.get('as',expr).replace('.','_')
        d=v.copy()
        size=-1
        if isinstance(enum,(list,tuple)):
            size=len(enum)
        elif hasattr(enum,'count'):
            size=enum.count()
        d["%s_size"%var]=size
        d["%s_all"%var]=enum
        index=0
        ru=[]
        for i in enum:
            d["%s_value"%var]=i
            d["%s_index"%var]=index
            d["%s_first"%var]=index==0
            # NOTE(review): _even is true for odd 0-based indexes while
            # _parity says 'odd' for them; kept as is, templates may rely on it
            d["%s_even"%var]=index%2
            d["%s_odd"%var]=(index+1)%2
            d["%s_last"%var]=index+1==size
            if index%2:
                d["%s_parity"%var]='odd'
            else:
                d["%s_parity"%var]='even'
            if isinstance(i,dict):
                d.update(i)
            else:
                d[var]=i
            ru.append(self.render_element(e,g_att,d))
            index+=1
        return "".join(ru)
    def render_tag_if(self,e,t_att,g_att,v):
        if self.eval_bool(t_att["if"],v):
            return self.render_element(e,g_att,v)
        return ""
    def render_tag_call(self,e,t_att,g_att,v):
        """Render another template; the body of e is available to it as "0"."""
        # TODO t-prefix
        if "import" in t_att:
            # t-import: the called template shares (and may modify) v
            d=v
        else:
            d=v.copy()
        d[0]=self.render_element(e,g_att,d)
        return self.render(t_att["call"],d)
    def render_tag_set(self,e,t_att,g_att,v):
        """Bind a variable to the t-eval expression or to the rendered body."""
        if "eval" in t_att:
            v[t_att["set"]]=self.eval_object(t_att["eval"],v)
        else:
            v[t_att["set"]]=self.render_element(e,g_att,v)
        return ""
|
|
|
|
#----------------------------------------------------------
|
|
# QWeb HTML (+deprecated QWebFORM and QWebOLD)
|
|
#----------------------------------------------------------
|
|
class QWebURL:
    """ URL helper, builds urls relative to the path of the current request

    assert req.PATH_INFO== "/site/admin/page_edit"
    u = QWebURL(root_path="/site/",req_path=req.PATH_INFO)
    s=u.href("user/login",{'a':'1'})
    assert s=="../user/login?a=1"

    defpath and defparam are the path and the parameters used when href(),
    form() or path() are called without them.
    """
    def __init__(self, root_path="/", req_path="/",defpath="",defparam=None):
        if defparam is None:
            # a fresh dict per instance, the default used to be shared
            defparam={}
        self.defpath=defpath
        self.defparam=defparam
        self.root_path=root_path
        self.req_path=req_path
        # directory components of the request, the last component is dropped
        self.req_list=req_path.split("/")[:-1]
        self.req_len=len(self.req_list)
    def decode(self,s):
        """Parse a query string into a dict (the last value of a key wins)."""
        h={}
        for k,v in cgi.parse_qsl(s,1):
            h[k]=v
        return h
    def encode(self,h):
        """Return the dict h as a url-encoded query string."""
        return urllib.urlencode(h.items())
    def request(self,req):
        return req.REQUEST
    def copy(self,path=None,param=None):
        """Return a new QWebURL with another default path / extra default parameters."""
        npath=self.defpath
        if path:
            npath=path
        nparam=self.defparam.copy()
        if param:
            nparam.update(param)
        return QWebURL(self.root_path,self.req_path,npath,nparam)
    def path(self,path=''):
        """Return root_path+path expressed relative to the request path."""
        if not path:
            path=self.defpath
        pl=(self.root_path+path).split('/')
        # i: number of leading components shared with the request directory
        i=0
        for i in range(min(len(pl),self.req_len)):
            if pl[i]!=self.req_list[i]:
                break
        else:
            i+=1
        # one '..' per request directory that is not shared
        dd=self.req_len-i
        if dd<0:
            dd=0
        return '/'.join(['..']*dd+pl[i:])
    def href(self,path='',arg=None):
        """Return the relative url of path with the default parameters and arg."""
        p=self.path(path)
        tmp=self.defparam.copy()
        if arg:
            tmp.update(arg)
        s=self.encode(tmp)
        if len(s):
            return p+"?"+s
        return p
    def form(self,path='',arg=None):
        """Return (action, hidden_inputs) for an html form posting to path.

        The parameters are rendered as <input type="hidden"> elements, names
        and values are html escaped.
        """
        p=self.path(path)
        tmp=self.defparam.copy()
        if arg:
            tmp.update(arg)
        r=''.join(['<input type="hidden" name="%s" value="%s"/>'%(cgi.escape("%s"%(k,),1),cgi.escape(str(v),1)) for k,v in tmp.items()])
        return (p,r)
|
|
class QWebField:
    """One field of a QWebForm: declaration, submitted input and validity."""
    def __init__(self,name=None,default="",check=None):
        # declaration
        self.name,self.default,self.check=name,default,check
        # optional attributes
        self.type=None
        self.trim=1
        self.required=1
        self.cssvalid="form_valid"
        self.cssinvalid="form_invalid"
        # set by addfield
        self.form=None
        # set by processing
        self.input=None
        self.css=None
        self.value=None
        self.valid=None
        self.invalid=None
        # a new field starts out valid
        self.validate(1)
    def validate(self,val=1,update=1):
        """Flag the field valid (val true) or invalid and pick the css class.

        When update is true and the field belongs to a form, the form
        recomputes its own state.
        """
        if val:
            self.valid,self.invalid,self.css=1,0,self.cssvalid
        else:
            self.valid,self.invalid,self.css=0,1,self.cssinvalid
        if update and self.form:
            self.form.update()
    def invalidate(self,update=1):
        """Shortcut for validate(0,update)."""
        self.validate(0,update)
|
|
class QWebForm:
    """A set of QWebField, declared by a template and/or a dict of defaults.

    State recomputed by update():
        submitted   all required fields have an input
        missing     names of the required fields without input
        error       names of the invalid fields (also available as errors)
        valid       all fields are submitted and valid
        invalid     the form is submitted but not valid
    """
    class QWebFormF:
        pass
    def __init__(self,e=None,arg=None,default=None):
        """e: template DOM node, arg: submitted values, default: default values."""
        self.fields={}
        # all fields have been submitted
        self.submitted=False
        self.missing=[]
        # at least one field is invalid or missing
        self.invalid=False
        self.error=[]
        self.errors=self.error
        # all fields have been submitted and are valid
        self.valid=False
        # fields under self.f for convenience
        self.f=self.QWebFormF()
        if e:
            self.add_template(e)
        # assume that the fields are done with the template
        if default:
            self.set_default(default,e is None)
        if arg is not None:
            self.process_input(arg)
    def __getitem__(self,k):
        return self.fields[k]
    def set_default(self,default,add_missing=1):
        """Set field defaults from a dict; unknown names become new fields if add_missing."""
        for k,v in default.items():
            if k in self.fields:
                self.fields[k].default=str(v)
            elif add_missing:
                self.add_field(QWebField(k,v))
    def add_field(self,f):
        """Attach the field f, it becomes reachable as self[name] and self.f.name."""
        self.fields[f.name]=f
        f.form=self
        setattr(self.f,f.name,f)
    def add_template(self,e):
        """Declare a field for every t-form-* attribute found in e and below."""
        att={}
        for (an,av) in e.attributes.items():
            an=str(an)
            if an.startswith("t-"):
                att[an[2:]]=av.encode("utf8")
        for i in ["form-text", "form-password", "form-radio", "form-checkbox", "form-select","form-textarea"]:
            if i in att:
                # "form.email" declares the field "email"
                name=att[i].split(".")[-1]
                default=att.get("default","")
                check=att.get("check",None)
                f=QWebField(name,default,check)
                if i=="form-textarea":
                    f.type="textarea"
                    f.trim=0
                if i=="form-checkbox":
                    f.type="checkbox"
                    f.required=0
                self.add_field(f)
        for n in e.childNodes:
            if n.nodeType==n.ELEMENT_NODE:
                self.add_template(n)
    def process_input(self,arg):
        """Load the submitted values arg (dict-like) and check them.

        A string check is "email", "date" or a regular expression written
        as /regex/.
        """
        for f in self.fields.values():
            if f.name in arg:
                f.input=arg[f.name]
                # NOTE(review): value keeps the untrimmed input while the
                # check below runs on the trimmed one
                f.value=f.input
                if f.trim:
                    f.input=f.input.strip()
                f.validate(1,False)
                if f.check is None:
                    continue
                elif callable(f.check):
                    # NOTE(review): callable checks are accepted but never called
                    pass
                elif isinstance(f.check,str):
                    v=f.check
                    if f.check=="email":
                        v=r"/^[^@#!& ]+@[A-Za-z0-9-][.A-Za-z0-9-]{0,64}\.[A-Za-z]{2,5}$/"
                    if f.check=="date":
                        v=r"/^(19|20)\d\d-(0[1-9]|1[012])-(0[1-9]|[12][0-9]|3[01])$/"
                    # v[1:-1] strips the surrounding slashes
                    if not re.match(v[1:-1],f.input):
                        f.validate(0,False)
            else:
                f.value=f.default
        self.update()
    def validate_all(self,val=1):
        for f in self.fields.values():
            f.validate(val,0)
        self.update()
    def invalidate_all(self):
        self.validate_all(0)
    def update(self):
        """Recompute submitted, missing, error, valid and invalid from the fields."""
        self.submitted=True
        self.valid=True
        # both lists are rebuilt on every call: they used to grow without
        # bound because "errors" was reset while "error" was appended to
        self.missing=[]
        self.error=[]
        self.errors=self.error
        for f in self.fields.values():
            if f.required and f.input is None:
                self.submitted=False
                self.valid=False
                self.missing.append(f.name)
            if f.invalid:
                self.valid=False
                self.error.append(f.name)
        # invalid: the form has been submitted and is not valid
        self.invalid=self.submitted and self.valid==False
    def collect(self):
        """Return a dict of the field values indexed by field name."""
        d={}
        for f in self.fields.values():
            d[f.name]=f.value
        return d
|
|
class QWebURLEval(QWebEval):
    """QWebEval variant whose string results are url quoted (quote_plus)."""
    def __getitem__(self,expr):
        value=QWebEval.__getitem__(self,expr)
        if not isinstance(value,str):
            # non string results are handed back untouched
            return value
        return urllib.quote_plus(value)
|
|
class QWebHtml(QWebXml):
    """QWebHtml
    QWebURL:
    QWebField:
    QWebForm:
    QWebHtml:
        an extended template engine, with a few utility classes to easily
        produce HTML, handle URLs and process forms, it adds the following
        magic attributes:

        t-href t-action t-form-text t-form-password t-form-textarea t-form-radio
        t-form-checkbox t-form-select t-option t-selected t-checked t-pager

    # URL explanation:
    # v['tableurl']=QWebUrl({p=afdmin,saar=,orderby=,des=,mlink;meta_active=})
    # t-href="tableurl?desc=1"
    #
    # FORM explanation: t-if="form.valid()"
    # Foreach i
    #   email: <input type="text" t-esc-name="i" t-esc-value="form[i].value" t-esc-class="form[i].css"/>
    #   <input type="radio" name="spamtype" t-esc-value="i" t-selected="i==form.f.spamtype.value"/>
    #   <option t-esc-value="cc" t-selected="cc==form.f.country.value"><t t-esc="cname"></option>
    # Simple forms:
    #   <input t-form-text="form.email" t-check="email"/>
    #   <input t-form-password="form.email" t-check="email"/>
    #   <input t-form-radio="form.email" />
    #   <input t-form-checkbox="form.email" />
    #   <textarea t-form-textarea="form.email" t-check="email"/>
    #   <select t-form-select="form.email"/>
    #       <option t-value="1">
    #   <input t-form-radio="form.spamtype" t-value="1"/> Cars
    #   <input t-form-radio="form.spamtype" t-value="2"/> Sprt
    """
    # QWebForm from a template
    def form(self,tname,arg=None,default=None):
        """Return a QWebForm built from the template tname."""
        form=QWebForm(self._t[tname],arg,default)
        return form

    # HTML Att
    def eval_url(self,av,v):
        """Split the url spec "urlvar/path?query" into (urlvar, path, arg dict).

        The spec is first formatted with url quoted variables.
        """
        s=QWebURLEval(v).eval_format(av)
        a=s.split('?',1)
        arg={}
        if len(a)>1:
            for k,val in cgi.parse_qsl(a[1],1):
                arg[k]=val
        b=a[0].split('/',1)
        path=''
        if len(b)>1:
            path=b[1]
        u=b[0]
        return u,path,arg
    def _url_href(self,spec,v):
        """Return the href of the url spec, or an error text when v has no such QWebURL."""
        u,path,arg=self.eval_url(spec,v)
        if not isinstance(v.get(u,0),QWebURL):
            return 'qweb: missing url %r %r %r'%(u,path,arg)
        return v[u].href(path,arg)
    def render_att_url_(self,e,an,av,v):
        """t-url-NAME="spec": render the attribute NAME with the url."""
        out=self._url_href(av,v)
        # an is "t-url-NAME"
        return ' %s="%s"'%(an[6:],cgi.escape(out,1))
    def render_att_href(self,e,an,av,v):
        return self.render_att_url_(e,"t-url-href",av,v)
    def render_att_checked(self,e,an,av,v):
        """t-checked="expr": render checked="checked" when expr is true."""
        if self.eval_bool(av,v):
            return ' %s="%s"'%(an[2:],an[2:])
        return ''
    def render_att_selected(self,e,an,av,v):
        return self.render_att_checked(e,an,av,v)

    # HTML Tags forms
    def render_tag_rawurl(self,e,t_att,g_att,v):
        # a missing url object is reported like t-href does instead of
        # raising KeyError
        return self._url_href(t_att["rawurl"],v)
    def render_tag_escurl(self,e,t_att,g_att,v):
        return cgi.escape(self._url_href(t_att["escurl"],v))
    def render_tag_action(self,e,t_att,g_att,v):
        """t-action="spec": set the form action, parameters become hidden inputs."""
        u,path,arg=self.eval_url(t_att["action"],v)
        if not isinstance(v.get(u,0),QWebURL):
            action,hidden=('qweb: missing url %r %r %r'%(u,path,arg),'')
        else:
            action,hidden=v[u].form(path,arg)
        g_att+=' action="%s"'%cgi.escape(action,1)
        return self.render_element(e,g_att,v,hidden)
    def render_tag_form_text(self,e,t_att,g_att,v):
        f=self.eval_object(t_att["form-text"],v)
        g_att+=' type="text" name="%s" value="%s" class="%s"'%(f.name,cgi.escape(f.value,1),f.css)
        return self.render_element(e,g_att,v)
    def render_tag_form_password(self,e,t_att,g_att,v):
        f=self.eval_object(t_att["form-password"],v)
        g_att+=' type="password" name="%s" value="%s" class="%s"'%(f.name,cgi.escape(f.value,1),f.css)
        return self.render_element(e,g_att,v)
    def render_tag_form_textarea(self,e,t_att,g_att,v):
        tag="textarea"
        f=self.eval_object(t_att["form-textarea"],v)
        g_att+=' name="%s" class="%s"'%(f.name,f.css)
        r="<%s%s>%s</%s>"%(tag,g_att,cgi.escape(f.value,1),tag)
        return r
    def _render_choice(self,kind,e,t_att,g_att,v):
        """Render a radio or checkbox input, checked when t-value is the field value."""
        f=self.eval_object(t_att["form-%s"%kind],v)
        val=t_att["value"]
        g_att+=' type="%s" name="%s" value="%s"'%(kind,f.name,cgi.escape(val,1))
        if f.value==val:
            g_att+=' checked="checked"'
        return self.render_element(e,g_att,v)
    def render_tag_form_radio(self,e,t_att,g_att,v):
        return self._render_choice("radio",e,t_att,g_att,v)
    def render_tag_form_checkbox(self,e,t_att,g_att,v):
        return self._render_choice("checkbox",e,t_att,g_att,v)
    def render_tag_form_select(self,e,t_att,g_att,v):
        f=self.eval_object(t_att["form-select"],v)
        g_att+=' name="%s" class="%s"'%(f.name,f.css)
        return self.render_element(e,g_att,v)
    def _enclosing_select(self,e,v):
        """Return the field of the nearest ancestor carrying t-form-select, or None."""
        n=e.parentNode
        while n is not None and n.nodeType==self.node.ELEMENT_NODE:
            expr=n.getAttribute("t-form-select")
            if expr:
                return self.eval_object(expr,v)
            n=n.parentNode
        return None
    def render_tag_option(self,e,t_att,g_att,v):
        """t-option="value": an option of the enclosing t-form-select.

        The select may be any ancestor, so options generated inside a
        t-foreach (or an optgroup) work too.
        """
        f=self._enclosing_select(e,v)
        val=t_att["option"]
        g_att+=' value="%s"'%cgi.escape(val,1)
        if f is not None and f.value==val:
            g_att+=' selected="selected"'
        return self.render_element(e,g_att,v)

    # HTML Tags others
    def render_tag_pager(self,e,t_att,g_att,v):
        """t-pager="NAME": compute paging variables, renders nothing.

        Reads t-total and t-start (expressions), t-step (default 100) and
        t-scope (default 5), and adds the NAME_* variables to v: tot_size,
        tot_page, win_start0/1, win_end0/1, win_page0/1, prev, prev_start,
        next, next_start, active and list (one dict per page link with
        NAME_page0, NAME_page1, NAME_start and NAME_sel).
        """
        pre=t_att["pager"]
        total=int(self.eval_str(t_att["total"],v))
        start=int(self.eval_str(t_att["start"],v))
        step=int(self.eval_str(t_att.get("step","100"),v))
        scope=int(self.eval_str(t_att.get("scope","5"),v))
        if step<1:
            # avoid a division by zero on a bogus t-step
            step=1
        # Compute Pager
        p=pre+"_"
        d={}
        d[p+"tot_size"]=total
        # NOTE(review): when total is a multiple of step this counts one
        # trailing empty page (only total==step is special cased below)
        d[p+"tot_page"]=tot_page=total//step
        d[p+"win_start0"]=total and start
        d[p+"win_start1"]=total and start+1
        d[p+"win_end0"]=max(0,min(start+step-1,total-1))
        d[p+"win_end1"]=min(start+step,total)
        d[p+"win_page0"]=win_page=start//step
        d[p+"win_page1"]=win_page+1
        d[p+"prev"]=(win_page!=0)
        d[p+"prev_start"]=(win_page-1)*step
        d[p+"next"]=(tot_page>=win_page+1)
        d[p+"next_start"]=(win_page+1)*step
        l=[]
        # window of 2*scope+1 page links, shifted to stay inside 0..tot_page
        begin=win_page-scope
        end=win_page+scope
        if begin<0:
            end-=begin
        if end>tot_page:
            begin-=(end-tot_page)
        i=max(0,begin)
        while i<=min(end,tot_page) and total!=step:
            l.append( { p+"page0":i, p+"page1":i+1, p+"start":i*step, p+"sel":(win_page==i) })
            i+=1
        d[p+"active"]=len(l)>1
        d[p+"list"]=l
        # Update v
        v.update(d)
        return ""
|
|
|
|
#----------------------------------------------------------
|
|
# QWeb Simple Controller
|
|
#----------------------------------------------------------
|
|
def qweb_control(self,jump='main',p=()):
    """ qweb_control(self,jump='main',p=()):
    A simple function to handle the controller part of your application. It
    dispatches the control to the jump argument, while ensuring that the
    prefix functions have been called.

    qweb_control replaces '/' by '_' and strips '_' from the jump argument,
    then calls, in order and at most once each, the methods of self named:

    name1
    name1_name2
    name1_name2_name3

    Every method is called with the arguments p. When a method returns a
    string, that string becomes the new jump target.

    Returns 0 when self has no attribute named after jump, 1 otherwise.

    NOTE: jump usually comes from the request path, every attribute of self
    reachable this way can be called by the client.
    """
    jump=jump.replace('/','_').strip('_')
    if not hasattr(self,jump):
        return 0
    done={}     # names already handled
    todo=[]     # names still to call for the current jump target
    while 1:
        if jump is not None:
            # new target: queue its prefixes that have not been called yet
            todo=[]
            parts=jump.split("_")
            for n in range(1,len(parts)+1):
                name="_".join(parts[:n])
                if name not in done:
                    todo.append(name)
            jump=None
        elif len(todo):
            i=todo.pop(0)
            done[i]=1
            if hasattr(self,i):
                f=getattr(self,i)
                r=f(*p)
                if isinstance(r,str):
                    jump=r
        else:
            break
    return 1
|
|
|
|
#----------------------------------------------------------
|
|
# QWeb WSGI Request handler
|
|
#----------------------------------------------------------
|
|
class QWebSession(dict):
    """A dict whose content is persisted between requests in a session file.

    The session id is read from the session cookie or, failing that, from a
    "&<cookie_name>=<id>" fragment of the query string; otherwise a new id
    is generated.

    Keyword options (each stored as self.session_<option>):
        path            directory of the session files (default: tempdir)
        cookie_name     name of the cookie / url parameter
        cookie_lifetime unused yet
        cookie_path     path attribute of the cookie
        cookie_domain   domain attribute of the cookie, if not empty
        limit_cache     send headers forbidding caching
        probability     probability that a request cleans expired sessions
        maxlifetime     lifetime in seconds of an unmodified session file
        disable         never save the session

    WARNING: session files are pickles. The session directory must not be
    writable by untrusted users, unpickling a forged file runs arbitrary
    code.
    """
    def __init__(self,environ,**kw):
        dict.__init__(self)
        default={
            "path" : tempfile.gettempdir(),
            "cookie_name" : "QWEBSID",
            "cookie_lifetime" : 0,
            "cookie_path" : '/',
            "cookie_domain" : '',
            "limit_cache" : 1,
            "probability" : 0.01,
            "maxlifetime" : 3600,
            "disable" : 0,
        }
        for k,v in default.items():
            setattr(self,'session_%s'%k,kw.get(k,v))
        # Try to find session
        self.session_found_cookie=0
        self.session_found_url=0
        self.session_found=0
        self.session_orig=""
        # Try cookie
        c=Cookie.SimpleCookie()
        try:
            c.load(environ.get('HTTP_COOKIE', ''))
        except Cookie.CookieError:
            # a malformed Cookie header is treated as "no cookie"
            pass
        if self.session_cookie_name in c:
            sid=c[self.session_cookie_name].value[:64]
            if re.match('[a-f0-9]+$',sid) and self.session_load(sid):
                self.session_id=sid
                self.session_found_cookie=1
                self.session_found=1
        # Try URL
        if not self.session_found_cookie:
            mo=re.search('&%s=([a-f0-9]+)'%self.session_cookie_name,environ.get('QUERY_STRING',''))
            if mo and self.session_load(mo.group(1)):
                self.session_id=mo.group(1)
                self.session_found_url=1
                self.session_found=1
        # New session
        if not self.session_found:
            self.session_id=self._session_new_id()
        self.session_trans_sid="&%s=%s"%(self.session_cookie_name,self.session_id)
        # Clean old session
        if random.random() < self.session_probability:
            self.session_clean()
    def _session_new_id(self):
        """Return a new 128 bit session id as 32 hex digits.

        The operating system random source is used when this python version
        and platform provide it, the default generator is predictable.
        """
        try:
            rnd=random.SystemRandom()
            rnd.random()
        except (AttributeError,NotImplementedError):
            rnd=random
        return '%032x'%rnd.randrange(2**128)
    def session_get_headers(self):
        """Save the session if needed and return the response headers to send.

        Nothing is saved nor sent while the session is empty and was not
        loaded from storage.
        """
        h=[]
        if (not self.session_disable) and (len(self) or len(self.session_orig)):
            self.session_save()
            if not self.session_found_cookie:
                c=Cookie.SimpleCookie()
                c[self.session_cookie_name] = self.session_id
                c[self.session_cookie_name]['path'] = self.session_cookie_path
                if self.session_cookie_domain:
                    c[self.session_cookie_name]['domain'] = self.session_cookie_domain
                # TODO cookie_lifetime: set 'expires' (date localtime or not?)
                h.append(("Set-Cookie", c[self.session_cookie_name].OutputString()))
            if self.session_limit_cache:
                h.append(('Cache-Control','no-store, no-cache, must-revalidate, post-check=0, pre-check=0'))
                h.append(('Expires','Thu, 19 Nov 1981 08:52:00 GMT'))
                h.append(('Pragma','no-cache'))
        return h
    def session_load(self,sid):
        """Load the session sid, return 1 on success and None otherwise."""
        fname=os.path.join(self.session_path,'qweb_sess_%s'%sid)
        try:
            # binary mode, the file is written in binary mode too
            f=open(fname,'rb')
            try:
                orig=f.read()
            finally:
                f.close()
            d=pickle.loads(orig)
        except Exception:
            # missing, unreadable or corrupted session: start a new one
            return
        self.session_orig=orig
        self.update(d)
        return 1
    def session_save(self):
        """Write the session file if the content changed or the file gets old.

        The file is rewritten after a quarter of maxlifetime even when
        unchanged, so that active sessions do not expire.
        """
        if not os.path.isdir(self.session_path):
            os.makedirs(self.session_path)
        fname=os.path.join(self.session_path,'qweb_sess_%s'%self.session_id)
        try:
            oldtime=os.path.getmtime(fname)
        except (OSError,IOError):
            oldtime=0
        dump=pickle.dumps(self.copy())
        if (dump != self.session_orig) or (time.time() > oldtime+self.session_maxlifetime/4):
            # write to a temporary name then rename over the session file
            tmpname=os.path.join(self.session_path,'qweb_sess_%s_%x'%(self.session_id,random.randint(1,2**32)))
            f=open(tmpname,'wb')
            try:
                f.write(dump)
            finally:
                f.close()
            # rename does not replace an existing file on windows
            if sys.platform=='win32' and os.path.isfile(fname):
                os.remove(fname)
            os.rename(tmpname,fname)
    def session_clean(self):
        """Delete the session files not modified for more than maxlifetime seconds."""
        t=time.time()
        try:
            for name in os.listdir(self.session_path):
                if not name.startswith('qweb_sess_'):
                    continue
                fname=os.path.join(self.session_path,name)
                if (t > os.path.getmtime(fname)+self.session_maxlifetime):
                    os.unlink(fname)
        except (OSError,IOError):
            # best effort: "except OSError,IOError" only caught OSError
            pass
|
|
class QWebSessionMem(QWebSession):
    """QWebSession keeping the sessions in the module global _qweb_sessions."""
    def session_load(self,sid):
        # the store is created in the module globals on first use
        store=globals().setdefault("_qweb_sessions",{})
        if sid in store:
            self.session_orig=store[sid]
            self.update(self.session_orig)
            return 1
    def session_save(self):
        store=globals().setdefault("_qweb_sessions",{})
        store[self.session_id]=self.copy()
|
|
class QWebSessionService:
    """WSGI middleware placeholder: for now requests are passed through untouched."""
    def __init__(self, wsgiapp, url_rewrite=0):
        # url_rewrite is accepted but not used yet
        self.url_rewrite_tags="a=href,area=href,frame=src,form=,fieldset="
        self.wsgiapp=wsgiapp
    def __call__(self, environ, start_response):
        # TODO
        # use QWebSession to provide environ["qweb.session"]
        app=self.wsgiapp
        return app(environ,start_response)
|
|
class QWebDict(dict):
    """A dict returning "" for missing keys, with an integer accessor."""
    def __init__(self,*p):
        dict.__init__(self,*p)
    def __getitem__(self,key):
        return self.get(key,"")
    def int(self,key):
        """Return the value of key as an integer, 0 if missing or not a number."""
        try:
            return int(self.get(key,"0"))
        except (ValueError,TypeError):
            # TypeError: the value is not a string or a number (None, list...)
            return 0
|
|
class QWebListDict(dict):
    """A dict of lists, returning [] for missing keys."""
    def __init__(self,*p):
        dict.__init__(self,*p)
    def __getitem__(self,key):
        # the [] returned for a missing key is not stored in the dict
        return self.get(key,[])
    def appendlist(self,key,val):
        """Append val to the list of key, creating the list if needed."""
        if key in self:
            self[key].append(val)
        else:
            self[key]=[val]
    def get_qwebdict(self):
        """Return a QWebDict holding the last value of every key.

        Keys whose list is empty are left out (they read as "" from the
        QWebDict) instead of raising IndexError.
        """
        d=QWebDict()
        for k,v in self.items():
            if v:
                d[k]=v[-1]
        return d
|
|
class QWebRequest:
|
|
"""QWebRequest a WSGI request handler.
|
|
|
|
QWebRequest is a WSGI request handler that feature GET, POST and POST
|
|
multipart methods, handles cookies and headers and provide a dict-like
|
|
SESSION Object (either on the filesystem or in memory).
|
|
|
|
It is constructed with the environ and start_response WSGI arguments:
|
|
|
|
req=qweb.QWebRequest(environ, start_response)
|
|
|
|
req has the following attributes :
|
|
|
|
req.environ standard WSGI dict (CGI and wsgi ones)
|
|
|
|
Some CGI vars as attributes from environ for convenience:
|
|
|
|
req.SCRIPT_NAME
|
|
req.PATH_INFO
|
|
req.REQUEST_URI
|
|
|
|
Some computed value (also for convenience)
|
|
|
|
req.FULL_URL full URL reconstructed (http://host/query)
|
|
req.FULL_PATH (URL path before ?querystring)
|
|
|
|
Dict constructed from querystring and POST datas, PHP-like.
|
|
|
|
req.GET contains GET vars
|
|
req.POST contains POST vars
|
|
req.REQUEST contains merge of GET and POST
|
|
req.FILES contains uploaded files
|
|
req.GET_LIST req.POST_LIST req.REQUEST_LIST req.FILES_LIST multiple arguments versions
|
|
req.debug() returns an HTML dump of those vars
|
|
|
|
A dict-like session object.
|
|
|
|
req.SESSION the session start when the dict is not empty.
|
|
|
|
Attribute for handling the response
|
|
|
|
req.response_headers dict-like to set headers
|
|
req.response_cookies a SimpleCookie to set cookies
|
|
req.response_status a string to set the status like '200 OK'
|
|
|
|
req.write() to write to the buffer
|
|
|
|
req itselfs is an iterable object with the buffer, it will also also call
|
|
start_response automatically before returning anything via the iterator.
|
|
|
|
To make it short, it means that you may use
|
|
|
|
return req
|
|
|
|
at the end of your request handling to return the response to any WSGI
|
|
application server.
|
|
"""
|
|
#
|
|
# This class contains part ripped from colubrid (with the permission of
|
|
# mitsuhiko) see http://wsgiarea.pocoo.org/colubrid/
|
|
#
|
|
# - the class HttpHeaders
|
|
# - the method load_post_data (tuned version)
|
|
#
|
|
class HttpHeaders(object):
    """An ordered list of (name, value) response headers.

    Header names are compared case insensitively and a name may appear
    several times. A new instance holds a text/html Content-Type header.
    """
    def __init__(self):
        self.data = [('Content-Type', 'text/html')]
    def __setitem__(self, key, value):
        self.set(key, value)
    def __delitem__(self, key):
        self.remove(key)
    def __contains__(self, key):
        key = key.lower()
        for k, v in self.data:
            if k.lower() == key:
                return True
        return False
    def add(self, key, value):
        """Append a header, keeping the existing ones of the same name."""
        self.data.append((key, value))
    def remove(self, key, count=-1):
        """Remove the headers named key.

        count limits the number of headers removed (the first ones are
        removed); a negative count, the default, removes all of them.
        A count >= 0 used to count the headers kept and to drop everything
        after them.
        """
        removed = 0
        data = []
        key = key.lower()
        for item in self.data:
            if item[0].lower() == key and (count < 0 or removed < count):
                removed += 1
            else:
                data.append(item)
        self.data = data
    def clear(self):
        self.data = []
    def set(self, key, value):
        """Replace all the headers named key by a single one."""
        self.remove(key)
        self.add(key, value)
    def get(self, key=False, httpformat=False):
        """Return the headers named key, or all of them when key is false.

        The result is a list of (name, value), or with httpformat a string
        of "name: value" lines. Without key and httpformat the internal
        list itself is returned.
        """
        if not key:
            result = self.data
        else:
            result = []
            for _key, _value in self.data:
                if _key.lower() == key.lower():
                    result.append((_key, _value))
        if httpformat:
            return '\n'.join(['%s: %s' % item for item in result])
        return result
|
|
def load_post_data(self,environ,POST,FILES):
    """Read the request body and fill POST and FILES from it.

    environ -- WSGI environment; CONTENT_LENGTH, CONTENT_TYPE and
               wsgi.input are read from it.
    POST    -- receives the form fields (appendlist()/update() are
               called on it).
    FILES   -- receives uploaded files via appendlist(); each file is a
               dict with the keys "name", "type" and "data".

    Returns the raw body that was read, '' when there is none.
    """
    # CONTENT_LENGTH may be absent or empty (PEP 333); treat that, and
    # any unparsable value, as "no body" instead of raising.
    try:
        length = int(environ.get('CONTENT_LENGTH') or 0)
    except ValueError:
        length = 0
    if length > 0:
        DATA = environ['wsgi.input'].read(length)
    else:
        DATA = ''
    content_type = environ.get('CONTENT_TYPE', '')
    if content_type.startswith('multipart'):
        # Rebuild a MIME document so the email package can split the parts.
        lines = ['Content-Type: %s' % content_type]
        for key, value in environ.items():
            if key.startswith('HTTP_'):
                lines.append('%s: %s' % (key, value))
        raw = '\r\n'.join(lines) + '\r\n\r\n' + DATA
        msg = email.message_from_string(raw)
        for sub in msg.get_payload():
            if not isinstance(sub, email.Message.Message):
                continue
            disposition = sub['Content-Disposition']
            if not disposition:
                # Part without a Content-Disposition header: nothing to name it by.
                continue
            name_dict = cgi.parse_header(disposition)[1]
            if 'name' not in name_dict:
                continue
            if 'filename' in name_dict:
                # Nested MIME messages are not supported
                if isinstance(sub.get_payload(), list):
                    continue
                if not name_dict['filename'].strip():
                    continue
                filename = name_dict['filename']
                # why not keep all the filename? because IE always send 'C:\documents and settings\blub\blub.png'
                filename = filename[filename.rfind('\\') + 1:]
                if 'Content-Type' in sub:
                    part_type = sub['Content-Type']
                else:
                    part_type = None
                s = { "name":filename, "type":part_type, "data":sub.get_payload() }
                FILES.appendlist(name_dict['name'], s)
            else:
                POST.appendlist(name_dict['name'], sub.get_payload())
    else:
        POST.update(cgi.parse_qs(DATA,keep_blank_values=1))
    return DATA
|
|
|
|
def __init__(self,environ,start_response,session=QWebSession):
    """Wrap one WSGI request.

    environ        -- the WSGI environment; FULL_URL, FULL_PATH and (when
                      absent) REQUEST_URI are added to it.
    start_response -- the WSGI start_response callable, kept for
                      response().
    session        -- a QWebSession instance to use as is, a callable
                      invoked as session(environ) (default: the
                      QWebSession class), or a false value for no session.

    When environ contains a "php" entry, SESSION/GET/POST/REQUEST/FILES
    are taken from that object and the *_LIST attributes are not set.
    """
    self.environ=environ
    self.start_response=start_response
    self.buffer=[]
    # Raw POST body; stays None unless a POST body is read below, so the
    # attribute exists for every request.
    self.DATA=None

    self.SCRIPT_NAME = environ.get('SCRIPT_NAME', '')
    self.PATH_INFO = environ.get('PATH_INFO', '')
    # extensions:
    # (FULL_URL is computed before REQUEST_URI is faked below)
    self.FULL_URL = environ['FULL_URL'] = self.get_full_url(environ)
    # REQUEST_URI is optional, fake it if absent
    if "REQUEST_URI" not in environ:
        environ["REQUEST_URI"]=urllib.quote(self.SCRIPT_NAME+self.PATH_INFO)
        if environ.get('QUERY_STRING'):
            environ["REQUEST_URI"]+='?'+environ['QUERY_STRING']
    self.REQUEST_URI = environ["REQUEST_URI"]
    # full quote url path before the ?
    self.FULL_PATH = environ['FULL_PATH'] = self.REQUEST_URI.split('?')[0]

    self.request_cookies=Cookie.SimpleCookie()
    self.request_cookies.load(environ.get('HTTP_COOKIE', ''))

    self.response_started=False
    self.response_gzencode=False
    self.response_cookies=Cookie.SimpleCookie()
    # to delete a cookie use: c[key]['expires'] = datetime.datetime(1970, 1, 1)
    self.response_headers=self.HttpHeaders()
    self.response_status="200 OK"

    self.php=None
    if "php" in self.environ:
        self.php=environ["php"]
        self.SESSION=self.php._SESSION
        self.GET=self.php._GET
        self.POST=self.php._POST
        self.REQUEST=self.php._ARG
        self.FILES=self.php._FILES
    else:
        if isinstance(session,QWebSession):
            self.SESSION=session
        elif session:
            self.SESSION=session(environ)
        else:
            self.SESSION=None
        self.GET_LIST=QWebListDict(cgi.parse_qs(environ.get('QUERY_STRING', ''),keep_blank_values=1))
        self.POST_LIST=QWebListDict()
        self.FILES_LIST=QWebListDict()
        self.REQUEST_LIST=QWebListDict(self.GET_LIST)
        if environ['REQUEST_METHOD'] == 'POST':
            self.DATA=self.load_post_data(environ,self.POST_LIST,self.FILES_LIST)
            # POST values are merged over the GET values in REQUEST_LIST
            self.REQUEST_LIST.update(self.POST_LIST)
        self.GET=self.GET_LIST.get_qwebdict()
        self.POST=self.POST_LIST.get_qwebdict()
        self.FILES=self.FILES_LIST.get_qwebdict()
        self.REQUEST=self.REQUEST_LIST.get_qwebdict()
|
|
def get_full_url(environ):
    """Rebuild the absolute URL of the request from a WSGI environ.

    Adapted from the URL reconstruction recipe of PEP 333.  A FULL_URL
    entry already present in environ is returned unchanged.  HTTP_HOST is
    preferred over SERVER_NAME; the port is only appended in the
    SERVER_NAME case and only when it is not the scheme's default.
    REQUEST_URI, when present, supplies the path and query string.
    """
    if 'FULL_URL' in environ:
        return environ['FULL_URL']
    scheme = environ['wsgi.url_scheme']
    pieces = [scheme, '://']
    if environ.get('HTTP_HOST'):
        pieces.append(environ['HTTP_HOST'])
    else:
        pieces.append(environ['SERVER_NAME'])
        if scheme == 'https':
            default_port = '443'
        else:
            default_port = '80'
        if environ['SERVER_PORT'] != default_port:
            pieces.append(':' + environ['SERVER_PORT'])
    if 'REQUEST_URI' in environ:
        pieces.append(environ['REQUEST_URI'])
    else:
        pieces.append(urllib.quote(environ.get('SCRIPT_NAME', '')))
        pieces.append(urllib.quote(environ.get('PATH_INFO', '')))
        if environ.get('QUERY_STRING'):
            pieces.append('?' + environ['QUERY_STRING'])
    return ''.join(pieces)
get_full_url=staticmethod(get_full_url)
|
|
def save_files(self):
    """Spool every uploaded file in self.FILES to a named temporary file.

    For each entry that has no "tmp_file" key yet, the entry's "data" is
    written to a fresh tempfile.NamedTemporaryFile; the open file object
    is stored under "tmp_file" and its path under "tmp_name".  Entries
    that were already spooled are left alone.
    """
    for _field, upload in self.FILES.items():
        if "tmp_file" in upload:
            continue
        spool = tempfile.NamedTemporaryFile()
        spool.write(upload["data"])
        # flush so the content is visible to anyone opening tmp_name
        spool.flush()
        upload["tmp_file"] = spool
        upload["tmp_name"] = spool.name
|
|
def debug(self):
|
|
body=''
|
|
for name,d in [
|
|
("GET",self.GET), ("POST",self.POST), ("REQUEST",self.REQUEST), ("FILES",self.FILES),
|
|
("GET_LIST",self.GET_LIST), ("POST_LIST",self.POST_LIST), ("REQUEST_LIST",self.REQUEST_LIST), ("FILES_LIST",self.FILES_LIST),
|
|
("SESSION",self.SESSION), ("environ",self.environ),
|
|
]:
|
|
body+='<table border="1" width="100%" align="center">\n'
|
|
body+='<tr><th colspan="2" align="center">%s</th></tr>\n'%name
|
|
keys=d.keys()
|
|
keys.sort()
|
|
body+=''.join(['<tr><td>%s</td><td>%s</td></tr>\n'%(k,cgi.escape(repr(d[k]))) for k in keys])
|
|
body+='</table><br><br>\n\n'
|
|
return body
|
|
def write(self,s):
    """Queue the string s at the end of the response buffer."""
    self.buffer += [s]
|
|
def echo(self,*s):
    """Queue every argument, converted with str(), in the response buffer."""
    self.buffer.extend(map(str, s))
|
|
def response(self):
    """Start the WSGI response (once) and return the buffered body.

    On the first call this closes the temporary upload files created by
    save_files() (not in php mode), optionally gzips the buffer when
    response_gzencode is set and the client accepts gzip, then calls
    start_response with the status and the headers: response_headers,
    followed by the session headers and one Set-Cookie per response
    cookie.  Later calls only return the buffer.

    Returns the list of body chunks (self.buffer).
    """
    if not self.response_started:
        if not self.php:
            for k,v in self.FILES.items():
                if v.has_key("tmp_file"):
                    try:
                        v["tmp_file"].close()
                    except OSError:
                        # best effort: the temporary file may already be gone
                        pass
        if self.response_gzencode and self.environ.get('HTTP_ACCEPT_ENCODING','').find('gzip')!=-1:
            zbuf=StringIO.StringIO()
            zfile=gzip.GzipFile(mode='wb', fileobj=zbuf)
            zfile.write(''.join(self.buffer))
            zfile.close()
            zbuf=zbuf.getvalue()
            self.buffer=[zbuf]
            self.response_headers['Content-Encoding']="gzip"
            self.response_headers['Content-Length']=str(len(zbuf))
        # Copy: get() hands back the header store's own list, and the
        # session/cookie headers must not be written into response_headers.
        headers = list(self.response_headers.get())
        if isinstance(self.SESSION, QWebSession):
            headers.extend(self.SESSION.session_get_headers())
        headers.extend([('Set-Cookie', self.response_cookies[i].OutputString()) for i in self.response_cookies])
        self.start_response(self.response_status, headers)
        self.response_started=True
    return self.buffer
|
|
def __iter__(self):
    """Iterate over the body chunks; response() starts the response first."""
    return iter(self.response())
|
|
def http_redirect(self,url,permanent=1):
    """Turn the response into a redirect to url.

    permanent -- true (the default) for "301 Moved Permanently",
                 false for "302 Found".
    """
    status = "302 Found"
    if permanent:
        status = "301 Moved Permanently"
    self.response_status = status
    self.response_headers["Location"] = url
|
|
def http_404(self,msg="<h1>404 Not Found</h1>"):
    """Set the status to "404 Not Found" and write msg unless it is empty."""
    self.response_status = "404 Not Found"
    if not msg:
        return
    self.write(msg)
|
|
def http_download(self,fname,fstr,partial=0):
    """Send fstr to the client as a file download named fname.

    fname   -- file name proposed to the browser; CR/LF are stripped and
               backslashes/double quotes are escaped so the name cannot
               break out of the quoted Content-Disposition parameter.
    fstr    -- the whole file content as a string.
    partial -- reserved for range requests, currently ignored.

    TODO: allow fstr to be a file-like object.
    TODO: when partial is set, advertise Accept-Ranges, parse the Range
    header and answer "206 Partial Content" with Content-Range and a
    Content-Length reduced by the offset, seeking to the offset.
    """
    name = '%s' % (fname,)
    name = name.replace('\r', '').replace('\n', '')
    name = name.replace('\\', '\\\\').replace('"', '\\"')
    self.response_headers["Content-Type"]="application/octet-stream"
    self.response_headers["Content-Disposition"]="attachment; filename=\"%s\""%name
    self.response_headers["Content-Transfer-Encoding"]="binary"
    self.response_headers["Content-Length"]="%d"%len(fstr)
    self.write(fstr)
|
|
|
|
#----------------------------------------------------------
|
|
# QWeb WSGI HTTP Server to run any WSGI app
|
|
# autorun, run an app as FCGI or CGI otherwise launch the server
|
|
#----------------------------------------------------------
|
|
class QWebWSGIHandler(BaseHTTPServer.BaseHTTPRequestHandler):
|
|
def log_message(self,*p):
|
|
if self.server.log:
|
|
return BaseHTTPServer.BaseHTTPRequestHandler.log_message(self,*p)
|
|
def address_string(self):
|
|
return self.client_address[0]
|
|
def start_response(self,status,headers):
|
|
l=status.split(' ',1)
|
|
self.send_response(int(l[0]),l[1])
|
|
ctype_sent=0
|
|
for i in headers:
|
|
if i[0].lower()=="content-type":
|
|
ctype_sent=1
|
|
self.send_header(*i)
|
|
if not ctype_sent:
|
|
self.send_header("Content-type", "text/html")
|
|
self.end_headers()
|
|
return self.write
|
|
def write(self,data):
|
|
try:
|
|
self.wfile.write(data)
|
|
except (socket.error, socket.timeout),e:
|
|
print e
|
|
def bufferon(self):
|
|
if not getattr(self,'wfile_buf',0):
|
|
self.wfile_buf=1
|
|
self.wfile_bak=self.wfile
|
|
self.wfile=StringIO.StringIO()
|
|
def bufferoff(self):
|
|
if self.wfile_buf:
|
|
buf=self.wfile
|
|
self.wfile=self.wfile_bak
|
|
self.write(buf.getvalue())
|
|
self.wfile_buf=0
|
|
def serve(self,type):
|
|
path_info, parameters, query = urlparse.urlparse(self.path)[2:5]
|
|
environ = {
|
|
'wsgi.version': (1,0),
|
|
'wsgi.url_scheme': 'http',
|
|
'wsgi.input': self.rfile,
|
|
'wsgi.errors': sys.stderr,
|
|
'wsgi.multithread': 0,
|
|
'wsgi.multiprocess': 0,
|
|
'wsgi.run_once': 0,
|
|
'REQUEST_METHOD': self.command,
|
|
'SCRIPT_NAME': '',
|
|
'QUERY_STRING': query,
|
|
'CONTENT_TYPE': self.headers.get('Content-Type', ''),
|
|
'CONTENT_LENGTH': self.headers.get('Content-Length', ''),
|
|
'REMOTE_ADDR': self.client_address[0],
|
|
'REMOTE_PORT': str(self.client_address[1]),
|
|
'SERVER_NAME': self.server.server_address[0],
|
|
'SERVER_PORT': str(self.server.server_address[1]),
|
|
'SERVER_PROTOCOL': self.request_version,
|
|
# extention
|
|
'FULL_PATH': self.path,
|
|
'qweb.mode': 'standalone',
|
|
}
|
|
if path_info:
|
|
environ['PATH_INFO'] = urllib.unquote(path_info)
|
|
for key, value in self.headers.items():
|
|
environ['HTTP_' + key.upper().replace('-', '_')] = value
|
|
# Hack to avoid may TCP packets
|
|
self.bufferon()
|
|
appiter=self.server.wsgiapp(environ, self.start_response)
|
|
for data in appiter:
|
|
self.write(data)
|
|
self.bufferoff()
|
|
self.bufferoff()
|
|
def do_GET(self):
|
|
self.serve('GET')
|
|
def do_POST(self):
|
|
self.serve('GET')
|
|
class QWebWSGIServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
    """ QWebWSGIServer
        qweb_wsgi_autorun(wsgiapp,ip='127.0.0.1',port=8080,threaded=1)
        A WSGI HTTP server, threaded or not, and a function to automatically
        run your app according to the environment (either standalone, CGI or
        FastCGI).

        This feature is called QWeb autorun. To use it in your application,
        add the following lines at the end of the main application python
        file:

        if __name__ == '__main__':
            qweb.qweb_wsgi_autorun(your_wsgi_app)

        This function will select the appropriate running mode according to
        the calling environment (http-server, FastCGI or CGI).
    """
    def __init__(self, wsgiapp, ip, port, threaded=1, log=1):
        # Requests are handled by QWebWSGIHandler, which reads wsgiapp and
        # log back from this server object.
        address = (ip, port)
        BaseHTTPServer.HTTPServer.__init__(self, address, QWebWSGIHandler)
        self.log = log
        self.threaded = threaded
        self.wsgiapp = wsgiapp
    def process_request(self,*p):
        # One thread per request when threaded, otherwise handle inline.
        if self.threaded:
            dispatch = SocketServer.ThreadingMixIn.process_request
        else:
            dispatch = BaseHTTPServer.HTTPServer.process_request
        return dispatch(self,*p)
|
|
def qweb_wsgi_autorun(wsgiapp,ip='127.0.0.1',port=8080,threaded=1,log=1,callback_ready=None):
|
|
if sys.platform=='win32':
|
|
fcgi=0
|
|
else:
|
|
fcgi=1
|
|
sock = socket.fromfd(0, socket.AF_INET, socket.SOCK_STREAM)
|
|
try:
|
|
sock.getpeername()
|
|
except socket.error, e:
|
|
if e[0] == errno.ENOTSOCK:
|
|
fcgi=0
|
|
if fcgi or os.environ.has_key('REQUEST_METHOD'):
|
|
import fcgi
|
|
fcgi.WSGIServer(wsgiapp,multithreaded=False).run()
|
|
else:
|
|
if log:
|
|
print 'Serving on %s:%d'%(ip,port)
|
|
s=QWebWSGIServer(wsgiapp,ip=ip,port=port,threaded=threaded,log=log)
|
|
if callback_ready:
|
|
callback_ready()
|
|
try:
|
|
s.serve_forever()
|
|
except KeyboardInterrupt,e:
|
|
sys.excepthook(*sys.exc_info())
|
|
|
|
#----------------------------------------------------------
|
|
# Qweb Documentation
|
|
#----------------------------------------------------------
|
|
def qweb_doc():
    """Return the module docstring followed by one underlined section per
    documented QWeb class/function, each holding that object's docstring."""
    documented = [QWebXml ,QWebHtml ,QWebForm ,QWebURL ,qweb_control ,QWebRequest ,QWebSession ,QWebWSGIServer ,qweb_wsgi_autorun]
    sections = [__doc__]
    for obj in documented:
        title = obj.__name__
        underline = '-'*len(title)
        sections.append('\n\n%s\n%s\n\n%s'%(title,underline,obj.__doc__))
    return ''.join(sections)
|
|
|
|
print qweb_doc()
|
|
|
|
#
|