# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ File system implementation of the storage resource API ("RAPI"). """ import os import shutil from multiprocessing import RLock from contextlib import contextmanager from functools import partial from distutils import dir_util # https://github.com/PyCQA/pylint/issues/73; pylint: disable=no-name-in-module from aria.storage import ( api, exceptions ) class FileSystemResourceAPI(api.ResourceAPI): """ File system implementation of the storage resource API ("RAPI"). """ def __init__(self, directory, **kwargs): """ :param directory: root dir for storage """ super(FileSystemResourceAPI, self).__init__(**kwargs) self.directory = directory self.base_path = os.path.join(self.directory, self.name) self._join_path = partial(os.path.join, self.base_path) self._lock = RLock() @contextmanager def connect(self): """ Establishes a connection and destroys it after use. """ try: self._establish_connection() yield self except BaseException as e: raise exceptions.StorageError(str(e)) finally: self._destroy_connection() def _establish_connection(self): """ Establishes a connection. Used in the ``connect`` context manager. """ self._lock.acquire() def _destroy_connection(self): """ Destroys a connection. Used in the ``connect`` context manager. """ self._lock.release() def __repr__(self): return '{cls.__name__}(directory={self.directory})'.format( cls=self.__class__, self=self) def create(self, **kwargs): """ Creates a directory in by path. Tries to create the root directory as well. :param name: path of directory """ try: os.makedirs(self.directory) except (OSError, IOError): pass try: os.makedirs(self.base_path) except (OSError, IOError): pass def read(self, entry_id, path, **_): """ Retrieves the contents of a file. :param entry_id: entry ID :param path: path to resource :return: contents of the file :rtype: bytes """ resource_relative_path = os.path.join(self.name, entry_id, path or '') resource = os.path.join(self.directory, resource_relative_path) if not os.path.exists(resource): raise exceptions.StorageError("Resource {0} does not exist". format(resource_relative_path)) if not os.path.isfile(resource): resources = os.listdir(resource) if len(resources) != 1: raise exceptions.StorageError( 'Failed to read {0}; Reading a directory is ' 'only allowed when it contains a single resource'.format(resource)) resource = os.path.join(resource, resources[0]) with open(resource, 'rb') as resource_file: return resource_file.read() def download(self, entry_id, destination, path=None, **_): """ Downloads a file or directory. :param entry_id: entry ID :param destination: download destination :param path: path to download relative to the root of the entry (otherwise all) """ resource_relative_path = os.path.join(self.name, entry_id, path or '') resource = os.path.join(self.directory, resource_relative_path) if not os.path.exists(resource): raise exceptions.StorageError("Resource {0} does not exist". format(resource_relative_path)) if os.path.isfile(resource): shutil.copy2(resource, destination) else: dir_util.copy_tree(resource, destination) # pylint: disable=no-member def upload(self, entry_id, source, path=None, **_): """ Uploads a file or directory. :param entry_id: entry ID :param source: source of the files to upload :param path: the destination of the file/s relative to the entry root dir. """ resource_directory = os.path.join(self.directory, self.name, entry_id) if not os.path.exists(resource_directory): os.makedirs(resource_directory) destination = os.path.join(resource_directory, path or '') if os.path.isfile(source): shutil.copy2(source, destination) else: dir_util.copy_tree(source, destination) # pylint: disable=no-member def delete(self, entry_id, path=None, **_): """ Deletes a file or directory. :param entry_id: entry ID :param path: path to delete relative to the root of the entry (otherwise all) """ destination = os.path.join(self.directory, self.name, entry_id, path or '') if os.path.exists(destination): if os.path.isfile(destination): os.remove(destination) else: shutil.rmtree(destination) return True return False