⚡ Torrent Downloader Built From Scratch — No uTorrent Nonsense

🧲 Clean Torrent Downloader — No Bloat, No Bundled Malware, Just Downloads

A one-script torrent client built from scratch. No uTorrent adware. No qBittorrent config rabbit holes. Just pick a file and go.

Most torrent clients install toolbars, crypto miners, or “recommended software” you never asked for. This one is a single Python script — open source, no installer, no background processes, no surprises.

Think of it as a remote control for aria2 — the fastest download engine that already lives on your machine (or installs in one command). The GUI just makes it human-friendly.


🖥️ What It Looks Like

Waiting for a torrent — clean, simple interface:

Downloading at 9.9 MB/s in Aggressive mode — 82 peers connected:

Three speed modes to match your connection:

⚡ Why This Instead of uTorrent / qBittorrent / BiglyBT
| Problem With Others | This Tool |
| --- | --- |
| Bundled adware / toolbars | Zero installs — one .py file, nothing else |
| Background processes eating RAM | Runs only when you open it, dies when you close it |
| Confusing settings pages | 3 speed modes: Safe, Balanced, Aggressive — pick one |
| Forced updates / premium nags | Open source — no accounts, no subscriptions, no nagging |
| Crypto miners hiding in installers | Readable source code — see exactly what runs |
🛠️ How to Use It (3 Steps)

Step 1 — Install the engine (one time only)

| OS | Command |
| --- | --- |
| Windows | choco install aria2 (or download from the aria2 GitHub releases page) |
| Linux | sudo apt install aria2 |
| Mac | brew install aria2 |

You also need PyQt6: pip install PyQt6

Step 2 — Run the script

Save the code below as torrent.py, then:

python torrent.py

Step 3 — Download

Click Select .torrent → pick your .torrent file → choose a speed mode → watch it fly.

⚡ Resume support built in. Close the app mid-download, reopen it, load the same torrent — it picks up where you left off. Delete the .aria2 control files in your Downloads folder to start fresh.

📋 The Full Source Code
#!/usr/bin/env python3
"""Torrent downloader GUI powered by PyQt6 + aria2 RPC."""

from __future__ import annotations

import base64
import json
import os
import shutil
import socket
import subprocess
import sys
import time
import urllib.error
import urllib.request
import webbrowser
from pathlib import Path
from typing import Dict, Optional

from PyQt6 import QtCore, QtWidgets

# When False, build_options() removes "bt-request-peer-speed-limit" from the
# Aggressive mode before the options are sent to aria2.
FORCE_UNLIMITED_PEER_SPEED = False
# Host used to build every aria2 JSON-RPC URL and to probe for a free port.
DEFAULT_RPC_HOST = "127.0.0.1"
# Ports probed for an already-running aria2 and tried first when starting one.
PREFERRED_PORTS = (6800, 6801, 6802, 6803, 6880, 6881)

# Per-mode aria2 option overrides, keyed by the label shown in the speed-mode
# combo box. Values are strings because they are forwarded to aria2 as-is
# (after build_options() clamps the numeric ones).
MODES: Dict[str, Dict[str, str]] = {
    "🐢 Safe": {
        "max-connection-per-server": "4",
        "split": "4",
        "bt-max-peers": "70",
        "bt-request-peer-speed-limit": "64K",
    },
    "⚖️ Balanced": {
        "max-connection-per-server": "16",
        "split": "16",
        "bt-max-peers": "220",
        "bt-request-peer-speed-limit": "32K",
    },
    "🔥 Aggressive": {
        "max-connection-per-server": "16",
        "split": "32",
        "bt-max-peers": "600",
        "bt-request-peer-speed-limit": "0",
    },
}

# Tracker announce URLs; joined with commas into aria2's "bt-tracker" option,
# both on the aria2c command line and in the per-download options.
TRACKERS = [
    "udp://tracker.opentrackr.org:1337/announce",
    "udp://open.demonii.com:1337/announce",
    "udp://tracker.torrent.eu.org:451/announce",
    "udp://tracker.coppersurfer.tk:6969/announce",
    "udp://tracker.openbittorrent.com:6969/announce",
]


def resolve_download_dir() -> Path:
    """Return the Downloads directory, creating it and checking it is writable.

    The directory is ``%USERPROFILE%/Downloads`` when that variable is set,
    otherwise ``~/Downloads``. A small probe file is written and removed to
    confirm write access; any filesystem error propagates to the caller.
    """
    profile_root = os.environ.get("USERPROFILE", "")
    target = Path(profile_root).expanduser().joinpath("Downloads")

    # An unset/empty USERPROFILE collapses to the relative path "Downloads";
    # fall back to the real home directory in that case.
    if str(target).strip() in ("", "Downloads"):
        target = Path.home().joinpath("Downloads")

    target.mkdir(parents=True, exist_ok=True)

    marker = target.joinpath(".write_test.tmp")
    marker.write_text("ok", encoding="utf-8")
    marker.unlink(missing_ok=True)
    return target


def rpc_url(port: int) -> str:
    """Return the aria2 JSON-RPC endpoint URL on DEFAULT_RPC_HOST for *port*."""
    return "http://{host}:{port}/jsonrpc".format(host=DEFAULT_RPC_HOST, port=port)


def aria2_request(method: str, params: Optional[list] = None, rpc_endpoint: str = rpc_url(6800)) -> dict:
    """Send one JSON-RPC call to aria2 and return the decoded response.

    Args:
        method: aria2 method name without the ``aria2.`` prefix
            (e.g. ``"getVersion"``).
        params: Positional RPC parameters; ``None`` is sent as an empty list.
        rpc_endpoint: Full JSON-RPC URL. The default is built once, at
            definition time, for port 6800.

    Returns:
        The whole decoded JSON-RPC response object; callers read ``["result"]``.

    Raises:
        RuntimeError: For every failure mode - HTTP error status, unreachable
            endpoint, timeout or dropped connection, a reply that is not a
            JSON object, or a response carrying an ``error`` member.
    """
    # Function-scope import: only needed to name the protocol-level exception.
    import http.client

    payload = {
        "jsonrpc": "2.0",
        "id": "req",
        "method": f"aria2.{method}",
        "params": params or [],
    }
    req = urllib.request.Request(
        rpc_endpoint,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=8) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="ignore")
        raise RuntimeError(f"HTTP {exc.code} {exc.reason}\n{body}") from None
    except urllib.error.URLError as exc:
        raise RuntimeError(f"RPC unavailable ({rpc_endpoint}): {exc.reason}") from None
    except (OSError, http.client.HTTPException) as exc:
        # Timeouts/resets while waiting for or reading the reply, and replies
        # from a non-HTTP service on the port, are not wrapped in URLError.
        raise RuntimeError(f"RPC unavailable ({rpc_endpoint}): {exc}") from None

    try:
        data = json.loads(raw)
    except ValueError as exc:
        # Covers JSONDecodeError and undecodable bytes: something other than
        # aria2 answered on this port.
        raise RuntimeError(f"Invalid RPC response from {rpc_endpoint}: {exc}") from None
    if not isinstance(data, dict):
        raise RuntimeError(f"Invalid RPC response from {rpc_endpoint}: expected a JSON object")

    if "error" in data:
        message = data["error"].get("message", "Unknown error")
        raise RuntimeError(message)
    return data


def human_bytes(value: str | int) -> str:
    """Format a byte count with one decimal place and a 1024-based unit.

    Args:
        value: Byte count as an int or as a string accepted by ``int()``.

    Returns:
        Text such as ``"1.5 KB"``; units run from B up to PB, and anything
        larger stays expressed in PB.
    """
    units = ("B", "KB", "MB", "GB", "TB", "PB")
    largest = len(units) - 1
    amount = float(int(value))
    index = 0
    # Step up one unit per division until the value fits or units run out.
    while index < largest and amount >= 1024:
        amount /= 1024
        index += 1
    return f"{amount:.1f} {units[index]}"


def pick_free_port(start: int = 6800, stop: int = 6999) -> int:
    """Return the first port in ``[start, stop]`` that nothing is listening on.

    A port counts as free when a TCP connection attempt to it on
    DEFAULT_RPC_HOST is not accepted.

    Args:
        start: First port to probe (inclusive).
        stop: Last port to probe (inclusive).

    Returns:
        The lowest port in the range that refused the connection.

    Raises:
        RuntimeError: When every port in the range accepted a connection
            (or the range is empty); the message names the searched range.
    """
    for port in range(start, stop + 1):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # connect_ex() returns 0 only when a listener accepted us.
            if sock.connect_ex((DEFAULT_RPC_HOST, port)) != 0:
                return port
    raise RuntimeError(f"Unable to find a free RPC port ({start}-{stop}).")


class TorrentWindow(QtWidgets.QWidget):
    def __init__(self) -> None:
        """Build the window, wire its signals and connect to (or start) aria2.

        Nothing here is wrapped in try/except, so errors raised by
        resolve_download_dir() or start_or_connect_aria2() propagate to the
        caller.
        """
        super().__init__()
        # --- connection / download state ---------------------------------
        self.download_dir = resolve_download_dir()
        # Handle of an aria2c process started by this window (None until then).
        self.aria2_proc: Optional[subprocess.Popen] = None
        self.owns_aria2_process = False
        # GID of the download currently shown in the UI, if any.
        self.gid_current: Optional[str] = None
        self.is_paused = False
        self.rpc_port = 6800
        self.rpc_endpoint = rpc_url(self.rpc_port)

        self.setWindowTitle("📥 Torrent Downloader (PyQt6)")
        self.setFixedSize(620, 450)

        # --- status widgets ----------------------------------------------
        self.status_label = QtWidgets.QLabel("Waiting for .torrent...")
        self.rpc_label = QtWidgets.QLabel("RPC: checking...")
        self.progress = QtWidgets.QProgressBar()
        self.progress.setRange(0, 100)
        self.speed_label = QtWidgets.QLabel("0 B/s")
        self.file_label = QtWidgets.QLabel("")

        # Speed-mode selector; entries are the keys of MODES.
        self.mode_box = QtWidgets.QComboBox()
        self.mode_box.addItems(MODES.keys())
        self.mode_box.setCurrentText("⚖️ Balanced")

        # --- action buttons ----------------------------------------------
        self.select_button = QtWidgets.QPushButton("📂 Select .torrent")
        # One button cycles through Start / Pause / Resume (see start_pause_resume).
        self.pause_button = QtWidgets.QPushButton("▶️ Start")
        self.open_button = QtWidgets.QPushButton("📁 Open folder")
        self.tune_button = QtWidgets.QPushButton("⚡ Apply turbo settings")
        self.reconnect_button = QtWidgets.QPushButton("🔌 Reconnect RPC")

        # Read-only text area filled by refresh_speed_hints().
        self.hints = QtWidgets.QPlainTextEdit()
        self.hints.setReadOnly(True)
        self.hints.setMaximumHeight(160)

        # --- layout: widgets are stacked top to bottom in this order ------
        layout = QtWidgets.QVBoxLayout(self)
        layout.addWidget(self.status_label)
        layout.addWidget(self.rpc_label)
        layout.addWidget(self.progress)
        layout.addWidget(self.speed_label)
        layout.addWidget(self.file_label)

        row = QtWidgets.QHBoxLayout()
        row.addWidget(QtWidgets.QLabel("⚙️ Speed mode:"))
        row.addWidget(self.mode_box)
        layout.addLayout(row)

        buttons = QtWidgets.QHBoxLayout()
        buttons.addWidget(self.select_button)
        buttons.addWidget(self.pause_button)
        buttons.addWidget(self.open_button)
        layout.addLayout(buttons)

        rpc_buttons = QtWidgets.QHBoxLayout()
        rpc_buttons.addWidget(self.tune_button)
        rpc_buttons.addWidget(self.reconnect_button)
        layout.addLayout(rpc_buttons)

        layout.addWidget(QtWidgets.QLabel(f"📁 Downloading to: {self.download_dir}"))
        layout.addWidget(QtWidgets.QLabel("Speed diagnostics:"))
        layout.addWidget(self.hints)

        # Polls aria2 once per second; started by choose_torrent().
        self.timer = QtCore.QTimer(self)
        self.timer.setInterval(1000)
        self.timer.timeout.connect(self.update_progress)

        # --- signal wiring -----------------------------------------------
        self.select_button.clicked.connect(self.choose_torrent)
        self.pause_button.clicked.connect(self.start_pause_resume)
        # Hands the folder path to the webbrowser module to open it.
        self.open_button.clicked.connect(lambda: webbrowser.open(str(self.download_dir)))
        self.tune_button.clicked.connect(self.apply_turbo)
        self.reconnect_button.clicked.connect(self.reconnect_rpc)

        # Attach to a running aria2 or launch one; may raise (see docstring).
        self.start_or_connect_aria2()
        self.refresh_speed_hints()

    def rpc_call(self, method: str, params: Optional[list] = None) -> dict:
        """Issue an aria2 RPC call against the endpoint this window is bound to."""
        endpoint = self.rpc_endpoint
        return aria2_request(method, params, rpc_endpoint=endpoint)

    def _connect_existing_rpc(self) -> bool:
        """Attach to an aria2 RPC server that is already listening.

        Probes, with a ``getVersion`` call, the port of the aria2 process this
        window started (when that process is still running) and then each of
        PREFERRED_PORTS. Connection state is only written once a probe
        succeeds, so a failed scan leaves the previous endpoint untouched.

        Returns:
            True when a server answered and ``rpc_endpoint`` / ``rpc_port``
            now point at it; False when no probed port answered.
        """
        child = self.aria2_proc
        child_alive = child is not None and child.poll() is None
        # Port of our own still-running aria2, if we have one.
        owned_port = self.rpc_port if (self.owns_aria2_process and child_alive) else None

        ports = [port for port in PREFERRED_PORTS if port != owned_port]
        if owned_port is not None:
            # Prefer our own child; its port may lie outside PREFERRED_PORTS.
            ports.insert(0, owned_port)

        for port in ports:
            endpoint = rpc_url(port)
            try:
                aria2_request("getVersion", rpc_endpoint=endpoint)
            except Exception:
                # Nothing (or nothing speaking aria2 RPC) on this port.
                continue
            self.rpc_endpoint = endpoint
            self.rpc_port = port
            # Keep the ownership flag truthful when we land on our own child.
            self.owns_aria2_process = port == owned_port
            self.rpc_label.setText(f"RPC: connected to existing aria2 on port {port}")
            return True
        return False

    def start_or_connect_aria2(self) -> None:
        """Connect to a running aria2 RPC server, or launch a local one.

        When no existing server answers, aria2c is started with the richest
        option profile first (turbo, then balanced, then minimal), so that an
        aria2 build that rejects newer options still starts with a smaller
        set. Each profile is tried on every candidate port.

        Raises:
            RuntimeError: When aria2c is not on PATH, when no free port can be
                found, or when every profile/port combination failed (the
                message carries the last underlying error).
        """
        if self._connect_existing_rpc():
            return

        if not shutil.which("aria2c"):
            raise RuntimeError(
                "aria2c was not found in PATH. Install aria2 and try again.\n"
                "Windows: choco install aria2\n"
                "Ubuntu/Debian: sudo apt install aria2"
            )

        # pick_free_port() may return a port that is already in
        # PREFERRED_PORTS; dict.fromkeys de-duplicates while keeping order.
        candidate_ports = list(dict.fromkeys([*PREFERRED_PORTS, pick_free_port()]))

        # Richest first. Each later profile is a strict subset of the one
        # before it, so trying the smallest first would always win and the
        # larger profiles would never be used.
        profiles = [
            self._compatibility_profile_turbo,
            self._compatibility_profile_balanced,
            self._compatibility_profile_minimal,
        ]

        last_error = ""
        for builder in profiles:
            for port in candidate_ports:
                try:
                    self._start_aria2_on_port(port, builder(port))
                except Exception as exc:
                    last_error = str(exc)
                    continue
                self.rpc_label.setText(f"RPC: local aria2 started on port {port}")
                return

        raise RuntimeError(
            "Failed to start aria2 RPC on all candidate ports.\n"
            f"Last error: {last_error}"
        )

    def _compatibility_profile_minimal(self, port: int) -> list[str]:
        """Return the smallest aria2c command line that exposes RPC on *port*.

        Args:
            port: TCP port for aria2's RPC listener.

        Returns:
            A fresh argv list (safe for the caller to extend).
        """
        # --rpc-allow-origin-all is deliberately absent: it makes aria2 send
        # "Access-Control-Allow-Origin: *", which lets any web page opened in
        # a local browser read RPC responses. This client talks to aria2 via
        # urllib, which needs no CORS header.
        # NOTE(review): the RPC server is still unauthenticated; consider
        # adding --rpc-secret and sending the token with each request.
        return [
            "aria2c",
            "--enable-rpc",
            f"--rpc-listen-port={port}",
            "--rpc-listen-all=false",
            f"--dir={self.download_dir}",
            "--file-allocation=none",
        ]

    def _compatibility_profile_balanced(self, port: int) -> list[str]:
        """Return the minimal command line plus the general BitTorrent options.

        Adds the larger RPC request size, overwrite/continue behaviour, peer
        discovery switches, unlimited global speed limits and the tracker list.
        """
        tracker_csv = ",".join(TRACKERS)
        command = self._compatibility_profile_minimal(port)
        command.extend(
            (
                "--rpc-max-request-size=16M",
                "--allow-overwrite=true",
                "--continue=true",
                "--max-concurrent-downloads=5",
                "--bt-enable-lpd=true",
                "--enable-dht=true",
                "--enable-dht6=false",
                "--bt-enable-peer-exchange=true",
                "--seed-time=0",
                "--max-overall-download-limit=0",
                "--max-overall-upload-limit=0",
                f"--bt-tracker={tracker_csv}",
            )
        )
        return command

    def _compatibility_profile_turbo(self, port: int) -> list[str]:
        """Return the balanced command line plus the two turbo-only switches."""
        turbo_flags = [
            "--bt-detach-seed-only=true",
            "--optimize-concurrent-downloads=true",
        ]
        command = list(self._compatibility_profile_balanced(port))
        command += turbo_flags
        return command

    def _start_aria2_on_port(self, port: int, aria2_args: list[str]) -> None:
        """Launch aria2c and wait until its RPC server answers on *port*.

        The RPC endpoint is polled up to 40 times, sleeping 0.2 s after each
        failed attempt. On success the process handle, port and endpoint are
        stored on the window and ownership is recorded.

        Args:
            port: Port the RPC server in *aria2_args* is configured to use.
            aria2_args: Complete argv for aria2c.

        Raises:
            RuntimeError: When aria2c exits during start-up (the message
                includes up to 200 characters of its stderr) or never answers.
        """
        proc = subprocess.Popen(aria2_args, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True)
        endpoint = rpc_url(port)
        try:
            for _ in range(40):
                if proc.poll() is not None:
                    stderr_text = proc.stderr.read().strip() if proc.stderr is not None else ""
                    extra = f" | {stderr_text[:200]}" if stderr_text else ""
                    raise RuntimeError(
                        f"aria2 exited immediately after start (port {port}, code {proc.returncode}){extra}"
                    )
                try:
                    aria2_request("getVersion", rpc_endpoint=endpoint)
                except Exception:
                    # Not listening yet; give it a moment and poll again.
                    time.sleep(0.2)
                    continue
                break
            else:
                raise RuntimeError(f"aria2 RPC is not responding on port {port}")
        except BaseException:
            # Start-up failed: stop and reap the child and release its stderr
            # pipe, so a failed attempt leaves no stray process or open handle.
            if proc.poll() is None:
                proc.terminate()
                try:
                    proc.wait(timeout=3)
                except subprocess.TimeoutExpired:
                    proc.kill()
                    proc.wait()
            if proc.stderr is not None:
                proc.stderr.close()
            raise

        # NOTE(review): stderr stays piped and unread while aria2 runs; if
        # aria2 ever wrote a lot to stderr it could block - confirm/redirect.
        self.aria2_proc = proc
        self.owns_aria2_process = True
        self.rpc_port = port
        self.rpc_endpoint = endpoint

    def reconnect_rpc(self) -> None:
        """Re-establish the aria2 RPC connection and report the outcome.

        Shows an error dialog when no server can be reached or started.
        On success shows the endpoint and, when a download is still being
        tracked, resumes the progress polling.
        """
        try:
            self.start_or_connect_aria2()
        except Exception as exc:
            QtWidgets.QMessageBox.critical(self, "RPC", str(exc))
            return

        QtWidgets.QMessageBox.information(self, "RPC", f"Connected to {self.rpc_endpoint}")

        # update_progress() stops the timer after a failed RPC request and
        # nothing else restarts it, so the display would stay frozen.
        # Started after the dialog closes so a poll cannot open a second
        # dialog on top of it.
        if self.gid_current and not self.timer.isActive():
            self.timer.start()

    def build_options(self, mode: str) -> Dict[str, str]:
        """Assemble the per-download aria2 options for the given speed mode.

        Args:
            mode: A key of MODES; an unknown label yields only the fallback
                connection/split/peer values.

        Returns:
            Base options merged with the mode's overrides, where
            max-connection-per-server is clamped to 1..16, split to 1..32 and
            bt-max-peers is raised to at least 50.
        """
        overrides = dict(MODES.get(mode, {}))

        # Aggressive carries a peer speed limit entry that is only forwarded
        # when FORCE_UNLIMITED_PEER_SPEED is switched on.
        if not FORCE_UNLIMITED_PEER_SPEED and mode == "🔥 Aggressive":
            overrides.pop("bt-request-peer-speed-limit", None)

        connections = int(overrides.get("max-connection-per-server", "16"))
        pieces = int(overrides.get("split", "16"))
        peer_cap = int(overrides.get("bt-max-peers", "150"))

        overrides["max-connection-per-server"] = str(max(1, min(16, connections)))
        overrides["split"] = str(max(1, min(32, pieces)))
        overrides["bt-max-peers"] = str(max(50, peer_cap))

        return {
            "dir": str(self.download_dir),
            "allow-overwrite": "true",
            "auto-file-renaming": "false",
            "follow-torrent": "true",
            "enable-dht": "true",
            "bt-enable-lpd": "true",
            "bt-enable-peer-exchange": "true",
            "seed-time": "0",
            "max-upload-limit": "0",
            "max-download-limit": "0",
            "min-split-size": "1M",
            "bt-tracker": ",".join(TRACKERS),
            **overrides,
        }

    def choose_torrent(self) -> bool:
        """Ask for a .torrent file and hand it to aria2.

        Returns:
            True when aria2 accepted the torrent and polling was started;
            False when the dialog was cancelled or adding failed (an error
            dialog is shown in the latter case).
        """
        selected, _chosen_filter = QtWidgets.QFileDialog.getOpenFileName(
            self, "Select torrent", str(Path.home()), "Torrent (*.torrent)"
        )
        if not selected:
            return False

        try:
            # aria2.addTorrent takes the torrent file as base64 text.
            encoded = base64.b64encode(Path(selected).read_bytes()).decode("utf-8")
            mode_name = self.mode_box.currentText()
            reply = self.rpc_call("addTorrent", [encoded, [], self.build_options(mode_name)])
            new_gid = reply["result"]
        except Exception as exc:
            QtWidgets.QMessageBox.critical(self, "Add error", str(exc))
            return False

        self.gid_current = new_gid
        self.is_paused = False
        self.pause_button.setText("⏸ Pause")
        self.status_label.setText(f"Mode '{mode_name}' started")
        self.timer.start()
        return True

    def update_progress(self) -> None:
        """Poll aria2 for the tracked download and refresh the widgets.

        Called by the one-second timer. Stops the timer when the RPC request
        fails or when the download reaches the ``complete`` or ``error``
        state; in the two final states the tracking fields are reset and a
        dialog is shown.
        """
        gid = self.gid_current
        if not gid:
            return

        wanted_keys = [
            "status",
            "completedLength",
            "totalLength",
            "downloadSpeed",
            "files",
            "numSeeders",
            "connections",
            "errorCode",
            "errorMessage",
        ]
        try:
            info = self.rpc_call("tellStatus", [gid, wanted_keys])["result"]
        except Exception as exc:
            self.status_label.setText(f"Request error: {exc}")
            self.timer.stop()
            return

        done_bytes = int(info.get("completedLength", "0"))
        # Guard against a zero total so the percentage below cannot divide by 0.
        total_bytes = max(1, int(info.get("totalLength", "1")))
        rate = int(info.get("downloadSpeed", "0"))
        percent = min(100, int((done_bytes / total_bytes) * 100))

        peers = info.get("connections", "0")
        seeders = info.get("numSeeders", "0")
        self.progress.setValue(percent)
        self.speed_label.setText(f"{human_bytes(rate)}/s | peers: {peers} | seeders: {seeders}")

        files = info.get("files")
        if files:
            # Only the first file of the torrent is shown.
            first_name = Path(files[0].get("path", "")).name
            self.file_label.setText(f"📄 {first_name}")

        state = info.get("status", "")
        self.status_label.setText(f"{state} | {percent}%")

        if state not in ("complete", "error"):
            return

        # Final state: stop polling and clear tracking before any dialog opens.
        self.timer.stop()
        self.gid_current = None
        self.is_paused = False
        self.pause_button.setText("▶️ Start")

        if state == "complete":
            QtWidgets.QMessageBox.information(self, "Done", f"File downloaded to:\n{self.download_dir}")
            webbrowser.open(str(self.download_dir))
            return

        error_code = str(info.get("errorCode", ""))
        error_message = str(info.get("errorMessage", "Unknown aria2 error."))
        report = "".join(
            [
                "Download failed.\n\n",
                f"aria2 code: {error_code or 'n/a'}\n",
                f"aria2 message: {error_message}\n\n",
                "Tips:\n",
                "- Try another speed mode (Safe/Balanced).\n",
                "- Verify torrent has active seeders.\n",
                "- Try Reconnect RPC, then start again.",
            ]
        )
        QtWidgets.QMessageBox.critical(self, "Download error", report)

    def _fetch_gid_state(self) -> Optional[str]:
        """Return aria2's status string for the tracked download.

        Returns:
            The ``status`` field as text (empty when aria2 omits it), or None
            when no download is tracked or the RPC request fails.
        """
        gid = self.gid_current
        if not gid:
            return None
        try:
            reply = self.rpc_call("tellStatus", [gid, ["status"]])
            return str(reply["result"].get("status", ""))
        except Exception:
            return None

    def start_pause_resume(self) -> None:
        """Handle the Start / Pause / Resume button.

        Without a tracked download this opens the torrent picker. Otherwise
        the download's current aria2 state decides whether it is paused or
        unpaused; a download that is gone or finished resets the button.
        """
        if not self.gid_current:
            self.choose_torrent()
            return

        state = self._fetch_gid_state()

        # None/"" mean the state could not be determined; the named states
        # mean the task can no longer be paused or resumed.
        if not state or state in ("error", "complete", "removed"):
            self.gid_current = None
            self.is_paused = False
            self.pause_button.setText("▶️ Start")
            QtWidgets.QMessageBox.information(
                self,
                "No active download",
                "Previous task is no longer active. Select a torrent to start a new download.",
            )
            return

        wants_resume = state == "paused" or self.is_paused
        try:
            if wants_resume:
                self.rpc_call("unpause", [self.gid_current])
                self.is_paused = False
                self.pause_button.setText("⏸ Pause")
            elif state == "active" or state == "waiting":
                self.rpc_call("pause", [self.gid_current])
                self.is_paused = True
                self.pause_button.setText("▶️ Resume")
            else:
                QtWidgets.QMessageBox.information(self, "Task state", f"Current task state: {state}")
        except Exception as exc:
            QtWidgets.QMessageBox.critical(self, "Error", f"Failed to switch state:\n{exc}")

    def apply_turbo(self) -> None:
        """Push the turbo global options to aria2 and confirm with a dialog.

        Any failure is reported in an error dialog instead of being raised.
        """
        turbo_options = {
            "max-overall-download-limit": "0",
            "max-overall-upload-limit": "0",
            "bt-max-open-files": "512",
            "max-concurrent-downloads": "8",
            "optimize-concurrent-downloads": "true",
        }
        try:
            self.rpc_call("changeGlobalOption", [turbo_options])
            QtWidgets.QMessageBox.information(self, "Turbo", "Turbo settings applied to speed up downloads.")
            self.refresh_speed_hints()
        except Exception as exc:
            QtWidgets.QMessageBox.critical(self, "Error", str(exc))

    def refresh_speed_hints(self) -> None:
        """Fill the diagnostics box with the fixed list of speed caveats."""
        hint_lines = (
            "1) Older aria2 versions may not support all turbo options — compatibility startup profiles were added.",
            "2) Very high split/max-connection can add overhead and slow weak disk/CPU systems.",
            "3) On weak routers, upload=0 may reduce stability — you can cap upload manually.",
            "4) Slow/empty trackers delay startup — a bootstrap UDP tracker list is included.",
            "5) If port 6800 is busy, the app will automatically choose another RPC port.",
        )
        text = "\n".join(hint_lines)
        self.hints.setPlainText(text)

    def closeEvent(self, event) -> None:  # type: ignore[override]
        """Stop polling and shut down the aria2 process this window started.

        Args:
            event: The Qt close event, passed on to the base class.
        """
        self.timer.stop()
        proc = self.aria2_proc
        # aria2_proc only ever holds a process launched by this window, so it
        # is shut down even if a later reconnect attached to another aria2.
        if proc is not None:
            if proc.poll() is None:
                proc.terminate()
                try:
                    # Reap the child; if it needs longer to shut down cleanly,
                    # let it finish on its own instead of killing it.
                    proc.wait(timeout=3)
                except subprocess.TimeoutExpired:
                    pass
            if proc.stderr is not None:
                proc.stderr.close()
        super().closeEvent(event)


def main() -> None:
    """Create the Qt application, show the window and run the event loop.

    A failure while building the window is shown in an error dialog and the
    process exits with status 1; otherwise the exit status is the event
    loop's return value.
    """
    qt_app = QtWidgets.QApplication(sys.argv)
    try:
        main_window = TorrentWindow()
    except Exception as startup_error:
        QtWidgets.QMessageBox.critical(None, "Startup error", str(startup_error))
        sys.exit(1)
    else:
        main_window.show()
        exit_code = qt_app.exec()
    sys.exit(exit_code)


# Start the GUI only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()

Source code is fully readable — audit it, tweak the speed modes, add trackers, change the download folder. It’s yours.


⚡ Quick Hits

| Want | Do |
| --- | --- |
| 🧲 Download a torrent | Run script → Select .torrent → done |
| 🚀 Max speed | Switch to Aggressive mode + hit Apply Turbo |
| ⏸️ Pause & resume | Click Pause → close app → reopen → load same .torrent |
| 🔍 Read the code | It’s ~500 lines of Python — no hidden magic |

One script. Zero bloat. Your downloads, your rules.

7 Likes