"""Bilibili video crawler/downloader backed by a MongoDB work queue.

Two daemon worker threads cooperate through one shared collection:

* ``fetch-worker``    -- claims ``init`` docs, fetches video metadata plus
  related videos, and enqueues the related videos as new ``init`` docs.
* ``download-worker`` -- claims ``ready`` docs, downloads the media
  streams, and merges/remuxes them with ffmpeg.

Seed URLs are read from ``urls.txt`` next to this script.
"""

import asyncio
import re
import subprocess
import threading
import time
from pathlib import Path

from bilibili_api import HEADERS, get_client, video  # pip install bilibili-api-python
from pymongo import MongoClient, ReturnDocument  # pip install pymongo

MONGO_URI = "mongodb://192.168.28.9:27017/"
MONGO_DB_NAME = "bilibiliss"
VIDEO_COLLECTION = "bilibili_video_pool"

BASE_DIR = Path(__file__).resolve().parent
URLS_FILE = BASE_DIR / "urls.txt"
OUTPUT_DIR = BASE_DIR / "downloads"
FFMPEG_PATH = "ffmpeg"

POLL_SECONDS = 2  # worker idle sleep between queue polls (seconds)
FETCH_RELATED_LIMIT = 20  # max related videos enqueued per fetched video

# Document lifecycle: init -> fetching -> ready -> downloading -> done.
# A fetch failure parks the doc in "error"; a download failure returns the
# doc to "ready" so it is retried (see process_download_doc).
STATUS_INIT = "init"
STATUS_FETCHING = "fetching"
STATUS_READY = "ready"
STATUS_DOWNLOADING = "downloading"
STATUS_DONE = "done"
STATUS_ERROR = "error"

client = MongoClient(MONGO_URI)
collection = client[MONGO_DB_NAME][VIDEO_COLLECTION]


def get_bvid_from_url(url: str) -> str:
    """Extract the BV id (``BV`` + 10 alphanumerics) from a bilibili URL.

    Raises:
        ValueError: if *url* contains no BV id.
    """
    match = re.search(r"BV[0-9A-Za-z]{10}", url)
    if not match:
        raise ValueError(f"link does not contain bvid: {url}")
    return match.group(0)


def sanitize_title(title: str, max_length: int = 80) -> str:
    """Turn a video title into a filesystem-safe file-name fragment.

    Replaces filesystem-reserved characters and whitespace with ``_``,
    collapses runs of ``_``, strips leading/trailing dots/underscores,
    and truncates to *max_length* characters.  Falls back to ``"video"``
    when nothing printable survives.
    """
    cleaned = re.sub(r'[\\/:*?"<>|]+', "_", title).strip()
    cleaned = re.sub(r"\s+", "_", cleaned)
    cleaned = re.sub(r"_+", "_", cleaned)
    cleaned = cleaned.strip("._")
    if not cleaned:
        cleaned = "video"
    return cleaned[:max_length].rstrip("._")


def build_output_file_name(bvid: str, title: str) -> str:
    """Return the canonical ``<bvid>_<sanitized-title>.mp4`` file name."""
    return f"{bvid}_{sanitize_title(title)}.mp4"


def get_output_path(file_name: str) -> Path:
    """Resolve *file_name* inside the downloads directory."""
    return OUTPUT_DIR / file_name


def ensure_indexes():
    """Create the indexes the claim queries rely on (idempotent)."""
    collection.create_index("bvid", unique=True)
    collection.create_index("status")


def reset_in_progress_docs():
    """Re-queue docs left mid-flight by a previous (crashed) run.

    ``fetching`` docs go back to ``init``; ``downloading`` docs go back
    to ``ready``.
    """
    fetch_reset = collection.update_many(
        {"status": STATUS_FETCHING},
        {"$set": {"status": STATUS_INIT, "updated_at": time.time()}},
    )
    download_reset = collection.update_many(
        {"status": STATUS_DOWNLOADING},
        {"$set": {"status": STATUS_READY, "updated_at": time.time()}},
    )
    print(
        f"reset in-progress docs: fetching={fetch_reset.modified_count}, "
        f"downloading={download_reset.modified_count}"
    )


def seed_from_urls_file():
    """Upsert one ``init`` queue document per URL found in ``urls.txt``.

    Lines without a BV id are skipped; already-known bvids are counted
    as duplicates (``$setOnInsert`` leaves existing docs untouched).
    """
    if not URLS_FILE.exists():
        print(f"seed skipped, file not found: {URLS_FILE}")
        return
    inserted = 0
    skipped = 0
    with open(URLS_FILE, "r", encoding="utf-8") as file_obj:
        for raw_line in file_obj:
            url = raw_line.strip()
            if not url:
                continue
            try:
                bvid = get_bvid_from_url(url)
            except ValueError:
                skipped += 1
                print(f"seed skipped, invalid url: {url}")
                continue
            result = collection.update_one(
                {"bvid": bvid},
                {
                    "$setOnInsert": {
                        "bvid": bvid,
                        "source_url": url,
                        "video_url": f"https://www.bilibili.com/video/{bvid}",
                        "title": "",
                        "download_file_name": f"{bvid}.mp4",
                        "status": STATUS_INIT,
                        "related_fetched": False,
                        "downloaded": False,
                        "created_at": time.time(),
                    }
                },
                upsert=True,
            )
            if result.upserted_id is not None:
                inserted += 1
            else:
                skipped += 1
                print(f"seed duplicate skipped: {bvid}")
    print(f"seed complete, inserted={inserted}, skipped={skipped}")


def claim_fetch_doc():
    """Atomically claim one ``init`` doc for metadata fetching, or None.

    The single find_one_and_update makes the claim safe against
    concurrent workers.
    """
    return collection.find_one_and_update(
        {"status": STATUS_INIT, "related_fetched": False},
        {"$set": {"status": STATUS_FETCHING, "fetch_started_at": time.time()}},
        return_document=ReturnDocument.AFTER,
    )


def claim_download_doc():
    """Atomically claim one ``ready`` doc for downloading, or None."""
    return collection.find_one_and_update(
        {"status": STATUS_READY, "downloaded": False},
        {"$set": {"status": STATUS_DOWNLOADING, "download_started_at": time.time()}},
        return_document=ReturnDocument.AFTER,
    )


async def fetch_video_info_and_related(bvid: str):
    """Fetch ``(info, related_items)`` for *bvid* via the bilibili API."""
    v = video.Video(bvid=bvid)
    info = await v.get_info()
    related_items = await v.get_related()
    return info, related_items


def save_related_bvids(parent_bvid: str, related_items):
    """Enqueue up to FETCH_RELATED_LIMIT related videos as ``init`` docs.

    Returns:
        ``(inserted, skipped)`` counts; duplicates are skipped via the
        unique ``bvid`` upsert.
    """
    inserted = 0
    skipped = 0
    for item in related_items[:FETCH_RELATED_LIMIT]:
        related_bvid = item.get("bvid")
        if not related_bvid:
            continue
        result = collection.update_one(
            {"bvid": related_bvid},
            {
                "$setOnInsert": {
                    "bvid": related_bvid,
                    "source_url": f"https://www.bilibili.com/video/{related_bvid}",
                    "video_url": f"https://www.bilibili.com/video/{related_bvid}",
                    "title": item.get("title", ""),
                    "download_file_name": build_output_file_name(
                        related_bvid, item.get("title", "")
                    ),
                    "status": STATUS_INIT,
                    "related_fetched": False,
                    "downloaded": False,
                    "parent_bvid": parent_bvid,
                    "created_at": time.time(),
                }
            },
            upsert=True,
        )
        if result.upserted_id is not None:
            inserted += 1
            print(f"related inserted: {related_bvid}")
        else:
            skipped += 1
            print(f"related duplicate skipped: {related_bvid}")
    return inserted, skipped


async def download_stream(url: str, output_path: Path, intro: str):
    """Stream *url* into *output_path*, printing inline progress.

    *intro* is the progress-line prefix shown to the user.
    """
    # Hoisted: one client lookup per download instead of one per chunk.
    downloader = get_client()
    download_id = await downloader.download_create(url, HEADERS)
    written = 0
    total = downloader.download_content_length(download_id)
    with open(output_path, "wb") as file_obj:
        while True:
            chunk = await downloader.download_chunk(download_id)
            if not chunk:
                # BUGFIX: without this guard an empty chunk before
                # ``written >= total`` (truncated transfer, bogus total)
                # would spin this loop forever.
                break
            written += file_obj.write(chunk)
            print(f"{intro} - {output_path.name} [{written} / {total}]", end="\r")
            if written >= total:
                break
    print()


def merge_media(video_path: Path, audio_path: Path, output_path: Path):
    """Mux separate video and audio tracks into *output_path* (stream copy).

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits non-zero.
    """
    subprocess.run(
        [
            FFMPEG_PATH,
            "-y",
            "-i",
            str(video_path),
            "-i",
            str(audio_path),
            "-vcodec",
            "copy",
            "-acodec",
            "copy",
            str(output_path),
        ],
        check=True,
    )


def convert_flv_to_mp4(source_path: Path, output_path: Path):
    """Transcode/remux an FLV file to MP4 with ffmpeg.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits non-zero.
    """
    subprocess.run(
        [FFMPEG_PATH, "-y", "-i", str(source_path), str(output_path)],
        check=True,
    )


async def download_video_file(bvid: str, output_file_name: str):
    """Download *bvid* and produce the final MP4; returns the final path.

    Handles both delivery modes: a single FLV/MP4 stream (converted in
    place) and separate DASH video+audio streams (downloaded then muxed).
    Temporary files are removed on success.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    final_path = get_output_path(output_file_name)
    temp_video_path = OUTPUT_DIR / f"{final_path.stem}_video_temp.m4s"
    temp_audio_path = OUTPUT_DIR / f"{final_path.stem}_audio_temp.m4s"
    temp_flv_path = OUTPUT_DIR / f"{final_path.stem}_temp.flv"

    v = video.Video(bvid=bvid)
    download_url_data = await v.get_download_url(0)
    detector = video.VideoDownloadURLDataDetecter(data=download_url_data)
    streams = detector.detect_best_streams()

    if detector.check_flv_mp4_stream():
        await download_stream(streams[0].url, temp_flv_path, "download flv")
        convert_flv_to_mp4(temp_flv_path, final_path)
        temp_flv_path.unlink(missing_ok=True)
    else:
        # BUGFIX: fail with a clear message instead of a bare IndexError
        # when the detector does not return both DASH streams.
        if len(streams) < 2:
            raise RuntimeError(f"expected video+audio streams for {bvid}, got {len(streams)}")
        await download_stream(streams[0].url, temp_video_path, "download video")
        await download_stream(streams[1].url, temp_audio_path, "download audio")
        merge_media(temp_video_path, temp_audio_path, final_path)
        temp_video_path.unlink(missing_ok=True)
        temp_audio_path.unlink(missing_ok=True)
    return final_path


def mark_doc_downloaded(doc, final_path: Path):
    """Mark *doc* as fully downloaded (``done``) with its on-disk path."""
    collection.update_one(
        {"_id": doc["_id"]},
        {
            "$set": {
                "status": STATUS_DONE,
                "downloaded": True,
                "file_path": str(final_path),
                "updated_at": time.time(),
            }
        },
    )


def process_fetch_doc(doc):
    """Fetch metadata + related videos for a claimed doc.

    On success the doc becomes ``ready`` (eligible for download); on any
    failure it is parked in ``error`` with the exception text recorded.
    """
    bvid = doc["bvid"]
    try:
        info, related_items = asyncio.run(fetch_video_info_and_related(bvid))
        title = info.get("title", "")
        file_name = build_output_file_name(bvid, title)
        inserted, skipped = save_related_bvids(bvid, related_items)
        collection.update_one(
            {"_id": doc["_id"]},
            {
                "$set": {
                    "title": title,
                    "aid": info.get("aid"),
                    "cid": info.get("cid"),
                    "owner_name": (info.get("owner") or {}).get("name", ""),
                    "video_url": f"https://www.bilibili.com/video/{bvid}",
                    "download_file_name": file_name,
                    "related_fetched": True,
                    "status": STATUS_READY,
                    "related_inserted_count": inserted,
                    "related_skipped_count": skipped,
                    "updated_at": time.time(),
                }
            },
        )
        print(f"fetch done: {bvid}, related_inserted={inserted}, related_skipped={skipped}")
    except Exception as exc:
        collection.update_one(
            {"_id": doc["_id"]},
            {
                "$set": {
                    "status": STATUS_ERROR,
                    "fetch_error": str(exc),
                    "updated_at": time.time(),
                }
            },
        )
        print(f"fetch failed: {bvid}, error={exc}")


def process_download_doc(doc):
    """Download the media for a claimed doc and mark it ``done``.

    Skips work when the target file already exists.  On failure the doc
    returns to ``ready`` for retry; ``download_error_count`` is
    incremented so permanently failing docs are visible to operators.
    """
    bvid = doc["bvid"]
    file_name = doc.get("download_file_name") or f"{bvid}.mp4"
    final_path = get_output_path(file_name)

    if doc.get("downloaded") and final_path.exists():
        print(f"download already marked and file exists, skipped: {bvid}")
        return
    if final_path.exists():
        # File produced by an earlier run; just reconcile the doc.
        mark_doc_downloaded(doc, final_path)
        print(f"download file already exists, skipped: {bvid} -> {final_path}")
        return
    try:
        final_path = asyncio.run(download_video_file(bvid, file_name))
        mark_doc_downloaded(doc, final_path)
        print(f"download done: {bvid} -> {final_path}")
    except Exception as exc:
        collection.update_one(
            {"_id": doc["_id"]},
            {
                "$set": {
                    "status": STATUS_READY,
                    "download_error": str(exc),
                    "updated_at": time.time(),
                },
                # NOTE(review): a doc that always fails retries forever;
                # this counter at least makes such docs queryable.
                "$inc": {"download_error_count": 1},
            },
        )
        print(f"download failed: {bvid}, error={exc}")


def fetch_worker():
    """Daemon loop: claim and process fetch jobs, sleeping when idle."""
    while True:
        doc = claim_fetch_doc()
        if not doc:
            time.sleep(POLL_SECONDS)
            continue
        process_fetch_doc(doc)


def download_worker():
    """Daemon loop: claim and process download jobs, sleeping when idle."""
    while True:
        doc = claim_download_doc()
        if not doc:
            time.sleep(POLL_SECONDS)
            continue
        process_download_doc(doc)


def main():
    """Prepare the queue, start both workers, and idle until Ctrl-C."""
    ensure_indexes()
    reset_in_progress_docs()
    seed_from_urls_file()

    fetch_thread = threading.Thread(target=fetch_worker, daemon=True, name="fetch-worker")
    download_thread = threading.Thread(
        target=download_worker, daemon=True, name="download-worker"
    )
    fetch_thread.start()
    download_thread.start()
    print("workers started: fetch-worker, download-worker")

    try:
        # Daemon threads die with the main thread, so just park here.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("stopping...")


if __name__ == "__main__":
    main()