-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmigrate.py
147 lines (127 loc) · 5.5 KB
/
migrate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/python3
import spotipy
from spotipy import SpotifyOAuth
import glob
import os
import re
import sys
import logging
logging.basicConfig(filename="output.log", level=logging.INFO)
LOG = logging.getLogger()
LOG.setLevel(logging.DEBUG)
def has_artist(artist, artists):
artist_names = list(map(lambda art: art['name'], artists))
return artist in list(filter(lambda a: artist.lower() == a.lower(), artist_names))
def save_track(saved_tracks, track):
saved_tracks.append(track)
def pick_track(saved_tracks, tracks, song, artist):
sorted_tracks = list(sorted(tracks, key=lambda track: track['popularity'], reverse=True))
if len(sorted_tracks) < 3:
unique_tracks = set([ t['name'] + t['album']['name'] for t in sorted_tracks ])
if len(unique_tracks) == 1:
LOG.info("Unambiguous track picked: %s", sorted_tracks[0])
save_track(saved_tracks, sorted_tracks[0])
return
print("Please select a track for {} - {}:".format(song, artist))
for i, track in enumerate(sorted_tracks):
artists = list(map(lambda a: a['name'], track['artists']))
print("{}) {} - {} - {}".format(i, track['name'], ', '.join([ a for a in artists ]), track['album']['name']))
while True:
selection = input("Select a track (or -1 to skip): ")
try:
selection = int(selection)
if selection == -1:
LOG.warning("Song skipped: %s - %s", song, artist)
break
elif selection >= 0 and selection < len(sorted_tracks):
save_track(saved_tracks, sorted_tracks[selection])
break
except Exception as e:
pass
print()
def chunk_playlist(user_id, playlist_id, track_ids):
size = 99
chunks = [ track_ids[i:i+size] for i in range(0, len(track_ids), size) ]
first_insert = True
for chunk in chunks:
if first_insert:
first_insert = False
sp.user_playlist_replace_tracks(user_id, playlist_id, chunk)
else:
sp.user_playlist_add_tracks(user_id, playlist_id, chunk)
def add_tracks_to_playlist(saved_tracks, playlist):
LOG.info("Adding %s songs to playlist: %s", len(saved_tracks), playlist)
track_ids = list(map(lambda t: t['id'], saved_tracks))
user_id = sp.current_user()['id']
user_playlists = sp.user_playlists(user_id)['items']
if len(user_playlists) == 0:
created_playlist = sp.user_playlist_create(user_id, playlist)
playlist_id = created_playlist['id']
chunk_playlist(user_id, playlist_id, track_ids)
else:
print(user_playlists)
existing_playlists = list(filter(lambda p: playlist == p['name'], user_playlists))
if len(existing_playlists) == 0:
created_playlist = sp.user_playlist_create(user_id, playlist)
playlist_id = created_playlist['id']
chunk_playlist(user_id, playlist_id, track_ids)
elif len(existing_playlists) == 1:
chunk_playlist(user_id, existing_playlists[0]['id'], track_ids)
else:
LOG.err("Multiple playsts with name exists?? %s", existing_playlists)
def check_for_exact_matches(saved_tracks, song, artist, tracks):
exact_matches = list(filter(lambda t: song in t['name'] and
has_artist(artist, t['artists']), tracks))
if len(exact_matches) == 0:
LOG.debug("No exact matches")
if len(tracks) == 0:
LOG.warning("Track not found: %s - %s", artist, song)
else:
pick_track(saved_tracks, tracks, song, artist)
elif len(exact_matches) == 1:
LOG.debug('One exact matching result')
save_track(saved_tracks, exact_matches[0])
else:
pick_track(saved_tracks, exact_matches, song, artist)
def find_song(line, saved_tracks):
artist_song = line.replace('\n', '').split(' - ')
artist = artist_song[0]
song = artist_song[1]
result = sp.search(song + ' ' + artist)
tracks = result['tracks']['items']
if len(tracks) == 0:
cleaned_song = re.sub(r'\(.*\)', '', song).replace('&', ' ')
LOG.info("Trying search without parens: %s", cleaned_song)
result = sp.search(cleaned_song + ' ' + artist)
check_for_exact_matches(saved_tracks, song, artist, result['tracks']['items'])
elif len(tracks) == 1:
LOG.debug("Ideal case")
save_track(saved_tracks, tracks[0])
else:
check_for_exact_matches(saved_tracks, song, artist, tracks)
from dotenv import load_dotenv
load_dotenv()
scope = "playlist-modify-public"
auth = SpotifyOAuth(scope=scope,
username=os.getenv("SPOTIFY_USERNAME"),
client_id=os.getenv("SPOTIPY_CLIENT_ID"),
client_secret=os.getenv("SPOTIPY_CLIENT_SECRET"),
redirect_uri="http://localhost")
sp = spotipy.Spotify(auth_manager=auth)
for playlist in glob.glob('playlists/*'):
saved_tracks = []
with open(playlist, 'r') as file:
LOG.info("Finding songs for playlist: %s", playlist)
print("Finding songs for playlist: {}".format(playlist))
for line in file:
try:
find_song(line, saved_tracks)
except Exception as e:
print("Error adding track: {}", line)
LOG.error("Error adding track: %s", line, e)
try:
add_tracks_to_playlist(saved_tracks, playlist.split('/')[1])
except Exception as e:
LOG.fatal("Error saving playlist: %s", playlist.split('/')[1], e)
raise e
break