Merge "Initial delivery of helm charts to deploy mod2 components. Resolved all the...
[dcaegen2/platform.git] / oti / event-handler / otihandler / cfy_client.py
1 # ================================================================================
2 # Copyright (c) 2019-2020 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16
17 """Our client interface to Cloudify"""
18 import base64
19 import copy
20 import json
21 import logging
22 import os
23
24 import requests
25
26 from otihandler.consul_client import ConsulClient
27
28
29 class CfyClientConsulError(RuntimeError):
30     pass
31
32
33 class CloudifyClient(object):
34     """quick replacement for cloudify_rest_client -- this supports pagination and scans all DCAE tenants"""
35
36     def __init__(self, **kwargs):
37         self._protocol = kwargs.get('protocol', 'http')
38         self._host = kwargs.get('host')
39         self._port = kwargs.get('port')
40         self._headers = kwargs.get('headers')
41
42         self.node_instances = self
43
44     def list(self, **kwargs):
45         url_mask = "{}://{}:{}/api/v3.1/tenants".format(self._protocol, self._host, self._port)
46         # s = Session()
47         # req = Request('GET', url_mask, headers=self._headers)
48         # prepped = req.prepare()
49         # response = s.send(prepped,verify=False,timeout=30)
50         response = requests.get(url_mask, headers=self._headers, timeout=30)
51         obj = response.json()
52         tenants = [x["name"] for x in obj["items"]]
53         tenants_with_containers = [x for x in tenants if 'DCAE' in x]
54
55         size = 1000
56         url_mask = "{}://{}:{}/api/v3.1/node-instances?_size={}&_offset={}".format(
57                    self._protocol, self._host, self._port, size, "{}")
58         if kwargs:
59             for (key,val) in kwargs.items():
60                 if isinstance(val, str):
61                     url_mask = url_mask + '&{}={}'.format(key, val)
62                 elif isinstance(val, list):
63                     url_mask = url_mask + '&{}={}'.format(key, ','.join(val))
64
65         for tenant in tenants_with_containers:
66             self._headers_with_tenant = copy.deepcopy(self._headers)
67             self._headers_with_tenant['Tenant'] = tenant
68
69             offset = 0
70             total = 1
71             while offset < total:
72                 # s = Session()
73                 # req = Request('GET', url_mask.format(offset), headers=self._headers_with_tenant)
74                 # prepped = req.prepare()
75                 # response = s.send(prepped, verify=False, timeout=30)
76                 response = requests.get(url_mask.format(offset), headers=self._headers_with_tenant, timeout=30)
77                 response.raise_for_status()
78                 obj = response.json()
79                 offset = offset + len(obj["items"])
80                 total = obj["metadata"]["pagination"]["total"]
81                 for item in obj["items"]:
82                     yield NodeInstance(item)
83
84     def update_node_instance(self, node_instance_id, body, **kwargs):
85         headers = copy.deepcopy(self._headers_with_tenant)
86         headers['Content-Type'] = "application/json"
87         url_mask = "{}://{}:{}/api/v3.1/node-instances/{}".format(
88                    self._protocol, self._host, self._port, node_instance_id)
89         response = requests.patch(url_mask, json=body, headers=headers, timeout=30)
90         obj = response.json()
91         return obj
92
93
94 class NodeInstance(object):
95     """quick replacement for cloudify_rest_client"""
96
97     def __init__(self, instance):
98         self.id = instance.get("id")
99         self.deployment_id = instance.get("deployment_id")
100         self.host_id = instance.get("host_id")
101         self.runtime_properties = instance.get("runtime_properties")
102         self.relationships = instance.get("relationships")
103         self.state = instance.get("state")
104         self.version = instance.get("version")
105         self.node_id = instance.get("node_id")
106         self.scaling_groups = instance.get("scaling_groups")
107
108
109 class CfyClient(object):
110     _logger = logging.getLogger("oti_handler.cfy_client")
111     _client = None
112
113
114     @staticmethod
115     def __set_cloudify_manager_client():
116         """Create connection to Cloudify_Manager."""
117
118         if CfyClient._client:
119             return
120
121         host = None
122         port = None
123         obj = json.loads(os.environ.get("CLOUDIFY", "{}")).get("cloudify")
124         source = "CLOUDIFY environment variable"
125         if not obj:
126             CM_KEY = 'cloudify_manager'
127             source = "Consul key '{}'".format(CM_KEY)
128
129             try:
130                 results = ConsulClient.lookup_service(CM_KEY)
131             except Exception as e:
132                 msg = "Unexpected exception {}: {!s} from ConsulClient.lookup_service({})".format(type(e).__name__, e, CM_KEY)
133                 CfyClient._logger.error(msg)
134                 raise CfyClientConsulError(msg)
135             result = results[0]
136             host = result['ServiceAddress']
137             port = result['ServicePort']
138
139             try:
140                 obj = ConsulClient.get_value(CM_KEY)
141             except Exception as e:
142                 msg = "Unexpected exception {}: {!s} from ConsulClient.get_value({})".format(type(e).__name__, e, CM_KEY)
143                 CfyClient._logger.error(msg)
144                 raise CfyClientConsulError(msg)
145             if not obj:
146                 raise CfyClientConsulError("{} value is empty or invalid".format(source))
147
148             obj = obj.get('cloudify')
149
150         if not obj:
151             raise CfyClientConsulError("{} value is missing 'cloudify' key or value".format(source))
152
153         host = obj.get('address', host)
154         if not host:
155             raise CfyClientConsulError("{} value is missing 'cloudify.address'".format(source))
156
157         port = obj.get('port', port)
158         if not port:
159             raise CfyClientConsulError("{} value is missing 'cloudify.port'".format(source))
160
161         protocol = obj.get('protocol')
162         if not protocol:
163             raise CfyClientConsulError("{} value is missing 'cloudify.protocol'".format(source))
164         username = obj.get('user')
165         if not username:
166             raise CfyClientConsulError("{} value is missing 'cloudify.user'".format(source))
167         password = obj.get('password')
168         if not password:
169             raise CfyClientConsulError("{} value is missing 'cloudify.password'".format(source))
170
171         b64_encoded_str = base64.b64encode(bytes("{}:{}".format(username, password), "utf-8")).decode("utf-8")
172         headers = {'Authorization': 'Basic ' + b64_encoded_str.rstrip('\n')}
173         #headers = {'Authorization': 'Basic ' + '{}:{}'.format(username, password).encode("base64").rstrip('\n')}
174         
175         CfyClient._client = CloudifyClient(host=host, port=port, protocol=protocol, headers=headers)
176     
177
178     @staticmethod
179     def query_k8_components(in_cluster_fqdn):
180         """
181         Iterate components that belong to a cluster fqdn.
182
183         Parameters
184         ----------
185         in_cluster_fqdn : string
186             k8s cluster FQDN
187
188         Returns
189         -------
190         A generator of tuples of component information
191             [ (proxy_fqdn, namespace, scn, replicas, scn_port), ... ]
192         """
193
194         cnt_found = 0
195         CfyClient.__set_cloudify_manager_client()
196         for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"):
197             rtp = node_instance.runtime_properties
198             scn_port = None
199             cluster_fqdn = None
200             proxy_fqdn = None
201             dti_info = rtp.get('dti_info')
202             if dti_info:
203                 env_items = dti_info.get('env')
204                 for env in env_items:
205                     if env.get("name") == 'KUBE_CLUSTER_FQDN':
206                         cluster_fqdn = env.get("value")
207                     if env.get("name") == 'KUBE_PROXY_FQDN':
208                         proxy_fqdn = env.get("value")
209                 ports = dti_info.get('ports')
210                 if ports:
211                     scn_port = ports[0].split(':')[0]
212             else:
213                 continue
214
215             if in_cluster_fqdn != cluster_fqdn:
216                 continue
217
218             controller_type = rtp.get('k8s_controller_type')
219             if not controller_type:
220                 CfyClient._logger.debug("controller type is missing")
221                 continue
222             elif controller_type != "statefulset":
223                 CfyClient._logger.debug("not a stateful set")
224                 continue
225
226             container_id = rtp.get('k8s_deployment')
227             if not container_id:
228                 CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(
229                     node_instance.deployment_id, node_instance.id))
230                 continue
231
232             try:
233                 namespace = container_id.get('namespace')
234             except:
235                 namespace = ''
236                 pass
237
238             replicas = 1
239             try:
240                 replicas = rtp.get('replicas')
241             except:
242                 pass
243
244             scn = rtp.get('service_component_name')
245             if not scn:
246                 CfyClient._logger.debug(
247                     "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id,
248                                                                                     node_instance.id))
249                 continue
250
251             cnt_found += 1
252             yield (proxy_fqdn, namespace, scn, replicas, scn_port)
253             continue
254
255         msg = "Found {} components (collectors) for cluster={}" \
256             .format(cnt_found, in_cluster_fqdn)
257         CfyClient._logger.debug(msg)
258
259
260     @staticmethod
261     def iter_components(dcae_target_type, dcae_service_location='', component_type=''):
262         """
263         Iterate components that handle a given dcae_target_type.
264     
265         Parameters
266         ----------
267         dcae_target_type : string
268             VNF Type
269         dcae_service_location : string
270             Location of the component (optional)
271         component_type : string
272             Type of the component (optional)
273     
274         Returns
275         -------
276         A generator of tuples of component information
277            [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ]
278         or
279            [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ]
280
281         """
282     
283         cnt_found = 0
284     
285         # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI)
286         dockerhosts = []
287         k8s_svcs_tagged_with_clli = []
288         if dcae_service_location:
289             try:
290                 dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location])
291             except Exception as e:
292                 msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "-component-dockerhost-", [dcae_service_location])
293                 CfyClient._logger.error(msg)
294                 raise CfyClientConsulError(msg)
295             try:
296                 k8s_svcs_tagged_with_clli = ConsulClient.search_services("_component_kubernetes_master", [dcae_service_location])
297             except Exception as e:
298                 msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(type(e).__name__, e, "_component_kubernetes_master", [dcae_service_location])
299                 CfyClient._logger.error(msg)
300                 raise CfyClientConsulError(msg)
301     
302         CfyClient.__set_cloudify_manager_client()
303         for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"):
304             rtp = node_instance.runtime_properties
305
306             # Skip this node_instance if it is not a collector
307             container_type = "docker"
308             container_id = rtp.get('container_id')
309             docker_host = ''
310             svc_with_my_clli_tags = ''
311             if not container_id:
312                 container_type = "k8s"
313                 container_id = rtp.get('k8s_deployment')
314                 if not container_id:
315                     CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id))
316                     continue
317             docker_config = rtp.get('docker_config')
318             if not docker_config:
319                 CfyClient._logger.debug("{} {} runtime_properties has no docker_config".format(node_instance.deployment_id, node_instance.id))
320                 continue
321             dti_reconfig_script = ""
322             if container_type == "docker":
323                 dti_reconfig_script = rtp.get('dti_reconfig_script')
324                 if not dti_reconfig_script:
325                     CfyClient._logger.debug("{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id, node_instance.id))
326                     continue
327             elif container_type == "k8s":
328                 dti_reconfig_script = docker_config.get('reconfigs',{}).get('dti')
329                 if not dti_reconfig_script:
330                     CfyClient._logger.debug("{} {} runtime_properties docker_config has no reconfigs.dti".format(node_instance.deployment_id, node_instance.id))
331                     continue
332
333             scn = rtp.get('service_component_name')
334             scn_address = None
335             scn_port = None
336             if not scn:
337                 CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id))
338                 continue
339             if container_type == "docker":
340                 docker_host = rtp.get('selected_container_destination')
341                 if not docker_host:
342                     CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id))
343                     continue
344             elif container_type == "k8s":
345                 try:
346                     srvcCatalogItem = ConsulClient.lookup_service(scn)[0]
347                     scn_address = srvcCatalogItem.get("ServiceAddress")
348                 except:
349                     CfyClient._logger.debug(
350                         "{} {} runtime_properties has no consul svc catalog registry".format(node_instance.deployment_id,
351                                                                                        node_instance.id))
352                     continue
353                 svc_with_my_clli_tags = rtp.get('svc_with_my_clli_tags')
354                  # e.g., scn="s908d92e232ed43..."
355                 if not svc_with_my_clli_tags:
356                     # We should not incur this burden.  k8splugin should store this into runtime properties.
357                     try:
358                         node_name = srvcCatalogItem.get("Node")
359                         if node_name:
360                             # e.g., node_name="zldcdyh1adce3kpma00"
361                             services = ConsulClient.lookup_node(node_name).get("Services")
362                             if services:
363                                 for node_svc in list(services.keys()):
364                                     if "_component_kubernetes_master" in node_svc:
365                                         # e.g., node_svc="zldcdyh1adce3_kp_component_kubernetes_master"
366                                         svc_with_my_clli_tags = node_svc
367                                         break
368                     except:
369                         pass
370                     # ... cache results we find into runtime properties to avoid searching again
371                     if svc_with_my_clli_tags:
372                         CfyClient._logger.debug("{} {} storing runtime property svc_with_my_clli_tags={}".format(
373                             node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags))
374                         rtp['svc_with_my_clli_tags'] = svc_with_my_clli_tags
375                         body = {
376                                  "runtime_properties": rtp,
377                                  "state": node_instance.state,
378                                  "version": 1 + int(node_instance.version)
379                                }
380                         try:
381                             CfyClient._client.update_node_instance(node_instance.id, body)
382                         except:
383                             pass
384
385                 if not svc_with_my_clli_tags:
386                     CfyClient._logger.debug("{} {} runtime_properties has no svc_with_my_clli_tags".format(node_instance.deployment_id, node_instance.id))
387                     continue
388
389                 # get the nodeport for statefulset sidecar service
390                 dti_info = rtp.get('dti_info')
391                 if dti_info:
392                     ports = dti_info.get('ports')
393                     if ports:
394                         scn_port = ports[0].split(':')[1]
395                 docker_host = rtp.get('configuration',{}).get('file_content')
396                 if not docker_host:
397                     CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id))
398                     continue
399     
400             # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG
401             if dcae_service_location:
402                 if container_type == "docker" and docker_host not in dockerhosts:
403                     CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}"
404                         .format(node_instance.deployment_id, node_instance.id, docker_host, dcae_service_location))
405                     continue
406                 elif container_type == "k8s" and svc_with_my_clli_tags not in k8s_svcs_tagged_with_clli:
407                     CfyClient._logger.debug("{} {} svc_with_my_clli_tags {} is not TAGged with DTI Event dcae_service_location {}"
408                         .format(node_instance.deployment_id, node_instance.id, svc_with_my_clli_tags, dcae_service_location))
409                     continue
410     
411             # If DTI Event specifies component_type, then collector's service_component_type must match
412             if component_type:
413                 c_component_type = rtp.get('service_component_type')
414                 if component_type != c_component_type:
415                     CfyClient._logger.debug("{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id))
416                     continue
417     
418             # Check if the collector supports this VNF Type
419             # scn:oti Consul key is authoritative for vnfTypes that a collector supports (not docker_config)
420             dti_key = scn + ':oti'
421             try:
422                 obj = ConsulClient.get_value(dti_key)
423             except Exception as e:
424                 CfyClient._logger.error(
425                     "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}"
426                     .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id)
427                 )
428                 continue
429             if not obj:
430                 CfyClient._logger.debug("{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id, dti_key))
431                 continue
432             obj_types = set(k.lower() for k in obj)
433             if dcae_target_type.lower() in obj_types:
434                 CfyClient._logger.debug("{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id, dcae_target_type))
435                 cnt_found += 1
436                 yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, dti_reconfig_script, container_type, scn_address, scn_port )
437                 continue
438             else:
439                 CfyClient._logger.debug("{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id, dcae_target_type, dti_key))
440                 continue
441     
442         msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}"\
443               .format(cnt_found, dcae_target_type, dcae_service_location, component_type)
444         CfyClient._logger.debug(msg)
445
446     @staticmethod
447     def iter_components_for_docker(dcae_target_type, dcae_service_location='', component_type=''):
448         """
449         Iterate components that handle a given dcae_target_type to find the components of docker type
450
451         Parameters
452         ----------
453         dcae_target_type : string
454             VNF Type
455         dcae_service_location : string
456             Location of the component (optional)
457         component_type : string
458             Type of the component (optional)
459
460         Returns
461         -------
462         A generator of tuples of component information
463            [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ]
464
465         """
466
467         cnt_found = 0
468         # get dockerhost and kubernetes_master services that are TAGged for the dcae_service_location (CLLI)
469         dockerhosts = []
470
471         if dcae_service_location:
472             try:
473                 dockerhosts = ConsulClient.search_services("-component-dockerhost-", [dcae_service_location])
474             except Exception as e:
475                 msg = "Unexpected exception {}: {!s} from ConsulClient.search_services({}, {!s})".format(
476                     type(e).__name__, e, "-component-dockerhost-", [dcae_service_location])
477                 CfyClient._logger.error(msg)
478                 raise CfyClientConsulError(msg)
479
480         CfyClient.__set_cloudify_manager_client()
481         for node_instance in CfyClient._client.node_instances.list(_sort="deployment_id"):
482             rtp = node_instance.runtime_properties
483
484             # Skip this node_instance if it is not a collector
485             container_type = "docker"
486             container_id = rtp.get('container_id')
487             if not container_id:
488                 if not container_id:
489                     CfyClient._logger.debug("{} {} runtime_properties has no container_id".format(
490                         node_instance.deployment_id, node_instance.id))
491                     continue
492             docker_config = rtp.get('docker_config')
493             if not docker_config:
494                 CfyClient._logger.debug(
495                     "{} {} runtime_properties has no docker_config".format(node_instance.deployment_id,
496                                                                            node_instance.id))
497                 continue
498             dti_reconfig_script = ""
499             dti_reconfig_script = rtp.get('dti_reconfig_script')
500             if not dti_reconfig_script:
501                 CfyClient._logger.debug(
502                     "{} {} runtime_properties has no dti_reconfig_script".format(node_instance.deployment_id,
503                                                                                  node_instance.id))
504                 continue
505             scn = rtp.get('service_component_name')
506             if not scn:
507                 CfyClient._logger.debug(
508                     "{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id,
509                                                                                     node_instance.id))
510                 continue
511             docker_host = rtp.get('selected_container_destination')
512             if not docker_host:
513                 CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(
514                     node_instance.deployment_id, node_instance.id))
515                 continue
516
517             # If DTI Event specifies dcae_service_location, then collector's dockerhost service in Consul must have that TAG
518             if dcae_service_location:
519                 if docker_host not in dockerhosts:
520                     CfyClient._logger.debug("{} {} dockerhost {} is not TAGged with DTI Event dcae_service_location {}"
521                                             .format(node_instance.deployment_id, node_instance.id, docker_host,
522                                                     dcae_service_location))
523                     continue
524
525             # If DTI Event specifies component_type, then collector's service_component_type must match
526             if component_type:
527                 c_component_type = rtp.get('service_component_type')
528                 if component_type != c_component_type:
529                     CfyClient._logger.debug(
530                         "{} {} component_types don't match".format(node_instance.deployment_id, node_instance.id))
531                     continue
532
533             # Check if the collector supports this VNF Type
534             # scn:oti Consul key is authoritative for vnfTypes that a collector supports (not docker_config)
535             dti_key = scn + ':oti'
536             try:
537                 obj = ConsulClient.get_value(dti_key)
538             except Exception as e:
539                 CfyClient._logger.error(
540                     "Unexpected exception {}: {!s} from ConsulClient.get_value({}) for {} {}"
541                         .format(type(e).__name__, e, dti_key, node_instance.deployment_id, node_instance.id)
542                 )
543                 continue
544             if not obj:
545                 CfyClient._logger.debug(
546                     "{} {} Consul key '{}' is empty or invalid".format(node_instance.deployment_id, node_instance.id,
547                                                                        dti_key))
548                 continue
549             obj_types = set(k.lower() for k in obj)
550             if dcae_target_type.lower() in obj_types:
551                 CfyClient._logger.debug(
552                     "{} {} is a valid collector for VNF Type {}".format(node_instance.deployment_id, node_instance.id,
553                                                                         dcae_target_type))
554                 cnt_found += 1
555                 yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id,
556                        node_instance.state, docker_host, dti_reconfig_script, container_type, '', '')
557                 continue
558             else:
559                 CfyClient._logger.debug(
560                     "{} {} VNF Type {} is not in Consul key '{}'".format(node_instance.deployment_id, node_instance.id,
561                                                                          dcae_target_type, dti_key))
562                 continue
563
564         msg = "Found {} components (collectors) for dcae_target_type={}, dcae_service_location={}, component_type={}" \
565             .format(cnt_found, dcae_target_type, dcae_service_location, component_type)
566         CfyClient._logger.debug(msg)
567
568
569     @staticmethod
570     def iter_components_of_deployment(deployment_id, node_id=None, reconfig_type="app"):
571         """
572         Iterate components of a specific deployment_id.
573
574         Parameters
575         ----------
576         deployment_id : string
577             Cloudify deployment ID that created the component(s).
578         node_id : string
579             Cloudify node ID that created the component.
580         reconfig_type : string
581             "app"
582     
583         Returns
584         -------
585         A generator of tuples of component information
586            [ (scn, deployment_id, container_id, node_id, node_instance_id, node_instance_state, docker_host, reconfig_script, "docker"), ... ]
587         or
588            [ (scn, deployment_id, k8s_deployment, node_id, node_instance_id, node_instance_state, config_content, reconfig_script, "k8s"), ... ]
589
590         """
591
592         cnt_found = 0
593
594         CfyClient.__set_cloudify_manager_client()
595         for node_instance in CfyClient._client.node_instances.list(
596                 deployment_id=deployment_id,
597                 _include=['id','node_id','deployment_id','state','runtime_properties']
598             ):
599             if node_id and node_instance.node_id != node_id:
600                 continue
601
602             rtp = node_instance.runtime_properties
603     
604             # Skip this node_instance if it is not a collector
605             container_type = "docker"
606             container_id = rtp.get('container_id')
607             if not container_id:
608                 container_type = "k8s"
609                 container_id = rtp.get('k8s_deployment')
610                 if not container_id:
611                     CfyClient._logger.debug("{} {} runtime_properties has no container_id or k8s_deployment".format(node_instance.deployment_id, node_instance.id))
612                     continue
613             reconfig_script = rtp.get('docker_config',{}).get('reconfigs',{}).get(reconfig_type)
614             if not reconfig_script:
615                 CfyClient._logger.debug("{} {} runtime_properties has no docker_config.reconfigs.{}".format(node_instance.deployment_id, node_instance.id, reconfig_type))
616                 continue
617             scn = rtp.get('service_component_name')
618             if not scn:
619                 CfyClient._logger.debug("{} {} runtime_properties has no service_component_name".format(node_instance.deployment_id, node_instance.id))
620                 continue
621             if container_type == "docker":
622                 docker_host = rtp.get('selected_container_destination')
623                 if not docker_host:
624                     CfyClient._logger.debug("{} {} runtime_properties has no selected_container_destination".format(node_instance.deployment_id, node_instance.id))
625                     continue
626             elif container_type == "k8s":
627                 docker_host = rtp.get('configuration',{}).get('file_content')
628                 if not docker_host:
629                     CfyClient._logger.debug("{} {} runtime_properties has no configuration.file_content".format(node_instance.deployment_id, node_instance.id))
630                     continue
631     
632             CfyClient._logger.debug("{} {} is a {}-reconfigurable component".format(node_instance.deployment_id, node_instance.id, reconfig_type))
633             cnt_found += 1
634             yield (scn, node_instance.deployment_id, container_id, node_instance.node_id, node_instance.id, node_instance.state, docker_host, reconfig_script, container_type)
635             continue
636     
637         msg = "Found {} {}-reconfigurable components".format(cnt_found, reconfig_type)
638         CfyClient._logger.debug(msg)