#!/usr/bin/env python3 """dlit — a tiny terminal wrapper for yt-dlp. Paste a link at the prompt; downloads queue and run serially in the background. Commands: :mp3, :video, :none switch the active preset. Ctrl-D or :q to quit. """ from __future__ import annotations import os import queue import readline # noqa: F401 (importing enables line editing in input()) import subprocess import sys import tempfile import threading import time from pathlib import Path import yt_dlp from yt_dlp.postprocessor import EmbedThumbnailPP from yt_dlp.postprocessor.common import PostProcessor VERSION = "1.5" DOWNLOAD_DIR = Path.home() / "Downloads" PROMPT = "dlit> " PRESETS: dict[str, dict] = { "mp3": { "format": "bestaudio/best", "writethumbnail": True, "postprocessors": [ {"key": "FFmpegExtractAudio", "preferredcodec": "mp3", "preferredquality": "320"}, {"key": "FFmpegMetadata"}, # EmbedThumbnail is registered manually in worker() so that # SquareThumbnailCropper can run between metadata-write and embed. ], "postprocessor_args": { "metadata": [ "-metadata", f"comment=DLit {VERSION} (Conway)", "-metadata", "genre=", ], }, }, "video": {"format": "bestvideo+bestaudio/best"}, "none": {}, } _print_lock = threading.Lock() _line_distance: dict[int, int] = {} # job_id -> rows above the current cursor position _job_title: dict[int, str] = {} # job_id -> resolved title (once known) _RESET = "\x1b[0m" ICON_QUEUED = f"\x1b[35m~{_RESET}" # purple ICON_DOWNLOADING = f"\x1b[33m↓{_RESET}" # yellow ICON_CONVERTING = f"\x1b[34m↻{_RESET}" # blue ICON_DONE = f"\x1b[32m✓{_RESET}" # green ICON_FAILED = f"\x1b[31m✗{_RESET}" # red def add_job_line(job_id: int, text: str) -> None: """Main thread: print a new tracked line above the next prompt.""" with _print_lock: # Existing tracked lines move up by 2 rows: the just-submitted prompt line # stays on screen, and the new tracked line we're about to print is one more. for k in _line_distance: _line_distance[k] += 2 _line_distance[job_id] = 1 sys.stdout.write(text + "\n") sys.stdout.flush() def update_job_line(job_id: int, text: str) -> None: """Worker thread: rewrite this job's tracked line in place; cursor returns to its prior spot.""" with _print_lock: d = _line_distance.get(job_id) if d is None: return sys.stdout.write( "\x1b7" # save cursor (DECSC) f"\x1b[{d}A" # up d rows "\r\x1b[K" # clear that line f"{text}" "\x1b8" # restore cursor (DECRC) ) sys.stdout.flush() def make_opts(job_id: int, preset: str) -> dict: last = {"pct": -5.0, "t": 0.0} def progress(d: dict) -> None: info = d.get("info_dict") or {} title = info.get("title") if title: _job_title[job_id] = title status = d.get("status") shown_title = _job_title.get(job_id, "…")[:60] if status == "downloading": total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0 done = d.get("downloaded_bytes") or 0 pct = (done * 100.0 / total) if total else 0.0 now = time.monotonic() if pct - last["pct"] >= 5 or now - last["t"] >= 2.0: last["pct"], last["t"] = pct, now speed = d.get("speed") or 0 speed_s = f"{speed / 1_000_000:4.1f} MB/s" if speed else " - MB/s" update_job_line( job_id, f"[{ICON_DOWNLOADING}] {shown_title} {pct:5.1f}% {speed_s}", ) elif status == "finished" and preset == "mp3": update_job_line(job_id, f"[{ICON_CONVERTING}] {shown_title}") return { "outtmpl": str(DOWNLOAD_DIR / "%(title)s.%(ext)s"), "noplaylist": True, "quiet": True, "no_warnings": True, "no_color": True, "noprogress": True, "progress_hooks": [progress], **PRESETS[preset], } class SquareThumbnailCropper(PostProcessor): """Detect a square cover image padded into a 16:9 YouTube thumbnail and crop the bars off before EmbedThumbnail runs. Works on bars of any solid color (black, white, the album's accent color, etc.) by sampling the leftmost/rightmost columns and walking inward until the column stops matching the edge color. Guarded by aspect ratio: if the detected content region isn't roughly square the original is left untouched, so YouTube videos with natively 16:9 thumbnails or artwork with uniform color edges that fool the sampler just pass through uncropped. """ SQUARE_TOLERANCE = 0.05 # accept detected aspect ratios within ±5% of 1:1 COLOR_TOLERANCE = 12 # per-channel RGB deviation a column may have from edge def run(self, info): path = self._thumbnail_path(info) if not path or not os.path.exists(path): return [], info crop = self._detect(path) if not crop: return [], info w, h, x, y = crop if h <= 0 or abs(w / h - 1.0) > self.SQUARE_TOLERANCE: return [], info self._apply(path, w, h, x, y) return [], info @staticmethod def _thumbnail_path(info: dict): for t in reversed(info.get("thumbnails") or []): fp = t.get("filepath") if fp: return fp return None @classmethod def _detect(cls, path: str): dims = cls._probe_dimensions(path) if not dims: return None w, h = dims buf = cls._dump_pixels(path, w, h) if buf is None: return None sample_ys = [h * i // 6 for i in range(1, 6)] tol = cls.COLOR_TOLERANCE def col_uniform(x: int, ref: tuple) -> bool: for y in sample_ys: i = (y * w + x) * 3 if (abs(buf[i] - ref[0]) > tol or abs(buf[i + 1] - ref[1]) > tol or abs(buf[i + 2] - ref[2]) > tol): return False return True def pix(x: int, y: int) -> tuple: i = (y * w + x) * 3 return buf[i], buf[i + 1], buf[i + 2] left_ref = pix(0, h // 2) left_w = 0 for x in range(w // 2): if not col_uniform(x, left_ref): break left_w = x + 1 right_ref = pix(w - 1, h // 2) right_w = 0 for x in range(w - 1, w // 2, -1): if not col_uniform(x, right_ref): break right_w += 1 content_w = w - left_w - right_w if content_w <= 0: return None return content_w, h, left_w, 0 @staticmethod def _probe_dimensions(path: str): try: res = subprocess.run( ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of", "csv=p=0", path], capture_output=True, text=True, timeout=10, ) except (FileNotFoundError, subprocess.TimeoutExpired): return None if res.returncode != 0: return None parts = res.stdout.strip().split(",") if len(parts) != 2: return None try: w, h = int(parts[0]), int(parts[1]) except ValueError: return None return (w, h) if w > 0 and h > 0 else None @staticmethod def _dump_pixels(path: str, w: int, h: int): try: res = subprocess.run( ["ffmpeg", "-hide_banner", "-loglevel", "error", "-i", path, "-f", "rawvideo", "-pix_fmt", "rgb24", "-"], capture_output=True, timeout=15, ) except (FileNotFoundError, subprocess.TimeoutExpired): return None if res.returncode != 0 or len(res.stdout) != w * h * 3: return None return res.stdout @staticmethod def _apply(path: str, w: int, h: int, x: int, y: int) -> None: suffix = os.path.splitext(path)[1] or ".jpg" # dir= keeps the temp file on the same filesystem as the target so the # final os.replace() is an atomic rename, not a cross-device error. fd, tmp = tempfile.mkstemp(suffix=suffix, dir=os.path.dirname(path) or ".") os.close(fd) try: subprocess.run( ["ffmpeg", "-hide_banner", "-loglevel", "error", "-y", "-i", path, "-vf", f"crop={w}:{h}:{x}:{y}", tmp], check=True, capture_output=True, timeout=15, ) os.replace(tmp, path) except (subprocess.CalledProcessError, subprocess.TimeoutExpired): try: os.unlink(tmp) except OSError: pass def worker(jobs: "queue.Queue") -> None: while True: item = jobs.get() if item is None: return job_id, url, preset = item try: with yt_dlp.YoutubeDL(make_opts(job_id, preset)) as ydl: if preset == "mp3": ydl.add_post_processor(SquareThumbnailCropper(ydl), when="post_process") ydl.add_post_processor(EmbedThumbnailPP(ydl), when="post_process") info = ydl.extract_info(url, download=True) title = (info or {}).get("title") or _job_title.get(job_id) or url _job_title[job_id] = title update_job_line(job_id, f"[{ICON_DONE}] {title[:60]}") except Exception as exc: # noqa: BLE001 err = str(exc).splitlines()[0][:60] title = _job_title.get(job_id) or url update_job_line(job_id, f"[{ICON_FAILED}] {title[:60]} {err}") def main() -> None: DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True) jobs: "queue.Queue" = queue.Queue() threading.Thread(target=worker, args=(jobs,), daemon=True).start() preset = "mp3" next_id = 1 print(f"dlit v.{VERSION} -> {DOWNLOAD_DIR} preset={preset} (:q to quit)") while True: try: line = input(PROMPT).strip() except (EOFError, KeyboardInterrupt): print() break if not line: continue if line.startswith(":"): cmd = line[1:] if cmd in ("q", "quit", "exit"): break if cmd in PRESETS: preset = cmd with _print_lock: for k in _line_distance: _line_distance[k] += 2 sys.stdout.write(f" preset -> {preset}\n") sys.stdout.flush() else: with _print_lock: for k in _line_distance: _line_distance[k] += 2 sys.stdout.write( f" unknown command: :{cmd} (try :mp3, :video, :none, :q)\n" ) sys.stdout.flush() continue jobs.put((next_id, line, preset)) add_job_line(next_id, f"[{ICON_QUEUED}] {line}") next_id += 1 if __name__ == "__main__": main()