Update mediainfo_extractor.py
add -v vendor flag; when set, pull collection/object metadata from sidecar JSON
bturkus committed Jul 10, 2024
1 parent ac09f04 commit 94e8bf7
Showing 1 changed file with 77 additions and 38 deletions.
115 changes: 77 additions & 38 deletions ami_scripts/mediainfo_extractor.py
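A hypothetical invocation of the new vendor mode (the -d/--directory flag name is assumed from args.directory and is not visible in this diff; -o and -v are confirmed below):

    ./ami_scripts/mediainfo_extractor.py -d /path/to/bag_deliverables -o mediainfo.csv -v

Each immediate subdirectory that passes the BagIt check is scanned under its data/ payload, and a sidecar .json next to each media file supplies the three new CSV columns.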
@@ -1,18 +1,18 @@
#!/usr/bin/env python3

import argparse
import csv
import logging
import json
import re
import subprocess
import xml.etree.ElementTree as ET
from pathlib import Path
from pprint import pprint
from pymediainfo import MediaInfo

LOGGER = logging.getLogger(__name__)
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

video_extensions = {'.mkv', '.mov', '.mp4', '.dv', '.iso'}
audio_extensions = {'.wav', '.flac'}
@@ -28,20 +28,49 @@ def make_parser():
parser.add_argument("-o", "--output",
help="path to save csv",
required=True)
parser.add_argument("-v", "--vendor", action='store_true',
help="process as BagIt with sidecar JSON metadata")

return parser

def is_bag(directory):
required_files = {'bag-info.txt', 'bagit.txt', 'manifest-md5.txt', 'tagmanifest-md5.txt'}
return required_files <= {file.name for file in directory.iterdir() if file.is_file()}

def process_bags(top_directory):
bags = []
for directory in top_directory.iterdir():
if directory.is_dir() and is_bag(directory):
bags.append(directory)
logging.info(f"Identified BagIt bag: {directory}")
return bags

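# A bag, for the purposes of is_bag() above, is any immediate subdirectory
# with the four BagIt tag files; illustrative layout:
#   <bag>/
#       bagit.txt
#       bag-info.txt
#       manifest-md5.txt
#       tagmanifest-md5.txt
#       data/          <- payload scanned by process_directory() below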
def process_directory(bag_directory, process_json=False):
valid_extensions = video_extensions.union(audio_extensions)
media_files = []
data_directory = bag_directory / "data"
if data_directory.exists():
for path in data_directory.rglob('*'):
if path.is_file() and path.suffix.lower() in valid_extensions:
if path.name.startswith("._"):
logging.info(f"Skipping hidden Mac file: {path}")
else:
media_files.append(path)
logging.info(f"Adding file to processing list: {path}")
return media_files

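# Sidecar shape assumed by read_json_sidecar() below (values illustrative):
# {
#     "bibliographic": {"cmsCollectionID": "1234"},
#     "source": {"object": {"type": "video cassette", "format": "Betacam"}}
# }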
def read_json_sidecar(json_path):
with open(json_path, 'r') as f:
data = json.load(f)
logging.info(f"Reading JSON sidecar file: {json_path}")
bib = data.get('bibliographic', {})
src = data.get('source', {}).get('object', {})
return {
'collectionID': bib.get('cmsCollectionID'),
'objectType': src.get('type'),
'objectFormat': src.get('format')
}

def has_mezzanines(file_path):
for parent in file_path.parents:
mezzanines_dir = parent / "Mezzanines"
@@ -56,9 +85,10 @@ def extract_iso_file_format(file_path):
xml_output = process.stdout
root = ET.fromstring(xml_output)
file_system_type = root.find(".//{http://kb.nl/ns/isolyzer/v1/}fileSystem").attrib['TYPE']
logging.info(f"Extracted ISO file format: {file_system_type}")
return file_system_type
except subprocess.CalledProcessError as e:
print(f"Isolyzer failed with error: {e}")
logging.error(f"Isolyzer failed with error: {e}")
return None

def extract_track_info(media_info, path, valid_extensions):
@@ -116,43 +146,46 @@ def is_tool(name):
return which(name) is not None

def main():
if not is_tool('isolyzer'):
print('Error: Isolyzer is not installed or not found in PATH. Please install (pip3 install isolyzer) before running this script.')
return
parser = make_parser()
args = parser.parse_args()

top_directory = Path(args.directory)
if not top_directory.is_dir():
logging.error('Invalid directory path')
return

bags = process_bags(top_directory)
if args.vendor and not bags:
logging.error("No valid BagIt bags found in the directory.")
return
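# Note: bags are scanned whether or not --vendor is set; the flag gates the
# early exit above and the sidecar-JSON columns appended later in this function.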

files_to_examine = []
for bag in bags:
files = process_directory(bag, process_json=args.vendor)
files_to_examine.extend(files)
if files:
logging.info(f"Processing {len(files)} files from {bag}")

if not files_to_examine:
logging.error('No media files found in bags')
return

all_file_data = []

for path in files_to_examine:
if "RECYCLE.BIN" in path.parts:
print('RECYCLING BIN WITH MEDIA FILES!!!')
else:
media_info = MediaInfo.parse(str(path))
file_data = extract_track_info(media_info, path, video_extensions.union(audio_extensions))
if file_data:
if args.vendor:
json_path = path.with_suffix('.json')
if json_path.exists():
json_data = read_json_sidecar(json_path)
file_data.extend([json_data['collectionID'], json_data['objectType'], json_data['objectFormat']])
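# Note: rows for files without a sidecar JSON (and all rows in non-vendor
# runs) will be three columns shorter than the header written below.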
all_file_data.append(file_data)
logging.info(f"Processed file data for: {path}")

with open(args.output, 'w', newline='') as f:
md_csv = csv.writer(f)
header = [
'filePath',
'asset.referenceFilename',
'technical.filename',
@@ -168,8 +201,14 @@ def main():
'role',
'divisionCode',
'driveID',
'primaryID',
'collectionID', # Added from JSON
'objectType', # Added from JSON
'objectFormat' # Added from JSON
]
md_csv.writerow(header)
md_csv.writerows(all_file_data)
logging.info(f"CSV file created successfully at {args.output}")

if __name__ == "__main__":
main()
