From 57205fbb273847b14e8edcf7288199a86d985220 Mon Sep 17 00:00:00 2001 From: Paul Kocialkowski Date: Fri, 10 May 2024 20:05:48 +0200 Subject: Remove capture-pipeline Signed-off-by: Paul Kocialkowski --- capture-pipeline | 564 ------------------------------------------------------- 1 file changed, 564 deletions(-) delete mode 100755 capture-pipeline diff --git a/capture-pipeline b/capture-pipeline deleted file mode 100755 index 625b69c..0000000 --- a/capture-pipeline +++ /dev/null @@ -1,564 +0,0 @@ -#!/usr/bin/python - -import sys -import os -import getopt -import datetime -import subprocess -import yaml - -# utils - -def selection_skip(item): - if "selection" in item and item["selection"] == "skip": - return True - - return False - -# settings - -settings = {} - -def settings_item_entry(item, entry): - if entry in item: - return item[entry] - elif entry in settings: - return settings[entry] - else: - return None - -# timecode - -def timecode_seconds(timecode): - values = timecode.split(':') - seconds = 0. - - for i in range(len(values)): - index = len(values) - i - 1 - seconds += float(values[i]) * pow(60, index) - - return seconds - -def timecode_string(seconds): - remainder = seconds - string = "" - pow_max = 3 - - for i in range(pow_max): - index = pow_max - i - 1 - if remainder / pow(60, index) >= 1.: - if index == 0: - if remainder < 10.: - string += "0" - - string += "%.3f" % remainder - else: - part = int(remainder / pow(60, index)) - remainder -= float(part * pow(60, index)) - - string += "%02d:" % part - else: - if index == 0: - string += "00.0" - else: - string += "00:" - - return string - -# audio - -def audio_source_suffix(shot): - extension = settings_item_entry(shot, "audio-extension") or settings_item_entry(shot, "audio-source-extension") - - return "." + extension - -def audio_source_base(sequence, shot): - base = settings_item_entry(shot, "audio-source-path") - - if "label" in sequence: - base = os.path.join(base, sequence["label"]) - - return base - -def audio_source_name(sequence, shot, take): - name = shot_name(shot) - - if "index" in take: - name += "-" + str(take["index"]) - - return name - -def audio_source_path(sequence, shot, take): - base = audio_source_base(sequence, shot) - name = audio_source_name(sequence, shot, take) - suffix = audio_source_suffix(shot) - - return os.path.join(base, name + suffix) - -def audio_output_suffix(shot): - extension = settings_item_entry(shot, "audio-extension") or settings_item_entry(shot, "audio-output-extension") - - return "." + extension - -def audio_output_base(sequence, shot): - base = settings_item_entry(shot, "audio-output-path") - - if "label" in sequence: - base = os.path.join(base, sequence["label"]) - - return base - -def audio_output_name(sequence, shot, take, part): - name = shot_name(shot) + "-" - - part_name_part = part_name(part) - if part_name_part: - name += part_name_part + "-" - - # Remove final separator. - name = name[:-1] - - return name - -def audio_output_prepare(sequence, shot, take, part): - base = audio_output_base(sequence, shot) - name = audio_output_name(sequence, shot, take, part) - - try: - os.stat(base) - except: - os.makedirs(base, exist_ok = True) - - if name in sequence["audio-registry"]: - if sequence["audio-registry"][name]["count"] > 1: - sequence["audio-registry"][name]["index"] += 1 - -def audio_output_path(sequence, shot, take, part): - base = audio_output_base(sequence, shot) - name = audio_output_name(sequence, shot, take, part) - suffix = audio_output_suffix(shot) - - if name in sequence["audio-registry"]: - if sequence["audio-registry"][name]["count"] > 1: - name += "-" + str(sequence["audio-registry"][name]["index"]) - - return os.path.join(base, name + suffix) - -def audio_ffmpeg_command(sequence, shot, take, part): - source_path = audio_source_path(sequence, shot, take) - output_path = audio_output_path(sequence, shot, take, part) - - ffmpeg_command = [ "ffmpeg" ] - - audio_sync = settings_item_entry(take, "audio-sync") - video_sync = settings_item_entry(take, "video-sync") - - audio_start = settings_item_entry(shot, "audio-start") or settings_item_entry(part, "audio-start") - video_start = settings_item_entry(shot, "video-start") or settings_item_entry(part, "video-start") - - if audio_start: - ffmpeg_command += [ "-ss", audio_start ] - elif video_start and audio_sync and video_sync: - video_start_seconds = timecode_seconds(video_start) - video_sync_seconds = timecode_seconds(video_sync) - audio_sync_seconds = timecode_seconds(audio_sync) - audio_start_seconds = audio_sync_seconds + video_start_seconds - video_sync_seconds - - ffmpeg_command += [ "-ss", timecode_string(audio_start_seconds) ] - - audio_stop = settings_item_entry(shot, "audio-stop") or settings_item_entry(part, "audio-stop") - video_stop = settings_item_entry(shot, "video-stop") or settings_item_entry(part, "video-stop") - - if audio_stop: - ffmpeg_command += [ "-to", audio_stop ] - elif video_stop and audio_sync and video_sync: - video_stop_seconds = timecode_seconds(video_stop) - video_sync_seconds = timecode_seconds(video_sync) - audio_sync_seconds = timecode_seconds(audio_sync) - audio_stop_seconds = audio_sync_seconds + video_stop_seconds - video_sync_seconds - - ffmpeg_command += [ "-to", timecode_string(audio_stop_seconds) ] - - ffmpeg_command += [ "-i", source_path ] - - encoder = settings_item_entry(shot, "audio-encoder") - if encoder: - ffmpeg_command += [ "-c:a", encoder ] - - filters = settings_item_entry(shot, "audio-filters") - if filters: - ffmpeg_command += [ "-filter:a", ",".join(filters) ] - - ffmpeg_command += [ output_path ] - - return ffmpeg_command - -def audio_register(sequence, shot, take, part): - source_path = audio_source_path(sequence, shot, take) - - if not source_path or not os.path.isfile(source_path): - return - - if "audio-registry" not in sequence: - sequence["audio-registry"] = {} - - name = audio_output_name(sequence, shot, take, part) - if name in sequence["audio-registry"]: - sequence["audio-registry"][name]["count"] += 1 - else: - sequence["audio-registry"][name] = { "count": 1 , "index": 1} - -def audio_process(sequence, shot, take, part): - source_path = audio_source_path(sequence, shot, take) - output_path = audio_output_path(sequence, shot, take, part) - - if not source_path or not output_path: - return - - if not os.path.isfile(source_path): - print(" - audio-source: unavailable") - return - - print(" - audio-source: " + audio_source_name(sequence, shot, take)) - print(" - audio-output: " + audio_output_name(sequence, shot, take, part)) - - command = audio_ffmpeg_command(sequence, shot, take, part) - - audio_output_prepare(sequence, shot, take, part) - - if selection_skip(take) or selection_skip(part): - return - - print(" ".join(command)) - - if os.path.isfile(output_path): - return - - subprocess.call(command) - -# video - -def video_source_suffix(shot): - extension = settings_item_entry(shot, "video-extension") or settings_item_entry(shot, "video-source-extension") - - return "." + extension - -def video_source_base(sequence, shot): - base = settings_item_entry(shot, "video-source-path") - - if "label" in sequence: - base = os.path.join(base, sequence["label"]) - - return base - -def video_source_name(sequence, shot, take): - name = shot_name(shot) - - if "index" in take: - index = take["index"] - name += "-" + str(index) - - return name - -def video_source_path(sequence, shot, take): - base = video_source_base(sequence, shot) - name = video_source_name(sequence, shot, take) - suffix = video_source_suffix(shot) - - return os.path.join(base, name + suffix) - -def video_output_suffix(shot): - extension = settings_item_entry(shot, "video-extension") or settings_item_entry(shot, "video-output-extension") - - return "." + extension - -def video_output_base(sequence, shot): - base = settings_item_entry(shot, "video-output-path") - - if "label" in sequence: - base = os.path.join(base, sequence["label"]) - - return base - -def video_output_name(sequence, shot, take, part): - name = shot_name(shot) + "-" - - part_name_part = part_name(part) - if part_name_part: - name += part_name_part + "-" - - # Remove final separator. - name = name[:-1] - - return name - -def video_output_prepare(sequence, shot, take, part): - base = video_output_base(sequence, shot) - name = video_output_name(sequence, shot, take, part) - - try: - os.stat(base) - except: - os.makedirs(base, exist_ok = True) - - if name in sequence["video-registry"]: - if sequence["video-registry"][name]["count"] > 1: - sequence["video-registry"][name]["index"] += 1 - -def video_output_path(sequence, shot, take, part): - base = video_output_base(sequence, shot) - name = video_output_name(sequence, shot, take, part) - suffix = video_output_suffix(shot) - - if name in sequence["video-registry"]: - if sequence["video-registry"][name]["count"] > 1: - name += "-" + str(sequence["video-registry"][name]["index"]) - - return os.path.join(base, name + suffix) - -def video_ffmpeg_command(sequence, shot, take, part): - source_path = video_source_path(sequence, shot, take) - output_path = video_output_path(sequence, shot, take, part) - early_cut_safe = True - - ffmpeg_command = [ "ffmpeg" ] - - encoder = settings_item_entry(shot, "video-encoder") - - # Encoding is needed to cut at arbitrary positions, otherwise we - # might miss reference frames in the bitstream. - if encoder and encoder == "copy": - early_cut_safe = False - - if not early_cut_safe: - ffmpeg_command += [ "-i", source_path ] - - video_start = settings_item_entry(shot, "video-start")or settings_item_entry(part, "video-start") - if video_start: - ffmpeg_command += [ "-ss", video_start ] - - video_stop = settings_item_entry(shot, "video-stop") or settings_item_entry(part, "video-stop") - if video_stop: - ffmpeg_command += [ "-to", video_stop ] - - if early_cut_safe: - ffmpeg_command += [ "-i", source_path ] - - width = settings_item_entry(shot, "video-width") - height = settings_item_entry(shot, "video-height") - - if width and height: - ffmpeg_command += [ "-s", str(width)+"x"+str(height) ] - - frame_rate = settings_item_entry(shot, "video-frame-rate") - if frame_rate: - ffmpeg_command += [ "-r", str(frame_rate) ] - - encoder = settings_item_entry(shot, "video-encoder") - if encoder: - ffmpeg_command += [ "-c:v", encoder ] - - crf = settings_item_entry(shot, "video-crf") - if crf: - ffmpeg_command += [ "-crf", str(crf) ] - - quality = settings_item_entry(shot, "video-quality") - if quality: - ffmpeg_command += [ "-q:v", str(quality) ] - - rate_max = settings_item_entry(shot, "video-rate-max") - if rate_max: - ffmpeg_command += [ "-maxrate", rate_max ] - ffmpeg_command += [ "-bufsize", "2M" ] - - filters = settings_item_entry(shot, "video-filters") - if filters: - ffmpeg_command += [ "-filter:v", ",".join(filters) ] - - audio = settings_item_entry(shot, "video-audio") - if audio == "remove": - ffmpeg_command += [ "-an" ] - - ffmpeg_command += [ output_path ] - - return ffmpeg_command - -def video_register(sequence, shot, take, part): - source_path = video_source_path(sequence, shot, take) - - if not source_path or not os.path.isfile(source_path): - return - - if "video-registry" not in sequence: - sequence["video-registry"] = {} - - name = video_output_name(sequence, shot, take, part) - if name in sequence["video-registry"]: - sequence["video-registry"][name]["count"] += 1 - else: - sequence["video-registry"][name] = { "count": 1 , "index": 1} - -def video_process(sequence, shot, take, part): - source_path = video_source_path(sequence, shot, take) - output_path = video_output_path(sequence, shot, take, part) - - if not source_path or not os.path.isfile(source_path): - print(" - video-source: unavailable") - return - - print(" - video-source: " + video_source_name(sequence, shot, take)) - print(" - video-output: " + video_output_name(sequence, shot, take, part)) - - command = video_ffmpeg_command(sequence, shot, take, part) - - video_output_prepare(sequence, shot, take, part) - - if selection_skip(take) or selection_skip(part): - return - - print(" ".join(command)) - - if os.path.isfile(output_path): - return - - subprocess.call(command) - -# media - -def media_register(sequence, shot, take, part): - audio_register(sequence, shot, take, part) - video_register(sequence, shot, take, part) - -def media_process(sequence, shot, take, part): - audio_process(sequence, shot, take, part) - video_process(sequence, shot, take, part) - -# part - -def part_name(part): - name = "" - - if "subject" in part: - name += part["subject"] + "-" - - if "label" in part: - name += part["label"] + "-" - - if "actions" in part: - actions = part["actions"] - - if len(actions) > 2: - actions = actions[:1] + actions[-1:] - - for action in actions: - name += action + "-" - - if len(name) == 0: - return None - - # Remove final separator. - name = name[:-1] - - return name - -# take - -def take_register(sequence, shot, take): - if "parts" in take: - for part in take["parts"]: - media_register(sequence, shot, take, part) - else: - part = {} - media_register(sequence, shot, take, part) - -def take_process(sequence, shot, take): - if "parts" in take: - for part in take["parts"]: - media_process(sequence, shot, take, part) - else: - part = {} - media_process(sequence, shot, take, part) - -# shot - -def shot_name(shot): - name = "" - - if "value" in shot: - name += shot["value"] + "-" - - if "position" in shot: - name += shot["position"] + "-" - - if "subject" in shot: - name += shot["subject"] + "-" - - if "label" in shot: - name += shot["label"] + "-" - - if len(name) == 0: - return None - - # Remove final separator. - name = name[:-1] - - return name - -def shot_process(sequence, shot): - print("- shot: " + shot_name(shot)) - - if "takes" in shot: - for take in shot["takes"]: - take_process(sequence, shot, take) - else: - take = {} - take_process(sequence, shot, take) - -def shot_register(sequence, shot): - if "takes" in shot: - for take in shot["takes"]: - take_register(sequence, shot, take) - else: - take = {} - take_register(sequence, shot, take) - -def sequence_process(sequence): - if "label" in sequence: - print("# sequence: " + sequence["label"] + "\n") - - for shot in sequence["shots"]: - shot_process(sequence, shot) - - print("") - -def sequence_register(sequence): - for shot in sequence["shots"]: - shot_register(sequence, shot) - -# capture - -def capture_process(sequences): - for sequence in sequences: - sequence_register(sequence) - - for sequence in sequences: - sequence_process(sequence) - -def capture_main(): - global settings - - if len(sys.argv) < 2: - return 1 - - path = sys.argv[1] - - stream = open(path, 'r') - data = yaml.load(stream, Loader = yaml.SafeLoader) - stream.close() - - settings = data["settings"] - - capture_process(data["sequences"]) - -if __name__ == "__main__": - capture_main() -- cgit v1.2.3