217 lines
8.3 KiB
Python
217 lines
8.3 KiB
Python
import os
|
||
import shutil
|
||
import time
|
||
import ctypes
|
||
import imagehash
|
||
from PIL import Image, UnidentifiedImageError
|
||
|
||
# -------------------------- 配置参数 (按需修改) --------------------------
|
||
SOURCE_FOLDER = r"D:\img" # 原始图片库目录
|
||
TARGET_FOLDER = r"D:\img_todo\all_img_result" # 去重后图片输出目录
|
||
SUPPORTED_FORMATS = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.gif') # 支持的图片格式
|
||
MIN_PIXEL_COUNT = 1000 # 最小像素数阈值(宽或高小于此值的图片将被过滤)
|
||
RETRY_TIMES = 5 # 文件操作失败重试次数
|
||
RETRY_DELAY = 2 # 重试间隔(秒)
|
||
HASH_SIZE = 8 # 哈希计算尺寸(8-16之间选择,影响去重精度)
|
||
|
||
|
||
# -------------------------- 系统级辅助函数 --------------------------
|
||
def release_file_cache():
|
||
"""Windows系统-释放文件缓存/句柄,非Windows系统无影响"""
|
||
try:
|
||
if os.name == "nt":
|
||
ctypes.windll.kernel32.SetErrorMode(0x0001)
|
||
ctypes.windll.psapi.EmptyWorkingSet(ctypes.windll.kernel32.GetCurrentProcess())
|
||
except Exception as e:
|
||
print(f"⚠️ 释放文件缓存失败: {e}")
|
||
|
||
|
||
def remove_readonly_attr(file_path):
|
||
"""跨平台移除文件只读属性"""
|
||
if os.path.exists(file_path):
|
||
try:
|
||
if os.name == "nt":
|
||
ctypes.windll.kernel32.SetFileAttributesW(file_path, 128) # Windows移除只读
|
||
else:
|
||
os.chmod(file_path, 0o777) # 类Unix系统
|
||
except Exception as e:
|
||
print(f"⚠️ 移除只读属性失败 {file_path}: {e}")
|
||
|
||
|
||
# -------------------------- 核心图片处理函数 --------------------------
|
||
def get_image_info(img_path):
|
||
"""获取图片的哈希值和尺寸信息"""
|
||
img = None
|
||
try:
|
||
img = Image.open(img_path)
|
||
# 计算感知哈希值
|
||
phash = str(imagehash.phash(img.convert('L'), hash_size=HASH_SIZE))
|
||
# 获取图片尺寸
|
||
width, height = img.size
|
||
return phash, (width, height)
|
||
except UnidentifiedImageError:
|
||
print(f"❌ 无法识别图片: {os.path.basename(img_path)}")
|
||
return None, None
|
||
except Exception as e:
|
||
print(f"❌ 处理图片失败 {os.path.basename(img_path)}: {e}")
|
||
return None, None
|
||
finally:
|
||
if img:
|
||
img.close()
|
||
del img
|
||
release_file_cache()
|
||
|
||
|
||
def safe_copy_file(src_path, dst_path):
|
||
"""安全复制文件,处理权限和重试逻辑"""
|
||
remove_readonly_attr(src_path)
|
||
|
||
# 复制文件(带重试)
|
||
for retry in range(RETRY_TIMES):
|
||
try:
|
||
shutil.copy2(src_path, dst_path)
|
||
return True
|
||
except Exception as e:
|
||
if retry < RETRY_TIMES - 1:
|
||
print(f"⚠️ 复制失败 {os.path.basename(src_path)},重试中... ({retry+1}/{RETRY_TIMES})")
|
||
time.sleep(RETRY_DELAY)
|
||
else:
|
||
print(f"❌ 复制失败 {os.path.basename(src_path)}: {e}")
|
||
return False
|
||
|
||
|
||
def delete_source_file(src_path):
|
||
"""安全删除源文件,处理权限和重试逻辑"""
|
||
remove_readonly_attr(src_path)
|
||
|
||
# 删除文件(带重试)
|
||
for retry in range(RETRY_TIMES):
|
||
try:
|
||
release_file_cache()
|
||
os.remove(src_path)
|
||
return True
|
||
except PermissionError:
|
||
if retry < RETRY_TIMES - 1:
|
||
print(f"⚠️ 删除失败 {os.path.basename(src_path)},文件被占用,重试中... ({retry+1}/{RETRY_TIMES})")
|
||
time.sleep(RETRY_DELAY)
|
||
else:
|
||
print(f"❌ 删除失败 {os.path.basename(src_path)}: 文件被占用")
|
||
with open(os.path.join(TARGET_FOLDER, "delete_failed.txt"), "a", encoding="utf-8") as f:
|
||
f.write(f"{src_path}\n")
|
||
return False
|
||
except Exception as e:
|
||
if retry < RETRY_TIMES - 1:
|
||
print(f"⚠️ 删除失败 {os.path.basename(src_path)},重试中... ({retry+1}/{RETRY_TIMES})")
|
||
time.sleep(RETRY_DELAY)
|
||
else:
|
||
print(f"❌ 删除失败 {os.path.basename(src_path)}: {e}")
|
||
with open(os.path.join(TARGET_FOLDER, "delete_failed.txt"), "a", encoding="utf-8") as f:
|
||
f.write(f"{src_path}\n")
|
||
return False
|
||
|
||
|
||
# -------------------------- 主函数 --------------------------
|
||
def main():
|
||
# 创建目标文件夹
|
||
os.makedirs(TARGET_FOLDER, exist_ok=True)
|
||
|
||
# 初始化统计变量
|
||
total_files = 0
|
||
processed_files = 0
|
||
duplicate_files = 0
|
||
filtered_files = 0
|
||
moved_files = 0
|
||
error_files = 0
|
||
|
||
# 存储已处理的图片哈希值
|
||
processed_hashes = set()
|
||
|
||
print("=" * 60)
|
||
print("📁 图片去重工具")
|
||
print(f"🔍 源文件夹: {SOURCE_FOLDER}")
|
||
print(f"📂 目标文件夹: {TARGET_FOLDER}")
|
||
print(f"🎯 支持格式: {SUPPORTED_FORMATS}")
|
||
print("=" * 60)
|
||
print()
|
||
|
||
# 遍历源文件夹
|
||
for root, _, files in os.walk(SOURCE_FOLDER):
|
||
print(f"▶ 正在处理文件夹: {root}")
|
||
|
||
for filename in files:
|
||
# 检查文件格式
|
||
if not filename.lower().endswith(SUPPORTED_FORMATS):
|
||
continue
|
||
|
||
total_files += 1
|
||
file_path = os.path.join(root, filename)
|
||
|
||
try:
|
||
# 获取图片信息
|
||
img_hash, (width, height) = get_image_info(file_path)
|
||
|
||
if img_hash is None:
|
||
error_files += 1
|
||
continue
|
||
|
||
# 过滤小尺寸图片
|
||
if width < MIN_PIXEL_COUNT or height < MIN_PIXEL_COUNT:
|
||
filtered_files += 1
|
||
print(f"🔍 过滤小尺寸图片: {filename} ({width}x{height})")
|
||
continue
|
||
|
||
# 检查是否重复
|
||
if img_hash in processed_hashes:
|
||
duplicate_files += 1
|
||
print(f"🔶 发现重复图片: {filename}")
|
||
# 可以选择删除重复文件或保留
|
||
# delete_source_file(file_path)
|
||
continue
|
||
|
||
# 处理目标文件命名
|
||
ext = os.path.splitext(filename)[1].lower()
|
||
# 使用哈希值+时间戳命名,避免冲突
|
||
target_filename = f"{img_hash}_{int(time.time())}{ext}"
|
||
target_path = os.path.join(TARGET_FOLDER, target_filename)
|
||
|
||
# 复制文件到目标文件夹
|
||
if safe_copy_file(file_path, target_path):
|
||
# 删除源文件
|
||
delete_source_file(file_path)
|
||
|
||
moved_files += 1
|
||
processed_hashes.add(img_hash)
|
||
print(f"✅ 已处理: {filename} → {target_filename}")
|
||
else:
|
||
error_files += 1
|
||
|
||
except Exception as e:
|
||
error_files += 1
|
||
print(f"❌ 处理失败: {filename} - {e}")
|
||
continue
|
||
|
||
# 生成统计报告
|
||
print()
|
||
print("=" * 60)
|
||
print("📊 去重完成统计报告")
|
||
print("=" * 60)
|
||
print(f"📁 总文件数: {total_files}")
|
||
print(f"✅ 成功去重并移动: {moved_files}")
|
||
print(f"🔶 重复文件数: {duplicate_files}")
|
||
print(f"🔍 过滤小尺寸文件: {filtered_files}")
|
||
print(f"❌ 处理失败文件: {error_files}")
|
||
print(f"💾 已处理哈希值数量: {len(processed_hashes)}")
|
||
print()
|
||
|
||
# 检查是否有删除失败的文件
|
||
delete_failed_path = os.path.join(TARGET_FOLDER, "delete_failed.txt")
|
||
if os.path.exists(delete_failed_path):
|
||
with open(delete_failed_path, "r", encoding="utf-8") as f:
|
||
failed_count = len(f.readlines())
|
||
print(f"⚠️ 有 {failed_count} 个文件删除失败,详见: {delete_failed_path}")
|
||
|
||
print("=" * 60)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main() |