mirror of
https://github.com/optim-enterprises-bv/Mailu-OIDC.git
synced 2025-11-02 11:07:52 +00:00
Initial commit
This commit is contained in:
@@ -35,6 +35,17 @@ from itsdangerous.encoding import want_bytes
|
||||
from werkzeug.datastructures import CallbackDict
|
||||
from werkzeug.middleware.proxy_fix import ProxyFix
|
||||
|
||||
# [OIDC] Import the OIDC related modules
|
||||
from oic.oic import Client
|
||||
from oic.extension.client import Client as ExtensionClient
|
||||
from oic.utils.authn.client import CLIENT_AUTHN_METHOD
|
||||
from oic.utils.settings import OicClientSettings
|
||||
from oic import rndstr
|
||||
from oic.exception import MessageException, NotForMe
|
||||
from oic.oauth2.message import ROPCAccessTokenRequest, AccessTokenResponse
|
||||
from oic.oic.message import AuthorizationResponse, RegistrationResponse, EndSessionRequest, BackChannelLogoutRequest
|
||||
from oic.oauth2.grant import Token
|
||||
|
||||
# Login configuration
|
||||
login = flask_login.LoginManager()
|
||||
login.login_view = "sso.login"
|
||||
@@ -126,6 +137,157 @@ class PrefixMiddleware(object):
|
||||
|
||||
proxy = PrefixMiddleware()
|
||||
|
||||
# [OIDC] Client class
|
||||
class OicClient:
|
||||
"Redirects user to OpenID Provider if configured"
|
||||
|
||||
def __init__(self):
|
||||
self.app = None
|
||||
self.client = None
|
||||
self.extension_client = None
|
||||
self.registration_response = None
|
||||
self.change_password_redirect_enabled = True,
|
||||
self.change_password_url = None
|
||||
|
||||
def init_app(self, app):
|
||||
self.app = app
|
||||
|
||||
settings = OicClientSettings()
|
||||
|
||||
settings.verify_ssl = app.config['OIDC_VERIFY_SSL']
|
||||
|
||||
self.change_password_redirect_enabled = app.config['OIDC_CHANGE_PASSWORD_REDIRECT_ENABLED']
|
||||
|
||||
self.client = Client(client_authn_method=CLIENT_AUTHN_METHOD,settings=settings)
|
||||
self.client.provider_config(app.config['OIDC_PROVIDER_INFO_URL'])
|
||||
|
||||
self.extension_client = ExtensionClient(client_authn_method=CLIENT_AUTHN_METHOD,settings=settings)
|
||||
self.extension_client.provider_config(app.config['OIDC_PROVIDER_INFO_URL'])
|
||||
self.change_password_url = app.config['OIDC_CHANGE_PASSWORD_REDIRECT_URL'] or (self.client.issuer + '/.well-known/change-password')
|
||||
self.redirect_url = app.config['OIDC_REDIRECT_URL'] or ("https://" + self.app.config['HOSTNAME'])
|
||||
info = {"client_id": app.config['OIDC_CLIENT_ID'], "client_secret": app.config['OIDC_CLIENT_SECRET'], "redirect_uris": [ self.redirect_url + "/sso/login" ]}
|
||||
client_reg = RegistrationResponse(**info)
|
||||
self.client.store_registration_info(client_reg)
|
||||
self.extension_client.store_registration_info(client_reg)
|
||||
|
||||
def get_redirect_url(self):
|
||||
if not self.is_enabled():
|
||||
return None
|
||||
flask.session["state"] = rndstr()
|
||||
flask.session["nonce"] = rndstr()
|
||||
args = {
|
||||
"client_id": self.client.client_id,
|
||||
"response_type": ["code"],
|
||||
"scope": ["openid", "email"],
|
||||
"nonce": flask.session["nonce"],
|
||||
"redirect_uri": self.redirect_url + "/sso/login",
|
||||
"state": flask.session["state"]
|
||||
}
|
||||
|
||||
auth_req = self.client.construct_AuthorizationRequest(request_args=args)
|
||||
login_url = auth_req.request(self.client.authorization_endpoint)
|
||||
return login_url
|
||||
|
||||
def exchange_code(self, query):
|
||||
aresp = self.client.parse_response(AuthorizationResponse, info=query, sformat="urlencoded")
|
||||
if not ("state" in flask.session and aresp["state"] == flask.session["state"]):
|
||||
return None, None, None, None
|
||||
args = {
|
||||
"code": aresp["code"]
|
||||
}
|
||||
response = self.client.do_access_token_request(state=aresp["state"],
|
||||
request_args=args,
|
||||
authn_method="client_secret_basic")
|
||||
|
||||
if "id_token" not in response or response["id_token"]["nonce"] != flask.session["nonce"]:
|
||||
return None, None, None, None
|
||||
if 'access_token' not in response or not isinstance(response, AccessTokenResponse):
|
||||
return None, None, None, None
|
||||
user_response = self.client.do_user_info_request(
|
||||
access_token=response['access_token'])
|
||||
return user_response['email'], user_response['sub'], response["id_token"], response
|
||||
|
||||
|
||||
def get_token(self, username, password):
|
||||
args = {
|
||||
"username": username,
|
||||
"password": password,
|
||||
"client_id": self.extension_client.client_id,
|
||||
"client_secret": self.extension_client.client_secret,
|
||||
"grant_type": "password"
|
||||
}
|
||||
url, body, ht_args, csi = self.extension_client.request_info(ROPCAccessTokenRequest,
|
||||
request_args=args, method="POST")
|
||||
response = self.extension_client.request_and_return(url, AccessTokenResponse, "POST", body, "json", "", ht_args)
|
||||
if isinstance(response, AccessTokenResponse):
|
||||
return response
|
||||
return None
|
||||
|
||||
|
||||
def get_user_info(self, token):
|
||||
return self.client.do_user_info_request(
|
||||
access_token=token['access_token'])
|
||||
|
||||
def check_validity(self, token):
|
||||
if 'exp' in token['id_token'] and token['id_token']['exp'] > time.time():
|
||||
return token
|
||||
else:
|
||||
return self.refresh_token(token)
|
||||
|
||||
def refresh_token(self, token):
|
||||
try:
|
||||
args = {
|
||||
"refresh_token": token['refresh_token']
|
||||
}
|
||||
response = self.client.do_access_token_refresh(request_args=args, token=Token(token))
|
||||
if isinstance(response, AccessTokenResponse):
|
||||
return response
|
||||
else:
|
||||
return None
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return None
|
||||
|
||||
def logout(self, id_token):
|
||||
state = rndstr()
|
||||
flask.session['state'] = state
|
||||
|
||||
args = {
|
||||
"state": state,
|
||||
"id_token_hint": id_token,
|
||||
"post_logout_redirect_uri": self.redirect_url + "/sso/logout",
|
||||
"client_id": self.client.client_id
|
||||
}
|
||||
|
||||
request = self.client.construct_EndSessionRequest(request_args=args)
|
||||
uri, body, h_args, cis = self.client.uri_and_body(EndSessionRequest, method="GET", request_args=args, cis=request)
|
||||
return uri
|
||||
|
||||
def backchannel_logout(self, body):
|
||||
req = BackChannelLogoutRequest().from_dict(body)
|
||||
|
||||
kwargs = {"aud": self.client.client_id, "iss": self.client.issuer, "keyjar": self.client.keyjar}
|
||||
|
||||
try:
|
||||
req.verify(**kwargs)
|
||||
except (MessageException, ValueError, NotForMe) as err:
|
||||
self.app.logger.error(err)
|
||||
return False
|
||||
|
||||
sub = req["logout_token"]["sub"]
|
||||
|
||||
if sub is not None and sub != '':
|
||||
MailuSessionExtension.prune_sessions(None, None, self.app, sub)
|
||||
|
||||
return True
|
||||
|
||||
def is_enabled(self):
|
||||
return self.app is not None and self.app.config['OIDC_ENABLED']
|
||||
|
||||
def change_password(self):
|
||||
return self.change_password_url if self.change_password_redirect_enabled else None
|
||||
|
||||
oic_client = OicClient()
|
||||
|
||||
# Data migrate
|
||||
migrate = flask_migrate.Migrate()
|
||||
@@ -449,8 +611,9 @@ class MailuSessionExtension:
|
||||
|
||||
return count
|
||||
|
||||
# [OIDC] Prune sessions by user id
|
||||
@staticmethod
|
||||
def prune_sessions(uid=None, keep=None, app=None):
|
||||
def prune_sessions(uid=None, keep=None, app=None, sub=None):
|
||||
""" Remove sessions
|
||||
uid: remove all sessions (NONE) or sessions belonging to a specific user
|
||||
keep: keep listed sessions
|
||||
@@ -464,8 +627,11 @@ class MailuSessionExtension:
|
||||
count = 0
|
||||
for key in app.session_store.list(prefix):
|
||||
if key not in keep and not key.startswith(b'token-'):
|
||||
app.session_store.delete(key)
|
||||
count += 1
|
||||
# [OIDC] Prune sessions by sub
|
||||
session = MailuSession(key, app) if sub is not None else None
|
||||
if sub is None or ('openid_sub' in session and session['openid_sub'] == sub):
|
||||
app.session_store.delete(key)
|
||||
count += 1
|
||||
|
||||
return count
|
||||
|
||||
|
||||
Reference in New Issue
Block a user