Source code for oauth2.store.redisdb

# -*- coding: utf-8 -*-
import redis
import json

from oauth2.datatype import AccessToken, AuthorizationCode, Client
from oauth2.error import AccessTokenNotFound, AuthCodeNotFound, \
    ClientNotFoundError
from oauth2.store import AccessTokenStore, AuthCodeStore, ClientStore


class RedisStore(object):
    """
    Uses redis to store access tokens and auth tokens.

    This Store supports ``redis``. Arguments are passed to the
    underlying client implementation.

    Initialization::

        import redisdb

        token_store = TokenStore(host="127.0.0.1",
            port=6379,
            db=0
        )

    """
    def __init__(self, rs=None, prefix="oauth2", *args, **kwargs):
        self.prefix = prefix

        if rs is not None:
            self.rs = rs
        else:
            self.rs = redis.StrictRedis(*args, **kwargs)

    def delete(self, name):
        cache_key = self._generate_cache_key(name)

        self.rs.delete(cache_key)

    def write(self, name, data):
        cache_key = self._generate_cache_key(name)

        self.rs.set(cache_key, json.dumps(data))

    def read(self, name):
        cache_key = self._generate_cache_key(name)

        data = self.rs.get(cache_key)

        if data is None:
            return None

        return json.loads(data.decode("utf-8"))

    def _generate_cache_key(self, identifier):
        return self.prefix + "_" + identifier


[docs]class TokenStore(AccessTokenStore, AuthCodeStore, RedisStore): def fetch_by_code(self, code): """ Returns data belonging to an authorization code from redis or ``None`` if no data was found. See :class:`oauth2.store.AuthCodeStore`. """ code_data = self.read(code) if code_data is None: raise AuthCodeNotFound return AuthorizationCode(**code_data) def save_code(self, authorization_code): """ Stores the data belonging to an authorization code token in redis. See :class:`oauth2.store.AuthCodeStore`. """ self.write(authorization_code.code, {"client_id": authorization_code.client_id, "code": authorization_code.code, "expires_at": authorization_code.expires_at, "redirect_uri": authorization_code.redirect_uri, "scopes": authorization_code.scopes, "data": authorization_code.data, "user_id": authorization_code.user_id}) def delete_code(self, code): """ Deletes an authorization code after use :param code: The authorization code. """ self.delete(code) def save_token(self, access_token): """ Stores the access token and additional data in redis. See :class:`oauth2.store.AccessTokenStore`. """ self.write(access_token.token, access_token.__dict__) unique_token_key = self._unique_token_key(access_token.client_id, access_token.grant_type, access_token.user_id) self.write(unique_token_key, access_token.__dict__) if access_token.refresh_token is not None: self.write(access_token.refresh_token, access_token.__dict__) def delete_refresh_token(self, refresh_token): """ Deletes a refresh token after use :param refresh_token: The refresh token to delete. """ access_token = self.fetch_by_refresh_token(refresh_token) self.delete(access_token.token) def fetch_by_refresh_token(self, refresh_token): token_data = self.read(refresh_token) if token_data is None: raise AccessTokenNotFound return AccessToken(**token_data) def fetch_existing_token_of_user(self, client_id, grant_type, user_id): unique_token_key = self._unique_token_key(client_id=client_id, grant_type=grant_type, user_id=user_id) token_data = self.read(unique_token_key) if token_data is None: raise AccessTokenNotFound return AccessToken(**token_data) def _unique_token_key(self, client_id, grant_type, user_id): return "{0}_{1}_{2}".format(client_id, grant_type, user_id)
[docs]class ClientStore(ClientStore, RedisStore): def add_client(self, client_id, client_secret, redirect_uris, authorized_grants=None, authorized_response_types=None): """ Add a client app. :param client_id: Identifier of the client app. :param client_secret: Secret the client app uses for authentication against the OAuth 2.0 provider. :param redirect_uris: A ``list`` of URIs to redirect to. """ self.write(client_id, {"identifier": client_id, "secret": client_secret, "redirect_uris": redirect_uris, "authorized_grants": authorized_grants, "authorized_response_types": authorized_response_types}) return True def fetch_by_client_id(self, client_id): client_data = self.read(client_id) if client_data is None: raise ClientNotFoundError return Client(identifier=client_data["identifier"], secret=client_data["secret"], redirect_uris=client_data["redirect_uris"], authorized_grants=client_data["authorized_grants"], authorized_response_types=client_data["authorized_response_types"])