Files
spider/spider.py

391 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import re
import subprocess
import threading
import time
import shutil
from pathlib import Path
from collections import deque
from bilibili_api import HEADERS, get_client, video # pip install bilibili-api-python
# Directory containing this script; the data paths below are anchored to it.
BASE_DIR = Path(__file__).resolve().parent
# Folder that receives downloaded videos.
OUTPUT_DIR = BASE_DIR.joinpath("downloads")
# Record file listing the bvids that have already been downloaded.
DOWNLOADED_BVID_FILE = BASE_DIR.joinpath("downloaded_bvids.txt")
# Record file listing the bvids that were skipped.
SKIPPED_BVID_FILE = BASE_DIR.joinpath("skipped_bvids.txt")
# Path of the ffmpeg executable; main() sets it from find_ffmpeg().
FFMPEG_PATH = None
# Seconds the download worker sleeps when the queue is empty.
POLL_SECONDS = 2
# Upper bound on how many related videos are queued per processed video.
FETCH_RELATED_LIMIT = 20
def find_ffmpeg():
    """Locate the ffmpeg executable.

    The system PATH is consulted first. If ffmpeg is not there, a fixed list
    of common Windows install locations is probed in order.

    Returns:
        The path to ffmpeg as a string, or None when no candidate exists.
    """
    on_path = shutil.which("ffmpeg")
    if on_path:
        return on_path

    # Common ffmpeg install locations on Windows.
    user_home = Path.home()
    candidates = (
        r"C:\ffmpeg\bin\ffmpeg.exe",
        r"C:\Program Files\ffmpeg\bin\ffmpeg.exe",
        r"C:\Program Files (x86)\ffmpeg\bin\ffmpeg.exe",
        user_home / "ffmpeg" / "bin" / "ffmpeg.exe",
        user_home / "Downloads" / "ffmpeg" / "bin" / "ffmpeg.exe",
        BASE_DIR / "ffmpeg" / "bin" / "ffmpeg.exe",
        BASE_DIR / "ffmpeg.exe",
    )
    # First existing candidate wins; None when none of them exist.
    return next(
        (str(candidate) for candidate in candidates if Path(candidate).exists()),
        None,
    )
# In-memory queue and set used in place of a database. The queue helpers and
# process_download take queue_lock before touching these structures.
video_queue = deque() # videos waiting to be downloaded
downloaded_set = set() # bvids already downloaded, used to prevent duplicates
queue_lock = threading.Lock()
def load_downloaded_bvids():
    """Rebuild the global ``downloaded_set`` from the downloaded-bvid file.

    Each non-blank line of the file is taken as one bvid (surrounding
    whitespace stripped). When the file does not exist, or reading it fails,
    the set is reset to empty and a message is printed.
    """
    global downloaded_set

    if not DOWNLOADED_BVID_FILE.exists():
        print("no downloaded bvids file found, starting fresh")
        downloaded_set = set()
        return

    try:
        with open(DOWNLOADED_BVID_FILE, "r", encoding="utf-8") as record_file:
            downloaded_set = {entry.strip() for entry in record_file if entry.strip()}
        print(f"loaded {len(downloaded_set)} downloaded bvids from file")
    except Exception as err:
        # Best effort: a broken record file must not stop the program.
        print(f"failed to load downloaded bvids: {err}")
        downloaded_set = set()
def save_downloaded_bvid(bvid: str):
    """Append *bvid* as one line to the downloaded-bvid file.

    Failures are reported on stdout and otherwise ignored.
    """
    try:
        with DOWNLOADED_BVID_FILE.open("a", encoding="utf-8") as record_file:
            record_file.write(bvid + "\n")
    except Exception as err:
        print(f"failed to save bvid {bvid}: {err}")
def save_skipped_bvid(bvid: str, view_count: int, title: str):
    """Append a skipped video to the skipped-bvid file.

    The record is one tab-separated line: bvid, view count, title.
    Failures are reported on stdout and otherwise ignored.
    """
    try:
        with SKIPPED_BVID_FILE.open("a", encoding="utf-8") as record_file:
            record = f"{bvid}\t{view_count}\t{title}\n"
            record_file.write(record)
    except Exception as err:
        print(f"failed to save skipped bvid {bvid}: {err}")
def get_bvid_from_url(url: str) -> str:
    """Extract the first bvid ("BV" followed by 10 alphanumerics) from *url*.

    Raises:
        ValueError: if *url* contains no such identifier.
    """
    found = re.search(r"BV[0-9A-Za-z]{10}", url)
    if found is not None:
        return found.group(0)
    raise ValueError(f"link does not contain bvid: {url}")
def sanitize_title(title: str, max_length: int = 80) -> str:
    """Reduce *title* to a string that is safe to use in a file name.

    Every character that is not a word character (``\\w``, which also keeps
    underscores) or in the range U+4E00..U+9FFF is removed. Runs of
    underscores are collapsed to one and leading/trailing underscores are
    dropped. An empty result becomes "video". The result is cut to
    *max_length* characters and any trailing underscore left by the cut is
    removed.
    """
    # Keep only word characters and CJK ideographs; drop everything else.
    kept = re.sub(r'[^\w\u4e00-\u9fff]', '', title)
    # Collapse consecutive underscores, then trim them from both ends.
    kept = re.sub(r'_+', '_', kept).strip('_') or "video"
    truncated = kept[:max_length]
    return truncated.rstrip('_')
def build_output_file_name(bvid: str, title: str) -> str:
    """Return the output file name ``<bvid>_<sanitized title>.mp4``."""
    safe_title = sanitize_title(title)
    return f"{bvid}_{safe_title}.mp4"
def get_output_path(file_name: str) -> Path:
    """Return the path of *file_name* inside the download directory."""
    return OUTPUT_DIR.joinpath(file_name)
def add_to_queue(bvid: str, source_url: str = ""):
    """Append a video to the download queue unless it is a duplicate.

    A bvid counts as a duplicate when it is in ``downloaded_set`` or already
    waiting in ``video_queue``. Only those two collections are consulted, so
    an ID that was dequeued earlier without being downloaded can be queued
    again.

    Args:
        bvid: Video identifier to queue.
        source_url: URL the video came from; defaults to the standard video
            page URL built from *bvid* when empty.

    Returns:
        True if the video was queued, False if it was skipped as a duplicate.
    """
    with queue_lock:
        if bvid in downloaded_set or any(
            entry["bvid"] == bvid for entry in video_queue
        ):
            print(f"skipped duplicate: {bvid}")
            return False

        entry = {
            "bvid": bvid,
            "source_url": source_url or f"https://www.bilibili.com/video/{bvid}",
            "added_at": time.time(),
        }
        video_queue.append(entry)
        print(f"added to queue: {bvid}, queue size: {len(video_queue)}")
        return True
def get_from_queue():
    """Pop and return the oldest queued video, or None if the queue is empty."""
    with queue_lock:
        return video_queue.popleft() if video_queue else None
async def fetch_video_info_and_related(bvid: str):
    """Fetch a video's info and its related videos.

    The two requests are awaited one after the other, info first.

    Returns:
        A tuple ``(info, related_items)`` holding the results of
        ``Video.get_info()`` and ``Video.get_related()``.
    """
    target = video.Video(bvid=bvid)
    return await target.get_info(), await target.get_related()
def save_related_bvids(parent_bvid: str, related_items):
    """Queue the related videos of a processed video.

    Only the first ``FETCH_RELATED_LIMIT`` items are considered. Items
    without a "bvid" value are ignored and not counted.

    Args:
        parent_bvid: bvid the related items belong to (not used by the body).
        related_items: Sequence of mappings, each expected to carry "bvid".

    Returns:
        A tuple ``(inserted, skipped)``: how many items were queued and how
        many were rejected by ``add_to_queue``.
    """
    added = 0
    rejected = 0
    for entry in related_items[:FETCH_RELATED_LIMIT]:
        candidate = entry.get("bvid")
        if candidate:
            if add_to_queue(candidate, f"https://www.bilibili.com/video/{candidate}"):
                added += 1
            else:
                rejected += 1
    return added, rejected
async def download_stream(url: str, output_path: Path, intro: str):
    """Download *url* to *output_path*, printing progress on one line.

    Args:
        url: Stream URL to download.
        output_path: File that receives the bytes (opened in "wb" mode).
        intro: Label printed in front of the progress counter.

    Raises:
        RuntimeError: if the stream stops delivering data before the
            announced content length has been written.
    """
    download_id = await get_client().download_create(url, HEADERS)
    written = 0
    total = get_client().download_content_length(download_id)
    with open(output_path, "wb") as file_obj:
        while True:
            chunk = await get_client().download_chunk(download_id)
            written += file_obj.write(chunk)
            print(f"{intro} - {output_path.name} [{written} / {total}]", end="\r")
            # An empty chunk means no more data is coming. Without this check
            # a short stream would keep the loop spinning forever.
            # NOTE(review): assumes download_chunk returns empty bytes at end
            # of stream — confirm against the client implementation.
            if written >= total or not chunk:
                break
    print()
    if written < total:
        # Fail loudly so a truncated file is not handed on as a full download.
        raise RuntimeError(
            f"download ended early for {output_path.name}: {written} / {total} bytes"
        )
def merge_media(video_path: Path, audio_path: Path, output_path: Path):
    """Combine a video file and an audio file into *output_path* with ffmpeg.

    Both streams are copied without re-encoding ("-vcodec copy",
    "-acodec copy"), and "-y" overwrites an existing output file.

    Raises:
        RuntimeError: if ``FFMPEG_PATH`` is not set.
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    if not FFMPEG_PATH:
        raise RuntimeError("ffmpeg not found. Please install ffmpeg and add it to PATH.")

    input_args = []
    for source in (video_path, audio_path):
        input_args += ["-i", str(source)]

    command = [
        FFMPEG_PATH,
        "-y",
        *input_args,
        "-vcodec",
        "copy",
        "-acodec",
        "copy",
        str(output_path),
    ]
    subprocess.run(command, check=True)
def convert_flv_to_mp4(source_path: Path, output_path: Path):
    """Convert *source_path* to *output_path* with ffmpeg, overwriting it.

    Raises:
        RuntimeError: if ``FFMPEG_PATH`` is not set.
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    if not FFMPEG_PATH:
        raise RuntimeError("ffmpeg not found. Please install ffmpeg and add it to PATH.")

    command = [FFMPEG_PATH, "-y", "-i", str(source_path), str(output_path)]
    subprocess.run(command, check=True)
async def download_video_file(bvid: str, output_file_name: str):
    """Download a video through bilibili_api and produce the final file.

    If the detector reports an FLV/MP4 stream, the single stream is
    downloaded and converted with ffmpeg. Otherwise the first two detected
    streams are downloaded as video and audio and merged with ffmpeg.
    Temporary files are removed whether or not the download succeeds.

    Args:
        bvid: Video identifier.
        output_file_name: Name of the final file inside ``OUTPUT_DIR``.

    Returns:
        Path of the final file.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    final_path = get_output_path(output_file_name)
    temp_video_path = OUTPUT_DIR / f"{final_path.stem}_video_temp.m4s"
    temp_audio_path = OUTPUT_DIR / f"{final_path.stem}_audio_temp.m4s"
    temp_flv_path = OUTPUT_DIR / f"{final_path.stem}_temp.flv"
    v = video.Video(bvid=bvid)
    try:
        download_url_data = await v.get_download_url(0)
        detector = video.VideoDownloadURLDataDetecter(data=download_url_data)
        streams = detector.detect_best_streams()
        if detector.check_flv_mp4_stream():
            await download_stream(streams[0].url, temp_flv_path, "download flv")
            convert_flv_to_mp4(temp_flv_path, final_path)
        else:
            await download_stream(streams[0].url, temp_video_path, "download video")
            await download_stream(streams[1].url, temp_audio_path, "download audio")
            merge_media(temp_video_path, temp_audio_path, final_path)
    finally:
        # Clean up on failure too, so an aborted download or a failed ffmpeg
        # run does not leave partial temp files in the output directory.
        for temp_path in (temp_flv_path, temp_video_path, temp_audio_path):
            temp_path.unlink(missing_ok=True)
    return final_path
def download_with_youget(bvid: str, title: str, output_dir: Path) -> bool:
    """Download a video by running the ``you-get`` command-line tool.

    The output file name passed to you-get is ``<bvid>_<sanitized title>``,
    and the "--no-caption" flag is passed (per the original comment, to
    disable subtitles).

    Returns:
        True when you-get exits successfully; False when it exits with an
        error or the executable cannot be found.
    """
    command = [
        "you-get",
        "-o",
        str(output_dir),
        "-O",
        f"{bvid}_{sanitize_title(title)}",
        "--no-caption",
        f"https://www.bilibili.com/video/{bvid}",
    ]
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as err:
        print(f"you-get download failed: {err.stderr}")
        return False
    except FileNotFoundError:
        print("you-get not found, please install it: pip install you-get")
        return False

    print(f"you-get output: {completed.stdout}")
    return True
def _queue_related(bvid: str, related_items):
    """Queue the related videos of *bvid* and print how many were added or skipped."""
    inserted, skipped = save_related_bvids(bvid, related_items)
    print(f"related videos: inserted={inserted}, skipped={skipped}")


def process_download(bvid: str, min_views: int = 500000):
    """Process one video: fetch its info, optionally download it, queue related videos.

    Videos with fewer than *min_views* views are recorded in the skipped
    file and not downloaded, but their related videos are still queued.
    Otherwise the video is downloaded with you-get unless its output file
    already exists.

    Any exception is caught and printed, so one failing video cannot stop
    the worker loop that calls this function.

    Args:
        bvid: Video identifier to process.
        min_views: Minimum view count required to download; defaults to
            500000, the threshold that was previously hard-coded.
    """
    try:
        # Fetch the video info and its related videos.
        info, related_items = asyncio.run(fetch_video_info_and_related(bvid))
        title = info.get("title", bvid)
        view_count = info.get("stat", {}).get("view", 0)
        print(f"fetching info done: {bvid}, title: {title}, views: {view_count}")

        if view_count < min_views:
            print(f"skipped: {bvid}, view count {view_count} < {min_views}")
            save_skipped_bvid(bvid, view_count, title)
            # Related videos are still queued for a skipped video.
            _queue_related(bvid, related_items)
            return

        print(f"view count {view_count} >= {min_views}, downloading...")
        file_name = build_output_file_name(bvid, title)

        final_path = get_output_path(file_name)
        if final_path.exists():
            print(f"file already exists, skipped: {bvid} -> {final_path}")
            with queue_lock:
                downloaded_set.add(bvid)
            # NOTE(review): unlike the other branches, this one neither
            # persists the bvid nor queues related videos — confirm intended.
            return

        success = download_with_youget(bvid, title, OUTPUT_DIR)
        if success:
            print(f"download done: {bvid}")
            # Mark as downloaded in memory and in the record file.
            with queue_lock:
                downloaded_set.add(bvid)
            save_downloaded_bvid(bvid)
        else:
            print(f"download failed: {bvid}")

        # Related videos are queued whether or not the download succeeded.
        _queue_related(bvid, related_items)
    except Exception as exc:
        print(f"download failed: {bvid}, error={exc}")
def download_worker():
    """Worker loop: take queued videos one at a time and process them.

    Runs forever. When the queue is empty it sleeps ``POLL_SECONDS`` before
    checking again.
    """
    print("download worker started")
    while True:
        item = get_from_queue()
        if item:
            bvid = item["bvid"]
            print(f"processing: {bvid}")
            process_download(bvid)
        else:
            time.sleep(POLL_SECONDS)
def input_worker():
    """Read video URLs from standard input and queue them.

    The loop ends when the user types "quit" (case-insensitive), on end of
    input, or on Ctrl-C. Blank lines are ignored, and a line without a bvid
    produces an error message.
    """
    print("input worker started, waiting for URLs...")
    print("Enter a bilibili URL (or 'quit' to exit):")
    while True:
        try:
            url = input("> ").strip()
            if url.lower() == 'quit':
                print("stopping...")
                break
            if url:
                try:
                    bvid = get_bvid_from_url(url)
                    add_to_queue(bvid, url)
                except ValueError as err:
                    print(f"invalid url: {err}")
                    print("please enter a valid bilibili video URL")
        except (EOFError, KeyboardInterrupt):
            break
def main():
    """Program entry point.

    Locates ffmpeg (printing install instructions and returning if it is
    missing), loads the downloaded-bvid record, starts the download worker
    as a daemon thread, and then reads URLs from the user on the main thread.
    """
    global FFMPEG_PATH
    rule = "=" * 50

    # Locate ffmpeg.
    FFMPEG_PATH = find_ffmpeg()
    if not FFMPEG_PATH:
        missing_ffmpeg_lines = (
            rule,
            "ERROR: ffmpeg not found!",
            rule,
            "Please install ffmpeg and add it to your system PATH.",
            "Download from: https://ffmpeg.org/download.html",
            "Or place ffmpeg.exe in one of these locations:",
            " - C:\\ffmpeg\\bin\\",
            " - Current directory",
            rule,
        )
        for line in missing_ffmpeg_lines:
            print(line)
        return
    print(f"ffmpeg found at: {FFMPEG_PATH}")

    # Load the list of already-downloaded bvids.
    load_downloaded_bvids()

    # Start the download worker thread.
    worker = threading.Thread(target=download_worker, daemon=True, name="download-worker")
    worker.start()

    banner_lines = (
        rule,
        "Bilibili Video Downloader",
        rule,
        "Download worker started in background",
        "Enter bilibili video URLs to download",
        "Related videos will be automatically discovered and queued",
        "Type 'quit' to exit",
        rule,
    )
    for line in banner_lines:
        print(line)

    # The main thread handles user input.
    input_worker()
# Run the downloader only when this file is executed as a script.
if __name__ == "__main__":
    main()