__all__ = ['MobileClient']
import re
from collections import defaultdict
from operator import itemgetter
from uuid import getnode as get_mac
from uuid import uuid4
import google_music_proto.mobileclient.calls as mc_calls
import more_itertools
from google_music_proto.mobileclient.types import (
ListenNowItemType,
QueryResultType,
StationSeedType,
)
from google_music_proto.oauth import IOS_CLIENT_ID, IOS_CLIENT_SECRET, MOBILE_SCOPE
from tbm_utils import cast_to_list
from .base import GoogleMusicClient
from ..token_handlers import FileTokenHandler
from ..utils import create_mac_string, get_ple_prev_next
# TODO: 'max_results', 'start_token', 'updated_min', 'quality', etc.
# TODO: Podcast edits.
# TODO: Station create/edit.
# TODO: Difference between shuffles and instant mixes?
# TODO: Situations are now returned through a protobuf call?
[docs]class MobileClient(GoogleMusicClient):
"""API wrapper class to access Google Music mobile client functionality.
>>> from google_music import MobileClient
>>> mc = MobileClient('username')
Note:
Streaming requires a ``device_id`` from a valid, linked mobile device.
The :class:`MobileClient` instance's ``device_id`` can be changed after
instantiation, or a different ``device_id`` provided to :meth:`stream`
or :meth:`stream_url`.
Parameters:
username (str, Optional):
Your Google Music username.
Used to store OAuth tokens for multiple accounts separately.
device_id (str, Optional):
A mobile device ID.
Default: MAC address is used.
locale (str, Optional):
`ICU <http://www.localeplanet.com/icu/>`__
locale used to localize some responses.
This must be a locale supported by Android.
Default: ``'en_US'``.
session (:class:`~google_music.GoogleMusicSession`, Optional):
A session compatible with :class:`GoogleMusicSession`.
token (dict, Optional):
An OAuth token compatible with ``oauthlib``.
token_handler (:class:`~google_music.TokenHandler`, Optional):
A token handler class compatible with :class:`TokenHandler`
for dumping and loading the OAuth token.
token_handler_kwargs (dict, Optional):
Keyword arguments to pass to the ``token_handler``
class. These become attributes on the class instance.
"""
client = 'mobileclient'
client_id = IOS_CLIENT_ID
client_secret = IOS_CLIENT_SECRET
oauth_scope = MOBILE_SCOPE
def __init__(
self,
username=None,
device_id=None,
*,
locale='en_US',
session=None,
token=None,
token_handler=FileTokenHandler,
token_handler_kwargs=None
):
super().__init__(
username,
session=session,
token=token,
token_handler=token_handler,
token_handler_kwargs=None
)
if self.login():
self.locale = locale
self.tier = 'fr'
if device_id is None:
mac_int = get_mac()
if (mac_int >> 40) % 2:
raise OSError("A valid MAC address could not be obtained.")
self.device_id = create_mac_string(mac_int, delimiter='')
else:
self.device_id = device_id
self.is_subscribed
def __repr__(self):
return f"MobileClient(username={self.username!r}, device_id={self.device_id}, token={self.token}, locale={self.locale})"
@property
def device_id(self):
"""The mobile device ID of the :class:`MobileClient` instance."""
return self._session.headers.get('X-Device-ID')
@device_id.setter
def device_id(self, device_id):
self._session.headers.update({'X-Device-ID': device_id})
@property
def is_subscribed(self):
"""The subscription status of the account linked to the :class:`MobileClient` instance."""
subscribed = next(
(
config_item['value'] == 'true'
for config_item in self.config()
if config_item['key'] == 'isNautilusUser'
),
None
)
if subscribed:
self.tier = 'aa'
else:
self.tier = 'fr'
return subscribed
@property
def locale(self):
"""The locale of the :class:`MobileClient` instance.
Can be changed after instantiation.
`ICU <http://www.localeplanet.com/icu/>`__
locale used to localize some responses.
This must be a locale supported by Android.
"""
return self._session.params.get('hl')
@locale.setter
def locale(self, locale):
self._session.params.update({'hl': locale})
@property
def tier(self):
"""The subscription tier of the :class:`MobileClient` instance.
Can be changed after instantiation.
``aa`` if subscribed, ``fr`` if not.
"""
return self._session.params.get('tier')
@tier.setter
def tier(self, tier):
self._session.params.update(
{'tier': tier}
)
[docs] def album(self, album_id, *, include_description=True, include_songs=True):
"""Get information about an album.
Parameters:
album_id (str):
An album ID.
Album IDs start with a 'B'.
include_description (bool, Optional):
Include description of the album in the returned dict.
include_songs (bool, Optional):
Include songs from the album in the returned dict.
Default: ``True``.
Returns:
dict: Album information.
"""
response = self._call(
mc_calls.FetchAlbum,
album_id,
include_description=include_description,
include_tracks=include_songs,
)
album_info = response.body
return album_info
[docs] def artist(
self, artist_id, *, include_albums=True, num_related_artists=5, num_top_tracks=5
):
"""Get information about an artist.
Parameters:
artist_id (str):
An artist ID.
Artist IDs start with an 'A'.
include_albums (bool, Optional):
Include albums by the artist in returned dict.
Default: ``True``.
num_related_artists (int, Optional):
Include up to given number of related artists in returned dict.
Default: ``5``.
num_top_tracks (int, Optional):
Include up to given number of top tracks in returned dict.
Default: ``5``.
Returns:
dict: Artist information.
"""
response = self._call(
mc_calls.FetchArtist,
artist_id,
include_albums=include_albums,
num_related_artists=num_related_artists,
num_top_tracks=num_top_tracks,
)
artist_info = response.body
return artist_info
[docs] def browse_podcasts(self, podcast_genre_id='JZCpodcasttopchartall'):
"""Get the podcasts for a genre from the Podcasts browse tab.
Parameters:
podcast_genre_id (str, Optional):
A podcast genre ID as found
in :meth:`browse_podcasts_genres`.
Default: ``'JZCpodcasttopchartall'``.
Returns:
list: Podcast dicts.
"""
response = self._call(
mc_calls.PodcastBrowse,
podcast_genre_id=podcast_genre_id
)
podcast_series_list = response.body.get('series', [])
return podcast_series_list
[docs] def browse_podcasts_genres(self):
"""Get the genres from the Podcasts browse tab dropdown.
Returns:
list: Genre groups that contain sub groups.
"""
response = self._call(mc_calls.PodcastBrowseHierarchy)
genres = response.body.get('groups', [])
return genres
[docs] def browse_stations(self, station_category_id):
"""Get the stations for a category from Browse Stations.
Parameters:
station_category_id (str):
A station category ID as
found with :meth:`browse_stations_categories`.
Returns:
list: Station dicts.
"""
response = self._call(
mc_calls.BrowseStations,
station_category_id
)
stations = response.body.get('stations', [])
return stations
[docs] def browse_stations_categories(self):
"""Get the categories from Browse Stations.
Returns:
list: Station categories that can contain subcategories.
"""
response = self._call(mc_calls.BrowseStationCategories)
station_categories = response.body.get('root', {}).get('subcategories', [])
return station_categories
[docs] def config(self):
"""Get a listing of mobile client configuration settings."""
response = self._call(mc_calls.Config)
config_list = response.body.get('data', {}).get('entries', [])
return config_list
# TODO: Check success/failure?
[docs] def device_deauthorize(self, device):
"""Deauthorize a registered device.
Parameters:
device (dict): A device dict as returned by :meth:`devices`.
"""
self._call(
mc_calls.DeviceManagementInfoDelete,
device['id']
)
# TODO: Set device dict as property of MobileClient?
[docs] def device_set(self, device):
"""Set device used by :class:`MobileClient` instance.
Parameters:
device (dict): A device dict as returned by :meth:`devices`.
"""
if device['id'].startswith('0x'):
self.device_id = device['id'][2:]
elif device['id'].startswith('ios:'):
self.device_id = device['id'].replace(':', '')
else:
self.device_id = device['id']
[docs] def devices(self):
"""Get a listing of devices registered to the Google Music account."""
response = self._call(mc_calls.DeviceManagementInfo)
registered_devices = response.body.get('data', {}).get('items', [])
return registered_devices
[docs] def explore_genres(self, parent_genre_id=None):
"""Get a listing of song genres.
Parameters:
parent_genre_id (str, Optional):
A genre ID.
If given, a listing of this genre's sub-genres is returned.
Returns:
list: Genre dicts.
"""
response = self._call(
mc_calls.ExploreGenres,
parent_genre_id
)
genre_list = response.body.get('genres', [])
return genre_list
[docs] def explore_tabs(self, *, num_items=100, genre_id=None):
"""Get a listing of explore tabs.
Parameters:
num_items (int, Optional):
Number of items per tab to return.
Default: ``100``
genre_id (genre_id, Optional):
Genre ID from :meth:`explore_genres` to explore.
Default: ``None``.
Returns:
dict: Explore tabs content.
"""
response = self._call(
mc_calls.ExploreTabs,
num_items=num_items,
genre_id=genre_id
)
tab_list = response.body.get('tabs', [])
explore_tabs = {}
for tab in tab_list:
explore_tabs[tab['tab_type'].lower()] = tab
return explore_tabs
[docs] def listen_now_dismissed_items(self):
"""Get a listing of items dismissed from Listen Now tab."""
response = self._call(mc_calls.ListenNowGetDismissedItems)
dismissed_items = response.body.get('items', [])
return dismissed_items
[docs] def listen_now_items(self):
"""Get a listing of Listen Now items.
Note:
This does not include situations;
use the :meth:`situations` method instead.
Returns:
dict: With ``albums`` and ``stations`` keys of listen now items.
"""
response = self._call(mc_calls.ListenNowGetListenNowItems)
listen_now_item_list = response.body.get('listennow_items', [])
listen_now_items = defaultdict(list)
for item in listen_now_item_list:
type_ = f"{ListenNowItemType(item['type']).name}s"
listen_now_items[type_].append(item)
return dict(listen_now_items)
def new_releases(self, genre_id=None):
new_releases_tab = self.explore_tabs(genre_id=genre_id)['new_releases']
new_releases = []
if 'groups' in new_releases_tab:
for group in new_releases_tab['groups']:
for entity in group['entities']:
del entity['kind']
new_releases.append(entity.popitem()[1])
return new_releases
[docs] def playlist_song(self, playlist_song_id):
"""Get information about a playlist song.
Note:
This returns the playlist entry information only.
For full song metadata, use :meth:`song` with
the ``'trackId'`` field.
Parameters:
playlist_song_id (str): A playlist song ID.
Returns:
dict: Playlist song information.
"""
playlist_song_info = next(
(
playlist_song
for playlist in self.playlists(include_songs=True)
for playlist_song in playlist['tracks']
if playlist_song['id'] == playlist_song_id
),
None
)
return playlist_song_info
[docs] @cast_to_list
def playlist_songs_add(
self,
songs,
playlist,
*,
after=None,
before=None,
index=None,
position=None
):
"""Add songs to a playlist.
Note:
* Provide no optional arguments to add to end.
* Provide playlist song dicts for ``after`` and/or ``before``.
* Provide a zero-based ``index``.
* Provide a one-based ``position``.
Songs are inserted *at* given index or position.
It's also possible to add to the end by using
``len(songs)`` for index or ``len(songs) + 1`` for position.
Parameters:
songs (dict or list): A song dict or a list of song dicts.
playlist (dict): A playlist dict.
after (dict, Optional): A playlist song dict ``songs`` will follow.
before (dict, Optional): A playlist song dict ``songs`` will precede.
index (int, Optional): The zero-based index position to insert ``songs``.
position (int, Optional): The one-based position to insert ``songs``.
Returns:
dict: Playlist dict including songs.
"""
playlist_songs = self.playlist_songs(playlist)
prev, next_ = get_ple_prev_next(
playlist_songs,
after=after,
before=before,
index=index,
position=position
)
prev_id = prev.get('id')
next_id = next_.get('id')
mutations = []
for song in songs:
if 'storeId' in song:
song_id = song['storeId']
elif 'trackId' in song:
song_id = song['trackId']
else:
song_id = song['id']
ple_id = str(uuid4())
mutation = mc_calls.PlaylistEntriesBatch.create(
song_id,
playlist['id'],
playlist_entry_id=ple_id,
preceding_entry_id=prev_id,
following_entry_id=next_id,
)
mutations.append(mutation)
prev_id = ple_id
self._call(
mc_calls.PlaylistEntriesBatch,
mutations
)
return self.playlist(
playlist['id'],
include_songs=True
)
[docs] @cast_to_list
def playlist_songs_delete(self, playlist_songs):
"""Delete songs from playlist.
Parameters:
playlist_songs (dict or list): A playlist song dict
or a list of playlist song dicts.
Returns:
dict: Playlist dict including songs.
"""
if not more_itertools.all_equal(
playlist_song['playlistId']
for playlist_song in playlist_songs
):
raise ValueError("All 'playlist_songs' must be from the same playlist.")
mutations = [
mc_calls.PlaylistEntriesBatch.delete(playlist_song['id'])
for playlist_song in playlist_songs
]
self._call(
mc_calls.PlaylistEntriesBatch,
mutations)
return self.playlist(
playlist_songs[0]['playlistId'],
include_songs=True
)
[docs] @cast_to_list
def playlist_songs_move(
self,
playlist_songs,
*,
after=None,
before=None,
index=None,
position=None
):
"""Move songs in a playlist.
Note:
* Provide no optional arguments to move to end.
* Provide playlist song dicts for ``after`` and/or ``before``.
* Provide a zero-based ``index``.
* Provide a one-based ``position``.
Songs are inserted *at* given index or position.
It's also possible to move to the end by using
``len(songs)`` for index or ``len(songs) + 1`` for position.
Parameters:
playlist_songs (list): A list of playlist song dicts.
after (dict, Optional): A playlist song dict ``songs`` will follow.
before (dict, Optional): A playlist song dict ``songs`` will precede.
index (int, Optional): The zero-based index position to insert ``songs``.
position (int, Optional): The one-based position to insert ``songs``.
Returns:
dict: Playlist dict including songs.
"""
if not more_itertools.all_equal(
playlist_song['playlistId']
for playlist_song in playlist_songs
):
raise ValueError("All 'playlist_songs' must be from the same playlist.")
playlist = self.playlist(playlist_songs[0]['playlistId'], include_songs=True)
prev, next_ = get_ple_prev_next(
playlist['tracks'],
after=after,
before=before,
index=index,
position=position,
)
prev_id = prev.get('id')
next_id = next_.get('id')
mutations = []
for playlist_song in playlist_songs:
mutation = mc_calls.PlaylistEntriesBatch.update(
playlist_song,
preceding_entry_id=prev_id,
following_entry_id=next_id
)
mutations.append(mutation)
prev_id = playlist_song['id']
self._call(
mc_calls.PlaylistEntriesBatch,
mutations
)
return self.playlist(
playlist['id'],
include_songs=True
)
[docs] def playlist_songs(self, playlist):
"""Get a listing of songs from a playlist.
Paramters:
playlist (dict): A playlist dict.
Returns:
list: Playlist song dicts.
"""
playlist_type = playlist.get('type')
playlist_song_list = []
if playlist_type in ('USER_GENERATED', None):
start_token = None
playlist_song_list = []
while True:
response = self._call(
mc_calls.PlaylistEntryFeed,
max_results=49995,
start_token=start_token,
)
items = response.body.get('data', {}).get('items', [])
if items:
playlist_song_list.extend(items)
start_token = response.body.get('nextPageToken')
if start_token is None:
break
elif playlist_type == 'SHARED':
playlist_share_token = playlist['shareToken']
start_token = None
playlist_song_list = []
while True:
response = self._call(
mc_calls.PlaylistEntriesShared,
playlist_share_token,
max_results=49995,
start_token=start_token,
)
entry = response.body['entries'][0]
items = entry.get('playlistEntry', [])
if items:
playlist_song_list.extend(items)
start_token = entry.get('nextPageToken')
if start_token is None:
break
playlist_song_list.sort(key=itemgetter('absolutePosition'))
return playlist_song_list
[docs] def playlist(self, playlist_id, *, include_songs=False):
"""Get information about a playlist.
Parameters:
playlist_id (str): A playlist ID.
include_songs (bool, Optional):
Include songs from the playlist in the returned dict.
Default: ``False``
Returns:
dict: Playlist information.
"""
playlist_info = next(
(
playlist
for playlist in self.playlists(include_songs=include_songs)
if playlist['id'] == playlist_id
),
None
)
return playlist_info
[docs] def playlist_create(
self,
name,
description='',
*,
public=False,
songs=None
):
"""Create a playlist.
Parameters:
name (str): Name to give the playlist.
description (str): Description to give the playlist.
public (bool, Optional):
If ``True`` and account has a subscription,
make playlist public.
Default: ``False``
songs (list, Optional): A list of song dicts to add to the playlist.
Returns:
dict: Playlist information.
"""
share_state = 'PUBLIC' if public else 'PRIVATE'
playlist = self._call(
mc_calls.PlaylistsCreate,
name,
description,
share_state
).body
if songs:
playlist = self.playlist_songs_add(
songs,
playlist
)
return playlist
# TODO: Check success/failure?
[docs] def playlist_delete(self, playlist):
"""Delete a playlist.
Parameters:
playlist (dict): A playlist dict.
"""
self._call(
mc_calls.PlaylistsDelete,
playlist['id']
)
[docs] def playlist_edit(
self,
playlist,
*,
name=None,
description=None,
public=None
):
"""Edit playlist(s).
Parameters:
playlist (dict): A playlist dict.
name (str): Name to give the playlist.
description (str, Optional):
Description to give the playlist.
public (bool, Optional):
If ``True`` and account has a subscription,
make playlist public.
Default: ``False``
Returns:
dict: Playlist information.
"""
if all(
value is None
for value in (name, description, public)
):
raise ValueError(
'At least one of name, description, or public must be provided'
)
playlist_id = playlist['id']
playlist = self.playlist(playlist_id)
name = name if name is not None else playlist['name']
description = (
description if description is not None else playlist['description']
)
share_state = 'PUBLIC' if public else playlist['accessControlled']
playlist = self._call(
mc_calls.PlaylistsUpdate,
playlist_id,
name,
description,
share_state
).body
return playlist
[docs] def playlist_subscribe(self, playlist):
"""Subscribe to a public playlist.
Parameters:
playlist (dict): A public playlist dict.
Returns:
dict: Playlist information.
"""
mutation = mc_calls.PlaylistBatch.create(
playlist['name'],
playlist['description'],
'SHARED',
owner_name=playlist.get('ownerName', ''),
share_token=playlist['shareToken'],
)
response_body = self._call(
mc_calls.PlaylistBatch,
mutation
).body
playlist_id = response_body['mutate_response'][0]['id']
return self.playlist(playlist_id)
# TODO: Check success/failure?
[docs] def playlist_unsubscribe(self, playlist):
"""Unsubscribe from a public playlist.
Parameters:
playlist (dict): A public playlist dict.
"""
self.playlist_delete(playlist)
[docs] def playlists(self, *, include_songs=False):
"""Get a listing of library playlists.
Parameters:
include_songs (bool, Optional):
Include songs in the returned playlist dicts.
Default: ``False``.
Returns:
list: A list of playlist dicts.
"""
playlist_list = []
for chunk in self.playlists_iter(page_size=49995):
for playlist in chunk:
if include_songs:
playlist['tracks'] = self.playlist_songs(playlist)
playlist_list.append(playlist)
return playlist_list
[docs] def playlists_iter(self, *, start_token=None, page_size=250):
"""Get a paged iterator of library playlists.
Parameters:
start_token (str): The token of the page to return.
Default: Not sent to get first page.
page_size (int, Optional): The maximum number of results per returned page.
Max allowed is ``49995``.
Default: ``250``
Yields:
list: Playlist dicts.
"""
start_token = None
while True:
response = self._call(
mc_calls.PlaylistFeed,
max_results=page_size,
start_token=start_token
)
items = response.body.get('data', {}).get('items', [])
if items:
yield items
start_token = response.body.get('nextPageToken')
if start_token is None:
break
[docs] def podcast(self, podcast_series_id, *, max_episodes=50):
"""Get information about a podcast series.
Parameters:
podcast_series_id (str): A podcast series ID.
max_episodes (int, Optional):
Include up to given number of episodes in returned dict.
Default: ``50``
Returns:
dict: Podcast series information.
"""
podcast_info = self._call(
mc_calls.PodcastFetchSeries,
podcast_series_id,
max_episodes=max_episodes
).body
return podcast_info
[docs] def podcasts(self, *, device_id=None):
"""Get a listing of subsribed podcast series.
Paramaters:
device_id (str, Optional):
A mobile device ID.
Default: Use :attr:`device_id`.
Returns:
list: Podcast series dict.
"""
if device_id is None:
device_id = self.device_id
podcast_list = []
for chunk in self.podcasts_iter(device_id=device_id, page_size=49995):
podcast_list.extend(chunk)
return podcast_list
[docs] def podcasts_iter(self, *, device_id=None, page_size=250):
"""Get a paged iterator of subscribed podcast series.
Parameters:
device_id (str, Optional):
A mobile device ID.
Default: Use :attr:`device_id`.
page_size (int, Optional):
The maximum number of results per returned page.
Max allowed is ``49995``.
Default: ``250``
Yields:
list: Podcast series dicts.
"""
if device_id is None:
device_id = self.device_id
start_token = None
prev_items = None
while True:
response = self._call(
mc_calls.PodcastSeries,
device_id,
max_results=page_size,
start_token=start_token,
)
items = response.body.get('data', {}).get('items', [])
# Google does some weird shit.
if items != prev_items:
subscribed_podcasts = [
item
for item in items
if item.get('userPreferences', {}).get('subscribed')
]
yield subscribed_podcasts
prev_items = items
else:
break
start_token = response.body.get('nextPageToken')
if start_token is None:
break
[docs] def podcast_episode(self, podcast_episode_id):
"""Get information about a podcast_episode.
Parameters:
podcast_episode_id (str): A podcast episode ID.
Returns:
dict: Podcast episode information.
"""
response = self._call(
mc_calls.PodcastFetchEpisode,
podcast_episode_id
)
podcast_episode_info = [
podcast_episode
for podcast_episode in response.body
if not podcast_episode['deleted']
]
return podcast_episode_info
[docs] def podcast_episodes(self, *, device_id=None):
"""Get a listing of podcast episodes for all subscribed podcasts.
Paramaters:
device_id (str, Optional): A mobile device ID.
Default: Use ``device_id`` of the :class:`MobileClient` instance.
Returns:
list: Podcast episode dicts.
"""
if device_id is None:
device_id = self.device_id
podcast_episode_list = []
for chunk in self.podcast_episodes_iter(device_id=device_id, page_size=49995):
podcast_episode_list.extend(chunk)
return podcast_episode_list
[docs] def podcast_episodes_iter(self, *, device_id=None, page_size=250):
"""Get a paged iterator of podcast episode for all subscribed podcasts.
Parameters:
device_id (str, Optional):
A mobile device ID.
Default: Use :attr:`device_id`.
page_size (int, Optional):
The maximum number of results per returned page.
Max allowed is ``49995``.
Default: ``250``
Yields:
list: Podcast episode dicts.
"""
if device_id is None:
device_id = self.device_id
start_token = None
prev_items = None
while True:
response = self._call(
mc_calls.PodcastEpisode,
device_id,
max_results=page_size,
start_token=start_token,
)
items = response.body.get('data', {}).get('items', [])
# Google does some weird shit.
if items != prev_items:
yield items
prev_items = items
else:
break
start_token = response.body.get('nextPageToken')
if start_token is None:
break
[docs] def search(self, query, *, max_results=100, **kwargs):
"""Search Google Music and library for content.
Parameters:
query (str): Search text.
max_results (int, Optional):
Maximum number of results per type per location to retrieve.
I.e up to 100 Google and 100 library
for a total of 200 for the default value.
Google only accepts values up to 100.
Default: ``100``
kwargs (bool, Optional):
Any of:
- ``'albums'``
- ``'artists'``
- ``'genres'``
- ``'playlists'``
- ``'podcasts'``
- ``'situations'``
- ``'songs'``
- ``'stations'``
- ``'videos'``
set to ``True`` will include that result type in the returned dict.
Setting none of them will include all result types in the returned dict.
Returns:
dict: A dict of results separated into keys:
- ``'albums'``
- ``'artists'``
- ``'genres'``
- ``'playlists'``
- ```'podcasts'``
- ``'situations'``,
- ``'songs'``
- ``'stations'``
- ``'videos'``
Note:
Free account search is restricted
so may not contain hits for all result types.
"""
results = defaultdict(list)
for type_, results_ in self.search_library(
query,
max_results=max_results,
**kwargs
).items():
results[type_].extend(results_)
for type_, results_ in self.search_google(
query,
max_results=max_results,
**kwargs
).items():
results[type_].extend(results_)
return dict(results)
[docs] def search_google(self, query, *, max_results=100, **kwargs):
"""Search Google Music for content.
Parameters:
query (str): Search text.
max_results (int, Optional):
Maximum number of results per type to retrieve.
Google only accepts values up to 100.
Default: ``100``
kwargs (bool, Optional):
Any of:
- ``'albums'``
- ``'artists'``
- ``'genres'``
- ``'playlists'``
- ``'podcasts'``
- ``'situations'``
- ``'songs'``
- ``'stations'``
- ``'videos'``
set to ``True`` will include that result type in the returned dict.
Setting none of them will include all result types in the returned dict.
Returns:
dict: A dict of results separated into keys:
- ``albums``
- ``artists``
- ``genres``
- ``playlists``
- ``podcasts``
- ``situations``
- ``songs``
- ``stations``
- ``videos``
Note:
Free account search is restricted
so may not contain hits for all result types.
"""
response = self._call(
mc_calls.Query,
query,
max_results=max_results,
**kwargs
)
clusters = response.body.get('clusterDetail', [])
results = defaultdict(list)
for cluster in clusters:
result_type = QueryResultType(cluster['cluster']['type']).name
entries = cluster.get('entries', [])
if len(entries) > 0:
for entry in entries:
item_key = next(
key
for key in entry
if key not in ['cluster', 'score', 'type']
)
results[f"{result_type}s"].append(entry[item_key])
return dict(results)
[docs] def search_library(self, query, *, max_results=100, **kwargs):
"""Search Google Music for content.
Parameters:
query (str): Search text.
max_results (int, Optional):
Maximum number of results per type to retrieve.
Default: ``100``
kwargs (bool, Optional):
Any of:
- ``'playlists'``
- ``'podcasts'``
- ``'songs'``
- ``'stations'``
set to ``True`` will include that result type in the returned dict.
Setting none of them will include all result types in the returned dict.
Returns:
dict: A dict of results separated into keys:
- ``'playlists'``
- ``'podcasts'``
- ``'songs'``
- ``'stations'``
"""
def match_fields(item, fields):
return any(
query.casefold() in item.get(field, '').casefold() for field in fields
)
types = [
(
'playlists',
['description', 'name'],
self.playlists,
),
(
'podcasts',
['author', 'description', 'title'],
self.podcasts,
),
(
'songs',
['album', 'albumArtist', 'artist', 'composer', 'genre', 'title'],
self.songs,
),
(
'stations',
['byline', 'description', 'name'],
self.stations,
),
]
results = {}
for type_, fields, func in types:
if (not kwargs) or (type_ in kwargs):
results[type_] = [
item
for item in func()
if match_fields(item, fields)
][:max_results]
return results
[docs] def search_suggestion(self, query):
"""Get search query suggestions for query.
Parameters:
query (str): Search text.
Returns:
list: Suggested query strings.
"""
response = self._call(mc_calls.QuerySuggestion, query)
suggested_queries = response.body.get('suggested_queries', [])
return [
suggested_query['suggestion_string']
for suggested_query in suggested_queries
]
[docs] def shuffle_album(
self,
album,
*,
num_songs=100,
only_library=False,
recently_played=None
):
"""Get a listing of album shuffle/mix songs.
Parameters:
album (dict): An album dict.
num_songs (int, Optional):
The maximum number of songs to return from the station.
Default: ``100``
only_library (bool, Optional):
Only return content from library.
Default: False
recently_played (list, Optional):
A list of dicts in the form of {'id': '', 'type'} where
``id`` is a song ID and
``type`` is 0 for a library song and 1 for a store song.
Returns:
list: List of album shuffle/mix songs.
"""
station_info = {
'seed': {
'albumId': album['albumId'],
'seedType': StationSeedType.album.value,
},
'num_entries': num_songs,
'library_content_only': only_library,
}
if recently_played is not None:
station_info['recently_played'] = recently_played
response = self._call(
mc_calls.RadioStationFeed,
station_infos=[station_info]
)
station_feed = response.body.get('data', {}).get('stations', [])
try:
station = station_feed[0]
except IndexError:
station = {}
return station.get('tracks', [])
[docs] def shuffle_artist(
self,
artist,
*,
num_songs=100,
only_library=False,
recently_played=None,
only_artist=False
):
"""Get a listing of artist shuffle/mix songs.
Parameters:
artist (dict): An artist dict.
num_songs (int, Optional):
The maximum number of songs to return from the station.
Default: ``100``
only_library (bool, Optional):
Only return content from library.
Default: False
recently_played (list, Optional):
A list of dicts in the form of {'id': '', 'type'} where
``id`` is a song ID and
``type`` is 0 for a library song and 1 for a store song.
only_artist (bool, Optional):
If ``True``, only return songs from the artist,
else return songs from artist and related artists.
Default: ``False``
Returns:
list: List of artist shuffle/mix songs.
"""
station_info = {
'num_entries': num_songs,
'library_content_only': only_library,
}
if only_artist:
station_info['seed'] = {
'artistId': artist['artistId'],
'seedType': StationSeedType.artist_only.value,
}
else:
station_info['seed'] = {
'artistId': artist['artistId'],
'seedType': StationSeedType.artist_related.value,
}
if recently_played is not None:
station_info['recently_played'] = recently_played
response = self._call(
mc_calls.RadioStationFeed,
station_infos=[station_info]
)
station_feed = response.body.get('data', {}).get('stations', [])
try:
station = station_feed[0]
except IndexError:
station = {}
return station.get('tracks', [])
[docs] def shuffle_genre(
self,
genre,
*,
num_songs=100,
only_library=False,
recently_played=None
):
"""Get a listing of genre shuffle/mix songs.
Parameters:
genre (dict): A genre dict.
num_songs (int, Optional): The maximum number of songs to return from the station. Default: ``100``
only_library (bool, Optional): Only return content from library.
Default: False
recently_played (list, Optional): A list of dicts in the form of {'id': '', 'type'} where ``id`` is a song ID and
``type`` is 0 for a library song and 1 for a store song.
Returns:
list: List of genre shuffle/mix songs.
"""
station_info = {
'seed': {
'genreId': genre['id'],
'seedType': StationSeedType.genre.value,
},
'num_entries': num_songs,
'library_content_only': only_library,
}
if recently_played is not None:
station_info['recently_played'] = recently_played
response = self._call(
mc_calls.RadioStationFeed,
station_infos=[station_info]
)
station_feed = response.body.get('data', {}).get('stations', [])
try:
station = station_feed[0]
except IndexError:
station = {}
return station.get('tracks', [])
[docs] def shuffle_song(
self,
song,
*,
num_songs=100,
only_library=False,
recently_played=None
):
"""Get a listing of song shuffle/mix songs.
Parameters:
song (dict): A song dict.
num_songs (int, Optional):
The maximum number of songs to return from the station.
Default: ``100``
only_library (bool, Optional):
Only return content from library.
Default: False
recently_played (list, Optional):
A list of dicts in the form of {'id': '', 'type'} where
``id`` is a song ID and
``type`` is 0 for a library song and 1 for a store song.
Returns:
list: List of artist shuffle/mix songs.
"""
station_info = {
'num_entries': num_songs,
'library_content_only': only_library,
}
if 'storeId' in song:
station_info['seed'] = {
'trackId': song['storeId'],
'seedType': StationSeedType.store_track.value,
}
else:
station_info['seed'] = {
'trackLockerId': song['id'],
'seedType': StationSeedType.library_track.value,
}
if recently_played is not None:
station_info['recently_played'] = recently_played
response = self._call(mc_calls.RadioStationFeed, station_infos=[station_info])
station_feed = response.body.get('data', {}).get('stations', [])
try:
station = station_feed[0]
except IndexError:
station = {}
return station.get('tracks', [])
[docs] def situations(self, *, tz_offset=None):
"""Get a listing of situations.
Parameters:
tz_offset (int, Optional): A timezone offset from UTC in seconds.
"""
response = self._call(
mc_calls.ListenNowSituations,
tz_offset
)
situation_list = response.body.get('situations', [])
return situation_list
[docs] def song(self, song_id):
"""Get information about a song.
Parameters:
song_id (str): A song ID.
Returns:
dict: Song information.
"""
if song_id.startswith('T'):
song_info = self._call(
mc_calls.FetchTrack,
song_id
).body
else:
song_info = next(
(
song
for song in self.songs()
if song['id'] == song_id
),
None
)
return song_info
[docs] @cast_to_list
def songs_add(self, songs):
"""Add store songs to your library.
Parameters:
songs (list):
A store song dict or a list of store song dicts.
Returns:
list: Songs' library IDs.
"""
mutations = [
mc_calls.TrackBatch.add(song)
for song in songs
]
response = self._call(
mc_calls.TrackBatch,
mutations
)
success_ids = [
res['id']
for res in response.body['mutate_response']
if res['response_code'] == 'OK'
]
return [
self.song(success_id)
for success_id in success_ids
]
[docs] @cast_to_list
def songs_delete(self, songs):
"""Delete songs from library.
Parameters:
songs (list):
A library song dict or a list of library song dicts.
Returns:
list: Successfully deleted song IDs.
"""
mutations = [
mc_calls.TrackBatch.delete(song['id'])
for song in songs
]
response = self._call(
mc_calls.TrackBatch,
mutations
)
success_ids = [
res['id']
for res in response.body['mutate_response']
if res['response_code'] == 'OK'
]
# TODO: Report failures.
# failure_ids = [
# res['id']
# for res in response.body['mutate_response']
# if res['response_code'] != 'OK'
# ]
return success_ids
[docs] @cast_to_list
def songs_play(self, songs):
"""Add play to song play count.
Parameters:
songs (dict or list):
A song dict or a list of song dicts.
Returns:
bool: ``True`` if successful, ``False`` if not.
"""
events = []
for song in songs:
if 'id' in song:
song_id = song['id']
elif 'trackId' in song:
song_id = song['trackId']
else:
song_id = song['storeId']
song_duration = song['durationMillis']
events.append(
mc_calls.ActivityRecordRealtime.play(
song_id,
song_duration
)
)
self._call(
mc_calls.ActivityRecordRealtime,
events)
return [
self.song(song_id)
for song in songs
]
[docs] @cast_to_list
def songs_rate(self, songs, rating):
"""Rate song.
Parameters:
songs (dict or list):
A song dict or a list of song dicts.
rating (int):
0 (not rated), 1 (thumbs down), or 5 (thumbs up).
Returns:
bool: ``True`` if successful, ``False`` if not.
"""
events = []
for song in songs:
if 'id' in song:
song_id = song['id']
elif 'trackId' in song:
song_id = song['trackId']
else:
song_id = song['storeId']
events.append(
mc_calls.ActivityRecordRealtime.rate(
song_id,
rating
)
)
self._call(
mc_calls.ActivityRecordRealtime,
events
)
return [
self.song(song_id)
for song in songs
]
[docs] def songs(self):
"""Get a listing of library songs.
Returns:
list: Song dicts.
"""
song_list = []
for chunk in self.songs_iter(page_size=49995):
song_list.extend(chunk)
return song_list
[docs] def songs_iter(self, *, page_size=250):
"""Get a paged iterator of library songs.
Parameters:
page_size (int, Optional):
The maximum number of results per returned page.
Max allowed is ``49995``.
Default: ``250``
Yields:
list: Song dicts.
"""
start_token = None
while True:
response = self._call(
mc_calls.TrackFeed,
max_results=page_size,
start_token=start_token
)
items = response.body.get('data', {}).get('items', [])
if items:
yield items
start_token = response.body.get('nextPageToken')
if start_token is None:
break
# TODO: Investigate library_content_only.
# TODO: Figure out 'radio/stationfeed'.
[docs] def station(self, station_id, *, num_songs=25, recently_played=None):
"""Get information about a station.
Parameters:
station_id (str):
A station ID.
Use 'IFL' for I'm Feeling Lucky.
num_songs (int, Optional):
The maximum number of songs to return from the station.
Default: ``25``
recently_played (list, Optional):
A list of dicts in the form of {'id': '', 'type'} where
``id`` is a song ID and
``type`` is 0 for a library song and 1 for a store song.
Returns:
dict: Station information.
"""
station_info = {
'station_id': station_id,
'num_entries': num_songs,
'library_content_only': False,
}
if recently_played is not None:
station_info['recently_played'] = recently_played
response = self._call(
mc_calls.RadioStationFeed,
station_infos=[station_info]
)
station_feed = response.body.get('data', {}).get('stations', [])
try:
station = station_feed[0]
except IndexError:
station = {}
return station
# TODO: Figure out 'radio/stationfeed'.
[docs] def station_feed(self, *, num_songs=25, num_stations=4):
"""Generate stations.
Note:
A Google Music subscription is required.
Parameters:
num_songs (int, Optional):
The total number of songs to return. Default: ``25``
num_stations (int, Optional):
The number of stations to return when no station_infos is provided.
Default: ``5``
Returns:
list: Station information dicts.
"""
response = self._call(
mc_calls.RadioStationFeed,
num_entries=num_songs,
num_stations=num_stations
)
station_feed = response.body.get('data', {}).get('stations', [])
return station_feed
[docs] def station_songs(self, station, *, num_songs=25, recently_played=None):
"""Get a listing of songs from a station.
Parameters:
station (str): A station dict.
num_songs (int, Optional):
The maximum number of songs to return from the station.
Default: ``25``
recently_played (list, Optional):
A list of dicts in the form of {'id': '', 'type'} where
``id`` is a song ID and
``type`` is 0 for a library song and 1 for a store song.
Returns:
list: Station song dicts.
"""
station_id = station['id']
station = self.station(
station_id,
num_songs=num_songs,
recently_played=recently_played
)
return station.get('tracks', [])
[docs] def stations(self, *, generated=True, library=True):
"""Get a listing of library stations.
The listing can contain stations added to the library and generated from the library.
Parameters:
generated (bool, Optional): Include generated stations.
Default: True
library (bool, Optional): Include library stations.
Default: True
Returns:
list: Station information dicts.
"""
station_list = []
for chunk in self.stations_iter(page_size=49995):
for station in chunk:
if (
(generated and not station.get('inLibrary'))
or (library and station.get('inLibrary'))
):
station_list.append(station)
return station_list
[docs] def stations_iter(self, *, page_size=250):
"""Get a paged iterator of library stations.
Parameters:
page_size (int, Optional):
The maximum number of results per returned page.
Max allowed is ``49995``.
Default: ``250``
Yields:
list: Station dicts.
"""
start_token = None
while True:
response = self._call(
mc_calls.RadioStation,
max_results=page_size,
start_token=start_token
)
yield response.body.get('data', {}).get('items', [])
start_token = response.body.get('nextPageToken')
if start_token is None:
break
[docs] def stream(
self,
item,
*,
device_id=None,
quality='hi',
session_token=None
):
"""Get MP3 stream of a podcast episode, library song, station_song, or store song.
Note:
Streaming requires a ``device_id`` from a valid, linked mobile device.
Parameters:
item (str):
A podcast episode, library song, station_song, or store song.
A Google Music subscription is required to stream store songs.
device_id (str, Optional):
A mobile device ID.
Default: Use :attr:`device_id`.
quality (str, Optional):
Stream quality is one of:
- ``'hi'`` (320Kbps)
- ``'med'`` (160Kbps)
- ``'low'`` (128Kbps)
Default: ``'hi'``.
session_token (str, Optional):
Session token from a station dict required for
unsubscribed users to stream a station song.
station['sessionToken'] as returend by :meth:`station`
only exists for free accounts.
Returns:
bytes: An MP3 file.
"""
if device_id is None:
device_id = self.device_id
stream_url = self.stream_url(
item,
device_id=device_id,
quality=quality,
session_token=session_token
)
response = self._session.request('GET', stream_url, withhold_token=True)
audio = response.content
return audio
[docs] def stream_url(
self,
item,
*,
device_id=None,
quality='hi',
session_token=None
):
"""Get a URL to stream a podcast episode, library song, station_song, or store song.
Note:
Streaming requires a ``device_id`` from a valid, linked mobile device.
Parameters:
item (str):
A podcast episode, library song, station_song, or store song.
A Google Music subscription is required to stream store songs.
device_id (str, Optional):
A mobile device ID.
Default: Use :attr:`device_id`.
quality (str, Optional):
Stream quality is one of:
- ``'hi'`` (320Kbps)
- ``'med'`` (160Kbps)
- ``'low'`` (128Kbps)
Default: ``'hi'``.
session_token (str):
Session token from a station dict required for
unsubscribed users to stream a station song.
station['sessionToken'] as returend by :meth:`station`
only exists for free accounts.
Returns:
str: A URL to an MP3 file.
"""
if device_id is None:
device_id = self.device_id
if 'episodeId' in item: # Podcast episode.
response = self._call(
mc_calls.PodcastEpisodeStreamURL,
item['episodeId'],
quality=quality,
device_id=device_id,
)
elif 'wentryid' in item: # Free account station song.
response = self._call(
mc_calls.RadioStationTrackStreamURL,
item['storeId'],
item['wentryid'],
session_token,
quality=quality,
device_id=device_id,
)
elif 'trackId' in item: # Playlist song.
response = self._call(
mc_calls.TrackStreamURL,
item['trackId'],
quality=quality,
device_id=device_id,
)
elif (
self.is_subscribed
and 'storeId' in item
and (
'clientId' not in item
or re.match(
r'^[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$',
item['clientId']
)
)
): # Store song.
response = self._call(
mc_calls.TrackStreamURL,
item['storeId'],
quality=quality,
device_id=device_id,
)
elif 'id' in item: # Library song.
response = self._call(
mc_calls.TrackStreamURL,
item['id'],
quality=quality,
device_id=device_id,
)
else:
# TODO: Create an exception for not being subscribed or use a better builtin exception for this case.
if 'storeId' in item and not self.is_subscribed:
msg = "Can't stream a store song without a subscription."
else:
msg = "Item does not contain an ID field."
raise ValueError(msg)
try:
stream_url = response.headers['Location']
except KeyError:
stream_url = response.body['url']
return stream_url
[docs] def thumbs_up_songs(self, *, library=True, store=True):
"""Get a listing of 'Thumbs Up' songs.
Parameters:
library (bool, Optional):
Include 'Thumbs Up' songs from library.
Default: True
generated (bool, Optional):
Include 'Thumbs Up' songs from store.
Default: True
Returns:
list: Dicts of 'Thumbs Up' songs.
"""
thumbs_up_songs = []
if library is True:
thumbs_up_songs.extend(
song
for song in self.songs()
if song.get('rating', '0') == '5'
)
if store is True:
response = self._call(mc_calls.EphemeralTop)
thumbs_up_songs.extend(response.body.get('data', {}).get('items', []))
return thumbs_up_songs
[docs] def top_charts(self):
"""Get a listing of the default top charts."""
response = self._call(mc_calls.BrowseTopChart)
top_charts = response.body
return top_charts
[docs] def top_charts_for_genre(self, genre_id):
"""Get a listing of top charts for a top chart genre.
Parameters:
genre_id (str):
A top chart genre ID as found with :meth:`top_charts_genres`.
"""
response = self._call(
mc_calls.BrowseTopChartForGenre,
genre_id
)
top_chart_for_genre = response.body
return top_chart_for_genre
[docs] def top_charts_genres(self):
"""Get a listing of genres from the browse top charts tab."""
response = self._call(mc_calls.BrowseTopChartGenres)
top_chart_genres = response.body.get('genres', [])
return top_chart_genres