summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Kocialkowski2024-05-10 20:05:48 +0200
committerPaul Kocialkowski2024-05-10 20:05:48 +0200
commit57205fbb273847b14e8edcf7288199a86d985220 (patch)
tree0065d82cdb24a06bb8e97d760fc7659dbfb77f3e
parente9f7fa6b7efebde0d62fb42671563399b0b82e26 (diff)
Remove capture-pipeline
Signed-off-by: Paul Kocialkowski <contact@paulk.fr>
-rwxr-xr-xcapture-pipeline564
1 files changed, 0 insertions, 564 deletions
diff --git a/capture-pipeline b/capture-pipeline
deleted file mode 100755
index 625b69c..0000000
--- a/capture-pipeline
+++ /dev/null
@@ -1,564 +0,0 @@
-#!/usr/bin/python
-
-import sys
-import os
-import getopt
-import datetime
-import subprocess
-import yaml
-
-# utils
-
-def selection_skip(item):
- if "selection" in item and item["selection"] == "skip":
- return True
-
- return False
-
-# settings
-
-settings = {}
-
-def settings_item_entry(item, entry):
- if entry in item:
- return item[entry]
- elif entry in settings:
- return settings[entry]
- else:
- return None
-
-# timecode
-
-def timecode_seconds(timecode):
- values = timecode.split(':')
- seconds = 0.
-
- for i in range(len(values)):
- index = len(values) - i - 1
- seconds += float(values[i]) * pow(60, index)
-
- return seconds
-
-def timecode_string(seconds):
- remainder = seconds
- string = ""
- pow_max = 3
-
- for i in range(pow_max):
- index = pow_max - i - 1
- if remainder / pow(60, index) >= 1.:
- if index == 0:
- if remainder < 10.:
- string += "0"
-
- string += "%.3f" % remainder
- else:
- part = int(remainder / pow(60, index))
- remainder -= float(part * pow(60, index))
-
- string += "%02d:" % part
- else:
- if index == 0:
- string += "00.0"
- else:
- string += "00:"
-
- return string
-
-# audio
-
-def audio_source_suffix(shot):
- extension = settings_item_entry(shot, "audio-extension") or settings_item_entry(shot, "audio-source-extension")
-
- return "." + extension
-
-def audio_source_base(sequence, shot):
- base = settings_item_entry(shot, "audio-source-path")
-
- if "label" in sequence:
- base = os.path.join(base, sequence["label"])
-
- return base
-
-def audio_source_name(sequence, shot, take):
- name = shot_name(shot)
-
- if "index" in take:
- name += "-" + str(take["index"])
-
- return name
-
-def audio_source_path(sequence, shot, take):
- base = audio_source_base(sequence, shot)
- name = audio_source_name(sequence, shot, take)
- suffix = audio_source_suffix(shot)
-
- return os.path.join(base, name + suffix)
-
-def audio_output_suffix(shot):
- extension = settings_item_entry(shot, "audio-extension") or settings_item_entry(shot, "audio-output-extension")
-
- return "." + extension
-
-def audio_output_base(sequence, shot):
- base = settings_item_entry(shot, "audio-output-path")
-
- if "label" in sequence:
- base = os.path.join(base, sequence["label"])
-
- return base
-
-def audio_output_name(sequence, shot, take, part):
- name = shot_name(shot) + "-"
-
- part_name_part = part_name(part)
- if part_name_part:
- name += part_name_part + "-"
-
- # Remove final separator.
- name = name[:-1]
-
- return name
-
-def audio_output_prepare(sequence, shot, take, part):
- base = audio_output_base(sequence, shot)
- name = audio_output_name(sequence, shot, take, part)
-
- try:
- os.stat(base)
- except:
- os.makedirs(base, exist_ok = True)
-
- if name in sequence["audio-registry"]:
- if sequence["audio-registry"][name]["count"] > 1:
- sequence["audio-registry"][name]["index"] += 1
-
-def audio_output_path(sequence, shot, take, part):
- base = audio_output_base(sequence, shot)
- name = audio_output_name(sequence, shot, take, part)
- suffix = audio_output_suffix(shot)
-
- if name in sequence["audio-registry"]:
- if sequence["audio-registry"][name]["count"] > 1:
- name += "-" + str(sequence["audio-registry"][name]["index"])
-
- return os.path.join(base, name + suffix)
-
-def audio_ffmpeg_command(sequence, shot, take, part):
- source_path = audio_source_path(sequence, shot, take)
- output_path = audio_output_path(sequence, shot, take, part)
-
- ffmpeg_command = [ "ffmpeg" ]
-
- audio_sync = settings_item_entry(take, "audio-sync")
- video_sync = settings_item_entry(take, "video-sync")
-
- audio_start = settings_item_entry(shot, "audio-start") or settings_item_entry(part, "audio-start")
- video_start = settings_item_entry(shot, "video-start") or settings_item_entry(part, "video-start")
-
- if audio_start:
- ffmpeg_command += [ "-ss", audio_start ]
- elif video_start and audio_sync and video_sync:
- video_start_seconds = timecode_seconds(video_start)
- video_sync_seconds = timecode_seconds(video_sync)
- audio_sync_seconds = timecode_seconds(audio_sync)
- audio_start_seconds = audio_sync_seconds + video_start_seconds - video_sync_seconds
-
- ffmpeg_command += [ "-ss", timecode_string(audio_start_seconds) ]
-
- audio_stop = settings_item_entry(shot, "audio-stop") or settings_item_entry(part, "audio-stop")
- video_stop = settings_item_entry(shot, "video-stop") or settings_item_entry(part, "video-stop")
-
- if audio_stop:
- ffmpeg_command += [ "-to", audio_stop ]
- elif video_stop and audio_sync and video_sync:
- video_stop_seconds = timecode_seconds(video_stop)
- video_sync_seconds = timecode_seconds(video_sync)
- audio_sync_seconds = timecode_seconds(audio_sync)
- audio_stop_seconds = audio_sync_seconds + video_stop_seconds - video_sync_seconds
-
- ffmpeg_command += [ "-to", timecode_string(audio_stop_seconds) ]
-
- ffmpeg_command += [ "-i", source_path ]
-
- encoder = settings_item_entry(shot, "audio-encoder")
- if encoder:
- ffmpeg_command += [ "-c:a", encoder ]
-
- filters = settings_item_entry(shot, "audio-filters")
- if filters:
- ffmpeg_command += [ "-filter:a", ",".join(filters) ]
-
- ffmpeg_command += [ output_path ]
-
- return ffmpeg_command
-
-def audio_register(sequence, shot, take, part):
- source_path = audio_source_path(sequence, shot, take)
-
- if not source_path or not os.path.isfile(source_path):
- return
-
- if "audio-registry" not in sequence:
- sequence["audio-registry"] = {}
-
- name = audio_output_name(sequence, shot, take, part)
- if name in sequence["audio-registry"]:
- sequence["audio-registry"][name]["count"] += 1
- else:
- sequence["audio-registry"][name] = { "count": 1 , "index": 1}
-
-def audio_process(sequence, shot, take, part):
- source_path = audio_source_path(sequence, shot, take)
- output_path = audio_output_path(sequence, shot, take, part)
-
- if not source_path or not output_path:
- return
-
- if not os.path.isfile(source_path):
- print(" - audio-source: unavailable")
- return
-
- print(" - audio-source: " + audio_source_name(sequence, shot, take))
- print(" - audio-output: " + audio_output_name(sequence, shot, take, part))
-
- command = audio_ffmpeg_command(sequence, shot, take, part)
-
- audio_output_prepare(sequence, shot, take, part)
-
- if selection_skip(take) or selection_skip(part):
- return
-
- print(" ".join(command))
-
- if os.path.isfile(output_path):
- return
-
- subprocess.call(command)
-
-# video
-
-def video_source_suffix(shot):
- extension = settings_item_entry(shot, "video-extension") or settings_item_entry(shot, "video-source-extension")
-
- return "." + extension
-
-def video_source_base(sequence, shot):
- base = settings_item_entry(shot, "video-source-path")
-
- if "label" in sequence:
- base = os.path.join(base, sequence["label"])
-
- return base
-
-def video_source_name(sequence, shot, take):
- name = shot_name(shot)
-
- if "index" in take:
- index = take["index"]
- name += "-" + str(index)
-
- return name
-
-def video_source_path(sequence, shot, take):
- base = video_source_base(sequence, shot)
- name = video_source_name(sequence, shot, take)
- suffix = video_source_suffix(shot)
-
- return os.path.join(base, name + suffix)
-
-def video_output_suffix(shot):
- extension = settings_item_entry(shot, "video-extension") or settings_item_entry(shot, "video-output-extension")
-
- return "." + extension
-
-def video_output_base(sequence, shot):
- base = settings_item_entry(shot, "video-output-path")
-
- if "label" in sequence:
- base = os.path.join(base, sequence["label"])
-
- return base
-
-def video_output_name(sequence, shot, take, part):
- name = shot_name(shot) + "-"
-
- part_name_part = part_name(part)
- if part_name_part:
- name += part_name_part + "-"
-
- # Remove final separator.
- name = name[:-1]
-
- return name
-
-def video_output_prepare(sequence, shot, take, part):
- base = video_output_base(sequence, shot)
- name = video_output_name(sequence, shot, take, part)
-
- try:
- os.stat(base)
- except:
- os.makedirs(base, exist_ok = True)
-
- if name in sequence["video-registry"]:
- if sequence["video-registry"][name]["count"] > 1:
- sequence["video-registry"][name]["index"] += 1
-
-def video_output_path(sequence, shot, take, part):
- base = video_output_base(sequence, shot)
- name = video_output_name(sequence, shot, take, part)
- suffix = video_output_suffix(shot)
-
- if name in sequence["video-registry"]:
- if sequence["video-registry"][name]["count"] > 1:
- name += "-" + str(sequence["video-registry"][name]["index"])
-
- return os.path.join(base, name + suffix)
-
-def video_ffmpeg_command(sequence, shot, take, part):
- source_path = video_source_path(sequence, shot, take)
- output_path = video_output_path(sequence, shot, take, part)
- early_cut_safe = True
-
- ffmpeg_command = [ "ffmpeg" ]
-
- encoder = settings_item_entry(shot, "video-encoder")
-
- # Encoding is needed to cut at arbitrary positions, otherwise we
- # might miss reference frames in the bitstream.
- if encoder and encoder == "copy":
- early_cut_safe = False
-
- if not early_cut_safe:
- ffmpeg_command += [ "-i", source_path ]
-
- video_start = settings_item_entry(shot, "video-start")or settings_item_entry(part, "video-start")
- if video_start:
- ffmpeg_command += [ "-ss", video_start ]
-
- video_stop = settings_item_entry(shot, "video-stop") or settings_item_entry(part, "video-stop")
- if video_stop:
- ffmpeg_command += [ "-to", video_stop ]
-
- if early_cut_safe:
- ffmpeg_command += [ "-i", source_path ]
-
- width = settings_item_entry(shot, "video-width")
- height = settings_item_entry(shot, "video-height")
-
- if width and height:
- ffmpeg_command += [ "-s", str(width)+"x"+str(height) ]
-
- frame_rate = settings_item_entry(shot, "video-frame-rate")
- if frame_rate:
- ffmpeg_command += [ "-r", str(frame_rate) ]
-
- encoder = settings_item_entry(shot, "video-encoder")
- if encoder:
- ffmpeg_command += [ "-c:v", encoder ]
-
- crf = settings_item_entry(shot, "video-crf")
- if crf:
- ffmpeg_command += [ "-crf", str(crf) ]
-
- quality = settings_item_entry(shot, "video-quality")
- if quality:
- ffmpeg_command += [ "-q:v", str(quality) ]
-
- rate_max = settings_item_entry(shot, "video-rate-max")
- if rate_max:
- ffmpeg_command += [ "-maxrate", rate_max ]
- ffmpeg_command += [ "-bufsize", "2M" ]
-
- filters = settings_item_entry(shot, "video-filters")
- if filters:
- ffmpeg_command += [ "-filter:v", ",".join(filters) ]
-
- audio = settings_item_entry(shot, "video-audio")
- if audio == "remove":
- ffmpeg_command += [ "-an" ]
-
- ffmpeg_command += [ output_path ]
-
- return ffmpeg_command
-
-def video_register(sequence, shot, take, part):
- source_path = video_source_path(sequence, shot, take)
-
- if not source_path or not os.path.isfile(source_path):
- return
-
- if "video-registry" not in sequence:
- sequence["video-registry"] = {}
-
- name = video_output_name(sequence, shot, take, part)
- if name in sequence["video-registry"]:
- sequence["video-registry"][name]["count"] += 1
- else:
- sequence["video-registry"][name] = { "count": 1 , "index": 1}
-
-def video_process(sequence, shot, take, part):
- source_path = video_source_path(sequence, shot, take)
- output_path = video_output_path(sequence, shot, take, part)
-
- if not source_path or not os.path.isfile(source_path):
- print(" - video-source: unavailable")
- return
-
- print(" - video-source: " + video_source_name(sequence, shot, take))
- print(" - video-output: " + video_output_name(sequence, shot, take, part))
-
- command = video_ffmpeg_command(sequence, shot, take, part)
-
- video_output_prepare(sequence, shot, take, part)
-
- if selection_skip(take) or selection_skip(part):
- return
-
- print(" ".join(command))
-
- if os.path.isfile(output_path):
- return
-
- subprocess.call(command)
-
-# media
-
-def media_register(sequence, shot, take, part):
- audio_register(sequence, shot, take, part)
- video_register(sequence, shot, take, part)
-
-def media_process(sequence, shot, take, part):
- audio_process(sequence, shot, take, part)
- video_process(sequence, shot, take, part)
-
-# part
-
-def part_name(part):
- name = ""
-
- if "subject" in part:
- name += part["subject"] + "-"
-
- if "label" in part:
- name += part["label"] + "-"
-
- if "actions" in part:
- actions = part["actions"]
-
- if len(actions) > 2:
- actions = actions[:1] + actions[-1:]
-
- for action in actions:
- name += action + "-"
-
- if len(name) == 0:
- return None
-
- # Remove final separator.
- name = name[:-1]
-
- return name
-
-# take
-
-def take_register(sequence, shot, take):
- if "parts" in take:
- for part in take["parts"]:
- media_register(sequence, shot, take, part)
- else:
- part = {}
- media_register(sequence, shot, take, part)
-
-def take_process(sequence, shot, take):
- if "parts" in take:
- for part in take["parts"]:
- media_process(sequence, shot, take, part)
- else:
- part = {}
- media_process(sequence, shot, take, part)
-
-# shot
-
-def shot_name(shot):
- name = ""
-
- if "value" in shot:
- name += shot["value"] + "-"
-
- if "position" in shot:
- name += shot["position"] + "-"
-
- if "subject" in shot:
- name += shot["subject"] + "-"
-
- if "label" in shot:
- name += shot["label"] + "-"
-
- if len(name) == 0:
- return None
-
- # Remove final separator.
- name = name[:-1]
-
- return name
-
-def shot_process(sequence, shot):
- print("- shot: " + shot_name(shot))
-
- if "takes" in shot:
- for take in shot["takes"]:
- take_process(sequence, shot, take)
- else:
- take = {}
- take_process(sequence, shot, take)
-
-def shot_register(sequence, shot):
- if "takes" in shot:
- for take in shot["takes"]:
- take_register(sequence, shot, take)
- else:
- take = {}
- take_register(sequence, shot, take)
-
-def sequence_process(sequence):
- if "label" in sequence:
- print("# sequence: " + sequence["label"] + "\n")
-
- for shot in sequence["shots"]:
- shot_process(sequence, shot)
-
- print("")
-
-def sequence_register(sequence):
- for shot in sequence["shots"]:
- shot_register(sequence, shot)
-
-# capture
-
-def capture_process(sequences):
- for sequence in sequences:
- sequence_register(sequence)
-
- for sequence in sequences:
- sequence_process(sequence)
-
-def capture_main():
- global settings
-
- if len(sys.argv) < 2:
- return 1
-
- path = sys.argv[1]
-
- stream = open(path, 'r')
- data = yaml.load(stream, Loader = yaml.SafeLoader)
- stream.close()
-
- settings = data["settings"]
-
- capture_process(data["sequences"])
-
-if __name__ == "__main__":
- capture_main()