refactored

This commit is contained in:
Roland Hedberg
2009-11-06 19:40:57 +01:00
parent 64be8427a4
commit c571ece8a9

View File

@@ -19,8 +19,8 @@
or attribute authority (AA) may use to conclude its tasks.
"""
from saml2 import saml, samlp
from saml2.utils import sid, decode_base64_and_inflate
from saml2.time_util import instant
from saml2.utils import sid, decode_base64_and_inflate, make_instance
from saml2.time_util import instant, in_a_while
from saml2.metadata import MetaData
class VersionMismatch(Exception):
@@ -44,9 +44,21 @@ EXCEPTION2STATUS = {
def properties(klass):
props = [val[0] for key,val in klass.c_children.items()]
props.extend(klass.c_attributes)
props.extend(klass.c_attributes.values())
return props
def klassdict(klass, text=None, **kwargs):
spec = {}
if text:
spec["text"] = text
props = properties(klass)
#print props
for key, val in kwargs.items():
#print "?",key
if key in props:
spec[key] = val
return spec
class Server(object):
def __init__(self, config, log=None):
if config:
@@ -84,131 +96,81 @@ class Server(object):
print "Configuration: %s" % (self.conf,)
def issuer(self):
return {
"format": saml.NAMEID_FORMAT_ENTITY,
"text": self.conf["entityid"]
}
return klassdict( saml.Issuer, self.conf["entityid"],
format=saml.NAMEID_FORMAT_ENTITY)
def status_from_exception(self, exception):
return {
"status_code": {
"value": samlp.STATUS_RESPONDER,
"status_code": {
"value": EXCEPTION2STATUS[exception.__class__],
},
},
"status_message": exception.args[0],
}
return klassdict(samlp.Status,
status_code=klassdict(samlp.StatusCode,
value=samlp.STATUS_RESPONDER,
status_code=klassdict(samlp.StatusCode,
value=EXCEPTION2STATUS[exception.__class__])
),
status_message=exception.args[0],
)
def status(self, status, message=None, status_code=None):
res = {
"status_code": {
"value": status,
}
}
if message:
res["status_message"] = message
if status_code:
res["status_code"].update(status_code)
return res
def audience_restriction(self, audience):
return { "audience": audience }
def status_message(self, text="", **kwargs):
return klassdict(samlp.StatusMessage, text, **kwargs)
def conditions(self, not_before=None, not_on_or_after=None,
audience_restriction=None, condition=None,
one_time_use=None, proxy_restriction=None):
res = {}
if not_before:
res["not_before"] = not_before
if not_on_or_after:
res["not_on_or_after"] = not_on_or_after
if audience_restriction:
res["audience_restriction"] = audience_restriction
# The ones below are hardly used
if condition:
res["condition"] = condition
if one_time_use:
res["one_time_use"] = one_time_use
if proxy_restriction:
res["proxy_restriction"] = proxy_restriction
def status_code(self, text="", **kwargs):
return klassdict(samlp.StatusCode, text, **kwargs)
def status(self, text="", **kwargs):
return klassdict(samlp.Status, text, **kwargs)
return res
def success_status(self):
return self.status(status_code=self.status_code(
value=samlp.STATUS_SUCCESS))
def audience(self, text="", **kwargs):
return klassdict(saml.Audience, text, **kwargs)
def audience_restriction(self, text="", **kwargs):
return klassdict(saml.AudienceRestriction, text, **kwargs)
def conditions(self, text="", **kwargs):
return klassdict(saml.Conditions, text, **kwargs)
def attribute(self, value="", name="", name_format="", friendly_name=""):
dic = {}
if value:
dic["attribute_value"] = value
if name:
dic["name"] = name
if name_format:
dic["name_format"] = name_format
if friendly_name:
dic["friendly_name"] = friendly_name
return dic
def attribute_statement(self, attribute):
return { "attribute": attribute }
def attribute(self, text="", **kwargs):
return klassdict(saml.Attribute, text, **kwargs)
def attribute_value(self, text="", **kwargs):
return klassdict(saml.AttributeValue, text, **kwargs)
def attribute_statement(self, text="", **kwargs):
return klassdict(saml.AttributeStatement, text, **kwargs)
def subject(self, name, name_id, subject_confirmation=None):
spec = {
"text": name,
"name_id": name_id,
}
if subject_confirmation:
spec["subject_confirmation"] = subject_confirmation
return spec
def subject_confirmation_data(self, text="", **kwargs):
return klassdict(saml.SubjectConfirmationData, text, **kwargs)
def assertion(self, subject, signature=False,
conditions=None, advice=None, statement=None,
authn_statement=None, authz_desc_statement=None,
attribute_statement=None):
spec = {
"version": "2.0",
"id" : sid(),
"issue_instant" : instant(),
"issuer": self.issuer(),
"subject": subject,
}
def subject_confirmation(self, text="", **kwargs):
return klassdict(saml.SubjectConfirmation, text, **kwargs)
if signature:
spec["signature"] = sigver.pre_signature_part(spec["id"])
if conditions:
spec["conditions"] = conditions
if advice:
spec["advice"] = advice
if statement:
spec["statement"] = statement
if authn_statement:
spec["authn_statement"] = authn_statement
if authz_desc_statement:
spec["authz_desc_statement"] = authz_desc_statement
if attribute_statement:
spec["attribute_statement"] = attribute_statement
return spec
def response(self, in_response_to, destination, status,
consent=None, signature=False, assertion=None,
encrypt=False):
def subject(self, text="", **kwargs):
return klassdict(saml.Subject, text, **kwargs)
spec = {
def authn_statement(self, text="", **kwargs):
return klassdict(saml.Subject, text, **kwargs)
def assertion(self, text="", **kwargs):
kwargs.update({
"version": "2.0",
"id" : sid(),
"issue_instant" : instant(),
})
return klassdict(saml.Assertion, text, **kwargs)
def response(self, signature=False, encrypt=False, **kwargs):
kwargs.update({
"id" : sid(),
"in_response_to": in_response_to,
"version": "2.0",
"issue_instant" : instant(),
"issuer": self.issuer(),
"destination": destination,
"status": status,
}
})
if signature:
spec["signature"] = sigver.pre_signature_part(spec["id"])
if assertion:
spec["assertion"] = assertion
kwargs["signature"] = sigver.pre_signature_part(kwargs["id"])
return spec
return kwargs
def parse_request(self, enc_request):
request_xml = decode_base64_and_inflate(enc_request)
@@ -225,12 +187,17 @@ class Server(object):
try:
consumer_url = self.metadata.consumer_url(spentityid)
except KeyError:
self.log and self.log.info(
"entities: %s" % self.metadata.entity.keys())
raise UnknownPricipal(spentityid)
if not consumer_url: # what to do ?
raise UnsupportedBinding(spentityid)
if consumer_url != return_destination:
# serious error on someones behalf
self.log and self.log.info("%s != %s" % (consumer_url,
return_destination))
print "%s != %s" % (consumer_url, return_destination)
raise OtherError("ConsumerURL and return destination mismatch")
policy = request.name_id_policy
@@ -238,5 +205,68 @@ class Server(object):
policy.format == saml.NAMEID_FORMAT_TRANSIENT:
name_id_policies = policy.format
return (consumer_url, id, name_id_policies)
return (consumer_url, id, name_id_policies, spentityid)
def do_attribute_statement(self, identity):
"""
:param identity: A dictionary with fiendly names as keys
:return:
"""
attrs = []
for key, val in identity.items():
dic = {}
if isinstance(val,basestring):
attrval = self.attribute_value(val)
elif isinstance(val,list):
attrval = [self.attribute_value(v) for v in val]
else:
raise OtherError("strange value type on: %s" % val)
dic["attribute_value"] = attrval
if isinstance(key, basestring):
dic["name"] = key
elif isinstance(key, tuple): # 3-tuple
(name,format,friendly) = key
if name:
dic["name"] = name
if format:
dic["name_format"] = format
if friendly:
dic["friendly_name"] = friendly
attrs.append(self.attribute(**dic))
return self.attribute_statement(attribute=attrs)
def do_sso_response(self, consumer_url, in_response_to,
sp_entity_id, identity, name_id_policies=None,
subject_id=None ):
attribute_statement = self.do_attribute_statement(identity)
# start using now and for a hour
conditions = self.conditions(
not_before=instant(),
# an hour from now
not_on_or_after=in_a_while(0,0,0,0,0,1),
audience_restriction=self.audience_restriction(
audience=self.audience(sp_entity_id)))
# temporary identifier or ??
subject_id = sid()
tmp = self.response(
in_response_to=in_response_to,
destination=consumer_url,
status=self.success_status(),
assertion=self.assertion(
subject = self.subject(subject_id,
name_id=saml.NAMEID_FORMAT_TRANSIENT,
method=saml.SUBJECT_CONFIRMATION_METHOD_BEARER,
subject_confirmation=self.subject_confirmation(
subject_confirmation_data=self.subject_confirmation_data(
in_response_to=in_response_to))),
attribute_statement = attribute_statement,
authn_statement= self.authn_statement(
authn_instant=instant(),
session_index=sid()),
conditions=conditions,
),
)
return make_instance(samlp.Response, tmp)