Source code for google_music.sessions

__all__ = [
	'GoogleMusicSession',
]

import httpx
from google_music_proto.oauth import AUTHORIZATION_BASE_URL, REDIRECT_URI, TOKEN_URL
from oauthlib.common import generate_token, urldecode
from oauthlib.oauth2 import TokenExpiredError, WebApplicationClient

from .__about__ import __title__, __version__


# Adapted from requests-oauthlib for use with httpx.
[docs]class GoogleMusicSession(httpx.Client): authorization_base_url = AUTHORIZATION_BASE_URL redirect_uri = REDIRECT_URI token_url = TOKEN_URL def __init__( self, client_id, client_secret, scope, *, token=None, **kwargs ): # httpx sets a default timeout on the Client class. # requests did not. # Disable timeout by default as too low a value # can cause issues with upload calls. timeout = kwargs.pop('timeout', None) super().__init__(timeout=timeout, **kwargs) self.params = {} self.headers.update( {'User-Agent': f'{__title__}/{__version__}'} ) self.client_id = client_id self.client_secret = client_secret self.scope = scope self.token = token or {} self.oauth_client = WebApplicationClient(self.client_id, token=self.token) @property def access_token(self): return self.token.get('access_token') @property def authorized(self): return bool(self.access_token) def authorization_url(self): state = generate_token() return ( self.oauth_client.prepare_request_uri( self.authorization_base_url, redirect_uri=self.redirect_uri, scope=self.scope, state=state, access_type='offline', prompt='select_account' ) ) def fetch_token(self, code): body = self.oauth_client.prepare_request_body( code=code, body='', redirect_uri=self.redirect_uri, include_client_id=None ) response = self.request( 'POST', self.token_url, headers={ 'Accept': 'application/json', 'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8', }, data=dict(urldecode(body)), auth=httpx.BasicAuth(self.client_id, self.client_secret) ) self.token = self.oauth_client.parse_request_body_response(response.text, scope=self.scope) return self.token def refresh_token(self): refresh_token = self.token.get('refresh_token') body = self.oauth_client.prepare_refresh_body( body='', refresh_token=refresh_token, scope=self.scope, client_id=self.client_id, client_secret=self.client_secret ) response = self.request( 'POST', self.token_url, headers={ 'Accept': 'application/json', 'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8', }, data=dict(urldecode(body)), auth=httpx.BasicAuth(self.client_id, self.client_secret), withhold_token=True ) self.token = self.oauth_client.parse_request_body_response(response.text, scope=self.scope) if 'refresh_token' not in self.token: self.token['refresh_token'] = refresh_token return self.token def request( self, method, url, data=None, headers=None, withhold_token=False, **kwargs ): if self.token and not withhold_token: try: url, headers, data = self.oauth_client.add_token( url, http_method=method, body=data, headers=headers ) except TokenExpiredError: self.refresh_token() url, headers, data = self.oauth_client.add_token( url, http_method=method, body=data, headers=headers ) return super().request( method, url, headers=headers, data=data, **kwargs )