Source code for oauth2.store.memory

"""
Read or write data from or to local memory.

Though not very valuable in a production setup, these store adapters are great
for testing purposes.
"""

from oauth2.store import AccessTokenStore, AuthCodeStore, ClientStore
from oauth2.error import AccessTokenNotFound, AuthCodeNotFound, \
                         ClientNotFoundError
from oauth2.datatype import Client


[docs]class ClientStore(ClientStore): """ Stores clients in memory. """ def __init__(self): self.clients = {}
[docs] def add_client(self, client_id, client_secret, redirect_uris, authorized_grants=None, authorized_response_types=None): """ Add a client app. :param client_id: Identifier of the client app. :param client_secret: Secret the client app uses for authentication against the OAuth 2.0 provider. :param redirect_uris: A ``list`` of URIs to redirect to. """ self.clients[client_id] = Client( identifier=client_id, secret=client_secret, redirect_uris=redirect_uris, authorized_grants=authorized_grants, authorized_response_types=authorized_response_types) return True
[docs] def fetch_by_client_id(self, client_id): """ Retrieve a client by its identifier. :param client_id: Identifier of a client app. :return: An instance of :class:`oauth2.Client`. :raises: ClientNotFoundError """ if client_id not in self.clients: raise ClientNotFoundError return self.clients[client_id]
[docs]class TokenStore(AccessTokenStore, AuthCodeStore): """ Stores tokens in memory. Useful for testing purposes or APIs with a very limited set of clients. Use memcache or redis as storage to be able to scale. """ def __init__(self): self.access_tokens = {} self.auth_codes = {} self.refresh_tokens = {} self.unique_token_identifier = {}
[docs] def fetch_by_code(self, code): """ Returns an AuthorizationCode. :param code: The authorization code. :return: An instance of :class:`oauth2.datatype.AuthorizationCode`. :raises: :class:`AuthCodeNotFound` if no data could be retrieved for given code. """ if code not in self.auth_codes: raise AuthCodeNotFound return self.auth_codes[code]
[docs] def save_code(self, authorization_code): """ Stores the data belonging to an authorization code token. :param authorization_code: An instance of :class:`oauth2.datatype.AuthorizationCode`. """ self.auth_codes[authorization_code.code] = authorization_code return True
[docs] def save_token(self, access_token): """ Stores an access token and additional data in memory. :param access_token: An instance of :class:`oauth2.datatype.AccessToken`. """ self.access_tokens[access_token.token] = access_token unique_token_key = self._unique_token_key(access_token.client_id, access_token.grant_type, access_token.user_id) self.unique_token_identifier[unique_token_key] = access_token.token if access_token.refresh_token is not None: self.refresh_tokens[access_token.refresh_token] = access_token return True
[docs] def delete_code(self, code): """ Deletes an authorization code after use :param code: The authorization code. """ if code in self.auth_codes: del self.auth_codes[code]
[docs] def delete_refresh_token(self, refresh_token): """ Deletes a refresh token after use :param refresh_token: The refresh_token. """ if refresh_token in self.refresh_tokens: del self.refresh_tokens[refresh_token]
[docs] def fetch_by_refresh_token(self, refresh_token): """ Find an access token by its refresh token. :param refresh_token: The refresh token that was assigned to an ``AccessToken``. :return: The :class:`oauth2.datatype.AccessToken`. :raises: :class:`oauth2.error.AccessTokenNotFound` """ if refresh_token not in self.refresh_tokens: raise AccessTokenNotFound return self.refresh_tokens[refresh_token]
[docs] def fetch_by_token(self, token): """ Returns data associated with an access token or ``None`` if no data was found. Useful for cases like validation where the access token needs to be read again. :param token: A access token code. :return: An instance of :class:`oauth2.datatype.AccessToken`. """ if token not in self.access_tokens: raise AccessTokenNotFound return self.access_tokens[token]
[docs] def fetch_existing_token_of_user(self, client_id, grant_type, user_id): try: key = self._unique_token_key(client_id, grant_type, user_id) token = self.unique_token_identifier[key] except KeyError: raise AccessTokenNotFound return self.fetch_by_token(token)
def _unique_token_key(self, client_id, grant_type, user_id): return "{0}_{1}_{2}".format(client_id, grant_type, user_id)