Source code for azure_databricks_sdk_python.api

import json
import requests
import urllib.parse
from cattr import structure, unstructure

from azure_databricks_sdk_python.types import AuthMethods
from azure_databricks_sdk_python.exceptions import *


[docs]class API: """Base class for API wrappers. It composes with APIWithAuth API classes. """ def __init__(self, **kwargs): """takes keyword arguements and constructs a dispatcher as an instance of APIWithAuth that will act as this class via composition. Raises: Exception: In case the auth method is not recognised. """ auth_method = kwargs.pop('auth_method') base_url = kwargs.pop('base_url') if (auth_method == AuthMethods.PERSONAL_ACCESS_TOKEN): token = kwargs.pop('personal_access_token') self._dispatcher = APIWithPersonalAccessToken( base_url, token) elif (auth_method == AuthMethods.AZURE_AD_USER): access_token = kwargs.pop('access_token') resource_id = kwargs.pop('resource_id', None) self._dispatcher = APIWithAzureADUser( base_url, access_token, resource_id) elif (auth_method == AuthMethods.AZURE_AD_SERVICE_PRINCIPAL): access_token = kwargs.pop('access_token') resource_id = kwargs.pop('resource_id', None) management_token = kwargs.pop('management_token', None) self._dispatcher = APIWithAzureADServicePrincipal( base_url, access_token, management_token, resource_id) else: raise Exception('Authentification method not defined.') def __getattr__(self, attr): """Implementing a composition with the self._dispatcher object hence API().foo with now return self._dispatcher().foo """ return getattr(self._dispatcher, attr)
[docs]class APIWithAuth: """Base class API composers the API composers implement auth specific logic as they inherit from this class that implements common functionality such as http get and post and also error handeling. """ def _get(self, endpoint: str, data=None): """Performs a http get to BASE_URL/endpoint with data passed as json body Args: endpoint (str): API endpoint data (dict, optional): parameters to send to the API endpoint. Defaults to None. Returns: Response: the response object from http call. """ url = urllib.parse.urljoin(self._base_url, endpoint.lstrip('/')) return requests.get(url=url, headers=self._headers, json=data) def _post(self, endpoint: str, data=None): """Performs a http post to BASE_URL/endpoint with data passed as body Args: endpoint (str): API endpoint data (dict, optional): parameters to send to the API endpoint. Defaults to None. Returns: Response: the response object from http call. """ data_json = json.dumps(data, ensure_ascii=False) url = self._base_url + endpoint url = urllib.parse.urljoin(self._base_url, endpoint.lstrip('/')) return requests.post(url=url, headers=self._headers, data=data_json) def _handle_error(self, res): """Helper method to handle http errors Args: res (Response): the response object from http call. Raises: AuthorizationError: In case of authorization error. ERROR_CODES[Exception]: For all speicfic cases, check exceptions.py . APIError: for all other cases """ if res.status_code == 403: raise AuthorizationError(res.json().get('message')) elif res.json().get("error_code") in ERROR_CODES: raise ERROR_CODES[res.json().get('error_code')](res.json().get('message')) else: raise APIError("API returned an error {0}: {1} {2}".format(res.status_code, res.json().get('error_code'), res.json().get('message'))) def _safe_handle(self, res, value, type=None): """Helper method to safely handle http response Args: res (Response): http response. value (any): value to return. Returns: any: the returned object. Raise exception if code is not 200. """ if res.status_code == 200: if type: return structure(value, type) else: return value else: self._handle_error(res) def _validate(self, req, type, validate=True): """Validates users input to be passed to api Args: req (object): user input. type (object): the type to be validated against. validate (bool): to validate or not the input against the type. Raises: ValueError: if validates=True, Raises in case the input is not type serializable. ValueError: if validates=True,Raises in case the input is not a dict. Returns: dict: the input data in dict format. """ data = req if validate: if not isinstance(req, type): try: data = structure(req, type) except Exception as err: raise ValueError( 'Request is a valid {0}: {1}'.format(type.__name__, err)) return unstructure(data) else: if not isinstance(req, dict): raise ValueError( 'Request is not a dict. {0} passed instead.'.format(req)) return data
[docs]class APIWithPersonalAccessToken(APIWithAuth): """API composers for PersonalAccessToken auth""" def __init__(self, base_url: str, personal_access_token: str): """Sets up request parameters for the personal access token auth method. Args: base_url (str): Databricks API url. personal_access_token (str): Databricks personal access token. """ self._base_url = base_url self._token = personal_access_token self._headers = {'Authorization': 'Bearer {0}'.format(self._token)}
[docs]class APIWithAzureADUser(APIWithAuth): """API composers for AzureADUser auth""" def __init__(self, base_url: str, access_token: str, resource_id: str): """Sets up request parameters for the personal Azure AD user auth method. Args: base_url (str): Databricks API url. access_token (str): Azure AD access token. resource_id (str): Databricks workspace resource ID. """ self._base_url = base_url self._token = access_token self._resource_id = resource_id self._headers = {'Authorization': 'Bearer {0}'.format( self._token)} if (resource_id): self._headers = { 'X-Databricks-Azure-Workspace-Resource-Id': self._resource_id, **self._headers}
[docs]class APIWithAzureADServicePrincipal(APIWithAuth): """API composers for AzureADServicePrincipal auth""" def __init__(self, base_url: str, access_token: str, management_token: str, resource_id: str): """Sets up request parameters for the personal Azure AD service principal auth method. Args: base_url (str): Databricks API url. access_token (str): Azure AD access token. management_token (str): AD management token. resource_id (str): Databricks workspace resource ID. """ self._base_url = base_url self._token = access_token self._resource_id = resource_id self._management_token = management_token self._headers = {'Authorization': 'Bearer {0}'.format( self._token)} if (resource_id): self._headers = { 'X-Databricks-Azure-Workspace-Resource-Id': self._resource_id, 'X-Databricks-Azure-SP-Management-Token': self._management_token, **self._headers}