diff options
| author | Conway <[email protected]> | 2026-06-12 01:41:24 -0400 |
|---|---|---|
| committer | Conway <[email protected]> | 2026-06-12 01:41:24 -0400 |
| commit | 0bc89dcf150e97dcafa5efc498a5cfd0e6509abd (patch) | |
| tree | e34904e4d4f299a175452f8e85f3ab8ee04fcca9 /app.py | |
| parent | e94b672644724400702a4d40d5a5d92c7fc9ed9b (diff) | |
Diffstat (limited to 'app.py')
| -rw-r--r-- | app.py | 245 |
1 files changed, 183 insertions, 62 deletions
@@ -1,15 +1,22 @@ #!/usr/bin/env python3 -"""DLit — Download It! +"""DLit — Download It! a tiny terminal wrapper for yt-dlp. Paste a link at the prompt; downloads queue and run serially in the background. Commands: :mp3, :video, :none switch the active preset. -Ctrl-D or :q to quit. +Ctrl-D or :q to quit (lets an in-flight download finish; Ctrl-C aborts). + +v1.8: link resolution runs off the main thread so the prompt never freezes +on a slow probe; tracked lines that scroll past the top of the screen are +untracked instead of corrupting the display; job bookkeeping is pruned +rather than growing forever; playlist truncation at PLAYLIST_LIMIT is +reported; quitting drains the queue but finishes the download in flight. """ from __future__ import annotations +import itertools import os import queue import re @@ -26,7 +33,7 @@ import yt_dlp from yt_dlp.postprocessor import EmbedThumbnailPP from yt_dlp.postprocessor.common import PostProcessor -VERSION = "1.7" +VERSION = "1.8" DOWNLOAD_DIR = Path.home() / "Downloads" PROMPT = "dlit> " @@ -78,17 +85,27 @@ class _QuietLogger: _QUIET_LOGGER = _QuietLogger() +_ids = itertools.count(1) # job ids; next() on a count is GIL-atomic, so both + # the main thread and the resolver may draw from it +_shutting_down = False # set on quit: insert_lines stops redrawing the prompt + _print_lock = threading.Lock() _line_distance: dict[int, int] = {} # job_id -> rows above the current cursor position _job_title: dict[int, str] = {} # job_id -> resolved title (once known) _job_tag: dict[int, str] = {} # job_id -> "(PlaylistName - i/N) " prefix, "" for singles _RESET = "\x1b[0m" -ICON_QUEUED = f"\x1b[35m~{_RESET}" # purple -ICON_DOWNLOADING = f"\x1b[33m↓{_RESET}" # yellow -ICON_CONVERTING = f"\x1b[34m↻{_RESET}" # blue -ICON_DONE = f"\x1b[32m✓{_RESET}" # green -ICON_FAILED = f"\x1b[31m✗{_RESET}" # red + + +def _icon(sgr: str, ch: str) -> str: + return f"\x1b[{sgr}m{ch}{_RESET}" + + +ICON_QUEUED = _icon("35", "~") # purple +ICON_DOWNLOADING = _icon("33", "↓") # yellow +ICON_CONVERTING = _icon("34", "↻") # blue +ICON_DONE = _icon("32", "✓") # green +ICON_FAILED = _icon("31", "✗") # red _SGR_RE = re.compile(r"\x1b\[[0-9;]*m") @@ -122,41 +139,76 @@ def _clamp(text: str) -> str: return "".join(out) -def add_job_lines(items: list[tuple[int, str]]) -> None: - """Main thread: print one or more new tracked lines above the next prompt. +def _bump(n: int) -> None: + """Move every tracked line n rows farther from the cursor. Caller must hold + _print_lock. - Submitting a link can fan out into several lines at once (one per playlist - entry). Printing N lines pushes the next prompt down by N+1 rows — the N new - lines plus the just-submitted prompt line that stays on screen — so every - existing tracked line moves up by that much. The new lines, printed top to - bottom, sit at distances N, N-1, … 1 above the coming prompt. With N=1 this - reduces to the original "+= 2, new line at distance 1". + A line pushed past the top of the screen can no longer be addressed — + cursor-up (CSI A) clamps at row 1, so a later rewrite would land on + whatever unrelated line sits at the top. Such lines are untracked here, + which also keeps the dicts bounded by the screen height instead of growing + for the life of the session. + """ + height = shutil.get_terminal_size((80, 24)).lines + for k in list(_line_distance): + d = _line_distance[k] + n + if d >= height: + del _line_distance[k] + else: + _line_distance[k] = d + + +def post_line(text: str, job_id: int | None = None) -> None: + """Main thread, between prompts: print a new line above the next prompt; + tracked when a job_id is given. + + The just-submitted input line stays on screen above the printed line, so + existing tracked lines end up 2 rows farther from the next prompt. """ with _print_lock: - n = len(items) - for k in _line_distance: - _line_distance[k] += n + 1 - for offset, (job_id, text) in enumerate(items): - _line_distance[job_id] = n - offset - sys.stdout.write(_clamp(text) + "\n") + _bump(2) + if job_id is not None: + _line_distance[job_id] = 1 + sys.stdout.write(_clamp(text) + "\n") sys.stdout.flush() -def notice(text: str) -> None: - """Main thread: print a one-off, untracked line above the next prompt.""" +def insert_lines(items: list[tuple[int | None, str]]) -> None: + """Any thread, while the prompt is live: push new lines above it; entries + with a job id become tracked, None entries are one-off notices. + + The prompt row is cleared, the new lines printed where it stood, and the + prompt plus whatever the user had typed (readline's buffer) redrawn below, + so existing tracked lines move exactly n rows. Known limits: input long + enough to wrap onto a second row throws the geometry off by the wrapped + rows, and the redraw parks the visible cursor at the end of the typed text + even if the user had moved it left (readline's own state is unaffected). + """ with _print_lock: - for k in _line_distance: - _line_distance[k] += 2 - sys.stdout.write(_clamp(text) + "\n") + n = len(items) + _bump(n) + sys.stdout.write("\r\x1b[K") + for offset, (job_id, text) in enumerate(items): + if job_id is not None: + _line_distance[job_id] = n - offset + sys.stdout.write(_clamp(text) + "\n") + if not _shutting_down: + sys.stdout.write(PROMPT + readline.get_line_buffer()) sys.stdout.flush() def update_job_line(job_id: int, text: str) -> None: - """Worker thread: rewrite this job's tracked line in place; cursor returns to its prior spot.""" + """Worker/resolver thread: rewrite this job's tracked line in place; cursor + returns to its prior spot. Untracked (pruned or finished) jobs are a no-op.""" with _print_lock: d = _line_distance.get(job_id) if d is None: return + if d >= shutil.get_terminal_size((80, 24)).lines: + # Scrolled off since the last _bump (e.g. the terminal shrank); + # unreachable by cursor-up, so stop tracking it. + del _line_distance[job_id] + return sys.stdout.write( "\x1b7" # save cursor (DECSC) f"\x1b[{d}A" # up d rows @@ -167,6 +219,15 @@ def update_job_line(job_id: int, text: str) -> None: sys.stdout.flush() +def _forget_job(job_id: int) -> None: + """Drop a job's bookkeeping once its line is final and will never be + rewritten again.""" + with _print_lock: + _line_distance.pop(job_id, None) + _job_title.pop(job_id, None) + _job_tag.pop(job_id, None) + + def make_opts(job_id: int, preset: str) -> dict: last = {"pct": -5.0, "t": 0.0} @@ -324,7 +385,7 @@ class SquareThumbnailCropper(PostProcessor): right_ref = pix(w - 1, h // 2) right_w = 0 - for x in range(w - 1, w // 2, -1): + for x in range(w - 1, w // 2 - 1, -1): if not col_uniform(x, right_ref): break right_w += 1 @@ -391,6 +452,54 @@ class SquareThumbnailCropper(PostProcessor): pass +def resolver(resolves: "queue.Queue", jobs: "queue.Queue") -> None: + """Resolve pasted links off the main thread, then queue download jobs. + + The placeholder line printed at paste time is reused in place for a single + video (or a playlist's first entry); further playlist entries are inserted + above the live prompt as new tracked lines. Lines are registered before + their jobs are enqueued, so a fast worker never tries to update a line + whose distance isn't set yet. + """ + while True: + item = resolves.get() + if item is None: + return + job_id, url, preset = item + try: + is_playlist, name, entries = resolve(url) + except Exception as exc: # noqa: BLE001 + err = str(exc).splitlines()[0][:60] + update_job_line(job_id, f"[{ICON_FAILED}] {url[:60]} {err}") + _forget_job(job_id) + continue + if not entries: + update_job_line(job_id, f"[{ICON_FAILED}] nothing to download in '{name[:40]}'") + _forget_job(job_id) + continue + + total = len(entries) + ids = [job_id] + [next(_ids) for _ in range(total - 1)] + for i, (jid, (_eurl, title)) in enumerate(zip(ids, entries), 1): + _job_title[jid] = title + if is_playlist: + _job_tag[jid] = f"({name[:24]} - {i}/{total}) " + + update_job_line(job_id, f"[{ICON_QUEUED}] {_job_tag.get(job_id, '')}{entries[0][1]}") + extra: list[tuple[int | None, str]] = [ + (jid, f"[{ICON_QUEUED}] {_job_tag.get(jid, '')}{title}") + for jid, (_eurl, title) in zip(ids[1:], entries[1:]) + ] + if total >= PLAYLIST_LIMIT: + # yt-dlp gives no overflow signal under playlistend, so hitting the + # limit exactly is the best available proxy for truncation. + extra.append((None, f" '{name[:40]}' stopped at {PLAYLIST_LIMIT} entries")) + if extra: + insert_lines(extra) + for jid, (eurl, _title) in zip(ids, entries): + jobs.put((jid, eurl, preset)) + + def worker(jobs: "queue.Queue") -> None: while True: item = jobs.get() @@ -405,29 +514,47 @@ def worker(jobs: "queue.Queue") -> None: ydl.add_post_processor(EmbedThumbnailPP(ydl), when="post_process") info = ydl.extract_info(url, download=True) title = (info or {}).get("title") or _job_title.get(job_id) or url - _job_title[job_id] = title update_job_line(job_id, f"[{ICON_DONE}] {tag}{title[:60]}") except Exception as exc: # noqa: BLE001 err = str(exc).splitlines()[0][:60] title = _job_title.get(job_id) or url update_job_line(job_id, f"[{ICON_FAILED}] {tag}{title[:60]} {err}") + _forget_job(job_id) + + +def _drain(q: "queue.Queue") -> None: + try: + while True: + q.get_nowait() + except queue.Empty: + pass def main() -> None: + global _shutting_down DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True) + resolves: "queue.Queue" = queue.Queue() jobs: "queue.Queue" = queue.Queue() - threading.Thread(target=worker, args=(jobs,), daemon=True).start() + threading.Thread(target=resolver, args=(resolves, jobs), daemon=True).start() + worker_t = threading.Thread(target=worker, args=(jobs,), daemon=True) + worker_t.start() preset = "mp3" - next_id = 1 print(f"dlit v.{VERSION} -> {DOWNLOAD_DIR} preset={preset} (:q to quit)") while True: try: line = input(PROMPT).strip() - except (EOFError, KeyboardInterrupt): + except KeyboardInterrupt: + # Hard abort: don't wait for in-flight work (daemon threads die). + print() + return + except EOFError: print() break if not line: + # The empty submit still pushed the prompt down one row. + with _print_lock: + _bump(1) continue if line.startswith(":"): cmd = line[1:] @@ -435,39 +562,33 @@ def main() -> None: break if cmd in PRESETS: preset = cmd - notice(f" preset -> {preset}") + post_line(f" preset -> {preset}") else: - notice(f" unknown command: :{cmd} (try :mp3, :video, :none, :q)") + post_line(f" unknown command: :{cmd} (try :mp3, :video, :none, :q)") continue - # A pasted link: resolve it (a quick network probe) to learn whether it's - # a single video or a playlist, then queue one job per resulting entry. - try: - is_playlist, name, entries = resolve(line) - except Exception as exc: # noqa: BLE001 - err = str(exc).splitlines()[0][:60] - add_job_lines([(next_id, f"[{ICON_FAILED}] {line} {err}")]) - next_id += 1 - continue - if not entries: - notice(f" nothing to download in '{name[:40]}'") - continue + # A pasted link: hand it to the resolver thread so the prompt stays + # live; the placeholder line is updated (and fanned out) in place. + job_id = next(_ids) + post_line(f"[{ICON_QUEUED}] resolving… {line}", job_id) + resolves.put((job_id, line, preset)) - total = len(entries) - lines, pending = [], [] - for i, (entry_url, title) in enumerate(entries, 1): - _job_title[next_id] = title - if is_playlist: - _job_tag[next_id] = f"({name[:24]} - {i}/{total}) " - label = _job_tag.get(next_id, "") + title - lines.append((next_id, f"[{ICON_QUEUED}] {label}")) - pending.append((next_id, entry_url, preset)) - next_id += 1 - # Register the display lines before enqueuing, so a fast worker never - # tries to update a line whose distance isn't set yet. - add_job_lines(lines) - for job in pending: - jobs.put(job) + # Graceful quit (:q / Ctrl-D): discard everything still waiting, but let + # an in-flight download finish so no half-written file is left behind. + _shutting_down = True + with _print_lock: + _bump(1) # the :q / Ctrl-D submit pushed the cursor down one row + _drain(resolves) + resolves.put(None) + _drain(jobs) + jobs.put(None) + worker_t.join(timeout=0.2) + if worker_t.is_alive(): + insert_lines([(None, " finishing current download — Ctrl-C to abort")]) + try: + worker_t.join() + except KeyboardInterrupt: + print() if __name__ == "__main__": |
