main
 1from __future__ import annotations
 2
 3import io
 4from typing import Callable
 5from typing_extensions import override
 6
 7
 8class CancelledError(Exception):
 9    def __init__(self, msg: str) -> None:
10        self.msg = msg
11        super().__init__(msg)
12
13    @override
14    def __str__(self) -> str:
15        return self.msg
16
17    __repr__ = __str__
18
19
class BufferReader(io.BytesIO):
    """In-memory byte stream that reports how much has been read.

    The stream wraps *buf* in an ``io.BytesIO`` and, on every ``read``,
    passes the current read position to a callback obtained from
    ``progress(len(buf), desc=desc)``.
    """

    def __init__(self, buf: bytes = b"", desc: str | None = None) -> None:
        """Wrap *buf* and create the progress callback.

        Args:
            buf: Bytes to expose through the stream.
            desc: Optional label forwarded to ``progress``.
        """
        super().__init__(buf)
        self._len = len(buf)
        self._progress = 0
        self._callback = progress(len(buf), desc=desc)

    def __len__(self) -> int:
        """Return the length of the buffer given at construction."""
        return self._len

    @override
    def read(self, n: int | None = -1) -> bytes:
        """Read up to *n* bytes and report the new position to the callback.

        Args:
            n: Maximum number of bytes to read; ``-1`` or ``None`` reads to
                the end of the buffer.

        Returns:
            The bytes read (empty at end of stream).

        Raises:
            CancelledError: If the progress callback raises any
                ``Exception``; the original exception is chained as the cause.
        """
        chunk = super().read(n)

        # Use the stream position instead of a running sum: if the stream is
        # rewound with seek() and read again, a running sum would keep
        # growing past the buffer length.
        self._progress = self.tell()

        try:
            self._callback(self._progress)
        except Exception as e:  # an exception from the callback aborts the read
            raise CancelledError(f"The upload was cancelled: {e}") from e

        return chunk
41
42
def progress(total: float, desc: str | None) -> Callable[[float], None]:
    """Create a tqdm progress bar and return a callback that drives it.

    Args:
        total: Value at which the bar is considered complete.
        desc: Optional label shown with the bar.

    Returns:
        A callable that takes the absolute (not incremental) progress value,
        stores it on the bar, and either redraws the bar or closes it once
        the value reaches ``total``.
    """
    # Imported here so tqdm is only loaded when a bar is actually requested.
    import tqdm

    meter = tqdm.tqdm(total=total, unit_scale=True, desc=desc)

    def incr(progress: float) -> None:
        """Set the bar to *progress*; close it when *total* is reached."""
        meter.n = progress
        # ">=" rather than "==": a float value may never land exactly on
        # total, and an overshoot should still finish the bar.
        if progress >= total:
            meter.close()
        else:
            meter.refresh()

    return incr
56
57
def MB(i: int) -> int:
    """Return *i* bytes expressed as whole units of 1024**2 bytes.

    The division floors, so partial units are dropped and negative values
    round toward negative infinity.
    """
    bytes_per_unit = 1024 * 1024
    whole_units = i // bytes_per_unit
    return int(whole_units)