
Module yapapi.storage

Storage models.

Sub-modules

Variables

AsyncReader

Classes

ComposedStorageProvider

class ComposedStorageProvider(
    input_storage:yapapi.storage.InputStorageProvider,
    output_storage:yapapi.storage.OutputStorageProvider
)

Storage provider composed of a separate input_storage (uploads) and output_storage (destinations).

Ancestors (in MRO)

  • yapapi.storage.StorageProvider
  • yapapi.storage.InputStorageProvider
  • yapapi.storage.OutputStorageProvider
  • abc.ABC

Methods

new_destination
def new_destination(
    self,
    destination_file:Union[os.PathLike, NoneType]=None
) -> yapapi.storage.Destination

Creates a slot for receiving a file.

Parameters

destination_file: Optional hint for where the received data should be placed.

upload_bytes
def upload_bytes(
    self,
    data:bytes
) -> yapapi.storage.Source

upload_file
def upload_file(
    self,
    path:os.PathLike
) -> yapapi.storage.Source

upload_stream
def upload_stream(
    self,
    length:int,
    stream:AsyncIterator[bytes]
) -> yapapi.storage.Source
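
The constructor signature and the ancestor list suggest that this class combines two independent halves: uploads go to input_storage and destinations come from output_storage. The following is a minimal sketch of that delegation pattern, not the yapapi source. ComposedSketch, RecordingInput, RecordingOutput and the mem:// URLs are hypothetical stand-ins, plain strings take the place of Source and Destination objects, and the methods are written as coroutines on the assumption that the real ones are awaitable (the signatures above do not show it).

```python
import asyncio
from typing import AsyncIterator, List, Optional


class RecordingInput:
    """Hypothetical input half: remembers every uploaded payload."""

    def __init__(self) -> None:
        self.uploads: List[bytes] = []

    async def upload_stream(self, length: int, stream: AsyncIterator[bytes]) -> str:
        data = b"".join([chunk async for chunk in stream])
        assert len(data) == length
        self.uploads.append(data)
        return f"mem://in/{len(self.uploads) - 1}"  # stands in for a Source


class RecordingOutput:
    """Hypothetical output half: remembers every requested slot."""

    def __init__(self) -> None:
        self.slots: List[Optional[str]] = []

    async def new_destination(self, destination_file: Optional[str] = None) -> str:
        self.slots.append(destination_file)
        return f"mem://out/{len(self.slots) - 1}"  # stands in for a Destination


class ComposedSketch:
    """Assumed behaviour: forward each call to the matching half."""

    def __init__(self, input_storage, output_storage) -> None:
        self._input = input_storage
        self._output = output_storage

    async def upload_stream(self, length, stream):
        return await self._input.upload_stream(length, stream)

    async def new_destination(self, destination_file=None):
        return await self._output.new_destination(destination_file)


async def one_chunk(data: bytes) -> AsyncIterator[bytes]:
    yield data


async def demo():
    inp, out = RecordingInput(), RecordingOutput()
    storage = ComposedSketch(inp, out)
    source = await storage.upload_stream(3, one_chunk(b"abc"))
    dest = await storage.new_destination("result.bin")
    return source, dest, inp.uploads, out.slots


source, dest, uploads, slots = asyncio.run(demo())
```

Splitting the halves this way would let uploads and downloads use different backends without either half knowing about the other.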

Content

class Content(
    /,
    *args,
    **kwargs
)

Content(length, stream)

Ancestors (in MRO)

  • builtins.tuple

Static methods

from_reader
def from_reader(
    length:int,
    s:Union[asyncio.streams.StreamReader, aiohttp.streams.StreamReader]
)

Instance variables

length

Alias for field number 0

stream

Alias for field number 1
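
Content is a two-field tuple (length, stream), and from_reader builds one from a stream reader. A minimal sketch of how such a helper could work is shown below, assuming the reader is drained in fixed-size chunks until it returns empty data. ContentSketch, content_from_reader and CHUNK_SIZE are hypothetical names used for illustration and are not part of yapapi.

```python
import asyncio
from typing import AsyncIterator, NamedTuple

CHUNK_SIZE = 4  # hypothetical buffer size, tiny on purpose


class ContentSketch(NamedTuple):
    """Stand-in mirroring the documented fields: length at 0, stream at 1."""

    length: int
    stream: AsyncIterator[bytes]


def content_from_reader(length: int, reader: asyncio.StreamReader) -> ContentSketch:
    async def chunks() -> AsyncIterator[bytes]:
        while True:
            data = await reader.read(CHUNK_SIZE)
            if not data:  # empty bytes signals end of stream
                break
            yield data

    return ContentSketch(length, chunks())


async def demo() -> bytes:
    payload = b"hello golem"
    reader = asyncio.StreamReader()
    reader.feed_data(payload)
    reader.feed_eof()

    content = content_from_reader(len(payload), reader)
    # Field 0 and the length attribute refer to the same value.
    assert content[0] == content.length == len(payload)

    received = b""
    async for chunk in content.stream:
        received += chunk
    return received


result = asyncio.run(demo())
```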

Methods

count
def count(
    ...
)

T.count(value) -> integer -- return number of occurrences of value

index
def index(
    ...
)

T.index(value, [start, [stop]]) -> integer -- return first index of value. Raises ValueError if the value is not present.

Destination

class Destination(
    /,
    *args,
    **kwargs
)

Abstract base class for a slot that receives data. It exposes upload_url, and the received data can be read back with download_stream or download_file.

Ancestors (in MRO)

  • abc.ABC

Descendants

  • yapapi.storage.gftp.GftpDestination
  • yapapi.storage.webdav._DavDestination

Instance variables

upload_url

Methods

download_file
def download_file(
    self,
    destination_file:os.PathLike
)

download_stream
def download_stream(
    self
) -> yapapi.storage.Content
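
Since download_stream returns a Content and download_file takes a target path, one plausible relationship is that download_file writes the streamed chunks to disk. The sketch below illustrates that idea under stated assumptions: InMemoryDestination and its mem:// URL are hypothetical, a plain (length, stream) tuple stands in for Content, and both methods are assumed to be coroutines.

```python
import asyncio
import os
import tempfile
from typing import AsyncIterator, List, Tuple


class InMemoryDestination:
    """Hypothetical destination whose received data is a list of chunks."""

    def __init__(self, chunks: List[bytes]) -> None:
        self._chunks = chunks

    @property
    def upload_url(self) -> str:
        return "mem://slot/0"  # made-up URL for illustration

    async def download_stream(self) -> Tuple[int, AsyncIterator[bytes]]:
        async def stream() -> AsyncIterator[bytes]:
            for chunk in self._chunks:
                yield chunk

        return sum(len(c) for c in self._chunks), stream()

    async def download_file(self, destination_file) -> None:
        # Assumed behaviour: drain the stream into the target file.
        _length, stream = await self.download_stream()
        with open(destination_file, "wb") as f:
            async for chunk in stream:
                f.write(chunk)


async def demo() -> bytes:
    dest = InMemoryDestination([b"part-1;", b"part-2"])
    with tempfile.TemporaryDirectory() as tmp:
        target = os.path.join(tmp, "out.bin")
        await dest.download_file(target)
        with open(target, "rb") as f:
            return f.read()


saved = asyncio.run(demo())
```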

InputStorageProvider

class InputStorageProvider(
    /,
    *args,
    **kwargs
)

Abstract base class for storage that accepts uploads (bytes, files, or streams) and returns a Source for each.

Ancestors (in MRO)

  • abc.ABC

Descendants

  • yapapi.storage.StorageProvider

Methods

upload_bytes
def upload_bytes(
    self,
    data:bytes
) -> yapapi.storage.Source

upload_file
def upload_file(
    self,
    path:os.PathLike
) -> yapapi.storage.Source

upload_stream
def upload_stream(
    self,
    length:int,
    stream:AsyncIterator[bytes]
) -> yapapi.storage.Source
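
The three upload methods differ only in where the data comes from, so an implementation could define upload_stream and express the other two in terms of it. The sketch below shows that layering as an assumption, not as the documented behaviour of yapapi. InMemoryInput, the mem:// URLs and the 4096-byte read size are hypothetical, and a URL string stands in for the returned Source.

```python
import asyncio
import os
import tempfile
from typing import AsyncIterator, Dict


class InMemoryInput:
    """Hypothetical provider that keeps uploaded data in a dict."""

    def __init__(self) -> None:
        self.blobs: Dict[str, bytes] = {}

    async def upload_stream(self, length: int, stream: AsyncIterator[bytes]) -> str:
        data = b"".join([chunk async for chunk in stream])
        if len(data) != length:
            raise ValueError("declared length does not match streamed data")
        url = f"mem://blob/{len(self.blobs)}"
        self.blobs[url] = data
        return url

    async def upload_bytes(self, data: bytes) -> str:
        async def single_chunk() -> AsyncIterator[bytes]:
            yield data

        return await self.upload_stream(len(data), single_chunk())

    async def upload_file(self, path) -> str:
        async def file_chunks() -> AsyncIterator[bytes]:
            with open(path, "rb") as f:
                while chunk := f.read(4096):
                    yield chunk

        return await self.upload_stream(os.path.getsize(path), file_chunks())


async def demo() -> Dict[str, bytes]:
    provider = InMemoryInput()
    await provider.upload_bytes(b"abc")
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "input.bin")
        with open(path, "wb") as f:
            f.write(b"xyz" * 3000)  # 9000 bytes, spans several reads
        await provider.upload_file(path)
    return provider.blobs


blobs = asyncio.run(demo())
```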

OutputStorageProvider

class OutputStorageProvider(
    /,
    *args,
    **kwargs
)

Abstract base class for storage that creates Destination slots for receiving files.

Ancestors (in MRO)

  • abc.ABC

Descendants

  • yapapi.storage.StorageProvider

Methods

new_destination
def new_destination(
    self,
    destination_file:Union[os.PathLike, NoneType]=None
) -> yapapi.storage.Destination

Creates a slot for receiving a file.

Parameters

destination_file: Optional hint for where the received data should be placed.

Source

class Source(
    /,
    *args,
    **kwargs
)

Abstract base class for an uploaded item. It exposes download_url and content_length.

Ancestors (in MRO)

  • abc.ABC

Descendants

  • yapapi.storage.gftp.GftpSource
  • yapapi.storage.webdav._DavSource

Instance variables

download_url

Methods

content_length
def content_length(
    self
) -> int

StorageProvider

class StorageProvider(
    /,
    *args,
    **kwargs
)

Abstract base class that combines InputStorageProvider and OutputStorageProvider.

Ancestors (in MRO)

  • yapapi.storage.InputStorageProvider
  • yapapi.storage.OutputStorageProvider
  • abc.ABC

Descendants

  • yapapi.storage.ComposedStorageProvider
  • yapapi.storage.gftp.GftpProvider
  • yapapi.storage.webdav.DavStorageProvider

Methods

new_destination
def new_destination(
    self,
    destination_file:Union[os.PathLike, NoneType]=None
) -> yapapi.storage.Destination

Creates a slot for receiving a file.

Parameters

destination_file: Optional hint for where the received data should be placed.

upload_bytes
def upload_bytes(
    self,
    data:bytes
) -> yapapi.storage.Source

upload_file
def upload_file(
    self,
    path:os.PathLike
) -> yapapi.storage.Source

upload_stream
def upload_stream(
    self,
    length:int,
    stream:AsyncIterator[bytes]
) -> yapapi.storage.Source