1 # ================================================================================
2 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
17 """Our client interface to Cloudify"""
26 from otihandler.consul_client import ConsulClient
29 class CfyClientConsulError(RuntimeError):
33 class CloudifyClient(object):
34 """quick replacement for cloudify_rest_client -- this supports pagination and scans all DCAE tenants"""
36 def __init__(self, **kwargs):
37 self._protocol = kwargs.get('protocol', 'http')
38 self._host = kwargs.get('host')
39 self._port = kwargs.get('port')
40 self._headers = kwargs.get('headers')
42 self.node_instances = self
44 def list(self, **kwargs):
45 url_mask = "{}://{}:{}/api/v3.1/tenants".format(self._protocol, self._host, self._port)
47 # req = Request('GET', url_mask, headers=self._headers)
48 # prepped = req.prepare()
49 # response = s.send(prepped,verify=False,timeout=30)
50 response = requests.get(url_mask, headers=self._headers, timeout=30)
52 tenants = [x["name"] for x in obj["items"]]
53 tenants_with_containers = [x for x in tenants if 'DCAE' in x]
56 url_mask = "{}://{}:{}/api/v3.1/node-instances?_size={}&_offset={}".format(
57 self._protocol, self._host, self._port, size, "{}")
59 for (key,val) in kwargs.items():
60 if isinstance(val, str):
61 url_mask = url_mask + '&{}={}'.format(key, val)
62 elif isinstance(val, list):
63 url_mask = url_mask + '&{}={}'.format(key, ','.join(val))
65 for tenant in tenants_with_containers:
66 self._headers_with_tenant = copy.deepcopy(self._headers)
67 self._headers_with_tenant['Tenant'] = tenant
73 # req = Request('GET', url_mask.format(offset), headers=self._headers_with_tenant)
74 # prepped = req.prepare()
75 # response = s.send(prepped, verify=False, timeout=30)
76 response = requests.get(url_mask.format(offset), headers=self._headers_with_tenant, timeout=30)
77 response.raise_for_status()
79 offset = offset + len(obj["items"])
80 total = obj["metadata"]["pagination"]["total"]
81 for item in obj["items"]:
82 yield NodeInstance(item)
84 def update_node_instance(self, node_instance_id, body, **kwargs):
85 headers = copy.deepcopy(self._headers_with_tenant)
86 headers['Content-Type'] = "application/json"
87 url_mask = "{}://{}:{}/api/v3.1/node-instances/{}".format(
88 self._protocol, self._host, self._port, node_instance_id)
89 response = requests.patch(url_mask, json=body, headers=headers, timeout=30)
94 class NodeInstance(object):
95 """quick replacement for cloudify_rest_client"""
97 def __init__(self, instance):
98 self.id = instance.get("id")
99 self.deployment_id = instance.get("deployment_id")
100 self.host_id = instance.get("host_id")
101 self.runtime_properties = instance.get("runtime_properties")
102 self.relationships = instance.get("relationships")
103 self.state = instance.get("state")
104 self.version = instance.get("version")
105 self.node_id = instance.get("node_id")
106 self.scaling_groups = instance.get("scaling_groups")
109 class CfyClient(object):
110 _logger = logging.getLogger("oti_handler.cfy_client")
115 def __set_cloudify_manager_client():
116 """Create connection to Cloudify_Manager."""
118 if CfyClient._client:
123 obj = json.loads(os.environ.get("CLOUDIFY", "{}")).get("cloudify")
124 source = "CLOUDIFY environment variable"
126 CM_KEY = 'cloudify_manager'
127 source = "Consul key '{}'".format(CM_KEY)
130 results = ConsulClient.lookup_service(CM_KEY)
131 except Exception as e:
132 msg = "Unexpected exception {}: {!s} from ConsulClient.lookup_service({})".format(type(e).__name__, e, CM_KEY)
133 CfyClient._logger.error(msg)
134 raise CfyClientConsulError(msg)
136 host = result['ServiceAddress']
137 port = result['ServicePort']
140 obj = ConsulClient.get_value(CM_KEY)
141 except Exception as e:
142 msg = "Unexpected exception {}: {!s} from ConsulClient.get_value({})".format(type(e).__name__, e, CM_KEY)
143 CfyClient._logger.error(msg)
144 raise CfyClientConsulError(msg)
146 raise CfyClientConsulError("{} value is empty or invalid".format(source))
148 obj = obj.get('cloudify')
151 raise CfyClientConsulError("{} value is missing 'cloudify' key or value".format(source))
153 host = obj.get('address', host)
155 raise CfyClientConsulError("{} value is missing 'cloudify.address'".format(source))
157 port = obj.get('port', port)
159 raise CfyClientConsulError("{} value is missing 'cloudify.port'".format(source))
161 protocol = obj.get('protocol')
163 raise CfyClientConsulError("{} value is missing 'cloudify.protocol'".format(source))
164 username = obj.get('user')
166 raise CfyClientConsulError("{} value is missing 'cloudify.user'".format(source))
167 password = obj.get('password')
169 raise CfyClientConsulError("{} value is missing 'cloudify.password'".format(source))
171 b64_encoded_str = base64.b64encode(bytes("{}:{}".format(username, password), "utf-8")).decode("utf-8")
172 headers = {'Authorization': 'Basic ' + b64_encoded_str.rstrip('\n')}
173 #headers = {'Authorization': 'Basic ' + '{}:{}'.format(username, password).encode("base64").rstrip('\n')}
175 CfyClient._client = CloudifyClient(host=host, port=port, protocol=protocol, headers=headers)
179 def query_k8_components(in_cluster_fqdn):
181 Iterate components that belong to a cluster fqdn.
185 in_cluster_fqdn : string
190 A generator of tuples of component information
191 [ (proxy_fqdn, namespace, scn, replicas, scn_port), ... ]
195 CfyClient.__set_cloudify_manager_client()
196 for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"):
197 rtp = node_instance.runtime_properties
201 dti_info = rtp.get('dti_info')
203 env_items = dti_info.get('env')
204 for env in env_items:
205 if env.get("name") == 'KUBE_CLUSTER_FQDN':
206 cluster_fqdn = env.get("value")
207 if env.get("name") == 'KUBE_PROXY_FQDN':
208 proxy_fqdn = env.get("value")
209 ports = dti_info.get('ports')
211 scn_port = ports[0].split(':')[0]
215 if in_cluster_fqdn != cluster_fqdn:
218 controller_type = rtp.get('k8s_controller_type')
219 if not controller_type:
220 CfyClient._logger.debug("controller type is missing")
222 elif controller_type != "statefulset":
223 CfyClient._logger.debug("not a stateful set")
226 container_id = rtp.get('k8s_deployment')
228 CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(
229 node_instance.deployment_id, node_instance.id))
233 namespace = container_id.get('namespace')
240 replicas = rtp.get('replicas')
244 scn = rtp.get('service_component_name')
246 CfyClient._logger.debug(
247 "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id,
252 yield (proxy_fqdn, namespace, scn, replicas, scn_port)
255 msg = "Found {} components (collectors) for cluster={}" \
256 .format(cnt_found, in_cluster_fqdn)
257 CfyClient._logger.debug(msg)
261 def iter_components(dcae_target_type, dcae_service_location='', component_type=''):
263 Iterate components that handle a given dcae_target_type.
267 dcae_target_type : string
269 dcae_service_location : string
270 Location of the component (optional)
271 component_type : string
272 Type of the component (optional)
276 A generator of tuples of component information
277 [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ]
279 [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ]
285 # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI)
287 k8s_svcs_tagged_with_clli = []
288 if dcae_service_location:
290 dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location])
291 except Exception as e:
292 msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "-component-dockerhost-", [dcae_service_location])
293 CfyClient._logger.error(msg)
294 raise CfyClientConsulError(msg)
296 k8s_svcs_tagged_with_clli = ConsulClient.search_services("_component_kubernetes_master", [dcae_service_location])
297 except Exception as e:
298 msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "_component_kubernetes_master", [dcae_service_location])
299 CfyClient._logger.error(msg)
300 raise CfyClientConsulError(msg)
302 CfyClient.__set_cloudify_manager_client()
303 for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"):
304 rtp = node_instance.runtime_properties
306 # Skip this node_instance if it is not a collector
307 container_type = "docker"
308 container_id = rtp.get('container_id')
310 svc_with_my_clli_tags = ''
312 container_type = "k8s"
313 container_id = rtp.get('k8s_deployment')
315 CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id))
317 docker_config = rtp.get('docker_config')
318 if not docker_config:
319 CfyClient._logger.debug("{} {} runtime_properties has no docker_config".format(node_instance.deployment_id, node_instance.id))
321 dti_reconfig_script = ""
322 if container_type == "docker":
323 dti_reconfig_script = rtp.get('dti_reconfig_script')
324 if not dti_reconfig_script:
325 CfyClient._logger.debug("{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id, node_instance.id))
327 elif container_type == "k8s":
328 dti_reconfig_script = docker_config.get('reconfigs',{}).get('dti')
329 if not dti_reconfig_script:
330 CfyClient._logger.debug("{} {} runtime_properties docker_config has no reconfigs.dti".format(node_instance.deployment_id, node_instance.id))
333 scn = rtp.get('service_component_name')
337 CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id))
339 if container_type == "docker":
340 docker_host = rtp.get('selected_container_destination')
342 CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id))
344 elif container_type == "k8s":
346 srvcCatalogItem = ConsulClient.lookup_service(scn)[0]
347 scn_address = srvcCatalogItem.get("ServiceAddress")
349 CfyClient._logger.debug(
350 "{} {} runtime_properties has no consul svc catalog registry".format(node_instance.deployment_id,
353 svc_with_my_clli_tags = rtp.get('svc_with_my_clli_tags')
354 # e.g., scn="s908d92e232ed43..."
355 if not svc_with_my_clli_tags:
356 # We should not incur this burden. k8splugin should store this into runtime properties.
358 node_name = srvcCatalogItem.get("Node")
360 # e.g., node_name="zldcdyh1adce3kpma00"
361 services = ConsulClient.lookup_node(node_name).get("Services")
363 for node_svc in list(services.keys()):
364 if "_component_kubernetes_master" in node_svc:
365 # e.g., node_svc="zldcdyh1adce3_kp_component_kubernetes_master"
366 svc_with_my_clli_tags = node_svc
370 # ... cache results we find into runtime properties to avoid searching again
371 if svc_with_my_clli_tags:
372 CfyClient._logger.debug("{} {} storing runtime property svc_with_my_clli_tags={}".format(
373 node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags))
374 rtp['svc_with_my_clli_tags'] = svc_with_my_clli_tags
376 "runtime_properties": rtp,
377 "state": node_instance.state,
378 "version": 1 + int(node_instance.version)
381 CfyClient._client.update_node_instance(node_instance.id, body)
385 if not svc_with_my_clli_tags:
386 CfyClient._logger.debug("{} {} runtime_properties has no svc_with_my_clli_tags".format(node_instance.deployment_id, node_instance.id))
389 # get the nodeport for statefulset sidecar service
390 dti_info = rtp.get('dti_info')
392 ports = dti_info.get('ports')
394 scn_port = ports[0].split(':')[1]
395 docker_host = rtp.get('configuration',{}).get('file_content')
397 CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id))
400 # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG
401 if dcae_service_location:
402 if container_type == "docker" and docker_host not in dockerhosts:
403 CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}"
404 .format(node_instance.deployment_id, node_instance.id, docker_host, dcae_service_location))
406 elif container_type == "k8s" and svc_with_my_clli_tags not in k8s_svcs_tagged_with_clli:
407 CfyClient._logger.debug("{} {} svc_with_my_clli_tags {} is not TAGged with DTI Event dcae_service_location {}"
408 .format(node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags, dcae_service_location))
411 # If DTI Event specifies component_type, then collector's service_component_type must match
413 c_component_type = rtp.get('service_component_type')
414 if component_type != c_component_type:
415 CfyClient._logger.debug("{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id))
418 # Check if the collector supports this VNF Type
419 # scn:oti Consul key is authoritative for vnfTypes that a collector supports (not docker_config)
420 dti_key = scn + ':oti'
422 obj = ConsulClient.get_value(dti_key)
423 except Exception as e:
424 CfyClient._logger.error(
425 "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}"
426 .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id)
430 CfyClient._logger.debug("{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id, dti_key))
432 obj_types = set(k.lower() for k in obj)
433 if dcae_target_type.lower() in obj_types:
434 CfyClient._logger.debug("{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id, dcae_target_type))
436 yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, dti_reconfig_script, container_type, scn_address, scn_port )
439 CfyClient._logger.debug("{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id, dcae_target_type, dti_key))
442 msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}"\
443 .format(cnt_found, dcae_target_type, dcae_service_location, component_type)
444 CfyClient._logger.debug(msg)
447 def iter_components_for_docker(dcae_target_type, dcae_service_location='', component_type=''):
449 Iterate components that handle a given dcae_target_type to find the components of docker type
453 dcae_target_type : string
455 dcae_service_location : string
456 Location of the component (optional)
457 component_type : string
458 Type of the component (optional)
462 A generator of tuples of component information
463 [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ]
468 # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI)
471 if dcae_service_location:
473 dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location])
474 except Exception as e:
475 msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(
476 type(e).__name__, e, "-component-dockerhost-", [dcae_service_location])
477 CfyClient._logger.error(msg)
478 raise CfyClientConsulError(msg)
480 CfyClient.__set_cloudify_manager_client()
481 for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"):
482 rtp = node_instance.runtime_properties
484 # Skip this node_instance if it is not a collector
485 container_type = "docker"
486 container_id = rtp.get('container_id')
489 CfyClient._logger.debug("{} {} runtime_properties has no container_id".format(
490 node_instance.deployment_id, node_instance.id))
492 docker_config = rtp.get('docker_config')
493 if not docker_config:
494 CfyClient._logger.debug(
495 "{} {} runtime_properties has no docker_config".format(node_instance.deployment_id,
498 dti_reconfig_script = ""
499 dti_reconfig_script = rtp.get('dti_reconfig_script')
500 if not dti_reconfig_script:
501 CfyClient._logger.debug(
502 "{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id,
505 scn = rtp.get('service_component_name')
507 CfyClient._logger.debug(
508 "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id,
511 docker_host = rtp.get('selected_container_destination')
513 CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(
514 node_instance.deployment_id, node_instance.id))
517 # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG
518 if dcae_service_location:
519 if docker_host not in dockerhosts:
520 CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}"
521 .format(node_instance.deployment_id, node_instance.id, docker_host,
522 dcae_service_location))
525 # If DTI Event specifies component_type, then collector's service_component_type must match
527 c_component_type = rtp.get('service_component_type')
528 if component_type != c_component_type:
529 CfyClient._logger.debug(
530 "{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id))
533 # Check if the collector supports this VNF Type
534 # scn:oti Consul key is authoritative for vnfTypes that a collector supports (not docker_config)
535 dti_key = scn + ':oti'
537 obj = ConsulClient.get_value(dti_key)
538 except Exception as e:
539 CfyClient._logger.error(
540 "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}"
541 .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id)
545 CfyClient._logger.debug(
546 "{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id,
549 obj_types = set(k.lower() for k in obj)
550 if dcae_target_type.lower() in obj_types:
551 CfyClient._logger.debug(
552 "{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id,
555 yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id,
556 node_instance.state, docker_host, dti_reconfig_script, container_type, '', '')
559 CfyClient._logger.debug(
560 "{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id,
561 dcae_target_type, dti_key))
564 msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}" \
565 .format(cnt_found, dcae_target_type, dcae_service_location, component_type)
566 CfyClient._logger.debug(msg)
570 def iter_components_of_deployment(deployment_id, node_id=None, reconfig_type="app"):
572 Iterate components of a specific deployment_id.
576 deployment_id : string
577 Cloudify deployment ID that created the component(s).
579 Cloudify node ID that created the component.
580 reconfig_type : string
585 A generator of tuples of component information
586 [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ]
588 [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ]
594 CfyClient.__set_cloudify_manager_client()
595 for node_instance in CfyClient._client.node_instances.list(
596 deployment_id=deployment_id,
597 _include=['id','node_id','deployment_id','state','runtime_properties']
599 if node_id and node_instance.node_id != node_id:
602 rtp = node_instance.runtime_properties
604 # Skip this node_instance if it is not a collector
605 container_type = "docker"
606 container_id = rtp.get('container_id')
608 container_type = "k8s"
609 container_id = rtp.get('k8s_deployment')
611 CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id))
613 reconfig_script = rtp.get('docker_config',{}).get('reconfigs',{}).get(reconfig_type)
614 if not reconfig_script:
615 CfyClient._logger.debug("{} {} runtime_properties has no docker_config.reconfigs.{}".format(node_instance.deployment_id, node_instance.id, reconfig_type))
617 scn = rtp.get('service_component_name')
619 CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id))
621 if container_type == "docker":
622 docker_host = rtp.get('selected_container_destination')
624 CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id))
626 elif container_type == "k8s":
627 docker_host = rtp.get('configuration',{}).get('file_content')
629 CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id))
632 CfyClient._logger.debug("{} {} is a {}-reconfigurable component".format(node_instance.deployment_id, node_instance.id, reconfig_type))
634 yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, reconfig_script, container_type)
637 msg = "Found {} {}-reconfigurable components".format(cnt_found, reconfig_type)
638 CfyClient._logger.debug(msg)