Source code for ocflcore.persistence.storage.filesystem

# -*- coding: utf-8 -*-
#
# Copyright (C) 2021 CERN.
# Copyright (C) 2021 Data Futures.
#
# OCFL Core is free software; you can redistribute it and/or modify it under the
# terms of the MIT License; see LICENSE file for more details.

"""File system storage implementations for OCFL."""

import shutil
from os import makedirs
from os.path import dirname, join

from .base import Storage


[docs]class FileSystemStorage(Storage): """File system storage.""" def __init__(self, root_path): """Construct the file system. :param root_path: Path to the storage root. """ self._root = root_path def _p(self, path): """Absolute path.""" return join(self._root, path)
[docs] def write(self, file_path, stream): """Write stream to the given file path in the storage root. Automatically creates missing directories, and uses a 1MB chunk size. """ file_path = self._p(file_path) dir_path = dirname(file_path) if dir_path: makedirs(dir_path, exist_ok=True) chunk_size = 10 * 1024 * 1024 # 10mb with open(file_path, "wb") as fp: # Write in chunks while 1: chunk = stream.read(chunk_size) if not chunk: break fp.write(chunk)
[docs] def move(self, other_storage, path): """Move between storage systems.""" other_path = join(other_storage._root, path) our_path = self._p(path) shutil.move(other_path, our_path)
# TODO: support other storage types