Spaces:
Sleeping
Sleeping
# utils.py | |
import cv2 | |
import numpy as np | |
from PIL import Image, PngImagePlugin, ImageDraw | |
import json | |
from datetime import datetime | |
from cryptography.fernet import Fernet | |
import base64 | |
import hashlib | |
class WatermarkProcessor: | |
def __init__(self, encryption_key=None): | |
"""Initialize with optional encryption key""" | |
if encryption_key: | |
self.fernet = Fernet(encryption_key) | |
else: | |
key = Fernet.generate_key() | |
self.fernet = Fernet(key) | |
def to_bin(self, data): | |
"""Convert data to binary format as string""" | |
if isinstance(data, str): | |
return ''.join(format(ord(char), '08b') for char in data) | |
elif isinstance(data, bytes): | |
return ''.join(format(x, '08b') for x in data) | |
elif isinstance(data, np.ndarray): | |
return [format(i, "08b") for i in data] | |
elif isinstance(data, int) or isinstance(data, np.uint8): | |
return format(data, "08b") | |
else: | |
raise TypeError("Type not supported.") | |
def create_preview(self, image_path, watermark_text, opacity=0.3): | |
"""Create a preview of watermark on image""" | |
try: | |
image = Image.open(image_path) | |
txt_layer = Image.new('RGBA', image.size, (255, 255, 255, 0)) | |
draw = ImageDraw.Draw(txt_layer) | |
# Calculate text position | |
text_width = draw.textlength(watermark_text) | |
text_x = (image.width - text_width) // 2 | |
text_y = image.height // 2 | |
# Add watermark text | |
draw.text((text_x, text_y), watermark_text, | |
fill=(255, 255, 255, int(255 * opacity))) | |
# Combine layers | |
preview = Image.alpha_composite(image.convert('RGBA'), txt_layer) | |
return preview | |
except Exception as e: | |
return None | |
def png_encode(self, im_name, extra): | |
"""Encode watermark using PNG metadata""" | |
try: | |
im = Image.open(im_name) | |
info = PngImagePlugin.PngInfo() | |
info.add_text("TXT", extra) | |
im.save("test.png", pnginfo=info) | |
return "test.png", "Watermark added successfully" | |
except Exception as e: | |
return im_name, f"Error adding watermark: {str(e)}" | |
def encode(self, image_path, watermark_text, metadata=None): | |
"""Encode watermark using simple LSB steganography""" | |
try: | |
image = cv2.imread(image_path) | |
if image is None: | |
raise ValueError("Could not read image file") | |
# Prepare watermark data | |
data = { | |
'text': watermark_text, | |
'timestamp': datetime.now().isoformat(), | |
'metadata': metadata or {} | |
} | |
# Convert data to string with end marker | |
json_str = json.dumps(data, ensure_ascii=False) | |
secret_data = json_str + "#####" | |
# Convert string to binary (using utf-8 encoding) | |
binary_data = ''.join(format(ord(char), '08b') for char in secret_data) | |
# Check capacity | |
if len(binary_data) > image.shape[0] * image.shape[1] * 3: | |
return image_path, "Error: Image too small for watermark data" | |
# Embed data | |
data_index = 0 | |
for i in range(image.shape[0]): | |
for j in range(image.shape[1]): | |
for k in range(3): | |
if data_index < len(binary_data): | |
pixel = int(image[i, j, k]) | |
# Clear the LSB | |
pixel = pixel & 0xFE # Clear last bit | |
# Set the LSB according to our data | |
pixel = pixel | (int(binary_data[data_index]) & 1) | |
image[i, j, k] = np.uint8(pixel) | |
data_index += 1 | |
else: | |
break | |
if data_index >= len(binary_data): | |
break | |
if data_index >= len(binary_data): | |
break | |
# Save result | |
output_path = f"watermarked_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png" | |
cv2.imwrite(output_path, image) | |
return output_path, "Watermark added successfully" | |
except Exception as e: | |
return image_path, f"Error in encoding: {str(e)}" | |
def decode(self, image_path): | |
"""Decode watermark using simple LSB steganography""" | |
try: | |
# Try PNG metadata first | |
try: | |
im = Image.open(image_path) | |
if "TXT" in im.info: | |
return im.info["TXT"] | |
except: | |
pass | |
image = cv2.imread(image_path) | |
if image is None: | |
raise ValueError("Could not read image file") | |
# Extract binary data | |
binary_data = '' | |
for i in range(image.shape[0]): | |
for j in range(image.shape[1]): | |
for k in range(3): | |
binary_data += str(image[i, j, k] & 1) | |
# Convert binary to text | |
text = '' | |
for i in range(0, len(binary_data), 8): | |
if i + 8 <= len(binary_data): | |
byte = binary_data[i:i+8] | |
text += chr(int(byte, 2)) | |
# Check for end marker | |
if "#####" in text: | |
# Extract content before marker | |
text = text.split("#####")[0] | |
try: | |
# Parse JSON | |
data = json.loads(text) | |
return json.dumps(data, ensure_ascii=False, indent=2) | |
except json.JSONDecodeError: | |
return text | |
break | |
return "Error: No valid watermark found" | |
except Exception as e: | |
return f"Error in decoding: {str(e)}" | |
def analyze_quality(self, original_path, watermarked_path): | |
"""Analyze watermark quality""" | |
try: | |
original = cv2.imread(original_path) | |
watermarked = cv2.imread(watermarked_path) | |
if original is None or watermarked is None: | |
raise ValueError("Could not read image files") | |
# Calculate PSNR | |
mse = np.mean((original - watermarked) ** 2) | |
if mse == 0: | |
psnr = float('inf') | |
else: | |
psnr = 20 * np.log10(255.0 / np.sqrt(mse)) | |
# Calculate histogram similarity | |
hist_original = cv2.calcHist([original], [0], None, [256], [0, 256]) | |
hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256]) | |
hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL) | |
# Count modified pixels | |
diff = cv2.bitwise_xor(original, watermarked) | |
modified_pixels = np.count_nonzero(diff) | |
report = { | |
'psnr': round(psnr, 2), | |
'histogram_similarity': round(hist_correlation, 4), | |
'modified_pixels': modified_pixels, | |
'image_size': original.shape, | |
'quality_score': round((psnr / 50) * 100, 2) if psnr != float('inf') else 100 | |
} | |
return json.dumps(report, indent=2) | |
except Exception as e: | |
return f"Error in quality analysis: {str(e)}" |