Files
Bjornulf_custom_nodes/ffmpeg_images_to_video_path.py
justumen 10263f2110 0.76
2025-02-27 18:00:12 +01:00

208 lines
8.0 KiB
Python

import os
import uuid
import subprocess
import tempfile
import torch
import numpy as np
from PIL import Image
import wave
import json
import ffmpeg
class ImagesListToVideo:
@classmethod
def INPUT_TYPES(s):
return {
"required": {
"images": ("IMAGE",),
"fps": ("FLOAT", {"default": 25, "min": 1, "max": 120, "step": 0.01}),
},
"optional": {
"audio_path": ("STRING", {"forceInput": True}),
"audio": ("AUDIO", {"default": None}),
"FFMPEG_CONFIG_JSON": ("STRING", {"forceInput": True}),
}
}
RETURN_TYPES = ("STRING",)
RETURN_NAMES = ("video_path",)
FUNCTION = "images_to_video"
CATEGORY = "Bjornulf"
def parse_ffmpeg_config(self, config_json):
if not config_json:
return None
try:
return json.loads(config_json)
except json.JSONDecodeError:
print("Invalid FFmpeg configuration JSON")
return None
def build_ffmpeg_command(self, input_pattern, output_path, fps, config=None):
if not config:
return [
"ffmpeg",
"-framerate", str(fps),
"-i", input_pattern,
"-c:v", "libx264",
"-pix_fmt", "yuv420p",
"-crf", "19",
"-y"
]
cmd = [config["ffmpeg"]["path"]] if config["ffmpeg"]["path"] else ["ffmpeg"]
# Handle framerate - use force_fps if enabled
cmd.extend(["-framerate", str(config["video"]["fps"]["force_fps"] if config["video"]["fps"]["enabled"] else fps)])
cmd.extend(["-i", input_pattern])
# Video codec settings
codec = config["video"]["codec"]
if codec not in [None, "None", "copy"]:
cmd.extend(["-c:v", codec])
# Pixel format
pixel_format = config["video"]["pixel_format"]
if pixel_format not in [None, "None"]:
cmd.extend(["-pix_fmt", pixel_format])
# Preset
preset = config["video"]["preset"]
if preset not in [None, "None"]:
cmd.extend(["-preset", preset])
# Handle bitrate mode - static or CRF
if config["video"]["bitrate_mode"] == "static" and config["video"]["bitrate"]:
cmd.extend(["-b:v", config["video"]["bitrate"]])
else:
crf_value = config["video"]["crf"]
if crf_value is not None:
cmd.extend(["-crf", str(crf_value)])
# Resolution change if enabled
if config["video"]["resolution"]:
width = config["video"]["resolution"]["width"]
height = config["video"]["resolution"]["height"]
if width > 0 and height > 0:
cmd.extend(["-s", f"{width}x{height}"])
# Special handling for WebM transparency if enabled
if config["output"]["container_format"] == "webm" and config["video"]["force_transparency_webm"]:
cmd.extend(["-auto-alt-ref", "0"])
return cmd
def images_to_video(self, images, fps=30, audio_path="", audio=None, FFMPEG_CONFIG_JSON=None):
config = self.parse_ffmpeg_config(FFMPEG_CONFIG_JSON)
output_dir = os.path.join("Bjornulf", "images_to_video")
os.makedirs(output_dir, exist_ok=True)
# Determine output format from config
output_format = "mp4"
if config and config["output"]["container_format"] not in [None, "None"]:
output_format = config["output"]["container_format"]
video_filename = f"video_{uuid.uuid4().hex}.{output_format}"
video_path = os.path.join(output_dir, video_filename)
with tempfile.TemporaryDirectory() as temp_dir:
# Save frames as images
for i, img in enumerate(images):
img_np = self.convert_to_numpy(img)
if img_np.shape[-1] != 3:
img_np = self.convert_to_rgb(img_np)
img_pil = Image.fromarray(img_np)
img_path = os.path.join(temp_dir, f"frame_{i:05d}.png")
img_pil.save(img_path)
input_pattern = os.path.join(temp_dir, "frame_%05d.png")
ffmpeg_cmd = self.build_ffmpeg_command(input_pattern, video_path, fps, config)
# Handle audio based on config
temp_audio_path = None
audio_enabled = not (config and config["audio"]["enabled"] == False)
if audio_enabled:
if audio is not None and isinstance(audio, dict):
waveform = audio['waveform'].numpy().squeeze()
sample_rate = audio['sample_rate']
temp_audio_path = os.path.join(temp_dir, "temp_audio.wav")
self.write_wav(temp_audio_path, waveform, sample_rate)
elif audio_path and os.path.isfile(audio_path):
temp_audio_path = audio_path
if temp_audio_path:
# First create video without audio
temp_video = os.path.join(temp_dir, "temp_video.mp4")
temp_cmd = ffmpeg_cmd + ["-y", temp_video]
try:
subprocess.run(temp_cmd, check=True, capture_output=True, text=True)
# Now add audio
audio_cmd = [
config["ffmpeg"]["path"] if config and config["ffmpeg"]["path"] else "ffmpeg",
"-i", temp_video,
"-i", temp_audio_path,
"-c:v", "copy"
]
# Audio codec settings from config
if config and config["audio"]["codec"] not in [None, "None", "copy"]:
audio_cmd.extend(["-c:a", config["audio"]["codec"]])
else:
audio_cmd.extend(["-c:a", "aac"])
# Audio bitrate
if config and config["audio"]["bitrate"]:
audio_cmd.extend(["-b:a", config["audio"]["bitrate"]])
audio_cmd.extend(["-shortest", "-y", video_path])
subprocess.run(audio_cmd, check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e:
print(f"FFmpeg error: {e.stderr}")
return ("",)
else:
# Just create video without audio
ffmpeg_cmd.append("-y")
ffmpeg_cmd.append(video_path)
try:
subprocess.run(ffmpeg_cmd, check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e:
print(f"FFmpeg error: {e.stderr}")
return ("",)
return (video_path,)
def write_wav(self, file_path, audio_data, sample_rate):
with wave.open(file_path, 'wb') as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(sample_rate)
audio_data = np.int16(audio_data * 32767)
wav_file.writeframes(audio_data.tobytes())
def convert_to_numpy(self, img):
if isinstance(img, torch.Tensor):
img = img.cpu().numpy()
if img.dtype == np.uint8:
return img
elif img.dtype == np.float32 or img.dtype == np.float64:
return (img * 255).astype(np.uint8)
else:
raise ValueError(f"Unsupported data type: {img.dtype}")
def convert_to_rgb(self, img):
if img.shape[-1] == 1:
return np.repeat(img, 3, axis=-1)
elif img.shape[-1] == 768:
img = img.reshape((-1, 3))
img = (img - img.min()) / (img.max() - img.min())
img = (img * 255).astype(np.uint8)
return img.reshape((img.shape[0], -1, 3))
elif len(img.shape) == 2:
return np.stack([img, img, img], axis=-1)
else:
raise ValueError(f"Unsupported image shape: {img.shape}")