fix: 移除了需要连接数据库的问题

This commit is contained in:
爱喝水的木子
2026-03-28 20:25:20 +08:00
parent ece892e9a9
commit 268bffa7d9
2 changed files with 385 additions and 241 deletions

405
spider.py
View File

@@ -3,32 +3,74 @@ import re
import subprocess
import threading
import time
import shutil
from pathlib import Path
from collections import deque
from bilibili_api import HEADERS, get_client, video # pip install bilibili-api-python
from pymongo import MongoClient, ReturnDocument # pip install pymongo
# Module-level configuration.
# NOTE(review): this span looks like a diff rendered without +/- markers, so old and new
# lines appear together (FFMPEG_PATH is assigned twice) — confirm which lines survive.
# MongoDB connection settings; consumed by `client` / `collection` at the end of this span.
MONGO_URI = "mongodb://192.168.28.9:27017/"
MONGO_DB_NAME = "bilibiliss"
VIDEO_COLLECTION = "bilibili_video_pool"
# Paths are resolved relative to the directory that contains this file.
BASE_DIR = Path(__file__).resolve().parent
URLS_FILE = BASE_DIR / "urls.txt"
OUTPUT_DIR = BASE_DIR / "downloads"
FFMPEG_PATH = "ffmpeg"
DOWNLOADED_BVID_FILE = BASE_DIR / "downloaded_bvids.txt" # record file of already-downloaded bvids
FFMPEG_PATH = None # Will be set by find_ffmpeg()
# Seconds a worker sleeps when it finds nothing to process.
POLL_SECONDS = 2
# Maximum number of related videos taken from one video's related list.
FETCH_RELATED_LIMIT = 20
# Status strings written to the "status" field of the MongoDB documents.
STATUS_INIT = "init"
STATUS_FETCHING = "fetching"
STATUS_READY = "ready"
STATUS_DOWNLOADING = "downloading"
STATUS_DONE = "done"
STATUS_ERROR = "error"
# Module-level MongoDB client and the collection handle used by the database-backed functions.
client = MongoClient(MONGO_URI)
collection = client[MONGO_DB_NAME][VIDEO_COLLECTION]
def find_ffmpeg():
    """Locate the ffmpeg executable.

    The system ``PATH`` is searched first via ``shutil.which``. If that
    finds nothing, a fixed list of common Windows install locations (plus
    two locations next to this script) is probed in order.

    Returns:
        The path to ffmpeg as a string, or ``None`` when no candidate is found.
    """
    # Prefer whatever is reachable through the system PATH.
    ffmpeg_path = shutil.which("ffmpeg")
    if ffmpeg_path:
        return ffmpeg_path

    # Common ffmpeg install locations on Windows.
    common_paths = [
        r"C:\ffmpeg\bin\ffmpeg.exe",
        r"C:\Program Files\ffmpeg\bin\ffmpeg.exe",
        r"C:\Program Files (x86)\ffmpeg\bin\ffmpeg.exe",
        Path.home() / "ffmpeg" / "bin" / "ffmpeg.exe",
        Path.home() / "Downloads" / "ffmpeg" / "bin" / "ffmpeg.exe",
        BASE_DIR / "ffmpeg" / "bin" / "ffmpeg.exe",
        BASE_DIR / "ffmpeg.exe",
    ]
    for candidate in common_paths:
        # is_file() rather than exists(): a directory that happens to sit at
        # one of these paths must not be reported as the executable.
        if Path(candidate).is_file():
            return str(candidate)
    return None
# Shared in-memory state used in place of the database.
video_queue = deque() # queue of videos waiting to be downloaded
downloaded_set = set() # bvids already downloaded, used to prevent duplicates
# Lock taken by add_to_queue / get_from_queue while they touch the two containers above.
queue_lock = threading.Lock()
def load_downloaded_bvids():
    """Rebuild the global ``downloaded_set`` from the bvid record file.

    Every non-blank line of ``DOWNLOADED_BVID_FILE`` is stripped and stored
    as one bvid. When the file does not exist, or reading it fails, the set
    is reset to empty and a message is printed instead.
    """
    global downloaded_set

    # Guard clause: nothing recorded yet.
    if not DOWNLOADED_BVID_FILE.exists():
        print("no downloaded bvids file found, starting fresh")
        downloaded_set = set()
        return

    try:
        with open(DOWNLOADED_BVID_FILE, "r", encoding="utf-8") as record_file:
            downloaded_set = {raw.strip() for raw in record_file if raw.strip()}
        print(f"loaded {len(downloaded_set)} downloaded bvids from file")
    except Exception as e:
        print(f"failed to load downloaded bvids: {e}")
        downloaded_set = set()
def save_downloaded_bvid(bvid: str):
    """Append *bvid* as one line to the downloaded-bvid record file.

    Any failure is reported with a printed message and otherwise ignored,
    so a write error never interrupts the caller.
    """
    try:
        with DOWNLOADED_BVID_FILE.open("a", encoding="utf-8") as record_file:
            record_file.write(bvid + "\n")
    except Exception as e:
        print(f"failed to save bvid {bvid}: {e}")
def get_bvid_from_url(url: str) -> str:
@@ -56,85 +98,28 @@ def get_output_path(file_name: str) -> Path:
return OUTPUT_DIR / file_name
# Create the indexes on the MongoDB collection: a unique index on "bvid"
# and a plain index on "status".
def ensure_indexes():
collection.create_index("bvid", unique=True)
collection.create_index("status")
def add_to_queue(bvid: str, source_url: str = ""):
    """Add a video to the download queue unless it is already known.

    A bvid counts as known when it is in ``downloaded_set`` or when an entry
    with the same bvid is already waiting in ``video_queue``.

    Args:
        bvid: Identifier of the video to enqueue.
        source_url: URL the bvid was taken from. When empty, the canonical
            ``https://www.bilibili.com/video/<bvid>`` URL is stored instead.

    Returns:
        ``True`` if the video was enqueued, ``False`` if it was skipped as
        a duplicate.
    """
    with queue_lock:
        # any() short-circuits on the first match instead of materialising a
        # list of every queued bvid on each call.
        if bvid in downloaded_set or any(
            item["bvid"] == bvid for item in video_queue
        ):
            print(f"skipped duplicate: {bvid}")
            return False

        video_queue.append({
            "bvid": bvid,
            "source_url": source_url or f"https://www.bilibili.com/video/{bvid}",
            "added_at": time.time()
        })
        print(f"added to queue: {bvid}, queue size: {len(video_queue)}")
        return True
# Revert documents left in an in-progress state: "fetching" goes back to "init" and
# "downloading" goes back to "ready". Prints how many documents each update modified.
def reset_in_progress_docs():
fetch_reset = collection.update_many(
{"status": STATUS_FETCHING},
{"$set": {"status": STATUS_INIT, "updated_at": time.time()}},
)
download_reset = collection.update_many(
{"status": STATUS_DOWNLOADING},
{"$set": {"status": STATUS_READY, "updated_at": time.time()}},
)
print(
f"reset in-progress docs: fetching={fetch_reset.modified_count}, "
f"downloading={download_reset.modified_count}"
)
# Seed the MongoDB collection from URLS_FILE, one URL per line.
# Blank lines are ignored; URLs that get_bvid_from_url rejects with ValueError are
# counted as skipped. Each bvid is upserted with $setOnInsert, so an existing document
# is left unchanged and counted as skipped. Prints the inserted/skipped totals.
def seed_from_urls_file():
# Nothing to seed when the URL list file is absent.
if not URLS_FILE.exists():
print(f"seed skipped, file not found: {URLS_FILE}")
return
inserted = 0
skipped = 0
with open(URLS_FILE, "r", encoding="utf-8") as file_obj:
for raw_line in file_obj:
url = raw_line.strip()
if not url:
continue
try:
bvid = get_bvid_from_url(url)
except ValueError:
skipped += 1
print(f"seed skipped, invalid url: {url}")
continue
result = collection.update_one(
{"bvid": bvid},
{
"$setOnInsert": {
"bvid": bvid,
"source_url": url,
"video_url": f"https://www.bilibili.com/video/{bvid}",
"title": "",
"download_file_name": f"{bvid}.mp4",
"status": STATUS_INIT,
"related_fetched": False,
"downloaded": False,
"created_at": time.time(),
}
},
upsert=True,
)
# upserted_id is only set when the upsert created a new document.
if result.upserted_id is not None:
inserted += 1
else:
skipped += 1
print(f"seed duplicate skipped: {bvid}")
print(f"seed complete, inserted={inserted}, skipped={skipped}")
# Claim one document awaiting a fetch: match status "init" with related_fetched False,
# switch it to "fetching" with a start timestamp, and return the updated document.
# Presumably returns None when nothing matches (the caller tests `if not doc`).
def claim_fetch_doc():
return collection.find_one_and_update(
{"status": STATUS_INIT, "related_fetched": False},
{"$set": {"status": STATUS_FETCHING, "fetch_started_at": time.time()}},
return_document=ReturnDocument.AFTER,
)
# Claim one document awaiting download: match status "ready" with downloaded False,
# switch it to "downloading" with a start timestamp, and return the updated document.
# Presumably returns None when nothing matches (the caller tests `if not doc`).
def claim_download_doc():
return collection.find_one_and_update(
{"status": STATUS_READY, "downloaded": False},
{"$set": {"status": STATUS_DOWNLOADING, "download_started_at": time.time()}},
return_document=ReturnDocument.AFTER,
)
def get_from_queue():
    """Pop and return the oldest queued video entry.

    Returns:
        The entry at the left end of ``video_queue``, or ``None`` when the
        queue is empty. The queue is accessed while holding ``queue_lock``.
    """
    with queue_lock:
        return video_queue.popleft() if video_queue else None
async def fetch_video_info_and_related(bvid: str):
@@ -145,39 +130,19 @@ async def fetch_video_info_and_related(bvid: str):
# Register up to FETCH_RELATED_LIMIT related videos and return (inserted, skipped) counts.
# Items without a "bvid" key are ignored and counted in neither total.
# NOTE(review): this span appears to interleave two versions from a diff without +/-
# markers — a MongoDB upsert keyed on result.upserted_id and a call to add_to_queue().
# Confirm which branch condition is the live one.
def save_related_bvids(parent_bvid: str, related_items):
"""Add the related videos to the queue."""
inserted = 0
skipped = 0
for item in related_items[:FETCH_RELATED_LIMIT]:
related_bvid = item.get("bvid")
if not related_bvid:
continue
result = collection.update_one(
{"bvid": related_bvid},
{
"$setOnInsert": {
"bvid": related_bvid,
"source_url": f"https://www.bilibili.com/video/{related_bvid}",
"video_url": f"https://www.bilibili.com/video/{related_bvid}",
"title": item.get("title", ""),
"download_file_name": build_output_file_name(
related_bvid, item.get("title", "")
),
"status": STATUS_INIT,
"related_fetched": False,
"downloaded": False,
"parent_bvid": parent_bvid,
"created_at": time.time(),
}
},
upsert=True,
)
if result.upserted_id is not None:
if add_to_queue(related_bvid, f"https://www.bilibili.com/video/{related_bvid}"):
inserted += 1
print(f"related inserted: {related_bvid}")
else:
skipped += 1
print(f"related duplicate skipped: {related_bvid}")
return inserted, skipped
@@ -196,6 +161,8 @@ async def download_stream(url: str, output_path: Path, intro: str):
def merge_media(video_path: Path, audio_path: Path, output_path: Path):
if not FFMPEG_PATH:
raise RuntimeError("ffmpeg not found. Please install ffmpeg and add it to PATH.")
subprocess.run(
[
FFMPEG_PATH,
@@ -215,6 +182,8 @@ def merge_media(video_path: Path, audio_path: Path, output_path: Path):
def convert_flv_to_mp4(source_path: Path, output_path: Path):
if not FFMPEG_PATH:
raise RuntimeError("ffmpeg not found. Please install ffmpeg and add it to PATH.")
subprocess.run(
[FFMPEG_PATH, "-y", "-i", str(source_path), str(output_path)],
check=True,
@@ -248,129 +217,121 @@ async def download_video_file(bvid: str, output_file_name: str):
return final_path
# Mark a MongoDB document as finished: status "done", downloaded True, and record the
# final file path (as a string) plus an update timestamp. The document is matched by "_id".
def mark_doc_downloaded(doc, final_path: Path):
collection.update_one(
{"_id": doc["_id"]},
{
"$set": {
"status": STATUS_DONE,
"downloaded": True,
"file_path": str(final_path),
"updated_at": time.time(),
}
},
)
# NOTE(review): this span appears to interleave three definitions from a diff without
# +/- markers: the database-backed process_fetch_doc(doc) and process_download_doc(doc),
# and the queue-based process_download(bvid). Confirm against the real file which lines
# belong to which function before relying on the comments here.
#
# Queue-based flow of process_download, as far as the visible lines show: fetch the video
# info and related list, build the output file name, skip if the file already exists,
# otherwise download, record the bvid as downloaded, then queue the related videos.
# Any exception is caught and printed.
def process_fetch_doc(doc):
bvid = doc["bvid"]
def process_download(bvid: str):
"""Handle the download of a single video."""
try:
# Fetch the video info
info, related_items = asyncio.run(fetch_video_info_and_related(bvid))
title = info.get("title", "")
title = info.get("title", bvid)
file_name = build_output_file_name(bvid, title)
inserted, skipped = save_related_bvids(bvid, related_items)
collection.update_one(
{"_id": doc["_id"]},
{
"$set": {
"title": title,
"aid": info.get("aid"),
"cid": info.get("cid"),
"owner_name": (info.get("owner") or {}).get("name", ""),
"video_url": f"https://www.bilibili.com/video/{bvid}",
"download_file_name": file_name,
"related_fetched": True,
"status": STATUS_READY,
"related_inserted_count": inserted,
"related_skipped_count": skipped,
"updated_at": time.time(),
}
},
)
print(f"fetch done: {bvid}, related_inserted={inserted}, related_skipped={skipped}")
except Exception as exc:
collection.update_one(
{"_id": doc["_id"]},
{
"$set": {
"status": STATUS_ERROR,
"fetch_error": str(exc),
"updated_at": time.time(),
}
},
)
print(f"fetch failed: {bvid}, error={exc}")
def process_download_doc(doc):
bvid = doc["bvid"]
file_name = doc.get("download_file_name") or f"{bvid}.mp4"
final_path = get_output_path(file_name)
if doc.get("downloaded") and final_path.exists():
print(f"download already marked and file exists, skipped: {bvid}")
return
if final_path.exists():
mark_doc_downloaded(doc, final_path)
print(f"download file already exists, skipped: {bvid} -> {final_path}")
return
try:
print(f"fetching info done: {bvid}, title: {title}")
# Check whether the file already exists
# NOTE(review): on this path the bvid is added to downloaded_set but save_downloaded_bvid
# is not called and related videos are not queued — confirm that is intended.
final_path = get_output_path(file_name)
if final_path.exists():
print(f"file already exists, skipped: {bvid} -> {final_path}")
with queue_lock:
downloaded_set.add(bvid)
return
# Download the video
final_path = asyncio.run(download_video_file(bvid, file_name))
mark_doc_downloaded(doc, final_path)
print(f"download done: {bvid} -> {final_path}")
# Mark as downloaded and persist to the record file
with queue_lock:
downloaded_set.add(bvid)
save_downloaded_bvid(bvid)
# Add the related videos to the queue
inserted, skipped = save_related_bvids(bvid, related_items)
print(f"related videos: inserted={inserted}, skipped={skipped}")
except Exception as exc:
collection.update_one(
{"_id": doc["_id"]},
{
"$set": {
"status": STATUS_READY,
"download_error": str(exc),
"updated_at": time.time(),
}
},
)
print(f"download failed: {bvid}, error={exc}")
# Worker loop for the fetch stage: repeatedly claim a document via claim_fetch_doc() and
# hand it to process_fetch_doc(); sleep POLL_SECONDS whenever nothing is claimable.
# The loop never exits on its own.
def fetch_worker():
while True:
doc = claim_fetch_doc()
if not doc:
time.sleep(POLL_SECONDS)
continue
process_fetch_doc(doc)
# Worker loop for downloads; sleeps POLL_SECONDS whenever there is nothing to process.
# NOTE(review): this span appears to interleave two versions from a diff without +/-
# markers — one claiming a MongoDB document (claim_download_doc / process_download_doc)
# and one popping from the in-memory queue (get_from_queue / process_download).
def download_worker():
"""Download worker thread: take videos from the queue and download them."""
print("download worker started")
while True:
doc = claim_download_doc()
if not doc:
item = get_from_queue()
if not item:
time.sleep(POLL_SECONDS)
continue
process_download_doc(doc)
bvid = item["bvid"]
print(f"processing: {bvid}")
process_download(bvid)
def input_worker():
    """Read bilibili URLs from standard input and enqueue them.

    Loops on an interactive ``> `` prompt. Blank input is ignored, the word
    ``quit`` (any letter case) ends the loop, and every other line is parsed
    with ``get_bvid_from_url`` and passed to ``add_to_queue``. A URL that
    raises ``ValueError`` is reported and the prompt continues. End of input
    or Ctrl-C also ends the loop.
    """
    print("input worker started, waiting for URLs...")
    print("Enter a bilibili URL (or 'quit' to exit):")
    while True:
        try:
            entered = input("> ").strip()
            if entered.lower() == 'quit':
                print("stopping...")
                return
            if entered:
                try:
                    add_to_queue(get_bvid_from_url(entered), entered)
                except ValueError as e:
                    print(f"invalid url: {e}")
                    print("please enter a valid bilibili video URL")
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C both end the input loop quietly.
            return
# Program entry point.
# NOTE(review): this span appears to interleave two versions from a diff without +/-
# markers. The database-backed lines prepare the collection, start fetch and download
# threads and idle until Ctrl-C. The queue-based lines locate ffmpeg (returning early
# with instructions when it is missing), load the downloaded-bvid record, start the
# download thread and then run input_worker() on the main thread.
def main():
ensure_indexes()
reset_in_progress_docs()
seed_from_urls_file()
fetch_thread = threading.Thread(target=fetch_worker, daemon=True, name="fetch-worker")
download_thread = threading.Thread(
target=download_worker, daemon=True, name="download-worker"
)
fetch_thread.start()
global FFMPEG_PATH
# Locate ffmpeg
FFMPEG_PATH = find_ffmpeg()
if not FFMPEG_PATH:
print("=" * 50)
print("ERROR: ffmpeg not found!")
print("=" * 50)
print("Please install ffmpeg and add it to your system PATH.")
print("Download from: https://ffmpeg.org/download.html")
print("Or place ffmpeg.exe in one of these locations:")
print(" - C:\\ffmpeg\\bin\\")
print(" - Current directory")
print("=" * 50)
return
print(f"ffmpeg found at: {FFMPEG_PATH}")
# Load the list of already-downloaded bvids
load_downloaded_bvids()
# Start the download worker thread (daemon, so it does not keep the process alive)
download_thread = threading.Thread(target=download_worker, daemon=True, name="download-worker")
download_thread.start()
print("workers started: fetch-worker, download-worker")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("stopping...")
print("=" * 50)
print("Bilibili Video Downloader")
print("=" * 50)
print("Download worker started in background")
print("Enter bilibili video URLs to download")
print("Related videos will be automatically discovered and queued")
print("Type 'quit' to exit")
print("=" * 50)
# The main thread handles user input
input_worker()
# Run main() only when the file is executed as a script.
# NOTE(review): main() appears twice here, most likely the old and new rendering of the
# same line in a diff without +/- markers — confirm only one call exists in the real file.
if __name__ == "__main__":
main()
main()