"""Bilibili video spider.

Seeds a MongoDB-backed work queue from urls.txt, then two daemon workers
drain it: one fetches metadata + related videos, the other downloads and
merges the media with ffmpeg.
"""
# NOTE(review): reconstructed from patch "ax" (spider.py). The same patch also
# added a conventional Python .gitignore (caches, venvs, downloads/, logs/,
# IDE dirs) and a urls.txt seed file.

import asyncio
import re
import subprocess
import threading
import time
from pathlib import Path

from bilibili_api import HEADERS, get_client, video  # pip install bilibili-api-python
from pymongo import MongoClient, ReturnDocument  # pip install pymongo


MONGO_URI = "mongodb://192.168.28.9:27017/"
MONGO_DB_NAME = "bilibiliss"
VIDEO_COLLECTION = "bilibili_video_pool"

BASE_DIR = Path(__file__).resolve().parent
URLS_FILE = BASE_DIR / "urls.txt"
OUTPUT_DIR = BASE_DIR / "downloads"
FFMPEG_PATH = "ffmpeg"
POLL_SECONDS = 2          # idle sleep between queue polls
FETCH_RELATED_LIMIT = 20  # max related videos enqueued per fetched video

# States a queue document moves through.
STATUS_INIT = "init"
STATUS_FETCHING = "fetching"
STATUS_READY = "ready"
STATUS_DOWNLOADING = "downloading"
STATUS_DONE = "done"
STATUS_ERROR = "error"

client = MongoClient(MONGO_URI)
collection = client[MONGO_DB_NAME][VIDEO_COLLECTION]


def get_bvid_from_url(url: str) -> str:
    """Extract the BV id from a bilibili link; raise ValueError if absent."""
    found = re.search(r"BV[0-9A-Za-z]{10}", url)
    if found is None:
        raise ValueError(f"link does not contain bvid: {url}")
    return found.group(0)


def sanitize_title(title: str, max_length: int = 80) -> str:
    """Turn a video title into a filesystem-safe name of at most *max_length* chars."""
    safe = re.sub(r'[\\/:*?"<>|]+', "_", title).strip()
    # Collapse whitespace runs, then underscore runs, into single underscores.
    for pattern in (r"\s+", r"_+"):
        safe = re.sub(pattern, "_", safe)
    safe = safe.strip("._") or "video"
    return safe[:max_length].rstrip("._")


def build_output_file_name(bvid: str, title: str) -> str:
    """File name used on disk for a finished download."""
    return f"{bvid}_{sanitize_title(title)}.mp4"


def get_output_path(file_name: str) -> Path:
    """Absolute path of *file_name* inside the downloads directory."""
    return OUTPUT_DIR / file_name


def ensure_indexes():
    """Create the (idempotent) indexes the queue queries rely on."""
    collection.create_index("bvid", unique=True)
    collection.create_index("status")


def reset_in_progress_docs():
    """Return docs stuck in transient states (e.g. after a crash) to their queue state."""
    fetch_reset = collection.update_many(
        {"status": STATUS_FETCHING},
        {"$set": {"status": STATUS_INIT, "updated_at": time.time()}},
    )
    download_reset = collection.update_many(
        {"status": STATUS_DOWNLOADING},
        {"$set": {"status": STATUS_READY, "updated_at": time.time()}},
    )
    print(
        f"reset in-progress docs: fetching={fetch_reset.modified_count}, "
        f"downloading={download_reset.modified_count}"
    )


def seed_from_urls_file():
    """Upsert one queue document per valid URL found in urls.txt.

    Invalid lines and already-known bvids are counted as skipped; existing
    documents are never modified ($setOnInsert only).
    """
    if not URLS_FILE.exists():
        print(f"seed skipped, file not found: {URLS_FILE}")
        return

    inserted = 0
    skipped = 0
    with open(URLS_FILE, "r", encoding="utf-8") as file_obj:
        for raw_line in file_obj:
            url = raw_line.strip()
            if not url:
                continue

            try:
                bvid = get_bvid_from_url(url)
            except ValueError:
                skipped += 1
                print(f"seed skipped, invalid url: {url}")
                continue

            outcome = collection.update_one(
                {"bvid": bvid},
                {
                    "$setOnInsert": {
                        "bvid": bvid,
                        "source_url": url,
                        "video_url": f"https://www.bilibili.com/video/{bvid}",
                        "title": "",
                        "download_file_name": f"{bvid}.mp4",
                        "status": STATUS_INIT,
                        "related_fetched": False,
                        "downloaded": False,
                        "created_at": time.time(),
                    }
                },
                upsert=True,
            )
            if outcome.upserted_id is None:
                skipped += 1
                print(f"seed duplicate skipped: {bvid}")
            else:
                inserted += 1

    print(f"seed complete, inserted={inserted}, skipped={skipped}")
def claim_fetch_doc():
    """Atomically claim one queued doc for metadata fetching; None if queue empty."""
    return collection.find_one_and_update(
        {"status": STATUS_INIT, "related_fetched": False},
        {"$set": {"status": STATUS_FETCHING, "fetch_started_at": time.time()}},
        return_document=ReturnDocument.AFTER,
    )


def claim_download_doc():
    """Atomically claim one ready doc for downloading; None if queue empty."""
    return collection.find_one_and_update(
        {"status": STATUS_READY, "downloaded": False},
        {"$set": {"status": STATUS_DOWNLOADING, "download_started_at": time.time()}},
        return_document=ReturnDocument.AFTER,
    )


async def fetch_video_info_and_related(bvid: str):
    """Fetch a video's info dict and its related-videos list from bilibili."""
    v = video.Video(bvid=bvid)
    info = await v.get_info()
    related_items = await v.get_related()
    return info, related_items


def save_related_bvids(parent_bvid: str, related_items):
    """Upsert up to FETCH_RELATED_LIMIT related videos as new queue docs.

    Existing bvids are left untouched ($setOnInsert only).
    Returns (inserted, skipped) counts.
    """
    inserted = 0
    skipped = 0
    for item in related_items[:FETCH_RELATED_LIMIT]:
        related_bvid = item.get("bvid")
        if not related_bvid:
            continue

        result = collection.update_one(
            {"bvid": related_bvid},
            {
                "$setOnInsert": {
                    "bvid": related_bvid,
                    "source_url": f"https://www.bilibili.com/video/{related_bvid}",
                    "video_url": f"https://www.bilibili.com/video/{related_bvid}",
                    "title": item.get("title", ""),
                    "download_file_name": build_output_file_name(
                        related_bvid, item.get("title", "")
                    ),
                    "status": STATUS_INIT,
                    "related_fetched": False,
                    "downloaded": False,
                    "parent_bvid": parent_bvid,
                    "created_at": time.time(),
                }
            },
            upsert=True,
        )
        if result.upserted_id is not None:
            inserted += 1
            print(f"related inserted: {related_bvid}")
        else:
            skipped += 1
            print(f"related duplicate skipped: {related_bvid}")
    return inserted, skipped


async def download_stream(url: str, output_path: Path, intro: str):
    """Stream *url* into *output_path*, printing one-line progress.

    Fix: also stop on an empty chunk, and only treat the reported content
    length as an end condition when it is non-zero. Previously a stream that
    ended early could loop forever on download_chunk(), and a stream with an
    unknown (0) length was truncated after its first chunk.
    """
    download_id = await get_client().download_create(url, HEADERS)
    written = 0
    total = get_client().download_content_length(download_id)
    with open(output_path, "wb") as file_obj:
        while True:
            chunk = await get_client().download_chunk(download_id)
            if not chunk:
                break  # stream exhausted
            written += file_obj.write(chunk)
            print(f"{intro} - {output_path.name} [{written} / {total}]", end="\r")
            if total and written >= total:
                break
    print()


def merge_media(video_path: Path, audio_path: Path, output_path: Path):
    """Mux separate video/audio streams into one file via stream copy (no re-encode)."""
    subprocess.run(
        [
            FFMPEG_PATH,
            "-y",
            "-i",
            str(video_path),
            "-i",
            str(audio_path),
            "-vcodec",
            "copy",
            "-acodec",
            "copy",
            str(output_path),
        ],
        check=True,  # raise CalledProcessError so the caller marks the doc failed
    )


def convert_flv_to_mp4(source_path: Path, output_path: Path):
    """Re-container an FLV download as MP4 with ffmpeg."""
    subprocess.run(
        [FFMPEG_PATH, "-y", "-i", str(source_path), str(output_path)],
        check=True,
    )


async def download_video_file(bvid: str, output_file_name: str):
    """Download *bvid* into downloads/<output_file_name> and return the final Path.

    FLV/MP4 streams are converted in place; DASH streams are downloaded as
    separate video/audio parts and merged. Fix: temp files are now removed in
    a finally block, so they no longer leak when a download or ffmpeg step
    raises.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    final_path = get_output_path(output_file_name)
    temp_video_path = OUTPUT_DIR / f"{final_path.stem}_video_temp.m4s"
    temp_audio_path = OUTPUT_DIR / f"{final_path.stem}_audio_temp.m4s"
    temp_flv_path = OUTPUT_DIR / f"{final_path.stem}_temp.flv"

    v = video.Video(bvid=bvid)
    download_url_data = await v.get_download_url(0)
    detector = video.VideoDownloadURLDataDetecter(data=download_url_data)
    streams = detector.detect_best_streams()

    try:
        if detector.check_flv_mp4_stream():
            await download_stream(streams[0].url, temp_flv_path, "download flv")
            convert_flv_to_mp4(temp_flv_path, final_path)
        else:
            await download_stream(streams[0].url, temp_video_path, "download video")
            await download_stream(streams[1].url, temp_audio_path, "download audio")
            merge_media(temp_video_path, temp_audio_path, final_path)
    finally:
        # Clean up whichever temp files exist, on success and on failure alike.
        for temp_path in (temp_flv_path, temp_video_path, temp_audio_path):
            temp_path.unlink(missing_ok=True)

    return final_path


def mark_doc_downloaded(doc, final_path: Path):
    """Record a finished download on *doc* (status=done, file path, timestamp)."""
    collection.update_one(
        {"_id": doc["_id"]},
        {
            "$set": {
                "status": STATUS_DONE,
                "downloaded": True,
                "file_path": str(final_path),
                "updated_at": time.time(),
            }
        },
    )


def process_fetch_doc(doc):
    """Fetch metadata + related videos for *doc*; promote it to READY or ERROR."""
    bvid = doc["bvid"]
    try:
        info, related_items = asyncio.run(fetch_video_info_and_related(bvid))
        title = info.get("title", "")
        file_name = build_output_file_name(bvid, title)
        inserted, skipped = save_related_bvids(bvid, related_items)
        collection.update_one(
            {"_id": doc["_id"]},
            {
                "$set": {
                    "title": title,
                    "aid": info.get("aid"),
                    "cid": info.get("cid"),
                    "owner_name": (info.get("owner") or {}).get("name", ""),
                    "video_url": f"https://www.bilibili.com/video/{bvid}",
                    "download_file_name": file_name,
                    "related_fetched": True,
                    "status": STATUS_READY,
                    "related_inserted_count": inserted,
                    "related_skipped_count": skipped,
                    "updated_at": time.time(),
                }
            },
        )
        print(f"fetch done: {bvid}, related_inserted={inserted}, related_skipped={skipped}")
    except Exception as exc:
        # Any failure (network, API shape change) parks the doc as ERROR with
        # the message recorded, so it is not retried blindly.
        collection.update_one(
            {"_id": doc["_id"]},
            {
                "$set": {
                    "status": STATUS_ERROR,
                    "fetch_error": str(exc),
                    "updated_at": time.time(),
                }
            },
        )
        print(f"fetch failed: {bvid}, error={exc}")
# Fix: a download retry budget. Previously every failure reset the doc to
# READY, so a permanently broken video was retried forever.
MAX_DOWNLOAD_ATTEMPTS = 3


def process_download_doc(doc):
    """Download the video for *doc*, or reuse an existing file on disk.

    On failure the doc is requeued as READY until MAX_DOWNLOAD_ATTEMPTS
    failures have accumulated, then parked as ERROR (the new
    ``download_attempts`` field is additive and backward-compatible).
    """
    bvid = doc["bvid"]
    file_name = doc.get("download_file_name") or f"{bvid}.mp4"
    final_path = get_output_path(file_name)

    if doc.get("downloaded") and final_path.exists():
        print(f"download already marked and file exists, skipped: {bvid}")
        return

    if final_path.exists():
        # File was produced by an earlier (interrupted) run; just record it.
        mark_doc_downloaded(doc, final_path)
        print(f"download file already exists, skipped: {bvid} -> {final_path}")
        return

    try:
        final_path = asyncio.run(download_video_file(bvid, file_name))
        mark_doc_downloaded(doc, final_path)
        print(f"download done: {bvid} -> {final_path}")
    except Exception as exc:
        attempts = int(doc.get("download_attempts", 0)) + 1
        next_status = STATUS_READY if attempts < MAX_DOWNLOAD_ATTEMPTS else STATUS_ERROR
        collection.update_one(
            {"_id": doc["_id"]},
            {
                "$set": {
                    "status": next_status,
                    "download_error": str(exc),
                    "download_attempts": attempts,
                    "updated_at": time.time(),
                }
            },
        )
        print(f"download failed: {bvid}, error={exc}")


def fetch_worker():
    """Daemon loop: claim metadata-fetch jobs forever, idling when queue is empty."""
    while True:
        doc = claim_fetch_doc()
        if not doc:
            time.sleep(POLL_SECONDS)
            continue
        process_fetch_doc(doc)


def download_worker():
    """Daemon loop: claim download jobs forever, idling when queue is empty."""
    while True:
        doc = claim_download_doc()
        if not doc:
            time.sleep(POLL_SECONDS)
            continue
        process_download_doc(doc)


def main():
    """Seed the queue, start both workers, and block until Ctrl-C."""
    ensure_indexes()
    reset_in_progress_docs()
    seed_from_urls_file()

    fetch_thread = threading.Thread(target=fetch_worker, daemon=True, name="fetch-worker")
    download_thread = threading.Thread(
        target=download_worker, daemon=True, name="download-worker"
    )
    fetch_thread.start()
    download_thread.start()

    print("workers started: fetch-worker, download-worker")
    try:
        # Daemon threads die with the process; keep the main thread alive.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("stopping...")


if __name__ == "__main__":
    main()


# urls.txt seed shipped with the original patch:
# https://www.bilibili.com/video/BV1Xp4y1X7Wn/?spm_id_from=333.788.recommend_more_video.8&trackid=web_related_0.router-related-2479604-9xr68.1773728848029.389&vd_source=e99b2a3d2640fab69ff8cac2517a451c