import cv2 import numpy as np from PIL import Image, PngImagePlugin, ImageDraw import json from datetime import datetime from cryptography.fernet import Fernet import base64 import hashlib class WatermarkProcessor: def __init__(self, encryption_key=None): """Initialize with optional encryption key""" if encryption_key: self.fernet = Fernet(encryption_key) else: key = Fernet.generate_key() self.fernet = Fernet(key) def to_bin(self, data): """Convert data to binary format as string""" if isinstance(data, str): return ''.join(format(ord(char), '08b') for char in data) elif isinstance(data, bytes): return ''.join(format(x, '08b') for x in data) elif isinstance(data, np.ndarray): return [format(i, "08b") for i in data] elif isinstance(data, int) or isinstance(data, np.uint8): return format(data, "08b") else: raise TypeError("Type not supported.") def create_preview(self, image_path, watermark_text, opacity=0.3): """Create a preview of watermark on image""" try: image = Image.open(image_path) txt_layer = Image.new('RGBA', image.size, (255, 255, 255, 0)) draw = ImageDraw.Draw(txt_layer) # Calculate text position text_width = draw.textlength(watermark_text) text_x = (image.width - text_width) // 2 text_y = image.height // 2 # Add watermark text draw.text((text_x, text_y), watermark_text, fill=(255, 255, 255, int(255 * opacity))) # Combine layers preview = Image.alpha_composite(image.convert('RGBA'), txt_layer) return preview except Exception as e: return None def png_encode(self, im_name, extra): """Encode watermark using PNG metadata""" try: im = Image.open(im_name) info = PngImagePlugin.PngInfo() info.add_text("TXT", extra) im.save("test.png", pnginfo=info) return "test.png", "Watermark added successfully" except Exception as e: return im_name, f"Error adding watermark: {str(e)}" def encode(self, image_path, watermark_text, metadata=None): """Encode watermark using simple LSB steganography with header. 헤더(32비트)는 watermark 데이터(JSON 문자열)의 길이(문자수)를 이진 문자열로 저장합니다. """ try: image = cv2.imread(image_path) if image is None: raise ValueError("Could not read image file") # Prepare watermark data data = { 'text': watermark_text, 'timestamp': datetime.now().isoformat(), 'metadata': metadata or {} } # Convert data to string (UTF-8) json_str = json.dumps(data, ensure_ascii=False) # 헤더: 32비트에 데이터 길이(문자수)를 저장 data_length = len(json_str) header = format(data_length, '032b') # 본문: 각 문자를 8비트 이진수로 변환 body = ''.join(format(ord(char), '08b') for char in json_str) binary_data = header + body # Check capacity if len(binary_data) > image.shape[0] * image.shape[1] * 3: return image_path, "Error: Image too small for watermark data" # Embed data into LSB of each pixel data_index = 0 for i in range(image.shape[0]): for j in range(image.shape[1]): for k in range(3): if data_index < len(binary_data): pixel = int(image[i, j, k]) # Clear the LSB pixel = pixel & 0xFE # Set the LSB according to our data bit pixel = pixel | (int(binary_data[data_index]) & 1) image[i, j, k] = np.uint8(pixel) data_index += 1 else: break if data_index >= len(binary_data): break if data_index >= len(binary_data): break # Save result image (PNG: lossless) output_path = f"watermarked_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" cv2.imwrite(output_path, image) return output_path, "Watermark added successfully" except Exception as e: return image_path, f"Error in encoding: {str(e)}" def decode(self, image_path): """Decode watermark using simple LSB steganography with header. 먼저 32비트(헤더)만큼 읽어 데이터 길이(문자수)를 구한 후, 그 길이에 해당하는 본문 비트만 읽어 문자열로 복원합니다. """ try: # PNG 메타데이터 먼저 확인 try: im = Image.open(image_path) if "TXT" in im.info: return im.info["TXT"] except: pass image = cv2.imread(image_path) if image is None: raise ValueError("Could not read image file") bits = [] total_needed = None count = 0 # 픽셀 순회: 필요한 비트 수만큼 읽어오기 for i in range(image.shape[0]): for j in range(image.shape[1]): for k in range(3): bits.append(str(image[i, j, k] & 1)) count += 1 # 헤더 32비트를 모두 읽은 경우 if count == 32 and total_needed is None: header = ''.join(bits[:32]) data_length = int(header, 2) total_needed = 32 + data_length * 8 if total_needed is not None and count >= total_needed: break if total_needed is not None and count >= total_needed: break if total_needed is not None and count >= total_needed: break if total_needed is None: return "Error: Not enough data to read header" binary_data = ''.join(bits[:total_needed]) header = binary_data[:32] data_length = int(header, 2) total_bits = data_length * 8 message_bits = binary_data[32:32 + total_bits] text = '' for i in range(0, len(message_bits), 8): byte = message_bits[i:i+8] text += chr(int(byte, 2)) try: data = json.loads(text) return json.dumps(data, ensure_ascii=False, indent=2) except json.JSONDecodeError: return text except Exception as e: return f"Error in decoding: {str(e)}" def analyze_quality(self, original_path, watermarked_path): """Analyze watermark quality""" try: original = cv2.imread(original_path) watermarked = cv2.imread(watermarked_path) if original is None or watermarked is None: raise ValueError("Could not read image files") # Calculate PSNR mse = np.mean((original - watermarked) ** 2) if mse == 0: psnr = float('inf') else: psnr = 20 * np.log10(255.0 / np.sqrt(mse)) # Calculate histogram similarity hist_original = cv2.calcHist([original], [0], None, [256], [0, 256]) hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256]) hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL) # Count modified pixels diff = cv2.bitwise_xor(original, watermarked) modified_pixels = np.count_nonzero(diff) report = { 'psnr': round(psnr, 2), 'histogram_similarity': round(hist_correlation, 4), 'modified_pixels': modified_pixels, 'image_size': original.shape, 'quality_score': round((psnr / 50) * 100, 2) if psnr != float('inf') else 100 } return json.dumps(report, indent=2) except Exception as e: return f"Error in quality analysis: {str(e)}"