#!/usr/bin/env python3 """DLit — Download It! a tiny terminal wrapper for yt-dlp. Paste a link at the prompt; downloads queue and run serially in the background. Commands: :mp3, :video, :none switch the active preset. Ctrl-D or :q to quit. """ from __future__ import annotations import os import queue import re import readline # noqa: F401 (importing enables line editing in input()) import shutil import subprocess import sys import tempfile import threading import time from pathlib import Path import yt_dlp from yt_dlp.postprocessor import EmbedThumbnailPP from yt_dlp.postprocessor.common import PostProcessor VERSION = "1.7" DOWNLOAD_DIR = Path.home() / "Downloads" PROMPT = "dlit> " # A pasted link is first *resolved* (a quick, flat metadata fetch) to decide # whether it is one video or a playlist. A playlist fans out into one queued # job per entry, each labelled "(PlaylistName - i/N)". PLAYLIST_LIMIT bounds how # many entries a single paste may expand into, so a pathological or effectively # endless feed (e.g. youtube.com's recommendations) can't flood the queue. PLAYLIST_LIMIT = 200 SOCKET_TIMEOUT = 30 # seconds; bounds network stalls during resolve + download PRESETS: dict[str, dict] = { "mp3": { "format": "bestaudio/best", "writethumbnail": True, "postprocessors": [ {"key": "FFmpegExtractAudio", "preferredcodec": "mp3", "preferredquality": "320"}, {"key": "FFmpegMetadata"}, # EmbedThumbnail is registered manually in worker() so that # SquareThumbnailCropper can run between metadata-write and embed. ], "postprocessor_args": { "metadata": [ "-metadata", f"comment=DLit {VERSION} (Conway)", "-metadata", "genre=", ], }, }, "video": {"format": "bestvideo+bestaudio/best"}, "none": {}, } class _QuietLogger: """Swallow yt-dlp's own console output. yt-dlp writes errors/warnings straight to stderr even under quiet=True (see YoutubeDL.to_stderr). Those bytes bypass our cursor save/restore line management and land on top of the `dlit>` prompt, corrupting the display — most visibly on an invalid URL, which emits an "ERROR: ..." line. Routing everything through a logger keeps all output under our control; the failure itself still surfaces via the DownloadError that worker() catches. """ def debug(self, msg: str) -> None: ... def info(self, msg: str) -> None: ... def warning(self, msg: str) -> None: ... def error(self, msg: str) -> None: ... _QUIET_LOGGER = _QuietLogger() _print_lock = threading.Lock() _line_distance: dict[int, int] = {} # job_id -> rows above the current cursor position _job_title: dict[int, str] = {} # job_id -> resolved title (once known) _job_tag: dict[int, str] = {} # job_id -> "(PlaylistName - i/N) " prefix, "" for singles _RESET = "\x1b[0m" ICON_QUEUED = f"\x1b[35m~{_RESET}" # purple ICON_DOWNLOADING = f"\x1b[33m↓{_RESET}" # yellow ICON_CONVERTING = f"\x1b[34m↻{_RESET}" # blue ICON_DONE = f"\x1b[32m✓{_RESET}" # green ICON_FAILED = f"\x1b[31m✗{_RESET}" # red _SGR_RE = re.compile(r"\x1b\[[0-9;]*m") def _clamp(text: str) -> str: """Trim a status line to the terminal width, counting visible columns only. Job lines are rewritten in place by relying on each one occupying exactly one terminal row (see update_job_line). A line wider than the terminal wraps onto the row below — the prompt — and the cursor save/restore math no longer lands where it should, corrupting the display. Clamping keeps every line to a single row. SGR colour codes don't occupy columns, so they're copied through without counting toward the budget. """ width = shutil.get_terminal_size((80, 24)).columns - 1 if width <= 0: return text out: list[str] = [] visible = i = 0 while i < len(text): m = _SGR_RE.match(text, i) if m: out.append(m.group()) i = m.end() continue if visible >= width: return "".join(out) + _RESET # reset in case we cut mid-colour out.append(text[i]) visible += 1 i += 1 return "".join(out) def add_job_lines(items: list[tuple[int, str]]) -> None: """Main thread: print one or more new tracked lines above the next prompt. Submitting a link can fan out into several lines at once (one per playlist entry). Printing N lines pushes the next prompt down by N+1 rows — the N new lines plus the just-submitted prompt line that stays on screen — so every existing tracked line moves up by that much. The new lines, printed top to bottom, sit at distances N, N-1, … 1 above the coming prompt. With N=1 this reduces to the original "+= 2, new line at distance 1". """ with _print_lock: n = len(items) for k in _line_distance: _line_distance[k] += n + 1 for offset, (job_id, text) in enumerate(items): _line_distance[job_id] = n - offset sys.stdout.write(_clamp(text) + "\n") sys.stdout.flush() def notice(text: str) -> None: """Main thread: print a one-off, untracked line above the next prompt.""" with _print_lock: for k in _line_distance: _line_distance[k] += 2 sys.stdout.write(_clamp(text) + "\n") sys.stdout.flush() def update_job_line(job_id: int, text: str) -> None: """Worker thread: rewrite this job's tracked line in place; cursor returns to its prior spot.""" with _print_lock: d = _line_distance.get(job_id) if d is None: return sys.stdout.write( "\x1b7" # save cursor (DECSC) f"\x1b[{d}A" # up d rows "\r\x1b[K" # clear that line f"{_clamp(text)}" "\x1b8" # restore cursor (DECRC) ) sys.stdout.flush() def make_opts(job_id: int, preset: str) -> dict: last = {"pct": -5.0, "t": 0.0} def progress(d: dict) -> None: info = d.get("info_dict") or {} title = info.get("title") if title: _job_title[job_id] = title status = d.get("status") tag = _job_tag.get(job_id, "") shown_title = tag + _job_title.get(job_id, "…")[:60] if status == "downloading": total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0 done = d.get("downloaded_bytes") or 0 pct = (done * 100.0 / total) if total else 0.0 now = time.monotonic() if pct - last["pct"] >= 5 or now - last["t"] >= 2.0: last["pct"], last["t"] = pct, now speed = d.get("speed") or 0 speed_s = f"{speed / 1_000_000:4.1f} MB/s" if speed else " - MB/s" update_job_line( job_id, f"[{ICON_DOWNLOADING}] {shown_title} {pct:5.1f}% {speed_s}", ) elif status == "finished" and preset == "mp3": update_job_line(job_id, f"[{ICON_CONVERTING}] {shown_title}") return { "outtmpl": str(DOWNLOAD_DIR / "%(title)s.%(ext)s"), "noplaylist": True, "quiet": True, "no_warnings": True, "no_color": True, "noprogress": True, "logger": _QUIET_LOGGER, "socket_timeout": SOCKET_TIMEOUT, "progress_hooks": [progress], **PRESETS[preset], } def resolve_opts() -> dict: """Options for the quick pre-download probe that classifies a pasted link. extract_flat keeps it cheap: playlist entries come back as bare references (id/title/url) without fetching each video. noplaylist=True preserves the download behaviour — a video that merely sits inside a playlist (watch?v=…&list=…) resolves to the single video; only a pure playlist or feed URL fans out. playlistend bounds how far an endless feed is walked. """ return { "quiet": True, "no_warnings": True, "no_color": True, "noplaylist": True, "logger": _QUIET_LOGGER, "socket_timeout": SOCKET_TIMEOUT, "extract_flat": "in_playlist", "playlistend": PLAYLIST_LIMIT, } def resolve(url: str) -> tuple[bool, str, list[tuple[str, str]]]: """Classify a pasted link. Returns (is_playlist, name, [(entry_url, title)]). A single video comes back as (False, title, [(url, title)]). A playlist or feed comes back as (True, playlist_name, [...]) — possibly empty, e.g. a logged-out recommendations feed, in which case nothing is queued. """ with yt_dlp.YoutubeDL(resolve_opts()) as ydl: info = ydl.extract_info(url, download=False) or {} entries = info.get("entries") if entries is None: title = info.get("title") or url return False, title, [(info.get("webpage_url") or url, title)] name = info.get("title") or "playlist" items = [] for e in entries: if not e: continue eurl = e.get("url") or e.get("webpage_url") or e.get("id") if eurl: items.append((eurl, e.get("title") or eurl)) return True, name, items class SquareThumbnailCropper(PostProcessor): """Detect a square cover image padded into a 16:9 YouTube thumbnail and crop the bars off before EmbedThumbnail runs. Works on bars of any solid color (black, white, the album's accent color, etc.) by sampling the leftmost/rightmost columns and walking inward until the column stops matching the edge color. Guarded by aspect ratio: if the detected content region isn't roughly square the original is left untouched, so YouTube videos with natively 16:9 thumbnails or artwork with uniform color edges that fool the sampler just pass through uncropped. """ SQUARE_TOLERANCE = 0.05 # accept detected aspect ratios within ±5% of 1:1 COLOR_TOLERANCE = 12 # per-channel RGB deviation a column may have from edge def run(self, info): path = self._thumbnail_path(info) if not path or not os.path.exists(path): return [], info crop = self._detect(path) if not crop: return [], info w, h, x, y = crop if h <= 0 or abs(w / h - 1.0) > self.SQUARE_TOLERANCE: return [], info self._apply(path, w, h, x, y) return [], info @staticmethod def _thumbnail_path(info: dict): for t in reversed(info.get("thumbnails") or []): fp = t.get("filepath") if fp: return fp return None @classmethod def _detect(cls, path: str): dims = cls._probe_dimensions(path) if not dims: return None w, h = dims buf = cls._dump_pixels(path, w, h) if buf is None: return None sample_ys = [h * i // 6 for i in range(1, 6)] tol = cls.COLOR_TOLERANCE def col_uniform(x: int, ref: tuple) -> bool: for y in sample_ys: i = (y * w + x) * 3 if (abs(buf[i] - ref[0]) > tol or abs(buf[i + 1] - ref[1]) > tol or abs(buf[i + 2] - ref[2]) > tol): return False return True def pix(x: int, y: int) -> tuple: i = (y * w + x) * 3 return buf[i], buf[i + 1], buf[i + 2] left_ref = pix(0, h // 2) left_w = 0 for x in range(w // 2): if not col_uniform(x, left_ref): break left_w = x + 1 right_ref = pix(w - 1, h // 2) right_w = 0 for x in range(w - 1, w // 2, -1): if not col_uniform(x, right_ref): break right_w += 1 content_w = w - left_w - right_w if content_w <= 0: return None return content_w, h, left_w, 0 @staticmethod def _probe_dimensions(path: str): try: res = subprocess.run( ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of", "csv=p=0", path], capture_output=True, text=True, timeout=10, ) except (FileNotFoundError, subprocess.TimeoutExpired): return None if res.returncode != 0: return None parts = res.stdout.strip().split(",") if len(parts) != 2: return None try: w, h = int(parts[0]), int(parts[1]) except ValueError: return None return (w, h) if w > 0 and h > 0 else None @staticmethod def _dump_pixels(path: str, w: int, h: int): try: res = subprocess.run( ["ffmpeg", "-hide_banner", "-loglevel", "error", "-i", path, "-f", "rawvideo", "-pix_fmt", "rgb24", "-"], capture_output=True, timeout=15, ) except (FileNotFoundError, subprocess.TimeoutExpired): return None if res.returncode != 0 or len(res.stdout) != w * h * 3: return None return res.stdout @staticmethod def _apply(path: str, w: int, h: int, x: int, y: int) -> None: suffix = os.path.splitext(path)[1] or ".jpg" # dir= keeps the temp file on the same filesystem as the target so the # final os.replace() is an atomic rename, not a cross-device error. fd, tmp = tempfile.mkstemp(suffix=suffix, dir=os.path.dirname(path) or ".") os.close(fd) try: subprocess.run( ["ffmpeg", "-hide_banner", "-loglevel", "error", "-y", "-i", path, "-vf", f"crop={w}:{h}:{x}:{y}", tmp], check=True, capture_output=True, timeout=15, ) os.replace(tmp, path) except (subprocess.CalledProcessError, subprocess.TimeoutExpired): try: os.unlink(tmp) except OSError: pass def worker(jobs: "queue.Queue") -> None: while True: item = jobs.get() if item is None: return job_id, url, preset = item tag = _job_tag.get(job_id, "") try: with yt_dlp.YoutubeDL(make_opts(job_id, preset)) as ydl: if preset == "mp3": ydl.add_post_processor(SquareThumbnailCropper(ydl), when="post_process") ydl.add_post_processor(EmbedThumbnailPP(ydl), when="post_process") info = ydl.extract_info(url, download=True) title = (info or {}).get("title") or _job_title.get(job_id) or url _job_title[job_id] = title update_job_line(job_id, f"[{ICON_DONE}] {tag}{title[:60]}") except Exception as exc: # noqa: BLE001 err = str(exc).splitlines()[0][:60] title = _job_title.get(job_id) or url update_job_line(job_id, f"[{ICON_FAILED}] {tag}{title[:60]} {err}") def main() -> None: DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True) jobs: "queue.Queue" = queue.Queue() threading.Thread(target=worker, args=(jobs,), daemon=True).start() preset = "mp3" next_id = 1 print(f"dlit v.{VERSION} -> {DOWNLOAD_DIR} preset={preset} (:q to quit)") while True: try: line = input(PROMPT).strip() except (EOFError, KeyboardInterrupt): print() break if not line: continue if line.startswith(":"): cmd = line[1:] if cmd in ("q", "quit", "exit"): break if cmd in PRESETS: preset = cmd notice(f" preset -> {preset}") else: notice(f" unknown command: :{cmd} (try :mp3, :video, :none, :q)") continue # A pasted link: resolve it (a quick network probe) to learn whether it's # a single video or a playlist, then queue one job per resulting entry. try: is_playlist, name, entries = resolve(line) except Exception as exc: # noqa: BLE001 err = str(exc).splitlines()[0][:60] add_job_lines([(next_id, f"[{ICON_FAILED}] {line} {err}")]) next_id += 1 continue if not entries: notice(f" nothing to download in '{name[:40]}'") continue total = len(entries) lines, pending = [], [] for i, (entry_url, title) in enumerate(entries, 1): _job_title[next_id] = title if is_playlist: _job_tag[next_id] = f"({name[:24]} - {i}/{total}) " label = _job_tag.get(next_id, "") + title lines.append((next_id, f"[{ICON_QUEUED}] {label}")) pending.append((next_id, entry_url, preset)) next_id += 1 # Register the display lines before enqueuing, so a fast worker never # tries to update a line whose distance isn't set yet. add_job_lines(lines) for job in pending: jobs.put(job) if __name__ == "__main__": main()