This commit is contained in:
爱喝水的木子
2026-03-24 15:45:57 +08:00
commit f7e23f2eb1
3 changed files with 401 additions and 0 deletions

376
spider.py Normal file
View File

@@ -0,0 +1,376 @@
import asyncio
import re
import subprocess
import threading
import time
from pathlib import Path
from bilibili_api import HEADERS, get_client, video # pip install bilibili-api-python
from pymongo import MongoClient, ReturnDocument # pip install pymongo
# --- Configuration ----------------------------------------------------------
MONGO_URI = "mongodb://192.168.28.9:27017/"  # MongoDB server holding the video pool
MONGO_DB_NAME = "bilibiliss"
VIDEO_COLLECTION = "bilibili_video_pool"
BASE_DIR = Path(__file__).resolve().parent
URLS_FILE = BASE_DIR / "urls.txt"  # seed list: one Bilibili video URL per line
OUTPUT_DIR = BASE_DIR / "downloads"  # destination for finished mp4 files
FFMPEG_PATH = "ffmpeg"  # assumes ffmpeg is reachable on PATH — TODO confirm
POLL_SECONDS = 2  # worker idle-poll interval in seconds
FETCH_RELATED_LIMIT = 20  # max related videos enqueued per fetched video
# Lifecycle states of a video document in the pool.
STATUS_INIT = "init"  # seeded; metadata not fetched yet
STATUS_FETCHING = "fetching"  # a fetch worker currently owns it
STATUS_READY = "ready"  # metadata present; awaiting download
STATUS_DOWNLOADING = "downloading"  # a download worker currently owns it
STATUS_DONE = "done"  # file on disk; document finalized
STATUS_ERROR = "error"  # metadata fetch failed; never re-claimed by workers
# Shared Mongo handle used by both worker threads.
client = MongoClient(MONGO_URI)
collection = client[MONGO_DB_NAME][VIDEO_COLLECTION]
def get_bvid_from_url(url: str) -> str:
    """Extract the BV identifier from a Bilibili video link.

    Raises:
        ValueError: if *url* contains no BV id.
    """
    found = re.search(r"BV[0-9A-Za-z]{10}", url)
    if found is None:
        raise ValueError(f"link does not contain bvid: {url}")
    return found.group(0)
def sanitize_title(title: str, max_length: int = 80) -> str:
    """Turn an arbitrary video title into a filesystem-safe name.

    Illegal filename characters, whitespace runs, and underscore runs each
    collapse to a single underscore; the result is trimmed of leading and
    trailing dots/underscores, capped at *max_length* characters, and falls
    back to "video" when nothing printable survives.
    """
    safe = re.sub(r'[\\/:*?"<>|]+', "_", title).strip()
    for pattern in (r"\s+", r"_+"):
        safe = re.sub(pattern, "_", safe)
    safe = safe.strip("._") or "video"
    return safe[:max_length].rstrip("._")
def build_output_file_name(bvid: str, title: str) -> str:
    """Compose the on-disk mp4 name for a video: "<bvid>_<sanitized title>.mp4"."""
    safe_title = sanitize_title(title)
    return f"{bvid}_{safe_title}.mp4"
def get_output_path(file_name: str) -> Path:
    """Resolve *file_name* inside the downloads directory."""
    return OUTPUT_DIR.joinpath(file_name)
def ensure_indexes():
    """Create the MongoDB indexes the workers rely on (idempotent)."""
    for field, options in (("bvid", {"unique": True}), ("status", {})):
        collection.create_index(field, **options)
def reset_in_progress_docs():
    """Roll half-finished documents back to a stable state at startup.

    Work interrupted by a crash or Ctrl-C is made claimable again:
    'fetching' docs return to 'init', 'downloading' docs return to 'ready'.
    """
    rollbacks = (
        (STATUS_FETCHING, STATUS_INIT),
        (STATUS_DOWNLOADING, STATUS_READY),
    )
    modified = []
    for stuck_status, stable_status in rollbacks:
        outcome = collection.update_many(
            {"status": stuck_status},
            {"$set": {"status": stable_status, "updated_at": time.time()}},
        )
        modified.append(outcome.modified_count)
    print(
        f"reset in-progress docs: fetching={modified[0]}, "
        f"downloading={modified[1]}"
    )
def seed_from_urls_file():
    """Seed the work pool from urls.txt, one Bilibili link per line.

    Each valid link is upserted as an 'init' document keyed by its bvid.
    Blank lines are ignored; malformed links and bvids already present in
    the pool are counted as skipped.
    """
    if not URLS_FILE.exists():
        print(f"seed skipped, file not found: {URLS_FILE}")
        return
    inserted = skipped = 0
    for raw_line in URLS_FILE.read_text(encoding="utf-8").splitlines():
        url = raw_line.strip()
        if not url:
            continue
        try:
            bvid = get_bvid_from_url(url)
        except ValueError:
            skipped += 1
            print(f"seed skipped, invalid url: {url}")
            continue
        seed_doc = {
            "bvid": bvid,
            "source_url": url,
            "video_url": f"https://www.bilibili.com/video/{bvid}",
            "title": "",
            "download_file_name": f"{bvid}.mp4",
            "status": STATUS_INIT,
            "related_fetched": False,
            "downloaded": False,
            "created_at": time.time(),
        }
        # $setOnInsert + upsert: creates the doc only when the bvid is new.
        outcome = collection.update_one(
            {"bvid": bvid}, {"$setOnInsert": seed_doc}, upsert=True
        )
        if outcome.upserted_id is None:
            skipped += 1
            print(f"seed duplicate skipped: {bvid}")
        else:
            inserted += 1
    print(f"seed complete, inserted={inserted}, skipped={skipped}")
def claim_fetch_doc():
    """Atomically claim one unfetched 'init' document, or return None.

    The matching document is flipped to 'fetching' in the same operation,
    so concurrent workers cannot claim it twice.
    """
    query = {"status": STATUS_INIT, "related_fetched": False}
    update = {"$set": {"status": STATUS_FETCHING, "fetch_started_at": time.time()}}
    return collection.find_one_and_update(
        query, update, return_document=ReturnDocument.AFTER
    )
def claim_download_doc():
    """Atomically claim one undownloaded 'ready' document, or return None.

    The matching document is flipped to 'downloading' in the same operation,
    so concurrent workers cannot claim it twice.
    """
    query = {"status": STATUS_READY, "downloaded": False}
    update = {"$set": {"status": STATUS_DOWNLOADING, "download_started_at": time.time()}}
    return collection.find_one_and_update(
        query, update, return_document=ReturnDocument.AFTER
    )
async def fetch_video_info_and_related(bvid: str):
    """Fetch a video's metadata dict and its related-video list from Bilibili."""
    target = video.Video(bvid=bvid)
    info = await target.get_info()
    return info, await target.get_related()
def save_related_bvids(parent_bvid: str, related_items):
    """Enqueue up to FETCH_RELATED_LIMIT related videos as new 'init' docs.

    Items without a bvid are ignored silently; bvids already in the pool
    are counted as skipped. Returns an (inserted, skipped) tuple.
    """
    inserted = skipped = 0
    for entry in related_items[:FETCH_RELATED_LIMIT]:
        child_bvid = entry.get("bvid")
        if not child_bvid:
            continue
        page_url = f"https://www.bilibili.com/video/{child_bvid}"
        new_doc = {
            "bvid": child_bvid,
            "source_url": page_url,
            "video_url": page_url,
            "title": entry.get("title", ""),
            "download_file_name": build_output_file_name(
                child_bvid, entry.get("title", "")
            ),
            "status": STATUS_INIT,
            "related_fetched": False,
            "downloaded": False,
            "parent_bvid": parent_bvid,
            "created_at": time.time(),
        }
        # Upsert only inserts when the bvid is new; duplicates are untouched.
        outcome = collection.update_one(
            {"bvid": child_bvid}, {"$setOnInsert": new_doc}, upsert=True
        )
        if outcome.upserted_id is None:
            skipped += 1
            print(f"related duplicate skipped: {child_bvid}")
        else:
            inserted += 1
            print(f"related inserted: {child_bvid}")
    return inserted, skipped
async def download_stream(url: str, output_path: Path, intro: str):
    """Stream *url* to *output_path*, printing an in-place progress line.

    Args:
        url: direct media URL from the download-URL detector.
        output_path: destination file, opened in binary write mode.
        intro: short label prefixed to each progress update.
    """
    # Resolve the HTTP client once instead of once per call site / per chunk.
    client_impl = get_client()
    download_id = await client_impl.download_create(url, HEADERS)
    total = client_impl.download_content_length(download_id)
    written = 0
    with open(output_path, "wb") as file_obj:
        while True:
            chunk = await client_impl.download_chunk(download_id)
            written += file_obj.write(chunk)
            print(f"{intro} - {output_path.name} [{written} / {total}]", end="\r")
            # NOTE(review): assumes the reported content length is accurate;
            # a short response would loop forever — TODO confirm upstream API.
            if written >= total:
                break
    print()
def merge_media(video_path: Path, audio_path: Path, output_path: Path):
    """Mux separate video and audio tracks into one file via ffmpeg.

    Both streams are copied without re-encoding. Raises CalledProcessError
    when ffmpeg exits non-zero.
    """
    command = [FFMPEG_PATH, "-y"]
    command += ["-i", str(video_path), "-i", str(audio_path)]
    command += ["-vcodec", "copy", "-acodec", "copy", str(output_path)]
    subprocess.run(command, check=True)
def convert_flv_to_mp4(source_path: Path, output_path: Path):
    """Convert an FLV download to MP4 with ffmpeg; raises on non-zero exit."""
    command = [FFMPEG_PATH, "-y", "-i", str(source_path), str(output_path)]
    subprocess.run(command, check=True)
async def download_video_file(bvid: str, output_file_name: str):
    """Download one video into OUTPUT_DIR and return the final mp4 path.

    Single FLV/MP4 sources are fetched then converted to mp4; DASH sources
    are fetched as separate best video/audio streams and muxed together.
    Temporary files are removed after a successful conversion/merge.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    final_path = get_output_path(output_file_name)
    temp_video_path = OUTPUT_DIR / f"{final_path.stem}_video_temp.m4s"
    temp_audio_path = OUTPUT_DIR / f"{final_path.stem}_audio_temp.m4s"
    temp_flv_path = OUTPUT_DIR / f"{final_path.stem}_temp.flv"
    source = video.Video(bvid=bvid)
    download_url_data = await source.get_download_url(0)
    detector = video.VideoDownloadURLDataDetecter(data=download_url_data)
    streams = detector.detect_best_streams()
    if not detector.check_flv_mp4_stream():
        # DASH: video and audio arrive as separate streams and must be muxed.
        await download_stream(streams[0].url, temp_video_path, "download video")
        await download_stream(streams[1].url, temp_audio_path, "download audio")
        merge_media(temp_video_path, temp_audio_path, final_path)
        temp_video_path.unlink(missing_ok=True)
        temp_audio_path.unlink(missing_ok=True)
    else:
        await download_stream(streams[0].url, temp_flv_path, "download flv")
        convert_flv_to_mp4(temp_flv_path, final_path)
        temp_flv_path.unlink(missing_ok=True)
    return final_path
def mark_doc_downloaded(doc, final_path: Path):
    """Finalize *doc*: status 'done', downloaded flag, file path, timestamp."""
    done_fields = {
        "status": STATUS_DONE,
        "downloaded": True,
        "file_path": str(final_path),
        "updated_at": time.time(),
    }
    collection.update_one({"_id": doc["_id"]}, {"$set": done_fields})
def process_fetch_doc(doc):
    """Resolve metadata and related videos for a claimed doc, then mark it 'ready'.

    Any failure (network, API, or database) parks the doc in 'error' with
    the exception text, so it is not automatically retried.
    """
    bvid = doc["bvid"]
    try:
        info, related_items = asyncio.run(fetch_video_info_and_related(bvid))
        title = info.get("title", "")
        file_name = build_output_file_name(bvid, title)
        inserted, skipped = save_related_bvids(bvid, related_items)
        ready_fields = {
            "title": title,
            "aid": info.get("aid"),
            "cid": info.get("cid"),
            "owner_name": (info.get("owner") or {}).get("name", ""),
            "video_url": f"https://www.bilibili.com/video/{bvid}",
            "download_file_name": file_name,
            "related_fetched": True,
            "status": STATUS_READY,
            "related_inserted_count": inserted,
            "related_skipped_count": skipped,
            "updated_at": time.time(),
        }
        collection.update_one({"_id": doc["_id"]}, {"$set": ready_fields})
        print(f"fetch done: {bvid}, related_inserted={inserted}, related_skipped={skipped}")
    except Exception as exc:
        error_fields = {
            "status": STATUS_ERROR,
            "fetch_error": str(exc),
            "updated_at": time.time(),
        }
        collection.update_one({"_id": doc["_id"]}, {"$set": error_fields})
        print(f"fetch failed: {bvid}, error={exc}")
def process_download_doc(doc):
    """Download a claimed 'ready' doc's video, reconciling pre-existing files.

    On failure the doc is returned to 'ready' with download_error recorded,
    so a later claim will retry it.
    """
    bvid = doc["bvid"]
    file_name = doc.get("download_file_name") or f"{bvid}.mp4"
    final_path = get_output_path(file_name)
    if doc.get("downloaded") and final_path.exists():
        print(f"download already marked and file exists, skipped: {bvid}")
        return
    if final_path.exists():
        # File exists from an earlier run but the doc was never finalized.
        mark_doc_downloaded(doc, final_path)
        print(f"download file already exists, skipped: {bvid} -> {final_path}")
        return
    try:
        final_path = asyncio.run(download_video_file(bvid, file_name))
        mark_doc_downloaded(doc, final_path)
        print(f"download done: {bvid} -> {final_path}")
    except Exception as exc:
        retry_fields = {
            "status": STATUS_READY,
            "download_error": str(exc),
            "updated_at": time.time(),
        }
        collection.update_one({"_id": doc["_id"]}, {"$set": retry_fields})
        print(f"download failed: {bvid}, error={exc}")
def fetch_worker():
    """Daemon loop: claim and process metadata-fetch jobs, polling when idle."""
    while True:
        claimed = claim_fetch_doc()
        if claimed:
            process_fetch_doc(claimed)
        else:
            time.sleep(POLL_SECONDS)
def download_worker():
    """Daemon loop: claim and process download jobs, polling when idle."""
    while True:
        claimed = claim_download_doc()
        if claimed:
            process_download_doc(claimed)
        else:
            time.sleep(POLL_SECONDS)
def main():
    """Bootstrap the pool and run both workers until interrupted.

    Workers are daemon threads, so they die with the main thread; the main
    thread just idles until Ctrl-C.
    """
    ensure_indexes()
    reset_in_progress_docs()
    seed_from_urls_file()
    workers = [
        threading.Thread(target=fetch_worker, daemon=True, name="fetch-worker"),
        threading.Thread(target=download_worker, daemon=True, name="download-worker"),
    ]
    for worker in workers:
        worker.start()
    print("workers started: fetch-worker, download-worker")
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("stopping...")
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()