Add files via upload
upload:人脸检测代码
This commit is contained in:
217
drop_duplicates.py
Normal file
217
drop_duplicates.py
Normal file
@@ -0,0 +1,217 @@
|
||||
import os
|
||||
import shutil
|
||||
import time
|
||||
import ctypes
|
||||
import imagehash
|
||||
from PIL import Image, UnidentifiedImageError
|
||||
|
||||
# -------------------------- 配置参数 (按需修改) --------------------------
|
||||
SOURCE_FOLDER = r"D:\img" # 原始图片库目录
|
||||
TARGET_FOLDER = r"D:\img_todo\all_img_result" # 去重后图片输出目录
|
||||
SUPPORTED_FORMATS = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.gif') # 支持的图片格式
|
||||
MIN_PIXEL_COUNT = 1000 # 最小像素数阈值(宽或高小于此值的图片将被过滤)
|
||||
RETRY_TIMES = 5 # 文件操作失败重试次数
|
||||
RETRY_DELAY = 2 # 重试间隔(秒)
|
||||
HASH_SIZE = 8 # 哈希计算尺寸(8-16之间选择,影响去重精度)
|
||||
|
||||
|
||||
# -------------------------- 系统级辅助函数 --------------------------
|
||||
def release_file_cache():
|
||||
"""Windows系统-释放文件缓存/句柄,非Windows系统无影响"""
|
||||
try:
|
||||
if os.name == "nt":
|
||||
ctypes.windll.kernel32.SetErrorMode(0x0001)
|
||||
ctypes.windll.psapi.EmptyWorkingSet(ctypes.windll.kernel32.GetCurrentProcess())
|
||||
except Exception as e:
|
||||
print(f"⚠️ 释放文件缓存失败: {e}")
|
||||
|
||||
|
||||
def remove_readonly_attr(file_path):
|
||||
"""跨平台移除文件只读属性"""
|
||||
if os.path.exists(file_path):
|
||||
try:
|
||||
if os.name == "nt":
|
||||
ctypes.windll.kernel32.SetFileAttributesW(file_path, 128) # Windows移除只读
|
||||
else:
|
||||
os.chmod(file_path, 0o777) # 类Unix系统
|
||||
except Exception as e:
|
||||
print(f"⚠️ 移除只读属性失败 {file_path}: {e}")
|
||||
|
||||
|
||||
# -------------------------- 核心图片处理函数 --------------------------
|
||||
def get_image_info(img_path):
|
||||
"""获取图片的哈希值和尺寸信息"""
|
||||
img = None
|
||||
try:
|
||||
img = Image.open(img_path)
|
||||
# 计算感知哈希值
|
||||
phash = str(imagehash.phash(img.convert('L'), hash_size=HASH_SIZE))
|
||||
# 获取图片尺寸
|
||||
width, height = img.size
|
||||
return phash, (width, height)
|
||||
except UnidentifiedImageError:
|
||||
print(f"❌ 无法识别图片: {os.path.basename(img_path)}")
|
||||
return None, None
|
||||
except Exception as e:
|
||||
print(f"❌ 处理图片失败 {os.path.basename(img_path)}: {e}")
|
||||
return None, None
|
||||
finally:
|
||||
if img:
|
||||
img.close()
|
||||
del img
|
||||
release_file_cache()
|
||||
|
||||
|
||||
def safe_copy_file(src_path, dst_path):
|
||||
"""安全复制文件,处理权限和重试逻辑"""
|
||||
remove_readonly_attr(src_path)
|
||||
|
||||
# 复制文件(带重试)
|
||||
for retry in range(RETRY_TIMES):
|
||||
try:
|
||||
shutil.copy2(src_path, dst_path)
|
||||
return True
|
||||
except Exception as e:
|
||||
if retry < RETRY_TIMES - 1:
|
||||
print(f"⚠️ 复制失败 {os.path.basename(src_path)},重试中... ({retry+1}/{RETRY_TIMES})")
|
||||
time.sleep(RETRY_DELAY)
|
||||
else:
|
||||
print(f"❌ 复制失败 {os.path.basename(src_path)}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def delete_source_file(src_path):
|
||||
"""安全删除源文件,处理权限和重试逻辑"""
|
||||
remove_readonly_attr(src_path)
|
||||
|
||||
# 删除文件(带重试)
|
||||
for retry in range(RETRY_TIMES):
|
||||
try:
|
||||
release_file_cache()
|
||||
os.remove(src_path)
|
||||
return True
|
||||
except PermissionError:
|
||||
if retry < RETRY_TIMES - 1:
|
||||
print(f"⚠️ 删除失败 {os.path.basename(src_path)},文件被占用,重试中... ({retry+1}/{RETRY_TIMES})")
|
||||
time.sleep(RETRY_DELAY)
|
||||
else:
|
||||
print(f"❌ 删除失败 {os.path.basename(src_path)}: 文件被占用")
|
||||
with open(os.path.join(TARGET_FOLDER, "delete_failed.txt"), "a", encoding="utf-8") as f:
|
||||
f.write(f"{src_path}\n")
|
||||
return False
|
||||
except Exception as e:
|
||||
if retry < RETRY_TIMES - 1:
|
||||
print(f"⚠️ 删除失败 {os.path.basename(src_path)},重试中... ({retry+1}/{RETRY_TIMES})")
|
||||
time.sleep(RETRY_DELAY)
|
||||
else:
|
||||
print(f"❌ 删除失败 {os.path.basename(src_path)}: {e}")
|
||||
with open(os.path.join(TARGET_FOLDER, "delete_failed.txt"), "a", encoding="utf-8") as f:
|
||||
f.write(f"{src_path}\n")
|
||||
return False
|
||||
|
||||
|
||||
# -------------------------- 主函数 --------------------------
|
||||
def main():
|
||||
# 创建目标文件夹
|
||||
os.makedirs(TARGET_FOLDER, exist_ok=True)
|
||||
|
||||
# 初始化统计变量
|
||||
total_files = 0
|
||||
processed_files = 0
|
||||
duplicate_files = 0
|
||||
filtered_files = 0
|
||||
moved_files = 0
|
||||
error_files = 0
|
||||
|
||||
# 存储已处理的图片哈希值
|
||||
processed_hashes = set()
|
||||
|
||||
print("=" * 60)
|
||||
print("📁 图片去重工具")
|
||||
print(f"🔍 源文件夹: {SOURCE_FOLDER}")
|
||||
print(f"📂 目标文件夹: {TARGET_FOLDER}")
|
||||
print(f"🎯 支持格式: {SUPPORTED_FORMATS}")
|
||||
print("=" * 60)
|
||||
print()
|
||||
|
||||
# 遍历源文件夹
|
||||
for root, _, files in os.walk(SOURCE_FOLDER):
|
||||
print(f"▶ 正在处理文件夹: {root}")
|
||||
|
||||
for filename in files:
|
||||
# 检查文件格式
|
||||
if not filename.lower().endswith(SUPPORTED_FORMATS):
|
||||
continue
|
||||
|
||||
total_files += 1
|
||||
file_path = os.path.join(root, filename)
|
||||
|
||||
try:
|
||||
# 获取图片信息
|
||||
img_hash, (width, height) = get_image_info(file_path)
|
||||
|
||||
if img_hash is None:
|
||||
error_files += 1
|
||||
continue
|
||||
|
||||
# 过滤小尺寸图片
|
||||
if width < MIN_PIXEL_COUNT or height < MIN_PIXEL_COUNT:
|
||||
filtered_files += 1
|
||||
print(f"🔍 过滤小尺寸图片: {filename} ({width}x{height})")
|
||||
continue
|
||||
|
||||
# 检查是否重复
|
||||
if img_hash in processed_hashes:
|
||||
duplicate_files += 1
|
||||
print(f"🔶 发现重复图片: {filename}")
|
||||
# 可以选择删除重复文件或保留
|
||||
# delete_source_file(file_path)
|
||||
continue
|
||||
|
||||
# 处理目标文件命名
|
||||
ext = os.path.splitext(filename)[1].lower()
|
||||
# 使用哈希值+时间戳命名,避免冲突
|
||||
target_filename = f"{img_hash}_{int(time.time())}{ext}"
|
||||
target_path = os.path.join(TARGET_FOLDER, target_filename)
|
||||
|
||||
# 复制文件到目标文件夹
|
||||
if safe_copy_file(file_path, target_path):
|
||||
# 删除源文件
|
||||
delete_source_file(file_path)
|
||||
|
||||
moved_files += 1
|
||||
processed_hashes.add(img_hash)
|
||||
print(f"✅ 已处理: {filename} → {target_filename}")
|
||||
else:
|
||||
error_files += 1
|
||||
|
||||
except Exception as e:
|
||||
error_files += 1
|
||||
print(f"❌ 处理失败: {filename} - {e}")
|
||||
continue
|
||||
|
||||
# 生成统计报告
|
||||
print()
|
||||
print("=" * 60)
|
||||
print("📊 去重完成统计报告")
|
||||
print("=" * 60)
|
||||
print(f"📁 总文件数: {total_files}")
|
||||
print(f"✅ 成功去重并移动: {moved_files}")
|
||||
print(f"🔶 重复文件数: {duplicate_files}")
|
||||
print(f"🔍 过滤小尺寸文件: {filtered_files}")
|
||||
print(f"❌ 处理失败文件: {error_files}")
|
||||
print(f"💾 已处理哈希值数量: {len(processed_hashes)}")
|
||||
print()
|
||||
|
||||
# 检查是否有删除失败的文件
|
||||
delete_failed_path = os.path.join(TARGET_FOLDER, "delete_failed.txt")
|
||||
if os.path.exists(delete_failed_path):
|
||||
with open(delete_failed_path, "r", encoding="utf-8") as f:
|
||||
failed_count = len(f.readlines())
|
||||
print(f"⚠️ 有 {failed_count} 个文件删除失败,详见: {delete_failed_path}")
|
||||
|
||||
print("=" * 60)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
151
face_detect.py
Normal file
151
face_detect.py
Normal file
@@ -0,0 +1,151 @@
|
||||
import os
|
||||
import shutil
|
||||
import platform
|
||||
import torch
|
||||
from ultralytics import YOLO
|
||||
|
||||
# ======================== 核心参数配置(请根据自己的路径修改!!!) ========================
|
||||
SOURCE_IMG_DIR = r"D:\img_todo\all_img_result" # 去重后的图片源文件夹
|
||||
HAVE_FACE_DIR = r"D:\img_todo\good" # 检测出有人脸的图片输出目录
|
||||
NO_FACE_DIR = r"D:\img_todo\bad" # 无有效人脸的图片输出目录
|
||||
CONF_THRESHOLD = 0.9 # 置信度阈值,大于等于90%才判定为人脸
|
||||
MODEL_PATH = 'face.pt' # YOLOv8人脸检测模型路径
|
||||
# ========================================================================================
|
||||
|
||||
|
||||
def detect_device():
|
||||
"""
|
||||
自动检测并选择最佳运行设备
|
||||
返回:设备字符串 (cpu/mps/0)
|
||||
"""
|
||||
print("🔍 正在检测可用计算设备...")
|
||||
|
||||
# 检查操作系统类型
|
||||
system = platform.system()
|
||||
print(f"💻 操作系统: {system}")
|
||||
|
||||
# 优先检查Mac设备(MPS)
|
||||
if system == "Darwin": # macOS
|
||||
if torch.backends.mps.is_available():
|
||||
print("✅ 检测到Mac MPS GPU,将使用MPS加速")
|
||||
return "mps"
|
||||
else:
|
||||
print("⚠️ Mac MPS不可用,将使用CPU")
|
||||
return "cpu"
|
||||
|
||||
# 检查CUDA GPU(Windows/Linux)
|
||||
if torch.cuda.is_available():
|
||||
gpu_count = torch.cuda.device_count()
|
||||
gpu_name = torch.cuda.get_device_name(0)
|
||||
print(f"✅ 检测到CUDA GPU ({gpu_count}个): {gpu_name}")
|
||||
print(f" CUDA版本: {torch.version.cuda}")
|
||||
return 0 # 使用第一个GPU
|
||||
|
||||
# 所有GPU都不可用时,使用CPU
|
||||
print("⚠️ 未检测到可用GPU,将使用CPU")
|
||||
return "cpu"
|
||||
|
||||
|
||||
def main():
|
||||
"""
|
||||
人脸检测主函数:对去重后的图片进行批量人脸检测
|
||||
输出:包含人脸的图片集合(保存在HAVE_FACE_DIR目录)
|
||||
"""
|
||||
# 创建目标文件夹(如果不存在则自动创建)
|
||||
os.makedirs(HAVE_FACE_DIR, exist_ok=True)
|
||||
os.makedirs(NO_FACE_DIR, exist_ok=True)
|
||||
|
||||
# 自动检测最佳运行设备
|
||||
device = detect_device()
|
||||
|
||||
# 加载YOLOv8人脸检测专用预训练模型
|
||||
print(f"\n🚀 正在加载人脸检测模型: {MODEL_PATH}")
|
||||
try:
|
||||
model = YOLO(MODEL_PATH)
|
||||
print(f"✅ 模型加载成功: {model.model.__class__.__name__}")
|
||||
print(f"🔧 当前使用设备: {device}")
|
||||
except Exception as e:
|
||||
print(f"❌ 模型加载失败: {e}")
|
||||
return
|
||||
|
||||
# 定义需要处理的图片后缀(常用格式全覆盖)
|
||||
SUPPORT_IMG_FORMATS = ['.jpg', '.jpeg', '.png', '.bmp', '.webp', '.tif', '.tiff']
|
||||
|
||||
# 遍历源文件夹下的所有文件
|
||||
img_count = 0
|
||||
have_face_count = 0
|
||||
|
||||
print(f"\n📁 开始处理图片文件夹: {SOURCE_IMG_DIR}")
|
||||
print(f"🎯 人脸检测置信度阈值: {CONF_THRESHOLD * 100:.0f}%")
|
||||
print(f"📂 有人脸图片输出目录: {HAVE_FACE_DIR}")
|
||||
print(f"📂 无有效人脸图片输出目录: {NO_FACE_DIR}")
|
||||
print("=" * 60)
|
||||
|
||||
try:
|
||||
for file_name in os.listdir(SOURCE_IMG_DIR):
|
||||
# 获取文件完整路径和后缀
|
||||
file_path = os.path.join(SOURCE_IMG_DIR, file_name)
|
||||
file_suffix = os.path.splitext(file_name)[1].lower()
|
||||
|
||||
# 只处理图片文件
|
||||
if file_suffix not in SUPPORT_IMG_FORMATS:
|
||||
continue
|
||||
|
||||
img_count += 1
|
||||
print(f"\n📷 正在检测第{img_count}张图片: {file_name}")
|
||||
|
||||
try:
|
||||
# 执行人脸检测:核心推理,只返回置信度≥CONF_THRESHOLD的结果
|
||||
results = model(file_path, conf=CONF_THRESHOLD, device=device, verbose=False)
|
||||
|
||||
# 获取当前图片的检测结果:人脸标签+置信度
|
||||
det_boxes = results[0].boxes # 检测到的目标框集合
|
||||
detect_info = []
|
||||
for box in det_boxes:
|
||||
cls_name = model.names[int(box.cls)] # 检测的标签名称(人脸模型只有一个标签:face)
|
||||
conf_score = round(float(box.conf), 4) # 置信度,保留4位小数
|
||||
detect_info.append(f"{cls_name} - {conf_score * 100:.2f}%")
|
||||
|
||||
# 打印每张图片的所有检测标签及置信度
|
||||
if detect_info:
|
||||
print(f"🔍 检测到的标签及置信度: {detect_info}")
|
||||
else:
|
||||
print(f"🔍 检测到的标签及置信度: 无符合条件的检测结果")
|
||||
|
||||
# 核心判断逻辑:置信度≥90% 才判定有人脸
|
||||
if len(det_boxes) > 0:
|
||||
# 有人脸:移动到有人脸文件夹
|
||||
dest_path = os.path.join(HAVE_FACE_DIR, file_name)
|
||||
shutil.move(file_path, dest_path)
|
||||
print(f"✅ 判定结果:置信度≥90%,检测到人脸 → 已移动至 {os.path.basename(HAVE_FACE_DIR)}")
|
||||
have_face_count += 1
|
||||
else:
|
||||
# 无人脸/置信度不足90%:移动到无人脸文件夹
|
||||
dest_path = os.path.join(NO_FACE_DIR, file_name)
|
||||
shutil.move(file_path, dest_path)
|
||||
print(f"🔶 判定结果:无有效人脸(置信度<90%)→ 已移动至 {os.path.basename(NO_FACE_DIR)}")
|
||||
|
||||
except Exception as e:
|
||||
# 异常处理:单张图片出错不影响整体批量处理
|
||||
print(f"⚠️ 图片 {file_name} 处理失败: {str(e)} → 跳过该图片")
|
||||
continue
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n\n⚠️ 用户中断了程序执行")
|
||||
except Exception as e:
|
||||
print(f"\n\n❌ 程序执行出错: {e}")
|
||||
finally:
|
||||
# 批量处理完成,打印统计信息
|
||||
print("\n" + "=" * 60)
|
||||
print("📊 人脸检测完成统计报告")
|
||||
print("=" * 60)
|
||||
print(f"📁 总共检测图片数量: {img_count} 张")
|
||||
print(f"✅ 检测出有人脸(置信度≥90%)的图片数量: {have_face_count} 张")
|
||||
print(f"🔶 无有效人脸的图片数量: {img_count - have_face_count} 张")
|
||||
print(f"📂 有人脸图片保存路径: {HAVE_FACE_DIR}")
|
||||
print(f"📂 无有效人脸图片保存路径: {NO_FACE_DIR}")
|
||||
print("=" * 60)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user