#!/usr/bin/python
"""Cut recorded audio and video takes into named parts with ffmpeg.

The script reads a YAML description (first command-line argument) holding a
``settings`` mapping and a list of ``sequences``.  Each sequence has shots,
each shot may have takes, and each take may have parts.  For every part an
ffmpeg command is built for the audio source and for the video source.

Processing happens in two passes: a registration pass counts how many parts
map to the same output name, then a processing pass runs ffmpeg and appends
a running index to output names that occur more than once.
"""

import sys
import os
import getopt
import datetime
import subprocess

# NOTE: PyYAML is imported inside capture_main(), its only user, so that the
# helpers of this module stay importable when PyYAML is not installed.


# utils

def selection_skip(item):
    """Return True when *item* carries ``selection: skip``."""
    return item.get("selection") == "skip"


def _first_set(*values):
    """Return the first value that is neither None nor "", else None.

    Unlike an ``or`` chain this keeps a numeric 0, which is a valid position.
    """
    for value in values:
        if value is not None and value != "":
            return value
    return None


def _join_name(components):
    """Join the non-empty components with "-"; return None if none remain."""
    pieces = [str(item) for item in components
              if item is not None and item != ""]
    if not pieces:
        return None
    return "-".join(pieces)


def _children(item, key):
    """Return ``item[key]``, or one empty child when the key is absent."""
    if key in item:
        return item[key]
    return [{}]


# settings

# Global defaults; rebound by capture_main() from the loaded description.
settings = {}


def settings_item_entry(item, entry):
    """Look *entry* up in *item*, falling back to the global settings.

    Returns None when neither mapping has the key.
    """
    if entry in item:
        return item[entry]
    if entry in settings:
        return settings[entry]
    return None


# timecode

def timecode_seconds(timecode):
    """Convert a ``[[HH:]MM:]SS[.fff]`` timecode to seconds (float).

    Numbers are accepted as well and read as plain seconds.
    NOTE(review): PyYAML's resolver appears to turn some unquoted timecodes
    into numbers -- confirm against the description files in use.
    """
    seconds = 0.0
    for field in str(timecode).split(":"):
        seconds = seconds * 60 + float(field)
    return seconds


def timecode_string(seconds):
    """Format *seconds* as ``HH:MM:SS.mmm``.

    The value is rounded to whole milliseconds first, so sub-second
    remainders are kept and a seconds field never shows as "60.000".
    Negative values are clamped to zero.
    """
    total_ms = int(round(max(float(seconds), 0.0) * 1000))
    total_seconds, milliseconds = divmod(total_ms, 1000)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return "%02d:%02d:%02d.%03d" % (hours, minutes, secs, milliseconds)


# media helpers shared by audio and video
#
# *kind* is "audio" or "video"; *direction* is "source" or "output".

def _media_suffix(kind, direction, shot):
    """Return ".<extension>" for the medium, or None when not configured."""
    extension = (settings_item_entry(shot, kind + "-extension")
                 or settings_item_entry(
                     shot, kind + "-" + direction + "-extension"))
    if not extension:
        return None
    return "." + str(extension)


def _media_base(kind, direction, sequence, shot):
    """Return the directory for the medium, or None when not configured.

    The sequence label, when present, is appended as a sub-directory.
    """
    base = settings_item_entry(shot, kind + "-" + direction + "-path")
    if base is None:
        return None
    base = str(base)
    if "label" in sequence:
        base = os.path.join(base, str(sequence["label"]))
    return base


def _media_source_name(shot, take):
    """Return "<shot name>[-<take index>]", or None when both are missing."""
    return _join_name([shot_name(shot), take.get("index")])


def _media_output_name(shot, part):
    """Return "<shot name>[-<part name>]", or None when both are missing."""
    return _join_name([shot_name(shot), part_name(part)])


def _media_path(base, name, suffix):
    """Assemble a file path; None as soon as one component is missing."""
    if base is None or name is None or suffix is None:
        return None
    return os.path.join(base, name + suffix)


def _media_registry_entry(kind, sequence, name):
    """Return the registry record for *name*, or None when unregistered."""
    return sequence.get(kind + "-registry", {}).get(name)


def _media_output_path(kind, sequence, shot, part):
    """Return the output path, indexed when the name is used more than once."""
    name = _media_output_name(shot, part)
    entry = _media_registry_entry(kind, sequence, name)
    if name is not None and entry and entry["count"] > 1:
        name += "-" + str(entry["index"])
    return _media_path(_media_base(kind, "output", sequence, shot),
                       name,
                       _media_suffix(kind, "output", shot))


def _media_output_prepare(kind, sequence, shot, part):
    """Create the output directory and advance the duplicate-name index."""
    base = _media_base(kind, "output", sequence, shot)
    if base:
        os.makedirs(base, exist_ok=True)
    entry = _media_registry_entry(kind, sequence,
                                  _media_output_name(shot, part))
    if entry and entry["count"] > 1:
        entry["index"] += 1


def _media_register(kind, sequence, source_path, shot, part):
    """Count one use of the part's output name if its source file exists."""
    if not source_path or not os.path.isfile(source_path):
        return
    registry = sequence.setdefault(kind + "-registry", {})
    name = _media_output_name(shot, part)
    if name in registry:
        registry[name]["count"] += 1
    else:
        registry[name] = {"count": 1, "index": 1}


def _filter_chain(filters):
    """Return an ffmpeg filter chain from a list of filters or one string."""
    if isinstance(filters, str):
        return filters
    return ",".join(str(item) for item in filters)


def _run_unless_skipped(command, output_path, take, part):
    """Print and run *command* unless skipped or the output already exists."""
    if selection_skip(take) or selection_skip(part):
        return
    print(" ".join(command))
    if os.path.isfile(output_path):
        return
    subprocess.call(command)


# audio

def audio_source_suffix(shot):
    """Return the audio source file suffix (".ext"), or None if unset."""
    return _media_suffix("audio", "source", shot)


def audio_source_base(sequence, shot):
    """Return the audio source directory, or None if unset."""
    return _media_base("audio", "source", sequence, shot)


def audio_source_name(sequence, shot, take):
    """Return the audio source file name without suffix."""
    return _media_source_name(shot, take)


def audio_source_path(sequence, shot, take):
    """Return the audio source file path, or None if not configured."""
    return _media_path(audio_source_base(sequence, shot),
                       audio_source_name(sequence, shot, take),
                       audio_source_suffix(shot))


def audio_output_suffix(shot):
    """Return the audio output file suffix (".ext"), or None if unset."""
    return _media_suffix("audio", "output", shot)


def audio_output_base(sequence, shot):
    """Return the audio output directory, or None if unset."""
    return _media_base("audio", "output", sequence, shot)


def audio_output_name(sequence, shot, take, part):
    """Return the audio output file name without index or suffix."""
    return _media_output_name(shot, part)


def audio_output_prepare(sequence, shot, take, part):
    """Create the audio output directory and advance the name index."""
    _media_output_prepare("audio", sequence, shot, part)


def audio_output_path(sequence, shot, take, part):
    """Return the audio output file path, or None if not configured."""
    return _media_output_path("audio", sequence, shot, part)


def _audio_cut_point(explicit, video_point, audio_sync, video_sync):
    """Return the audio timecode for one cut point, or None.

    An explicit audio position wins.  Otherwise the video position is
    shifted by the offset between the audio and video sync marks.
    """
    if explicit is not None:
        return str(explicit)
    if video_point is None or audio_sync is None or video_sync is None:
        return None
    seconds = (timecode_seconds(audio_sync)
               + timecode_seconds(video_point)
               - timecode_seconds(video_sync))
    return timecode_string(seconds)


def audio_ffmpeg_command(sequence, shot, take, part):
    """Build the ffmpeg argument list that cuts one audio part.

    Start and stop are given before ``-i``.  Shot-level (or global) values
    take precedence over part-level ones.
    """
    source_path = audio_source_path(sequence, shot, take)
    output_path = audio_output_path(sequence, shot, take, part)

    audio_sync = _first_set(settings_item_entry(take, "audio-sync"))
    video_sync = _first_set(settings_item_entry(take, "video-sync"))

    command = ["ffmpeg"]
    for option, point in (("-ss", "start"), ("-to", "stop")):
        explicit = _first_set(settings_item_entry(shot, "audio-" + point),
                              settings_item_entry(part, "audio-" + point))
        video_point = _first_set(settings_item_entry(shot, "video-" + point),
                                 settings_item_entry(part, "video-" + point))
        timecode = _audio_cut_point(explicit, video_point,
                                    audio_sync, video_sync)
        if timecode is not None:
            command += [option, timecode]

    command += ["-i", source_path]

    encoder = settings_item_entry(shot, "audio-encoder")
    if encoder:
        command += ["-c:a", str(encoder)]

    filters = settings_item_entry(shot, "audio-filters")
    if filters:
        command += ["-filter:a", _filter_chain(filters)]

    command += [output_path]
    return command


def audio_register(sequence, shot, take, part):
    """Registration pass: count this part's audio output name."""
    _media_register("audio", sequence,
                    audio_source_path(sequence, shot, take), shot, part)


def audio_process(sequence, shot, take, part):
    """Processing pass: cut one audio part with ffmpeg."""
    source_path = audio_source_path(sequence, shot, take)
    output_path = audio_output_path(sequence, shot, take, part)

    # Audio that is not configured at all is skipped silently.
    if not source_path or not output_path:
        return
    if not os.path.isfile(source_path):
        print(" - audio-source: unavailable")
        return

    print(" - audio-source: " + audio_source_name(sequence, shot, take))
    print(" - audio-output: "
          + audio_output_name(sequence, shot, take, part))

    # Build the command before audio_output_prepare() advances the index,
    # so that it carries the same output path as computed above.
    command = audio_ffmpeg_command(sequence, shot, take, part)
    audio_output_prepare(sequence, shot, take, part)
    _run_unless_skipped(command, output_path, take, part)


# video

def video_source_suffix(shot):
    """Return the video source file suffix (".ext"), or None if unset."""
    return _media_suffix("video", "source", shot)


def video_source_base(sequence, shot):
    """Return the video source directory, or None if unset."""
    return _media_base("video", "source", sequence, shot)


def video_source_name(sequence, shot, take):
    """Return the video source file name without suffix."""
    return _media_source_name(shot, take)


def video_source_path(sequence, shot, take):
    """Return the video source file path, or None if not configured."""
    return _media_path(video_source_base(sequence, shot),
                       video_source_name(sequence, shot, take),
                       video_source_suffix(shot))


def video_output_suffix(shot):
    """Return the video output file suffix (".ext"), or None if unset."""
    return _media_suffix("video", "output", shot)


def video_output_base(sequence, shot):
    """Return the video output directory, or None if unset."""
    return _media_base("video", "output", sequence, shot)


def video_output_name(sequence, shot, take, part):
    """Return the video output file name without index or suffix."""
    return _media_output_name(shot, part)


def video_output_prepare(sequence, shot, take, part):
    """Create the video output directory and advance the name index."""
    _media_output_prepare("video", sequence, shot, part)


def video_output_path(sequence, shot, take, part):
    """Return the video output file path, or None if not configured."""
    return _media_output_path("video", sequence, shot, part)


def video_ffmpeg_command(sequence, shot, take, part):
    """Build the ffmpeg argument list that cuts and encodes one video part."""
    source_path = video_source_path(sequence, shot, take)
    output_path = video_output_path(sequence, shot, take, part)

    encoder = settings_item_entry(shot, "video-encoder")

    # Encoding is needed to cut at arbitrary positions, otherwise we
    # might miss reference frames in the bitstream.  With the "copy"
    # encoder the cut options are therefore placed after the input.
    early_cut_safe = encoder != "copy"

    cut_options = []
    video_start = _first_set(settings_item_entry(shot, "video-start"),
                             settings_item_entry(part, "video-start"))
    if video_start is not None:
        cut_options += ["-ss", str(video_start)]
    video_stop = _first_set(settings_item_entry(shot, "video-stop"),
                            settings_item_entry(part, "video-stop"))
    if video_stop is not None:
        cut_options += ["-to", str(video_stop)]

    input_options = ["-i", source_path]

    command = ["ffmpeg"]
    if early_cut_safe:
        command += cut_options + input_options
    else:
        command += input_options + cut_options

    width = settings_item_entry(shot, "video-width")
    height = settings_item_entry(shot, "video-height")
    if width and height:
        command += ["-s", str(width) + "x" + str(height)]

    frame_rate = settings_item_entry(shot, "video-frame-rate")
    if frame_rate:
        command += ["-r", str(frame_rate)]

    if encoder:
        command += ["-c:v", str(encoder)]

    # 0 is a meaningful setting here, so test against None, not truthiness.
    crf = settings_item_entry(shot, "video-crf")
    if crf is not None:
        command += ["-crf", str(crf)]

    quality = settings_item_entry(shot, "video-quality")
    if quality is not None:
        command += ["-q:v", str(quality)]

    rate_max = settings_item_entry(shot, "video-rate-max")
    if rate_max:
        # NOTE(review): the buffer size is only emitted together with the
        # maximum rate -- confirm this matches the intended behaviour.
        command += ["-maxrate", str(rate_max)]
        command += ["-bufsize", "2M"]

    filters = settings_item_entry(shot, "video-filters")
    if filters:
        command += ["-filter:v", _filter_chain(filters)]

    if settings_item_entry(shot, "video-audio") == "remove":
        command += ["-an"]

    command += [output_path]
    return command


def video_register(sequence, shot, take, part):
    """Registration pass: count this part's video output name."""
    _media_register("video", sequence,
                    video_source_path(sequence, shot, take), shot, part)


def video_process(sequence, shot, take, part):
    """Processing pass: cut one video part with ffmpeg."""
    source_path = video_source_path(sequence, shot, take)
    output_path = video_output_path(sequence, shot, take, part)

    if not source_path or not os.path.isfile(source_path):
        print(" - video-source: unavailable")
        return
    if not output_path:
        return

    print(" - video-source: " + video_source_name(sequence, shot, take))
    print(" - video-output: "
          + video_output_name(sequence, shot, take, part))

    # Build the command before video_output_prepare() advances the index,
    # so that it carries the same output path as computed above.
    command = video_ffmpeg_command(sequence, shot, take, part)
    video_output_prepare(sequence, shot, take, part)
    _run_unless_skipped(command, output_path, take, part)


# media

def media_register(sequence, shot, take, part):
    """Register the audio and the video output of one part."""
    audio_register(sequence, shot, take, part)
    video_register(sequence, shot, take, part)


def media_process(sequence, shot, take, part):
    """Process the audio and the video of one part."""
    audio_process(sequence, shot, take, part)
    video_process(sequence, shot, take, part)


# part

def part_name(part):
    """Return "<subject>-<label>-<actions...>" for a part, or None if empty.

    When more than two actions are listed only the first and the last one
    are used.  A single action given as a plain string is accepted too.
    """
    actions = part.get("actions") or []
    if isinstance(actions, str):
        actions = [actions]
    if len(actions) > 2:
        actions = [actions[0], actions[-1]]
    return _join_name([part.get("subject"), part.get("label")]
                      + list(actions))


# take

def take_register(sequence, shot, take):
    """Register every part of a take (one empty part if none are listed)."""
    for part in _children(take, "parts"):
        media_register(sequence, shot, take, part)


def take_process(sequence, shot, take):
    """Process every part of a take (one empty part if none are listed)."""
    for part in _children(take, "parts"):
        media_process(sequence, shot, take, part)


# shot

def shot_name(shot):
    """Return "<value>-<position>-<subject>-<label>", or None if empty."""
    return _join_name(shot.get(key)
                      for key in ("value", "position", "subject", "label"))


def shot_process(sequence, shot):
    """Process every take of a shot (one empty take if none are listed)."""
    print("- shot: " + (shot_name(shot) or "(unnamed)"))
    for take in _children(shot, "takes"):
        take_process(sequence, shot, take)


def shot_register(sequence, shot):
    """Register every take of a shot (one empty take if none are listed)."""
    for take in _children(shot, "takes"):
        take_register(sequence, shot, take)


# sequence

def sequence_process(sequence):
    """Print the sequence header and process all of its shots."""
    if "label" in sequence:
        print("# sequence: " + str(sequence["label"]) + "\n")
    for shot in sequence.get("shots") or []:
        shot_process(sequence, shot)
    print("")


def sequence_register(sequence):
    """Register all shots of a sequence."""
    for shot in sequence.get("shots") or []:
        shot_register(sequence, shot)


# capture

def capture_process(sequences):
    """Register every sequence first, then process every sequence.

    Registration must be complete before processing starts because output
    names depend on how often each name occurs in a sequence.
    """
    for sequence in sequences:
        sequence_register(sequence)
    for sequence in sequences:
        sequence_process(sequence)


def capture_main():
    """Load the description named on the command line and process it.

    Returns 1 when no description file was given, 0 otherwise.
    """
    global settings

    if len(sys.argv) < 2:
        sys.stderr.write("usage: %s DESCRIPTION.yaml\n" % sys.argv[0])
        return 1

    import yaml

    with open(sys.argv[1], "r") as stream:
        data = yaml.load(stream, Loader=yaml.SafeLoader) or {}

    settings = data.get("settings") or {}
    capture_process(data.get("sequences") or [])
    return 0


if __name__ == "__main__":
    sys.exit(capture_main())