#!/usr/bin/env python
# coding: utf8
"""
OpenID authentication for web2py
Allows OpenID login to be used together with web2py's built-in login.
By default, to support OpenID login, put this in your db.py
>>> from gluon.contrib.login_methods.openid_auth import OpenIDAuth
>>> auth.settings.login_form = OpenIDAuth(auth)
To show OpenID list in user profile, you can add the following code
before the end of function user() of your_app/controllers/default.py
+ if (request.args and request.args(0) == "profile"):
+ form = DIV(form, openid_login_form.list_user_openids())
return dict(form=form, login_form=login_form, register_form=register_form, self_registration=self_registration)
More detail in the description of the class OpenIDAuth.
Requirements:
python-openid version 2.2.5 or later
Reference:
* w2p openID
http://w2popenid.appspot.com/init/default/wiki/w2popenid
* RPX and web2py auth module
http://www.web2pyslices.com/main/slices/take_slice/28
* built-in file: gluon/contrib/login_methods/rpx_account.py
* built-in file: gluon/tools.py (Auth class)
"""
import time
from datetime import datetime, timedelta
from gluon import *
from gluon.storage import Storage, Messages
# python-openid is an optional third-party dependency; fail early with a
# clear message when it is missing.
try:
    import openid.consumer.consumer
    from openid.association import Association
    from openid.store.interface import OpenIDStore
    from openid.extensions.sreg import SRegRequest, SRegResponse
    from openid.store import nonce
    from openid.consumer.discover import DiscoveryFailure
except ImportError:
    # "except ImportError:" (no name binding) parses on Python 2 and 3 alike;
    # the caught exception object was never used.
    raise ImportError("OpenIDAuth requires python-openid package")
# Module-level placeholder callable that returns None. Nothing in the code
# visible in this file references it -- presumably kept as a "no value
# supplied" sentinel for callers; verify before removing.
DEFAULT = lambda: None
class OpenIDAuth(object):
    """
    OpenID login method for web2py.

    It supports the logout_url and implements get_user and login_form, the
    hooks used for CAS-style login methods by gluon.tools.Auth.
    It can be combined with the standard logon/register procedure through
    ExtendedLoginForm.
    It uses the OpenID Consumer (through ConsumerHelper) when it renders the
    form and begins the OpenID authentication.

    Example: (put this code after auth.define_tables() in your models.)

        auth = Auth(globals(), db)      # authentication/authorization
        ...
        auth.define_tables()            # creates all needed tables
        ...

        # include in your model after auth has been defined
        from gluon.contrib.login_methods.openid_auth import OpenIDAuth
        openid_login_form = OpenIDAuth(auth)

        from gluon.contrib.login_methods.extended_login_form import ExtendedLoginForm
        # NOTE: verify the ExtendedLoginForm argument list against your
        # web2py version.
        extended_login_form = ExtendedLoginForm(request, auth, openid_login_form,
                                                signals=['oid','janrain_nonce'])

        auth.settings.login_form = extended_login_form
    """

    def __init__(self, auth):
        """
        Bind this login method to ``auth`` and make sure the table mapping
        OpenIDs to users is defined.

        auth -- the Auth instance; ``auth.db`` and
                ``auth.settings.table_user`` are read from it.

        Raises RuntimeError when ``auth.settings.table_user`` is not set.
        """
        self.auth = auth
        self.db = auth.db

        request = current.request
        self.nextvar = '_next'
        self.realm = 'http://%s' % request.env.http_host
        self.login_url = URL(r=request, f='user', args=['login'])
        self.return_to_url = self.realm + self.login_url

        self.table_alt_logins_name = "alt_logins"
        if not auth.settings.table_user:
            # A bare ``raise`` here had no active exception to re-raise.
            raise RuntimeError("OpenIDAuth requires auth.settings.table_user; "
                               "call auth.define_tables() first")
        self.table_user = self.auth.settings.table_user
        self.openid_expiration = 15  # minutes

        self.messages = self._define_messages()

        if self.table_alt_logins_name not in self.db.tables:
            self._define_alt_login_table()
        else:
            # Keep the attribute available when the table was defined earlier.
            self.table_alt_logins = self.db[self.table_alt_logins_name]

    def _define_messages(self):
        """
        Build the Messages object holding every user-visible string of this
        login method (all of them go through current.T).
        """
        messages = Messages(current.T)
        messages.label_alt_login_username = 'Sign-in with OpenID: '
        messages.label_add_alt_login_username = 'Add a new OpenID: '
        messages.submit_button = 'Sign in'
        messages.submit_button_add = 'Add'
        messages.a_delete = 'Delete'
        messages.comment_openid_signin = 'What is OpenID?'
        messages.comment_openid_help_title = 'Start using your OpenID'
        messages.comment_openid_help_url = 'http://openid.net/get-an-openid/start-using-your-openid/'
        messages.openid_fail_discover = 'Failed to discover OpenID service. Check your OpenID or "More about OpenID"?'
        messages.flash_openid_expired = 'OpenID expired. Please login or authenticate OpenID again. Sorry for the inconvenient.'
        messages.flash_openid_associated = 'OpenID associated'
        messages.flash_associate_openid = 'Please login or register an account for this OpenID.'
        messages.p_openid_not_registered = "This Open ID haven't be registered. " \
            + "Please login to associate with it or register an account for it."
        messages.flash_openid_authenticated = 'OpenID authenticated successfully.'
        messages.flash_openid_fail_authentication = 'OpenID authentication failed. (Error message: %s)'
        messages.flash_openid_canceled = 'OpenID authentication canceled by user.'
        messages.flash_openid_need_setup = 'OpenID authentication needs to be setup by the user with the provider first.'
        messages.h_openid_login = 'OpenID Login'
        messages.h_openid_list = 'OpenID List'
        return messages

    def _define_alt_login_table(self):
        """
        Define the OpenID login table.

        Note: the "type" field allows other alternate login methods to share
        the table; here it is always 'openid'. If you do not need it, remove
        the field and the "db.alt_logins.type == type_" condition in
        _find_matched_openid.
        """
        db = self.db
        table = db.define_table(
            self.table_alt_logins_name,
            Field('username', length=512, default=''),
            Field('type', length=128, default='openid', readable=False),
            Field('user', self.table_user, readable=False),
        )
        table.username.requires = IS_NOT_IN_DB(db, table.username)
        self.table_alt_logins = table

    def logout_url(self, next):
        """
        Delete the w2popenid record in the session as logout and return
        ``next`` unchanged.
        """
        if current.session.w2popenid:
            del current.session.w2popenid
        return next

    def login_form(self):
        """
        Start to process the OpenID response if 'janrain_nonce' is in the
        request parameters and it has not been processed yet. Otherwise
        return the OpenID form for login.
        """
        request = current.request
        if 'janrain_nonce' in request.vars and not self._processed():
            self._process_response()
            return self.auth()
        return self._form()

    def get_user(self):
        """
        Tell Auth who the OpenID-authenticated user is.

        Returns True on a logout request (so that logout_url gets called),
        a dict ``{field: value}`` identifying the user (field is 'username'
        or 'email') once an authenticated OpenID matches a stored one, and
        None in every other case. May redirect instead of returning.
        """
        request = current.request
        # request.args(0) yields None for empty args where args[0] would raise.
        if request.args(0) == 'logout':
            return True  # Let logout_url get called

        w2popenid = current.session.w2popenid
        if not w2popenid:
            return None  # just start to login
        if not (w2popenid.ok is True and w2popenid.oid):
            return None  # OpenID not authenticated (yet)

        db = self.db
        if self._w2popenid_expired(w2popenid):
            del current.session.w2popenid
            current.session.warning = self.messages.flash_openid_expired
            redirect(self.auth.settings.login_url)

        oid = self._remove_protocol(w2popenid.oid)
        alt_login = self._find_matched_openid(db, oid)
        nextvar = self.nextvar

        # This OpenID is not in the database. If the user is logged in then
        # add it to the database, else ask the user to login or register.
        if not alt_login:
            if self.auth.is_logged_in():
                # TODO: ask first maybe
                self._associate_user_openid(self.auth.user, oid)
                if current.session.w2popenid:
                    del current.session.w2popenid
                current.session.flash = self.messages.flash_openid_associated
                if nextvar in request.vars:
                    # NOTE(review): the target comes straight from the
                    # request; consider validating it to avoid open redirects.
                    redirect(request.vars[nextvar])
                redirect(self.auth.settings.login_next)

            if nextvar not in request.vars:
                # no next var: add it and do login again, so that the user can
                # come back here to associate the OpenID after login/register
                redirect(URL(r=request,
                             args=['login'],
                             vars={nextvar: self.login_url}))
            # Deliberately shadows the login_form method on this instance.
            self.login_form = self._form_with_notification()
            current.session.flash = self.messages.flash_associate_openid
            return None  # need to login or register to associate this openid

        # Get the existing OpenID user
        user = db(self.table_user.id == alt_login.user).select().first()
        if not user:
            return None
        if current.session.w2popenid:
            del current.session.w2popenid

        fields = self.table_user.fields()
        if 'username' in fields:
            key = 'username'
        elif 'email' in fields:
            key = 'email'
        else:
            # Neither identifying field exists: nothing to hand to Auth.
            return None
        return {key: user[key]}  # login success (almost)

    def _find_matched_openid(self, db, oid, type_='openid'):
        """
        Return the first alt_logins row whose username equals ``oid`` and
        whose type equals ``type_``, or None when there is no such row.
        """
        query = ((db.alt_logins.username == oid) & (db.alt_logins.type == type_))
        return db(query).select().first()

    def _associate_user_openid(self, user, oid):
        """
        Associate the given (logged in) user with the given OpenID.
        """
        self.db.alt_logins.insert(username=oid, user=user.id)

    def _form_with_notification(self):
        """
        Build the normal login form followed by a notice that the
        authenticated OpenID is not registered yet. Returns a callable that
        produces the form.
        """
        form = DIV()
        # TODO: check when will happen
        if self.auth.settings.login_form in (self.auth, self):
            self.auth.settings.login_form = self.auth
            form = DIV(self.auth())

        register_note = DIV(P(self.messages.p_openid_not_registered))
        form.components.append(register_note)
        return lambda: form

    def _remove_protocol(self, oid):
        """
        Remove one leading https:// or http:// from the oid url.
        """
        for protocol in ('https://', 'http://'):
            if oid.startswith(protocol):
                return oid[len(protocol):]
        return oid

    def _init_consumerhelper(self):
        """
        Create the ConsumerHelper on first use and return it.
        """
        if not hasattr(self, "consumerhelper"):
            self.consumerhelper = ConsumerHelper(current.session,
                                                 self.db)
        return self.consumerhelper

    def _form(self, style=None):
        """
        Return the OpenID login form under an "OpenID Login" heading.

        style -- optional CSS for the OpenID input field.
        """
        # Pass style by keyword: positionally it would be taken as the
        # field label.
        form = DIV(H3(self.messages.h_openid_login),
                   self._login_form(style=style))
        return form

    def _login_form(self,
                    openid_field_label=None,
                    submit_button=None,
                    _next=None,
                    style=None):
        """
        Render the form for OpenID login and, once it is submitted, begin
        the OpenID authentication (redirecting to the provider, or back to
        the login page with a warning when discovery fails).

        openid_field_label -- label shown before the input field
        submit_button      -- caption of the submit button
        _next              -- 'profile' to return to the profile page
        style              -- CSS for the input field
        """
        request = current.request
        session = current.session
        messages = self.messages

        def warning_openid_fail(session):
            session.warning = messages.openid_fail_discover

        default_style = (
            'background-attachment: scroll;'
            'background-repeat: no-repeat;'
            'background-image: url("http://wiki.openid.net/f/openid-16x16.gif");'
            'background-position: 0% 50%;'
            'background-color: transparent;'
            'padding-left: 18px;'
            'width: 400px;'
        )
        style = (style or default_style).replace("\n", "")

        hidden_next_input = ""
        if _next == 'profile':
            profile_url = URL(r=request, f='user', args=['profile'])
            hidden_next_input = INPUT(_type="hidden", _name="_next", _value=profile_url)

        form = FORM(openid_field_label or self.messages.label_alt_login_username,
                    INPUT(_type="input", _name="oid",
                          requires=IS_NOT_EMPTY(error_message=messages.openid_fail_discover),
                          _style=style),
                    hidden_next_input,
                    INPUT(_type="submit", _value=submit_button or messages.submit_button),
                    " ",
                    A(messages.comment_openid_signin,
                      _href=messages.comment_openid_help_url,
                      _title=messages.comment_openid_help_title,
                      _class='openid-identifier',
                      _target="_blank"),
                    _action=self.login_url
                    )

        if form.accepts(request.vars, session):
            oid = request.vars.oid
            consumerhelper = self._init_consumerhelper()
            url = self.login_url
            return_to_url = self.return_to_url
            if not oid:
                warning_openid_fail(session)
                redirect(url)
            try:
                if '_next' in request.vars:
                    # Let URL() escape the value; plain concatenation broke
                    # the query string when _next contained '&', '?' or '#'.
                    return_to_url = self.realm + URL(
                        r=request, f='user', args=['login'],
                        vars={'_next': request.vars._next})
                url = consumerhelper.begin(oid, self.realm, return_to_url)
            except DiscoveryFailure:
                warning_openid_fail(session)
            # Either the provider url or, after a failure, the login page.
            redirect(url)
        return form

    def _processed(self):
        """
        Check if w2popenid authentication is processed.
        Return True if processed else False.
        """
        # Storage answers None for missing keys, so hasattr() is no guard;
        # fetch the record and test it before reading .ok.
        w2popenid = getattr(current.session, 'w2popenid', None)
        return bool(w2popenid) and w2popenid.ok is True

    def _set_w2popenid_expiration(self, w2popenid):
        """
        Set expiration for OpenID authentication
        (now + self.openid_expiration minutes).
        """
        w2popenid.expiration = datetime.now() + timedelta(minutes=self.openid_expiration)

    def _w2popenid_expired(self, w2popenid):
        """
        Check if w2popenid authentication is expired.
        Return True if expired (or no expiration is set) else False.
        """
        return (not w2popenid.expiration) or (datetime.now() > w2popenid.expiration)

    def _process_response(self):
        """
        Let ConsumerHelper complete the OpenID authentication and record the
        outcome in the session (w2popenid record and flash/warning message).
        """
        request_vars = current.request.vars
        consumerhelper = self._init_consumerhelper()
        process_status = consumerhelper.process_response(request_vars, self.return_to_url)
        if process_status == "success":
            w2popenid = current.session.w2popenid
            user_data = consumerhelper.sreg()
            w2popenid.ok = True
            self._set_w2popenid_expiration(w2popenid)
            w2popenid.user_data = user_data
            current.session.flash = self.messages.flash_openid_authenticated
        elif process_status == "failure":
            flash = self.messages.flash_openid_fail_authentication % consumerhelper.error_message
            current.session.warning = flash
        elif process_status == "cancel":
            current.session.warning = self.messages.flash_openid_canceled
        elif process_status == "setup_needed":
            current.session.warning = self.messages.flash_openid_need_setup

    def list_user_openids(self):
        """
        Return a DIV listing the OpenIDs of the logged in user, each with a
        delete link, followed by a form to add another OpenID.

        A 'delete_openid' request variable removes that OpenID first; only
        records of the logged in user can be removed this way.
        """
        messages = self.messages
        request = current.request
        user_id = self.auth.user.id
        if 'delete_openid' in request.vars:
            # Scope the delete to the current user: the value comes from the
            # request and must not reach other users' records.
            self.remove_openid(request.vars.delete_openid, user_id=user_id)

        alt_logins = self.db(self.db.alt_logins.user == user_id).select()
        items = []
        for alt_login in alt_logins:
            username = alt_login.username
            delete_href = URL(r=request, f='user',
                              args=['profile'],
                              vars={'delete_openid': username})
            delete_link = A(messages.a_delete, _href=delete_href)
            items.append(LI(username, " ", delete_link))

        openid_list = DIV(H3(messages.h_openid_list), UL(items),
                          self._login_form(
                              _next='profile',
                              submit_button=messages.submit_button_add,
                              openid_field_label=messages.label_add_alt_login_username)
                          )
        return openid_list

    def remove_openid(self, openid, user_id=None):
        """
        Delete the alt_logins record(s) whose username equals ``openid``.

        user_id -- when given, only records belonging to that user are
                   deleted; when None every matching record is deleted.
        """
        query = self.db.alt_logins.username == openid
        if user_id is not None:
            query = query & (self.db.alt_logins.user == user_id)
        self.db(query).delete()
class ConsumerHelper(object):
    """
    Thin wrapper around python-openid's Consumer.

    It keeps the OpenID state in the ``w2popenid`` record of the session and
    uses a Web2pyStore for the consumer's associations and nonces.
    """

    def __init__(self, session, db):
        """
        session -- the web2py session (also handed to the Consumer)
        db      -- the database used by Web2pyStore
        """
        self.session = session
        # Set by process_response(); initialised here so that sreg() and
        # readers of error_message never hit a missing attribute.
        self.resp = None
        self.error_message = None
        store = self._init_store(db)
        self.consumer = openid.consumer.consumer.Consumer(session, store)

    def _session_record(self):
        """
        Return the session's ``w2popenid`` Storage, creating it when it is
        missing or empty.
        """
        if not self.session.w2popenid:
            self.session.w2popenid = Storage()
        return self.session.w2popenid

    def _init_store(self, db):
        """
        Initialize the Web2pyStore (once) and make sure the session has a
        ``w2popenid`` record. Returns the store.
        """
        if not hasattr(self, "store"):
            self.store = Web2pyStore(db)
            self._session_record()
        return self.store

    def begin(self, oid, realm, return_to_url):
        """
        Begin the OpenID authentication for ``oid`` and return the url the
        user has to be redirected to. Simple Registration fields 'email' and
        'nickname' are requested.

        Exceptions raised by the consumer (for example DiscoveryFailure) are
        not caught here.
        """
        self._session_record().oid = oid
        auth_req = self.consumer.begin(oid)
        auth_req.addExtension(SRegRequest(required=['email', 'nickname']))
        return auth_req.redirectURL(return_to=return_to_url, realm=realm)

    def process_response(self, request_vars, return_to_url):
        """
        Complete the authentication and return its status as a string:
        "success", "failure", "cancel", "setup_needed" or "no resp".

        On success the response is kept in ``self.resp`` and the session's
        oid is replaced by the response's identity_url when it has one.
        On failure the message is kept in ``self.error_message``.
        """
        resp = self.consumer.complete(request_vars, return_to_url)
        if not resp:
            return "no resp"

        consumer = openid.consumer.consumer
        if resp.status == consumer.SUCCESS:
            self.resp = resp
            if hasattr(resp, "identity_url"):
                self._session_record().oid = resp.identity_url
            return "success"
        if resp.status == consumer.FAILURE:
            self.error_message = resp.message
            return "failure"
        if resp.status == consumer.CANCEL:
            return "cancel"
        if resp.status == consumer.SETUP_NEEDED:
            return "setup_needed"
        return "no resp"

    def sreg(self):
        """
        Try to get the OpenID Simple Registration data of the last
        successful response.
        http://openid.net/specs/openid-simple-registration-extension-1_0.html

        Returns the data, or None when there is no successful response or it
        carries no Simple Registration data.
        """
        # getattr keeps this safe even on an instance that never completed a
        # successful process_response().
        resp = getattr(self, 'resp', None)
        if not resp:
            return None
        sreg_resp = SRegResponse.fromSuccessResponse(resp)
        return sreg_resp.data if sreg_resp else None
class Web2pyStore(OpenIDStore):
    """
    Web2pyStore

    This class implements the OpenIDStore interface. OpenID stores take care
    of persisting nonces and associations. The Janrain Python OpenID library
    comes with implementations for file and memory storage. Web2pyStore uses
    the web2py db abstraction layer. See the source code docs of OpenIDStore
    for a comprehensive description of this interface.
    """

    def __init__(self, database):
        """
        database -- the web2py database in which the association and nonce
                    tables are kept (they are defined here when missing).
        """
        self.database = database
        self.table_oid_associations_name = 'oid_associations'
        self.table_oid_nonces_name = 'oid_nonces'
        self._initDB()

    def _initDB(self):
        """
        Define the association and nonce tables unless they already exist.
        """
        if self.table_oid_associations_name not in self.database:
            self.database.define_table(self.table_oid_associations_name,
                Field('server_url', 'string', length=2047, required=True),
                Field('handle', 'string', length=255, required=True),
                Field('secret', 'blob', required=True),
                Field('issued', 'integer', required=True),
                Field('lifetime', 'integer', required=True),
                Field('assoc_type', 'string', length=64, required=True)
            )
        if self.table_oid_nonces_name not in self.database:
            self.database.define_table(self.table_oid_nonces_name,
                Field('server_url', 'string', length=2047, required=True),
                Field('timestamp', 'integer', required=True),
                Field('salt', 'string', length=40, required=True)
            )

    def storeAssociation(self, server_url, association):
        """
        Store an association. If there already is one with the same
        server_url and handle in the table, replace it.
        """
        db = self.database
        query = ((db.oid_associations.server_url == server_url) &
                 (db.oid_associations.handle == association.handle))
        db(query).delete()
        db.oid_associations.insert(server_url=server_url,
                                   handle=association.handle,
                                   secret=association.secret,
                                   issued=association.issued,
                                   lifetime=association.lifetime,
                                   assoc_type=association.assoc_type)

    def getAssociation(self, server_url, handle=None):
        """
        Return the association for server_url and handle. If handle is
        None (or empty) return the latest association for that server_url.
        Return None if no association can be found.

        Expired associations found along the way are deleted.
        """
        db = self.database
        query = (db.oid_associations.server_url == server_url)
        if handle:
            query &= (db.oid_associations.handle == handle)
        rows = db(query).select(orderby=db.oid_associations.issued)
        keep_assoc, _ = self._removeExpiredAssocations(rows)
        if not keep_assoc:
            return None
        # rows are ordered by issue time, so the last one is the latest
        assoc = keep_assoc[-1]
        return Association(assoc['handle'],
                           assoc['secret'],
                           assoc['issued'],
                           assoc['lifetime'],
                           assoc['assoc_type'])

    def removeAssociation(self, server_url, handle):
        """
        Delete the association for server_url and handle.
        Return True if at least one row was deleted, else False.
        """
        db = self.database
        query = ((db.oid_associations.server_url == server_url) &
                 (db.oid_associations.handle == handle))
        # delete() reports the number of rows removed; "!= None" was always
        # true for that count, even when nothing matched.
        return bool(db(query).delete())

    def useNonce(self, server_url, timestamp, salt):
        """
        This method returns False if a nonce has been used before or its
        timestamp is not current; otherwise it records the nonce and
        returns True.
        """
        db = self.database
        if abs(timestamp - time.time()) > nonce.SKEW:
            return False
        query = ((db.oid_nonces.server_url == server_url) &
                 (db.oid_nonces.timestamp == timestamp) &
                 (db.oid_nonces.salt == salt))
        if db(query).count() > 0:
            return False
        db.oid_nonces.insert(server_url=server_url,
                             timestamp=timestamp,
                             salt=salt)
        return True

    def _removeExpiredAssocations(self, rows):
        """
        This helper function is not part of the interface. Given a list of
        association rows it checks which associations have expired and
        deletes them from the db. It returns a tuple of the form
        ([valid_assoc], no_of_expired_assoc_deleted).
        """
        db = self.database
        now = time.time()
        keep_assoc = []
        remove_assoc = []
        for r in rows:
            if r['issued'] + r['lifetime'] < now:
                remove_assoc.append(r)
            else:
                keep_assoc.append(r)
        for r in remove_assoc:
            del db.oid_associations[r['id']]
        return (keep_assoc, len(remove_assoc))

    def cleanupNonces(self):
        """
        Remove expired nonce entries from DB and return the number
        of entries deleted.
        """
        db = self.database
        query = (db.oid_nonces.timestamp < time.time() - nonce.SKEW)
        return db(query).delete()

    def cleanupAssociations(self):
        """
        Remove expired associations from db and return the number
        of entries deleted.
        """
        db = self.database
        query = (db.oid_associations.id > 0)
        _, removed = self._removeExpiredAssocations(db(query).select())
        return removed

    def cleanup(self):
        """
        This method should be run periodically to free the db from
        expired nonce and association entries. Returns the tuple
        (nonces_deleted, associations_deleted).
        """
        return self.cleanupNonces(), self.cleanupAssociations()