From 30db8c3d1d65b7972a2335957543b7cfd16d0d8e Mon Sep 17 00:00:00 2001 From: Will Miao Date: Fri, 3 Apr 2026 09:40:15 +0800 Subject: [PATCH] fix(csp): support CivitAI CDN subdomains for example images (#822) - Update CSP whitelist to use wildcard *.civitai.com for all CDN subdomains - Fix hostname parsing to use parsed.hostname instead of parsed.netloc (handles ports) - Update rewrite_preview_url() to support all CivitAI CDN subdomains - Update rewriteCivitaiUrl() frontend function to support subdomains - Add comprehensive tests for edge cases (ports, subdomains, invalid URLs) - Add security note explaining wildcard CSP design decision Fixes CSP blocking of images from image-b2.civitai.com and other CDN subdomains --- py/middleware/csp_middleware.py | 14 ++- py/utils/civitai_utils.py | 19 ++- static/js/utils/civitaiUtils.js | 8 +- tests/frontend/utils/civitaiUtils.test.js | 48 ++++++++ tests/middleware/test_csp_middleware.py | 20 +-- tests/utils/test_civitai_utils_rewrite.py | 144 ++++++++++++++++++++++ 6 files changed, 236 insertions(+), 17 deletions(-) create mode 100644 tests/utils/test_civitai_utils_rewrite.py diff --git a/py/middleware/csp_middleware.py b/py/middleware/csp_middleware.py index cad6b99d..18553250 100644 --- a/py/middleware/csp_middleware.py +++ b/py/middleware/csp_middleware.py @@ -4,15 +4,21 @@ from typing import Awaitable, Callable, Dict, List from aiohttp import web +# Use wildcard for CivitAI to support their CDN subdomains (e.g., image-b2.civitai.com) +# Security note: This is acceptable because: +# 1. CSP img-src only controls image/video loading, not script execution +# 2. All *.civitai.com subdomains are controlled by Civitai +# 3. Explicit domain list would require constant updates as Civitai adds CDN nodes REMOTE_MEDIA_SOURCES = ( - "https://image.civitai.com", + "https://*.civitai.com", "https://img.genur.art", ) @web.middleware async def relax_csp_for_remote_media( - request: web.Request, handler: Callable[[web.Request], Awaitable[web.StreamResponse]] + request: web.Request, + handler: Callable[[web.Request], Awaitable[web.StreamResponse]], ) -> web.StreamResponse: """Allow LoRA Manager media previews to load from trusted remote domains. @@ -43,7 +49,9 @@ async def relax_csp_for_remote_media( directive_order.append(name) directives[name] = values - def merge_sources(name: str, sources: List[str], defaults: List[str] | None = None) -> None: + def merge_sources( + name: str, sources: List[str], defaults: List[str] | None = None + ) -> None: existing = directives.get(name, list(defaults or [])) for source in sources: diff --git a/py/utils/civitai_utils.py b/py/utils/civitai_utils.py index 198af376..76426e55 100644 --- a/py/utils/civitai_utils.py +++ b/py/utils/civitai_utils.py @@ -22,7 +22,9 @@ def _normalize_commercial_values(value: Any) -> Sequence[str]: def _split_aggregate(value_str: str) -> list[str]: stripped = value_str.strip() - looks_aggregate = "," in stripped or (stripped.startswith("{") and stripped.endswith("}")) + looks_aggregate = "," in stripped or ( + stripped.startswith("{") and stripped.endswith("}") + ) if not looks_aggregate: return [value_str] @@ -141,14 +143,18 @@ def build_license_flags(payload: Mapping[str, Any] | None) -> int: return flags -def resolve_license_info(model_data: Mapping[str, Any] | None) -> tuple[Dict[str, Any], int]: +def resolve_license_info( + model_data: Mapping[str, Any] | None, +) -> tuple[Dict[str, Any], int]: """Return normalized license payload and its encoded bitset.""" payload = resolve_license_payload(model_data) return payload, build_license_flags(payload) -def rewrite_preview_url(source_url: str | None, media_type: str | None = None) -> tuple[str | None, bool]: +def rewrite_preview_url( + source_url: str | None, media_type: str | None = None +) -> tuple[str | None, bool]: """Rewrite Civitai preview URLs to use optimized renditions. Args: @@ -168,7 +174,12 @@ def rewrite_preview_url(source_url: str | None, media_type: str | None = None) - except ValueError: return source_url, False - if parsed.netloc.lower() != "image.civitai.com": + hostname = parsed.hostname + if hostname is None: + return source_url, False + + hostname = hostname.lower() + if hostname == "civitai.com" or not hostname.endswith(".civitai.com"): return source_url, False replacement = "/width=450,optimized=true" diff --git a/static/js/utils/civitaiUtils.js b/static/js/utils/civitaiUtils.js index 0ac0f6a8..fb3422af 100644 --- a/static/js/utils/civitaiUtils.js +++ b/static/js/utils/civitaiUtils.js @@ -30,8 +30,9 @@ export function rewriteCivitaiUrl(sourceUrl, mediaType = null, mode = Optimizati try { const url = new URL(sourceUrl); - // Check if it's a CivitAI image domain - if (url.hostname.toLowerCase() !== 'image.civitai.com') { + // Check if it's a CivitAI CDN domain (supports all subdomains like image-b2.civitai.com) + const hostname = url.hostname.toLowerCase(); + if (hostname === 'civitai.com' || !hostname.endsWith('.civitai.com')) { return [sourceUrl, false]; } @@ -112,7 +113,8 @@ export function isCivitaiUrl(url) { if (!url) return false; try { const parsed = new URL(url); - return parsed.hostname.toLowerCase() === 'image.civitai.com'; + const hostname = parsed.hostname.toLowerCase(); + return hostname.endsWith('.civitai.com') && hostname !== 'civitai.com'; } catch (e) { return false; } diff --git a/tests/frontend/utils/civitaiUtils.test.js b/tests/frontend/utils/civitaiUtils.test.js index 0655fdf9..d14637b4 100644 --- a/tests/frontend/utils/civitaiUtils.test.js +++ b/tests/frontend/utils/civitaiUtils.test.js @@ -94,6 +94,37 @@ describe('civitaiUtils', () => { expect(wasRewritten).toBe(false); expect(rewritten).toBe('not-a-valid-url'); }); + + it('should rewrite URLs from CivitAI CDN subdomains', () => { + const originalUrl = 'https://image-b2.civitai.com/file/civitai-media-cache/original=true/sample.png'; + const [rewritten, wasRewritten] = rewriteCivitaiUrl(originalUrl, 'image', OptimizationMode.THUMBNAIL); + + expect(wasRewritten).toBe(true); + expect(rewritten).toBe('https://image-b2.civitai.com/file/civitai-media-cache/width=450,optimized=true/sample.png'); + }); + + it('should handle URLs with explicit port numbers', () => { + const originalUrl = 'https://image.civitai.com:443/checkpoints/original=true/test.png'; + const [rewritten, wasRewritten] = rewriteCivitaiUrl(originalUrl, 'image', OptimizationMode.THUMBNAIL); + + expect(wasRewritten).toBe(true); + // JavaScript URL.toString() removes default HTTPS port (443) + expect(rewritten).toBe('https://image.civitai.com/checkpoints/width=450,optimized=true/test.png'); + }); + + it('should handle case-insensitive hostnames', () => { + const testCases = [ + 'https://IMAGE.CIVITAI.COM/original=true/test.png', + 'https://Image.Civitai.Com/original=true/test.png', + 'https://image-b2.CIVITAI.com/original=true/test.png', + ]; + + for (const url of testCases) { + const [rewritten, wasRewritten] = rewriteCivitaiUrl(url, 'image', OptimizationMode.THUMBNAIL); + expect(wasRewritten).toBe(true); + expect(rewritten).toContain('width=450,optimized=true'); + } + }); }); describe('getOptimizedUrl', () => { @@ -157,6 +188,23 @@ describe('civitaiUtils', () => { expect(isCivitaiUrl('https://image.civitai.com/')).toBe(true); }); + it('should return true for CivitAI CDN subdomains', () => { + expect(isCivitaiUrl('https://image-b2.civitai.com/file/test.png')).toBe(true); + expect(isCivitaiUrl('https://image-b3.civitai.com/test.jpg')).toBe(true); + expect(isCivitaiUrl('https://cdn.civitai.com/test.png')).toBe(true); + }); + + it('should return true for CivitAI URLs with explicit ports', () => { + expect(isCivitaiUrl('https://image.civitai.com:443/test.png')).toBe(true); + expect(isCivitaiUrl('https://image-b2.civitai.com:443/file/test.jpg')).toBe(true); + }); + + it('should handle case-insensitive hostnames', () => { + expect(isCivitaiUrl('https://IMAGE.CIVITAI.COM/test.png')).toBe(true); + expect(isCivitaiUrl('https://Image.Civitai.Com/test.png')).toBe(true); + expect(isCivitaiUrl('https://image-b2.CIVITAI.com/test.png')).toBe(true); + }); + it('should return false for non-CivitAI URLs', () => { expect(isCivitaiUrl('https://example.com/image.jpg')).toBe(false); expect(isCivitaiUrl('https://civitai.com/image.jpg')).toBe(false); diff --git a/tests/middleware/test_csp_middleware.py b/tests/middleware/test_csp_middleware.py index bc07d58f..1ad1c56d 100644 --- a/tests/middleware/test_csp_middleware.py +++ b/tests/middleware/test_csp_middleware.py @@ -2,7 +2,10 @@ import pytest from aiohttp import web from aiohttp.test_utils import make_mocked_request -from py.middleware.csp_middleware import REMOTE_MEDIA_SOURCES, relax_csp_for_remote_media +from py.middleware.csp_middleware import ( + REMOTE_MEDIA_SOURCES, + relax_csp_for_remote_media, +) DEFAULT_CSP = ( "default-src 'self'; " @@ -40,7 +43,9 @@ async def _invoke_middleware( @pytest.mark.asyncio -async def test_relax_csp_appends_remote_sources_and_preserves_existing_directives() -> None: +async def test_relax_csp_appends_remote_sources_and_preserves_existing_directives() -> ( + None +): response = await _invoke_middleware("/some-path", web.Response()) header_value = response.headers.get("Content-Security-Policy") assert header_value is not None @@ -48,16 +53,17 @@ async def test_relax_csp_appends_remote_sources_and_preserves_existing_directive directives = _parse_directives(header_value) # Existing directives remain intact - assert directives["script-src"] == ["'self'", "'unsafe-inline'", "'unsafe-eval'", "blob:"] + assert directives["script-src"] == [ + "'self'", + "'unsafe-inline'", + "'unsafe-eval'", + "blob:", + ] assert directives["img-src"][:3] == ["'self'", "data:", "blob:"] # Remote media hosts are added once to the relevant directives for source in REMOTE_MEDIA_SOURCES: assert source in directives["img-src"] - - assert "media-src" in directives - assert directives["media-src"][0] == "'self'" - for source in REMOTE_MEDIA_SOURCES: assert source in directives["media-src"] diff --git a/tests/utils/test_civitai_utils_rewrite.py b/tests/utils/test_civitai_utils_rewrite.py new file mode 100644 index 00000000..8d6a3108 --- /dev/null +++ b/tests/utils/test_civitai_utils_rewrite.py @@ -0,0 +1,144 @@ +"""Tests for CivitAI URL utilities.""" + +import pytest + +from py.utils.civitai_utils import rewrite_preview_url + + +class TestRewritePreviewUrl: + """Test cases for rewrite_preview_url function.""" + + def test_handles_none_input(self): + """Should return (None, False) for None input.""" + result, was_rewritten = rewrite_preview_url(None) + assert result is None + assert was_rewritten is False + + def test_handles_empty_string(self): + """Should return (empty_string, False) for empty input.""" + result, was_rewritten = rewrite_preview_url("") + assert result == "" + assert was_rewritten is False + + def test_handles_invalid_url(self): + """Should return original URL and False for invalid URLs.""" + invalid_url = "not-a-valid-url" + result, was_rewritten = rewrite_preview_url(invalid_url) + assert result == invalid_url + assert was_rewritten is False + + def test_handles_url_without_scheme(self): + """Should return original URL and False for URLs without scheme.""" + url = "image.civitai.com/something" + result, was_rewritten = rewrite_preview_url(url) + assert result == url + assert was_rewritten is False + + def test_returns_false_for_non_civitai_domains(self): + """Should not rewrite URLs from other domains.""" + url = "https://example.com/image.jpg" + result, was_rewritten = rewrite_preview_url(url) + assert result == url + assert was_rewritten is False + + def test_returns_false_for_main_civitai_domain(self): + """Should not rewrite URLs from main civitai.com domain.""" + url = "https://civitai.com/images/123" + result, was_rewritten = rewrite_preview_url(url) + assert result == url + assert was_rewritten is False + + def test_rewrites_image_civitai_com_urls(self): + """Should rewrite URLs from image.civitai.com.""" + url = "https://image.civitai.com/checkpoints/original=true" + result, was_rewritten = rewrite_preview_url(url, "image") + assert ( + result == "https://image.civitai.com/checkpoints/width=450,optimized=true" + ) + assert was_rewritten is True + + def test_rewrites_subdomain_civitai_urls(self): + """Should rewrite URLs from CivitAI CDN subdomains like image-b2.civitai.com.""" + url = "https://image-b2.civitai.com/file/civitai-media-cache/original=true/sample.png" + result, was_rewritten = rewrite_preview_url(url, "image") + assert ( + result + == "https://image-b2.civitai.com/file/civitai-media-cache/width=450,optimized=true/sample.png" + ) + assert was_rewritten is True + + def test_rewrites_multiple_subdomains(self): + """Should rewrite URLs from various CivitAI subdomains.""" + test_cases = [ + "https://image-b3.civitai.com/original=true/test.jpg", + "https://cdn.civitai.com/original=true/test.png", + ] + for url in test_cases: + result, was_rewritten = rewrite_preview_url(url, "image") + assert was_rewritten is True + assert "width=450,optimized=true" in result + + def test_handles_urls_with_explicit_port(self): + """Should correctly handle URLs with explicit port numbers.""" + url = "https://image.civitai.com:443/checkpoints/original=true" + result, was_rewritten = rewrite_preview_url(url, "image") + assert was_rewritten is True + assert "width=450,optimized=true" in result + # Port is preserved in the URL (this is acceptable behavior) + assert ":443" in result + + def test_rewrites_video_urls_with_transcode(self): + """Should rewrite video URLs with transcode parameter.""" + url = "https://image.civitai.com/videos/original=true/sample.mp4" + result, was_rewritten = rewrite_preview_url(url, "video") + assert ( + result + == "https://image.civitai.com/videos/transcode=true,width=450,optimized=true/sample.mp4" + ) + assert was_rewritten is True + + def test_video_rewrite_uses_case_insensitive_type(self): + """Should handle video type case-insensitively.""" + url = "https://image.civitai.com/original=true/test.mp4" + result1, was1 = rewrite_preview_url(url, "VIDEO") + result2, was2 = rewrite_preview_url(url, "Video") + assert was1 is True + assert was2 is True + assert "transcode=true" in result1 + assert "transcode=true" in result2 + + def test_returns_original_when_no_original_true_in_path(self): + """Should not rewrite URLs that don't contain /original=true.""" + url = "https://image.civitai.com/checkpoints/optimized=true" + result, was_rewritten = rewrite_preview_url(url) + assert result == url + assert was_rewritten is False + + def test_preserves_path_structure_after_rewrite(self): + """Should maintain path structure after rewriting.""" + url = "https://image.civitai.com/xG1nkqKTMzGDvpLrqFT7WA/abc123/original=true/12345.png" + result, was_rewritten = rewrite_preview_url(url, "image") + assert was_rewritten is True + assert result.startswith( + "https://image.civitai.com/xG1nkqKTMzGDvpLrqFT7WA/abc123/" + ) + assert result.endswith("/12345.png") + + def test_defaults_to_image_mode_when_media_type_is_none(self): + """Should use image optimization when media_type is None.""" + url = "https://image.civitai.com/original=true/test.png" + result, was_rewritten = rewrite_preview_url(url, None) + assert was_rewritten is True + assert "transcode=true" not in result + assert "width=450,optimized=true" in result + + def test_case_insensitive_hostname_matching(self): + """Should handle case-insensitive hostname matching.""" + test_cases = [ + "https://IMAGE.CIVITAI.COM/original=true/test.png", + "https://Image.Civitai.Com/original=true/test.png", + "https://image-b2.CIVITAI.com/original=true/test.png", + ] + for url in test_cases: + result, was_rewritten = rewrite_preview_url(url, "image") + assert was_rewritten is True, f"Failed for URL: {url}"