"""Database integration for persisting API client information."""
from typing import List, Tuple, Optional
from . import util, models
from ... import domain
class NoSuchClient(RuntimeError):
    """
    A client was requested that does not exist.

    Raised by :func:`_load_dbclient` when no client row matches the
    requested client ID.
    """
class NoSuchAuthorization(RuntimeError):
    """
    A non-existent :class:`domain.ClientAuthorization` was requested.

    Raised by :func:`update_authorizations` when an authorization carries an
    ID that does not belong to the client being updated.
    """
class NoSuchGrantType(RuntimeError):
    """
    A non-existent :class:`domain.ClientGrantType` was requested.

    Raised by :func:`update_grant_types` when a grant type carries an ID that
    does not belong to the client being updated.
    """
class NoSuchAuthCode(RuntimeError):
    """
    A non-existent :class:`domain.AuthorizationCode` was requested.

    Raised when an authorization code cannot be found for the requested
    client or user.
    """
# Expose these ``util`` attributes under this module's namespace so that
# callers can reach them without importing ``util`` directly.
init_app = util.init_app
create_all = util.create_all
drop_all = util.drop_all
def save_client(
        client: domain.Client,
        cred: Optional[domain.ClientCredential] = None,
        auths: Optional[List[domain.ClientAuthorization]] = None,
        grant_types: Optional[List[domain.ClientGrantType]] = None) -> str:
    """
    Create or update a :class:`domain.Client`, plus optional related data.

    When ``client.client_id`` is set, the matching row is loaded and its
    fields are overwritten; otherwise a new row is created. Related data is
    only touched for the arguments that are truthy.

    Parameters
    ----------
    client : :class:`domain.Client`
    cred : :class:`domain.ClientCredential` or None
        Forwarded to :func:`set_credential` when provided.
    auths : list or None
        Items are :class:`domain.ClientAuthorization` instances; forwarded
        to :func:`update_authorizations` when non-empty.
    grant_types : list or None
        Items are :class:`domain.ClientGrantType` instances; forwarded to
        :func:`update_grant_types` when non-empty.

    Returns
    -------
    str
        The ID of the persisted client.

    Raises
    ------
    :class:`NoSuchClient`
        If ``client.client_id`` is set but no such client is stored.

    """
    fields = {
        'owner_id': client.owner_id,
        'name': client.name,
        'url': client.url,
        'description': client.description,
        'redirect_uri': client.redirect_uri,
    }
    with util.transaction() as dbsession:
        if not client.client_id:
            db_client = models.DBClient(**fields)
        else:
            db_client = _load_dbclient(client.client_id, dbsession)
            for attr_name, value in fields.items():
                setattr(db_client, attr_name, value)
        dbsession.add(db_client)

        # The helpers are called with ``commit=False`` and the already-loaded
        # row; presumably this leaves the commit to the transaction opened
        # here -- confirm against ``util.transaction``.
        if cred:
            set_credential(cred, db_client=db_client, commit=False)
        if auths:
            update_authorizations(auths, db_client=db_client, commit=False)
        if grant_types:
            update_grant_types(grant_types, db_client=db_client, commit=False)
    # NOTE(review): the ID is read after the transaction block exits, on the
    # assumption that a new row's key is only populated once the session has
    # flushed -- confirm.
    return str(db_client.client_id)
def set_credential(cred: domain.ClientCredential,
                   client_id: Optional[str] = None,
                   db_client: Optional[models.DBClient] = None,
                   commit: bool = True) -> None:
    """
    Set the secret for a client, creating the credential row if needed.

    Parameters
    ----------
    cred : :class:`domain.ClientCredential`
        Only ``cred.client_secret`` is read.
    client_id : str or None
        Used to look up the client when ``db_client`` is not given.
    db_client : :class:`models.DBClient` or None
        An already-loaded client row; takes precedence over ``client_id``.
    commit : bool
        Passed through to ``util.transaction``.

    Raises
    ------
    :class:`NoSuchClient`
        If ``db_client`` is not given and ``client_id`` matches no client.

    """
    with util.transaction(commit) as dbsession:
        if db_client is None:
            db_client = _load_dbclient(client_id, dbsession)

        current = db_client.credential
        if not current:
            # No credential yet: attach a fresh one to the client.
            new_credential = models.DBClientCredential(
                client=db_client,
                client_secret=cred.client_secret
            )
            dbsession.add(new_credential)
        else:
            current.client_secret = cred.client_secret
            dbsession.add(db_client)
def update_authorizations(auths: List[domain.ClientAuthorization],
                          client_id: Optional[str] = None,
                          db_client: Optional[models.DBClient] = None,
                          commit: bool = True) -> None:
    """
    Make a client's stored authorizations match ``auths``.

    Stored authorizations whose ID is not present in ``auths`` are deleted,
    items without an ID are inserted, and items with a known ID are updated
    in place.

    Parameters
    ----------
    auths : list
        Items are :class:`domain.ClientAuthorization` instances.
    client_id : str or None
        Used to look up the client when ``db_client`` is not given.
    db_client : :class:`models.DBClient` or None
        An already-loaded client row; takes precedence over ``client_id``.
    commit : bool
        Passed through to ``util.transaction``.

    Raises
    ------
    :class:`NoSuchClient`
        If ``db_client`` is not given and ``client_id`` matches no client.
    :class:`NoSuchAuthorization`
        If an item carries an ID that is not among the client's stored
        authorizations.

    """
    with util.transaction(commit) as dbsession:
        if db_client is None:
            db_client = _load_dbclient(client_id, dbsession)

        # IDs are compared as strings on both sides. The stored IDs were
        # already normalised with ``str``; the incoming ones were not, so a
        # non-string ID would never match and every stored row would be
        # treated as removed.
        extant_auths = {str(db_auth.authorization_id): db_auth
                        for db_auth in db_client.authorizations}
        auths_to_keep = {str(auth.authorization_id) for auth in auths
                         if auth.authorization_id is not None}

        # Remove auths from the datastore that are not included.
        for auth_id in extant_auths.keys() - auths_to_keep:
            dbsession.delete(extant_auths[auth_id])

        # Add or update auths in the datastore.
        for auth in auths:
            # Does not yet exist in the datastore.
            if auth.authorization_id is None:
                dbsession.add(
                    models.DBClientAuthorization(
                        client=db_client,
                        scope=auth.scope,
                        requested=auth.requested,
                        authorized=auth.authorized
                    )
                )
                continue

            db_auth = extant_auths.get(str(auth.authorization_id))
            if db_auth is None:
                raise NoSuchAuthorization(
                    f'No auth {auth.authorization_id} for client'
                )
            # Update an existing auth.
            db_auth.scope = auth.scope
            db_auth.requested = auth.requested
            db_auth.authorized = auth.authorized
            dbsession.add(db_auth)
def update_grant_types(grant_types: List[domain.ClientGrantType],
                       client_id: Optional[str] = None,
                       db_client: Optional[models.DBClient] = None,
                       commit: bool = True) -> None:
    """
    Make a client's stored grant types match ``grant_types``.

    Stored grant types whose ID is not present in ``grant_types`` are
    deleted, items without an ID are inserted, and items with a known ID are
    updated in place.

    Parameters
    ----------
    grant_types : list
        Items are :class:`domain.ClientGrantType` instances.
    client_id : str or None
        Used to look up the client when ``db_client`` is not given.
    db_client : :class:`models.DBClient` or None
        An already-loaded client row; takes precedence over ``client_id``.
    commit : bool
        Passed through to ``util.transaction``.

    Raises
    ------
    :class:`NoSuchClient`
        If ``db_client`` is not given and ``client_id`` matches no client.
    :class:`NoSuchGrantType`
        If an item carries an ID that is not among the client's stored
        grant types.

    """
    with util.transaction(commit) as dbsession:
        if db_client is None:
            db_client = _load_dbclient(client_id, dbsession)

        # IDs are compared as strings on both sides. The stored IDs were
        # already normalised with ``str``; the incoming ones were not, so a
        # non-string ID would never match and every stored row would be
        # treated as removed.
        extant_gtypes = {str(db_gtype.grant_type_id): db_gtype
                         for db_gtype in db_client.grant_types}
        gtypes_to_keep = {str(gtype.grant_type_id) for gtype in grant_types
                          if gtype.grant_type_id is not None}

        # Remove grant types from the datastore that are not included.
        for gtype_id in extant_gtypes.keys() - gtypes_to_keep:
            dbsession.delete(extant_gtypes[gtype_id])

        # Add or update grant types in the datastore.
        for gtype in grant_types:
            # Does not yet exist in the datastore.
            if gtype.grant_type_id is None:
                dbsession.add(
                    models.DBClientGrantType(
                        client=db_client,
                        grant_type=gtype.grant_type,
                        requested=gtype.requested,
                        authorized=gtype.authorized
                    )
                )
                continue

            db_grant_type = extant_gtypes.get(str(gtype.grant_type_id))
            if db_grant_type is None:
                raise NoSuchGrantType(
                    f'No grant type {gtype.grant_type_id} for client'
                )
            # Update an existing grant type.
            db_grant_type.grant_type = gtype.grant_type
            db_grant_type.requested = gtype.requested
            db_grant_type.authorized = gtype.authorized
            dbsession.add(db_grant_type)
def load_client(client_id: str) -> Tuple[domain.Client,
                                         Optional[domain.ClientCredential],
                                         List[domain.ClientAuthorization],
                                         List[domain.ClientGrantType]]:
    """
    Load a :class:`.Client` and its related data from the datastore.

    Parameters
    ----------
    client_id : str

    Returns
    -------
    :class:`domain.Client`
        Field values are converted to ``str``; ``None`` values are kept.
    :class:`domain.ClientCredential` or None
        ``None`` when the client has no stored credential.
    list
        Items are :class:`domain.ClientAuthorization` instances.
    list
        Items are :class:`domain.ClientGrantType` instances.

    Raises
    ------
    :class:`NoSuchClient`
        If no client with ``client_id`` is stored.

    """
    def _text(value):
        """Stringify ``value`` unless it is ``None``."""
        return None if value is None else str(value)

    with util.transaction() as dbsession:
        row = _load_dbclient(client_id, dbsession)
        owner_key = str(row.client_id)

        client = domain.Client(
            client_id=_text(row.client_id),
            owner_id=_text(row.owner_id),
            name=_text(row.name),
            url=_text(row.url),
            description=_text(row.description),
            redirect_uri=_text(row.redirect_uri),
        )

        cred = None
        if row.credential:
            cred = domain.ClientCredential(
                client_id=owner_key,
                client_secret=str(row.credential.client_secret)
            )

        auths = [
            domain.ClientAuthorization(
                authorization_id=str(db_auth.authorization_id),
                client_id=owner_key,
                scope=str(db_auth.scope),
                requested=db_auth.requested,
                authorized=db_auth.authorized
            )
            for db_auth in row.authorizations
        ]
        grant_types = [
            domain.ClientGrantType(
                grant_type_id=str(db_gtype.grant_type_id),
                client_id=owner_key,
                grant_type=str(db_gtype.grant_type),
                requested=db_gtype.requested,
                authorized=db_gtype.authorized
            )
            for db_gtype in row.grant_types
        ]
    return client, cred, auths, grant_types
def save_auth_code(code: domain.AuthorizationCode) -> None:
    """
    Store a new authorization code.

    Parameters
    ----------
    code : :class:`domain.AuthorizationCode`
        Its fields are copied one-to-one onto a new
        :class:`models.DBAuthorizationCode` row.

    """
    copied_fields = ('code', 'user_id', 'user_email', 'username',
                     'client_id', 'redirect_uri', 'scope', 'created',
                     'expires')
    values = {field: getattr(code, field) for field in copied_fields}
    with util.transaction() as dbsession:
        dbsession.add(models.DBAuthorizationCode(**values))
def delete_auth_code(code: str, client_id: int) -> None:
    """
    Remove an authorization code from the database.

    Parameters
    ----------
    code : str
    client_id : int
        The client for which the code was issued.

    Raises
    ------
    :class:`NoSuchAuthCode`
        If no such code exists for the client.

    """
    with util.transaction() as dbsession:
        dbsession.delete(_load_dbauthcode(code, client_id, dbsession))
def load_auth_code(code: str, client_id: int) -> domain.AuthorizationCode:
    """
    Load an authorization code issued for a particular API client.

    Parameters
    ----------
    code : str
    client_id : int

    Returns
    -------
    :class:`domain.AuthorizationCode`
        ``code`` is taken from the argument; all other fields come from the
        stored row.

    Raises
    ------
    :class:`NoSuchAuthCode`
        If no such code exists for the client.

    """
    stored_fields = ('user_id', 'user_email', 'username', 'client_id',
                     'redirect_uri', 'scope', 'created', 'expires')
    with util.transaction() as dbsession:
        row = _load_dbauthcode(code, client_id, dbsession)
        details = {field: getattr(row, field) for field in stored_fields}
        return domain.AuthorizationCode(code=code, **details)
def load_auth_code_by_user(code: str, user_id: str) \
        -> domain.AuthorizationCode:
    """
    Load an authorization code that belongs to a particular user.

    Parameters
    ----------
    code : str
    user_id : str

    Returns
    -------
    :class:`domain.AuthorizationCode`
        ``code`` is taken from the argument; all other fields come from the
        stored row.

    Raises
    ------
    :class:`NoSuchAuthCode`
        If no such code exists for the user.

    """
    stored_fields = ('user_id', 'user_email', 'username', 'client_id',
                     'redirect_uri', 'scope', 'created', 'expires')
    with util.transaction() as dbsession:
        candidates = (
            dbsession.query(models.DBAuthorizationCode)
            .filter(models.DBAuthorizationCode.code == code)
            .filter(models.DBAuthorizationCode.user_id == user_id)
        )
        row = candidates.first()
        if row is None:
            raise NoSuchAuthCode(f'Auth code {code} does not exist'
                                 f' for user {user_id}')
        details = {field: getattr(row, field) for field in stored_fields}
        return domain.AuthorizationCode(code=code, **details)
def _load_dbauthcode(code: str, client_id: int, dbsession: util.Session) \
        -> models.DBAuthorizationCode:
    """
    Fetch the stored row for an authorization code issued to a client.

    Parameters
    ----------
    code : str
    client_id : int
    dbsession : :class:`util.Session`
        The session on which to run the query.

    Returns
    -------
    :class:`models.DBAuthorizationCode`
        The first row matching both ``code`` and ``client_id``.

    Raises
    ------
    :class:`NoSuchAuthCode`
        If no row matches.

    """
    db_code = (
        dbsession.query(models.DBAuthorizationCode)
        .filter(models.DBAuthorizationCode.code == code)
        .filter(models.DBAuthorizationCode.client_id == client_id)
        .first()
    )
    if db_code is None:
        raise NoSuchAuthCode(f'Auth code {code} does not exist'
                             f' for client {client_id}')
    return db_code
def _load_dbclient(client_id: str, dbsession: util.Session) -> models.DBClient:
    """
    Fetch the stored row for a client.

    Parameters
    ----------
    client_id : str
    dbsession : :class:`util.Session`
        The session on which to run the query.

    Returns
    -------
    :class:`models.DBClient`
        The first row whose ``client_id`` matches.

    Raises
    ------
    :class:`NoSuchClient`
        If no row matches.

    """
    matching = dbsession.query(models.DBClient).filter(
        models.DBClient.client_id == client_id
    )
    found: models.DBClient = matching.first()
    if found is not None:
        return found
    raise NoSuchClient(f'Client {client_id} does not exist')