1 # Licensed to the Apache Software Foundation (ASF) under one or more
2 # contributor license agreements. See the NOTICE file distributed with
3 # this work for additional information regarding copyright ownership.
4 # The ASF licenses this file to You under the Apache License, Version 2.0
5 # (the "License"); you may not use this file except in compliance with
6 # the License. You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 File system implementation of the storage resource API ("RAPI").
22 from multiprocessing import RLock
23 from contextlib import contextmanager
24 from functools import partial
25 from distutils import dir_util # https://github.com/PyCQA/pylint/issues/73; pylint: disable=no-name-in-module
27 from aria.storage import (
33 class FileSystemResourceAPI(api.ResourceAPI):
35 File system implementation of the storage resource API ("RAPI").
38 def __init__(self, directory, **kwargs):
40 :param directory: root dir for storage
42 super(FileSystemResourceAPI, self).__init__(**kwargs)
43 self.directory = directory
44 self.base_path = os.path.join(self.directory, self.name)
45 self._join_path = partial(os.path.join, self.base_path)
51 Establishes a connection and destroys it after use.
54 self._establish_connection()
56 except BaseException as e:
57 raise exceptions.StorageError(str(e))
59 self._destroy_connection()
61 def _establish_connection(self):
63 Establishes a connection. Used in the ``connect`` context manager.
67 def _destroy_connection(self):
69 Destroys a connection. Used in the ``connect`` context manager.
74 return '{cls.__name__}(directory={self.directory})'.format(
75 cls=self.__class__, self=self)
77 def create(self, **kwargs):
79 Creates a directory in by path. Tries to create the root directory as well.
81 :param name: path of directory
84 os.makedirs(self.directory)
85 except (OSError, IOError):
88 os.makedirs(self.base_path)
89 except (OSError, IOError):
92 def read(self, entry_id, path, **_):
94 Retrieves the contents of a file.
96 :param entry_id: entry ID
97 :param path: path to resource
98 :return: contents of the file
101 resource_relative_path = os.path.join(self.name, entry_id, path or '')
102 resource = os.path.join(self.directory, resource_relative_path)
103 if not os.path.exists(resource):
104 raise exceptions.StorageError("Resource {0} does not exist".
105 format(resource_relative_path))
106 if not os.path.isfile(resource):
107 resources = os.listdir(resource)
108 if len(resources) != 1:
109 raise exceptions.StorageError(
110 'Failed to read {0}; Reading a directory is '
111 'only allowed when it contains a single resource'.format(resource))
112 resource = os.path.join(resource, resources[0])
113 with open(resource, 'rb') as resource_file:
114 return resource_file.read()
116 def download(self, entry_id, destination, path=None, **_):
118 Downloads a file or directory.
120 :param entry_id: entry ID
121 :param destination: download destination
122 :param path: path to download relative to the root of the entry (otherwise all)
124 resource_relative_path = os.path.join(self.name, entry_id, path or '')
125 resource = os.path.join(self.directory, resource_relative_path)
126 if not os.path.exists(resource):
127 raise exceptions.StorageError("Resource {0} does not exist".
128 format(resource_relative_path))
129 if os.path.isfile(resource):
130 shutil.copy2(resource, destination)
132 dir_util.copy_tree(resource, destination) # pylint: disable=no-member
134 def upload(self, entry_id, source, path=None, **_):
136 Uploads a file or directory.
138 :param entry_id: entry ID
139 :param source: source of the files to upload
140 :param path: the destination of the file/s relative to the entry root dir.
142 resource_directory = os.path.join(self.directory, self.name, entry_id)
143 if not os.path.exists(resource_directory):
144 os.makedirs(resource_directory)
145 destination = os.path.join(resource_directory, path or '')
146 if os.path.isfile(source):
147 shutil.copy2(source, destination)
149 dir_util.copy_tree(source, destination) # pylint: disable=no-member
151 def delete(self, entry_id, path=None, **_):
153 Deletes a file or directory.
155 :param entry_id: entry ID
156 :param path: path to delete relative to the root of the entry (otherwise all)
158 destination = os.path.join(self.directory, self.name, entry_id, path or '')
159 if os.path.exists(destination):
160 if os.path.isfile(destination):
161 os.remove(destination)
163 shutil.rmtree(destination)