megfile.s3 module

megfile.s3.get_endpoint_url(profile_name: str | None = None) str[source]

Get the endpoint url of S3

Returns:

S3 endpoint url

megfile.s3.get_s3_client(config: Config | None = None, cache_key: str | None = None, profile_name: str | None = None)[source]

Get S3 client

Returns:

S3 client

megfile.s3.get_s3_session(profile_name=None) Session[source]

Get S3 session

Returns:

S3 session
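
Example (a minimal sketch; the profile name is a placeholder):

    from megfile.s3 import get_endpoint_url, get_s3_client, get_s3_session

    print(get_endpoint_url())   # endpoint resolved from configuration / environment

    client = get_s3_client()    # low-level client, reusable across calls
    session = get_s3_session(profile_name="my-profile")  # 'my-profile' is hypothetical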

megfile.s3.is_s3(path: str | BasePath | PathLike) bool[source]
  1. Test whether a path is an s3 path, following aws-cli conventions.

  2. megfile also supports paths of the form s3[+profile_name]://bucket/key

Parameters:

path – Path to be tested

Returns:

True if path is s3 path, else False
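
Example (bucket and key names are placeholders):

    from megfile.s3 import is_s3

    assert is_s3("s3://bucket/key")
    assert is_s3("s3+my_profile://bucket/key")  # profile-qualified form
    assert not is_s3("/local/path")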

megfile.s3.parse_s3_url(s3_url: str | BasePath | PathLike) Tuple[str, str][source]
megfile.s3.s3_access(path: str | BasePath | PathLike, mode: Access = Access.READ) bool[source]

Test if the path has the access permission described by mode

Parameters:
  • path – Given path

  • mode – access mode

Returns:

True if the path has the access permission described by mode, else False.
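
Example (a sketch assuming the Access enum is importable from megfile.pathlike; adjust the import to your megfile version):

    from megfile.s3 import s3_access
    from megfile.pathlike import Access  # assumed import location

    if s3_access("s3://bucket/key", Access.READ):
        print("readable")
    if s3_access("s3://bucket/key", Access.WRITE):
        print("writable")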

megfile.s3.s3_buffered_open(s3_url: str | BasePath | PathLike, mode: str, followlinks: bool = False, *, max_workers: int | None = None, max_buffer_size: int | None = None, block_forward: int | None = None, block_size: int | None = None, limited_seekable: bool = False, buffered: bool = False, share_cache_key: str | None = None, cache_path: str | None = None) IO[source]

Open an asynchronous prefetch reader or buffered writer, to support fast sequential read / write

Note

Users should make sure that the reader / writer is closed correctly

Supports context manager

Some parameter settings may perform well: max_workers=10 or 20; the default value None means the global thread pool is used

Parameters:
  • s3_url – s3 path

  • mode – Mode to open file, could be one of “rb”, “wb”, “ab”, “rb+”, “wb+” or “ab+”

  • encoding – encoding is the name of the encoding used to decode or encode the file. This should only be used in text mode.

  • errors – errors is an optional string that specifies how encoding and decoding errors are to be handled—this cannot be used in binary mode.

  • followlinks – follow symbolic link, default False

  • max_workers – Max number of download / upload threads; None by default, which uses the global thread pool with 8 threads.

  • max_buffer_size – Max cached buffer size in memory, 128MB by default. Setting it to 0 disables the cache.

  • block_forward – How many blocks of data are cached ahead of the offset position; only for read mode.

  • block_size – Size of a single block. Each block is uploaded by a single thread.

  • limited_seekable – If True, the write-handle supports limited seek (both the head and the tail of the file can seek up to block_size). Note: this parameter is valid only for write-handles; read-handles support arbitrary seek

Returns:

An opened File object

Raises:

S3FileNotFoundError
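
Example (a minimal sequential-read sketch; the path is a placeholder):

    from megfile.s3 import s3_buffered_open

    total = 0
    with s3_buffered_open("s3://bucket/key", "rb", max_workers=10) as reader:
        while chunk := reader.read(1 << 20):  # read in 1 MiB chunks
            total += len(chunk)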

megfile.s3.s3_cached_open(s3_url: str | BasePath | PathLike, mode: str, followlinks: bool = False, *, cache_path: str | None = None) S3CachedHandler[source]

Open a local-cache file reader / writer, for frequent random read / write

Note

Users should make sure that the reader / writer is closed correctly

Supports context manager

cache_path specifies the path of the cache file; performance may be better if the cache file is on SSD or tmpfs

Parameters:
  • s3_url – s3 path

  • mode – Mode to open file, could be one of “rb”, “wb”, “ab”, “rb+”, “wb+” or “ab+”

  • encoding – encoding is the name of the encoding used to decode or encode the file. This should only be used in text mode.

  • errors – errors is an optional string that specifies how encoding and decoding errors are to be handled—this cannot be used in binary mode.

  • followlinks – follow symbolic link, default False

  • cache_path – cache file path

Returns:

An opened BufferedReader / BufferedWriter object
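
Example (the cache path on tmpfs is illustrative):

    from megfile.s3 import s3_cached_open

    with s3_cached_open("s3://bucket/key", "rb", cache_path="/dev/shm/key.cache") as f:
        f.seek(1024)        # random access hits the local cache file
        header = f.read(16)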

megfile.s3.s3_concat(src_paths: List[str | BasePath | PathLike], dst_path: str | BasePath | PathLike, block_size: int = 8388608, max_workers: int = 8) None[source]

Concatenate s3 files to one file.

Parameters:
  • src_paths – Given source paths

  • dst_path – Given destination path
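
Example (all object names are placeholders):

    from megfile.s3 import s3_concat

    s3_concat(["s3://bucket/part-0", "s3://bucket/part-1"], "s3://bucket/merged")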

megfile.s3.s3_copy(src_url: str | BasePath | PathLike, dst_url: str | BasePath | PathLike, callback: Callable[[int], None] | None = None, followlinks: bool = False, overwrite: bool = True) None[source]

File copy on S3. Copy the content of the file at src_url to dst_url. It is the caller’s responsibility to ensure that s3_isfile(src_url) is True

Parameters:
  • src_url – Given path

  • dst_url – Target file path

  • callback – Called periodically during copy, and the input parameter is the data size (in bytes) of copy since the last call

  • followlinks – If False, treat a symbolic link as a file; if True, follow the link

  • overwrite – whether to overwrite the file if it exists, default is True
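
Example of tracking progress with the callback (paths are placeholders):

    from megfile.s3 import s3_copy

    copied = []  # callback receives bytes copied since the previous call
    s3_copy("s3://bucket/src", "s3://bucket/dst", callback=copied.append)
    print(sum(copied), "bytes copied")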

megfile.s3.s3_download(src_url: str | BasePath | PathLike, dst_url: str | BasePath | PathLike, callback: Callable[[int], None] | None = None, followlinks: bool = False, overwrite: bool = True) None[source]

Downloads a file from s3 to local filesystem.

Parameters:
  • src_url – source s3 path

  • dst_url – target fs path

  • callback – Called periodically during copy, and the input parameter is the data size (in bytes) of copy since the last call

  • followlinks – If False, treat a symbolic link as a file; if True, follow the link

  • overwrite – whether to overwrite the file if it exists, default is True
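
Example (paths are placeholders):

    from megfile.s3 import s3_download

    # overwrite=False leaves an existing local file untouched
    s3_download("s3://bucket/key", "/tmp/key", overwrite=False)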

megfile.s3.s3_exists(path: str | BasePath | PathLike, followlinks: bool = False) bool[source]

Test if s3_url exists

If the bucket of s3_url is not readable, return False

Parameters:

path – Given path

Returns:

True if s3_url exists, else False

megfile.s3.s3_getmd5(path: str | BasePath | PathLike, recalculate: bool = False, followlinks: bool = False) str[source]

Get the md5 meta info of files uploaded / copied via megfile

If the meta info is lost or non-existent, return None

Parameters:
  • path – Given path

  • recalculate – If True, calculate the md5 in real time; otherwise return the s3 ETag

  • followlinks – If True, calculate the md5 of the real (target) file

Returns:

md5 meta info

megfile.s3.s3_getmtime(path: str | BasePath | PathLike, follow_symlinks: bool = False) float[source]

Get last-modified time of the file on the given s3_url path (in Unix timestamp format).

If the path is an existent directory, return the latest modified time of all files in it. The mtime of an empty directory is 1970-01-01 00:00:00

If s3_url is not an existent path, which means s3_exists(s3_url) returns False, raise S3FileNotFoundError

Parameters:

path – Given path

Returns:

Last-modified time

Raises:

S3FileNotFoundError, UnsupportedError

megfile.s3.s3_getsize(path: str | BasePath | PathLike, follow_symlinks: bool = False) int[source]

Get file size on the given s3_url path (in bytes).

If the path is a directory, return the total size of all files in it, including files in subdirectories (if any).

The result excludes the size of the directory itself. In other words, return 0 bytes for an empty directory path.

If s3_url is not an existent path, which means s3_exists(s3_url) returns False, raise S3FileNotFoundError

Parameters:

path – Given path

Returns:

File size

Raises:

S3FileNotFoundError, UnsupportedError
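
Example combining the two getters (the prefix is a placeholder):

    import datetime
    from megfile.s3 import s3_getmtime, s3_getsize

    size = s3_getsize("s3://bucket/dir")    # total bytes of all files under the prefix
    mtime = s3_getmtime("s3://bucket/dir")  # latest mtime among those files
    print(size, datetime.datetime.fromtimestamp(mtime))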

megfile.s3.s3_glob(path: str | BasePath | PathLike, recursive: bool = True, missing_ok: bool = True) List[str][source]

Return a list of s3 paths matching the glob pattern, in ascending alphabetical order

Note: globbing only works within a bucket; trying to match the bucket part with wildcard characters raises UnsupportedError

Parameters:
  • recursive – If False, ** will not search directory recursively

  • missing_ok – If False and target path doesn’t match any file, raise FileNotFoundError

Raises:

UnsupportedError, when bucket part contains wildcard characters

Returns:

A list of paths matching the glob pattern
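
Example (the pattern is a placeholder; note the bucket part stays literal):

    from megfile.s3 import s3_glob

    for path in s3_glob("s3://bucket/logs/**/*.json"):
        print(path)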

megfile.s3.s3_glob_stat(path: str | BasePath | PathLike, recursive: bool = True, missing_ok: bool = True) Iterator[FileEntry][source]

Return a generator of (path, file stat) tuples for paths matching the glob pattern, in ascending alphabetical order

Note: globbing only works within a bucket; trying to match the bucket part with wildcard characters raises UnsupportedError

Parameters:
  • recursive – If False, ** will not search directory recursively

  • missing_ok – If False and target path doesn’t match any file, raise FileNotFoundError

Raises:

UnsupportedError, when bucket part contains wildcard characters

Returns:

A generator of (path, file stat) tuples for paths matching the glob pattern

megfile.s3.s3_hasbucket(path: str | BasePath | PathLike) bool[source]

Test if the bucket of s3_url exists

Parameters:

path – Given path

Returns:

True if bucket of s3_url exists, else False

megfile.s3.s3_iglob(path: str | BasePath | PathLike, recursive: bool = True, missing_ok: bool = True) Iterator[str][source]

Return an iterator over s3 paths matching the glob pattern, in ascending alphabetical order

Note: globbing only works within a bucket; trying to match the bucket part with wildcard characters raises UnsupportedError

Parameters:
  • recursive – If False, ** will not search directory recursively

  • missing_ok – If False and target path doesn’t match any file, raise FileNotFoundError

Raises:

UnsupportedError, when bucket part contains wildcard characters

Returns:

An iterator over paths matching the glob pattern

megfile.s3.s3_isdir(path: str | BasePath | PathLike, followlinks: bool = False) bool[source]

Test if an s3 url is a directory. The path is considered a directory if there exists a suffix such that os.path.join(s3_url, suffix) is a file, or if the url is an empty bucket or ‘s3://’

Parameters:
  • path – Given path

  • followlinks – the result is the same whether this is True or False, because s3 symlinks do not support directories

Returns:

True if path is s3 directory, else False

megfile.s3.s3_isfile(path: str | BasePath | PathLike, followlinks: bool = False) bool[source]

Test if an s3_url is a file

Parameters:

path – Given path

Returns:

True if path is s3 file, else False

megfile.s3.s3_islink(path: str | BasePath | PathLike) bool[source]

Test whether a path is a symbolic link

Parameters:

path – Given path

Returns:

True if the path is a symbolic link, else False

Raises:

S3NotALinkError
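
Example contrasting the three predicates (paths are placeholders):

    from megfile.s3 import s3_isdir, s3_isfile, s3_islink

    s3_isdir("s3://bucket/prefix")  # True if any key exists under the prefix
    s3_isfile("s3://bucket/key")    # True if the exact key exists
    s3_islink("s3://bucket/link")   # True if the key is a megfile symlink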

megfile.s3.s3_listdir(path: str | BasePath | PathLike) List[str][source]

Get all contents of given s3_url. The result is in ascending alphabetical order.

Parameters:

path – Given path

Returns:

All contents under s3_url, in ascending alphabetical order

Raises:

S3FileNotFoundError, S3NotADirectoryError

megfile.s3.s3_load_content(s3_url, start: int | None = None, stop: int | None = None) bytes[source]

Read the byte range [start, stop) of the specified file

Parameters:
  • s3_url – Specified path

  • start – start index

  • stop – stop index

Returns:

bytes content in range [start, stop)
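
Example of a ranged read (the path is a placeholder):

    from megfile.s3 import s3_load_content

    head = s3_load_content("s3://bucket/key", start=0, stop=4096)  # first 4 KiB
    assert len(head) <= 4096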

megfile.s3.s3_load_from(path: str | BasePath | PathLike) BinaryIO[source]

Read all content of the specified path in binary mode into memory

Users should close the returned BinaryIO manually

Parameters:

path – Given path

Returns:

BinaryIO

megfile.s3.s3_lstat(path: str | BasePath | PathLike) StatResult[source]

Like Path.stat() but, if the path points to a symbolic link, return the symbolic link’s information rather than its target’s.

megfile.s3.s3_makedirs(path: str | BasePath | PathLike, exist_ok: bool = False)[source]

Create an s3 directory. Purely creating a directory is invalid because directories are unavailable on OSS; this function instead checks that the target bucket has WRITE access.

Parameters:
  • path – Given path

  • exist_ok – If False and target directory exists, raise S3FileExistsError

Raises:

S3BucketNotFoundError, S3FileExistsError

megfile.s3.s3_memory_open(s3_url: str | BasePath | PathLike, mode: str, followlinks: bool = False) S3MemoryHandler[source]

Open a memory-cache file reader / writer, for frequent random read / write

Note

Users should make sure that the reader / writer is closed correctly

Supports context manager

Parameters:

mode – Mode to open file, could be one of “rb”, “wb”, “ab”, “rb+”, “wb+” or “ab+”

Returns:

An opened BufferedReader / BufferedWriter object
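
Example of in-memory read-modify-write (the path is a placeholder):

    from megfile.s3 import s3_memory_open

    with s3_memory_open("s3://bucket/key", "rb+") as f:
        f.seek(0)
        f.write(b"new-header")  # written back to s3 when the handle is closed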

megfile.s3.s3_move(src_url: str | BasePath | PathLike, dst_url: str | BasePath | PathLike, overwrite: bool = True) None[source]

Move file/directory path from src_url to dst_url

Parameters:
  • src_url – Given path

  • dst_url – Given destination path

  • overwrite – whether to overwrite the file if it exists

megfile.s3.s3_open(s3_url: str | BasePath | PathLike, mode: str, followlinks: bool = False, *, max_workers: int | None = None, max_buffer_size: int | None = None, block_forward: int | None = None, block_size: int | None = None, limited_seekable: bool = False, buffered: bool = False, share_cache_key: str | None = None, cache_path: str | None = None) IO

Open an asynchronous prefetch reader or buffered writer, to support fast sequential read / write

Note

Users should make sure that the reader / writer is closed correctly

Supports context manager

Some parameter settings may perform well: max_workers=10 or 20; the default value None means the global thread pool is used

Parameters:
  • s3_url – s3 path

  • mode – Mode to open file, could be one of “rb”, “wb”, “ab”, “rb+”, “wb+” or “ab+”

  • encoding – encoding is the name of the encoding used to decode or encode the file. This should only be used in text mode.

  • errors – errors is an optional string that specifies how encoding and decoding errors are to be handled—this cannot be used in binary mode.

  • followlinks – follow symbolic link, default False

  • max_workers – Max number of download / upload threads; None by default, which uses the global thread pool with 8 threads.

  • max_buffer_size – Max cached buffer size in memory, 128MB by default. Setting it to 0 disables the cache.

  • block_forward – How many blocks of data are cached ahead of the offset position; only for read mode.

  • block_size – Size of a single block. Each block is uploaded by a single thread.

  • limited_seekable – If True, the write-handle supports limited seek (both the head and the tail of the file can seek up to block_size). Note: this parameter is valid only for write-handles; read-handles support arbitrary seek

Returns:

An opened File object

Raises:

S3FileNotFoundError

megfile.s3.s3_path_join(path: str | BasePath | PathLike, *other_paths: str | BasePath | PathLike) str[source]

Concatenate two or more paths into a complete path

Parameters:
  • path – Given path

  • other_paths – Paths to be concatenated

Returns:

Concatenated complete path

Note

Unlike os.path.join, this function ignores a leading slash (which indicates an absolute path) in other_paths and concatenates directly.

e.g. os.path.join(‘/path’, ‘to’, ‘/file’) => ‘/file’, but s3_path_join(‘/path’, ‘to’, ‘/file’) => ‘/path/to/file’
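
Example (a runnable sketch of the difference described above):

    import os.path
    from megfile.s3 import s3_path_join

    os.path.join("/path", "to", "/file")  # -> '/file'
    s3_path_join("/path", "to", "/file")  # -> '/path/to/file'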

megfile.s3.s3_pipe_open(s3_url: str | BasePath | PathLike, mode: str, followlinks: bool = False, *, join_thread: bool = True) S3PipeHandler[source]

Open an asynchronous reader / writer, to support fast sequential read / write

Note

Users should make sure that the reader / writer is closed correctly

Supports context manager

When join_thread is False, closing the file handle does not wait for the asynchronous write to finish.

This does not affect read-handles, but it can speed up write-handles because the file is written asynchronously.

However, asynchronous behavior cannot guarantee that the file is successfully written, and frequent use may exhaust threads and file handles

Parameters:
  • s3_url – s3 path

  • mode – Mode to open file, either “r”, “rb”, “w” or “wb”

  • encoding – encoding is the name of the encoding used to decode or encode the file. This should only be used in text mode.

  • errors – errors is an optional string that specifies how encoding and decoding errors are to be handled—this cannot be used in binary mode.

  • followlinks – follow symbolic link, default False

  • join_thread – If True, wait on close until s3 finishes writing

Returns:

An opened BufferedReader / BufferedWriter object
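
Example (the path is a placeholder):

    from megfile.s3 import s3_pipe_open

    # join_thread=False returns from close() without waiting for the upload;
    # use it only when an occasional unverified write is acceptable
    with s3_pipe_open("s3://bucket/key", "wb", join_thread=False) as writer:
        writer.write(b"payload")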

megfile.s3.s3_prefetch_open(s3_url: str | BasePath | PathLike, mode: str = 'rb', followlinks: bool = False, *, max_workers: int | None = None, block_size: int = 8388608) S3PrefetchReader[source]

Open an asynchronous prefetch reader, to support fast sequential read and random read

Note

Users should make sure that the reader / writer is closed correctly

Supports context manager

Some parameter settings may perform well: max_workers=10 or 20, block_size=8 or 16 MB; the default max_workers=None means the global thread pool is used

Parameters:
  • s3_url – s3 path

  • mode – only supports “r” or “rb”

  • encoding – encoding is the name of the encoding used to decode or encode the file. This should only be used in text mode.

  • errors – errors is an optional string that specifies how encoding and decoding errors are to be handled—this cannot be used in binary mode.

  • followlinks – follow symbolic link, default False

  • max_workers – Max download thread number, None by default

  • block_size – Max data size downloaded by each thread, in bytes, 8MB by default

Returns:

An opened S3PrefetchReader object

Raises:

S3FileNotFoundError
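
Example (path and tuning values are illustrative):

    from megfile.s3 import s3_prefetch_open

    with s3_prefetch_open(
        "s3://bucket/big.bin", "rb", max_workers=10, block_size=16 * 1024 * 1024
    ) as reader:
        data = reader.read()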

megfile.s3.s3_readlink(path: str | BasePath | PathLike) str[source]

Return a string representing the path to which the symbolic link points.

Returns:

Return a string representing the path to which the symbolic link points.

Raises:

S3NameTooLongError, S3BucketNotFoundError, S3IsADirectoryError, S3NotALinkError

megfile.s3.s3_remove(path: str | BasePath | PathLike, missing_ok: bool = False) None[source]

Remove the file or directory on s3; removing ‘s3://’ or ‘s3://bucket’ is not permitted

Parameters:
  • path – Given path

  • missing_ok – if False and the target file/directory does not exist, raise S3FileNotFoundError

Raises:

S3PermissionError, S3FileNotFoundError, UnsupportedError

megfile.s3.s3_rename(src_url: str | BasePath | PathLike, dst_url: str | BasePath | PathLike, overwrite: bool = True) None[source]

Move s3 file path from src_url to dst_url

Parameters:
  • src_url – Given source path

  • dst_url – Given destination path

  • overwrite – whether to overwrite the file if it exists

megfile.s3.s3_save_as(file_object: BinaryIO, path: str | BasePath | PathLike)[source]

Write the opened binary stream to the specified path; the stream will not be closed

Parameters:
  • path – Given path

  • file_object – Stream to be read

megfile.s3.s3_scan(path: str | BasePath | PathLike, missing_ok: bool = True, followlinks: bool = False) Iterator[str][source]

Iteratively traverse only files in the given s3 directory, in alphabetical order. Each iteration of the generator yields a path string.

If s3_url is a file path, yields the file only

If s3_url is a non-existent path, return an empty generator

If s3_url is a bucket path, return all file paths in the bucket

If s3_url is an empty bucket, return an empty generator

If s3_url doesn’t contain a bucket, i.e. s3_url == ‘s3://’, raise UnsupportedError; traversing the complete s3 is not supported in megfile

Parameters:
  • path – Given path

  • missing_ok – If False and there’s no file in the directory, raise FileNotFoundError

Raises:

UnsupportedError

Returns:

A file path generator
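
Example (the prefix is a placeholder):

    from megfile.s3 import s3_scan

    for path in s3_scan("s3://bucket/dataset/"):
        print(path)  # every file path under the prefix, in alphabetical order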

megfile.s3.s3_scan_stat(path: str | BasePath | PathLike, missing_ok: bool = True, followlinks: bool = False) Iterator[FileEntry][source]

Iteratively traverse only files in the given directory, in alphabetical order. Each iteration of the generator yields a tuple of path string and file stat

Parameters:
  • path – Given path

  • missing_ok – If False and there’s no file in the directory, raise FileNotFoundError

Raises:

UnsupportedError

Returns:

A generator of (path, file stat) tuples

megfile.s3.s3_scandir(path: str | BasePath | PathLike) Iterator[FileEntry][source]

Get all contents of the given s3_url; the results are in arbitrary order.

Parameters:

path – Given path

Returns:

All contents under s3_url

Raises:

S3FileNotFoundError, S3NotADirectoryError

megfile.s3.s3_share_cache_open(s3_url: str | BasePath | PathLike, mode: str = 'rb', followlinks: bool = False, *, cache_key: str = 'lru', max_workers: int | None = None, block_size: int = 8388608) S3ShareCacheReader[source]

Open an asynchronous prefetch reader, to support fast sequential read and random read

Note

Users should make sure that the reader / writer is closed correctly

Supports context manager

Some parameter settings may perform well: max_workers=10 or 20, block_size=8 or 16 MB; the default max_workers=None means the global thread pool is used

Parameters:
  • s3_url – s3 path

  • mode – only supports “r” or “rb”

  • encoding – encoding is the name of the encoding used to decode or encode the file. This should only be used in text mode.

  • errors – errors is an optional string that specifies how encoding and decoding errors are to be handled—this cannot be used in binary mode.

  • followlinks – follow symbolic link, default False

  • max_workers – Max download thread number, None by default

  • block_size – Max data size downloaded by each thread, in bytes, 8MB by default

Returns:

An opened S3ShareCacheReader object

Raises:

S3FileNotFoundError
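
Example (path and cache_key are placeholders):

    from megfile.s3 import s3_share_cache_open

    # readers opened with the same cache_key share downloaded blocks
    with s3_share_cache_open("s3://bucket/key", "rb", cache_key="dataset-v1") as reader:
        first = reader.read(1024)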

megfile.s3.s3_stat(path: str | BasePath | PathLike, follow_symlinks=True) StatResult[source]

Get the StatResult of the file on s3_url, including file size and mtime; see s3_getsize and s3_getmtime

If s3_url is not an existent path, which means s3_exists(s3_url) returns False, raise S3FileNotFoundError

If attempting to get the StatResult of the complete s3, i.e. the path is ‘s3://’, raise S3BucketNotFoundError

Parameters:

path – Given path

Returns:

StatResult

Raises:

S3FileNotFoundError, S3BucketNotFoundError
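
Example (the path is a placeholder):

    from megfile.s3 import s3_stat

    stat = s3_stat("s3://bucket/key")
    print(stat.size, stat.mtime)  # size in bytes, mtime as a Unix timestamp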

megfile.s3.s3_symlink(src_path: str | BasePath | PathLike, dst_path: str | BasePath | PathLike) None[source]

Create a symbolic link pointing to src_path named dst_path.

Parameters:
  • src_path – Given path

  • dst_path – Destination path

Raises:

S3NameTooLongError, S3BucketNotFoundError, S3IsADirectoryError

megfile.s3.s3_sync(src_url: str | BasePath | PathLike, dst_url: str | BasePath | PathLike, followlinks: bool = False, force: bool = False, overwrite: bool = True) None[source]

Copy file/directory on src_url to dst_url

Parameters:
  • src_url – Given path

  • dst_url – Given destination path

  • followlinks – If False, treat a symbolic link as a file; if True, follow the link

  • force – Sync forcibly; do not skip files that look identical. Takes priority over ‘overwrite’. Default is False

  • overwrite – whether to overwrite the file if it exists, default is True
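
Example (paths are placeholders):

    from megfile.s3 import s3_sync

    # force=True re-copies even files that appear identical
    s3_sync("s3://bucket/src-dir", "s3://bucket/dst-dir", force=False)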

megfile.s3.s3_unlink(path: str | BasePath | PathLike, missing_ok: bool = False) None[source]

Remove the file on s3

Parameters:
  • path – Given path

  • missing_ok – if False and the target file does not exist, raise S3FileNotFoundError

Raises:

S3PermissionError, S3FileNotFoundError, S3IsADirectoryError

megfile.s3.s3_upload(src_url: str | BasePath | PathLike, dst_url: str | BasePath | PathLike, callback: Callable[[int], None] | None = None, followlinks: bool = False, overwrite: bool = True) None[source]

Uploads a file from local filesystem to s3.

Parameters:
  • src_url – source fs path

  • dst_url – target s3 path

  • callback – Called periodically during copy, and the input parameter is the data size (in bytes) of copy since the last call

  • followlinks – If False, treat a symbolic link as a file; if True, follow the link

  • overwrite – whether to overwrite the file if it exists, default is True
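
Example (paths are placeholders):

    from megfile.s3 import s3_upload

    s3_upload("/tmp/data.bin", "s3://bucket/data.bin",
              callback=lambda n: print(f"+{n} bytes"))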

megfile.s3.s3_walk(path: str | BasePath | PathLike, followlinks: bool = False) Iterator[Tuple[str, List[str], List[str]]][source]

Iteratively traverse the given s3 directory in top-down order: first the parent directory, then, if subdirectories exist, the subdirectories in ascending alphabetical order.

Every iteration on generator yields a 3-tuple: (root, dirs, files)

  • root: Current s3 path;

  • dirs: Name list of subdirectories in current directory. The list is sorted by name in ascending alphabetical order;

  • files: Name list of files in current directory. The list is sorted by name in ascending alphabetical order;

If s3_url is a file path, return an empty generator

If s3_url is a non-existent path, return an empty generator

If s3_url is a bucket path, the bucket is the top directory and is returned in the first iteration of the generator

If s3_url is an empty bucket, only yield one 3-tuple (note: s3 has no concept of an empty directory)

If s3_url doesn’t contain a bucket, i.e. s3_url == ‘s3://’, raise UnsupportedError; traversing the complete s3 is not supported in megfile

Parameters:
  • path – Given path

  • followlinks – the result is the same whether this is True or False, because s3 symlinks do not support directories

Raises:

UnsupportedError

Returns:

A 3-tuple generator
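
Example (the prefix is a placeholder):

    from megfile.s3 import s3_walk

    for root, dirs, files in s3_walk("s3://bucket/prefix"):
        for name in files:
            print(root + "/" + name)  # root is the current prefix; names are sorted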