def get_artifacts(asset_type):
+ """
+ Get artifacts by given asset type
+ :param asset_type:
+ :return:
+ """
resource = "/sdc/v1/catalog/{assetType}"
resource = resource.format(assetType=asset_type)
ret = call_sdc(resource, "GET")
def get_artifact(asset_type, csar_id):
+ """
+ Get artifact by given asset type and csar id
+ :param asset_type:
+ :param csar_id:
+ :return:
+ """
artifacts = get_artifacts(asset_type)
for artifact in artifacts:
if artifact["uuid"] == csar_id:
def get_asset(asset_type, uuid):
+ """
+ Get asset by given type and UUID
+ :param asset_type:
+ :param uuid:
+ :return:
+ """
resource = "/sdc/v1/catalog/{assetType}/{uuid}/metadata".format(assetType=asset_type, uuid=uuid)
ret = call_sdc(resource, "GET")
if ret[0] != 0:
def delete_artifact(asset_type, asset_id, artifact_id):
+ """
+ Delete artifact by conditions from SDC
+ :param asset_type:
+ :param asset_id:
+ :param artifact_id:
+ :return:
+ """
resource = "/sdc/v1/catalog/{assetType}/{uuid}/artifacts/{artifactUUID}"
resource = resource.format(assetType=asset_type, uuid=asset_id, artifactUUID=artifact_id)
ret = call_sdc(resource, "DELETE")
def download_artifacts(download_url, local_path, file_name):
+ """
+ Downlaod artifacts from SDC
+ :param download_url:
+ :param local_path:
+ :param file_name:
+ :return:
+ """
additional_headers = {
'X-ECOMP-InstanceID': 'VFC',
'accept': 'application/octet-stream'
local_file.write(ret[1])
local_file.close()
return local_file_name
+
+
+def create_consumer(name, salt, password):
+ """
+ Create a consumer to access the SDC
+ :param name:
+ :param salt:
+ :param password:
+ :return:
+ """
+ req_data = {
+ 'consumerName': name,
+ 'consumerSalt': salt,
+ 'consumerPassword': password
+ }
+ req_data = json.JSONEncoder().encode(req_data)
+ resource = '/sdc2/rest/v1/consumers'
+ headers = {'USER_ID': 'jh0003'}
+ ret = restcall.call_req(base_url=SDC_BASE_URL,
+ user="",
+ passwd="",
+ auth_type=restcall.rest_no_auth,
+ resource=resource,
+ method="POST",
+ content=req_data,
+ additional_headers=headers)
+ if ret[0] != 0:
+ logger.error("Status code is %s, detail is %s.", ret[2], ret[1])
+ raise CatalogException("Failed to create consumer from sdc.")
+
+
+def register_for_topics(key):
+ """
+ Register a topics of SDC
+ :param key:
+ :return:
+ """
+ req_data = {
+ 'apiPublicKey': key,
+ 'distrEnvName': 'AUTO',
+ 'isConsumerToSdcDistrStatusTopic': False,
+ 'distEnvEndPoints': []
+ }
+ req_data = json.JSONEncoder().encode(req_data)
+ url = '/sdc/v1/registerForDistribution'
+ ret = call_sdc(url, 'POST', req_data)
+ if ret[0] != 0:
+ logger.error("Status code is %s, detail is %s.", ret[2], ret[1])
+ raise CatalogException("Failed to register from sdc.")
+ return json.JSONDecoder().decode(ret[1])