Source code for arxiv.canonical.services.readable
"""Provides :class:`.BytesIOProxy`."""
import io
from typing import Any, Callable, IO, List, Optional, Iterable, Iterator
from typing_extensions import Literal
class BytesIOProxy(io.BytesIO):
    """
    A readable object that wraps a ``read()`` callable.

    This gives us lazy, proxied read access to a (presumably expensive)
    resource that is consistent with ``io.IOBase``. The wrapped callable is
    invoked at most once, the first time content is actually needed; its
    result is held in an internal ``io.BytesIO`` to which reads, seeks and
    iteration are delegated.

    Parameters
    ----------
    read : callable
        Zero-argument callable returning the complete content as ``bytes``.

    """

    def __init__(self, read: Callable[[], bytes]) -> None:
        """Store the loader; nothing is read until content is needed."""
        # ``None`` here marks a proxy that was closed before being loaded.
        self._read: Optional[Callable[[], bytes]] = read
        # Populated lazily by ``_loaded_content``.
        self._content: Optional[IO[bytes]] = None

    @property
    def _loaded_content(self) -> IO[bytes]:
        """
        Get the in-memory content stream, loading it on first access.

        Raises
        ------
        ValueError
            If the proxy was closed before its content was loaded.

        """
        if self._read is None:
            raise ValueError('Resource is closed')
        if self._content is None:
            self._content = io.BytesIO(self._read())
        return self._content

    def close(self) -> None:
        """Flush and close this stream."""
        if self._content is not None:
            self._content.close()
        else:
            # Nothing was loaded; drop the loader so later reads fail fast.
            self._read = None
        super(BytesIOProxy, self).close()

    def fileno(self) -> int:
        """Return the underlying file descriptor of the stream if it exists."""
        raise OSError('No underlying file')

    def flush(self) -> None:
        """Flush the write buffers of the stream if applicable."""
        return

    def isatty(self) -> bool:
        """Return True if the stream is interactive."""
        return False

    def readable(self) -> bool:
        """
        Return True if the stream can be read from.

        Raises
        ------
        ValueError
            If the proxy was closed before its content was loaded.

        """
        if self._content is not None:
            return self._content.readable()
        if self._read is None:
            raise ValueError('I/O attempted on closed stream')
        return True

    def readline(self, size: int = -1) -> bytes:
        """Read and return one line from the stream."""
        return self._loaded_content.readline(size)

    def readlines(self, hint: int = -1) -> List[bytes]:
        """Read and return a list of lines from the stream."""
        return self._loaded_content.readlines(hint)

    def read(self, size: Optional[int] = -1) -> bytes:
        """Read up to ``size`` bytes; ``None`` or ``-1`` reads to the end."""
        if size is not None:
            return self._loaded_content.read(size)
        return self._loaded_content.read()

    def __iter__(self) -> 'BytesIOProxy':
        """Iterate over the lines of the proxied content."""
        return self

    def __next__(self) -> bytes:
        """
        Return the next line of the proxied content.

        ``io.BytesIO`` iterates over its own (here: empty) buffer, so line
        iteration must be routed through :meth:`readline` explicitly.
        """
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    def seek(self, offset: int, whence: int = 0) -> int:
        """
        Change the stream position to the given byte offset.

        A zero-offset seek from the start or from the current position on a
        proxy that has not been loaded is a no-op (the position is already
        0), so it does not trigger a load. Any other seek loads the content,
        so that the requested position is honoured by subsequent reads.
        """
        if self._content is None and offset == 0 \
                and whence in (io.SEEK_SET, io.SEEK_CUR):
            return 0
        return self._loaded_content.seek(offset, whence)

    def seekable(self) -> bool:
        """Return True if the stream supports random access."""
        if self._content is not None:
            return self._content.seekable()
        return bool(self._read is not None)

    def tell(self) -> int:
        """Return the current stream position (0 until content is loaded)."""
        if self._content is not None:
            return self._content.tell()
        return 0

    def truncate(self, size: Optional[int] = None) -> int:
        """Truncation is not supported."""
        raise NotImplementedError('Truncation not supported')

    def writable(self) -> bool:
        """Writing is not supported."""
        return False

    def writelines(self, lines: Iterable[bytes]) -> None:
        """Writing is not supported."""
        raise NotImplementedError('Writing not supported')

    def __del__(self) -> None:
        """Prepare for deletion."""
        # ``getattr`` guards against instances whose ``__init__`` never ran
        # (or failed early), where ``_content`` does not exist.
        if getattr(self, '_content', None) is not None:
            del self._content
class IterReadWrapper(io.BytesIO):
    """
    Wraps a response body streaming iterator to provide ``read()``.

    Chunks pulled from the iterator are accumulated in an internal buffer,
    so content that has already been consumed can be revisited with
    :meth:`seek`.
    """

    def __init__(self, iter_content: Callable[[int], Iterator[bytes]],
                 size: int = 4096) -> None:
        """
        Initialize the streaming iterator.

        Parameters
        ----------
        iter_content : callable
            Called once with ``size``; must return an iterator of ``bytes``
            chunks.
        size : int
            Value passed to ``iter_content`` (default 4096).

        """
        self._iter_content = iter_content(size)
        self._buff = bytearray()    # Everything pulled from the iterator.
        self._pos = 0               # Current read position within _buff.

    def seek(self, offset: int, whence: int = 0) -> int:
        """
        Change the stream position to the given byte offset.

        Only absolute seeks (``whence=0``) are supported.

        Raises
        ------
        NotImplementedError
            If ``whence`` is not 0.
        ValueError
            If ``offset`` is negative.

        """
        if whence != 0:
            raise NotImplementedError('Only supports 0-based seeks')
        if offset < 0:
            # A negative position would make the slices in read() wrap
            # around the buffer; reject it as io.BytesIO does.
            raise ValueError(f'negative seek value {offset}')
        if offset > self._pos:
            # Pull content until the buffer extends past the target, if the
            # iterator has that much to give.
            self._read_ahead(offset + 1)
        self._pos = offset
        return self._pos

    def seekable(self) -> Literal[True]:
        """Indicate that this is a seekable stream."""
        return True

    def tell(self) -> int:
        """Return the current stream position."""
        return self._pos

    def readable(self) -> Literal[True]:
        """Indicate that it *is* a readable stream."""
        return True

    def read(self, size: Optional[int] = -1) -> bytes:
        """
        Read from the content stream, loading more content if necessary.

        Parameters
        ----------
        size : int or None
            Maximum number of bytes to return. ``None`` or any negative
            value reads everything that remains.

        Returns
        -------
        bytes
            Up to ``size`` bytes starting at the current position; empty
            when the position is at or beyond the end of the content.

        """
        if size is None or size < 0:    # Read everything!
            self._buff.extend(b''.join(self._iter_content))
            content = bytes(self._buff[self._pos:])
        else:
            if size > len(self._buff) - self._pos:
                self._read_ahead(self._pos + size)
            content = bytes(self._buff[self._pos:self._pos + size])
        self._pos += len(content)
        return content

    def _read_ahead(self, offset: int) -> None:
        """Buffer chunks until ``offset`` bytes are held or content ends."""
        while offset > len(self._buff):
            try:
                chunk = next(self._iter_content)
            except StopIteration:
                break   # No more content to read.
            if not chunk:   # May issue empty chunks due to keep-alive.
                continue
            self._buff.extend(chunk)