dcos_test_utils package

dcos_test_utils.dcos_api module

Utilities for interacting with a DC/OS instance via REST API

Most DC/OS deployments will have auth enabled, so this module includes DcosUser and DcosAuth to be attached to a DcosApiSession. Additionally, it is sometimes necessary to query specific nodes within a DC/OS cluster, so there is ARNodeApiClientMixin to allow querying nodes without boilerplate to set the correct port and scheme.

class dcos_test_utils.dcos_api.DcosApiSession(dcos_url: str, masters: Optional[List[str]], slaves: Optional[List[str]], public_slaves: Optional[List[str]], auth_user: Optional[dcos_test_utils.dcos_api.DcosUser], exhibitor_admin_password: Optional[str] = None)[source]

Bases: dcos_test_utils.helpers.ARNodeApiClientMixin, dcos_test_utils.helpers.RetryCommonHttpErrorsMixin, dcos_test_utils.helpers.ApiClientSession

Proxy class for DC/OS clusters. If any of the host lists (masters, slaves, public_slaves) are provided, the wait_for_dcos function of this class will wait until provisioning is complete. If these lists are not provided, then there is no ground truth and the cluster will be assumed to be in a completed state.

Parameters:
  • dcos_url (helpers.Url) – address for the DC/OS web UI.
  • masters (list) – list of Mesos master advertised IP addresses.
  • slaves (list) – list of Mesos slave/agent advertised IP addresses.
  • public_slaves (list) – list of public Mesos slave/agent advertised IP addresses.
  • auth_user (DcosUser) – use this user’s auth for all requests. Note: the user must be authenticated explicitly or by calling self.wait_for_dcos()
all_slaves

Property which returns a sorted list of all slave IP strings for this cluster

copy()[source]

Create a new client session from this one without cookies, with the authentication intact.

cosmos

Property which returns a dcos_test_utils.package.Cosmos derived from this session

classmethod create()[source]

Uses environment variables defined in DcosApiSession.get_args_from_env() to create a new DcosApiSession instance

exhibitor

Property which creates a new Exhibitor

static get_args_from_env() → dict[source]

This method will use environment variables to generate the arguments necessary to initialize a DcosApiSession

Environment Variables:

  • DCOS_DNS_ADDRESS: the URL for the DC/OS cluster to be used. If not set, leader.mesos will be used
  • DCOS_ACS_TOKEN: authentication token that can be taken from dcos-cli after login in order to authenticate. If not given, a hard-coded dummy login token will be used to create the authentication token.
  • MASTER_HOSTS: a complete list of the expected master IPs (optional)
  • SLAVE_HOSTS: a complete list of the expected private slaves IPs (optional)
  • PUBLIC_SLAVE_HOSTS: a complete list of the public slave IPs (optional)
Returns:arguments to initialize a DcosApiSession
Return type:dict
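A minimal sketch of how the environment variables listed above could be collected into constructor arguments. The helper name read_cluster_env, the comma-separated host-list format, and the http:// scheme on the default address are assumptions made for illustration, and DCOS_ACS_TOKEN handling is left out. This is not the library's implementation.

```python
import os
from typing import List, Mapping, Optional


def read_cluster_env(environ: Optional[Mapping[str, str]] = None) -> dict:
    """Collect DcosApiSession-style arguments from environment variables.

    Hypothetical helper; the host-list format (comma-separated) is assumed.
    """
    env = os.environ if environ is None else environ

    def host_list(name: str) -> Optional[List[str]]:
        raw = env.get(name)
        # An unset variable means there is no ground truth for that node type.
        return raw.split(',') if raw else None

    return {
        # Falls back to leader.mesos when DCOS_DNS_ADDRESS is not set.
        'dcos_url': env.get('DCOS_DNS_ADDRESS', 'http://leader.mesos'),
        'masters': host_list('MASTER_HOSTS'),
        'slaves': host_list('SLAVE_HOSTS'),
        'public_slaves': host_list('PUBLIC_SLAVE_HOSTS'),
    }


args = read_cluster_env({'MASTER_HOSTS': '10.0.0.1,10.0.0.2'})
```

Passing a plain dict instead of os.environ keeps the sketch easy to exercise in a test without touching the real environment.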
get_user_session(user: dcos_test_utils.dcos_api.DcosUser)[source]

Returns a copy of this client session with a new user

Parameters:user (DcosUser) – The user with which the new DcosApiSession will authenticate (can be None)
get_version() → str[source]

Queries the DC/OS version endpoint to get DC/OS version

Returns:version for DC/OS
health

Property which returns a dcos_test_utils.diagnostics.Diagnostics derived from this session

jobs

Property which returns a dcos_test_utils.jobs.Jobs derived from this session

login_default_user()[source]

Retries the default user login because, in some deployments, the login endpoint might not be routable immediately after Admin Router is up. The method waits 5 seconds between retries to avoid DoS-ing the IAM.

Raises:
requests.HTTPException: In case the login fails due to wrong username or password of the default user.
logs

Property which returns a copy of this session where all requests are prefaced with /system/v1/logs

marathon

Property which returns a dcos_test_utils.marathon.Marathon derived from this session

masters

Property which returns a sorted list of master IP strings for this cluster

mesos_pod_sandbox_directory(slave_id: str, framework_id: str, executor_id: str, task_id: str) → str[source]

Gets the mesos sandbox directory for a specific task in a pod which is currently running

Parameters:
  • slave_id (str) – slave ID to pull sandbox from
  • framework_id (str) – framework ID to pull sandbox from
  • executor_id (str) – executor ID to pull directory sandbox from
  • task_id (str) – task ID to pull directory sandbox from
Returns:

the directory of the sandbox

Return type:

str

mesos_pod_sandbox_file(slave_id: str, framework_id: str, executor_id: str, task_id: str, filename: str) → str[source]

Gets a specific file from a currently-running pod’s task sandbox and returns the text content

Parameters:
  • slave_id (str) – ID of the slave running the task
  • framework_id (str) – ID of the framework of the task
  • executor_id (str) – ID of the executor
  • task_id (str) – ID of the task
  • filename (str) – filename in the sandbox
Returns:

sandbox text contents

mesos_sandbox_directory(slave_id: str, framework_id: str, task_id: str) → str[source]

Gets the mesos sandbox directory for a specific task

Parameters:
  • slave_id (str) – slave ID to pull sandbox from
  • framework_id (str) – framework ID to pull sandbox from
  • task_id (str) – task ID to pull directory sandbox from
Returns:

the directory of the sandbox

Return type:

str

mesos_sandbox_file(slave_id: str, framework_id: str, task_id: str, filename: str) → str[source]

Gets a specific file from a task sandbox and returns the text content

Parameters:
  • slave_id (str) – ID of the slave running the task
  • framework_id (str) – ID of the framework of the task
  • task_id (str) – ID of the task
  • filename (str) – filename in the sandbox
Returns:

sandbox text contents

metrics

Property which returns a copy of this session where all requests are prefaced with /system/v1/metrics/v0

metronome

Property which returns a copy of this session where all requests are prefaced with /service/metronome

metronome_one_off(job_definition: dict, timeout: int = 300, ignore_failures: bool = False) → None[source]

Run a job on metronome and block until it returns success

Parameters:
  • job_definition (dict) – metronome job JSON to be triggered once
  • timeout (int) – how long to wait (in seconds) for the job to complete
  • ignore_failures (bool) – if True, failures will not block or raise an exception
public_slaves

Property which returns a sorted list of public slave IP strings for this cluster

set_node_lists_if_unset()[source]

Sets the expected cluster topology to be the observed cluster topology from exhibitor and mesos. That is, if masters, slaves, or public_slaves were not provided, accept whatever is currently available

slaves

Property which returns a sorted list of private slave IP strings for this cluster

wait_for_dcos()[source]

This method will wait for:

  • cluster endpoints to come up immediately after deployment has completed
  • authentication with DC/OS to be successful
  • all DC/OS services to become healthy
  • all explicitly declared nodes to register

class dcos_test_utils.dcos_api.DcosAuth(auth_token: str)[source]

Bases: requests.auth.AuthBase

Child of AuthBase for specifying how to handle DC/OS auth per request

Parameters:auth_token (str) – token generated by authenticating with access control
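The per-request auth hook can be sketched as a callable that stamps the token onto each outgoing request, which is the calling convention requests.auth.AuthBase subclasses follow. To stay self-contained, the sketch below does not import requests and uses a stand-in request object; the class name TokenAuthSketch and the exact Authorization header format are assumptions, not confirmed details of DcosAuth.

```python
from types import SimpleNamespace


class TokenAuthSketch:
    """Callable that attaches an auth token to every request it is given.

    Follows the AuthBase convention: __call__ receives the prepared
    request, mutates its headers, and returns it.
    """

    def __init__(self, auth_token: str):
        self.auth_token = auth_token

    def __call__(self, request):
        # Assumed header format, for illustration only.
        request.headers['Authorization'] = 'token={}'.format(self.auth_token)
        return request


# Stand-in for a prepared request: any object with a headers mapping.
fake_request = SimpleNamespace(headers={})
signed = TokenAuthSketch('abc123')(fake_request)
```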
class dcos_test_utils.dcos_api.DcosUser(credentials: dict)[source]

Bases: object

Representation of a DC/OS user used for authentication

Parameters:credentials (dict) – representation of the JSON used to log in
auth_header

Property for the auth header provided at authentication time

Returns:representation of HTTP headers to use
Return type:dict
class dcos_test_utils.dcos_api.Exhibitor(default_url: dcos_test_utils.helpers.Url, session: Optional[requests.sessions.Session] = None, exhibitor_admin_password: Optional[str] = None)[source]

Bases: dcos_test_utils.helpers.RetryCommonHttpErrorsMixin, dcos_test_utils.helpers.ApiClientSession

Exhibitor can have a password set, in which case a different auth model is needed

Parameters:
  • default_url (helpers.Url) – Url object for the exhibitor instance
  • session (requests.Session) – optional session for bootstrapping this session (a new one is created otherwise)
  • exhibitor_admin_password (str) – password for exhibitor (not always set)

dcos_test_utils.dcos_cli module

This module is intended to provide a thin wrapper for the dcos-cli binary

In some versions, the dcos-cli is highly dependent upon state and is not the most direct interface for interacting with a DC/OS cluster. Under the hood, the CLI is just a helper for making calls to the HTTP REST APIs, so users will often get better results by using the DcosApiSession object for direct API access.

class dcos_test_utils.dcos_cli.DcosCli(cli_path: str, core_plugin_url: str, ee_plugin_url: str)[source]

Bases: object

This wrapper assists in setting up the CLI and running CLI commands in subprocesses

Parameters:cli_path (str) – path to a binary with executable permissions already set
static clear_cli_dir()[source]

Remove the CLI state directory.

Cluster and installed plugins are stored in the CLI state directory. Remove this directory to reset the CLI to its initial state.

exec_command(cmd: List[str], stdin=None) → tuple[source]

Executes a CLI command and processes the result.

This method expects that the process won’t block.

Parameters:
  • cmd – Program and arguments
  • stdin (File) – File to use for stdin
Returns:

A tuple with stdout and stderr

Return type:

(str, str)

login_enterprise(username=None, password=None, provider=None)[source]

Authenticates the CLI with the Mesosphere Enterprise DC/OS cluster that has been set up

Parameters:
  • username (str) – username to login with
  • password (str) – password to use with username
  • provider – authentication type to use
classmethod new_cli(download_url: str = 'https://downloads.dcos.io/cli/releases/binaries/dcos/linux/x86-64/1.1.3/dcos', core_plugin_url: str = 'https://downloads.dcos.io/cli/releases/plugins/dcos-core-cli/linux/x86-64/dcos-core-cli-2.1-patch.1.zip', ee_plugin_url: str = 'https://downloads.mesosphere.io/cli/releases/plugins/dcos-enterprise-cli/linux/x86-64/dcos-enterprise-cli-1.13-patch.0.zip', tmpdir: Optional[str] = None)[source]

Download and set execute permission for a new dcos-cli binary

Parameters:
  • download_url – URL of the dcos-cli binary to be used. If not set, a stable cli will be used.
  • core_plugin_url – URL of the core plugin for the DC/OS CLI
  • ee_plugin_url – URL of the ee plugin for the DC/OS CLI
  • tmpdir – path to a temporary directory to contain the executable. If not set, a temporary directory will be created.
setup_enterprise(url: str, username: Optional[str] = None, password: Optional[str] = None)[source]

This method does the CLI setup for a Mesosphere Enterprise DC/OS cluster

Note:
This is not an idempotent operation and can only be run once per CLI state-session
Parameters:
  • url (str) – URL of EE DC/OS cluster to setup the CLI with
  • username (Optional[str]) – username to login with
  • password (Optional[str]) – password to use with username
class dcos_test_utils.dcos_cli.DcosCliConfiguration(cli: dcos_test_utils.dcos_cli.DcosCli)[source]

Bases: object

Helper for simple access to the CLI configuration

Parameters:cli (DcosCli) – DcosCli object to grab config data from
NOT_FOUND_MSG = "Property '{}' doesn't exist"
get(key: str, default: str = None)[source]

Retrieves configuration value from CLI

Parameters:
  • key (str) – key to grab from CLI config
  • default (str) – value to return if key not present
set(name: str, value: str)[source]

Sets configuration option

Parameters:
  • name (str) – key to set in CLI config
  • value (str) – value to set

dcos_test_utils.diagnostics module

Utilities for running and downloading diagnostics reports

Based on the utilities in dcos/dcos-integration-tests test_dcos_diagnostics.py

This is tested via the test_dcos_diagnostics.py module in the dcos-integration-test module in dcos/dcos

class dcos_test_utils.diagnostics.Diagnostics(default_url: dcos_test_utils.helpers.Url, masters: list, all_slaves: list, session=None, use_legacy_api=False)[source]

Bases: dcos_test_utils.helpers.ARNodeApiClientMixin, dcos_test_utils.helpers.RetryCommonHttpErrorsMixin, dcos_test_utils.helpers.ApiClientSession

Specialized session client for diagnostics service that is aware of the cluster agents

Parameters:
  • default_url (helpers.Url) – URL for the diagnostics service API
  • masters (list) – list of master IP strings
  • all_slaves (list) – list of slave IP strings
  • session (requests.Session) – Session object to bootstrap this session with
delete_bundle(diagnostics_bundle: str)[source]

Given diagnostics bundle name, this method will delete it

Args:
diagnostics_bundle (str): bundle name to delete. An item from the result of self.get_diagnostics_reports
download_diagnostics_reports(diagnostics_bundles, download_directory=None, master=None)[source]

Given diagnostics bundle names, this method will download them

Args:
diagnostics_bundles (List[str]): list of bundle names to download. Result of self.get_diagnostics_reports
download_directory (str): path, defaults to home directory
get_diagnostics_reports() → list[source]

Gets the complete list of diagnostics reports

Returns:list of report filenames
Return type:list
start_diagnostics_job(nodes: dict = None)[source]

POSTs to the endpoint that triggers diagnostics report creation

Parameters:nodes (dict) – JSON-like definition of nodes
Returns:Response from diagnostics service
Return type:requests.Response
wait_for_diagnostics_job(last_datapoint: dict)[source]

The initial value of last_datapoint should be last_datapoint = {'time': None, 'value': 0}

wait_for_diagnostics_reports()[source]

Sometimes it may take a few extra seconds to list bundles after the job is finished. This method will retry until the list of reports is non-empty or 50 seconds have elapsed

dcos_test_utils.enterprise module

This module defines a DcosApiSession child class for using Mesosphere Enterprise DC/OS

class dcos_test_utils.enterprise.EnterpriseApiSession(*args, ssl_enabled=True, **kwargs)[source]

Bases: dcos_test_utils.enterprise.MesosNodeClientMixin, dcos_test_utils.dcos_api.DcosApiSession

DcosApiSession specialized for use with an Enterprise cluster

Note:
This class is required for Enterprise API interaction
Parameters:ssl_enabled (bool) – if the security parameter is configured to permissive or strict, this should be True
ca

Property which generates a new client where all paths are prepended with /ca/api/v2

classmethod create()[source]

Uses the method EnterpriseApiSession.get_args_from_env() to create an EnterpriseApiSession

static get_args_from_env()[source]

Uses all parameters defined in get_args_from_env() and adds some additional environment variables:

  • DCOS_LOGIN_UNAME: username to use for DC/OS login
  • DCOS_LOGIN_PW: password to use for DC/OS login
  • DCOS_SSL_ENABLED: can be ‘true’ or ‘false’. Set to false only if security is configured as disabled
iam

Property which generates a new client for Iam

secrets

Property which generates a new client where all paths are prepended with /secrets/v1

set_ca_cert()[source]

If security is permissive or strict, and the API session is not configured with verify=False, then the custom CA cert for the desired cluster must be attached to the session, which this method will do

set_initial_resource_ids()[source]

helper method for setting the initial_resource_ids property of this ApiSession object

This is useful for resetting the RIDs that were added over the course of interaction with the cluster

wait_for_dcos()[source]

This method will wait for basic DC/OS services to be running. Once basic endpoints are up, this method will set the custom CA cert and authenticate with the cluster

class dcos_test_utils.enterprise.EnterpriseUser(uid: str, password: str)[source]

Bases: dcos_test_utils.dcos_api.DcosUser

Enterprise user abstraction for authenticating the EnterpriseApiSession client

Parameters:
  • uid (str) – username to log in with
  • password (str) – password to be used with uid
auth_json

Property for the headers needed to log into an Enterprise Edition cluster

class dcos_test_utils.enterprise.MesosNodeClientMixin[source]

Bases: object

This Mixin allows any request to be made against a master or agent mesos HTTP port by providing the keyword ‘mesos_node’. Thus, the user does not have to specify the master/agent port or which arbitrary host in the cluster meeting that role

api_request(method, path_extension, *, scheme=None, host=None, query=None, fragment=None, port=None, mesos_node=None, **kwargs)[source]

This mixin method provides an additional keyword for easily directing to Mesos endpoints on either masters or slaves

Parameters:mesos_node (str) – IP string of either a master or slave in the cluster
Returns:API response
Return type:requests.Response

dcos_test_utils.helpers module

Various helpers for test runners and integration testing directly

class dcos_test_utils.helpers.ARNodeApiClientMixin[source]

Bases: object

api_request(method, path_extension, *, scheme=None, host=None, query=None, fragment=None, port=None, node=None, **kwargs)[source]

Communicating with a DC/OS cluster is done by default through Admin Router. Use this Mixin with an ApiClientSession that requires distinguishing between nodes. Admin Router has both a master and agent process and so this wrapper accepts a node argument. node must be a host in self.master or self.all_slaves. If given, the request will be made to the Admin Router endpoint for that node type

Parameters:node (str) – IP of a master or agent
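The node-type dispatch described above can be sketched as a small lookup that picks the Admin Router port from the node's role. The function name resolve_node_port and the specific port numbers (80/443 for masters, 61001/61002 for agents) are assumptions for illustration; check the ports used by your own deployment.

```python
from typing import Collection, Tuple


def resolve_node_port(node: str, masters: Collection[str],
                      all_slaves: Collection[str],
                      scheme: str = 'http') -> Tuple[str, int]:
    """Return (host, port) of the Admin Router fronting the given node.

    Port numbers are assumed values, not taken from the library.
    """
    if node in masters:
        port = 443 if scheme == 'https' else 80
    elif node in all_slaves:
        port = 61002 if scheme == 'https' else 61001
    else:
        # Mirrors the documented requirement that node be a known host.
        raise ValueError('{} is not a known master or agent'.format(node))
    return node, port


master_target = resolve_node_port('10.0.0.1', ['10.0.0.1'], ['10.0.0.2'])
agent_target = resolve_node_port('10.0.0.2', ['10.0.0.1'], ['10.0.0.2'], scheme='https')
```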
class dcos_test_utils.helpers.ApiClientSession(default_url: dcos_test_utils.helpers.Url)[source]

Bases: object

This class functions like the requests.session interface but adds a default Url and a request wrapper. This class only differs from requests.Session in that the cookies are cleared after each request (but not purged from the response) so that the request state may be more well-defined between tests sharing this object

Parameters:default_url (Url) – The base URL to which all request paths will be appended
api_request(method, path_extension, *, scheme=None, host=None, query=None, fragment=None, port=None, **kwargs) → requests.models.Response[source]

Direct wrapper for requests.session.request. This method is kept deliberately simple so that child classes can alter this behavior without much copying

Parameters:
  • method (str) – the HTTP verb
  • path_extension (str) – the extension to the path that is set as the default Url
  • scheme (str) – scheme to be used instead of that included with self.default_url
  • host (str) – host to be used instead of that included with self.default_url
  • query (str) – query to be used instead of that included with self.default_url
  • fragment (str) – fragment to be used instead of that included with self.default_url
  • port (int, str) – port to be used instead of that included with self.default_url
  • **kwargs

    anything that can be passed to requests.request

Returns:

requests.Response – response object from the request

delete(*args, **kwargs)[source]

DELETE method for api_request() method

get(*args, **kwargs)[source]

GET method for api_request() method

head(*args, **kwargs)[source]

HEAD method for api_request() method

options(*args, **kwargs)[source]

OPTIONS method for api_request() method

patch(*args, **kwargs)[source]

PATCH method for api_request() method

post(*args, **kwargs)[source]

POST method for api_request() method

put(*args, **kwargs)[source]

PUT method for api_request() method

class dcos_test_utils.helpers.Host(private_ip, public_ip)

Bases: tuple

private_ip

Alias for field number 0

public_ip

Alias for field number 1

class dcos_test_utils.helpers.RetryCommonHttpErrorsMixin[source]

Bases: object

Mixin for ApiClientSession so that random disconnects from network instability do not derail entire scripts. This functionality is configured through the retry_timeout keyword

api_request(*args, retry_timeout: int = 60, **kwargs) → requests.models.Response[source]

Adds ‘retry_timeout’ keyword to API requests.

Args:
*args: args to be passed to super()’s api_request method
**kwargs: keyword args to be passed to super()’s api_request method
retry_timeout: total number of seconds to keep retrying after the initial exception was raised
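A minimal sketch of the retry_timeout idea: keep retrying a call on retryable failures until a fixed number of seconds has passed since the first exception. The function name call_with_retry, the interval and sleep parameters, and the use of the built-in ConnectionError as the retryable type are assumptions for illustration; the mixin's actual retry policy may differ.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar('T')


def call_with_retry(fn: Callable[[], T],
                    retry_timeout: float = 60,
                    is_retryable: Callable[[Exception], bool] =
                    lambda exc: isinstance(exc, ConnectionError),
                    sleep: Callable[[float], None] = time.sleep,
                    interval: float = 1.0) -> T:
    """Call fn, retrying retryable errors for up to retry_timeout seconds."""
    deadline: Optional[float] = None
    while True:
        try:
            return fn()
        except Exception as exc:
            if not is_retryable(exc):
                raise
            now = time.monotonic()
            if deadline is None:
                # The clock starts at the first failure, not the first call.
                deadline = now + retry_timeout
            if now >= deadline:
                raise
            sleep(interval)


attempts = []


def flaky() -> str:
    # Fails twice with a transient error, then succeeds.
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError('transient disconnect')
    return 'ok'


result = call_with_retry(flaky, retry_timeout=5, sleep=lambda _: None)
```

Injecting sleep keeps the example fast; a real caller would leave the default time.sleep in place.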
class dcos_test_utils.helpers.SshInfo(user, home_dir)

Bases: tuple

home_dir

Alias for field number 1

user

Alias for field number 0

class dcos_test_utils.helpers.Url(scheme: str, host: str, path: str, query: str, fragment: str, port: Union[str, int])[source]

Bases: object

URL abstraction to allow convenient substitution of URL anatomy without having to copy and dissect the entire original URL

copy(scheme=None, host=None, path=None, query=None, fragment=None, port=None)[source]

return new Url with any component replaced

classmethod from_string(url_str: str)[source]

Creates a Url object from a string like ‘http://foo.bar/baz?=qux’

netloc

Property which returns a string of the form IP:port
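The component-substitution idea behind Url can be sketched with a frozen dataclass whose copy() replaces only the components that are passed. UrlSketch is a hypothetical stand-in written for this example, not the library class, and its netloc formatting is an assumption.

```python
from dataclasses import dataclass, replace
from typing import Union


@dataclass(frozen=True)
class UrlSketch:
    scheme: str
    host: str
    path: str
    query: str
    fragment: str
    port: Union[str, int]

    def copy(self, **changes) -> 'UrlSketch':
        # Overrides left as None keep the original component.
        kept = {k: v for k, v in changes.items() if v is not None}
        return replace(self, **kept)

    @property
    def netloc(self) -> str:
        # host:port when a port is set, otherwise just the host.
        return '{}:{}'.format(self.host, self.port) if self.port else self.host


base = UrlSketch('http', '10.0.0.1', '/system/v1', '', '', 80)
agent = base.copy(scheme='https', port=61002, host=None)
```

Because the dataclass is frozen, copy() always returns a new object and the base URL shared between tests is never mutated.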

dcos_test_utils.helpers.assert_response_ok(r: requests.models.Response)[source]

Simple helper to print out the status code and response content if a response is not OK

dcos_test_utils.helpers.check_json(response: requests.models.Response)[source]

Simple method which will raise an error if response has non-existent or empty JSON

dcos_test_utils.helpers.is_retryable_exception(exception: Exception) → bool[source]

Helper method to catch HTTP errors that are likely safe to retry.

Args:
exception: exception raised from ApiClientSession.api_request instance
dcos_test_utils.helpers.marathon_app_id_to_mesos_dns_subdomain(app_id: str)[source]

Return app_id’s subdomain as it would appear in a Mesos DNS A record.

>>> marathon_app_id_to_mesos_dns_subdomain('/app-1')
'app-1'
>>> marathon_app_id_to_mesos_dns_subdomain('app-1')
'app-1'
>>> marathon_app_id_to_mesos_dns_subdomain('/group-1/app-1')
'app-1-group-1'
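One way to reproduce the three doctest results above is to reverse the path components of the app ID and join them with hyphens. This is a sketch consistent with those examples only; the name app_id_to_subdomain is hypothetical and the library's function may handle edge cases differently.

```python
def app_id_to_subdomain(app_id: str) -> str:
    """Reverse the '/'-separated parts of an app ID and join with '-'."""
    components = [part for part in app_id.strip('/').split('/') if part]
    return '-'.join(reversed(components))


nested = app_id_to_subdomain('/group-1/app-1')
```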
dcos_test_utils.helpers.path_join(p1: str, p2: str)[source]

Helper to ensure there is only one ‘/’ between two strings
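A sketch of the single-slash join, assuming that any run of slashes at the boundary should collapse to one. The name join_with_single_slash is hypothetical, and the library's path_join may strip at most one slash per side rather than all of them.

```python
def join_with_single_slash(p1: str, p2: str) -> str:
    """Join two path pieces with exactly one '/' between them."""
    return p1.rstrip('/') + '/' + p2.lstrip('/')


joined = join_with_single_slash('/service/marathon/', '/v2/apps')
```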

dcos_test_utils.helpers.session_tempfile(data)[source]

Writes bytes to a named temp file and returns its path. The temp file will be removed when the interpreter exits
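The behavior described here can be sketched with the standard library: write to a NamedTemporaryFile that is not deleted on close, then register its removal with atexit. The name temp_file_until_exit is hypothetical and this is only one plausible way to get the documented behavior.

```python
import atexit
import os
import tempfile


def temp_file_until_exit(data: bytes) -> str:
    """Write data to a named temp file that is removed at interpreter exit."""
    with tempfile.NamedTemporaryFile(delete=False) as f:
        f.write(data)
        path = f.name

    def remove_if_present() -> None:
        # Tolerate the file having been deleted by the caller already.
        if os.path.exists(path):
            os.remove(path)

    atexit.register(remove_if_present)
    return path


cert_path = temp_file_until_exit(b'example bytes')
```

A path that outlives the function call is useful when another API, such as a TLS verify option, expects a filename rather than file contents.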

dcos_test_utils.iam module

This module provides a specialized client for interacting with the Identity Access and Management (IAM) service endpoints

class dcos_test_utils.iam.Iam(default_url: dcos_test_utils.helpers.Url, session=None)[source]

Bases: dcos_test_utils.helpers.ApiClientSession

Helpers for interacting with service user accounts.

Note that some terminology is confused. The methods here interact with service user accounts and not services.

Parameters:
  • default_url (helpers.Url) – URL for the IAM service endpoint
  • session (requests.Session) – Session to bootstrap this session client with
create_acl(rid: str, description: str) → None[source]

creates an ACL

Parameters:
  • rid (str) – RID for the ACL to be created
  • description (str) – text description for the new RID
create_service(uid: str, pubkey: str, description: str)[source]

creates a service user

Parameters:
  • uid (str) – ID for the new service
  • pubkey (str) – Public key to be used by this new service
  • description (str) – simple description metadata to include with account creation
Returns:

None

delete_acl(rid: str) → None[source]

Deletes an ACL

Parameters:rid (str) – RID for the ACL to be deleted
delete_service(uid: str) → None[source]

Delete a service account and verify that this worked.

Args:
uid: The user ID of the service account user to delete.
Raises:
AssertionError: The delete operation does not succeed.
delete_user_permission(uid: str, action: str, rid: str) → None[source]

Will delete permission for a user for an action for a given RID

Parameters:
  • uid (str) – ID of the user that this permission will be deleted from
  • action (str) – action that will be deleted for the RID for the user
  • rid (str) – resource ID that the user will be removed from for the given action
grant_user_permission(uid: str, action: str, rid: str) → None[source]

Will grant a user with an action for a given RID

Parameters:
  • uid (str) – ID of the user that this permission will be granted to
  • action (str) – action that user will be granted for the RID
  • rid (str) – resource ID that the user will be granted the action to
make_service_account_credentials(uid, privkey) → dict[source]

Generates the JSON object to post to create a service account

Parameters:
  • uid (str) – ID for the service account to be created
  • privkey (str) – private key to be used for the new service account
Returns:

JSON-like dict to POST to IAM service

dcos_test_utils.jobs module

Utilities for integration testing metronome in a deployed DC/OS cluster

class dcos_test_utils.jobs.Jobs(default_url: dcos_test_utils.helpers.Url, session: requests.sessions.Session = None)[source]

Bases: dcos_test_utils.helpers.RetryCommonHttpErrorsMixin, dcos_test_utils.helpers.ApiClientSession

Specialized client for interacting with DC/OS jobs functionality

Parameters:
  • default_url (helpers.Url) – URL of the jobs service to bind to
  • session (requests.Session) – optional session to bootstrap this session with
create(job_definition: dict) → dict[source]

Create a new job with given definition.

Parameters:job_definition (dict) – Job definition
Returns:Response from Jobs service as JSON
Return type:dict
destroy(job_id: str)[source]

Delete an existing job and all data.

Parameters:job_id (str) – Job ID
details(job_id: str, history=False) → dict[source]

Get the details of a specific Job.

Parameters:
  • job_id (str) – Job ID
  • history (bool) – Include embedded history in details
Returns:

Job details as JSON

Return type:

dict

run(job_id: str, timeout=600) -> (<class 'bool'>, <class 'dict'>, <class 'dict'>)[source]

Create a run, wait for it to finish, and return whether it was successful and the run itself.

This will run the job immediately and block until the run is complete.

Parameters:
  • job_id (str) – Job ID
  • timeout (int) – Timeout in seconds
Returns:

tuple of success, Run details, Job details

Return type:

bool, dict, dict

run_details(job_id: str, run_id: str) → dict[source]

Return details about the given Run ID.

Parameters:
  • job_id (str) – Job ID
  • run_id (str) – Run ID
Returns:

Run details

Return type:

dict

run_stop(job_id: str, run_id: str) → dict[source]

Stop the run run_id if it is in-progress.

Parameters:
  • job_id (str) – Job ID
  • run_id (str) – Run ID
Returns:

JSON response

Return type:

dict

start(job_id: str) → dict[source]

Create a run and return the Run.

Parameters:job_id (str) – Job ID
Returns:Run creation response from Jobs service
Return type:dict
wait_for_run(job_id: str, run_id: str, timeout=600)[source]

Wait for a given run to complete or timeout seconds to elapse.

Parameters:
  • job_id (str) – Job ID
  • run_id (str) – Run ID
  • timeout (int) – Time in seconds to wait before giving up
Returns:

None

dcos_test_utils.logger module

Logging configuration for the DC/OS testing utilities

Certain libraries used in this repository have excessively verbose output on the debug level (as python logging does not natively have a trace level). Invoking this module will effectively lower the level of the messages from the targeted modules by one level.

dcos_test_utils.logger.setup(log_level_str: str, noisy_modules: list = None)[source]

Handles the builtin python log levels and adds a level below debug (trace), at which dampened modules will log their debug messages

Args:
log_level_str: user-provided string to set the log-level
noisy_modules: any additional modules that should have their level increased
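A sketch of the dampening idea using the standard logging module: register a TRACE level below DEBUG, then give each noisy module a threshold one step stricter than the requested level, so its DEBUG output only shows when the user asks for TRACE. The function name setup_sketch and the numeric value 5 for TRACE are assumptions, not the module's actual values.

```python
import logging
from typing import Iterable, Optional

TRACE = 5  # assumed numeric value, below logging.DEBUG (10)
LEVELS = [TRACE, logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]


def setup_sketch(log_level_str: str,
                 noisy_modules: Optional[Iterable[str]] = None) -> None:
    logging.addLevelName(TRACE, 'TRACE')
    by_name = {logging.getLevelName(level): level for level in LEVELS}
    level = by_name[log_level_str.upper()]
    logging.getLogger().setLevel(level)
    # Noisy modules get the next stricter threshold (capped at CRITICAL).
    stricter = LEVELS[min(LEVELS.index(level) + 1, len(LEVELS) - 1)]
    for name in noisy_modules or []:
        logging.getLogger(name).setLevel(stricter)


setup_sketch('debug', ['noisy.lib'])
```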

dcos_test_utils.marathon module

Utilities for integration testing marathon in a deployed DC/OS cluster

class dcos_test_utils.marathon.Container[source]

Bases: enum.Enum

Enumerator to capture all Marathon app container options

DOCKER = 'DOCKER'
MESOS = 'MESOS'
NONE = None
class dcos_test_utils.marathon.Endpoint(host, port, ip)

Bases: tuple

host

Alias for field number 0

ip

Alias for field number 2

port

Alias for field number 1

class dcos_test_utils.marathon.Healthcheck[source]

Bases: enum.Enum

Enumerator to capture all Marathon app Healthcheck options

HOST = 'HOST'
HTTP = 'HTTP'
MESOS_HTTP = 'MESOS_HTTP'
class dcos_test_utils.marathon.Marathon(default_url, session=None)[source]

Bases: dcos_test_utils.helpers.RetryCommonHttpErrorsMixin, dcos_test_utils.helpers.ApiClientSession

Specialized client for interacting with Marathon (DC/OS Services) functionality

Parameters:
  • default_url (helpers.Url) – URL of the Marathon service to bind to
  • session (requests.Session) – optional session to bootstrap this session with
check_app_instances(app_id: str, app_instances: int, check_health: bool, ignore_failed_tasks: bool) → bool[source]

Check a marathon app ID and return True if healthy

Args:
app_id: marathon app ID to be checked
app_instances: number of expected app instances
check_health: if True, health check status must pass to return True
ignore_failed_tasks: if False, any failed tasks will result in an exception
deploy_and_cleanup(app_definition, timeout=180, check_health=True, ignore_failed_tasks=True)[source]

This context manager works just like Marathon.deploy_app() but will always destroy the app once the context is left
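The "always destroy on exit" guarantee is the standard try/finally context manager pattern. The sketch below uses stand-in deploy and destroy callables instead of a live Marathon client; the name deploy_and_cleanup_sketch and its signature are hypothetical and do not match the library method.

```python
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def deploy_and_cleanup_sketch(deploy: Callable[[], str],
                              destroy: Callable[[str], None]) -> Iterator[str]:
    app_id = deploy()
    try:
        yield app_id
    finally:
        # Runs on normal exit and also when the with-body raises.
        destroy(app_id)


destroyed = []
try:
    with deploy_and_cleanup_sketch(lambda: '/test-app', destroyed.append) as app_id:
        raise RuntimeError('test body failed')
except RuntimeError:
    pass
```

After the block runs, destroyed holds the app ID even though the body raised, which is the property a test relies on to avoid leaking apps between runs.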

deploy_app(app_definition, check_health=True, ignore_failed_tasks=False, timeout=180)[source]

Deploy an app to marathon

This function deploys an application and then waits for marathon to acknowledge its successful creation or fails the test.

The wait for application is immediately aborted if Marathon returns nonempty ‘lastTaskFailure’ field (if ignore_failed_tasks is set to False). Otherwise it waits until all the instances reach tasksRunning and then tasksHealthy state.

Args:
app_definition: a dict with application definition as specified in Marathon API (https://mesosphere.github.io/marathon/docs/rest-api.html#post-v2-apps)
check_health: wait until Marathon reports tasks as healthy before returning
Returns:

A list of named tuples which represent service points of deployed applications. I.E:

[Endpoint(host=‘172.17.10.202’, port=10464), Endpoint(host=‘172.17.10.201’, port=1630)]
deploy_pod(pod_definition, timeout=180)[source]

Deploy a pod to marathon

This function deploys a pod and then waits for marathon to acknowledge its successful creation or fails the test.

It waits until all the instances reach tasksRunning and then tasksHealthy state.

Args:
pod_definition: a dict with pod definition as specified in
Marathon API

timeout: seconds to wait for deployment to finish

Returns:
Pod data JSON
deploy_pod_and_cleanup(pod_definition, timeout=180)[source]

This context manager works just like Marathon.deploy_pod() but will always destroy the pod once the context is left

destroy_app(app_name, timeout=300)[source]

Remove a marathon app

Abort the test if the removal was unsuccessful.

Args:
app_name: name of the application to remove
timeout: seconds to wait for destruction before failing test
destroy_pod(pod_id, timeout=300)[source]

Remove a marathon pod

Abort the test if the removal was unsuccessful.

Args:
pod_id: id of the pod to remove
timeout: seconds to wait for destruction before failing test
get_app_service_endpoints(app_id: str) → List[dcos_test_utils.marathon.Endpoint][source]

returns endpoint tuples for the given application ID

purge()[source]

Force deletes all applications, all pods, and then waits indefinitely for any deployments to finish

wait_for_app_deployment(app_id: str, app_instances: int, check_health: bool, ignore_failed_tasks: bool, timeout: int)[source]

Retries the check_app_instance function for a limited time

Args:
app_id: ID of the marathon app to check
app_instances: expected number of instances
check_health: if True, health checks must pass before unblocking
ignore_failed_tasks: if False, then failed tasks will raise an exception
timeout: time (in seconds) to wait before raising an exception
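The retry-until-timeout behaviour described here can be sketched with the standard library alone. This is an illustration under stated assumptions, not the library's implementation: wait_until, its interval parameter, and the fake status check are all hypothetical names, and the real method may retry differently.

```python
import time


def wait_until(check, timeout, interval=0.01):
    """Hypothetical helper: call check() until it returns True.

    Raises TimeoutError once `timeout` seconds have elapsed without success.
    """
    deadline = time.monotonic() + timeout
    while True:
        if check():
            return
        if time.monotonic() >= deadline:
            raise TimeoutError('condition not met within {}s'.format(timeout))
        time.sleep(interval)


# Fake stand-in for a Marathon status check: it succeeds on the third poll.
polls = {'count': 0}


def fake_check_app_instances():
    polls['count'] += 1
    return polls['count'] >= 3


wait_until(fake_check_app_instances, timeout=5)
```

A check that never succeeds raises TimeoutError after the deadline, which mirrors the documented "raising an exception" once the timeout expires.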
wait_for_deployments_complete()[source]

This simple helper will block until there are no more deployments in progress

class dcos_test_utils.marathon.Network[source]

Bases: enum.Enum

Enumerator to capture all Marathon app networking options

BRIDGE = 'BRIDGE'
HOST = 'HOST'
USER = 'USER'

dcos_test_utils.onprem module

Utilities to assist with orchestrating and testing an onprem deployment

class dcos_test_utils.onprem.OnpremCluster(masters: List[dcos_test_utils.helpers.Host], private_agents: List[dcos_test_utils.helpers.Host], public_agents: List[dcos_test_utils.helpers.Host], bootstrap_host: dcos_test_utils.helpers.Host)[source]

Bases: object

cluster_hosts

Property that returns a list of all Hosts except the bootstrap host

classmethod from_hosts(bootstrap_host, cluster_hosts, num_masters, num_private_agents, num_public_agents)[source]

Creates a cluster object from a hosts list and the desired quantity of each host type

get_master_ips() → List[dcos_test_utils.helpers.Host][source]
Returns:a new copy of self.masters
get_private_agent_ips() → List[dcos_test_utils.helpers.Host][source]
Returns:a new copy of self.private_agents
get_public_agent_ips() → List[dcos_test_utils.helpers.Host][source]
Returns:a new copy of self.public_agents
hosts

Property that returns a list of all Hosts including the bootstrap host

static partition_cluster(cluster_hosts: List[dcos_test_utils.helpers.Host], num_masters: int, num_agents: int, num_public_agents: int)[source]

Return (masters, agents, public_agents) from hosts list.
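One way such a partition could work is sketched below. It assumes hosts are handed out in list order (masters first, then private agents, then public agents); the actual ordering used by partition_cluster is not stated here, and partition_hosts is a hypothetical name operating on plain strings instead of Host objects.

```python
import itertools


def partition_hosts(hosts, num_masters, num_agents, num_public_agents):
    """Hypothetical equivalent: hand out hosts in order to each role."""
    needed = num_masters + num_agents + num_public_agents
    if len(hosts) < needed:
        raise ValueError('need {} hosts, got {}'.format(needed, len(hosts)))
    remaining = iter(hosts)
    # Each islice call consumes the next N hosts from the shared iterator.
    masters = list(itertools.islice(remaining, num_masters))
    agents = list(itertools.islice(remaining, num_agents))
    public_agents = list(itertools.islice(remaining, num_public_agents))
    return masters, agents, public_agents


masters, agents, public_agents = partition_hosts(
    ['10.0.0.1', '10.0.0.2', '10.0.0.3', '10.0.0.4'], 1, 2, 1)
```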

dcos_test_utils.onprem.log_and_raise_if_not_ok(response: requests.models.Response)[source]

A helper for dumping the response content to the log if it is not OK

dcos_test_utils.package module

Utilities for integration testing package management (https://github.com/dcos/cosmos)

class dcos_test_utils.package.Cosmos(default_url: dcos_test_utils.helpers.Url, session=None)[source]

Bases: dcos_test_utils.helpers.RetryCommonHttpErrorsMixin, dcos_test_utils.helpers.ApiClientSession

Specialized client for interacting with Cosmos (universe gateway) functionality

Parameters:
  • default_url (helpers.Url) – URL of the Cosmos service to bind to
  • session (requests.Session) – optional session to bootstrap this session with
install_package(package_name, package_version=None, options=None, app_id=None)[source]

Install a package using the cosmos packaging API

Args:
package_name: str
package_version: str
options: JSON dict
app_id: str
Returns:
requests.response object
Notes:
Use Marathon.poll_marathon_for_app_deployment to check if the installed app deployed successfully (Need the appId from the response)
list_packages()[source]

List all packages using the cosmos packaging API

Returns:
requests.response object
uninstall_package(package_name, app_id=None)[source]

Uninstall a package using the cosmos packaging API

Args:
package_name: str
app_id: str, should have leading slash
Returns:
requests.response object

dcos_test_utils.recordio module

Provides facilities for “Record-IO” encoding of data. “Record-IO” encoding allows one to encode a sequence of variable-length records by prefixing each record with its size in bytes:

5\n
hello
6\n
world!

Note that this currently only supports record lengths encoded as base 10 integer values with newlines as a delimiter. This is to provide better language portability: parsing a base 10 integer is simple. Most other “Record-IO” implementations use a fixed-size header of 4 bytes to directly encode an unsigned 32 bit length.
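The framing described above can be sketched in a few lines. The function name frame_record is hypothetical and this is not the module's Encoder; it only illustrates the length-prefix-plus-newline layout.

```python
def frame_record(payload: bytes) -> bytes:
    """Prefix payload with its byte length in base 10 followed by a newline."""
    return str(len(payload)).encode('ascii') + b'\n' + payload


# Two records concatenated into one stream.
stream = b''.join(frame_record(record) for record in [b'hello', b'world!'])
```

Joining the two framed records yields the byte stream b'5\nhello6\nworld!'.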

class dcos_test_utils.recordio.Decoder(deserialize)[source]

Bases: object

Decode a ‘RecordIO’ message back to an arbitrary message type.

This class encapsulates the process of decoding a message previously encoded with ‘RecordIO’ back to an arbitrary message type. Its constructor takes a deserialization function of the form ‘deserialize(data)’. This deserialization function is responsible for knowing how to take a fully constructed ‘RecordIO’ message containing a ‘UTF-8’ encoded byte array and deserialize it back into the original message type.

The ‘decode(data)’ method takes a ‘UTF-8’ encoded byte array as input and buffers it across subsequent calls to construct a set of fully constructed ‘RecordIO’ messages that are decoded and returned in a list.

Parameters:deserialize (function) – a function to deserialize from ‘RecordIO’ messages built up by subsequent calls to ‘decode(data)’
FAILED = 2
HEADER = 0
RECORD = 1
decode(data)[source]

Decode a ‘RecordIO’ formatted message to its original type.

Parameters:data (bytes) – an array of ‘UTF-8’ encoded bytes that make up a partial ‘RecordIO’ message. Subsequent calls to this function maintain state to build up a full ‘RecordIO’ message and decode it
Returns:a list of deserialized messages
Return type:list
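The buffering behaviour across calls can be illustrated with a small state machine. This is a sketch, not the library's Decoder: the class name SketchDecoder is hypothetical, only the HEADER and RECORD states are modelled, and malformed headers (the FAILED case) are not handled.

```python
import json


class SketchDecoder:
    """Illustrative streaming decoder; not the library's implementation."""

    HEADER, RECORD = 0, 1

    def __init__(self, deserialize):
        self.deserialize = deserialize
        self.state = self.HEADER
        self.buffer = b''
        self.length = 0

    def decode(self, data):
        records = []
        self.buffer += data
        while True:
            if self.state == self.HEADER:
                header, sep, rest = self.buffer.partition(b'\n')
                if not sep:
                    break  # the length header is still incomplete
                self.length = int(header.decode('ascii'))
                self.buffer = rest
                self.state = self.RECORD
            if len(self.buffer) < self.length:
                break  # the record body is still incomplete
            body = self.buffer[:self.length]
            self.buffer = self.buffer[self.length:]
            records.append(self.deserialize(body))
            self.state = self.HEADER
        return records


decoder = SketchDecoder(lambda data: json.loads(data.decode('utf-8')))
first = decoder.decode(b'8\n{"a":')          # record body is cut off mid-way
second = decoder.decode(b' 1}8\n{"b": 2}')   # completes it and adds another
```

The first call returns an empty list because the record is incomplete; the second call returns both records once the remaining bytes arrive.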
class dcos_test_utils.recordio.Encoder(serialize)[source]

Bases: object

Encode an arbitrary message type into a ‘RecordIO’ message.

This class encapsulates the process of encoding an arbitrary message into a ‘RecordIO’ message. Its constructor takes a serialization function of the form ‘serialize(message)’. This serialization function is responsible for knowing how to take whatever message type is passed to ‘encode()’ and serializing it to a ‘UTF-8’ encoded byte array.

Once ‘encode(message)’ is called, it will use the serialization function to convert ‘message’ into a ‘UTF-8’ encoded byte array, wrap it in a ‘RecordIO’ frame, and return it.

Parameters:serialize (function) – a function to serialize any message passed to ‘encode()’ into a ‘UTF-8’ encoded byte array
encode(message)[source]

Encode a message into ‘RecordIO’ format.

Parameters:message (object) – a message to serialize and then wrap in a ‘RecordIO’ frame.
Returns:a serialized message wrapped in a ‘RecordIO’ frame
Return type:bytes

dcos_test_utils.ssh_client module

Simple, robust SSH client(s) for basic I/O with remote hosts

class dcos_test_utils.ssh_client.AsyncSshClient(user: str, key: str, targets: list, process_timeout=120, parallelism=10)[source]

Bases: dcos_test_utils.ssh_client.SshClient

SshClient for running against a set of hosts in parallel

Args:

user: ssh user name
key: ssh private key contents
targets: list of host strings for SSH use (hostname:optional_port)
process_timeout (optional): how many seconds any given process can run for
parallelism (optional): how many processes to run at the same time. An SSH command is rarely CPU bound, so this number can be greater than CPU concurrency
copy(sem: asyncio.locks.Semaphore, host: str, local_path: str, remote_path: str, recursive: bool) → dict[source]

uses SCP to copy files to remote host

Args:
sem: semaphore for concurrency control
host: host string to run copy to
local_path: path that will be copied
remote_path: where the data will be copied to
recursive: if True, recursive SCP the local_path to remote_path
Returns:
command result dict (see _run_cmd_return_dict_async)
run(sem: asyncio.locks.Semaphore, host: str, cmd: list) → dict[source]

Uses SSH tunnel to run a command against a host

Args:
sem: semaphore for concurrency control
host: host string to run the command on
cmd: argument list to be executed on the remote host
Returns:
command result dict (see _run_cmd_return_dict_async)
run_command(coroutine_name: str, *args) → list[source]

Runs run_command_on_hosts in an async event loop

Args:
coroutine_name: either ‘copy’ or ‘run’
*args: args to pass to copy or run
Returns:
list of result dicts
run_command_on_hosts(coroutine_name: str, *args, sem: asyncio.locks.Semaphore = None) → list[source]

Starts and waits upon tasks running across all hosts

Args:

coroutine_name: either ‘copy’ or ‘run’
*args: arg list to be passed to copy or run
sem (optional): semaphore for controlling concurrency. If not supplied, a semaphore of the default parallelism will be created
Returns:
list of result dicts from _run_cmd_return_dict_async
start_command_on_hosts(sem: asyncio.locks.Semaphore, coroutine_name: str, *args) → list[source]

Starts coroutines against all hosts and returns futures

Args:
sem: semaphore for blocking job creation to control concurrency
coroutine_name: either ‘copy’ or ‘run’
*args: args to be passed to copy or run
Returns:
list of futures of the commands that were started
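The semaphore-based concurrency control used by these methods follows a common asyncio pattern, sketched below with the standard library only. The function names, the result dict keys, and the sleep placeholder are assumptions for illustration; no SSH connection is opened and this is not the client's actual code.

```python
import asyncio


async def run_on_host(sem, host, cmd):
    """Hypothetical per-host coroutine; a placeholder replaces the SSH call."""
    async with sem:  # at most `parallelism` coroutines run this block at once
        await asyncio.sleep(0)  # stands in for the real remote command
        return {'host': host, 'cmd': cmd, 'returncode': 0}


async def run_on_all(hosts, cmd, parallelism=2):
    sem = asyncio.Semaphore(parallelism)
    tasks = [run_on_host(sem, host, cmd) for host in hosts]
    # gather returns results in the same order as the tasks were passed in
    return await asyncio.gather(*tasks)


results = asyncio.run(
    run_on_all(['10.0.0.1', '10.0.0.2', '10.0.0.3'], ['uname', '-a']))
```

Because SSH commands mostly wait on the network, the parallelism value can exceed the number of CPU cores without much cost.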
class dcos_test_utils.ssh_client.SshClient(user: str, key: str)[source]

Bases: object

class for binding SSH user and key to tunnel

Parameters:
  • user (str) – SSH user to connect with
  • key (str) – SSH private key for user to connect with
add_ssh_user_to_docker_users(host: str, port: int = 22)[source]

Runs usermod on the remote host to add this user to the docker users group

Parameters:
  • host (str) – host on which to add the user group membership
  • port (int) – SSH port of the host (defaults to 22)
command(host: str, cmd: list, port: int = 22, **kwargs) → bytes[source]

Opens a tunnel and runs a single command

Parameters:
  • host (str) – host IP to open the tunnel to
  • cmd (list) – list of shell args to run on the host
  • port (int) – SSH port of the host (defaults to 22)
  • kwargs – see args used in Tunnelled.command()
get_home_dir(host: str, port: int = 22) → str[source]

Returns the SSH home dir

Parameters:
  • host (str) – host IP to get the home directory from
  • port (int) – SSH port of the host (defaults to 22)
tunnel(host: str, port: int = 22) → Generator[dcos_test_utils.ssh_client.Tunnelled, None, None][source]

wrapper for the open_tunnel() context manager

Parameters:
  • host (str) – host IP to open the tunnel to
  • port (int) – SSH port of the host (defaults to 22)
wait_for_ssh_connection(host: str, port: int = 22) → None[source]

Blocks until SSH connection can be established

Parameters:
  • host (str) – host IP to wait for connection to
  • port (int) – SSH port of the host (defaults to 22)
class dcos_test_utils.ssh_client.Tunnelled(opt_list: list, target: str, port: int)[source]

Bases: object

Abstraction of an already instantiated SSH-tunnel

Args:
opt_list: list of SSH options strings. E.G. ‘-oControlPath=foo’
target: string in the form user@host
port: port number to be used for SSH or SCP
command(cmd: list, **kwargs) → bytes[source]

Run a command at the tunnel target

Args:

cmd: list of strings that will be sent as a command to the target
**kwargs: any keyword args that can be passed into subprocess.check_output. For more information, see: https://docs.python.org/3/library/subprocess.html#subprocess.check_output
copy_file(src: str, dst: str, to_remote=True) → None[source]

Copy a path from localhost to target. If path is a local directory, then recursive copy will be used.

Args:
src: local or remote path representing source data
dst: local or remote destination path
to_remote: if True, copy from local to remote; if False, copy from remote to local
dcos_test_utils.ssh_client.open_tunnel(user: str, host: str, port: int, control_path: str, key_path: str) → dcos_test_utils.ssh_client.Tunnelled[source]

Provides clean setup/tear down for an SSH tunnel

Args:
user: SSH user
key_path: path to a private SSH key
host: string containing target host
port: target’s SSH port
dcos_test_utils.ssh_client.parse_ip(ip: str) -> (<class 'str'>, <class 'int'>)[source]

Takes an IP string and returns the hostname and either the given port or the default SSH port of 22
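A minimal sketch of this kind of parsing follows. The name parse_host_port is hypothetical and the code assumes a simple host[:port] string; IPv6 literals are not handled, and the library's own parse_ip may differ in detail.

```python
def parse_host_port(target, default_port=22):
    """Hypothetical helper: split 'host[:port]' into a (host, port) tuple."""
    host, sep, port = target.partition(':')
    # Fall back to the default SSH port when no ':port' suffix is present.
    return host, (int(port) if sep else default_port)


without_port = parse_host_port('10.0.0.5')
with_port = parse_host_port('10.0.0.5:2222')
```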

dcos_test_utils.ssh_client.temp_ssh_key(key: str) → str[source]

Dumps an SSH key string to a temp file that will be deleted at session close and returns the path