Files
Bjornulf_custom_nodes/video_preview.py
justumen 39dfb0220a 0.77
2025-03-19 17:36:25 +01:00

133 lines
5.6 KiB
Python

import os
import shutil
import time
import hashlib
from pathlib import Path
import subprocess
import numpy as np
import cv2
import tempfile
# File extensions (lower-case, leading dot included) that are accepted
# when a video is supplied by path.
SUPPORTED_VIDEO_EXTENSIONS = {
    ".mp4",
    ".webm",
    ".ogg",
    ".mov",
    ".mkv",
}
class VideoPreview:
    """Output node that publishes a video into the preview folder for the UI.

    The node accepts either a path to an existing video file or a batch of
    images. A video file is copied unchanged; images are first encoded into an
    H.264 MP4 by the ``ffmpeg`` executable. In both cases the result is placed
    in ``output/Bjornulf/preview_video`` (relative to the current working
    directory) under a name made unique by a timestamp and a content hash.
    """

    @classmethod
    def INPUT_TYPES(cls):
        """Return the declaration of the node's required and optional inputs."""
        return {
            "required": {
                "fps_for_IMAGES": ("FLOAT", {"default": 24.0, "min": 1.0, "max": 60.0}),
                "autoplay": ("BOOLEAN", {"default": False}),
                "mute": ("BOOLEAN", {"default": True}),
                "loop": ("BOOLEAN", {"default": False}),
            },
            "optional": {
                "video_path": ("STRING", {"forceInput": True, "default": ""}),
                "IMAGES": ("IMAGE", {"default": None}),
            }
        }

    RETURN_TYPES = ()
    FUNCTION = "preview_video"
    CATEGORY = "Bjornulf"
    OUTPUT_NODE = True

    def preview_video(self, fps_for_IMAGES, autoplay, mute, loop, video_path="", IMAGES=None):
        """Publish a preview video and return the UI payload describing it.

        ``video_path`` takes precedence over ``IMAGES`` when both are given.

        Args:
            fps_for_IMAGES: Frame rate used when encoding ``IMAGES``; ignored
                when ``video_path`` is used.
            autoplay: Passed through to the UI metadata.
            mute: Passed through to the UI metadata.
            loop: Passed through to the UI metadata.
            video_path: Path to an existing video whose extension is listed in
                ``SUPPORTED_VIDEO_EXTENSIONS``.
            IMAGES: Iterable of image tensors to encode as a video.

        Returns:
            On success, ``{"ui": {"video": [file_name, "Bjornulf/preview_video"],
            "metadata": {...}}}``. On any failure, ``{"ui": {"video": [],
            "error": message}}``; exceptions are never propagated to the caller.
        """
        try:
            # Destination directory for preview videos
            dest_dir = os.path.join("output", "Bjornulf", "preview_video")
            os.makedirs(dest_dir, exist_ok=True)

            # Determine which input is provided
            if isinstance(video_path, str) and video_path.strip():
                dest_name = self._publish_video_file(video_path, dest_dir)
            elif IMAGES is not None and len(IMAGES) > 0:
                dest_name = self._publish_image_sequence(IMAGES, fps_for_IMAGES, dest_dir)
            else:
                raise ValueError("Either 'video_path' or 'IMAGES' must be provided.")

            # Successful return with video data
            return {
                "ui": {
                    "video": [dest_name, "Bjornulf/preview_video"],
                    "metadata": {
                        # NOTE(review): dimensions are fixed at 512x512 regardless
                        # of the real video size - confirm the frontend expects this.
                        "width": 512,
                        "height": 512,
                        "autoplay": autoplay,
                        "mute": mute,
                        "loop": loop
                    }
                }
            }
        except Exception as e:
            # Error case: "video" is an empty list (not None) so that consumers
            # iterating over it do not fail.
            return {
                "ui": {
                    "video": [],
                    "error": str(e)
                }
            }

    @staticmethod
    def _short_file_hash(path, chunk_size=1024 * 1024):
        """Return the first 8 hex digits of the MD5 of the file at ``path``."""
        digest = hashlib.md5()
        # Read in chunks inside a context manager: videos can be large, and the
        # handle must be closed deterministically.
        with open(path, "rb") as handle:
            for chunk in iter(lambda: handle.read(chunk_size), b""):
                digest.update(chunk)
        return digest.hexdigest()[:8]

    @staticmethod
    def _unique_dest_name(base_name, source_path, ext):
        """Build ``<base_name>_<unix time>_<content hash><ext>`` for ``source_path``."""
        file_hash = VideoPreview._short_file_hash(source_path)
        timestamp = int(time.time())
        return f"{base_name}_{timestamp}_{file_hash}{ext}"

    @staticmethod
    def _publish_video_file(video_path, dest_dir):
        """Validate an existing video file, copy it into ``dest_dir``, return its new name.

        Raises:
            FileNotFoundError: ``video_path`` does not point to a file.
            ValueError: The file extension is not supported.
        """
        source_path = os.path.abspath(video_path)
        if not os.path.isfile(source_path):
            raise FileNotFoundError(f"Video file not found: {source_path}")
        ext = Path(source_path).suffix.lower()  # e.g., '.mp4', '.webm'
        if ext not in SUPPORTED_VIDEO_EXTENSIONS:
            # Sorted so the message is identical from run to run (set order is not).
            supported = ', '.join(sorted(SUPPORTED_VIDEO_EXTENSIONS))
            raise ValueError(f"Unsupported video format: {ext}. Supported formats: {supported}")
        # The copy keeps the original extension.
        dest_name = VideoPreview._unique_dest_name("video_preview", source_path, ext)
        dest_path = os.path.join(dest_dir, dest_name)
        shutil.copy2(source_path, dest_path)
        print(f"Video copied to: {dest_path}")
        return dest_name

    @staticmethod
    def _to_uint8_frame(img_tensor):
        """Convert one image to a ``uint8`` array scaled to the range [0, 255].

        The input is assumed to hold RGB values in [0, 1] with shape (H, W, C);
        values outside that range are clipped instead of wrapping around.
        """
        if hasattr(img_tensor, "detach"):
            # Tensor input: move to host memory and drop autograd tracking
            # before converting to NumPy.
            img_tensor = img_tensor.detach().cpu().numpy()
        return np.clip(np.asarray(img_tensor) * 255, 0, 255).astype(np.uint8)

    @staticmethod
    def _publish_image_sequence(images, fps, dest_dir):
        """Encode ``images`` as an MP4 with FFmpeg, copy it into ``dest_dir``, return its name.

        Raises:
            ValueError: ``images`` yields no frames.
            RuntimeError: A frame could not be written or FFmpeg failed.
        """
        # A unique temporary directory per run; removed automatically on exit.
        with tempfile.TemporaryDirectory(prefix="bjornulf_temp_video_") as temp_dir:
            frame_count = 0
            for index, img_tensor in enumerate(images):
                frame = VideoPreview._to_uint8_frame(img_tensor)
                frame_path = os.path.join(temp_dir, f"frame_{index:04d}.png")
                # OpenCV writes channels in BGR order, so convert from RGB first.
                # imwrite signals failure through its return value, not an exception.
                if not cv2.imwrite(frame_path, cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)):
                    raise RuntimeError(f"Failed to write frame image: {frame_path}")
                frame_count += 1
            if frame_count == 0:
                raise ValueError("No valid IMAGES provided to create a video.")

            # Create temporary video using FFmpeg
            output_video = os.path.join(temp_dir, "temp_video.mp4")
            cmd = [
                "ffmpeg",
                "-framerate", str(fps),
                "-i", os.path.join(temp_dir, "frame_%04d.png"),
                # yuv420p needs even dimensions; pad odd sizes up by one pixel.
                "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2",
                "-c:v", "libx264",
                "-pix_fmt", "yuv420p",
                "-y",  # Overwrite output file if it exists
                output_video,
            ]
            try:
                subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
            except subprocess.CalledProcessError as err:
                # Surface the end of FFmpeg's own diagnostics in the error text.
                details = (err.stderr or b"").decode("utf-8", errors="replace").strip()[-500:]
                raise RuntimeError(
                    f"FFmpeg failed with exit code {err.returncode}: {details}"
                ) from err
            if not os.path.exists(output_video):
                raise RuntimeError("Failed to create temporary video from IMAGES.")

            # Copy out of the temporary directory before it is deleted.
            dest_name = VideoPreview._unique_dest_name("image_sequence", output_video, ".mp4")
            dest_path = os.path.join(dest_dir, dest_name)
            if not os.path.exists(dest_path):
                shutil.copy2(output_video, dest_path)
            return dest_name