def get_artifacts(asset_type):
+ """
+ Get artifacts by given asset type
+ :param asset_type:
+ :return:
+ """
resource = "/sdc/v1/catalog/{assetType}"
resource = resource.format(assetType=asset_type)
ret = call_sdc(resource, "GET")
def get_artifact(asset_type, csar_id):
+ """
+ Get artifact by given asset type and csar id
+ :param asset_type:
+ :param csar_id:
+ :return:
+ """
artifacts = get_artifacts(asset_type)
for artifact in artifacts:
if artifact["uuid"] == csar_id:
def get_asset(asset_type, uuid):
+ """
+ Get asset by given type and UUID
+ :param asset_type:
+ :param uuid:
+ :return:
+ """
resource = "/sdc/v1/catalog/{assetType}/{uuid}/metadata".format(assetType=asset_type, uuid=uuid)
ret = call_sdc(resource, "GET")
if ret[0] != 0:
def delete_artifact(asset_type, asset_id, artifact_id):
+ """
+ Delete artifact by conditions from SDC
+ :param asset_type:
+ :param asset_id:
+ :param artifact_id:
+ :return:
+ """
resource = "/sdc/v1/catalog/{assetType}/{uuid}/artifacts/{artifactUUID}"
resource = resource.format(assetType=asset_type, uuid=asset_id, artifactUUID=artifact_id)
ret = call_sdc(resource, "DELETE")
def download_artifacts(download_url, local_path, file_name):
+ """
+ Downlaod artifacts from SDC
+ :param download_url:
+ :param local_path:
+ :param file_name:
+ :return:
+ """
additional_headers = {
'X-ECOMP-InstanceID': 'VFC',
'accept': 'application/octet-stream'
def create_consumer(name, salt, password):
+ """
+ Create a consumer to access the SDC
+ :param name:
+ :param salt:
+ :param password:
+ :return:
+ """
req_data = {
'consumerName': name,
'consumerSalt': salt,
def register_for_topics(key):
+ """
+ Register a topics of SDC
+ :param key:
+ :return:
+ """
req_data = {
'apiPublicKey': key,
'distrEnvName': 'AUTO',