spradio-server-django/contrib/add_replaygain/add_replaygain.py

253 lines
7 KiB
Python
Raw Normal View History

'''
add_replaygain.py
This is the helper script that downloads songs from an Amazon S3 instance
(or other implementations, like DigitalOcean Spaces) and analyzes the
replaygain data to put into the radio database.
'''
import argparse
import json
import logging
import os
import subprocess
import sys
import threading
import traceback
from unicodedata import normalize
from decouple import config
import boto3
# S3 connection settings, read through python-decouple's `config`.
# If these four are not defined, then boto3 will look for defaults in the
# ~/.aws configurations
S3_REGION = config('S3_REGION', default=None)
S3_ENDPOINT = config('S3_ENDPOINT', default=None)
S3_ACCESS_KEY = config('S3_ACCESS_KEY', default=None)
S3_SECRET_KEY = config('S3_SECRET_KEY', default=None)
# Route log records of level INFO and above to a UTF-8 encoded log file in
# the current working directory. Each record carries an ISO-8601 style
# timestamp, the level, and the logger/function name it came from.
logging.basicConfig(
handlers=[logging.FileHandler('./s3_replaygain.log', encoding='utf8')],
level=logging.INFO,
format=('[%(asctime)s] [%(levelname)s]'
' [%(name)s.%(funcName)s] === %(message)s'),
datefmt='%Y-%m-%dT%H:%M:%S'
)
# Logger used by the functions in this script.
LOGGER = logging.getLogger('add_replaygain')
class Progress(object):
    '''
    A callback class for the Amazon S3 transfer to detect how far along in a
    transfer of a local file we are.

    An instance is called with the number of bytes moved since the previous
    call; every call rewrites a single progress line on stdout.
    '''

    def __init__(self, filepath):
        '''
        Records the base name and on-disk size of the file at `filepath`.

        Raises OSError if the size of `filepath` cannot be read.
        '''
        self._filepath = filepath
        self._filename = os.path.basename(filepath)
        self._size = float(os.path.getsize(filepath))
        self._seen_so_far = 0
        # Serializes the counter update and the write to stdout in case the
        # callback is invoked from more than one thread.
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        '''
        Adds `bytes_amount` to the running total and prints the progress.
        '''
        with self._lock:
            self._seen_so_far += bytes_amount
            if self._size:
                percentage = (self._seen_so_far / self._size) * 100
            else:
                # A zero-byte file has nothing left to transfer; report it
                # as complete instead of dividing by zero.
                percentage = 100.0
            # The leading carriage return makes each update overwrite the
            # previous one on the same terminal line.
            sys.stdout.write(
                "\r%s %s / %s (%.2f%%)" % (
                    self._filename, self._seen_so_far, self._size,
                    percentage
                )
            )
            sys.stdout.flush()
def asciify(text):
    '''
    Returns `text` as a pure-ASCII string.

    The text is first brought into NFKC normal form; any character that
    still falls outside ASCII is written as a backslash escape sequence.
    '''
    composed = normalize('NFKC', text)
    ascii_bytes = composed.encode('ascii', errors='backslashreplace')
    return str(ascii_bytes, 'ascii')
def get_fullname(artist):
    '''
    String representing the artist's full name including an alias, if
    available.

    `artist` is a mapping with the keys 'alias', 'first_name' and
    'last_name'; a missing key raises KeyError. The alias is wrapped in
    double quotes and placed between the first and last name. If only the
    alias is set it is returned on its own, without quotes. Empty or None
    parts are left out, so the result never carries stray spaces or the
    text "None".
    '''
    alias = artist['alias']
    first_name = artist['first_name']
    last_name = artist['last_name']

    if alias and not (first_name or last_name):
        return alias

    quoted_alias = '"{}"'.format(alias) if alias else None
    parts = (first_name, quoted_alias, last_name)
    # Join only the parts that are present so a missing first or last name
    # does not leave a dangling separator behind.
    return ' '.join(str(part) for part in parts if part)
def beautify_artists(artists):
    '''
    Renders a list of artists as one readable string.

    Exactly two artists are joined with an ampersand; any other number of
    artists is joined with commas.
    '''
    names = []
    for entry in artists:
        names.append(get_fullname(entry))
    separator = ' & ' if len(names) == 2 else ', '
    return separator.join(names)
def _parse_replaygain(ffmpeg_output):
    '''
    Pulls the track gain and track peak out of ffmpeg's replaygain report.

    Returns a (track_gain, track_peak) tuple of strings, each holding the
    text that follows the '=' on its reporting line.
    Raises ValueError if either value is absent from `ffmpeg_output`.
    '''
    fields = ('track_gain', 'track_peak')
    found = {}
    for line in ffmpeg_output.splitlines():
        for field in fields:
            _, marker, remainder = line.partition(field)
            if marker and '=' in remainder:
                # If a field is reported more than once, the last one wins.
                found[field] = remainder.split('=', 1)[1].strip()

    missing = [field for field in fields if field not in found]
    if missing:
        raise ValueError(
            'ffmpeg output is missing: {}'.format(', '.join(missing))
        )
    return found['track_gain'], found['track_peak']


def _measure_replaygain(audio_path):
    '''
    Runs ffmpeg's replaygain filter over the local file at `audio_path`.

    Returns a (track_gain, track_peak) tuple of strings.
    Raises ValueError if ffmpeg's output does not contain both values.
    '''
    done = subprocess.run(
        ['ffmpeg',
         '-i',
         audio_path,
         '-vn',
         '-filter',
         'aresample=44100,replaygain',
         '-f',
         'null',
         # No shell is involved, so a '2>&1' here would not redirect
         # anything; it would only be taken as the output name. stderr is
         # merged into stdout through the keyword arguments below instead.
         '-'],
        universal_newlines=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT
    )
    return _parse_replaygain(done.stdout)


def import_playlist(playlist_file):
    '''
    Imports a playlist from a JSON file, downloads each song from an
    S3[-like] instance, measures its replaygain with ffmpeg, and records the
    result both on the playlist entry and in the object's S3 metadata.

    `playlist_file` is the path to a JSON file with a top-level 'songs'
    list; every song needs a 'store' mapping with 'path' and 'mime' entries.

    Returns the playlist with 'track_gain' and 'track_peak' added to the
    'store' of every song that was analyzed. Songs whose download or
    analysis failed are logged, counted as failures and left unchanged.

    Raises FileNotFoundError if `playlist_file` is not an existing file.
    '''
    if not os.path.isfile(playlist_file):
        raise FileNotFoundError(playlist_file)

    with open(playlist_file, 'r', encoding='utf8') as pfile:
        playlist = json.load(pfile)

    session = boto3.session.Session()
    client = session.client(
        's3',
        region_name=S3_REGION,
        endpoint_url=S3_ENDPOINT,
        aws_access_key_id=S3_ACCESS_KEY,
        aws_secret_access_key=S3_SECRET_KEY
    )

    totals = {'success': 0, 'fail': 0}

    for song in playlist['songs']:
        store = song['store']
        # Skip the five-character scheme prefix (presumably 's3://'). The
        # first segment is the bucket and everything after it is the key,
        # however many '/' the key contains.
        bucket, _, key = store['path'][5:].partition('/')
        temp_path = '/tmp/{}'.format(key.rsplit('/', 1)[-1])

        LOGGER.info('Begin download of: %s', store['path'])

        try:
            client.download_file(
                bucket,
                key,
                temp_path
            )
        except Exception:
            # Per-song boundary: one bad download must not stop the run.
            # format_exc() returns the traceback text for the log record.
            LOGGER.error(
                'Download failed for: %s -- %s',
                temp_path,
                traceback.format_exc()
            )
            totals['fail'] += 1
            continue

        LOGGER.info(
            'Successful download of: %s to %s',
            store['path'],
            temp_path
        )

        try:
            # get the old metadata handy
            head_obj = client.head_object(
                Bucket=bucket,
                Key=key
            )
            s3_metadata = head_obj["Metadata"]

            try:
                track_gain, track_peak = _measure_replaygain(temp_path)
            except ValueError:
                LOGGER.error(
                    'Replaygain analysis failed for: %s -- %s',
                    temp_path,
                    traceback.format_exc()
                )
                totals['fail'] += 1
                continue

            rg_results = '{} -- gain: {}, peak: {}'.format(
                temp_path,
                track_gain,
                track_peak
            )
            print(rg_results)
            LOGGER.info(rg_results)

            store['track_gain'] = track_gain
            store['track_peak'] = track_peak

            s3_metadata["track-gain"] = asciify(track_gain)
            s3_metadata["track-peak"] = asciify(track_peak)

            # Copying the object onto itself with MetadataDirective REPLACE
            # rewrites its metadata in place.
            client.copy_object(
                Bucket=bucket,
                Key=key,
                ContentType=store['mime'],
                CopySource='{}/{}'.format(bucket, key),
                Metadata=s3_metadata,
                MetadataDirective='REPLACE'
            )
            totals['success'] += 1
        finally:
            # Remove the downloaded copy on success, on a failed analysis,
            # and when an unexpected error is propagating.
            os.remove(temp_path)

    result_message = 'Replaygain Analysis complete -- {} successful, {} failures'.format(
        totals['success'],
        totals['fail']
    )
    print(result_message)
    LOGGER.info(result_message)

    return playlist
def main():
    '''
    Command-line entry point.

    Parses the command line, runs the requested command and, when that
    command produced a playlist, writes it to 'playlist_s3_rg.json' in the
    current working directory. Exits with status 1 when no command is given.
    '''
    parser = argparse.ArgumentParser(
        description='Uploads song files to an Amazon S3 (or similar) instance.'
    )
    commands = parser.add_subparsers(dest='command')
    playlist_command = commands.add_parser(
        'playlist',
        help='Import playlist song data.'
    )
    playlist_command.add_argument(
        'filepath',
        help='Path to the playlist file.',
        nargs=1
    )

    # With no arguments at all, show the usage text rather than doing nothing.
    if len(sys.argv) == 1:
        sys.stderr.write('Error: please specify a command\n\n')
        parser.print_help(sys.stderr)
        sys.exit(1)

    args = parser.parse_args()

    if args.command == 'playlist':
        # nargs=1 wraps the path in a one-element list
        results = import_playlist(args.filepath[0])
    else:
        results = None

    if results:
        LOGGER.info('Exporting new playlist file to \'playlist_s3_rg.json\'')
        with open('playlist_s3_rg.json', 'w', encoding='utf8') as outfile:
            json.dump(
                results,
                outfile,
                ensure_ascii=False,
                sort_keys=True,
                indent=4
            )

    LOGGER.info('Program finished. Exiting.')
# Run the command-line interface only when executed as a script, not when
# the module is imported.
if __name__ == '__main__':
    main()