META_CSAR_VERSION_KEY = 'CSAR-Version'\r
META_CSAR_VERSION_VALUE = '1.1'\r
META_CREATED_BY_KEY = 'Created-By'\r
-META_CREATED_BY_VALUE = 'ARIA'\r
+META_CREATED_BY_VALUE = 'ONAP'\r
META_ENTRY_DEFINITIONS_KEY = 'Entry-Definitions'\r
+META_ENTRY_MANIFEST_FILE_KEY = 'Entry-Manifest'\r
+META_ENTRY_HISTORY_FILE_KEY = 'Entry-Change-Log'\r
+META_ENTRY_TESTS_DIR_KEY = 'Entry-Tests'\r
+META_ENTRY_LICENSES_DIR_KEY = 'Entry-Licenses'\r
+\r
BASE_METADATA = {\r
META_FILE_VERSION_KEY: META_FILE_VERSION_VALUE,\r
META_CSAR_VERSION_KEY: META_CSAR_VERSION_VALUE,\r
}\r
\r
\r
-def write(source, entry, destination, logger):\r
+def check_file_dir(root, entry, msg, check_for_non=False, check_dir=False):\r
+ path = os.path.join(root, entry)\r
+ if check_for_non:\r
+ ret = not os.path.exists(path)\r
+ error_msg = '{0} already exists. ' + msg\r
+ elif check_dir:\r
+ ret = os.path.isdir(path)\r
+ error_msg = '{0} is not an existing directory. ' + msg\r
+ else:\r
+ ret = os.path.isfile(path)\r
+ error_msg = '{0} is not an existing file. ' + msg\r
+ if not ret:\r
+ raise ValueError(error_msg.format(path))\r
+\r
+\r
+def write(source, entry, destination, logger, args):\r
source = os.path.expanduser(source)\r
destination = os.path.expanduser(destination)\r
- entry_definitions = os.path.join(source, entry)\r
- meta_file = os.path.join(source, META_FILE)\r
- if not os.path.isdir(source):\r
- raise ValueError('{0} is not a directory. Please specify the service template '\r
- 'directory.'.format(source))\r
- if not os.path.isfile(entry_definitions):\r
- raise ValueError('{0} does not exists. Please specify a valid entry point.'\r
- .format(entry_definitions))\r
- if os.path.exists(destination):\r
- raise ValueError('{0} already exists. Please provide a path to where the CSAR should be '\r
- 'created.'.format(destination))\r
- if os.path.exists(meta_file):\r
- raise ValueError('{0} already exists. This commands generates a meta file for you. Please '\r
- 'remove the existing metafile.'.format(meta_file))\r
metadata = BASE_METADATA.copy()\r
+\r
+ check_file_dir(root=source,\r
+ entry='',\r
+ msg='Please specify the service template directory.',\r
+ check_dir=True)\r
+\r
+ check_file_dir(root=source,\r
+ entry=entry,\r
+ msg='Please specify a valid entry point.',\r
+ check_dir=False)\r
metadata[META_ENTRY_DEFINITIONS_KEY] = entry\r
+\r
+ check_file_dir(root='',\r
+ entry=destination,\r
+ msg='Please provide a path to where the CSAR should be created.',\r
+ check_for_non=True)\r
+\r
+ check_file_dir(root=source,\r
+ entry=META_FILE,\r
+ msg='This commands generates a meta file for you. Please '\r
+ 'remove the existing metafile.',\r
+ check_for_non=True)\r
+\r
+ if(args.manifest):\r
+ check_file_dir(root=source,\r
+ entry=args.manifest,\r
+ msg='Please specify a valid manifest file.',\r
+ check_dir=False)\r
+ metadata[META_ENTRY_MANIFEST_FILE_KEY] = args.manifest\r
+\r
+ if(args.history):\r
+ check_file_dir(root=source,\r
+ entry=args.history,\r
+ msg='Please specify a valid change history file.',\r
+ check_dir=False)\r
+ metadata[META_ENTRY_HISTORY_FILE_KEY] = args.history\r
+\r
+ if(args.tests):\r
+ check_file_dir(root=source,\r
+ entry=args.tests,\r
+ msg='Please specify a valid test directory.',\r
+ check_dir=True)\r
+ metadata[META_ENTRY_TESTS_DIR_KEY] = args.tests\r
+\r
+ if(args.licenses):\r
+ check_file_dir(root=source,\r
+ entry=args.licenses,\r
+ msg='Please specify a valid license directory.',\r
+ check_dir=True)\r
+ metadata[META_ENTRY_LICENSES_DIR_KEY] = args.licenses\r
+\r
logger.debug('Compressing root directory to ZIP')\r
with zipfile.ZipFile(destination, 'w', zipfile.ZIP_DEFLATED) as f:\r
- for root, _, files in os.walk(source):\r
+ for root, dirs, files in os.walk(source):\r
for file in files:\r
file_full_path = os.path.join(root, file)\r
file_relative_path = os.path.relpath(file_full_path, source)\r
logger.debug('Writing to archive: {0}'.format(file_relative_path))\r
f.write(file_full_path, file_relative_path)\r
+ # add empty dir\r
+ for dir in dirs:\r
+ dir_full_path = os.path.join(root, dir)\r
+ if len(os.listdir(dir_full_path)) == 0:\r
+ dir_relative_path = os.path.relpath(dir_full_path, source) + os.sep\r
+ logger.debug('Writing to archive: {0}'.format(dir_relative_path))\r
+ f.write(dir_full_path + os.sep, dir_relative_path)\r
+\r
logger.debug('Writing new metadata file to {0}'.format(META_FILE))\r
f.writestr(META_FILE, yaml.dump(metadata, default_flow_style=False))\r
\r
with open(os.path.join(self.destination, self.entry_definitions)) as f:\r
return yaml.load(f)\r
\r
+ @property\r
+ def entry_manifest_file(self):\r
+ return self.metadata.get(META_ENTRY_MANIFEST_FILE_KEY)\r
+\r
+ @property\r
+ def entry_history_file(self):\r
+ return self.metadata.get(META_ENTRY_HISTORY_FILE_KEY)\r
+\r
+ @property\r
+ def entry_tests_dir(self):\r
+ return self.metadata.get(META_ENTRY_TESTS_DIR_KEY)\r
+\r
+ @property\r
+ def entry_licenses_dir(self):\r
+ return self.metadata.get(META_ENTRY_LICENSES_DIR_KEY)\r
+\r
def _extract(self):\r
self.logger.debug('Extracting CSAR contents')\r
if not os.path.exists(self.destination):\r
validate_key(META_CREATED_BY_KEY)\r
validate_key(META_ENTRY_DEFINITIONS_KEY)\r
self.logger.debug('CSAR entry definitions: {0}'.format(self.entry_definitions))\r
- entry_definitions_path = os.path.join(self.destination, self.entry_definitions)\r
- if not os.path.isfile(entry_definitions_path):\r
- raise ValueError('The entry definitions {0} referenced by the metadata file does not '\r
- 'exist.'.format(entry_definitions_path))\r
+ self.logger.debug('CSAR manifest file: {0}'.format(self.entry_manifest_file))\r
+ self.logger.debug('CSAR change history file: {0}'.format(self.entry_history_file))\r
+ self.logger.debug('CSAR tests directory: {0}'.format(self.entry_tests_dir))\r
+ self.logger.debug('CSAR licenses directory: {0}'.format(self.entry_licenses_dir))\r
+\r
+ check_file_dir(self.destination,\r
+ self.entry_definitions,\r
+ 'The entry definitions {0} referenced by the metadata '\r
+ 'file does not exist.'.format(self.entry_definitions),\r
+ check_dir=False)\r
+\r
+ if(self.entry_manifest_file):\r
+ check_file_dir(self.destination,\r
+ self.entry_manifest_file,\r
+ 'The manifest file {0} referenced by the metadata '\r
+ 'file does not exist.'.format(self.entry_manifest_file),\r
+ check_dir=False)\r
+\r
+ if(self.entry_history_file):\r
+ check_file_dir(self.destination,\r
+ self.entry_history_file,\r
+ 'The change history file {0} referenced by the metadata '\r
+ 'file does not exist.'.format(self.entry_history_file),\r
+ check_dir=False)\r
+\r
+ if(self.entry_tests_dir):\r
+ check_file_dir(self.destination,\r
+ self.entry_tests_dir,\r
+ 'The test directory {0} referenced by the metadata '\r
+ 'file does not exist.'.format(self.entry_tests_dir),\r
+ check_dir=True)\r
+\r
+ if(self.entry_licenses_dir):\r
+ check_file_dir(self.destination,\r
+ self.entry_licenses_dir,\r
+ 'The license directory {0} referenced by the metadata '\r
+ 'file does not exist.'.format(self.entry_licenses_dir),\r
+ check_dir=True)\r
\r
def _download(self, url, target):\r
response = requests.get(url, stream=True)\r
# License for the specific language governing permissions and limitations
# under the License.
#
+import collections
import filecmp
-from packager import csar
import logging
+import os
import tempfile
import shutil
-def test_CSARWrite():
- CSAR_RESOURCE_DIR = 'tests/resources/csar'
- CSAR_ENTRY_FILE = 'test_entry.yaml'
- CSAR_OUTPUT_FILE = 'output.csar'
+from packager import csar
+
+CSAR_RESOURCE_DIR = 'tests/resources/csar'
+CSAR_ENTRY_FILE = 'test_entry.yaml'
+CSAR_OUTPUT_FILE = 'output.csar'
+
+Args = collections.namedtuple('Args',
+ ['source', 'entry', 'manifest', 'history', 'tests', 'licenses'])
+
+ARGS_MANIFEST = {
+ 'source': CSAR_RESOURCE_DIR,
+ 'entry': CSAR_ENTRY_FILE,
+ 'manifest': 'test_entry.mf',
+ 'history': 'ChangeLog.txt',
+ 'tests': 'Tests',
+ 'licenses': 'Licenses',
+ }
+ARGS_NO_MANIFEST = {
+ 'source': CSAR_RESOURCE_DIR,
+ 'entry': CSAR_ENTRY_FILE,
+ 'manifest': None,
+ 'history': None,
+ 'tests': None,
+ 'licenses': None,
+ }
+
+
+def csar_write_test(args):
csar_target_dir = tempfile.mkdtemp()
csar_extract_dir = tempfile.mkdtemp()
try:
- csar.write(CSAR_RESOURCE_DIR, CSAR_ENTRY_FILE, csar_target_dir + '/' + CSAR_OUTPUT_FILE, logging)
+ csar.write(args.source, args.entry, csar_target_dir + '/' + CSAR_OUTPUT_FILE, logging, args)
csar.read(csar_target_dir + '/' + CSAR_OUTPUT_FILE, csar_extract_dir, logging)
- assert filecmp.cmp(CSAR_RESOURCE_DIR + '/' + CSAR_ENTRY_FILE, csar_extract_dir + '/' + CSAR_ENTRY_FILE )
+ assert filecmp.cmp(args.source + '/' + args.entry, csar_extract_dir + '/' + args.entry)
+ if(args.manifest):
+ assert filecmp.cmp(args.source + '/' + args.manifest,
+ csar_extract_dir + '/' + args.manifest)
+ if(args.history):
+ assert filecmp.cmp(args.source + '/' + args.history,
+ csar_extract_dir + '/' + args.history)
finally:
shutil.rmtree(csar_target_dir, ignore_errors=True)
shutil.rmtree(csar_extract_dir, ignore_errors=True)
+def test_CSARWrite():
+ csar_write_test(Args(**ARGS_NO_MANIFEST))
+
+
+def test_CSARWrite_manifest():
+ # Because git can not store emptry directory, we need to create manually here
+ license_path = ARGS_MANIFEST['source'] + '/' + ARGS_MANIFEST['licenses']
+ if not os.path.exists(license_path):
+ os.makedirs(license_path)
+ csar_write_test(Args(**ARGS_MANIFEST))