'''
add_replaygain.py

Helper script that downloads songs from an Amazon S3 instance (or other
implementations, like DigitalOcean Spaces) and analyzes the replaygain
data to put into the radio database.
'''
import argparse
import json
import logging
import os
import subprocess
import sys
import threading
import traceback
from unicodedata import normalize

from decouple import config
import boto3

# If these four are not defined, then boto3 will look for defaults in the
# ~/.aws configurations
S3_REGION = config('S3_REGION', default=None)
S3_ENDPOINT = config('S3_ENDPOINT', default=None)
S3_ACCESS_KEY = config('S3_ACCESS_KEY', default=None)
S3_SECRET_KEY = config('S3_SECRET_KEY', default=None)

logging.basicConfig(
    handlers=[logging.FileHandler('./s3_replaygain.log', encoding='utf8')],
    level=logging.INFO,
    format=('[%(asctime)s] [%(levelname)s]'
            ' [%(name)s.%(funcName)s] === %(message)s'),
    datefmt='%Y-%m-%dT%H:%M:%S'
)
LOGGER = logging.getLogger('add_replaygain')


class Progress(object):
    '''
    A callback class for the Amazon S3 transfer to detect how far along
    in an upload we are.

    An instance is called repeatedly with the number of bytes transferred
    since the previous call; it prints a one-line progress meter to stdout.
    '''

    def __init__(self, filepath):
        self._filepath = filepath
        self._filename = os.path.basename(filepath)
        self._size = float(os.path.getsize(filepath))
        self._seen_so_far = 0
        # boto3 may invoke the callback from multiple transfer threads.
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        with self._lock:
            self._seen_so_far += bytes_amount
            # Guard against division by zero for an empty file.
            if self._size:
                percentage = (self._seen_so_far / self._size) * 100
            else:
                percentage = 100.0
            sys.stdout.write(
                "\r%s %s / %s (%.2f%%)" % (
                    self._filename, self._seen_so_far,
                    self._size, percentage
                )
            )
            sys.stdout.flush()


def asciify(text):
    '''
    Converts a unicode string to pure ascii.

    Non-ASCII characters are replaced with backslash escape sequences so
    the result is safe for S3 metadata headers.
    '''
    normal = normalize('NFKC', text)
    return normal.encode('ascii', 'backslashreplace').decode('ascii')


def get_fullname(artist):
    '''
    String representing the artist's full name including an alias, if
    available.

    Expects a dict with 'first_name', 'last_name' and 'alias' keys.
    '''
    if artist['alias']:
        if artist['first_name'] or artist['last_name']:
            return '{} "{}" {}'.format(artist['first_name'],
                                       artist['alias'],
                                       artist['last_name'])
        return artist['alias']
    return '{} {}'.format(artist['first_name'], artist['last_name'])


def beautify_artists(artists):
    '''
    Turns a list of one or more artists into a proper English listing.

    Exactly two artists are joined with ' & '; any other count is joined
    with ', '.
    '''
    fullnames = [get_fullname(artist) for artist in artists]
    output = ', '
    if len(fullnames) == 2:
        output = ' & '
    return output.join(fullnames)


def _parse_replaygain(ffmpeg_output):
    '''
    Extract the (track_gain, track_peak) value strings from the stdout of
    an ffmpeg run using the replaygain filter.

    The filter reports lines of the form "track_gain = -6.30 dB" and
    "track_peak = 0.998". Matching by keyword is more robust than relying
    on the position of the lines in the output.

    Returns (None, None) components for values that were not found.
    '''
    track_gain = None
    track_peak = None
    for line in ffmpeg_output.split('\n'):
        if 'track_gain' in line and '=' in line:
            track_gain = line.split('=')[1].strip()
        elif 'track_peak' in line and '=' in line:
            track_peak = line.split('=')[1].strip()
    return track_gain, track_peak


def import_playlist(playlist_file):
    '''
    Imports a playlist from a JSON file, downloads each song from an
    S3[-like] instance, runs ffmpeg's replaygain analysis on it, writes
    the results back into the object's S3 metadata, and returns the
    updated playlist dict.

    Raises FileNotFoundError if playlist_file does not exist.
    '''
    if not os.path.isfile(playlist_file):
        # Include the offending path in the exception for easier debugging.
        raise FileNotFoundError(playlist_file)
    with open(playlist_file, 'r', encoding='utf8') as pfile:
        playlist = json.load(pfile)

    session = boto3.session.Session()
    client = session.client(
        's3',
        region_name=S3_REGION,
        endpoint_url=S3_ENDPOINT,
        aws_access_key_id=S3_ACCESS_KEY,
        aws_secret_access_key=S3_SECRET_KEY
    )

    totals = {'success': 0, 'fail': 0}
    for song in playlist['songs']:
        # Paths look like "s3://bucket/dir/file" -- strip the "s3://"
        # scheme prefix (5 characters) before splitting.
        path_parts = song['store']['path'][5:].split('/')
        bucket = path_parts[0]
        key = '{}/{}'.format(path_parts[1], path_parts[2])
        temp_path = '/tmp/{}'.format(path_parts[2])

        LOGGER.info('Begin download of: %s', song['store']['path'])
        try:
            client.download_file(bucket, key, temp_path)
        except Exception:
            # BUG FIX: traceback.print_exc() prints to stderr and returns
            # None, so the log line used to record "None". format_exc()
            # returns the traceback as a string for the log message.
            LOGGER.error(
                'Download failed for: %s -- %s',
                temp_path, traceback.format_exc()
            )
            totals['fail'] += 1
            # No local file to analyze; move on to the next song.
            continue

        LOGGER.info(
            'Successful download of: %s to %s',
            song['store']['path'], temp_path
        )
        totals['success'] += 1

        # get the old metadata handy
        head_obj = client.head_object(Bucket=bucket, Key=key)
        s3_metadata = head_obj["Metadata"]

        # Run the replaygain analysis. stderr (where ffmpeg writes the
        # filter report) is merged into stdout by subprocess itself --
        # the previous literal '2>&1' argv element was shell redirection
        # syntax that never took effect because no shell is involved.
        # '-' is the conventional output target for the null muxer.
        done = subprocess.run(
            ['ffmpeg', '-i', temp_path, '-vn',
             '-filter', 'aresample=44100,replaygain',
             '-f', 'null', '-'],
            universal_newlines=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT
        )
        track_gain, track_peak = _parse_replaygain(done.stdout)

        rg_results = '{} -- gain: {}, peak: {}'.format(
            temp_path, track_gain, track_peak
        )
        print(rg_results)
        LOGGER.info(rg_results)

        song['store']['track_gain'] = track_gain
        song['store']['track_peak'] = track_peak
        s3_metadata["track-gain"] = asciify(track_gain)
        s3_metadata["track-peak"] = asciify(track_peak)

        # S3 metadata is immutable; copying the object onto itself with
        # MetadataDirective='REPLACE' is the documented way to update it.
        client.copy_object(
            Bucket=bucket,
            Key=key,
            ContentType=song['store']['mime'],
            CopySource='{}/{}'.format(bucket, key),
            Metadata=s3_metadata,
            MetadataDirective='REPLACE'
        )
        os.remove(temp_path)

    result_message = (
        'Replaygain Analysis complete -- {} successful, {} failures'.format(
            totals['success'], totals['fail']
        )
    )
    print(result_message)
    LOGGER.info(result_message)
    return playlist


def main():
    '''Main loop of the program'''
    description = 'Uploads song files to an Amazon S3 (or similar) instance.'
    parser = argparse.ArgumentParser(description=description)
    subparsers = parser.add_subparsers(dest='command')
    parser_playlist = subparsers.add_parser(
        'playlist',
        help='Import playlist song data.'
    )
    parser_playlist.add_argument(
        'filepath',
        help='Path to the playlist file.',
        nargs=1
    )

    # argparse with subparsers does not error on zero arguments, so
    # handle the "no command" case explicitly.
    if len(sys.argv) == 1:
        sys.stderr.write('Error: please specify a command\n\n')
        parser.print_help(sys.stderr)
        sys.exit(1)

    results = None
    args = parser.parse_args()
    if args.command == 'playlist':
        results = import_playlist(args.filepath[0])

    if results:
        LOGGER.info('Exporting new playlist file to \'playlist_s3_rg.json\'')
        with open('playlist_s3_rg.json', 'w', encoding='utf8') as file:
            json.dump(
                results,
                file,
                ensure_ascii=False,
                sort_keys=True,
                indent=4
            )
    LOGGER.info('Program finished. Exiting.')


if __name__ == '__main__':
    main()