diff --git a/pdm.lock b/pdm.lock index c231b7e..a16c289 100755 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "dev", "testing"] strategy = ["cross_platform", "inherit_metadata"] lock_version = "4.5.0" -content_hash = "sha256:d97e0fe3e9659cc78992a3fbe8cc33307067e47ad4b074c9c5e48f72289208a6" +content_hash = "sha256:6357c7ce4e98787f69a1bba0f67d789f7fe5bd5026cbba51b19c2980a21e0b31" [[metadata.targets]] requires_python = ">=3.9" @@ -89,6 +89,58 @@ files = [ {file = "argon2_cffi_bindings-21.2.0-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:5e00316dabdaea0b2dd82d141cc66889ced0cdcbfa599e8b471cf22c620c329a"}, ] +[[package]] +name = "bchlib" +version = "2.1.3" +requires_python = ">=3.6.0" +summary = "A python wrapper module for the Linux kernel BCH library." +groups = ["default"] +files = [ + {file = "bchlib-2.1.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:0ba3c8255f4d1aaf4dba90d39e8b31563e5bdf42c9b08660a2c76bacec1814f7"}, + {file = "bchlib-2.1.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:08cab7d645613bc486ad547f29e49f51ad1defab20e3c8b3d6b4a6af86d0d673"}, + {file = "bchlib-2.1.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ae1aa14cc35bedd0f283f7ad357dc70ae2f6164aa3aa58eee2cb329d0e780d6c"}, + {file = "bchlib-2.1.3-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:258c2a6e223810e265eb4337da8fe33d4f15ca0ce35c32267c45c5002a187d64"}, + {file = "bchlib-2.1.3-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:ec7036de49578d74c82888af88d0c4fc22ece684de20ddf07b5d1d0b39d542ba"}, + {file = "bchlib-2.1.3-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:4669bba18732fc1be06761fec4685a66b194748a3f08cf5afdf541702bd4c5e1"}, + {file = "bchlib-2.1.3-cp310-cp310-win32.whl", hash = "sha256:d133db99607be27e16e74ea2a5aca9244e357b5a5f1580e418143929f99cd0bb"}, + {file = "bchlib-2.1.3-cp310-cp310-win_amd64.whl", hash = 
"sha256:f70569bd9df1eda7c2dee55d995ec65ee651906a877421525c99f5187d36be24"}, + {file = "bchlib-2.1.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:64407c0bc8b58ce41399f7f1d52a46c7fa63dc83fa91e70cf84f46b87854e807"}, + {file = "bchlib-2.1.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:ee67db08c41894cf490de535f4ec54659745ffafbcf37b9f04b18b1e44101d96"}, + {file = "bchlib-2.1.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:80570e504162b330fffd8d9b20679a09eddb3d9edd961159d2bf73e064fb22a2"}, + {file = "bchlib-2.1.3-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5f601388cd8847884eef6d68d14dfb37ae469ad8fa3429b49bbefd663208c1f5"}, + {file = "bchlib-2.1.3-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:0ef8ff4bb45d5263702d615ec38662354d8374cfa4db2d9ab7f18f5a29f211e0"}, + {file = "bchlib-2.1.3-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:460277205bc29062275159d0221f6b286669a8d8f4c61f8e6ae4eaa7b3091370"}, + {file = "bchlib-2.1.3-cp311-cp311-win32.whl", hash = "sha256:06375176fb7465a9de16a8b3318ba16e4f2ba4edb78cd631e5588521c9661a60"}, + {file = "bchlib-2.1.3-cp311-cp311-win_amd64.whl", hash = "sha256:1060fecdefc0d9782aa5ef4c82537a60049af45c842d540c16a64fef85c0c44d"}, + {file = "bchlib-2.1.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:3edba1ba46b895fcb31b4269e2527a3f69b2a5789a8f34fcd95eacddeedb7758"}, + {file = "bchlib-2.1.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:ec73cbf5d5ef85fc5614b704ab71b4e7d7de41a762c93fda1a96dc8c0a98d8c3"}, + {file = "bchlib-2.1.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8397acef706f069c096a379ab3b56de650e9d64f8b54dea6af53e48a0406656d"}, + {file = "bchlib-2.1.3-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6c3c0c08c3eb114c4fa782c4c22d9cd197082a5c5fc9e2b7b95036e1af44790f"}, + {file = 
"bchlib-2.1.3-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:0be4fdcf66cdf91f51126a7139330d983a9bc05289cb142d45b7f7d91019ee22"}, + {file = "bchlib-2.1.3-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:f1170738939e2dcde43cb772095412e6368cc6942f143f55b646d5c252d3e7af"}, + {file = "bchlib-2.1.3-cp312-cp312-win32.whl", hash = "sha256:0019affb4dbf9af38ebb1cb813c17140ec3c1af9afc55844bc4b3fab462be145"}, + {file = "bchlib-2.1.3-cp312-cp312-win_amd64.whl", hash = "sha256:c469d9c19d0621815dd04e759b842ed218f828a1e1fc5e1c6939944785b335e4"}, + {file = "bchlib-2.1.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:b077da495ecb05fda51ee997e9b9fe85efc4dfa887dbc3a1c3351162f0a77a57"}, + {file = "bchlib-2.1.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:c1b807eb7c99d739e5dc74c0f90335b9d2168639e3e5df12a3f018a4950a0280"}, + {file = "bchlib-2.1.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5f41747e7eabb79d2ace40b5b61ed790692859444e423323049dac175302afa6"}, + {file = "bchlib-2.1.3-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:19d8ba8d24642d0d5da13a8c1149fd979623288f2250e83e0a2166c0d0773b8c"}, + {file = "bchlib-2.1.3-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:0a7e2503e7b0ae1f95c8117da03ba51edc642f7f10ac0dfab216e938ebdc615a"}, + {file = "bchlib-2.1.3-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:a14541ca447c94c063f7371891ff8e2f3134004e40f521dc5244b311ab2c7e76"}, + {file = "bchlib-2.1.3-cp39-cp39-win32.whl", hash = "sha256:6ba74cbbd7541c46fae4187dd00d5c363c622432aa9d365c505c2cd2dfa59e99"}, + {file = "bchlib-2.1.3-cp39-cp39-win_amd64.whl", hash = "sha256:2d078e51f2c1c464d319b00b536b9731de86423e09e72084d98ffd714d3de94f"}, + {file = "bchlib-2.1.3-pp310-pypy310_pp73-macosx_10_15_x86_64.whl", hash = "sha256:d5125436a420cf386d1c3dde5203c9b4611096aabd0da84b2de04b96d42ba723"}, + {file = "bchlib-2.1.3-pp310-pypy310_pp73-macosx_11_0_arm64.whl", hash = 
"sha256:7d6dba3d9256b51c8fc3ed1bdbc129bfcda56cfa04d0280b638b9e9364d5bdd0"}, + {file = "bchlib-2.1.3-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:85270e5a93681ece506f6b37772f2be404e71d4911cce3fdd539c7f89f0ebdac"}, + {file = "bchlib-2.1.3-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6804608aaae5efc858003ab8e69a27c63fde41dfa85f89febd7bc001eef46502"}, + {file = "bchlib-2.1.3-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:c9682324fb384008f08432e4b40f044be7a976af24462c5fa56b1fbc6f7abc92"}, + {file = "bchlib-2.1.3-pp39-pypy39_pp73-macosx_10_15_x86_64.whl", hash = "sha256:d5700fd82130b429e1a7defd15e8cf7a36825ed1f22c969e83affc2dd952daaf"}, + {file = "bchlib-2.1.3-pp39-pypy39_pp73-macosx_11_0_arm64.whl", hash = "sha256:8e83931682b6b561ff26ccc71c8115e3b1fdce611f7adf94a6060cfed61d6416"}, + {file = "bchlib-2.1.3-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c3a28dbb408e8bd06407fa5ff5f5015bd8bc2e1de95b8f99c9fa98594e838c46"}, + {file = "bchlib-2.1.3-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:12b83b336216278041ae7aa4fc7f15ddb4107c609f48860894ecbd6406395bf8"}, + {file = "bchlib-2.1.3-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:18551bd114cecf5da37f76e959041b128ccddb6ccd8032f12d72e2ac39d67d9f"}, + {file = "bchlib-2.1.3.tar.gz", hash = "sha256:f1c5d2a838259bd505f4c4afbd1187d5751224a4401923a10c5dd4f2e21e2125"}, +] + [[package]] name = "certifi" version = "2024.2.2" diff --git a/playground/image_metadata/read_lsb.py b/playground/image_metadata/read_lsb.py index 9c2b833..4ac2d78 100644 --- a/playground/image_metadata/read_lsb.py +++ b/playground/image_metadata/read_lsb.py @@ -1,4 +1,3 @@ -import os from pathlib import Path from novelai_python.tool.image_metadata.lsb_extractor import ImageLsbDataExtractor diff --git a/playground/image_metadata/read_nai_tag.py 
b/playground/image_metadata/read_nai_tag.py index 24e0bc6..5cf3de5 100644 --- a/playground/image_metadata/read_nai_tag.py +++ b/playground/image_metadata/read_nai_tag.py @@ -2,14 +2,13 @@ # @Time : 2024/2/8 下午4:57 # @Author : sudoskys # @File : read_nai_tag.py -from io import BytesIO from pathlib import Path -from novelai_python.tool.image_metadata import ImageMetadata +from novelai_python.tool.image_metadata import ImageMetadata, ImageVerifier image = Path(__file__).parent.joinpath("sample-0316.png") image_clear = ImageMetadata.reset_alpha( - input_img=BytesIO(image.read_bytes()) + image=image ) try: @@ -18,23 +17,11 @@ raise LookupError("Cant find a MetaData") print(meta.Title) -print(meta.Description) -print(meta.Comment) - -try: - meta = ImageMetadata.load_image(image_clear) - print(meta.Title) - print(meta.Description) - print(meta.Comment) -except ValueError: - print("Cant find a MetaData") - -image = Path(__file__).parent.joinpath("sample-0317.png") -try: - meta = ImageMetadata.load_image(image) -except ValueError: - raise LookupError("Cant find a MetaData") - -print(meta.Title) -print(meta.Description) -print(meta.Comment) +print(f"Description: {meta.Description}") +print(f"Comment: {meta.Comment}") +print(f"Request Method: {meta.Comment.request_type}") +print(f"Used image model: {meta.used_model}") +# Verify if the image is from NovelAI +is_novelai, have_latent = ImageVerifier().verify(image=image) +print(f"Is NovelAI: {is_novelai}") +print(f"Have Latent: {have_latent}") diff --git a/pyproject.toml b/pyproject.toml index 447ca82..c005460 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "novelai-python" -version = "0.5.1" +version = "0.5.2" description = "NovelAI Python Binding With Pydantic" authors = [ { name = "sudoskys", email = "coldlando@hotmail.com" }, @@ -28,6 +28,7 @@ dependencies = [ "tokenizers>=0.15.2", "json-repair>=0.29.4", "robust-downloader>=0.0.2", + "bchlib>=2.1.3", ] requires-python = ">=3.9" readme = 
"README.md" diff --git a/src/novelai_python/tool/image_metadata/__init__.py b/src/novelai_python/tool/image_metadata/__init__.py index b9aa225..351a8d0 100644 --- a/src/novelai_python/tool/image_metadata/__init__.py +++ b/src/novelai_python/tool/image_metadata/__init__.py @@ -2,8 +2,7 @@ import base64 import json from io import BytesIO -from pathlib import Path -from typing import Union, Optional, List +from typing import Union, Optional, List, Tuple import numpy as np from PIL import Image @@ -13,9 +12,11 @@ from nacl.signing import VerifyKey from pydantic import BaseModel, ConfigDict +from novelai_python.sdk.ai._enum import PROMOTION, Model +from ._type import IMAGE_INPUT_TYPE, get_image_bytes +from .bch_utils import fec_decode from .lsb_extractor import ImageLsbDataExtractor from .lsb_injector import inject_data -from novelai_python.sdk.ai._enum import PROMOTION, Model class ImageMetadata(BaseModel): @@ -24,38 +25,52 @@ class ImageMetadata(BaseModel): """ class CommentModel(BaseModel): - prompt: str - steps: int = None - height: int = None - width: int = None - scale: float = None - uncond_scale: float = None - cfg_rescale: float = None - seed: int = None - n_samples: int = None - hide_debug_overlay: bool = None - noise_schedule: str = None - legacy_v3_extend: bool = None + prompt: Optional[str] = None + steps: Optional[int] = None + height: Optional[int] = None + width: Optional[int] = None + scale: Optional[float] = None + uncond_scale: Optional[float] = None + cfg_rescale: Optional[float] = None + seed: Optional[int] = None + n_samples: Optional[int] = None + hide_debug_overlay: Optional[bool] = None + noise_schedule: Optional[str] = None + legacy_v3_extend: Optional[bool] = None reference_information_extracted: Optional[float] = None reference_strength: Optional[float] = None reference_strength_multiple: Optional[List[float]] = None reference_information_extracted_multiple: Optional[List[float]] = None - sampler: str = None - controlnet_strength: float = 
None - controlnet_model: Union[None, str] = None - dynamic_thresholding: bool = None - dynamic_thresholding_percentile: float = None - dynamic_thresholding_mimic_scale: float = None - sm: bool = None - sm_dyn: bool = None - skip_cfg_below_sigma: float = None - lora_unet_weights: Union[None, str] = None - lora_clip_weights: Union[None, str] = None - uc: str = None - request_type: str = None - signed_hash: str = None + sampler: Optional[str] = None + controlnet_strength: Optional[float] = None + controlnet_model: Optional[Union[None, str]] = None + dynamic_thresholding: Optional[bool] = None + dynamic_thresholding_percentile: Optional[float] = None + dynamic_thresholding_mimic_scale: Optional[float] = None + sm: Optional[bool] = None + sm_dyn: Optional[bool] = None + skip_cfg_below_sigma: Optional[float] = None + lora_unet_weights: Optional[Union[None, str]] = None + lora_clip_weights: Optional[Union[None, str]] = None + uc: Optional[str] = None + request_type: Optional[str] = None + signed_hash: Optional[str] = None + skip_cfg_above_sigma: Optional[float] = None + deliberate_euler_ancestral_bug: Optional[bool] = None + prefer_brownian: Optional[bool] = None + cfg_sched_eligibility: Optional[str] = None + explike_fine_detail: Optional[bool] = None + minimize_sigma_inf: Optional[bool] = None + uncond_per_vibe: Optional[bool] = None + wonky_vibe_correlation: Optional[bool] = None + version: Optional[int] = None + model_config = ConfigDict(extra="allow") + @property + def generate_method(self): + return self.request_type or "Unknown" + @property def negative_prompt(self): return self.uc @@ -64,7 +79,6 @@ def negative_prompt(self): def vibe_transfer_strength(self) -> List[float]: """ Get the vibe transfer strength totally - :return: List[float] """ if self.reference_strength_multiple: return self.reference_strength_multiple @@ -75,7 +89,6 @@ def vibe_transfer_strength(self) -> List[float]: def vibe_transfer_information(self) -> List[float]: """ Get the vibe transfer 
information totally - :return: List[float] """ if self.reference_information_extracted_multiple: return self.reference_information_extracted_multiple @@ -88,11 +101,7 @@ def vibe_transfer_information(self) -> List[float]: Source: str = None Description: str Comment: CommentModel - """ - AI generated image - silvery white, best quality, amazing quality, very aesthetic, absurdres - {'prompt': 'silvery white, best quality, amazing quality, very aesthetic, absurdres', 'steps': 28, 'height': 128, 'width': 128, 'scale': 10.0, 'uncond_scale': 1.0, 'cfg_rescale': 0.38, 'seed': 1148692016, 'n_samples': 1, 'hide_debug_overlay': False, 'noise_schedule': 'native', 'legacy_v3_extend': False, 'reference_information_extracted': 0.87, 'reference_strength': 1.0, 'sampler': 'k_euler', 'controlnet_strength': 1.0, 'controlnet_model': None, 'dynamic_thresholding': False, 'dynamic_thresholding_percentile': 0.999, 'dynamic_thresholding_mimic_scale': 10.0, 'sm': True, 'sm_dyn': True, 'skip_cfg_below_sigma': 0.0, 'lora_unet_weights': None, 'lora_clip_weights': None, 'uc': 'nsfw, lowres, {bad}, error, fewer, extra, missing, worst quality, jpeg artifacts, bad quality, watermark, unfinished, displeasing, chromatic aberration, signature, extra digits, artistic error, username, scan, [abstract], small', 'request_type': 'PromptGenerateRequest', 'signed_hash': 'GmLfTgQTOI4WRcTloiSmenArH/Y8szJdVJ1DW18yeqYPkqw3o1zellB4xCQFYle+0tuF9KVP8RkHiDNbQXr0DA=='} - """ + model_config = ConfigDict(extra="allow") @property @@ -104,15 +113,14 @@ def used_model(self) -> Union[Model, None]: return PROMOTION.get(self.Source, None) @staticmethod - def reset_alpha(input_img: BytesIO) -> BytesIO: + def reset_alpha(image: IMAGE_INPUT_TYPE) -> BytesIO: """ Remove LSB from the image, set the alpha channel to 254 - :param input_img: - :return: + :param image: Type: Union[str, bytes, Path, BytesIO] + :return: BytesIO """ - if isinstance(input_img, BytesIO): - input_img.seek(0) - image = 
Image.open(input_img).convert('RGBA') + image_data = get_image_bytes(image) + image = Image.open(image_data).convert('RGBA') data = np.array(image) data[..., 3] = 254 new_image = Image.fromarray(data, 'RGBA') @@ -120,20 +128,18 @@ def reset_alpha(input_img: BytesIO) -> BytesIO: new_image.save(_new_img_io, format="PNG") return _new_img_io - def apply_to_image(self, - origin_image: Image.Image, - *, - inject_lsb: bool = True - ) -> BytesIO: + def apply_to_image(self, image: IMAGE_INPUT_TYPE, *, inject_lsb: bool = True) -> BytesIO: """ Write metadata to origin_image If you set inject_lsb to True, the image will be injected with metadata using LSB. **But if you set inject_lsb to False, the image will be reset to the 254 alpha channel** - :param origin_image: BytesIO + :param image: Union[str, bytes, Path, BytesIO] :param inject_lsb: Inject metadata using LSB :return: BytesIO """ + image_data = get_image_bytes(image) + origin_image = Image.open(image_data) metadata = PngInfo() for k, v in self.model_dump(mode="json").items(): if isinstance(v, dict): @@ -142,84 +148,150 @@ def apply_to_image(self, if inject_lsb: # Inject metadata using LSB origin_image = inject_data(origin_image, metadata) - # Save original image with metadata (and LSB if inject_lsb is True) - new_img = BytesIO() - origin_image.save(new_img, format="PNG", pnginfo=metadata, optimize=False, compress_level=0) - return new_img + + # Prepare image to be saved + be_copy_image = BytesIO() + origin_image.save(be_copy_image, format="PNG", pnginfo=metadata, optimize=False, compress_level=0) + return be_copy_image @classmethod - def load_image(cls, - image_io: Union[str, bytes, Path, BytesIO] - ): + def _extract_metadata_from_lsb(cls, image: IMAGE_INPUT_TYPE): """ - Load image and extract metadata using LSB/Metadata - :param image_io: str, bytes, Path, BytesIO - :return: ImageMetadata - :raises ValueError: Data extraction failed + Extract metadata using LSB extraction method. 
+ :param image: IMAGE_INPUT_TYPE """ - if isinstance(image_io, BytesIO): - image_io.seek(0) try: - image_data = ImageLsbDataExtractor().extract_data(image_io) - model = cls.model_validate(image_data) + image_meta_data, fec_data = ImageLsbDataExtractor().extract_data(image) + return cls.model_validate(image_meta_data) except Exception as e: - logger.trace(f"Error trying extracting data in LSB: {e}") - else: - return model - with Image.open(image_io) as img: + logger.trace(f"Error extracting data in LSB: {e}") + return None + + @classmethod + def _extract_metadata_from_comments(cls, image: IMAGE_INPUT_TYPE): + """ + Extract metadata from image comments and other info. + :param image: Union[str, bytes, Path, BytesIO] + """ + with Image.open(image) as img: title = img.info.get("Title", None) - prompt = img.info.get("Description", None) - comment = img.info.get("Comment", None) + description = img.info.get("Description", None) + comment = img.info.get("Comment", "{}") try: - assert isinstance(comment, str), ValueError("Comment Empty") - comment = json.loads(comment) + comment_dict = json.loads(comment) except Exception as e: logger.trace(f"Error loading comment: {e}") - comment = {} - if comment.get("prompt", None) is None: - comment["prompt"] = prompt + comment_dict = {} try: - comment_model = cls.CommentModel.model_validate(comment) - return cls(Title=title, Description=prompt, Comment=comment_model) + comment_dict.setdefault("prompt", description) + comment_model = cls.CommentModel.model_validate(comment_dict) + return cls(Title=title, Description=description, Comment=comment_model) except Exception as e: logger.debug(f"Error loading comment: {e}") - raise ValueError("Data extraction failed") + return None + + @classmethod + def load_image(cls, image: IMAGE_INPUT_TYPE): + """ + Load image and extract metadata using LSB/Metadata + :param image: str, bytes, Path, BytesIO + :return: ImageMetadata + :raises ValueError: Data extraction failed + """ + image_data = 
get_image_bytes(image) + metadata = cls._extract_metadata_from_lsb(image_data) + if metadata: + return metadata + metadata = cls._extract_metadata_from_comments(image) + if metadata: + return metadata + raise ValueError("No metadata found") + + +class ImageVerifier: @staticmethod - def verify_image_is_novelai( - image: Union[Image.Image, np.ndarray], - verify_key_hex: str = "Y2JcQAOhLwzwSDUJPNgL04nS0Tbqm7cSRc4xk0vRMic=" - ) -> bool: + def verify_latents(image: Image.Image, signed_hash: bytes, verify_key: VerifyKey): + image.load() + sig = None + latents = None + try: + for cid, data, after_idat in image.private_chunks: + if after_idat: + if cid == b'ltns': + sig = data + elif cid == b'ltnt': + latents = data + except Exception as e: + logger.trace(f"Error extracting latents: {e}") + return True, False, None + if sig is None or latents is None: + return True, False, None + if not sig.startswith(b'NovelAI_ltntsig'): + return False, False, None + sig = sig[len(b'NovelAI_ltntsig'):] + if not latents.startswith(b'NovelAI_latents'): + return False, False, None + latents = latents[len(b'NovelAI_latents'):] + if len(sig) != 64: + return False, False, None + w, h = image.size + base_size = (w // 8) * (h // 8) * 4 + if len(latents) != base_size * 4 and len(latents) != base_size * 2: + return False, False, None + float_dim = 4 if len(latents) == base_size * 4 else 2 + try: + verify_key.verify(signed_hash + latents, sig) + return True, True, (float_dim, latents) + except Exception as e: + logger.trace(f"Error verifying latents: {e}") + return False, False, None + + def verify(self, + image, + verify_key_hex: str = "Y2JcQAOhLwzwSDUJPNgL04nS0Tbqm7cSRc4xk0vRMic=" + ) -> Tuple[bool, bool]: """ Verify if the image is a NovelAI generated image - :param image: Image.Image or np.ndarray - :param verify_key_hex: - :return: bool - :raises RuntimeError: No metadata found in image - :raises RuntimeError: Comment not in metadata - :raises RuntimeError: signed_hash not in comment + + :param 
image: Union[str, bytes, Path, BytesIO] - The input image to verify. + :param verify_key_hex: str - The verification key in base64 format. + :return: Tuple[bool, bool] - The first bool indicates if the image is verified, the second bool indicates if the image has latents. + :raises ValueError: If the required metadata or signed hash is missing. """ - # MIT:https://github.com/NovelAI/novelai-image-metadata/blob/main/nai_sig.py - if isinstance(image, Image.Image): - image = np.array(image) - metadata = ImageLsbDataExtractor().extract_data(image) - - if metadata is None: - raise RuntimeError("No metadata found in image") - if "Comment" not in metadata: - raise RuntimeError("Comment not in metadata") - comment = metadata["Comment"] - if "signed_hash" not in comment: - raise RuntimeError("signed_hash not in comment") - signed_hash = comment["signed_hash"].encode("utf-8") - signed_hash = base64.b64decode(signed_hash) - del comment["signed_hash"] - verify_key_hex = verify_key_hex.encode("utf-8") - verify_key = VerifyKey(verify_key_hex, encoder=Base64Encoder) - image_and_comment = image[:, :, :3].tobytes() + json.dumps(comment).encode("utf-8") + image_obj = Image.open(get_image_bytes(image)) + w, h = image_obj.size + image_array = np.array(image_obj) + try: + metadata, fec_data = ImageLsbDataExtractor().extract_data(image, get_fec=True) + except Exception as e: + logger.trace(f"Error extracting data in LSB: {e}") + metadata = None + fec_data = None + if not metadata or not metadata.get("Comment") or not metadata["Comment"].get("signed_hash"): + raise ValueError( + "Bad image lsb or metadata. Comment or Comment.signed_hash is missing, the image may have been tampered with or not generated by NovelAI."
+ ) + parameter = metadata["Comment"] + signed_hash = base64.b64decode(parameter.pop("signed_hash").encode("utf-8")) + # Build verify key + verify_key = VerifyKey(verify_key_hex.encode("utf-8"), encoder=Base64Encoder) + # Verify latents + good_latents, have_latents, latents = self.verify_latents(image_obj, signed_hash, verify_key) + if not good_latents: + return False, False + rgb = image_array[:, :, :3].tobytes() + parameter = json.dumps(parameter).encode("utf-8") + image_and_comment = rgb + parameter try: verify_key.verify(image_and_comment, signed_hash) except Exception as e: - logger.trace(e) - return False - return True + logger.trace(f"Error verifying image [1]: {e}") + try: + rgb, errs = fec_decode(bytearray(rgb), bytearray(fec_data), w, h) + image_and_comment = rgb + parameter + verify_key.verify(image_and_comment, signed_hash) + except Exception as e: + logger.trace(f"Error verifying image [2]: {e}") + return False, False + return True, have_latents diff --git a/src/novelai_python/tool/image_metadata/_type.py b/src/novelai_python/tool/image_metadata/_type.py new file mode 100644 index 0000000..fc9252b --- /dev/null +++ b/src/novelai_python/tool/image_metadata/_type.py @@ -0,0 +1,21 @@ +from io import BytesIO +from pathlib import Path +from typing import Union + +IMAGE_INPUT_TYPE = Union[str, bytes, Path, BytesIO] + + +def get_image_bytes(image: IMAGE_INPUT_TYPE) -> BytesIO: + if isinstance(image, (str, Path)): + try: + with open(image, "rb") as f: + return BytesIO(f.read()) + except FileNotFoundError: + raise FileNotFoundError(f"Image not found: {image}") + elif isinstance(image, BytesIO): + image.seek(0) + return image + elif isinstance(image, bytes): + return BytesIO(image) + else: + raise TypeError(f"Invalid image type: {type(image)}, only {IMAGE_INPUT_TYPE} is supported") diff --git a/src/novelai_python/tool/image_metadata/bch_utils.py b/src/novelai_python/tool/image_metadata/bch_utils.py new file mode 100644 index 0000000..ebd0518 --- /dev/null +++ 
b/src/novelai_python/tool/image_metadata/bch_utils.py @@ -0,0 +1,160 @@ +# 纠错码 +# MIT: https://github.com/NovelAI/novelai-image-metadata/blob/main/nai_bch.py +import functools + +import bchlib +import numpy as np + +correctable_bits = 16 +block_length = 2019 +code_block_len = 1920 + + +def bit_shuffle(data_bytes, w, h, use_bytes=False): + bits = np.frombuffer(data_bytes, dtype=np.uint8) + bit_fac = 8 + if use_bytes: + bit_fac = 1 + else: + bits = np.unpackbits(bits) + bits = bits.reshape((h, w, 3 * bit_fac)) + code_block_len = 1920 + flat_tile_len = (w * h * 3) // code_block_len + tile_w = 32 + if flat_tile_len // tile_w > 100: + tile_w = 64 + tile_h = flat_tile_len // tile_w + h_cutoff = (h // tile_h) * tile_h + tile_hr = h - h_cutoff + easy_tiles = bits[:h_cutoff].reshape(h_cutoff // tile_h, tile_h, w // tile_w, tile_w, 3 * bit_fac) + easy_tiles = easy_tiles.swapaxes(1, 2) + easy_tiles = easy_tiles.reshape(-1, tile_h * tile_w) + easy_tiles = easy_tiles.T + rest_tiles = bits[h_cutoff:] + rest_tiles = rest_tiles.reshape(tile_hr, 1, w // tile_w, tile_w, 3 * bit_fac) + rest_tiles = rest_tiles.swapaxes(1, 2) + rest_tiles = rest_tiles.reshape(-1, tile_hr * tile_w) + rest_tiles = rest_tiles.T + rest_dim = rest_tiles.shape[-1] + rest_tiles = np.pad(rest_tiles, ((0, 0), (0, easy_tiles.shape[-1] - rest_tiles.shape[-1])), mode='constant', + constant_values=0) + bits = np.concatenate((easy_tiles, rest_tiles), axis=0) + dim = bits.shape[-1] + bits = bits.reshape((-1,)) + if not use_bytes: + bits = np.packbits(bits) + return bytearray(bits.tobytes()), dim, rest_tiles.shape[0], rest_dim + + +def bit_unshuffle(data_bytes, w, h, dim, rest_size, rest_dim, use_bytes=False): + bits = np.frombuffer(data_bytes, dtype=np.uint8) + bit_fac = 8 + if use_bytes: + bit_fac = 1 + else: + bits = np.unpackbits(bits) + code_block_len = 1920 + flat_tile_len = (w * h * 3) // code_block_len + tile_w = 32 + if flat_tile_len // tile_w > 100: + tile_w = 64 + tile_h = flat_tile_len // tile_w + h_cutoff 
= (h // tile_h) * tile_h + tile_hr = h - h_cutoff + bits = bits.reshape((-1, dim)) + rev_cutoff = bits.shape[0] - rest_size + rest_tiles = bits[rev_cutoff:] + rest_tiles = rest_tiles.reshape((-1, dim)) + rest_tiles = rest_tiles[:, :rest_dim] + rest_tiles = rest_tiles.T + rest_tiles = rest_tiles.reshape((tile_hr, w // tile_w, 1, tile_w, 3 * bit_fac)) + rest_tiles = rest_tiles.swapaxes(1, 2) + rest_tiles = rest_tiles.reshape((-1,)) + easy_tiles = bits[:rev_cutoff] + easy_tiles = easy_tiles.T + easy_tiles = easy_tiles.reshape((h_cutoff // tile_h, w // tile_w, tile_h, tile_w, 3 * bit_fac)) + easy_tiles = easy_tiles.swapaxes(1, 2) + easy_tiles = easy_tiles.reshape((-1,)) + data_bytes = np.concatenate((easy_tiles, rest_tiles), axis=0) + if not use_bytes: + data_bytes = np.packbits(data_bytes) + data_bytes = data_bytes.tobytes() + return bytearray(data_bytes) + + +@functools.lru_cache(maxsize=512) +def get_indices(len_db): + indices = np.arange(0, len_db) + rng = np.random.Generator(np.random.MT19937(31337)) + indices = rng.permutation(indices) + unshuffled = indices.copy() + unshuffled[indices] = np.arange(0, len_db) + return indices, unshuffled + + +def rand_byte_shuffle(data_bytes): + indices, _ = get_indices(len(data_bytes)) + data_bytes = np.frombuffer(data_bytes, dtype=np.uint8) + data_bytes = bytearray(data_bytes[indices].tobytes()) + return data_bytes + + +def rand_byte_unshuffle(data_bytes): + _, indices = get_indices(len(data_bytes)) + data_bytes = np.frombuffer(data_bytes, dtype=np.uint8) + data_bytes = bytearray(data_bytes[indices].tobytes()) + return data_bytes + + +use_bytes = True # Bit shuffling provides better resilience, but byte shuffling is much faster and still sufficient +shuffle_fn = lambda data_bytes, w, h: bit_shuffle(data_bytes, w, h, use_bytes=use_bytes) +unshuffle_fn = lambda data_bytes, w, h, dim, rest_size, rest_dim: bit_unshuffle(data_bytes, w, h, dim, rest_size, + rest_dim, use_bytes=use_bytes) + + +def split_byte_ranges(data_bytes, n, w, 
h, shuffle=False): + if shuffle: + data_bytes, dim, rest_size, rest_dim = shuffle_fn(data_bytes.copy(), w, h) + chunks = [] + for i in range(0, len(data_bytes), n): + chunks.append(data_bytes[i:i + n]) + if shuffle: + return chunks, dim, rest_size, rest_dim + return chunks + + +def pad(data_bytes): + return bytearray(data_bytes + b'\x00' * (2019 - len(data_bytes))) + + +# Returns codes for the data in data_bytes +def fec_encode(data_bytes, w, h): + encoder = bchlib.BCH(16, prim_poly=17475) + # import galois + # encoder = galois.BCH(16383, 16383-224, d=17, c=224) + chunks = [bytearray(encoder.encode(pad(x))) for x in split_byte_ranges(data_bytes, 2019, w, h, shuffle=True)[0]] + return b''.join(chunks) + + +# Returns the error corrected data and number of corrected errors or None, None if errors are not correctable +def fec_decode(data_bytes, codes, w, h): + encoder = bchlib.BCH(16, prim_poly=17475) + chunks, dim, rest_size, rest_dim = split_byte_ranges(data_bytes, 2019, w, h, shuffle=True) + codes = split_byte_ranges(codes, 28, w, h) + corrected = [] + total_errors = 0 + for i, chunk in enumerate(chunks): + c_len = len(chunk) + chunk = pad(chunk) + code = bytearray(codes[i]) + n_err = encoder.decode(chunk, code) + if n_err > 0: + total_errors += n_err + encoder.correct(chunk, code) + # Ignoring the following case, since it might be caused by corrupted codes which might not even be needed + # if n_err < 0: + # raise ValueError("Too many errors to correct") + corrected.append(chunk[:c_len]) + corrected = b''.join(corrected) + corrected = unshuffle_fn(corrected, w, h, dim, rest_size, rest_dim) + return corrected, total_errors diff --git a/src/novelai_python/tool/image_metadata/lsb_extractor.py b/src/novelai_python/tool/image_metadata/lsb_extractor.py index d48f3f1..be376ed 100644 --- a/src/novelai_python/tool/image_metadata/lsb_extractor.py +++ b/src/novelai_python/tool/image_metadata/lsb_extractor.py @@ -1,12 +1,12 @@ # MIT: 
https://github.com/NovelAI/novelai-image-metadata/blob/main/nai_meta.py +import gzip +import json from io import BytesIO from pathlib import Path from typing import Union -from PIL import Image import numpy as np -import gzip -import json +from PIL import Image class LSBExtractor(object): @@ -59,42 +59,45 @@ class ImageLsbDataExtractor(object): def __init__(self): self.magic = "stealth_pngcomp" - def extract_data(self, image: Union[str, bytes, Path, BytesIO, np.ndarray]) -> dict: - try: - if isinstance(image, Image.Image): - image = np.array(image) - elif isinstance(image, (str, bytes, Path, BytesIO)): - img = Image.open(image) - image = np.array(img) + def extract_data(self, image: Union[BytesIO, bytes, Path], get_fec: bool = False) -> tuple: + if isinstance(image, (bytes, BytesIO, Path)): + img = Image.open(image) + img.convert("RGBA") + image = np.array(img) + else: + raise TypeError(f"Invalid image type: {type(image)}, only {type(BytesIO)} is supported") + try: if not (image.shape[-1] == 4 and len(image.shape) == 3): - raise AssertionError('image format error') - + raise AssertionError('image format error, maybe image already be modified') reader = LSBExtractor(image) - read_magic = reader.get_next_n_bytes(len(self.magic)).decode("utf-8") if not (self.magic == read_magic): raise AssertionError('magic number mismatch') - read_len = reader.read_32bit_integer() // 8 json_data = reader.get_next_n_bytes(read_len) - json_data = json.loads(gzip.decompress(json_data).decode("utf-8")) if "Comment" in json_data: json_data["Comment"] = json.loads(json_data["Comment"]) - return json_data + + if not get_fec: + return json_data, None + + fec_len = reader.read_32bit_integer() + fec_data = None + if fec_len != 0xffffffff: + fec_data = reader.get_next_n_bytes(fec_len // 8) + + return json_data, fec_data except FileNotFoundError: # 无法找到文件 raise Exception(f"The file {image} does not exist.") - except json.JSONDecodeError as e: # 无法解析JSON数据 raise Exception(f"Failed to decode JSON 
data from image: {image}. Error: {str(e)}") - except AssertionError as err: # 魔数不匹配 raise Exception(f"Failed to extract data from image: {image}. Error: {str(err)}") - except Exception as e: # 从图像中提取数据时发生意外错误 raise Exception(f"Unexpected error happened when extracting data from image: {image}. Error: {str(e)}")