fix(csp): support CivitAI CDN subdomains for example images (#822)

- Update CSP whitelist to use wildcard *.civitai.com for all CDN subdomains
- Fix hostname parsing to use parsed.hostname instead of parsed.netloc (handles ports)
- Update rewrite_preview_url() to support all CivitAI CDN subdomains
- Update rewriteCivitaiUrl() frontend function to support subdomains
- Add comprehensive tests for edge cases (ports, subdomains, invalid URLs)
- Add security note explaining wildcard CSP design decision

Fixes CSP blocking of images from image-b2.civitai.com and other CDN subdomains
This commit is contained in:
Will Miao
2026-04-03 09:40:15 +08:00
parent 05636712f0
commit 30db8c3d1d
6 changed files with 236 additions and 17 deletions

View File

@@ -4,15 +4,21 @@ from typing import Awaitable, Callable, Dict, List
from aiohttp import web from aiohttp import web
# Use wildcard for CivitAI to support their CDN subdomains (e.g., image-b2.civitai.com)
# Security note: This is acceptable because:
# 1. CSP img-src only controls image/video loading, not script execution
# 2. All *.civitai.com subdomains are controlled by Civitai
# 3. Explicit domain list would require constant updates as Civitai adds CDN nodes
REMOTE_MEDIA_SOURCES = ( REMOTE_MEDIA_SOURCES = (
"https://image.civitai.com", "https://*.civitai.com",
"https://img.genur.art", "https://img.genur.art",
) )
@web.middleware @web.middleware
async def relax_csp_for_remote_media( async def relax_csp_for_remote_media(
request: web.Request, handler: Callable[[web.Request], Awaitable[web.StreamResponse]] request: web.Request,
handler: Callable[[web.Request], Awaitable[web.StreamResponse]],
) -> web.StreamResponse: ) -> web.StreamResponse:
"""Allow LoRA Manager media previews to load from trusted remote domains. """Allow LoRA Manager media previews to load from trusted remote domains.
@@ -43,7 +49,9 @@ async def relax_csp_for_remote_media(
directive_order.append(name) directive_order.append(name)
directives[name] = values directives[name] = values
def merge_sources(name: str, sources: List[str], defaults: List[str] | None = None) -> None: def merge_sources(
name: str, sources: List[str], defaults: List[str] | None = None
) -> None:
existing = directives.get(name, list(defaults or [])) existing = directives.get(name, list(defaults or []))
for source in sources: for source in sources:

View File

@@ -22,7 +22,9 @@ def _normalize_commercial_values(value: Any) -> Sequence[str]:
def _split_aggregate(value_str: str) -> list[str]: def _split_aggregate(value_str: str) -> list[str]:
stripped = value_str.strip() stripped = value_str.strip()
looks_aggregate = "," in stripped or (stripped.startswith("{") and stripped.endswith("}")) looks_aggregate = "," in stripped or (
stripped.startswith("{") and stripped.endswith("}")
)
if not looks_aggregate: if not looks_aggregate:
return [value_str] return [value_str]
@@ -141,14 +143,18 @@ def build_license_flags(payload: Mapping[str, Any] | None) -> int:
return flags return flags
def resolve_license_info(model_data: Mapping[str, Any] | None) -> tuple[Dict[str, Any], int]: def resolve_license_info(
model_data: Mapping[str, Any] | None,
) -> tuple[Dict[str, Any], int]:
"""Return normalized license payload and its encoded bitset.""" """Return normalized license payload and its encoded bitset."""
payload = resolve_license_payload(model_data) payload = resolve_license_payload(model_data)
return payload, build_license_flags(payload) return payload, build_license_flags(payload)
def rewrite_preview_url(source_url: str | None, media_type: str | None = None) -> tuple[str | None, bool]: def rewrite_preview_url(
source_url: str | None, media_type: str | None = None
) -> tuple[str | None, bool]:
"""Rewrite Civitai preview URLs to use optimized renditions. """Rewrite Civitai preview URLs to use optimized renditions.
Args: Args:
@@ -168,7 +174,12 @@ def rewrite_preview_url(source_url: str | None, media_type: str | None = None) -
except ValueError: except ValueError:
return source_url, False return source_url, False
if parsed.netloc.lower() != "image.civitai.com": hostname = parsed.hostname
if hostname is None:
return source_url, False
hostname = hostname.lower()
if hostname == "civitai.com" or not hostname.endswith(".civitai.com"):
return source_url, False return source_url, False
replacement = "/width=450,optimized=true" replacement = "/width=450,optimized=true"

View File

@@ -30,8 +30,9 @@ export function rewriteCivitaiUrl(sourceUrl, mediaType = null, mode = Optimizati
try { try {
const url = new URL(sourceUrl); const url = new URL(sourceUrl);
// Check if it's a CivitAI image domain // Check if it's a CivitAI CDN domain (supports all subdomains like image-b2.civitai.com)
if (url.hostname.toLowerCase() !== 'image.civitai.com') { const hostname = url.hostname.toLowerCase();
if (hostname === 'civitai.com' || !hostname.endsWith('.civitai.com')) {
return [sourceUrl, false]; return [sourceUrl, false];
} }
@@ -112,7 +113,8 @@ export function isCivitaiUrl(url) {
if (!url) return false; if (!url) return false;
try { try {
const parsed = new URL(url); const parsed = new URL(url);
return parsed.hostname.toLowerCase() === 'image.civitai.com'; const hostname = parsed.hostname.toLowerCase();
return hostname.endsWith('.civitai.com') && hostname !== 'civitai.com';
} catch (e) { } catch (e) {
return false; return false;
} }

View File

@@ -94,6 +94,37 @@ describe('civitaiUtils', () => {
expect(wasRewritten).toBe(false); expect(wasRewritten).toBe(false);
expect(rewritten).toBe('not-a-valid-url'); expect(rewritten).toBe('not-a-valid-url');
}); });
it('should rewrite URLs from CivitAI CDN subdomains', () => {
const originalUrl = 'https://image-b2.civitai.com/file/civitai-media-cache/original=true/sample.png';
const [rewritten, wasRewritten] = rewriteCivitaiUrl(originalUrl, 'image', OptimizationMode.THUMBNAIL);
expect(wasRewritten).toBe(true);
expect(rewritten).toBe('https://image-b2.civitai.com/file/civitai-media-cache/width=450,optimized=true/sample.png');
});
it('should handle URLs with explicit port numbers', () => {
const originalUrl = 'https://image.civitai.com:443/checkpoints/original=true/test.png';
const [rewritten, wasRewritten] = rewriteCivitaiUrl(originalUrl, 'image', OptimizationMode.THUMBNAIL);
expect(wasRewritten).toBe(true);
// JavaScript URL.toString() removes default HTTPS port (443)
expect(rewritten).toBe('https://image.civitai.com/checkpoints/width=450,optimized=true/test.png');
});
it('should handle case-insensitive hostnames', () => {
const testCases = [
'https://IMAGE.CIVITAI.COM/original=true/test.png',
'https://Image.Civitai.Com/original=true/test.png',
'https://image-b2.CIVITAI.com/original=true/test.png',
];
for (const url of testCases) {
const [rewritten, wasRewritten] = rewriteCivitaiUrl(url, 'image', OptimizationMode.THUMBNAIL);
expect(wasRewritten).toBe(true);
expect(rewritten).toContain('width=450,optimized=true');
}
});
}); });
describe('getOptimizedUrl', () => { describe('getOptimizedUrl', () => {
@@ -157,6 +188,23 @@ describe('civitaiUtils', () => {
expect(isCivitaiUrl('https://image.civitai.com/')).toBe(true); expect(isCivitaiUrl('https://image.civitai.com/')).toBe(true);
}); });
it('should return true for CivitAI CDN subdomains', () => {
expect(isCivitaiUrl('https://image-b2.civitai.com/file/test.png')).toBe(true);
expect(isCivitaiUrl('https://image-b3.civitai.com/test.jpg')).toBe(true);
expect(isCivitaiUrl('https://cdn.civitai.com/test.png')).toBe(true);
});
it('should return true for CivitAI URLs with explicit ports', () => {
expect(isCivitaiUrl('https://image.civitai.com:443/test.png')).toBe(true);
expect(isCivitaiUrl('https://image-b2.civitai.com:443/file/test.jpg')).toBe(true);
});
it('should handle case-insensitive hostnames', () => {
expect(isCivitaiUrl('https://IMAGE.CIVITAI.COM/test.png')).toBe(true);
expect(isCivitaiUrl('https://Image.Civitai.Com/test.png')).toBe(true);
expect(isCivitaiUrl('https://image-b2.CIVITAI.com/test.png')).toBe(true);
});
it('should return false for non-CivitAI URLs', () => { it('should return false for non-CivitAI URLs', () => {
expect(isCivitaiUrl('https://example.com/image.jpg')).toBe(false); expect(isCivitaiUrl('https://example.com/image.jpg')).toBe(false);
expect(isCivitaiUrl('https://civitai.com/image.jpg')).toBe(false); expect(isCivitaiUrl('https://civitai.com/image.jpg')).toBe(false);

View File

@@ -2,7 +2,10 @@ import pytest
from aiohttp import web from aiohttp import web
from aiohttp.test_utils import make_mocked_request from aiohttp.test_utils import make_mocked_request
from py.middleware.csp_middleware import REMOTE_MEDIA_SOURCES, relax_csp_for_remote_media from py.middleware.csp_middleware import (
REMOTE_MEDIA_SOURCES,
relax_csp_for_remote_media,
)
DEFAULT_CSP = ( DEFAULT_CSP = (
"default-src 'self'; " "default-src 'self'; "
@@ -40,7 +43,9 @@ async def _invoke_middleware(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_relax_csp_appends_remote_sources_and_preserves_existing_directives() -> None: async def test_relax_csp_appends_remote_sources_and_preserves_existing_directives() -> (
None
):
response = await _invoke_middleware("/some-path", web.Response()) response = await _invoke_middleware("/some-path", web.Response())
header_value = response.headers.get("Content-Security-Policy") header_value = response.headers.get("Content-Security-Policy")
assert header_value is not None assert header_value is not None
@@ -48,16 +53,17 @@ async def test_relax_csp_appends_remote_sources_and_preserves_existing_directive
directives = _parse_directives(header_value) directives = _parse_directives(header_value)
# Existing directives remain intact # Existing directives remain intact
assert directives["script-src"] == ["'self'", "'unsafe-inline'", "'unsafe-eval'", "blob:"] assert directives["script-src"] == [
"'self'",
"'unsafe-inline'",
"'unsafe-eval'",
"blob:",
]
assert directives["img-src"][:3] == ["'self'", "data:", "blob:"] assert directives["img-src"][:3] == ["'self'", "data:", "blob:"]
# Remote media hosts are added once to the relevant directives # Remote media hosts are added once to the relevant directives
for source in REMOTE_MEDIA_SOURCES: for source in REMOTE_MEDIA_SOURCES:
assert source in directives["img-src"] assert source in directives["img-src"]
assert "media-src" in directives
assert directives["media-src"][0] == "'self'"
for source in REMOTE_MEDIA_SOURCES:
assert source in directives["media-src"] assert source in directives["media-src"]

View File

@@ -0,0 +1,144 @@
"""Tests for CivitAI URL utilities."""
import pytest
from py.utils.civitai_utils import rewrite_preview_url
class TestRewritePreviewUrl:
"""Test cases for rewrite_preview_url function."""
def test_handles_none_input(self):
"""Should return (None, False) for None input."""
result, was_rewritten = rewrite_preview_url(None)
assert result is None
assert was_rewritten is False
def test_handles_empty_string(self):
"""Should return (empty_string, False) for empty input."""
result, was_rewritten = rewrite_preview_url("")
assert result == ""
assert was_rewritten is False
def test_handles_invalid_url(self):
"""Should return original URL and False for invalid URLs."""
invalid_url = "not-a-valid-url"
result, was_rewritten = rewrite_preview_url(invalid_url)
assert result == invalid_url
assert was_rewritten is False
def test_handles_url_without_scheme(self):
"""Should return original URL and False for URLs without scheme."""
url = "image.civitai.com/something"
result, was_rewritten = rewrite_preview_url(url)
assert result == url
assert was_rewritten is False
def test_returns_false_for_non_civitai_domains(self):
"""Should not rewrite URLs from other domains."""
url = "https://example.com/image.jpg"
result, was_rewritten = rewrite_preview_url(url)
assert result == url
assert was_rewritten is False
def test_returns_false_for_main_civitai_domain(self):
"""Should not rewrite URLs from main civitai.com domain."""
url = "https://civitai.com/images/123"
result, was_rewritten = rewrite_preview_url(url)
assert result == url
assert was_rewritten is False
def test_rewrites_image_civitai_com_urls(self):
"""Should rewrite URLs from image.civitai.com."""
url = "https://image.civitai.com/checkpoints/original=true"
result, was_rewritten = rewrite_preview_url(url, "image")
assert (
result == "https://image.civitai.com/checkpoints/width=450,optimized=true"
)
assert was_rewritten is True
def test_rewrites_subdomain_civitai_urls(self):
"""Should rewrite URLs from CivitAI CDN subdomains like image-b2.civitai.com."""
url = "https://image-b2.civitai.com/file/civitai-media-cache/original=true/sample.png"
result, was_rewritten = rewrite_preview_url(url, "image")
assert (
result
== "https://image-b2.civitai.com/file/civitai-media-cache/width=450,optimized=true/sample.png"
)
assert was_rewritten is True
def test_rewrites_multiple_subdomains(self):
"""Should rewrite URLs from various CivitAI subdomains."""
test_cases = [
"https://image-b3.civitai.com/original=true/test.jpg",
"https://cdn.civitai.com/original=true/test.png",
]
for url in test_cases:
result, was_rewritten = rewrite_preview_url(url, "image")
assert was_rewritten is True
assert "width=450,optimized=true" in result
def test_handles_urls_with_explicit_port(self):
"""Should correctly handle URLs with explicit port numbers."""
url = "https://image.civitai.com:443/checkpoints/original=true"
result, was_rewritten = rewrite_preview_url(url, "image")
assert was_rewritten is True
assert "width=450,optimized=true" in result
# Port is preserved in the URL (this is acceptable behavior)
assert ":443" in result
def test_rewrites_video_urls_with_transcode(self):
"""Should rewrite video URLs with transcode parameter."""
url = "https://image.civitai.com/videos/original=true/sample.mp4"
result, was_rewritten = rewrite_preview_url(url, "video")
assert (
result
== "https://image.civitai.com/videos/transcode=true,width=450,optimized=true/sample.mp4"
)
assert was_rewritten is True
def test_video_rewrite_uses_case_insensitive_type(self):
"""Should handle video type case-insensitively."""
url = "https://image.civitai.com/original=true/test.mp4"
result1, was1 = rewrite_preview_url(url, "VIDEO")
result2, was2 = rewrite_preview_url(url, "Video")
assert was1 is True
assert was2 is True
assert "transcode=true" in result1
assert "transcode=true" in result2
def test_returns_original_when_no_original_true_in_path(self):
"""Should not rewrite URLs that don't contain /original=true."""
url = "https://image.civitai.com/checkpoints/optimized=true"
result, was_rewritten = rewrite_preview_url(url)
assert result == url
assert was_rewritten is False
def test_preserves_path_structure_after_rewrite(self):
"""Should maintain path structure after rewriting."""
url = "https://image.civitai.com/xG1nkqKTMzGDvpLrqFT7WA/abc123/original=true/12345.png"
result, was_rewritten = rewrite_preview_url(url, "image")
assert was_rewritten is True
assert result.startswith(
"https://image.civitai.com/xG1nkqKTMzGDvpLrqFT7WA/abc123/"
)
assert result.endswith("/12345.png")
def test_defaults_to_image_mode_when_media_type_is_none(self):
"""Should use image optimization when media_type is None."""
url = "https://image.civitai.com/original=true/test.png"
result, was_rewritten = rewrite_preview_url(url, None)
assert was_rewritten is True
assert "transcode=true" not in result
assert "width=450,optimized=true" in result
def test_case_insensitive_hostname_matching(self):
"""Should handle case-insensitive hostname matching."""
test_cases = [
"https://IMAGE.CIVITAI.COM/original=true/test.png",
"https://Image.Civitai.Com/original=true/test.png",
"https://image-b2.CIVITAI.com/original=true/test.png",
]
for url in test_cases:
result, was_rewritten = rewrite_preview_url(url, "image")
assert was_rewritten is True, f"Failed for URL: {url}"