Source code for dcos_test_utils.iam
""" This module provides a specialized client for interacting with
the Identity Access and Management (IAM) service endpoints
"""
from dcos_test_utils import helpers
[docs]class Iam(helpers.ApiClientSession):
"""
Helpers for interacting with service user accounts.
Note that some terminology is confused.
The methods here interact with service user accounts and not services.
:param default_url: URL for the IAM service endpoint
:type default_url: helpers.Url
:param session: Session to bootstrap this session client with
:type session: requests.Session
"""
def __init__(self, default_url: helpers.Url, session=None):
super().__init__(default_url)
if session:
self.session = session
[docs] def create_service(self, uid: str, pubkey: str, description: str):
""" creates a service user
:param uid: ID for the new service
:type uid: str
:param pubkey: Public key to be used by this new service
:type pubkey: str
:param description: simple description metadata to include with account creation
:type description: str
:returns: None
"""
data = {
'description': description,
'public_key': pubkey
}
r = self.put('/users/{}'.format(uid), json=data)
assert r.status_code == 201
[docs] def delete_service(self, uid: str) -> None:
"""Delete a service account and verify that this worked.
Args:
uid: The user ID of the service account user to delete.
Raises:
AssertionError: The delete operation does not succeed.
"""
resp = self.delete('/users/{}'.format(uid))
assert resp.status_code == 204
# Verify that service does not appear in collection anymore.
resp = self.get('/users', query='type=service')
resp.raise_for_status()
uids = [account['uid'] for account in resp.json()['array']]
assert uid not in uids
[docs] def grant_user_permission(self, uid: str, action: str, rid: str) -> None:
""" Will grant a user with an action for a given RID
:param uid: ID of the user that this permission will be granted to
:type uid: str
:param action: action that user will be granted for the RID
:type action: str
:param rid: resource ID that the user will be granted the action to
:type rid: str
"""
rid = rid.replace('/', '%252F')
r = self.put('/acls/{}/users/{}/{}'.format(rid, uid, action))
assert r.status_code == 204, ('Permission was not granted. Code: {}. '
'Content {}'.format(r.status_code, r.content.decode()))
[docs] def delete_user_permission(self, uid: str, action: str, rid: str) -> None:
""" Will delete permission for a user for an action for a given RID
:param uid: ID of the user that this permission will be deleted from
:type uid: str
:param action: action that will be deleted for the RID for the user
:type action: str
:param rid: resource ID that the user will be removed from for the given action
:type rid: str
"""
rid = rid.replace('/', '%252F')
rid = rid.replace('/', '%252F')
r = self.delete('/acls/{}/users/{}/{}'.format(rid, uid, action))
assert r.status_code == 204
[docs] def create_acl(self, rid: str, description: str) -> None:
""" creates an ACL
:param rid: RID for the ACL to be created
:type rid: str
:param description: text description for the new RID
:type description: str
"""
rid = rid.replace('/', '%252F')
# Create ACL if it does not yet exist.
r = self.put('/acls/{}'.format(rid), json={'description': description})
assert r.status_code == 201 or r.status_code == 409
[docs] def delete_acl(self, rid: str) -> None:
""" Deletes an ACL
:param rid: RID for the ACL to be deleted
:type rid: str
"""
rid = rid.replace('/', '%252F')
r = self.delete('/acls/{}'.format(rid))
assert r.status_code == 204
[docs] def make_service_account_credentials(self, uid, privkey) -> dict:
""" Generates the JSON object to post to create a service account
:param uid: ID for the service account to be created
:type uid: str
:param privkey: private key to be used for the new service account
:type privkey: str
:returns: JSON-like dict to POST to IAM service
"""
return {
'scheme': 'RS256',
'uid': uid,
'login_endpoint': str(self.default_url) + '/auth/login',
'private_key': privkey
}