fix(metadata): trace conditioning provenance for prompts

This commit is contained in:
Will Miao
2026-04-23 14:41:54 +08:00
parent 2eef629821
commit ebdbb36271
3 changed files with 494 additions and 38 deletions

View File

@@ -352,50 +352,101 @@ class MetadataProcessor:
# Check if we have stored conditioning objects for this sampler
if sampler_id in metadata.get(PROMPTS, {}) and (
"pos_conditioning" in metadata[PROMPTS][sampler_id] or
"neg_conditioning" in metadata[PROMPTS][sampler_id]):
"pos_conditioning" in metadata[PROMPTS][sampler_id] or
"neg_conditioning" in metadata[PROMPTS][sampler_id]
):
pos_conditioning = metadata[PROMPTS][sampler_id].get("pos_conditioning")
neg_conditioning = metadata[PROMPTS][sampler_id].get("neg_conditioning")
# Helper function to recursively find prompt text for a conditioning object
def find_prompt_text_for_conditioning(conditioning_obj, is_positive=True):
def extend_unique(target, values):
for value in values:
if value and value not in target:
target.append(value)
# Helper function to recursively find prompt texts for a conditioning object.
# Transform nodes can map one output conditioning to multiple source conditionings.
def find_prompt_texts_for_conditioning(
conditioning_obj, is_positive=True, visited=None
):
if conditioning_obj is None:
return ""
return []
if visited is None:
visited = set()
conditioning_id = id(conditioning_obj)
if conditioning_id in visited:
return []
visited.add(conditioning_id)
prompt_texts = []
# Try to match conditioning objects with those stored by extractors
for prompt_node_id, prompt_data in metadata[PROMPTS].items():
# For nodes with single conditioning output
if "conditioning" in prompt_data:
if id(prompt_data["conditioning"]) == id(conditioning_obj):
return prompt_data.get("text", "")
# For nodes with separate pos_conditioning and neg_conditioning outputs (like TSC_EfficientLoader)
if is_positive and "positive_encoded" in prompt_data:
if id(prompt_data["positive_encoded"]) == id(conditioning_obj):
if "positive_text" in prompt_data:
return prompt_data["positive_text"]
else:
orig_conditioning = prompt_data.get("orig_pos_cond", None)
if orig_conditioning is not None:
# Recursively find the prompt text for the original conditioning
return find_prompt_text_for_conditioning(orig_conditioning, is_positive=True)
if not is_positive and "negative_encoded" in prompt_data:
if id(prompt_data["negative_encoded"]) == id(conditioning_obj):
if "negative_text" in prompt_data:
return prompt_data["negative_text"]
else:
orig_conditioning = prompt_data.get("orig_neg_cond", None)
if orig_conditioning is not None:
# Recursively find the prompt text for the original conditioning
return find_prompt_text_for_conditioning(orig_conditioning, is_positive=False)
return ""
if not isinstance(prompt_data, dict):
continue
# For CLIP text nodes with a single conditioning output.
if id(prompt_data.get("conditioning")) == conditioning_id:
text = prompt_data.get("text", "")
if text:
extend_unique(prompt_texts, [text])
# Generic provenance for passthrough/transform/combine nodes.
for source in prompt_data.get("conditioning_sources", []):
if id(source.get("output")) != conditioning_id:
continue
for input_conditioning in source.get("inputs", []):
extend_unique(
prompt_texts,
find_prompt_texts_for_conditioning(
input_conditioning, is_positive, visited
),
)
# For nodes with separate pos_conditioning and neg_conditioning outputs
# like TSC_EfficientLoader and existing ControlNet-style metadata.
if (
is_positive
and id(prompt_data.get("positive_encoded")) == conditioning_id
):
if prompt_data.get("positive_text"):
extend_unique(prompt_texts, [prompt_data["positive_text"]])
else:
extend_unique(
prompt_texts,
find_prompt_texts_for_conditioning(
prompt_data.get("orig_pos_cond"),
is_positive=True,
visited=visited,
),
)
if (
not is_positive
and id(prompt_data.get("negative_encoded")) == conditioning_id
):
if prompt_data.get("negative_text"):
extend_unique(prompt_texts, [prompt_data["negative_text"]])
else:
extend_unique(
prompt_texts,
find_prompt_texts_for_conditioning(
prompt_data.get("orig_neg_cond"),
is_positive=False,
visited=visited,
),
)
return prompt_texts
# Find prompt texts using the helper function
result["prompt"] = find_prompt_text_for_conditioning(pos_conditioning, is_positive=True)
result["negative_prompt"] = find_prompt_text_for_conditioning(neg_conditioning, is_positive=False)
result["prompt"] = ", ".join(
find_prompt_texts_for_conditioning(pos_conditioning, is_positive=True)
)
result["negative_prompt"] = ", ".join(
find_prompt_texts_for_conditioning(neg_conditioning, is_positive=False)
)
return result