# -*- coding: utf-8 -*-
"""
`DBApi 2.0 <http://legacy.python.org/dev/peps/pep-0249/>`_ (PEP249) compatible
implementation of data stores.
"""
from oauth2.datatype import AccessToken, AuthorizationCode, Client
from oauth2.error import AccessTokenNotFound, ClientNotFoundError, \
AuthCodeNotFound
from oauth2.store import AccessTokenStore, AuthCodeStore, ClientStore
[docs]class DatabaseStore(object):
"""
Base class providing functionality used by a variety of store classes.
"""
def __init__(self, connection):
"""
Initialize a new store class.
:param connection: An instance of a `connection class <http://legacy.python.org/dev/peps/pep-0249/#connection-objects>`_.
"""
self.connection = connection
def execute(self, query, *params):
"""
Executes a query and returns the identifier of the modified row.
:param query: The query to be executed as a `str`.
:param params: A `tuple` of parameters that will be replaced for
placeholders in the query.
:return: A `long` identifying the last altered row.
"""
cursor = self.connection.cursor()
try:
cursor.execute(query, params)
self.connection.commit()
return cursor.lastrowid
finally:
cursor.close()
def fetchone(self, query, *args):
"""
Returns the first result of the given query.
:param query: The query to be executed as a `str`.
:param params: A `tuple` of parameters that will be replaced for
placeholders in the query.
:return: The retrieved row with each field being one element in a
`tuple`.
"""
cursor = self.connection.cursor()
try:
cursor.execute(query, args)
return cursor.fetchone()
finally:
cursor.close()
def fetchall(self, query, *args):
"""
Returns all results of the given query.
:param query: The query to be executed as a `str`.
:param params: A `tuple` of parameters that will be replaced for
placeholders in the query.
:return: A `list` of `tuple`s with each field being one element in the
`tuple`.
"""
cursor = self.connection.cursor()
try:
cursor.execute(query, args)
return cursor.fetchall()
finally:
cursor.close()
[docs]class DbApiAccessTokenStore(DatabaseStore, AccessTokenStore):
"""
Base class of a DBApi 2.0 compatible
:class:`oauth2.store.AccessTokenStore`.
A concrete implementation extends this class and defines all or a subset
of the \*_query class attributes.
"""
#: Insert an access token.
create_access_token_query = None
#: Insert one entry of additional data associated with an access token.
create_data_query = None
#: Insert one scope associated with an access token.
create_scope_query = None
#: Delete an access token by its refresh token.
delete_refresh_token_query = None
#: Retrieve an access token by its refresh token.
fetch_by_refresh_token_query = None
#: Retrieve all scopes associated with an access token.
fetch_scopes_by_access_token_query = None
#: Retrieve all data associated with an access token.
fetch_data_by_access_token_query = None
#: Retrieve an access token issued to a client and user for a specific
#: grant.
fetch_existing_token_of_user_query = None
[docs] def delete_refresh_token(self, refresh_token):
"""
Deletes an access token by its refresh token.
:param refresh_token: The refresh token of an access token as a `str`.
"""
self.execute(self.delete_refresh_token_query, refresh_token)
[docs] def fetch_by_refresh_token(self, refresh_token):
"""
Retrieves an access token by its refresh token.
:param refresh_token: The refresh token of an access token as a `str`.
:return: An instance of :class:`oauth2.datatype.AccessToken`.
:raises: :class:`oauth2.error.AccessTokenNotFound` if not access token
could be retrieved.
"""
row = self.fetchone(self.fetch_by_refresh_token_query, refresh_token)
if row is None:
raise AccessTokenNotFound
scopes = self._fetch_scopes(access_token_id=row[0])
data = self._fetch_data(access_token_id=row[0])
return self._row_to_token(data=data, scopes=scopes, row=row)
[docs] def fetch_existing_token_of_user(self, client_id, grant_type, user_id):
"""
Retrieve an access token issued to a client and user for a specific
grant.
:param client_id: The identifier of a client as a `str`.
:param grant_type: The type of grant.
:param user_id: The identifier of the user the access token has been
issued to.
:return: An instance of :class:`oauth2.datatype.AccessToken`.
:raises: :class:`oauth2.error.AccessTokenNotFound` if not access token
could be retrieved.
"""
token_data = self.fetchone(self.fetch_existing_token_of_user_query,
client_id, grant_type, user_id)
if token_data is None:
raise AccessTokenNotFound
scopes = self._fetch_scopes(access_token_id=token_data[0])
data = self._fetch_data(access_token_id=token_data[0])
return self._row_to_token(data=data, scopes=scopes, row=token_data)
[docs] def save_token(self, access_token):
"""
Creates a new entry for an access token in the database.
:param access_token: An instance of
:class:`oauth2.datatype.AccessToken`.
:return: `True`.
"""
access_token_id = self.execute(self.create_access_token_query,
access_token.client_id,
access_token.grant_type,
access_token.token,
access_token.expires_at,
access_token.refresh_token,
access_token.refresh_expires_at,
access_token.user_id)
for key, value in list(access_token.data.items()):
self.execute(self.create_data_query, key, value,
access_token_id)
for scope in access_token.scopes:
self.execute(self.create_scope_query, scope, access_token_id)
return True
def _fetch_data(self, access_token_id):
result = self.fetchall(self.fetch_data_by_access_token_query,
access_token_id)
data = {}
if result is not None:
for dataset in result:
data[dataset[0]] = dataset[1]
return data
def _fetch_scopes(self, access_token_id):
result = self.fetchall(self.fetch_scopes_by_access_token_query,
access_token_id)
scopes = []
if result is not None:
for scope_data in result:
scopes.append(scope_data[0])
return scopes
def _row_to_token(self, data, scopes, row):
return AccessToken(client_id=row[1], grant_type=row[2], token=row[3],
data=data, expires_at=row[4], refresh_token=row[5],
refresh_expires_at=row[6], scopes=scopes,
user_id=row[7])
[docs]class DbApiAuthCodeStore(DatabaseStore, AuthCodeStore):
"""
Base class of a DBApi 2.0 compatible :class:`oauth2.store.AuthCodeStore`.
A concrete implementation extends this class and defines all or a subset
of the \*_query class attributes.
"""
#: Insert an auth code.
create_auth_code_query = None
#: Insert one entry of additional data associated with an auth code.
create_data_query = None
#: Insert one scope associated with an auth code.
create_scope_query = None
#: Delete an auth code.
delete_code_query = None
#: Retrieve an auth code by its code.
fetch_code_query = None
#: Retrieve all data associated with an auth code.
fetch_data_query = None
#: Retrieve all scopes associated with an auth code.
fetch_scopes_query = None
[docs] def delete_code(self, code):
"""
Delete an auth code identified by its code.
:param code: The code of an auth code.
"""
self.execute(self.delete_code_query, code)
[docs] def fetch_by_code(self, code):
"""
Retrieves an auth code by its code.
:param code: The code of an auth code.
:return: An instance of :class:`oauth2.datatype.AuthorizationCode`.
:raises: :class:`oauth2.error.AuthCodeNotFound` if no auth code could
be retrieved.
"""
auth_code_data = self.fetchone(self.fetch_code_query, code)
if auth_code_data is None:
raise AuthCodeNotFound
data = dict()
data_result = self.fetchall(self.fetch_data_query, auth_code_data[0])
if data_result is not None:
for dataset in data_result:
data[dataset[0]] = dataset[1]
scopes = []
scope_result = self.fetchall(self.fetch_scopes_query,
auth_code_data[0])
if scope_result is not None:
for scope_set in scope_result:
scopes.append(scope_set[0])
return AuthorizationCode(client_id=auth_code_data[1],
code=auth_code_data[2],
expires_at=auth_code_data[3],
redirect_uri=auth_code_data[4],
scopes=scopes, data=data,
user_id=auth_code_data[5])
[docs] def save_code(self, authorization_code):
"""
Creates a new entry of an auth code in the database.
:param authorization_code: An instance of
:class:`oauth2.datatype.AuthorizationCode`.
:return: `True` if everything went fine.
"""
auth_code_id = self.execute(self.create_auth_code_query,
authorization_code.client_id,
authorization_code.code,
authorization_code.expires_at,
authorization_code.redirect_uri,
authorization_code.user_id)
for key, value in list(authorization_code.data.items()):
self.execute(self.create_data_query, key, value, auth_code_id)
for scope in authorization_code.scopes:
self.execute(self.create_scope_query, scope, auth_code_id)
return True
[docs]class DbApiClientStore(DatabaseStore, ClientStore):
"""
Base class of a DBApi 2.0 compatible :class:`oauth2.store.ClientStore`.
A concrete implementation extends this class and defines all or a subset
of the \*_query class attributes.
"""
#: Retrieve a client by its identifier.
fetch_client_query = None
#: Retrieve all grants that a client is allowed to use.
fetch_grants_query = None
#: Retrieve all redirect URIs of a client.
fetch_redirect_uris_query = None
#: Retrieve all response types that a client supports.
fetch_response_types_query = None
[docs] def fetch_by_client_id(self, client_id):
"""
Retrieves a client by its identifier.
:param client_id: The identifier of a client.
:return: An instance of :class:`oauth2.datatype.Client`.
:raises: :class:`oauth2.error.ClientError` if no client could be
retrieved.
"""
grants = None
redirect_uris = None
response_types = None
client_data = self.fetchone(self.fetch_client_query, client_id)
if client_data is None:
raise ClientNotFoundError
grant_data = self.fetchall(self.fetch_grants_query, client_data[0])
if grant_data:
grants = []
for grant in grant_data:
grants.append(grant[0])
redirect_uris_data = self.fetchall(self.fetch_redirect_uris_query,
client_data[0])
if redirect_uris_data:
redirect_uris = []
for redirect_uri in redirect_uris_data:
redirect_uris.append(redirect_uri[0])
response_types_data = self.fetchall(self.fetch_response_types_query,
client_data[0])
if response_types_data:
response_types = []
for response_type in response_types_data:
response_types.append(response_type[0])
return Client(identifier=client_data[1], secret=client_data[2],
authorized_grants=grants,
authorized_response_types=response_types,
redirect_uris=redirect_uris)