228 lines
6.7 KiB
Python
228 lines
6.7 KiB
Python
'''
export_playlist.py

This is the helper script that exports old playlist databases so that they
can be reimported by the new database later.
'''
|
|
|
|
import argparse
|
|
from decimal import Decimal, getcontext
|
|
import hashlib
|
|
import json
|
|
import mimetypes
|
|
import os
|
|
import sqlite3
|
|
import sys
|
|
import unicodedata
|
|
|
|
import magic
|
|
|
|
|
|
def scrub(text):
    '''
    Normalize text to Unicode NFC form, so that a base character followed by
    combining marks is stored as its precomposed equivalent where one exists.

    Any falsy input (None or an empty string) yields None.
    '''
    if not text:
        return None
    return unicodedata.normalize('NFC', text)
|
|
|
|
|
|
def detect_mime(path):
    '''
    Guess a file's mimetype from its magic number. If that is inconclusive
    (the generic 'application/octet-stream' comes back), guess from the
    file extension instead.

    The extension-based guess is returned as-is, so the result is None when
    the extension is not recognized either.
    '''
    sniffed = magic.from_file(path, mime=True)
    if sniffed != 'application/octet-stream':
        return sniffed
    # guess_type returns a (type, encoding) pair; only the type is wanted.
    by_extension, _encoding = mimetypes.guess_type(path, strict=True)
    return by_extension
|
|
|
|
|
|
def hash_file(path):
    '''
    Run a music file through a hashing algorithm (SHA3_256) and return the
    hexadecimal digest.

    The file is fed to the hash in fixed-size chunks, so large files are
    never held in memory in one piece.

    Returns None if the file cannot be opened or read (any OSError).
    '''
    digest = hashlib.sha3_256()
    try:
        with open(path, 'rb') as file:
            # iter() with a sentinel keeps calling read() until it returns
            # b'', i.e. until end of file.
            for chunk in iter(lambda: file.read(1 << 20), b''):
                digest.update(chunk)
    except OSError:
        # Best effort: an unreadable file is reported as "no hash" rather
        # than aborting the caller.
        return None
    return digest.hexdigest()
|
|
|
|
|
|
def adapt_decimal(number):
    '''Sqlite3 adapter: hand a Decimal to the database as its string form'''
    as_text = str(number)
    return as_text
|
|
|
|
|
|
def convert_decimal(text):
    '''
    Sqlite3 converter for Decimal types.

    Receives the stored value as bytes, decodes it as UTF-8 and parses it.
    Note that the value is returned as a float rather than a Decimal.
    '''
    decoded = text.decode('utf8')
    return float(decoded)
|
|
|
|
|
|
def import_sqlite3(db_file):
    '''
    Imports a playlist from an SQLite3 database file and returns it in a
    form that can be exported to a JSON file.

    db_file is the path to the old SQLite3 database. FileNotFoundError is
    raised if it is not an existing regular file.

    Returns a dict with the keys 'albums', 'artists', 'games' and 'songs',
    each holding a list of dicts. A progress line is printed after each of
    the four categories has been read. The database connection is always
    closed before returning, including when an error occurs.
    '''
    if not os.path.isfile(db_file):
        raise FileNotFoundError(
            'SQLite3 database not found: {}'.format(db_file)
        )

    detect_types = sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES
    con = sqlite3.connect(db_file, detect_types=detect_types)
    # Close explicitly: using a sqlite3 connection as a context manager
    # only commits or rolls back, it does not close the connection.
    try:
        # Fetching albums first
        albums = _export_titled(con, 'SELECT title, enabled FROM albums')
        print('Exported {} albums'.format(len(albums)))

        # Next up, artists
        artists = _export_artists(con)
        print('Exported {} artists'.format(len(artists)))

        # On to games
        games = _export_titled(con, 'SELECT title, enabled FROM games')
        print('Exported {} games'.format(len(games)))

        # Now the songs. Anything whose type is not 'S' counts as a jingle.
        songs = _export_songs(con)
        song_total = sum(1 for song in songs if song['type'] == 'S')
        jingle_total = len(songs) - song_total
        print('Exported {} requestables ({} songs, {} jingles)'.format(
            len(songs),
            song_total,
            jingle_total
        ))
    finally:
        con.close()

    return {'albums': albums,
            'artists': artists,
            'games': games,
            'songs': songs}


def _export_titled(con, query):
    '''Run a (title, enabled) query and return title/disabled dicts.'''
    return [
        {'title': scrub(title), 'disabled': not bool(enabled)}
        for title, enabled in con.execute(query)
    ]


def _artist_names(alias, first_name, last_name):
    '''Build the name part of an artist entry, with '' for missing names.'''
    # Both the artist list and the per-song artist lists go through here so
    # that the same artist always gets the same alias/first/last values.
    return {
        'alias': scrub(alias) or '',
        'first_name': scrub(first_name) or '',
        'last_name': scrub(last_name) or ''
    }


def _export_artists(con):
    '''Return every row of the artists table as an artist dict.'''
    artist_query = 'SELECT alias, firstname, lastname, enabled FROM artists'
    artists = []
    for alias, first_name, last_name, enabled in con.execute(artist_query):
        entry = _artist_names(alias, first_name, last_name)
        entry['disabled'] = not bool(enabled)
        artists.append(entry)
    return artists


def _export_song_artists(con, song_id):
    '''Return the name dicts of every artist linked to the given song.'''
    # NULL names are turned into '' on the Python side, so the query does
    # not need ifnull(..., "") and avoids double-quoted string literals.
    song_artist_query = '''SELECT alias, firstname, lastname
                           FROM artists
                           WHERE artists_id
                           IN (SELECT artists_artists_id
                               FROM songs_have_artists
                               WHERE songs_songs_id = ?)'''
    return [
        _artist_names(alias, first_name, last_name)
        for alias, first_name, last_name
        in con.execute(song_artist_query, [song_id])
    ]


def _export_songs(con):
    '''Return every row of the songs table with its artists and file data.'''
    songs_query = '''SELECT
                         songs.songs_id AS id,
                         games.title AS game,
                         albums.title AS album,
                         songs.enabled AS enabled,
                         songs.type AS type,
                         songs.title AS title,
                         songs.length AS length,
                         songs.path AS path
                     FROM songs
                     LEFT JOIN games
                         ON (songs.game = games.games_id)
                     LEFT JOIN albums
                         ON (songs.album = albums.albums_id)'''
    songs = []
    # Read all song rows up front, before the per-song artist queries run.
    for row in con.execute(songs_query).fetchall():
        (song_id, game, album, enabled,
         song_type, title, length, raw_path) = row
        song_artists = _export_song_artists(con, song_id)
        # Scrub the path once and reuse it for every file operation.
        path = scrub(raw_path)
        # os.stat raises if the file is missing; that is left to propagate.
        store = {'path': path,
                 'mime': detect_mime(path),
                 'filesize': os.stat(path).st_size,
                 'filehash': hash_file(path),
                 'length': length}
        songs.append({'album': scrub(album),
                      'artists': song_artists,
                      'game': scrub(game),
                      'disabled': not bool(enabled),
                      'type': song_type,
                      'title': scrub(title),
                      'store': store})
    return songs
|
|
|
|
|
|
def main():
    '''
    Entry point of the program: parse the command line, run the requested
    export and write the result to playlist.json in the current directory.
    '''
    getcontext().prec = 8

    # Teach sqlite3 how to store Decimal values and how to read columns
    # declared as "decimal".
    sqlite3.register_adapter(Decimal, adapt_decimal)
    sqlite3.register_converter("decimal", convert_decimal)

    parser = argparse.ArgumentParser(
        description='Exports old playlist to a file for reimporting later.'
    )
    commands = parser.add_subparsers(dest='command')
    sqlite3_command = commands.add_parser(
        'sqlite3',
        help='Imports old Sqlite3 database.'
    )
    sqlite3_command.add_argument(
        'db_file',
        help='Path to the sqlite3 database file.',
        nargs=1
    )

    # Called without any arguments: explain usage and bail out.
    if len(sys.argv) == 1:
        sys.stderr.write('Error: please specify a command\n\n')
        parser.print_help(sys.stderr)
        sys.exit(1)

    args = parser.parse_args()
    if args.command != 'sqlite3':
        return

    # nargs=1 makes db_file a one-element list.
    results = import_sqlite3(args.db_file[0])
    if not results:
        return

    with open('playlist.json', 'w', encoding='utf8') as file:
        json.dump(results,
                  file,
                  ensure_ascii=False,
                  sort_keys=True,
                  indent=4)
|
|
|
|
|
|
# Run the exporter only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()
|