import asyncio
import base64
import codecs
import datetime as dt
import hashlib
import json
import logging
import os
import re
import urllib.parse as urlparse
import uuid
from base64 import urlsafe_b64encode
from functools import partial
import tornado
from bokeh.server.auth_provider import AuthProvider
from tornado.auth import OAuth2Mixin
from tornado.httpclient import HTTPError as HTTPClientError, HTTPRequest
from tornado.web import HTTPError, RequestHandler, decode_signed_value
from tornado.websocket import WebSocketHandler
from .config import config
from .entry_points import entry_points_for
from .io.resources import (
BASIC_LOGIN_TEMPLATE, CDN_DIST, ERROR_TEMPLATE, LOGOUT_TEMPLATE, _env,
)
from .io.state import state
from .util import base64url_encode, decode_token
log = logging.getLogger(__name__)
STATE_COOKIE_NAME = 'panel-oauth-state'
CODE_COOKIE_NAME = 'panel-oauth-code'
[docs]def decode_response_body(response):
"""
Decodes the JSON-format response body
Arguments
---------
response: tornado.httpclient.HTTPResponse
Returns
-------
Decoded response content
"""
# Fix GitHub response.
try:
body = codecs.decode(response.body, 'ascii')
except Exception:
body = codecs.decode(response.body, 'utf-8')
body = re.sub('"', '\"', body)
body = re.sub("'", '"', body)
body = json.loads(body)
return body
def _serialize_state(state):
"""Serialize OAuth state to a base64 string after passing through JSON"""
json_state = json.dumps(state)
return base64.urlsafe_b64encode(json_state.encode('utf8')).decode('ascii')
def _deserialize_state(b64_state):
"""Deserialize OAuth state as serialized in _serialize_state"""
if isinstance(b64_state, str):
b64_state = b64_state.encode('ascii')
try:
json_state = base64.urlsafe_b64decode(b64_state).decode('utf8')
except ValueError:
log.error("Failed to b64-decode state: %r", b64_state)
return {}
try:
return json.loads(json_state)
except ValueError:
log.error("Failed to json-decode state: %r", json_state)
return {}
[docs]class OAuthLoginHandler(tornado.web.RequestHandler, OAuth2Mixin):
_API_BASE_HEADERS = {
'Accept': 'application/json',
'User-Agent': 'Tornado OAuth'
}
_DEFAULT_SCOPES = ['openid', 'email', 'profile', 'offline_access']
_EXTRA_TOKEN_PARAMS = {
'grant_type': 'authorization_code'
}
_access_token_header = None
_state_cookie = None
_error_template = ERROR_TEMPLATE
_login_endpoint = '/login'
@property
def _SCOPE(self):
if 'scope' in config.oauth_extra_params:
return config.oauth_extra_params['scope']
elif 'PANEL_OAUTH_SCOPE' not in os.environ:
return self._DEFAULT_SCOPES
return [scope for scope in os.environ['PANEL_OAUTH_SCOPE'].split(',')]
[docs] async def get_authenticated_user(self, redirect_uri, client_id, state,
client_secret=None, code=None):
"""
Fetches the authenticated user
Arguments
---------
redirect_uri: (str)
The OAuth redirect URI
client_id: (str)
The OAuth client ID
state: (str)
The unguessable random string to protect against
cross-site request forgery attacks
client_secret: (str, optional)
The client secret
code: (str, optional)
The response code from the server
"""
if code:
user, _, _, _ = await self._fetch_access_token(
client_id,
redirect_uri,
client_secret=client_secret,
code=code
)
return user
params = {
'redirect_uri': redirect_uri,
'client_id': client_id,
'client_secret': client_secret,
'response_type': 'code',
'extra_params': {
'state': state,
}
}
if 'audience' in config.oauth_extra_params:
params['extra_params']['audience'] = config.oauth_extra_params['audience']
if self._SCOPE is not None:
params['scope'] = self._SCOPE
if 'scope' in config.oauth_extra_params:
params['scope'] = config.oauth_extra_params['scope']
log.debug("%s making authorize request", type(self).__name__)
self.authorize_redirect(**params)
async def _fetch_access_token(
self, client_id, redirect_uri=None, client_secret=None, code=None,
refresh_token=None, username=None, password=None
):
"""
Fetches the access token.
Arguments
---------
client_id:
The client ID
redirect_uri:
The redirect URI
code:
The response code from the server
client_secret:
The client secret
refresh_token:
A token used for refreshing the access_token
username:
A username
password:
A password
"""
log.debug("%s making access token request.", type(self).__name__)
params = {
'client_id': client_id,
**self._EXTRA_TOKEN_PARAMS
}
if redirect_uri:
params['redirect_uri'] = redirect_uri
if self._SCOPE:
params['scope'] = ' '.join(self._SCOPE)
if code:
params['code'] = code
if refresh_token:
params['refresh_token'] = refresh_token
params['grant_type'] = 'refresh_token'
if client_secret:
params['client_secret'] = client_secret
elif username:
params.update(username=username, password=password)
else:
params['code_verifier'] = self.get_code_cookie()
http = self.get_auth_http_client()
# Request the access token.
req = HTTPRequest(
self._OAUTH_ACCESS_TOKEN_URL,
method='POST',
body=urlparse.urlencode(params),
headers=self._API_BASE_HEADERS
)
try:
response = await http.fetch(req)
except HTTPClientError as e:
log.debug("%s access token request failed.", type(self).__name__)
self._raise_error(e.response, status=401)
if not response.body or not (body:= decode_response_body(response)):
log.debug("%s token endpoint did not return a valid access token.", type(self).__name__)
self._raise_error(response)
if 'access_token' not in body:
if refresh_token:
log.debug("%s token endpoint did not reissue an access token.", type(self).__name__)
return None, None, None
self._raise_error(response, body, status=401)
access_token, refresh_token = body['access_token'], body.get('refresh_token')
expires_in = body.get('expires_in')
if expires_in:
expires_in = int(expires_in)
if id_token:= body.get('id_token'):
try:
user = self._on_auth(id_token, access_token, refresh_token, expires_in)
except HTTPError:
pass
else:
log.debug("%s successfully obtained access_token and id_token.", type(self).__name__)
return user, access_token, refresh_token, expires_in
user_headers = dict(self._API_BASE_HEADERS)
if self._access_token_header:
user_url = self._OAUTH_USER_URL
user_headers['Authorization'] = self._access_token_header.format(
body['access_token']
)
else:
user_url = '{}{}'.format(self._OAUTH_USER_URL, body['access_token'])
log.debug("%s requesting OpenID userinfo.", type(self).__name__)
try:
user_response = await http.fetch(user_url, headers=user_headers)
id_token = decode_response_body(user_response)
except HTTPClientError:
id_token = None
if not id_token:
log.debug("%s could not obtain userinfo or id_token, falling back to decoding access_token.", type(self).__name__)
try:
id_token = decode_token(body['access_token'])
except Exception:
log.debug("%s could not decode access_token.", type(self).__name__)
self._raise_error(response, body, status=401)
log.debug("%s successfully obtained access_token and userinfo.", type(self).__name__)
user = self._on_auth(id_token, access_token, refresh_token, expires_in)
return user, access_token, refresh_token, expires_in
[docs] def get_state_cookie(self):
"""Get OAuth state from cookies
To be compared with the value in redirect URL
"""
if self._state_cookie is None:
self._state_cookie = (
self.get_secure_cookie(STATE_COOKIE_NAME, max_age_days=config.oauth_expiry) or b''
).decode('utf8', 'replace')
self.clear_cookie(STATE_COOKIE_NAME)
return self._state_cookie
def set_state_cookie(self, state):
self.set_secure_cookie(
STATE_COOKIE_NAME, state, expires_days=config.oauth_expiry, httponly=True
)
def get_state(self):
root_url = self.request.uri.replace(self._login_endpoint, '')
next_url = original_next_url = self.get_argument('next', root_url)
if next_url:
# avoid browsers treating \ as /
next_url = next_url.replace('\\', urlparse.quote('\\'))
# disallow hostname-having urls,
# force absolute path redirect
urlinfo = urlparse.urlparse(next_url)
next_url = urlinfo._replace(
scheme='', netloc='', path='/' + urlinfo.path.lstrip('/')
).geturl()
if next_url != original_next_url:
log.warning(
"Ignoring next_url %r, using %r", original_next_url, next_url
)
return _serialize_state(
{'state_id': uuid.uuid4().hex, 'next_url': next_url or '/'}
)
def get_code(self):
code_verifier = uuid.uuid4().hex + uuid.uuid4().hex + uuid.uuid4().hex
hashed_code_verifier = hashlib.sha256(code_verifier.encode("utf-8")).digest()
code_challenge = urlsafe_b64encode(hashed_code_verifier).decode("utf-8").replace("=", "")
return code_verifier, code_challenge
def get_code_cookie(self):
code = (self.get_secure_cookie(CODE_COOKIE_NAME, max_age_days=config.oauth_expiry) or b'').decode('utf8', 'replace')
self.clear_cookie(CODE_COOKIE_NAME)
return code
def set_code_cookie(self, code):
self.set_secure_cookie(
CODE_COOKIE_NAME, code, expires_days=config.oauth_expiry, httponly=True
)
async def get(self):
log.debug("%s received login request", type(self).__name__)
if config.oauth_redirect_uri:
redirect_uri = config.oauth_redirect_uri
else:
redirect_uri = "{0}://{1}".format(
self.request.protocol,
self.request.host
)
params = {
'redirect_uri': redirect_uri,
'client_id': config.oauth_key,
}
# Some OAuth2 backends do not correctly return code
next_arg = self.get_argument('next', {})
if next_arg:
next_arg = urlparse.parse_qs(next_arg)
next_arg = {arg.split('?')[-1]: value for arg, value in next_arg.items()}
code = self.get_argument('code', extract_urlparam(next_arg, 'code'))
url_state = self.get_argument('state', extract_urlparam(next_arg, 'state'))
# Handle authentication error
error = self.get_argument('error', extract_urlparam(next_arg, 'error'))
if error is not None:
error_msg = self.get_argument(
'error_description', extract_urlparam(next_arg, 'error_description'))
if not error_msg:
error_msg = error
log.error(
"%s failed to authenticate with following error: %s",
type(self).__name__, error
)
raise HTTPError(401, error_msg, reason=error)
# Seek the authorization
cookie_state = self.get_state_cookie()
if code:
if cookie_state != url_state:
log.warning("OAuth state mismatch: %s != %s", cookie_state, url_state)
raise HTTPError(401, "OAuth state mismatch. Please restart the authentication flow.", reason='state mismatch')
state = _deserialize_state(url_state)
# For security reason, the state value (cross-site token) will be
# retrieved from the query string.
params.update({
'client_secret': config.oauth_secret,
'code': code,
'state': url_state
})
user = await self.get_authenticated_user(**params)
if user is None:
raise HTTPError(403, "Permissions unknown.")
log.debug("%s authorized user, redirecting to app.", type(self).__name__)
self.redirect(state.get('next_url', '/'))
else:
# Redirect for user authentication
params['state'] = state = self.get_state()
self.set_state_cookie(state)
await self.get_authenticated_user(**params)
def _on_auth(self, id_token, access_token, refresh_token=None, expires_in=None):
if isinstance(id_token, str):
decoded = decode_token(id_token)
else:
decoded = id_token
id_token = base64url_encode(json.dumps(id_token))
user_key = config.oauth_jwt_user or self._USER_KEY
if user_key in decoded:
user = decoded[user_key]
else:
log.error("%s token payload did not contain expected %r.",
type(self).__name__, user_key)
raise HTTPError(401, "OAuth token payload missing user information")
self.clear_cookie('is_guest')
self.set_secure_cookie('user', user, expires_days=config.oauth_expiry)
if state.encryption:
access_token = state.encryption.encrypt(access_token.encode('utf-8'))
id_token = state.encryption.encrypt(id_token.encode('utf-8'))
if refresh_token:
refresh_token = state.encryption.encrypt(refresh_token.encode('utf-8'))
self.set_secure_cookie('access_token', access_token, expires_days=config.oauth_expiry)
self.set_secure_cookie('id_token', id_token, expires_days=config.oauth_expiry)
if expires_in:
now_ts = dt.datetime.now(dt.timezone.utc).timestamp()
self.set_secure_cookie('oauth_expiry', str(int(now_ts + expires_in)), expires_days=config.oauth_expiry)
if refresh_token:
self.set_secure_cookie('refresh_token', refresh_token, expires_days=config.oauth_expiry)
if user in state._oauth_user_overrides:
state._oauth_user_overrides.pop(user, None)
return user
def _raise_error(self, response, body=None, status=400):
try:
body = body or decode_response_body(response)
except json.decoder.JSONDecodeError:
body = body
provider = self.__class__.__name__.replace('LoginHandler', '')
if response.error:
log.error(f"{provider} OAuth provider returned a {response.error} "
f"error. The full response was: {body}")
else:
log.warning(f"{provider} OAuth provider failed to fully "
f"authenticate returning the following response:"
f"{body}.")
raise HTTPError(
status,
body.get('error_description', str(body)),
reason=body.get('error', 'Unknown error')
)
[docs] def write_error(self, status_code, **kwargs):
_, e, _ = kwargs['exc_info']
self.clear_all_cookies()
self.set_header("Content-Type", 'text/html')
if isinstance(e, HTTPError):
error, error_msg = e.reason, e.log_message
else:
provider = self.__class__.__name__.replace('LoginHandler', '')
log.error(
f'{provider} OAuth provider encountered unexpected '
f'error: {e}'
)
error, error_msg = (
'500: Internal Server Error',
'Server encountered unexpected problem.'
)
self.write(self._error_template.render(
npm_cdn=config.npm_cdn,
title='Panel: Authentication Error',
error_type='Authentication Error',
error=error,
error_msg=error_msg
))
[docs]class GenericLoginHandler(OAuthLoginHandler):
_access_token_header = 'Bearer {}'
_EXTRA_TOKEN_PARAMS = {
'grant_type': 'authorization_code'
}
@property
def _OAUTH_ACCESS_TOKEN_URL(self):
return config.oauth_extra_params.get('TOKEN_URL', os.environ.get('PANEL_OAUTH_TOKEN_URL'))
@property
def _OAUTH_AUTHORIZE_URL(self):
return config.oauth_extra_params.get('AUTHORIZE_URL', os.environ.get('PANEL_OAUTH_AUTHORIZE_URL'))
@property
def _OAUTH_USER_URL(self):
return config.oauth_extra_params.get('USER_URL', os.environ.get('PANEL_OAUTH_USER_URL'))
@property
def _USER_KEY(self):
return config.oauth_extra_params.get('USER_KEY', os.environ.get('PANEL_USER_KEY', 'email'))
[docs]class PasswordLoginHandler(GenericLoginHandler):
_EXTRA_TOKEN_PARAMS = {
'grant_type': 'password'
}
def get(self):
try:
errormessage = self.get_argument("error")
except Exception:
errormessage = ""
next_url = self.get_argument('next', None)
if next_url:
self.set_cookie("next_url", next_url)
html = self._login_template.render(
errormessage=errormessage,
PANEL_CDN=CDN_DIST
)
self.write(html)
async def post(self):
username = self.get_argument("username", "")
password = self.get_argument("password", "")
if config.oauth_redirect_uri:
redirect_uri = config.oauth_redirect_uri
else:
redirect_uri = f"{self.request.protocol}://{self.request.host}{self._login_endpoint}"
user, _, _, _ = await self._fetch_access_token(
client_id=config.oauth_key,
redirect_uri=redirect_uri,
username=username,
password=password
)
if not user:
return
self.redirect('/')
[docs]class CodeChallengeLoginHandler(GenericLoginHandler):
async def get(self):
code = self.get_argument("code", "")
url_state = self.get_argument("state", "")
if config.oauth_redirect_uri:
redirect_uri = config.oauth_redirect_uri
else:
redirect_uri = f"{self.request.protocol}://{self.request.host}{self._login_endpoint}"
if not code or not url_state:
self._authorize_redirect(redirect_uri)
return
cookie_state = self.get_state_cookie()
if cookie_state != url_state:
log.warning("OAuth state mismatch: %s != %s", cookie_state, url_state)
raise HTTPError(400, "OAuth state mismatch")
state = _deserialize_state(url_state)
user = await self.get_authenticated_user(redirect_uri, config.oauth_key, url_state, code=code)
if user is None:
raise HTTPError(403)
log.debug("%s authorized user, redirecting to app.", type(self).__name__)
self.redirect(state.get('next_url', '/'))
def _authorize_redirect(self, redirect_uri):
state = self.get_state()
self.set_state_cookie(state)
code_verifier, code_challenge = self.get_code()
self.set_code_cookie(code_verifier)
params = {
"client_id": config.oauth_key,
"response_type": "code",
"scope": ' '.join(self._SCOPE),
"state": state,
"response_mode": "query",
"code_challenge": code_challenge,
"code_challenge_method": "S256",
"redirect_uri": redirect_uri
}
query_params = urlparse.urlencode(params)
self.redirect(f"{self._OAUTH_AUTHORIZE_URL}?{query_params}")
[docs]class GithubLoginHandler(OAuthLoginHandler):
"""GitHub OAuth2 Authentication
To authenticate with GitHub, first register your application at
https://github.com/settings/applications/new to get the client ID and
secret.
"""
_OAUTH_ACCESS_TOKEN_URL = 'https://github.com/login/oauth/access_token'
_OAUTH_AUTHORIZE_URL = 'https://github.com/login/oauth/authorize'
_OAUTH_USER_URL = 'https://api.github.com/user'
_access_token_header = 'token {}'
_USER_KEY = 'login'
[docs]class BitbucketLoginHandler(OAuthLoginHandler):
_API_BASE_HEADERS = {
"Accept": "application/json",
}
_OAUTH_ACCESS_TOKEN_URL = "https://bitbucket.org/site/oauth2/access_token"
_OAUTH_AUTHORIZE_URL = "https://bitbucket.org/site/oauth2/authorize"
_OAUTH_USER_URL = "https://api.bitbucket.org/2.0/user?access_token="
_USER_KEY = 'username'
[docs]class Auth0Handler(OAuthLoginHandler):
_access_token_header = 'Bearer {}'
_OAUTH_ACCESS_TOKEN_URL_ = 'https://{0}.auth0.com/oauth/token'
_OAUTH_AUTHORIZE_URL_ = 'https://{0}.auth0.com/authorize'
_OAUTH_USER_URL_ = 'https://{0}.auth0.com/userinfo'
_USER_KEY = 'email'
@property
def _OAUTH_ACCESS_TOKEN_URL(self):
url = config.oauth_extra_params.get('subdomain', 'example')
return self._OAUTH_ACCESS_TOKEN_URL_.format(url)
@property
def _OAUTH_AUTHORIZE_URL(self):
url = config.oauth_extra_params.get('subdomain', 'example')
return self._OAUTH_AUTHORIZE_URL_.format(url)
@property
def _OAUTH_USER_URL(self):
url = config.oauth_extra_params.get('subdomain', 'example')
return self._OAUTH_USER_URL_.format(url)
[docs]class GitLabLoginHandler(OAuthLoginHandler):
_API_BASE_HEADERS = {
'Accept': 'application/json',
}
_EXTRA_TOKEN_PARAMS = {
'grant_type': 'authorization_code'
}
_OAUTH_ACCESS_TOKEN_URL_ = 'https://{0}/oauth/token'
_OAUTH_AUTHORIZE_URL_ = 'https://{0}/oauth/authorize'
_OAUTH_USER_URL_ = 'https://{0}/api/v4/user'
_access_token_header = 'Bearer {}'
_USER_KEY = 'username'
@property
def _OAUTH_ACCESS_TOKEN_URL(self):
url = config.oauth_extra_params.get('url', 'gitlab.com')
return self._OAUTH_ACCESS_TOKEN_URL_.format(url)
@property
def _OAUTH_AUTHORIZE_URL(self):
url = config.oauth_extra_params.get('url', 'gitlab.com')
return self._OAUTH_AUTHORIZE_URL_.format(url)
@property
def _OAUTH_USER_URL(self):
url = config.oauth_extra_params.get('url', 'gitlab.com')
return self._OAUTH_USER_URL_.format(url)
[docs]class AzureAdLoginHandler(OAuthLoginHandler):
_API_BASE_HEADERS = {
'Accept': 'application/json',
'User-Agent': 'Tornado OAuth'
}
_OAUTH_ACCESS_TOKEN_URL_ = 'https://login.microsoftonline.com/{tenant}/oauth2/token'
_OAUTH_AUTHORIZE_URL_ = 'https://login.microsoftonline.com/{tenant}/oauth2/authorize'
_OAUTH_USER_URL_ = ''
_USER_KEY = 'unique_name'
@property
def _OAUTH_ACCESS_TOKEN_URL(self):
tenant = os.environ.get('AAD_TENANT_ID', config.oauth_extra_params.get('tenant', 'common'))
return self._OAUTH_ACCESS_TOKEN_URL_.format(tenant=tenant)
@property
def _OAUTH_AUTHORIZE_URL(self):
tenant = os.environ.get('AAD_TENANT_ID', config.oauth_extra_params.get('tenant', 'common'))
return self._OAUTH_AUTHORIZE_URL_.format(tenant=tenant)
@property
def _OAUTH_USER_URL(self):
return self._OAUTH_USER_URL_.format(**config.oauth_extra_params)
[docs]class AzureAdV2LoginHandler(OAuthLoginHandler):
_API_BASE_HEADERS = {
'Accept': 'application/json',
'User-Agent': 'Tornado OAuth'
}
_OAUTH_ACCESS_TOKEN_URL_ = 'https://login.microsoftonline.com/{tenant}/oauth2/v2.0/token'
_OAUTH_AUTHORIZE_URL_ = 'https://login.microsoftonline.com/{tenant}/oauth2/v2.0/authorize'
_OAUTH_USER_URL_ = ''
_USER_KEY = 'email'
@property
def _OAUTH_ACCESS_TOKEN_URL(self):
tenant = os.environ.get('AAD_TENANT_ID', config.oauth_extra_params.get('tenant', 'common'))
return self._OAUTH_ACCESS_TOKEN_URL_.format(tenant=tenant)
@property
def _OAUTH_AUTHORIZE_URL(self):
tenant = os.environ.get('AAD_TENANT_ID', config.oauth_extra_params.get('tenant', 'common'))
return self._OAUTH_AUTHORIZE_URL_.format(tenant=tenant)
@property
def _OAUTH_USER_URL(self):
return self._OAUTH_USER_URL_.format(**config.oauth_extra_params)
[docs]class OktaLoginHandler(OAuthLoginHandler):
"""Okta OAuth2 Authentication
To authenticate with Okta you first need to set up and configure
in the Okta developer console.
"""
_EXTRA_TOKEN_PARAMS = {
'grant_type': 'authorization_code',
'response_type': 'code,token,id_token'
}
_OAUTH_ACCESS_TOKEN_URL_ = 'https://{0}/oauth2/{1}/v1/token'
_OAUTH_ACCESS_TOKEN_URL__ = 'https://{0}/oauth2/v1/token'
_OAUTH_AUTHORIZE_URL_ = 'https://{0}/oauth2/{1}/v1/authorize'
_OAUTH_AUTHORIZE_URL__ = 'https://{0}/oauth2/v1/authorize'
_OAUTH_USER_URL_ = 'https://{0}/oauth2/{1}/v1/userinfo?access_token='
_OAUTH_USER_URL__ = 'https://{0}/oauth2/v1/userinfo?access_token='
_USER_KEY = 'email'
@property
def _OAUTH_ACCESS_TOKEN_URL(self):
url = config.oauth_extra_params.get('url', 'okta.com')
server = config.oauth_extra_params.get('server', 'default')
if server:
return self._OAUTH_ACCESS_TOKEN_URL_.format(url, server)
else:
return self._OAUTH_ACCESS_TOKEN_URL__.format(url)
@property
def _OAUTH_AUTHORIZE_URL(self):
url = config.oauth_extra_params.get('url', 'okta.com')
server = config.oauth_extra_params.get('server', 'default')
if server:
return self._OAUTH_AUTHORIZE_URL_.format(url, server)
else:
return self._OAUTH_AUTHORIZE_URL__.format(url)
@property
def _OAUTH_USER_URL(self):
url = config.oauth_extra_params.get('url', 'okta.com')
server = config.oauth_extra_params.get('server', 'default')
if server:
return self._OAUTH_USER_URL_.format(url, server)
else:
return self._OAUTH_USER_URL__.format(url, server)
[docs]class GoogleLoginHandler(OAuthLoginHandler):
_API_BASE_HEADERS = {
"Content-Type": "application/x-www-form-urlencoded; charset=utf-8"
}
_DEFAULT_SCOPES = ['openid', 'email', 'profile']
_OAUTH_AUTHORIZE_URL = "https://accounts.google.com/o/oauth2/v2/auth"
_OAUTH_ACCESS_TOKEN_URL = "https://accounts.google.com/o/oauth2/token"
_USER_KEY = 'email'
[docs]class BasicLoginHandler(RequestHandler):
_login_template = BASIC_LOGIN_TEMPLATE
def get(self):
try:
errormessage = self.get_argument("error")
except Exception:
errormessage = ""
next_url = self.get_argument('next', None)
if next_url:
self.set_cookie("next_url", next_url)
html = self._login_template.render(
errormessage=errormessage,
PANEL_CDN=CDN_DIST
)
self.write(html)
def _validate(self, username, password):
if 'basic_auth' in state._server_config.get(self.application, {}):
auth_info = state._server_config[self.application]['basic_auth']
else:
auth_info = config.basic_auth
if isinstance(auth_info, str) and os.path.isfile(auth_info):
with open(auth_info, encoding='utf-8') as auth_file:
auth_info = json.loads(auth_file.read())
if isinstance(auth_info, dict):
if username not in auth_info:
return False
return password == auth_info[username]
elif password == auth_info:
return True
return False
def post(self):
username = self.get_argument("username", "")
password = self.get_argument("password", "")
auth = self._validate(username, password)
if auth:
self.set_current_user(username)
next_url = self.get_cookie("next_url", "/")
self.redirect(next_url)
else:
error_msg = "?error=" + tornado.escape.url_escape("Invalid username or password!")
self.redirect(self.request.uri + error_msg)
def set_current_user(self, user):
if not user:
self.clear_cookie("is_guest")
self.clear_cookie("user")
return
self.clear_cookie("is_guest")
self.set_secure_cookie("user", user, expires_days=config.oauth_expiry)
id_token = base64url_encode(json.dumps({'user': user}))
if state.encryption:
id_token = state.encryption.encrypt(id_token.encode('utf-8'))
self.set_secure_cookie('id_token', id_token, expires_days=config.oauth_expiry)
[docs]class LogoutHandler(tornado.web.RequestHandler):
_login_endpoint = '/login'
_logout_template = LOGOUT_TEMPLATE
def get(self):
self.clear_cookie("user")
self.clear_cookie("id_token")
self.clear_cookie("access_token")
self.clear_cookie("refresh_token")
self.clear_cookie("oauth_expiry")
self.clear_cookie(STATE_COOKIE_NAME)
html = self._logout_template.render(
PANEL_CDN=CDN_DIST,
LOGIN_ENDPOINT=self._login_endpoint
)
self.write(html)
[docs]class BasicAuthProvider(AuthProvider):
"""
An AuthProvider which serves a simple login and logout page.
"""
def __init__(
self, login_endpoint=None, logout_endpoint=None,
login_template=None, logout_template=None, error_template=None,
guest_endpoints=None
):
if error_template is None:
self._error_template = ERROR_TEMPLATE
else:
with open(error_template) as f:
self._error_template = _env.from_string(f.read())
if logout_template is None:
self._logout_template = LOGOUT_TEMPLATE
else:
with open(logout_template) as f:
self._logout_template = _env.from_string(f.read())
if login_template is None:
self._login_template = BASIC_LOGIN_TEMPLATE
else:
with open(login_template) as f:
self._login_template = _env.from_string(f.read())
self._login_endpoint = login_endpoint or '/login'
self._logout_endpoint = logout_endpoint or '/logout'
self._guest_endpoints = guest_endpoints or []
state.on_session_destroyed(self._remove_user)
super().__init__()
def _remove_user(self, session_context):
guest_cookie = session_context.request.cookies.get('is_guest')
user_cookie = session_context.request.cookies.get('user')
if guest_cookie:
user = 'guest'
elif user_cookie:
user = decode_signed_value(
config.cookie_secret, 'user', user_cookie
)
if user:
user = user.decode('utf-8')
else:
user = None
if not user:
return
state._active_users[user] -= 1
if not state._active_users[user]:
del state._active_users[user]
def _allow_guest(self, uri):
if config.oauth_optional and not (uri == self._login_endpoint or '?code=' in uri):
return True
return True if uri.replace('/ws', '') in self._guest_endpoints else False
@property
def get_user(self):
def get_user(request_handler):
user = request_handler.get_secure_cookie("user", max_age_days=config.oauth_expiry)
if user:
user = user.decode('utf-8')
elif self._allow_guest(request_handler.request.uri):
user = "guest"
request_handler.request.cookies["is_guest"] = "1"
if not isinstance(request_handler, WebSocketHandler):
request_handler.set_cookie("is_guest", "1", expires_days=config.oauth_expiry)
if user and isinstance(request_handler, WebSocketHandler):
state._active_users[user] += 1
return user
return get_user
@property
def login_url(self):
return self._login_endpoint
@property
def login_handler(self):
BasicLoginHandler._login_endpoint = self._login_endpoint
BasicLoginHandler._login_template = self._login_template
return BasicLoginHandler
@property
def logout_url(self):
return self._logout_endpoint
@property
def logout_handler(self):
if self._logout_template:
LogoutHandler._logout_template = self._logout_template
LogoutHandler._login_endpoint = self._login_endpoint
return LogoutHandler
[docs]class OAuthProvider(BasicAuthProvider):
"""
An AuthProvider using specific OAuth implementation selected via
the global config.oauth_provider configuration.
"""
@property
def get_user(self):
return None
@property
def get_user_async(self):
async def get_user(handler):
user = super(OAuthProvider, self).get_user(handler)
if not config.oauth_refresh_tokens or user is None:
return user
now_ts = dt.datetime.now(dt.timezone.utc).timestamp()
expiry = None
if user in state._oauth_user_overrides:
while not state._oauth_user_overrides[user]:
await asyncio.sleep(0.1)
user_state = state._oauth_user_overrides[user]
access_token = user_state['access_token']
if user_state['expiry']:
expiry = user_state['expiry']
else:
access_cookie = handler.get_secure_cookie('access_token', max_age_days=config.oauth_expiry)
if not access_cookie:
log.debug("No access token available, forcing user to reauthenticate.")
return
access_token = state._decrypt_cookie(access_cookie)
if expiry is None:
try:
access_json = decode_token(access_token)
expiry = access_json['exp']
except Exception:
expiry = handler.get_secure_cookie('oauth_expiry', max_age_days=config.oauth_expiry)
if expiry is None:
# Token does not have content and therefore does not expire
log.debug("access_token is not a valid JWT token. Expiry cannot be determined.")
return user
if user in state._oauth_user_overrides:
refresh_token = state._oauth_user_overrides[user]['refresh_token']
else:
refresh_cookie = handler.get_secure_cookie('refresh_token', max_age_days=config.oauth_expiry)
if refresh_cookie:
refresh_token = state._decrypt_cookie(refresh_cookie)
self._schedule_refresh(access_json['exp'], user, refresh_token, handler.application, handler.request)
else:
refresh_token = None
if expiry > now_ts:
log.debug("Fully authenticated and access_token still valid.")
return user
if refresh_token:
try:
refresh_json = decode_token(refresh_token)
if refresh_json['exp'] < now_ts:
refresh_token = None
except Exception:
# If refresh token is not a valid JWT token then it does not expire
pass
if refresh_token is None:
log.debug("%s access_token is expired and refresh_token not available, forcing user to reauthenticate.", type(self).__name__)
return
log.debug("%s refreshing token", type(self).__name__)
await self._refresh_access_token(user, refresh_token, handler.application, handler.request)
return user
return get_user
@property
def login_handler(self):
handler = AUTH_PROVIDERS[config.oauth_provider]
if self._error_template:
handler._error_template = self._error_template
handler._login_template = self._login_template
handler._login_endpoint = self._login_endpoint
return handler
def _remove_user(self, session_context):
guest_cookie = session_context.request.cookies.get('is_guest')
user_cookie = session_context.request.cookies.get('user')
if guest_cookie:
user = 'guest'
elif user_cookie:
user = decode_signed_value(
config.cookie_secret, 'user', user_cookie
)
if user:
user = user.decode('utf-8')
else:
user = None
if not user:
return
state._active_users[user] -= 1
if not state._active_users[user]:
del state._active_users[user]
if user in state._oauth_user_overrides:
del state._oauth_user_overrides[user]
def _schedule_refresh(self, expiry_ts, user, refresh_token, application, request):
if not state._active_users.get(user):
return
now_ts = dt.datetime.now(dt.timezone.utc).timestamp()
expiry_seconds = expiry_ts - now_ts - 10
log.debug("%s scheduling token refresh in %d seconds", type(self).__name__, expiry_seconds)
expiry_date = dt.datetime.now() + dt.timedelta(seconds=expiry_seconds) # schedule_task is in local TZ
refresh_cb = partial(self._scheduled_refresh, user, refresh_token, application, request)
if expiry_seconds <= 0:
refresh_cb()
return
task = f'{user}-refresh-access-tokens'
try:
state.cancel_task(task)
except KeyError:
pass
finally:
state.schedule_task(task, refresh_cb, at=expiry_date)
async def _scheduled_refresh(self, user, refresh_token, application, request):
await self._refresh_access_token(user, refresh_token, application, request)
user_state = state._oauth_user_overrides[user]
access_token, refresh_token = user_state['access_token'], user_state['refresh_token']
if user_state['expiry']:
expiry = user_state['expiry']
else:
expiry = decode_token(access_token)['exp']
self._schedule_refresh(expiry, user, refresh_token, application, request)
async def _refresh_access_token(self, user, refresh_token, application, request):
log.debug("%s refreshing token", type(self).__name__)
if user in state._oauth_user_overrides:
refresh_token = state._oauth_user_overrides[user]['refresh_token']
state._oauth_user_overrides[user] = {}
auth_handler = self.login_handler(application=application, request=request)
_, access_token, refresh_token, expires_in = await auth_handler._fetch_access_token(
client_id=config.oauth_key,
client_secret=config.oauth_secret,
refresh_token=refresh_token
)
if access_token:
now_ts = dt.datetime.now(dt.timezone.utc).timestamp()
state._oauth_user_overrides[user] = {
'access_token': access_token,
'refresh_token': refresh_token,
'expiry': now_ts+expires_in if expires_in else None
}
else:
del state._oauth_user_overrides[user]
AUTH_PROVIDERS = {
'auth0': Auth0Handler,
'azure': AzureAdLoginHandler,
'azurev2': AzureAdV2LoginHandler,
'bitbucket': BitbucketLoginHandler,
'generic': GenericLoginHandler,
'google': GoogleLoginHandler,
'github': GithubLoginHandler,
'gitlab': GitLabLoginHandler,
'okta': OktaLoginHandler,
'password': PasswordLoginHandler,
'auth_code': CodeChallengeLoginHandler
}
# Populate AUTH Providers from external extensions
for entry_point in entry_points_for('panel.auth'):
AUTH_PROVIDERS[entry_point.name] = entry_point.load()
config.param.objects(False)['_oauth_provider'].objects = list(AUTH_PROVIDERS.keys())