main
1from __future__ import annotations
2
3import io
4from typing import Callable
5from typing_extensions import override
6
7
class CancelledError(Exception):
    """Exception that carries a plain cancellation message.

    In this file it is raised by ``BufferReader.read`` when the progress
    callback raises. The message given at construction is kept on ``msg``
    and is exactly what both ``str()`` and ``repr()`` return.
    """

    def __init__(self, msg: str) -> None:
        """Initialise the exception with *msg* and remember it on ``self.msg``."""
        super().__init__(msg)
        self.msg = msg

    @override
    def __str__(self) -> str:
        """Return the stored message verbatim."""
        return self.msg

    # repr() is the same function as str(): the bare message, with no
    # class name or quoting around it.
    __repr__ = __str__
18
19
20class BufferReader(io.BytesIO):
21 def __init__(self, buf: bytes = b"", desc: str | None = None) -> None:
22 super().__init__(buf)
23 self._len = len(buf)
24 self._progress = 0
25 self._callback = progress(len(buf), desc=desc)
26
27 def __len__(self) -> int:
28 return self._len
29
30 @override
31 def read(self, n: int | None = -1) -> bytes:
32 chunk = io.BytesIO.read(self, n)
33 self._progress += len(chunk)
34
35 try:
36 self._callback(self._progress)
37 except Exception as e: # catches exception from the callback
38 raise CancelledError("The upload was cancelled: {}".format(e)) from e
39
40 return chunk
41
42
def progress(total: float, desc: str | None) -> Callable[[float], None]:
    """Create a tqdm progress bar and return a callback that drives it.

    Args:
        total: Value at which the bar is considered finished.
        desc: Optional label passed to tqdm as ``desc``.

    Returns:
        A callable that takes the absolute progress so far (not an
        increment), stores it on the bar, and either redraws the bar or
        closes it once the total has been reached.
    """
    # Function-scope import: tqdm is only needed once a bar is requested.
    import tqdm

    meter = tqdm.tqdm(total=total, unit_scale=True, desc=desc)

    def incr(current: float) -> None:
        """Set the bar to *current*; close it once *current* reaches *total*."""
        meter.n = current
        # ``>=`` rather than ``==``: if the reported value skips past the
        # total (e.g. the source was rewound and read again), the bar must
        # still be closed instead of being left open.
        if current >= total:
            meter.close()
        else:
            meter.refresh()

    return incr
56
57
def MB(i: int) -> int:
    """Return *i* floor-divided by 1024**2, as an ``int``.

    Presumably converts a byte count into whole mebibytes (confirm against
    callers). The result is rounded toward negative infinity, as with ``//``.
    """
    unit = 1024**2
    whole_units = i // unit
    return int(whole_units)