Source code for google_music.clients.musicmanager

__all__ = ['MusicManager']

import socket
import subprocess
import time
from pathlib import Path
from urllib.parse import unquote
from uuid import getnode as get_mac

import audio_metadata
import google_music_proto.musicmanager.calls as mm_calls
import httpx
from google_music_proto.musicmanager.pb import locker_pb2, upload_pb2
from google_music_proto.musicmanager.utils import transcode_to_mp3
from google_music_proto.oauth import (
	MUSICMANAGER_CLIENT_ID,
	MUSICMANAGER_CLIENT_SECRET,
	MUSICMANAGER_SCOPE,
)
from tenacity import stop_after_attempt

from .base import GoogleMusicClient
from ..token_handlers import FileTokenHandler
from ..utils import create_mac_string


[docs]class MusicManager(GoogleMusicClient): """API wrapper class to access Google Music Music Manager functionality. >>> from google_music import MusicManager >>> mm = MusicManager('username') Parameters: username (str, Optional): Your Google Music username. Used to store OAuth tokens for multiple accounts separately. uploader_id (str, Optional): A unique uploader ID. Default: MAC address and username used. session (:class:`~google_music.GoogleMusicSession`, Optional): A session compatible with :class:`GoogleMusicSession`. token (dict, Optional): An OAuth token compatible with ``oauthlib``. token_handler (:class:`~google_music.TokenHandler`, Optional): A token handler class compatible with :class:`TokenHandler` for dumping and loading the OAuth token. token_handler_kwargs (dict, Optional): Keyword arguments to pass to the ``token_handler`` class. These become attributes on the class instance. """ client = 'musicmanager' client_id = MUSICMANAGER_CLIENT_ID client_secret = MUSICMANAGER_CLIENT_SECRET oauth_scope = MUSICMANAGER_SCOPE def __init__( self, username=None, uploader_id=None, *, session=None, token=None, token_handler=FileTokenHandler, token_handler_kwargs=None ): super().__init__( username, session=session, token=token, token_handler=token_handler, token_handler_kwargs=None ) if self.login(): if uploader_id is None: mac_int = get_mac() if (mac_int >> 40) % 2: raise OSError("A valid MAC address could not be obtained.") mac_string = create_mac_string(mac_int) if username: uploader_id = f"{mac_string}-{username}" else: uploader_id = mac_string uploader_name = ( f"{socket.gethostname()} ({self._session.headers['User-Agent']})" ) self._upauth(uploader_id, uploader_name) def __repr__(self): return f"MusicManager(username={self.username!r}, uploader_id={self.uploader_id}, token={self.token})" def _upauth(self, uploader_id, uploader_name): self._call(mm_calls.UpAuth, uploader_id, uploader_name) self._uploader_id = uploader_id self._uploader_name = uploader_name @property def uploader_id(self): """The uploader ID of the :class:`MusicManager` instance.""" return self._uploader_id @property def uploader_name(self): """The uploader name of the :class:`MusicManager` instance.""" return self._uploader_name
[docs] def download(self, song): """Download a song from a Google Music library. Parameters: song (dict): A song dict. Returns: tuple: Song content as bytestring, suggested filename. """ song_id = song['id'] response = self._call( mm_calls.Export, self.uploader_id, song_id ) audio = response.body suggested_filename = unquote( response.headers['Content-Disposition'].split("filename*=UTF-8''")[-1] ) return (audio, suggested_filename)
[docs] def quota(self): """Get the uploaded track count and allowance. Returns: tuple: Number of uploaded tracks, number of tracks allowed. """ response = self._call( mm_calls.ClientState, self.uploader_id ) client_state = response.body.clientstate_response return (client_state.total_track_count, client_state.locker_track_limit)
[docs] def songs(self, *, uploaded=True, purchased=True): """Get a listing of Music Library songs. Returns: list: Song dicts. """ if not uploaded and not purchased: raise ValueError("'uploaded' and 'purchased' cannot both be False.") if purchased and uploaded: song_list = [] for chunk in self.songs_iter(export_type=1): song_list.extend(chunk) elif purchased: song_list = [] for chunk in self.songs_iter(export_type=2): song_list.extend(chunk) elif uploaded: purchased_songs = [] for chunk in self.songs_iter(export_type=2): purchased_songs.extend(chunk) song_list = [ song for chunk in self.songs_iter(export_type=1) for song in chunk if song not in purchased_songs ] return song_list
[docs] def songs_iter(self, *, continuation_token=None, export_type=1): """Get a paged iterator of Music Library songs. Parameters: continuation_token (str, Optional): The token of the page to return. Default: Not sent to get first page. export_type (int, Optional): The type of tracks to return. 1 for all tracks, 2 for promotional and purchased. Default: ``1`` Yields: list: Song dicts. """ def track_info_to_dict(track_info): return { field.name: value for field, value in track_info.ListFields() } while True: response = self._call( mm_calls.ExportIDs, self.uploader_id, continuation_token=continuation_token, export_type=export_type, ) items = [ track_info_to_dict(track_info) for track_info in response.body.download_track_info ] if items: yield items continuation_token = response.body.continuation_token if not continuation_token: break
# TODO: Is there a better return value? # TODO: Can more of this code be moved into calls and still leave viable control flow?
[docs] def upload(self, song, *, album_art_path=None, no_sample=False): """Upload a song to a Google Music library. Parameters: song (os.PathLike or str or audio_metadata.Format): The path to an audio file or an instance of :class:`audio_metadata.Format`. album_art_path (os.PathLike or str, Optional): The relative filename or absolute filepath to external album art. no_sample(bool, Optional): Don't generate an audio sample from song; send empty audio sample. Default: Create an audio sample using ffmpeg/avconv. Returns: dict: A result dict with keys: ``'filepath'``, ``'success'``, ``'reason'``, and ``'song_id'`` (if successful). """ if not isinstance(song, audio_metadata.Format): try: song = audio_metadata.load(song) except audio_metadata.UnsupportedFormat: raise ValueError("'song' is not of a supported format.") if album_art_path: album_art_path = Path(album_art_path).resolve() if album_art_path.is_file(): with album_art_path.open('rb') as image_file: external_art = image_file.read() else: external_art = None else: external_art = None result = {'filepath': Path(song.filepath)} track_info = mm_calls.Metadata.get_track_info(song) response = self._call( mm_calls.Metadata, self.uploader_id, [track_info] ) metadata_response = response.body.metadata_response if metadata_response.signed_challenge_info: # Sample requested. sample_request = metadata_response.signed_challenge_info[0] try: track_sample = mm_calls.Sample.generate_sample( song, track_info, sample_request, external_art=external_art, no_sample=no_sample, ) response = self._call( mm_calls.Sample, self.uploader_id, [track_sample] ) track_sample_response = response.body.sample_response.track_sample_response[ 0 ] except (OSError, ValueError, subprocess.CalledProcessError): raise # TODO else: track_sample_response = metadata_response.track_sample_response[0] response_code = track_sample_response.response_code if response_code == upload_pb2.TrackSampleResponse.MATCHED: result.update( { 'success': True, 'reason': 'Matched', 'song_id': track_sample_response.server_track_id, } ) elif response_code == upload_pb2.TrackSampleResponse.UPLOAD_REQUESTED: server_track_id = track_sample_response.server_track_id self._call( mm_calls.UploadState, self.uploader_id, 'START' ) attempts = 0 should_retry = True while should_retry and attempts <= 10: try: # Call with tenacity.retry_with to disable automatic retries. response = self._call.retry_with(stop=stop_after_attempt(1))( self, mm_calls.ScottyAgentPost, self.uploader_id, server_track_id, track_info, song, external_art=external_art, total_song_count=1, total_uploaded_count=0, ) except httpx.HTTPError as e: should_retry = True reason = e.response else: session_response = response.body if 'sessionStatus' in session_response: break try: # WHY, GOOGLE?! WHY??????????? status_code = session_response['errorMessage']['additionalInfo'][ 'uploader_service.GoogleRupioAdditionalInfo' ]['completionInfo']['customerSpecificInfo']['ResponseCode'] except KeyError: status_code = None if status_code == 503: # Upload server still syncing. should_retry = True reason = "Server syncing" elif status_code == 200: # Song is already uploaded. should_retry = False reason = "Already uploaded" elif status_code == 404: # Rejected. should_retry = False reason = "Rejected" else: should_retry = True reason = "Unkown error" finally: attempts += 1 time.sleep(2) # Give the server time to sync. else: result.update( { 'success': False, 'reason': f'Could not get upload session: {reason}', } ) if 'success' not in result: transfer = session_response['sessionStatus']['externalFieldTransfers'][0] upload_url = transfer['putInfo']['url'] content_type = transfer.get('content_type', 'audio/mpeg') original_content_type = track_info.original_content_type transcode = ( isinstance(song, audio_metadata.WAVE) or original_content_type != locker_pb2.Track.MP3 ) if ( transcode or original_content_type == locker_pb2.Track.MP3 ): if transcode: audio_file = transcode_to_mp3(song, quality='320k') else: with open(song.filepath, 'rb') as f: audio_file = f.read() # Google Music allows a maximum file size of 300 MiB. if len(audio_file) >= 300 * 1024 * 1024: result.update( { 'success': False, 'reason': 'Maximum allowed file size is 300 MiB.', } ) else: try: upload_response = self._call( mm_calls.ScottyAgentPut, upload_url, audio_file, content_type=content_type, ).body except Exception as e: # noqa result.update( { 'success': False, 'reason': str(e), } ) if upload_response.get('sessionStatus', {}).get('state'): result.update( { 'success': True, 'reason': 'Uploaded', 'song_id': track_sample_response.server_track_id, } ) else: result.update( { 'success': False, 'reason': upload_response, # TODO: Better error details. } ) else: # Do not upload files if transcode option set to False. result.update( { 'success': False, 'reason': 'Transcoding disabled for file type.', } ) self._call(mm_calls.UploadState, self.uploader_id, 'STOPPED') else: response_codes = upload_pb2._TRACKSAMPLERESPONSE.enum_types[0] response_type = response_codes.values_by_number[ track_sample_response.response_code ].name reason = response_type result.update( { 'success': False, 'reason': f'{reason}' } ) if response_type == 'ALREADY_EXISTS': result['song_id'] = track_sample_response.server_track_id return result