"""Interactive Bilibili video downloader.

Reads video URLs from standard input and queues them. A background worker
thread fetches each queued video's metadata, downloads it with ``you-get``
when its view count reaches ``MIN_VIEW_COUNT``, and queues its related videos.
"""

import asyncio
import re
import shutil
import subprocess
import threading
import time
from collections import deque
from pathlib import Path

from bilibili_api import HEADERS, get_client, video  # pip install bilibili-api-python

BASE_DIR = Path(__file__).resolve().parent
OUTPUT_DIR = BASE_DIR / "downloads"
DOWNLOADED_BVID_FILE = BASE_DIR / "downloaded_bvids.txt"  # record of downloaded bvids
SKIPPED_BVID_FILE = BASE_DIR / "skipped_bvids.txt"  # record of skipped bvids
FFMPEG_PATH = None  # Will be set by find_ffmpeg()
POLL_SECONDS = 2
FETCH_RELATED_LIMIT = 20
MIN_VIEW_COUNT = 500000  # videos with fewer views are recorded as skipped


def find_ffmpeg():
    """Locate the ffmpeg executable.

    Returns:
        The executable path as a string, or None when ffmpeg is neither on
        PATH nor present in any of the candidate locations below.
    """
    # Prefer whatever is reachable through the system PATH.
    ffmpeg_path = shutil.which("ffmpeg")
    if ffmpeg_path:
        return ffmpeg_path

    # Common ffmpeg install locations on Windows.
    common_paths = [
        r"C:\ffmpeg\bin\ffmpeg.exe",
        r"C:\Program Files\ffmpeg\bin\ffmpeg.exe",
        r"C:\Program Files (x86)\ffmpeg\bin\ffmpeg.exe",
        Path.home() / "ffmpeg" / "bin" / "ffmpeg.exe",
        Path.home() / "Downloads" / "ffmpeg" / "bin" / "ffmpeg.exe",
        BASE_DIR / "ffmpeg" / "bin" / "ffmpeg.exe",
        BASE_DIR / "ffmpeg.exe",
    ]
    for path in common_paths:
        if Path(path).exists():
            return str(path)
    return None


# In-memory state guarded by queue_lock (used instead of a database).
video_queue = deque()  # videos waiting to be processed
downloaded_set = set()  # bvids already downloaded, to prevent duplicates
queued_set = set()  # bvids currently sitting in video_queue
processed_set = set()  # bvids taken off the queue during this session
queue_lock = threading.Lock()


def load_downloaded_bvids():
    """Load the downloaded-bvid record file into ``downloaded_set``.

    A missing or unreadable file leaves ``downloaded_set`` empty.
    """
    global downloaded_set
    if not DOWNLOADED_BVID_FILE.exists():
        print("no downloaded bvids file found, starting fresh")
        downloaded_set = set()
        return
    try:
        with open(DOWNLOADED_BVID_FILE, "r", encoding="utf-8") as f:
            downloaded_set = {line.strip() for line in f if line.strip()}
        print(f"loaded {len(downloaded_set)} downloaded bvids from file")
    except (OSError, UnicodeError) as e:
        print(f"failed to load downloaded bvids: {e}")
        downloaded_set = set()


def save_downloaded_bvid(bvid: str):
    """Append ``bvid`` to the downloaded-bvid record file (best effort)."""
    try:
        with open(DOWNLOADED_BVID_FILE, "a", encoding="utf-8") as f:
            f.write(bvid + "\n")
    except OSError as e:
        print(f"failed to save bvid {bvid}: {e}")


def save_skipped_bvid(bvid: str, view_count: int, title: str):
    """Append a skipped video as a ``bvid<TAB>views<TAB>title`` record (best effort)."""
    # Tabs and line breaks inside the title would corrupt the one-line record.
    flat_title = re.sub(r"[\t\r\n]+", " ", title)
    try:
        with open(SKIPPED_BVID_FILE, "a", encoding="utf-8") as f:
            f.write(f"{bvid}\t{view_count}\t{flat_title}\n")
    except OSError as e:
        print(f"failed to save skipped bvid {bvid}: {e}")


def get_bvid_from_url(url: str) -> str:
    """Extract the first ``BV`` id (``BV`` plus 10 alphanumerics) from ``url``.

    Raises:
        ValueError: If ``url`` contains no bvid.
    """
    match = re.search(r"BV[0-9A-Za-z]{10}", url)
    if not match:
        raise ValueError(f"link does not contain bvid: {url}")
    return match.group(0)


def sanitize_title(title: str, max_length: int = 80) -> str:
    """Reduce ``title`` to a file-name-safe string of at most ``max_length`` chars.

    Returns ``"video"`` when nothing usable remains after cleaning.
    """
    # Drop everything that is not a word character (letters, digits,
    # underscore, CJK ideographs); spaces and punctuation are removed.
    cleaned = re.sub(r'[^\w\u4e00-\u9fff]', '', title)
    # Collapse runs of underscores and trim them from both ends.
    cleaned = re.sub(r'_+', '_', cleaned)
    cleaned = cleaned.strip('_')
    if not cleaned:
        cleaned = "video"
    return cleaned[:max_length].rstrip('_')


def build_output_file_name(bvid: str, title: str) -> str:
    """Return the expected output file name, ``<bvid>_<sanitized title>.mp4``."""
    return f"{bvid}_{sanitize_title(title)}.mp4"


def get_output_path(file_name: str) -> Path:
    """Return the path of ``file_name`` inside ``OUTPUT_DIR``."""
    return OUTPUT_DIR / file_name


def add_to_queue(bvid: str, source_url: str = "", *, allow_retry: bool = False):
    """Queue a video for processing unless it is a duplicate.

    Args:
        bvid: Video id to queue.
        source_url: URL the id came from; defaults to the canonical video URL.
        allow_retry: When True, a bvid that was already processed in this
            session (skipped or failed) may be queued again. Automatic
            related-video discovery leaves this False so that videos which
            reference each other cannot re-queue one another forever.

    Returns:
        True if the video was queued, False if it was rejected as a duplicate.
    """
    with queue_lock:
        is_duplicate = (
            bvid in downloaded_set
            or bvid in queued_set
            or (bvid in processed_set and not allow_retry)
        )
        if is_duplicate:
            print(f"skipped duplicate: {bvid}")
            return False
        video_queue.append({
            "bvid": bvid,
            "source_url": source_url or f"https://www.bilibili.com/video/{bvid}",
            "added_at": time.time(),
        })
        queued_set.add(bvid)
        print(f"added to queue: {bvid}, queue size: {len(video_queue)}")
        return True


def get_from_queue():
    """Pop the oldest queued video, or return None when the queue is empty."""
    with queue_lock:
        if not video_queue:
            return None
        item = video_queue.popleft()
        # Remember the bvid so related-video discovery does not re-queue it.
        queued_set.discard(item["bvid"])
        processed_set.add(item["bvid"])
        return item


async def fetch_video_info_and_related(bvid: str):
    """Fetch a video's info and its related videos via ``bilibili_api``.

    Returns:
        A ``(info, related_items)`` tuple as returned by the API calls.
    """
    v = video.Video(bvid=bvid)
    info = await v.get_info()
    related_items = await v.get_related()
    return info, related_items


def save_related_bvids(parent_bvid: str, related_items):
    """Queue up to ``FETCH_RELATED_LIMIT`` related videos.

    Args:
        parent_bvid: Id of the video the related list belongs to (unused here).
        related_items: Sequence of mappings with a ``"bvid"`` key; None is
            treated as empty.

    Returns:
        An ``(inserted, skipped)`` tuple of counts. Items without a bvid are
        ignored and counted in neither.
    """
    inserted = 0
    skipped = 0
    for item in (related_items or [])[:FETCH_RELATED_LIMIT]:
        related_bvid = item.get("bvid")
        if not related_bvid:
            continue
        if add_to_queue(related_bvid, f"https://www.bilibili.com/video/{related_bvid}"):
            inserted += 1
        else:
            skipped += 1
    return inserted, skipped


async def download_stream(url: str, output_path: Path, intro: str):
    """Download ``url`` to ``output_path`` chunk by chunk, printing progress.

    Args:
        url: Stream URL to download.
        output_path: Destination file; overwritten if it exists.
        intro: Label shown at the start of the progress line.

    Raises:
        OSError: If the stream yields an empty chunk before the advertised
            content length has been written.
    """
    download_id = await get_client().download_create(url, HEADERS)
    written = 0
    total = get_client().download_content_length(download_id)
    with open(output_path, "wb") as file_obj:
        while True:
            chunk = await get_client().download_chunk(download_id)
            written += file_obj.write(chunk)
            print(f"{intro} - {output_path.name} [{written} / {total}]", end="\r")
            if written >= total:
                break
            if not chunk:
                # No more data will arrive; without this the loop never ends.
                raise OSError(
                    f"download ended early for {output_path.name}: "
                    f"{written} of {total} bytes"
                )
    print()


def merge_media(video_path: Path, audio_path: Path, output_path: Path):
    """Mux separate video and audio files into ``output_path`` without re-encoding.

    Raises:
        RuntimeError: If ``FFMPEG_PATH`` is not set.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    if not FFMPEG_PATH:
        raise RuntimeError("ffmpeg not found. Please install ffmpeg and add it to PATH.")
    subprocess.run(
        [
            FFMPEG_PATH,
            "-y",
            "-i",
            str(video_path),
            "-i",
            str(audio_path),
            "-vcodec",
            "copy",
            "-acodec",
            "copy",
            str(output_path),
        ],
        check=True,
    )


def convert_flv_to_mp4(source_path: Path, output_path: Path):
    """Convert ``source_path`` to ``output_path`` with ffmpeg.

    Raises:
        RuntimeError: If ``FFMPEG_PATH`` is not set.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    if not FFMPEG_PATH:
        raise RuntimeError("ffmpeg not found. Please install ffmpeg and add it to PATH.")
    subprocess.run(
        [FFMPEG_PATH, "-y", "-i", str(source_path), str(output_path)],
        check=True,
    )


async def download_video_file(bvid: str, output_file_name: str):
    """Download a video through ``bilibili_api`` and produce a single file.

    Uses the single-stream path (download + convert) when the detector reports
    an FLV/MP4 stream, otherwise downloads video and audio separately and
    merges them. Temporary files are removed even when a step fails.

    Returns:
        Path of the final file inside ``OUTPUT_DIR``.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    final_path = get_output_path(output_file_name)
    temp_video_path = OUTPUT_DIR / f"{final_path.stem}_video_temp.m4s"
    temp_audio_path = OUTPUT_DIR / f"{final_path.stem}_audio_temp.m4s"
    temp_flv_path = OUTPUT_DIR / f"{final_path.stem}_temp.flv"

    v = video.Video(bvid=bvid)
    download_url_data = await v.get_download_url(0)
    detector = video.VideoDownloadURLDataDetecter(data=download_url_data)
    streams = detector.detect_best_streams()

    try:
        if detector.check_flv_mp4_stream():
            await download_stream(streams[0].url, temp_flv_path, "download flv")
            convert_flv_to_mp4(temp_flv_path, final_path)
        else:
            await download_stream(streams[0].url, temp_video_path, "download video")
            await download_stream(streams[1].url, temp_audio_path, "download audio")
            merge_media(temp_video_path, temp_audio_path, final_path)
    finally:
        # Clean up on success and on failure alike so partial files never pile up.
        for temp_path in (temp_flv_path, temp_video_path, temp_audio_path):
            temp_path.unlink(missing_ok=True)
    return final_path


def download_with_youget(bvid: str, title: str, output_dir: Path) -> bool:
    """Download a video with the ``you-get`` command-line tool.

    Args:
        bvid: Video id to download.
        title: Video title, used to build the output file name.
        output_dir: Directory passed to ``you-get -o``.

    Returns:
        True on success; False when you-get fails or is not installed.
    """
    url = f"https://www.bilibili.com/video/{bvid}"
    # Custom output name: <bvid>_<sanitized title>.
    custom_filename = f"{bvid}_{sanitize_title(title)}"

    try:
        # Download with an explicit file name and captions disabled.
        result = subprocess.run(
            ["you-get", "-o", str(output_dir), "-O", custom_filename, "--no-caption", url],
            capture_output=True,
            text=True,
            # Non-ASCII titles in the tool's output may not match the locale
            # codec; replace undecodable bytes instead of failing.
            errors="replace",
            check=True,
        )
        print(f"you-get output: {result.stdout}")
        return True
    except subprocess.CalledProcessError as e:
        print(f"you-get download failed: {e.stderr}")
        return False
    except FileNotFoundError:
        print("you-get not found, please install it: pip install you-get")
        return False


def process_download(bvid: str):
    """Process one queued video: fetch info, download if popular, queue related.

    Videos below ``MIN_VIEW_COUNT`` views are recorded as skipped; their
    related videos are still queued. Any exception is reported and swallowed
    so the worker loop keeps running.
    """
    try:
        # Fetch video metadata and the related-video list.
        info, related_items = asyncio.run(fetch_video_info_and_related(bvid))
        title = info.get("title", bvid)
        view_count = info.get("stat", {}).get("view", 0)

        print(f"fetching info done: {bvid}, title: {title}, views: {view_count}")

        if view_count < MIN_VIEW_COUNT:
            print(f"skipped: {bvid}, view count {view_count} < {MIN_VIEW_COUNT}")
            save_skipped_bvid(bvid, view_count, title)
            # Related videos are still worth exploring.
            inserted, skipped = save_related_bvids(bvid, related_items)
            print(f"related videos: inserted={inserted}, skipped={skipped}")
            return

        print(f"view count {view_count} >= {MIN_VIEW_COUNT}, downloading...")

        file_name = build_output_file_name(bvid, title)

        # Skip the download when the expected output file is already present.
        final_path = get_output_path(file_name)
        if final_path.exists():
            print(f"file already exists, skipped: {bvid} -> {final_path}")
            with queue_lock:
                downloaded_set.add(bvid)
            # Persist it too, so the next session does not re-fetch this video.
            save_downloaded_bvid(bvid)
            return

        success = download_with_youget(bvid, title, OUTPUT_DIR)

        if success:
            print(f"download done: {bvid}")
            # Mark as downloaded in memory and on disk.
            with queue_lock:
                downloaded_set.add(bvid)
            save_downloaded_bvid(bvid)
        else:
            print(f"download failed: {bvid}")

        inserted, skipped = save_related_bvids(bvid, related_items)
        print(f"related videos: inserted={inserted}, skipped={skipped}")

    except Exception as exc:
        # Worker-thread boundary: report and continue with the next item.
        print(f"download failed: {bvid}, error={exc}")


def download_worker():
    """Worker loop: take videos off the queue and process them, forever."""
    print("download worker started")
    while True:
        item = get_from_queue()
        if not item:
            time.sleep(POLL_SECONDS)
            continue

        bvid = item["bvid"]
        print(f"processing: {bvid}")
        process_download(bvid)


def input_worker():
    """Read URLs from standard input and queue them until 'quit' or EOF."""
    print("input worker started, waiting for URLs...")
    print("Enter a bilibili URL (or 'quit' to exit):")

    while True:
        try:
            url = input("> ").strip()
        except (EOFError, KeyboardInterrupt):
            break

        if url.lower() == 'quit':
            print("stopping...")
            break
        if not url:
            continue

        try:
            bvid = get_bvid_from_url(url)
        except ValueError as e:
            print(f"invalid url: {e}")
            print("please enter a valid bilibili video URL")
            continue
        # An explicit request may retry a video that was skipped or failed.
        add_to_queue(bvid, url, allow_retry=True)


def main():
    """Entry point: locate ffmpeg, start the worker thread, run the input loop."""
    global FFMPEG_PATH

    FFMPEG_PATH = find_ffmpeg()
    if not FFMPEG_PATH:
        print("=" * 50)
        print("ERROR: ffmpeg not found!")
        print("=" * 50)
        print("Please install ffmpeg and add it to your system PATH.")
        print("Download from: https://ffmpeg.org/download.html")
        print("Or place ffmpeg.exe in one of these locations:")
        print(" - C:\\ffmpeg\\bin\\")
        print(" - Current directory")
        print("=" * 50)
        return

    print(f"ffmpeg found at: {FFMPEG_PATH}")

    # Restore the set of bvids downloaded in earlier sessions.
    load_downloaded_bvids()

    # Daemon thread, so it ends together with the main thread.
    download_thread = threading.Thread(target=download_worker, daemon=True, name="download-worker")
    download_thread.start()

    print("=" * 50)
    print("Bilibili Video Downloader")
    print("=" * 50)
    print("Download worker started in background")
    print("Enter bilibili video URLs to download")
    print("Related videos will be automatically discovered and queued")
    print("Type 'quit' to exit")
    print("=" * 50)

    # The main thread handles user input.
    input_worker()


if __name__ == "__main__":
    main()