数字水印系统实现
Published in:2025-07-22 |
Words: 5.1k | Reading time: 23min | reading:

实现流程框架

无论使用哪种技术,一个完整的数字水印系统通常都包括两个基本过程:

  1. 水印嵌入 (Watermark Embedding)

    • 输入:原始载体(如图片A)、水印信息(如字符串"ID:123"或一个Logo图片B)。
    • 过程:通过特定算法,将水印信息B处理后,“叠加”或“融入”到原始载体A中。
    • 输出:一个带有水印、但看起来和原始载体几乎没区别的载体A’。
  2. 水印提取/检测 (Watermark Extraction/Detection)

    • 输入:可能被修改过的带水印载体A’',有时还需要原始载体A或原始水印B作为参考。
    • 过程:通过与嵌入过程相逆的算法,从载体A’'中解码出隐藏的水印信息。
    • 输出:提取出的水印信息B’,或者一个“是/否”的判断(确认是否存在某个特定水印)。

现在,我们来看实现这些流程的具体技术。


一、 空域(Spatial Domain)技术

核心思想:直接修改原始图像的像素值(比如RGB值)来嵌入信息。这种方法直观、简单。

典型技术:LSB (Least Significant Bit) 算法

这是最经典、最简单的空域水印技术。

  • 原理

    • 计算机中的图像,每个像素的颜色由若干个二进制位(bits)表示。例如,一个8位的灰度图像,每个像素的灰度值范围是0-255,由8个二进制位组成。
    • 最低有效位 (LSB) 就是这8个位中最右边的那一位。改变这一位,对像素值的整体影响最小(最多改变1),人眼基本无法察觉。
    • 嵌入方法:将要隐藏的水印信息(比如一张黑白Logo图片)转换成二进制流(0和1的序列)。然后,用这个二进制流去替换原始图像中每个(或部分)像素的LSB。
      • 如果要嵌入1,就把像素的LSB改成1
      • 如果要嵌入0,就把像素的LSB改成0
  • 提取方法:直接读取带水印图像中相应像素的LSB,就能还原出隐藏的二进制流,从而重构水印信息。

  • 优点

    • 实现简单:算法非常直接。
    • 容量大:每个像素都能藏1比特信息,可以隐藏大量数据。
    • 隐蔽性好:对视觉影响极小。
  • 缺点

    • 鲁棒性极差(非常脆弱):这是它的致命弱点。任何对图像的轻微处理,如 JPEG压缩、缩放、裁剪、加个滤镜,都会破坏LSB层的数据,导致水印完全丢失。
  • 应用场景:主要用于脆弱水印,即内容完整性认证。因为只要图片被改动,水印就会被破坏,从而可以证明“此图已被修改”。


二、 变换域(Transform Domain)技术

核心思想:不直接改像素值,而是先对图像进行数学变换(如傅里叶变换、余弦变换),将其从空域转换到另一个域(如频域)。然后,在变换后的域中修改系数来嵌入水印。这是目前主流的、鲁棒水印的实现方式。

为什么要这么做?
因为像JPEG压缩这样的操作,主要丢弃的是图像的高频信息(细节),而保留中低频信息(轮廓、主要能量)。如果在中低频系数里嵌入水印,那么即使经过压缩,水印信息也能大概率被保留下来。

1. 基于DCT(离散余弦变换)的技术

  • 背景:DCT是 JPEG 压缩的核心算法。它将图像分成8x8的小块,然后对每个块进行DCT变换,得到一个频率系数矩阵。这个矩阵中,左上角是低频系数(能量集中),右下角是高频系数。
  • 嵌入方法
    1. 将原始图像分块(如8x8)。
    2. 对每个块进行DCT变换。
    3. 选择中频区域的系数进行修改。为什么是中频?因为低频系数对视觉影响太大,高频系数容易在压缩中丢失,中频是最佳平衡点。
    4. 根据水印信息(0或1)来修改这些中频系数。例如,通过量化或比较两个系数的大小来嵌入1比特信息。
    5. 对修改后的系数矩阵进行逆DCT变换 (IDCT),得到带水印的图像块。
  • 优点
    • 鲁棒性强:对JPEG压缩、轻微噪声、亮度调整等有很好的抵抗能力。
    • 隐蔽性好:利用了人类视觉系统对中频变化不敏感的特性。
  • 应用场景版权保护盗版追踪等需要强鲁棒性的场合。

2. 基于DWT(离散小波变换)的技术

  • 背景:DWT是 JPEG 2000 压缩标准的核心。与DCT不同,DWT能对整个图像进行多分辨率分析,得到不同尺度和方向的子带(低频、水平细节、垂直细节、对角线细节)。
  • 嵌入方法
    1. 对整个图像进行DWT变换。
    2. 水印通常被嵌入到中、低频子带的系数中。
    3. 因为DWT提供了图像的空间和频率局部化信息,所以它对裁剪、缩放等几何攻击的抵抗性更好。
  • 优点
    • 多分辨率特性:鲁棒性比DCT更全面,尤其在抗几何攻击方面。
    • 与人类视觉系统(HVS)匹配更好:可以更精细地控制水印的嵌入强度,达到更好的隐蔽性。
  • 应用场景:同DCT,用于需要更高鲁棒性的版权保护系统。

3. 基于DFT(离散傅里叶变换)的技术

  • 背景:DFT将图像转换成幅度和相位谱。
  • 特点:图像的幅度谱对旋转、缩放和平移(RST攻击) 具有不变性或特定的变化规律。
  • 嵌入方法:通过修改DFT变换后的幅度谱来嵌入水印。
  • 优点
    • 对旋转、缩放等几何攻击具有天然的鲁棒性。
  • 缺点
    • 实现复杂,且容易出现图像块效应。
  • 应用场景:主要用于抵抗几何变换的特定场合。

实现

核心的Python库:

  • Pillow (PIL Fork): 用于图像的读写和基本的像素操作(实现LSB)。
  • NumPy: 用于高效的数组和矩阵运算,是所有图像处理的基础。
  • PyWavelets (pywt): 用于实现DWT(离散小波变换)。
  • OpenCV-Python (cv2): 功能最强大的计算机视觉库,可以方便地实现DCT、DFT以及各种图像处理操作。

1. 空域 (Spatial Domain) - LSB 算法

核心库: Pillow (PIL), NumPy

这个算法不依赖复杂的数学库,Pillow和NumPy足以胜任。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from PIL import Image
import numpy as np

def encode_lsb(image_path, secret_message):
"""将秘密信息嵌入到图片的LSB层"""
img = Image.open(image_path, 'r')
width, height = img.size
img_array = np.array(img)

# 1. 将秘密信息转换为二进制流
binary_secret = ''.join([format(ord(c), '08b') for c in secret_message])
binary_secret += '1111111111111110' # 添加结束标记

if len(binary_secret) > width * height * 3:
raise ValueError("信息过长,无法嵌入到图片中!")

data_index = 0
# 2. 遍历像素并修改LSB
for y in range(height):
for x in range(width):
pixel = img_array[y, x]
# 遍历RGB三个通道
for i in range(3):
if data_index < len(binary_secret):
# 清除LSB并设置新的位
pixel[i] = (pixel[i] & 0xFE) | int(binary_secret[data_index])
data_index += 1
else:
break
if data_index >= len(binary_secret):
break
if data_index >= len(binary_secret):
break

# 3. 创建并保存新图片
encoded_image = Image.fromarray(img_array)
encoded_image.save("encoded_image.png", "PNG")
print("信息嵌入成功,已保存为 encoded_image.png")

def decode_lsb(image_path):
"""从图片的LSB层提取秘密信息"""
img = Image.open(image_path, 'r')
img_array = np.array(img)
width, height = img.size

binary_data = ""
# 1. 遍历像素并提取LSB
for y in range(height):
for x in range(width):
pixel = img_array[y, x]
for i in range(3):
binary_data += str(pixel[i] & 1)

# 2. 找到结束标记并转换信息
end_marker = '1111111111111110'
end_index = binary_data.find(end_marker)
if end_index != -1:
secret_binary = binary_data[:end_index]
secret_message = ""
for i in range(0, len(secret_binary), 8):
byte = secret_binary[i:i+8]
secret_message += chr(int(byte, 2))
return secret_message
else:
return "未找到结束标记或信息。"

# --- 使用示例 ---
# 请准备一张名为 'original.png' 的图片
# encode_lsb('original.png', "This is a secret message!")
# message = decode_lsb('encoded_image.png')
# print(f"提取到的信息: {message}")

2. 变换域 (Transform Domain) - DCT 算法

核心库: OpenCV-Python (cv2), NumPy

OpenCV 提供了非常方便的 cv2.dct()cv2.idct() 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import cv2
import numpy as np

def dct_watermark_embed(image_path, watermark_path, alpha=0.1):
"""基于DCT的中频带嵌入图像水印"""
# 1. 准备载体和水印
img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
watermark = cv2.imread(watermark_path, cv2.IMREAD_GRAYSCALE)

# 确保水印尺寸小于载体
watermark = cv2.resize(watermark, (img.shape[1], img.shape[0]))

# 2. 对载体图像进行DCT变换
img_dct = cv2.dct(np.float32(img))

# 3. 对水印图像进行DCT变换 (也可以直接用二值化水印)
watermark_dct = cv2.dct(np.float32(watermark))

# 4. 将水印的DCT系数加到载体的中频DCT系数上
# 这里简单地将整个水印DCT加到载体DCT上,更复杂的会选择特定区域
# alpha 是嵌入强度
result_dct = img_dct + alpha * watermark_dct

# 5. 进行逆DCT变换
result_img = cv2.idct(result_dct)

# 裁剪值到 0-255 并转换为uint8
result_img = np.clip(result_img, 0, 255).astype(np.uint8)

cv2.imwrite('dct_embedded_image.png', result_img)
print("DCT水印嵌入成功!")

def dct_watermark_extract(embedded_image_path, original_image_path, alpha=0.1):
"""提取DCT水印(需要原始图像)"""
# 这是一个非盲水印提取的例子
embedded_img = cv2.imread(embedded_image_path, cv2.IMREAD_GRAYSCALE)
original_img = cv2.imread(original_image_path, cv2.IMREAD_GRAYSCALE)

# 1. 对带水印图像和原始图像分别做DCT
embedded_dct = cv2.dct(np.float32(embedded_img))
original_dct = cv2.dct(np.float32(original_img))

# 2. 相减并除以强度因子,得到水印的DCT系数
extracted_watermark_dct = (embedded_dct - original_dct) / alpha

# 3. 逆DCT变换,恢复水印
extracted_watermark = cv2.idct(extracted_watermark_dct)
extracted_watermark = np.clip(extracted_watermark, 0, 255).astype(np.uint8)

cv2.imwrite('dct_extracted_watermark.png', extracted_watermark)
print("DCT水印提取成功!")

# --- 使用示例 ---
# 请准备 'original.png' 和 'watermark_logo.png'
# dct_watermark_embed('original.png', 'watermark_logo.png', alpha=0.1)
# dct_watermark_extract('dct_embedded_image.png', 'original.png', alpha=0.1)

注意: 上述DCT示例是一个非盲水印,提取时需要原始图像。实现盲水印(无需原图)会更复杂,通常涉及在中频带选择特定的系数对进行比较,而不是直接叠加。


3. 变换域 (Transform Domain) - DWT 算法

核心库: PyWavelets (pywt), NumPy, OpenCV-Python (cv2)

pywt 是Python中进行小波变换的标准库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import numpy as np
import pywt
import cv2

def dwt_watermark_embed(image_path, watermark_path, alpha=0.1):
"""基于DWT的LL子带嵌入图像水印"""
# 1. 准备载体和水印
img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
watermark = cv2.imread(watermark_path, cv2.IMREAD_GRAYSCALE)

# 2. 对载体图像进行DWT变换
# 'haar' 是最简单的小波基
coeffs_img = pywt.dwt2(img, 'haar')
LL, (LH, HL, HH) = coeffs_img

# 确保水印尺寸与LL子带匹配
watermark_resized = cv2.resize(watermark, (LL.shape[1], LL.shape[0]))

# 3. 将水印添加到低频子带 (LL)
# LL子带能量最集中,鲁棒性最强,但对画质影响也最大
LL_watermarked = LL + alpha * watermark_resized

# 4. 重构图像
coeffs_watermarked = (LL_watermarked, (LH, HL, HH))
img_watermarked = pywt.idwt2(coeffs_watermarked, 'haar')

img_watermarked = np.clip(img_watermarked, 0, 255).astype(np.uint8)
cv2.imwrite('dwt_embedded_image.png', img_watermarked)
print("DWT水印嵌入成功!")

def dwt_watermark_extract(embedded_image_path, original_image_path, alpha=0.1):
"""提取DWT水印(需要原始图像)"""
# 同样是非盲水印提取
embedded_img = cv2.imread(embedded_image_path, cv2.IMREAD_GRAYSCALE)
original_img = cv2.imread(original_image_path, cv2.IMREAD_GRAYSCALE)

# 1. 对两张图分别做DWT
coeffs_embedded = pywt.dwt2(embedded_img, 'haar')
LL_embedded, _ = coeffs_embedded

coeffs_original = pywt.dwt2(original_img, 'haar')
LL_original, _ = coeffs_original

# 2. 提取水印
extracted_watermark = (LL_embedded - LL_original) / alpha

extracted_watermark = np.clip(extracted_watermark, 0, 255).astype(np.uint8)
cv2.imwrite('dwt_extracted_watermark.png', extracted_watermark)
print("DWT水印提取成功!")


# --- 使用示例 ---
# 请准备 'original.png' 和 'watermark_logo.png'
# dwt_watermark_embed('original.png', 'watermark_logo.png', alpha=0.2)
# dwt_watermark_extract('dwt_embedded_image.png', 'original.png', alpha=0.2)

4. 变换域 (Transform Domain) - DFT 算法

核心库: NumPy, OpenCV-Python (cv2)

DFT的实现与DCT类似,但通常用于抵抗几何攻击。这里提供一个基础的嵌入思路。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import cv2
import numpy as np

def dft_watermark_embed(image_path, watermark_path, alpha=100):
"""基于DFT幅度谱嵌入水印"""
img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
watermark = cv2.imread(watermark_path, cv2.IMREAD_GRAYSCALE)
watermark = cv2.resize(watermark, (img.shape[1], img.shape[0]))

# 1. DFT变换
# OpenCV的dft需要输入float32,并会输出双通道复数结果
dft = cv2.dft(np.float32(img), flags=cv2.DFT_COMPLEX_OUTPUT)

# 2. 将频谱中心移到图像中央,便于观察和操作
dft_shift = np.fft.fftshift(dft)

# 3. 计算幅度和相位谱
magnitude_spectrum = cv2.magnitude(dft_shift[:,:,0], dft_shift[:,:,1])
phase_spectrum = cv2.phase(dft_shift[:,:,0], dft_shift[:,:,1])

# 4. 在幅度谱上添加水印
magnitude_spectrum_watermarked = magnitude_spectrum + alpha * watermark

# 5. 根据新的幅度和旧的相位,重构DFT结果
real_part = magnitude_spectrum_watermarked * np.cos(phase_spectrum)
imag_part = magnitude_spectrum_watermarked * np.sin(phase_spectrum)
dft_shift_watermarked = cv2.merge([real_part, imag_part])

# 6. 将频谱中心移回左上角
dft_watermarked = np.fft.ifftshift(dft_shift_watermarked)

# 7. 逆DFT变换
img_back = cv2.idft(dft_watermarked)

# 获取实部并恢复图像
img_back = cv2.magnitude(img_back[:,:,0], img_back[:,:,1])

img_back = np.clip(img_back, 0, 255).astype(np.uint8)
cv2.imwrite('dft_embedded_image.png', img_back)
print("DFT水印嵌入成功!")

# --- 使用示例 ---
# 请准备 'original.png' 和 'watermark_logo.png'
# dft_watermark_embed('original.png', 'watermark_logo.png', alpha=100)
# 提取DFT水印通常也需要原始图像,过程与DCT/DWT类似。

安装依赖库

  • pip安装:
1
pip install pillow numpy opencv-python pywavelets

demo

水印信息嵌入(LSB)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@logger.catch
def _message_to_binary(message: str) -> str:
"""Converts a string message into its binary representation."""
binary_message = ''.join(format(ord(char), '08b') for char in message)
return binary_message

@logger.catch
def _binary_to_message(binary_message: str) -> str:
"""Converts a binary string back into a string message."""
# Ensure the binary string is a multiple of 8
if len(binary_message) % 8 != 0:
logger.warning("Binary string length is not a multiple of 8; data may be incomplete.")
binary_message = binary_message[:-(len(binary_message) % 8)]

message = ""
for i in range(0, len(binary_message), 8):
byte = binary_message[i:i + 8]
if len(byte) == 8:
message += chr(int(byte, 2))
return message

@logger.catch
def encode_lsb(input_image_path: str, secret_message: str, output_image_path: str):
"""
Encodes a secret message into an image using the LSB (Least Significant Bit) technique.
The output image will be saved in a lossless format (PNG) to preserve the watermark.

Args:
input_image_path (str): The path to the source image.
secret_message (str): The message to hide.
output_image_path (str): The path to save the watermarked image.
"""
try:
logger.info(f"Starting LSB encoding for '{input_image_path}'...")
image = Image.open(input_image_path, 'r').convert("RGBA") # Convert to RGBA for consistency
width, height = image.size
img_array = np.array(list(image.getdata()))

channels = 4 # We are using RGBA

# Add a unique delimiter to know where the message ends
binary_secret_message = _message_to_binary(secret_message + "####")
required_pixels = len(binary_secret_message)

if required_pixels > width * height * channels:
logger.error("Error: Message is too long to be encoded in this image.")
return

# Modify the LSB of the pixel data
data_index = 0
flat_array = img_array.flatten()

for i in range(len(flat_array)):
if data_index < required_pixels:
# Change the LSB of the color value
flat_array[i] = int(bin(flat_array[i])[2:-1] + binary_secret_message[data_index], 2)
data_index += 1
else:
break # Stop once the message is encoded

# Create a new image with the modified pixel data
encoded_image = Image.fromarray(flat_array.reshape(height, width, channels).astype('uint8'), image.mode)

# Save as PNG to ensure lossless storage of the watermark
encoded_image.save(output_image_path, "PNG")
logger.info(f"Successfully encoded message into '{output_image_path}'.")

except FileNotFoundError:
logger.error(f"Error: Input image not found at '{input_image_path}'.")
except Exception as e:
logger.exception(f"An unexpected error occurred during encoding: {e}")

@logger.catch
def decode_lsb(encoded_image_path: str) -> str | None:
"""
Decodes a secret message from an image using the LSB technique.

Args:
encoded_image_path (str): The path to the watermarked image.

Returns:
The decoded secret message, or None if an error occurs or no message is found.
"""
try:
logger.info(f"Starting LSB decoding for '{encoded_image_path}'...")
image = Image.open(encoded_image_path, 'r')
img_array = np.array(list(image.getdata()))

binary_data = ""
for pixel in img_array:
for value in pixel:
binary_data += bin(value)[-1]

# Find the delimiter by decoding byte by byte
decoded_message = ""
for i in range(0, len(binary_data), 8):
byte = binary_data[i:i+8]
if len(byte) < 8:
break # End of data
decoded_message += chr(int(byte, 2))
if decoded_message.endswith("####"):
logger.info("Successfully decoded the message.")
return decoded_message[:-4] # Return message without the delimiter

logger.warning("Could not find the end-of-message delimiter in the image.")
return None

except FileNotFoundError:
logger.error(f"Error: Encoded image not found at '{encoded_image_path}'.")
return None
except Exception as e:
logger.exception(f"An unexpected error occurred during decoding: {e}")
return None

水印信息验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@logger.catch
def valid_img_msg(image_path):
"""
Command-line tool to decode an invisible watermark from an image file.
"""
image_path = Path(image_path)

if not image_path.is_file():
logger.debug(f"Error: File not found at '{image_path}'")
return

logger.debug(f"Verifying '{image_path.name}'...")
decoded_message = decode_lsb(str(image_path))

if decoded_message:
logger.debug("\n--- ✅ Watermark Found! ---")
# Pretty logger.debug the decoded information
for item in decoded_message.split('|'):
if ':' in item:
key, value = item.split(':', 1)
logger.debug(f" - {key.strip():<15}: {value.strip()}")
else:
logger.debug(f" - {item}")
logger.debug("--------------------------\n")
else:
logger.debug("\n--- ❌ No Watermark Found ---")
logger.debug("No hidden message could be decoded from this image.")
logger.debug("--------------------------\n")
return decoded_message

水印信息嵌入(EXIF)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import piexif
from PIL import Image
from loguru import logger
from pathlib import Path
from typing import Optional

# 定义 UserComment 的标准前缀,UNDEFINED 表示未定义编码,允许我们使用 UTF-8。
# 这是符合 EXIF 规范的推荐做法。
USER_COMMENT_PREFIX = b'UNDEFINED\x00'

@logger.catch
def encode_exif(input_image_path: str, secret_message: str, output_image_path: str) -> bool:
"""
将秘密信息作为隐形水印编码到图片的EXIF元数据中。

Args:
input_image_path: 输入图片的路径。
secret_message: 要隐藏的信息。
output_image_path: 保存带水印图片的路径。

Returns:
如果编码成功,返回 True,否则返回 False。
"""
try:
logger.info(f"正在为 '{Path(input_image_path).name}' 添加EXIF水印...")

# 打开图片
image = Image.open(input_image_path)

# 复制原始图片信息,以便在保存时保留质量、DPI等设置
image_info = image.info.copy()

# 检查图片是否已有EXIF数据,如果没有则创建一个空的
exif_dict = {}
if "exif" in image_info and image_info['exif']:
exif_dict = piexif.load(image_info["exif"])

# [FIXED] 确保 'Exif' 子字典存在,防止 KeyError
if piexif.ImageIFD.ExifPtr not in exif_dict:
exif_dict['Exif'] = {}

# [FIXED] 将秘密信息编码并添加标准前缀,然后放入 UserComment 字段
# 这是存储任意用户数据的标准字段
user_comment_payload = USER_COMMENT_PREFIX + secret_message.encode('utf-8')
exif_dict['Exif'][piexif.ExifIFD.UserComment] = user_comment_payload

# 将更新后的EXIF字典转换为字节流
try:
exif_bytes = piexif.dump(exif_dict)
except Exception as e:
# 有时 EXIF 数据可能包含 piexif 不支持的格式
logger.error(f"无法序列化 EXIF 数据: {e}")
# 尝试移除可能导致问题的缩略图数据
if 'thumbnail' in exif_dict:
del exif_dict['thumbnail']
exif_bytes = piexif.dump(exif_dict)
else:
raise

# [FIXED] 保存图片,并附上新的EXIF数据
# 对于 JPEG,传递原始 info 字典以保留质量等设置
# 对于 PNG 等其他格式,直接使用 exif 参数
if image.format == 'JPEG':
image.save(output_image_path, "jpeg", exif=exif_bytes, **image_info)
else:
image.save(output_image_path, exif=exif_bytes)

logger.success(f"成功将EXIF水印写入 '{Path(output_image_path).name}'.")
return True

except FileNotFoundError:
logger.error(f"输入文件未找到: {input_image_path}")
return False
except Exception as e:
logger.exception(f"添加EXIF水印时发生错误: {e}")
return False

@logger.catch
def decode_exif(encoded_image_path: str) -> Optional[str]:
"""
从图片的EXIF元数据中解码出隐藏的秘密信息。

Args:
encoded_image_path: 带水印图片的路径。

Returns:
解码后的信息字符串,如果未找到或发生错误则返回 None。
"""
try:
logger.info(f"正在从 '{Path(encoded_image_path).name}' 解码EXIF水印...")

image = Image.open(encoded_image_path)

# 检查图片是否有EXIF数据
if "exif" not in image.info or not image.info['exif']:
logger.warning("图片中未找到EXIF数据。")
return None

# 加载EXIF数据
exif_dict = piexif.load(image.info["exif"])

# [FIXED] 安全地从 UserComment 字段读取信息,防止 KeyError
exif_ifd = exif_dict.get('Exif', {})
user_comment_bytes = exif_ifd.get(piexif.ExifIFD.UserComment)

if user_comment_bytes:
# [FIXED] 检查是否存在我们定义的前缀,并剥离它
if user_comment_bytes.startswith(USER_COMMENT_PREFIX):
message_bytes = user_comment_bytes[len(USER_COMMENT_PREFIX):]
# 将字节解码回字符串
decoded_message = message_bytes.decode('utf-8', errors='ignore')
logger.success("成功解码EXIF水印。")
return decoded_message
else:
logger.warning("在'UserComment'字段中找到数据,但不包含预期的水印前缀。")
return None
else:
logger.warning("在EXIF中未找到'UserComment'水印字段。")
return None

except FileNotFoundError:
logger.error(f"文件未找到: {encoded_image_path}")
return None
except Exception as e:
logger.exception(f"解码EXIF水印时发生错误: {e}")
return None

技术总结对比

技术类别 核心算法 主要优点 主要缺点 典型应用
空域 (Spatial Domain) LSB 实现简单、容量大 鲁棒性极差,一碰就坏 完整性认证 (脆弱水印)
变换域 (Transform Domain) DCT 对压缩鲁棒、隐蔽性好 对几何攻击抵抗力一般 版权保护 (鲁棒水印)
变换域 (Transform Domain) DWT 综合鲁棒性强,尤其对裁剪等 算法比DCT复杂 版权保护 (鲁棒水印)
变换域 (Transform Domain) DFT 对旋转/缩放鲁棒 实现复杂,可能影响画质 抵抗几何攻击的特殊应用

简单来说:

  • 想做防伪标签,一改就失效,用 LSB
  • 想做版权印章,不怕压缩和基本处理,用 DCTDWT。它们是目前最实用和最主流的鲁棒水印技术。
Next:
Docker 部署 Tailscale