From 94e8bf7d9349c59dd373aefd7de2415e49b3e70c Mon Sep 17 00:00:00 2001
From: bturkus
Date: Wed, 10 Jul 2024 15:11:53 -0400
Subject: [PATCH] Update mediainfo_extractor.py

Add a -v/--vendor flag: when set, treat the input directory as a set of
BagIt bags and pull collectionID, objectType, and objectFormat for each
media file from its sidecar JSON.
---
 ami_scripts/mediainfo_extractor.py | 115 +++++++++++++++++++----------
 1 file changed, 77 insertions(+), 38 deletions(-)

diff --git a/ami_scripts/mediainfo_extractor.py b/ami_scripts/mediainfo_extractor.py
index 091064be..07c82a08 100755
--- a/ami_scripts/mediainfo_extractor.py
+++ b/ami_scripts/mediainfo_extractor.py
@@ -1,10 +1,9 @@
 #!/usr/bin/env python3
-#!/usr/bin/env python3
-
 import argparse
 import csv
 import logging
+import json
 import re
 import subprocess
 import xml.etree.ElementTree as ET
 from pathlib import Path
@@ -12,7 +11,8 @@
 from pprint import pprint
 from pymediainfo import MediaInfo
 
-LOGGER = logging.getLogger(__name__)
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
 video_extensions = {'.mkv', '.mov', '.mp4', '.dv', '.iso'}
 audio_extensions = {'.wav', '.flac'}
@@ -28,20 +28,49 @@ def make_parser():
     parser.add_argument("-o", "--output",
                         help="path to save csv",
                         required=True)
+    parser.add_argument("-v", "--vendor", action='store_true',
+                        help="process as BagIt with sidecar JSON metadata")
     return parser
 
-def process_directory(directory):
+def is_bag(directory):
+    required_files = {'bag-info.txt', 'bagit.txt', 'manifest-md5.txt', 'tagmanifest-md5.txt'}
+    return required_files <= {file.name for file in directory.iterdir() if file.is_file()}
+
+def process_bags(top_directory):
+    bags = []
+    for directory in top_directory.iterdir():
+        if directory.is_dir() and is_bag(directory):
+            bags.append(directory)
+            logging.info(f"Identified BagIt bag: {directory}")
+    return bags
+
+def process_directory(bag_directory, process_json=False):
     valid_extensions = video_extensions.union(audio_extensions)
     media_files = []
-    for path in directory.rglob('*'):
-        if path.is_file() and path.suffix.lower() in valid_extensions:
-            if path.name.startswith("._"):
-                print(f"Skipping hidden Mac file: {path}")
-            else:
-                media_files.append(path)
+    data_directory = bag_directory / "data"
+    if data_directory.exists():
+        for path in data_directory.rglob('*'):
+            if path.is_file() and path.suffix.lower() in valid_extensions:
+                if path.name.startswith("._"):
+                    logging.info(f"Skipping hidden Mac file: {path}")
+                else:
+                    media_files.append(path)
+                    logging.info(f"Adding file to processing list: {path}")
     return media_files
 
+def read_json_sidecar(json_path):
+    with open(json_path, 'r') as f:
+        data = json.load(f)
+    logging.info(f"Reading JSON sidecar file: {json_path}")
+    bib = data.get('bibliographic', {})
+    src = data.get('source', {}).get('object', {})
+    return {
+        'collectionID': bib.get('cmsCollectionID'),
+        'objectType': src.get('type'),
+        'objectFormat': src.get('format')
+    }
+
 def has_mezzanines(file_path):
     for parent in file_path.parents:
         mezzanines_dir = parent / "Mezzanines"
@@ -56,9 +85,10 @@ def extract_iso_file_format(file_path):
         xml_output = process.stdout
         root = ET.fromstring(xml_output)
         file_system_type = root.find(".//{http://kb.nl/ns/isolyzer/v1/}fileSystem").attrib['TYPE']
+        logging.info(f"Extracted ISO file format: {file_system_type}")
         return file_system_type
     except subprocess.CalledProcessError as e:
-        print(f"Isolyzer failed with error: {e}")
+        logging.error(f"Isolyzer failed with error: {e}")
         return None
 
 def extract_track_info(media_info, path, valid_extensions):
@@ -116,43 +146,46 @@ def is_tool(name):
     return which(name) is not None
 
 def main():
-    if not is_tool('isolyzer'):
-        print('Error: Isolyzer is not installed or not found in PATH. Please install (pip3 install isolyzer) before running this script.')
-        return
     parser = make_parser()
     args = parser.parse_args()
 
-    files_to_examine = []
+    top_directory = Path(args.directory)
+    if not top_directory.is_dir():
+        logging.error('Invalid directory path')
+        return
 
-    if args.directory:
-        directory = Path(args.directory)
-        if directory.is_dir():
-            files_to_examine.extend(process_directory(directory))
+    bags = process_bags(top_directory)
+    if args.vendor and not bags:
+        logging.error("No valid BagIt bags found in the directory.")
+        return
 
-    if args.file:
-        file = Path(args.file)
-        if file.is_file() and file.suffix.lower() in video_extensions.union(audio_extensions):
-            files_to_examine.append(file)
+    files_to_examine = []
+    for bag in bags:
+        files = process_directory(bag, process_json=args.vendor)
+        files_to_examine.extend(files)
+        if files:
+            logging.info(f"Processing {len(files)} files from {bag}")
 
     if not files_to_examine:
-        print('Error: Please enter a directory or single file')
+        logging.error('No media files found in bags')
         return
 
     all_file_data = []
-    for path in files_to_examine:
-        if "RECYCLE.BIN" in path.parts:
-            print('RECYCLING BIN WITH MEDIA FILES!!!')
-        else:
-            media_info = MediaInfo.parse(str(path))
-            file_data = extract_track_info(media_info, path, video_extensions.union(audio_extensions))
-            if file_data:
-                print(file_data)
-                all_file_data.append(file_data)
-
-    with open(args.output, 'w') as f:
+    for path in files_to_examine:
+        media_info = MediaInfo.parse(str(path))
+        file_data = extract_track_info(media_info, path, video_extensions.union(audio_extensions))
+        if file_data:
+            if args.vendor:
+                json_path = path.with_suffix('.json')
+                if json_path.exists():
+                    json_data = read_json_sidecar(json_path)
+                    file_data.extend([json_data['collectionID'], json_data['objectType'], json_data['objectFormat']])
+            all_file_data.append(file_data)
+            logging.info(f"Processed file data for: {path}")
+
+    with open(args.output, 'w', newline='') as f:
         md_csv = csv.writer(f)
-        md_csv.writerow([
+        header = [
             'filePath',
             'asset.referenceFilename',
             'technical.filename',
@@ -168,8 +201,14 @@ def main():
             'role',
             'divisionCode',
             'driveID',
-            'primaryID'])
+            'primaryID',
+            'collectionID',  # Added from JSON
+            'objectType',  # Added from JSON
+            'objectFormat'  # Added from JSON
+        ]
+        md_csv.writerow(header)
         md_csv.writerows(all_file_data)
+        logging.info(f"CSV file created successfully at {args.output}")
 
 if __name__ == "__main__":
-    main()
+    main()
\ No newline at end of file
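---

Note on the sidecar contract: below is a minimal, hypothetical sketch of the
JSON layout that read_json_sidecar() expects, inferred only from the keys the
function reads (bibliographic.cmsCollectionID, source.object.type,
source.object.format). The sample values and filename are invented for
illustration, not taken from real AMI metadata:

    #!/usr/bin/env python3
    # Self-contained sketch; mirrors the field extraction in the patched
    # read_json_sidecar(), minus the logging call.
    import json
    import tempfile
    from pathlib import Path

    def read_json_sidecar(json_path):
        # Same key lookups as the patched script.
        with open(json_path, 'r') as f:
            data = json.load(f)
        bib = data.get('bibliographic', {})
        src = data.get('source', {}).get('object', {})
        return {
            'collectionID': bib.get('cmsCollectionID'),
            'objectType': src.get('type'),
            'objectFormat': src.get('format')
        }

    sample = {
        "bibliographic": {"cmsCollectionID": "12345"},   # invented value
        "source": {
            "object": {
                "type": "video cassette analog",         # invented value
                "format": "Betacam SP"                   # invented value
            }
        }
    }

    with tempfile.TemporaryDirectory() as tmp:
        sidecar = Path(tmp) / "sample_sidecar.json"      # invented filename
        sidecar.write_text(json.dumps(sample))
        print(read_json_sidecar(sidecar))
        # -> {'collectionID': '12345', 'objectType': 'video cassette analog',
        #     'objectFormat': 'Betacam SP'}

With the flag in place, a typical run (assuming the script's existing
directory and output flags; the paths here are placeholders) would look
something like:

    ./ami_scripts/mediainfo_extractor.py -d /path/to/bags -o mediainfo.csv -v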