┌   ┐
54
└   ┘

summaryrefslogtreecommitdiff
path: root/app.py
diff options
context:
space:
mode:
authorConway <[email protected]>2026-06-12 01:41:24 -0400
committerConway <[email protected]>2026-06-12 01:41:24 -0400
commit0bc89dcf150e97dcafa5efc498a5cfd0e6509abd (patch)
treee34904e4d4f299a175452f8e85f3ab8ee04fcca9 /app.py
parente94b672644724400702a4d40d5a5d92c7fc9ed9b (diff)
v1.8 - several bug fixes and featuresHEADmain
Diffstat (limited to 'app.py')
-rw-r--r--app.py245
1 files changed, 183 insertions, 62 deletions
diff --git a/app.py b/app.py
index b6cff69..3dc1214 100644
--- a/app.py
+++ b/app.py
@@ -1,15 +1,22 @@
#!/usr/bin/env python3
-"""DLit — Download It!
+"""DLit — Download It!
a tiny terminal wrapper for yt-dlp.
Paste a link at the prompt; downloads queue and run serially in the
background. Commands: :mp3, :video, :none switch the active preset.
-Ctrl-D or :q to quit.
+Ctrl-D or :q to quit (lets an in-flight download finish; Ctrl-C aborts).
+
+v1.8: link resolution runs off the main thread so the prompt never freezes
+on a slow probe; tracked lines that scroll past the top of the screen are
+untracked instead of corrupting the display; job bookkeeping is pruned
+rather than growing forever; playlist truncation at PLAYLIST_LIMIT is
+reported; quitting drains the queue but finishes the download in flight.
"""
from __future__ import annotations
+import itertools
import os
import queue
import re
@@ -26,7 +33,7 @@ import yt_dlp
from yt_dlp.postprocessor import EmbedThumbnailPP
from yt_dlp.postprocessor.common import PostProcessor
-VERSION = "1.7"
+VERSION = "1.8"
DOWNLOAD_DIR = Path.home() / "Downloads"
PROMPT = "dlit> "
@@ -78,17 +85,27 @@ class _QuietLogger:
_QUIET_LOGGER = _QuietLogger()
+_ids = itertools.count(1) # job ids; next() on a count is GIL-atomic, so both
+ # the main thread and the resolver may draw from it
+_shutting_down = False # set on quit: insert_lines stops redrawing the prompt
+
_print_lock = threading.Lock()
_line_distance: dict[int, int] = {} # job_id -> rows above the current cursor position
_job_title: dict[int, str] = {} # job_id -> resolved title (once known)
_job_tag: dict[int, str] = {} # job_id -> "(PlaylistName - i/N) " prefix, "" for singles
_RESET = "\x1b[0m"
-ICON_QUEUED = f"\x1b[35m~{_RESET}" # purple
-ICON_DOWNLOADING = f"\x1b[33m↓{_RESET}" # yellow
-ICON_CONVERTING = f"\x1b[34m↻{_RESET}" # blue
-ICON_DONE = f"\x1b[32m✓{_RESET}" # green
-ICON_FAILED = f"\x1b[31m✗{_RESET}" # red
+
+
+def _icon(sgr: str, ch: str) -> str:
+ return f"\x1b[{sgr}m{ch}{_RESET}"
+
+
+ICON_QUEUED = _icon("35", "~") # purple
+ICON_DOWNLOADING = _icon("33", "↓") # yellow
+ICON_CONVERTING = _icon("34", "↻") # blue
+ICON_DONE = _icon("32", "✓") # green
+ICON_FAILED = _icon("31", "✗") # red
_SGR_RE = re.compile(r"\x1b\[[0-9;]*m")
@@ -122,41 +139,76 @@ def _clamp(text: str) -> str:
return "".join(out)
-def add_job_lines(items: list[tuple[int, str]]) -> None:
- """Main thread: print one or more new tracked lines above the next prompt.
+def _bump(n: int) -> None:
+ """Move every tracked line n rows farther from the cursor. Caller must hold
+ _print_lock.
- Submitting a link can fan out into several lines at once (one per playlist
- entry). Printing N lines pushes the next prompt down by N+1 rows — the N new
- lines plus the just-submitted prompt line that stays on screen — so every
- existing tracked line moves up by that much. The new lines, printed top to
- bottom, sit at distances N, N-1, … 1 above the coming prompt. With N=1 this
- reduces to the original "+= 2, new line at distance 1".
+ A line pushed past the top of the screen can no longer be addressed —
+ cursor-up (CSI A) clamps at row 1, so a later rewrite would land on
+ whatever unrelated line sits at the top. Such lines are untracked here,
+ which also keeps the dicts bounded by the screen height instead of growing
+ for the life of the session.
+ """
+ height = shutil.get_terminal_size((80, 24)).lines
+ for k in list(_line_distance):
+ d = _line_distance[k] + n
+ if d >= height:
+ del _line_distance[k]
+ else:
+ _line_distance[k] = d
+
+
+def post_line(text: str, job_id: int | None = None) -> None:
+ """Main thread, between prompts: print a new line above the next prompt;
+ tracked when a job_id is given.
+
+ The just-submitted input line stays on screen above the printed line, so
+ existing tracked lines end up 2 rows farther from the next prompt.
"""
with _print_lock:
- n = len(items)
- for k in _line_distance:
- _line_distance[k] += n + 1
- for offset, (job_id, text) in enumerate(items):
- _line_distance[job_id] = n - offset
- sys.stdout.write(_clamp(text) + "\n")
+ _bump(2)
+ if job_id is not None:
+ _line_distance[job_id] = 1
+ sys.stdout.write(_clamp(text) + "\n")
sys.stdout.flush()
-def notice(text: str) -> None:
- """Main thread: print a one-off, untracked line above the next prompt."""
+def insert_lines(items: list[tuple[int | None, str]]) -> None:
+ """Any thread, while the prompt is live: push new lines above it; entries
+ with a job id become tracked, None entries are one-off notices.
+
+ The prompt row is cleared, the new lines printed where it stood, and the
+ prompt plus whatever the user had typed (readline's buffer) redrawn below,
+ so existing tracked lines move exactly n rows. Known limits: input long
+ enough to wrap onto a second row throws the geometry off by the wrapped
+ rows, and the redraw parks the visible cursor at the end of the typed text
+ even if the user had moved it left (readline's own state is unaffected).
+ """
with _print_lock:
- for k in _line_distance:
- _line_distance[k] += 2
- sys.stdout.write(_clamp(text) + "\n")
+ n = len(items)
+ _bump(n)
+ sys.stdout.write("\r\x1b[K")
+ for offset, (job_id, text) in enumerate(items):
+ if job_id is not None:
+ _line_distance[job_id] = n - offset
+ sys.stdout.write(_clamp(text) + "\n")
+ if not _shutting_down:
+ sys.stdout.write(PROMPT + readline.get_line_buffer())
sys.stdout.flush()
def update_job_line(job_id: int, text: str) -> None:
- """Worker thread: rewrite this job's tracked line in place; cursor returns to its prior spot."""
+ """Worker/resolver thread: rewrite this job's tracked line in place; cursor
+ returns to its prior spot. Untracked (pruned or finished) jobs are a no-op."""
with _print_lock:
d = _line_distance.get(job_id)
if d is None:
return
+ if d >= shutil.get_terminal_size((80, 24)).lines:
+ # Scrolled off since the last _bump (e.g. the terminal shrank);
+ # unreachable by cursor-up, so stop tracking it.
+ del _line_distance[job_id]
+ return
sys.stdout.write(
"\x1b7" # save cursor (DECSC)
f"\x1b[{d}A" # up d rows
@@ -167,6 +219,15 @@ def update_job_line(job_id: int, text: str) -> None:
sys.stdout.flush()
+def _forget_job(job_id: int) -> None:
+ """Drop a job's bookkeeping once its line is final and will never be
+ rewritten again."""
+ with _print_lock:
+ _line_distance.pop(job_id, None)
+ _job_title.pop(job_id, None)
+ _job_tag.pop(job_id, None)
+
+
def make_opts(job_id: int, preset: str) -> dict:
last = {"pct": -5.0, "t": 0.0}
@@ -324,7 +385,7 @@ class SquareThumbnailCropper(PostProcessor):
right_ref = pix(w - 1, h // 2)
right_w = 0
- for x in range(w - 1, w // 2, -1):
+ for x in range(w - 1, w // 2 - 1, -1):
if not col_uniform(x, right_ref):
break
right_w += 1
@@ -391,6 +452,54 @@ class SquareThumbnailCropper(PostProcessor):
pass
+def resolver(resolves: "queue.Queue", jobs: "queue.Queue") -> None:
+ """Resolve pasted links off the main thread, then queue download jobs.
+
+ The placeholder line printed at paste time is reused in place for a single
+ video (or a playlist's first entry); further playlist entries are inserted
+ above the live prompt as new tracked lines. Lines are registered before
+ their jobs are enqueued, so a fast worker never tries to update a line
+ whose distance isn't set yet.
+ """
+ while True:
+ item = resolves.get()
+ if item is None:
+ return
+ job_id, url, preset = item
+ try:
+ is_playlist, name, entries = resolve(url)
+ except Exception as exc: # noqa: BLE001
+ err = str(exc).splitlines()[0][:60]
+ update_job_line(job_id, f"[{ICON_FAILED}] {url[:60]} {err}")
+ _forget_job(job_id)
+ continue
+ if not entries:
+ update_job_line(job_id, f"[{ICON_FAILED}] nothing to download in '{name[:40]}'")
+ _forget_job(job_id)
+ continue
+
+ total = len(entries)
+ ids = [job_id] + [next(_ids) for _ in range(total - 1)]
+ for i, (jid, (_eurl, title)) in enumerate(zip(ids, entries), 1):
+ _job_title[jid] = title
+ if is_playlist:
+ _job_tag[jid] = f"({name[:24]} - {i}/{total}) "
+
+ update_job_line(job_id, f"[{ICON_QUEUED}] {_job_tag.get(job_id, '')}{entries[0][1]}")
+ extra: list[tuple[int | None, str]] = [
+ (jid, f"[{ICON_QUEUED}] {_job_tag.get(jid, '')}{title}")
+ for jid, (_eurl, title) in zip(ids[1:], entries[1:])
+ ]
+ if total >= PLAYLIST_LIMIT:
+ # yt-dlp gives no overflow signal under playlistend, so hitting the
+ # limit exactly is the best available proxy for truncation.
+ extra.append((None, f" '{name[:40]}' stopped at {PLAYLIST_LIMIT} entries"))
+ if extra:
+ insert_lines(extra)
+ for jid, (eurl, _title) in zip(ids, entries):
+ jobs.put((jid, eurl, preset))
+
+
def worker(jobs: "queue.Queue") -> None:
while True:
item = jobs.get()
@@ -405,29 +514,47 @@ def worker(jobs: "queue.Queue") -> None:
ydl.add_post_processor(EmbedThumbnailPP(ydl), when="post_process")
info = ydl.extract_info(url, download=True)
title = (info or {}).get("title") or _job_title.get(job_id) or url
- _job_title[job_id] = title
update_job_line(job_id, f"[{ICON_DONE}] {tag}{title[:60]}")
except Exception as exc: # noqa: BLE001
err = str(exc).splitlines()[0][:60]
title = _job_title.get(job_id) or url
update_job_line(job_id, f"[{ICON_FAILED}] {tag}{title[:60]} {err}")
+ _forget_job(job_id)
+
+
+def _drain(q: "queue.Queue") -> None:
+ try:
+ while True:
+ q.get_nowait()
+ except queue.Empty:
+ pass
def main() -> None:
+ global _shutting_down
DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)
+ resolves: "queue.Queue" = queue.Queue()
jobs: "queue.Queue" = queue.Queue()
- threading.Thread(target=worker, args=(jobs,), daemon=True).start()
+ threading.Thread(target=resolver, args=(resolves, jobs), daemon=True).start()
+ worker_t = threading.Thread(target=worker, args=(jobs,), daemon=True)
+ worker_t.start()
preset = "mp3"
- next_id = 1
print(f"dlit v.{VERSION} -> {DOWNLOAD_DIR} preset={preset} (:q to quit)")
while True:
try:
line = input(PROMPT).strip()
- except (EOFError, KeyboardInterrupt):
+ except KeyboardInterrupt:
+ # Hard abort: don't wait for in-flight work (daemon threads die).
+ print()
+ return
+ except EOFError:
print()
break
if not line:
+ # The empty submit still pushed the prompt down one row.
+ with _print_lock:
+ _bump(1)
continue
if line.startswith(":"):
cmd = line[1:]
@@ -435,39 +562,33 @@ def main() -> None:
break
if cmd in PRESETS:
preset = cmd
- notice(f" preset -> {preset}")
+ post_line(f" preset -> {preset}")
else:
- notice(f" unknown command: :{cmd} (try :mp3, :video, :none, :q)")
+ post_line(f" unknown command: :{cmd} (try :mp3, :video, :none, :q)")
continue
- # A pasted link: resolve it (a quick network probe) to learn whether it's
- # a single video or a playlist, then queue one job per resulting entry.
- try:
- is_playlist, name, entries = resolve(line)
- except Exception as exc: # noqa: BLE001
- err = str(exc).splitlines()[0][:60]
- add_job_lines([(next_id, f"[{ICON_FAILED}] {line} {err}")])
- next_id += 1
- continue
- if not entries:
- notice(f" nothing to download in '{name[:40]}'")
- continue
+ # A pasted link: hand it to the resolver thread so the prompt stays
+ # live; the placeholder line is updated (and fanned out) in place.
+ job_id = next(_ids)
+ post_line(f"[{ICON_QUEUED}] resolving… {line}", job_id)
+ resolves.put((job_id, line, preset))
- total = len(entries)
- lines, pending = [], []
- for i, (entry_url, title) in enumerate(entries, 1):
- _job_title[next_id] = title
- if is_playlist:
- _job_tag[next_id] = f"({name[:24]} - {i}/{total}) "
- label = _job_tag.get(next_id, "") + title
- lines.append((next_id, f"[{ICON_QUEUED}] {label}"))
- pending.append((next_id, entry_url, preset))
- next_id += 1
- # Register the display lines before enqueuing, so a fast worker never
- # tries to update a line whose distance isn't set yet.
- add_job_lines(lines)
- for job in pending:
- jobs.put(job)
+ # Graceful quit (:q / Ctrl-D): discard everything still waiting, but let
+ # an in-flight download finish so no half-written file is left behind.
+ _shutting_down = True
+ with _print_lock:
+ _bump(1) # the :q / Ctrl-D submit pushed the cursor down one row
+ _drain(resolves)
+ resolves.put(None)
+ _drain(jobs)
+ jobs.put(None)
+ worker_t.join(timeout=0.2)
+ if worker_t.is_alive():
+ insert_lines([(None, " finishing current download — Ctrl-C to abort")])
+ try:
+ worker_t.join()
+ except KeyboardInterrupt:
+ print()
if __name__ == "__main__":