Add multi-cluster support 10/82510/3
authorJack Lucas <jflucas@research.att.com>
Mon, 11 Mar 2019 21:43:37 +0000 (17:43 -0400)
committerJack Lucas <jflucas@research.att.com>
Mon, 18 Mar 2019 14:00:28 +0000 (10:00 -0400)
Issue-ID: DCAEGEN2-1136
Change-Id: I314e5d8c501198b3e87c45813201498935c7bacc
Signed-off-by: Jack Lucas <jflucas@research.att.com>
k8s/ChangeLog.md
k8s/configure/configure.py
k8s/k8s-node-type.yaml
k8s/k8sclient/k8sclient.py
k8s/k8splugin/tasks.py
k8s/pom.xml
k8s/setup.py
k8s/tests/test_k8sclient.py
k8s/tests/test_tasks.py

index a59a016..4faaaf7 100644 (file)
@@ -5,6 +5,9 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](http://keepachangelog.com/)
 and this project adheres to [Semantic Versioning](http://semver.org/).
 
+## [1.4.10]
+ Support for deploying to multiple Kubernetes clusters.
+
 ## [1.4.9]
 * Support for liveness probes (https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/)
 * fix the readiness probe to run script such as "/opt/app/snmptrap/bin/snmptrapd.sh status"
index de196c0..e15939a 100644 (file)
@@ -1,7 +1,7 @@
 # ============LICENSE_START=======================================================
 # org.onap.dcae
 # ================================================================================
-# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
 # ================================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@ _CONSUL_KEY = "k8s-plugin"              # Key under which CM configuration is st
 # Default configuration values
 DCAE_NAMESPACE = "dcae"
 CONSUL_DNS_NAME = "consul"
+DEFAULT_K8S_LOCATION = "central"
 
 FB_LOG_PATH = "/var/log/onap"
 FB_DATA_PATH = "/usr/share/filebeat/data"
@@ -38,20 +39,21 @@ TLS_IMAGE = "nexus3.onap.org:10001/onap/org.onap.dcaegen2.deployments.tls-init-c
 def _set_defaults():
     """ Set default configuration parameters """
     return {
-        "namespace" : DCAE_NAMESPACE,               # k8s namespace to use for DCAE
-        "consul_dns_name" : CONSUL_DNS_NAME,        # k8s internal DNS name for Consul
-        "image_pull_secrets" : [],                  # list of k8s secrets for accessing Docker registries
-        "filebeat": {                               # Configuration for setting up filebeat container
-            "log_path" : FB_LOG_PATH,               # mount point for log volume in filebeat container
-            "data_path" : FB_DATA_PATH,             # mount point for data volume in filebeat container
-            "config_path" : FB_CONFIG_PATH,         # mount point for config volume in filebeat container
-            "config_subpath" : FB_CONFIG_SUBPATH,   # subpath for config data in filebeat container
-            "config_map" : FB_CONFIG_MAP,           # ConfigMap holding the filebeat configuration
-            "image": FB_IMAGE                       # Docker image to use for filebeat
+        "namespace" : DCAE_NAMESPACE,                   # k8s namespace to use for DCAE
+        "consul_dns_name" : CONSUL_DNS_NAME,            # k8s internal DNS name for Consul
+        "default_k8s_location" : DEFAULT_K8S_LOCATION,  # default k8s location to deploy components
+        "image_pull_secrets" : [],                      # list of k8s secrets for accessing Docker registries
+        "filebeat": {                                   # Configuration for setting up filebeat container
+            "log_path" : FB_LOG_PATH,                   # mount point for log volume in filebeat container
+            "data_path" : FB_DATA_PATH,                 # mount point for data volume in filebeat container
+            "config_path" : FB_CONFIG_PATH,             # mount point for config volume in filebeat container
+            "config_subpath" : FB_CONFIG_SUBPATH,       # subpath for config data in filebeat container
+            "config_map" : FB_CONFIG_MAP,               # ConfigMap holding the filebeat configuration
+            "image": FB_IMAGE                           # Docker image to use for filebeat
         },
-        "tls": {                                    # Configuration for setting up TLS init container
-            "cert_path" : TLS_CERT_PATH,            # mount point for certificate volume in TLS init container
-            "image": TLS_IMAGE                      # Docker image to use for TLS init container
+        "tls": {                                        # Configuration for setting up TLS init container
+            "cert_path" : TLS_CERT_PATH,                # mount point for certificate volume in TLS init container
+            "image": TLS_IMAGE                          # Docker image to use for TLS init container
         }
     }
 
index c803b81..5a19e8a 100644 (file)
@@ -25,7 +25,7 @@ plugins:
   k8s:
     executor: 'central_deployment_agent'
     package_name: k8splugin
-    package_version: 1.4.9
+    package_version: 1.4.10
 
 data_types:
 
@@ -126,7 +126,6 @@ node_types:
                   Please specify "requests" property and/or a "limits" property, with subproproperties
                   for cpu and memory. (https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/)
 
-
             log_info:
               type: dcae.types.LoggingInfo
               description: >
@@ -153,6 +152,16 @@ node_types:
                 not already present on the Docker host where the container is being launched.
               default: false
 
+            location_id:
+              type: string
+              description: >
+                The identifier for the location where the component is to be deployed.
+                If absent, the plugin uses its configured default location, typically the location
+                where the plugin is running (the central site).  Also used to supply a location to
+                the DMaaP bus controller if the component is being provisioned as a publisher or
+                subscriber to a DMaaP feed or topic.
+              required: false
+
         interfaces:
             dcae.interfaces.update:
                 scale:
index d3417a7..ee4250d 100644 (file)
@@ -27,6 +27,9 @@ from kubernetes import config, client, stream
 PROBE_DEFAULT_PERIOD = 15
 PROBE_DEFAULT_TIMEOUT = 1
 
+# Location of k8s cluster config file ("kubeconfig")
+K8S_CONFIG_PATH="/opt/onap/kubeconfig"
+
 # Regular expression for interval/timeout specification
 INTERVAL_SPEC = re.compile("^([0-9]+)(s|m|h)?$")
 # Conversion factors to seconds
@@ -48,11 +51,10 @@ def _create_service_name(component_name):
 def _create_exposed_service_name(component_name):
     return ("x{0}".format(component_name))[:63]
 
-def _configure_api():
-    # Look for a kubernetes config file in ~/.kube/config
-    kubepath = os.path.join(os.environ["HOME"], '.kube/config')
-    if os.path.exists(kubepath):
-        config.load_kube_config(kubepath)
+def _configure_api(location=None):
+    # Look for a kubernetes config file
+    if os.path.exists(K8S_CONFIG_PATH):
+        config.load_kube_config(config_file=K8S_CONFIG_PATH, context=location, persist_config=False)
     else:
         # Maybe we're running in a k8s container and we can use info provided by k8s
         # We would like to use:
@@ -271,10 +273,10 @@ def _parse_volumes(volume_list):
 
     return volumes, volume_mounts
 
-def _service_exists(namespace, component_name):
+def _service_exists(location, namespace, component_name):
     exists = False
     try:
-        _configure_api()
+        _configure_api(location)
         client.CoreV1Api().read_namespaced_service(_create_service_name(component_name), namespace)
         exists = True
     except client.rest.ApiException:
@@ -282,13 +284,14 @@ def _service_exists(namespace, component_name):
 
     return exists
 
-def _patch_deployment(namespace, deployment, modify):
+def _patch_deployment(location, namespace, deployment, modify):
     '''
-    Gets the current spec for 'deployment' in 'namespace',
+    Gets the current spec for 'deployment' in 'namespace'
+    in the k8s cluster at 'location',
     uses the 'modify' function to change the spec,
     then sends the updated spec to k8s.
     '''
-    _configure_api()
+    _configure_api(location)
 
     # Get deployment spec
     spec = client.ExtensionsV1beta1Api().read_namespaced_deployment(deployment, namespace)
@@ -299,10 +302,11 @@ def _patch_deployment(namespace, deployment, modify):
     # Patch the deploy with updated spec
     client.ExtensionsV1beta1Api().patch_namespaced_deployment(deployment, namespace, spec)
 
-def _execute_command_in_pod(namespace, pod_name, command):
+def _execute_command_in_pod(location, namespace, pod_name, command):
     '''
     Execute the command (specified by an argv-style list in  the "command" parameter) in
-    the specified pod in the specified namespace.  For now at least, we use this only to
+    the specified pod in the specified namespace at the specified location.
+    For now at least, we use this only to
     run a notification script in a pod after a configuration change.
 
     The straightforward way to do this is with the V1 Core API function "connect_get_namespaced_pod_exec".
@@ -323,7 +327,7 @@ def _execute_command_in_pod(namespace, pod_name, command):
     The "stream" approach returns a string containing any output sent by the command to stdout or stderr.
     We'll return that so it can logged.
     '''
-    _configure_api()
+    _configure_api(location)
     try:
         output = stream.stream(client.CoreV1Api().connect_get_namespaced_pod_exec,
                              name=pod_name,
@@ -399,6 +403,8 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r
             - timeout:  time (in seconds) to allow a probe to complete
             - endpoint: the path portion of the URL that points to the liveness endpoint for "http" and "https" types
             - path: the full path to the script to be executed in the container for "script" and "docker" types
+        - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
+
 
     '''
 
@@ -406,6 +412,7 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r
     cip_service_created = False
     deployment_description = {
         "namespace": namespace,
+        "location" : kwargs.get("k8s_location"),
         "deployment": '',
         "services": []
     }
@@ -413,7 +420,7 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r
     try:
 
         # Get API handles
-        _configure_api()
+        _configure_api(kwargs.get("k8s_location"))
         core = client.CoreV1Api()
         ext = client.ExtensionsV1beta1Api()
 
@@ -523,7 +530,7 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r
     return dep, deployment_description
 
 def undeploy(deployment_description):
-    _configure_api()
+    _configure_api(deployment_description["location"])
 
     namespace = deployment_description["namespace"]
 
@@ -535,8 +542,8 @@ def undeploy(deployment_description):
     options = client.V1DeleteOptions(propagation_policy="Foreground")
     client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, options)
 
-def is_available(namespace, component_name):
-    _configure_api()
+def is_available(location, namespace, component_name):
+    _configure_api(location)
     dep_status = client.AppsV1beta1Api().read_namespaced_deployment_status(_create_deployment_name(component_name), namespace)
     # Check if the number of available replicas is equal to the number requested and that the replicas match the current spec
     # This check can be used to verify completion of an initial deployment, a scale operation, or an update operation
@@ -549,7 +556,7 @@ def scale(deployment_description, replicas):
         spec.spec.replicas = replicas
         return spec
 
-    _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], update_replica_count)
+    _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_replica_count)
 
 def upgrade(deployment_description, image, container_index = 0):
     ''' Trigger a rolling upgrade by sending a new image name/tag to k8s '''
@@ -558,7 +565,7 @@ def upgrade(deployment_description, image, container_index = 0):
         spec.spec.template.spec.containers[container_index].image = image
         return spec
 
-    _patch_deployment(deployment_description["namespace"], deployment_description["deployment"], update_image)
+    _patch_deployment(deployment_description["location"], deployment_description["namespace"], deployment_description["deployment"], update_image)
 
 def rollback(deployment_description, rollback_to=0):
     '''
@@ -576,7 +583,7 @@ def rollback(deployment_description, rollback_to=0):
        - https://github.com/kubernetes/kubernetes/pull/63837
     The fix has been merged into the master branch but is not in the latest release.
     '''
-    _configure_api()
+    _configure_api(deployment_description["location"])
     deployment = deployment_description["deployment"]
     namespace = deployment_description["namespace"]
 
@@ -620,8 +627,8 @@ def execute_command_in_deployment(deployment_description, command):
     the pod that has the k8s deployment name.  To list the pods, the code below queries for
     pods with the label carrying the deployment name.
     '''
-
-    _configure_api()
+    location = deployment_description["location"]
+    _configure_api(location)
     deployment = deployment_description["deployment"]
     namespace = deployment_description["namespace"]
 
@@ -633,7 +640,7 @@ def execute_command_in_deployment(deployment_description, command):
     ).items]
 
     def do_execute(pod_name):
-        return _execute_command_in_pod(namespace, pod_name, command)
+        return _execute_command_in_pod(location, namespace, pod_name, command)
 
     # Execute command in the running pods
     return map(do_execute, pod_names)
index 399bc9f..fdb00c5 100644 (file)
@@ -43,6 +43,7 @@ CONSUL_HOST = plugin_conf.get("consul_host")
 CONSUL_INTERNAL_NAME = plugin_conf.get("consul_dns_name")
 DCAE_NAMESPACE = plugin_conf.get("namespace")
 DEFAULT_MAX_WAIT = plugin_conf.get("max_wait", 1800)
+DEFAULT_K8S_LOCATION = plugin_conf.get("default_k8s_location")
 
 # Used to construct delivery urls for data router subscribers. Data router in FTL
 # requires https but this author believes that ONAP is to be defaulted to http.
@@ -54,6 +55,7 @@ CONTAINER_ID = "container_id"
 APPLICATION_CONFIG = "application_config"
 K8S_DEPLOYMENT = "k8s_deployment"
 RESOURCE_KW = "resource_config"
+LOCATION_ID = "location_id"
 
 # Utility methods
 
@@ -106,6 +108,11 @@ def _get_resources(**kwargs):
     ctx.logger.info("set resources to None")
     return None
 
+def  _get_location():
+    ''' Get the k8s location property.  Set to the default if the property is missing, None, or zero-length '''
+    return ctx.node.properties["location_id"] if "location_id" in ctx.node.properties and ctx.node.properties["location_id"] \
+        else DEFAULT_K8S_LOCATION
+
 @merge_inputs_for_create
 @monkeypatch_loggers
 @Policies.gather_policies_to_node()
@@ -238,11 +245,13 @@ def _lookup_service(service_component_name, consul_host=CONSUL_HOST,
     else:
         return results[0]["ServiceAddress"]
 
-def _verify_k8s_deployment(service_component_name, max_wait):
+def _verify_k8s_deployment(location, service_component_name, max_wait):
     """Verify that the k8s Deployment is ready
 
     Args:
     -----
+    location (string): location of the k8s cluster where the component was deployed
+    service_component_name: component's service component name
     max_wait (integer): limit to how may attempts to make which translates to
         seconds because each sleep is one second. 0 means infinite.
 
@@ -253,7 +262,7 @@ def _verify_k8s_deployment(service_component_name, max_wait):
     num_attempts = 1
 
     while True:
-        if k8sclient.is_available(DCAE_NAMESPACE, service_component_name):
+        if k8sclient.is_available(location, DCAE_NAMESPACE, service_component_name):
             return True
         else:
             num_attempts += 1
@@ -287,6 +296,7 @@ def _create_and_start_container(container_name, image, **kwargs):
         - replicas: number of replicas to be launched initially
         - readiness: object with information needed to create a readiness check
         - liveness: object with information needed to create a liveness check
+        - k8s_location: name of the Kubernetes location (cluster) where the component is to be deployed
     '''
     env = { "CONSUL_HOST": CONSUL_INTERNAL_NAME,
             "CONFIG_BINDING_SERVICE": "config-binding-service" }
@@ -310,7 +320,8 @@ def _create_and_start_container(container_name, image, **kwargs):
                      labels = kwargs.get("labels", {}),
                      log_info=kwargs.get("log_info"),
                      readiness=kwargs.get("readiness"),
-                     liveness=kwargs.get("liveness"))
+                     liveness=kwargs.get("liveness"),
+                     k8s_location=kwargs.get("k8s_location"))
 
     # Capture the result of deployment for future use
     ctx.instance.runtime_properties[K8S_DEPLOYMENT] = dep
@@ -333,7 +344,7 @@ def _parse_cloudify_context(**kwargs):
         "cfynodeinstance": ctx.instance.id[:63]
     }
 
-        # Pick up the centralized logging info
+    # Pick up the centralized logging info
     if "log_info" in ctx.node.properties and "log_directory" in ctx.node.properties["log_info"]:
         kwargs["log_info"] = ctx.node.properties["log_info"]
 
@@ -347,6 +358,9 @@ def _parse_cloudify_context(**kwargs):
     if "always_pull_image" in ctx.node.properties:
         kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
 
+    # Pick up location
+    kwargs["k8s_location"] = _get_location()
+
     return kwargs
 
 def _enhance_docker_params(**kwargs):
@@ -402,7 +416,8 @@ def _create_and_start_component(**kwargs):
         "labels": kwargs.get("labels", {}),
         "resource_config": kwargs.get("resource_config",{}),
         "readiness": kwargs.get("readiness",{}),
-        "liveness": kwargs.get("liveness",{})}
+        "liveness": kwargs.get("liveness",{}),
+        "k8s_location": kwargs.get("k8s_location")}
     returned_args = _create_and_start_container(service_component_name, image, **sub_kwargs)
     kwargs[K8S_DEPLOYMENT] = returned_args[K8S_DEPLOYMENT]
 
@@ -415,7 +430,7 @@ def _verify_component(**kwargs):
     max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
     ctx.logger.info("Waiting up to {0} secs for {1} to become ready".format(max_wait, service_component_name))
 
-    if _verify_k8s_deployment(service_component_name, max_wait):
+    if _verify_k8s_deployment(kwargs.get("k8s_location"), service_component_name, max_wait):
         ctx.logger.info("k8s deployment is ready for: {0}".format(service_component_name))
     else:
         # The component did not become ready within the "max_wait" interval.
@@ -569,6 +584,10 @@ def create_and_start_container_for_platforms(**kwargs):
         kwargs["replicas"] = ctx.node.properties["replicas"]
     if "always_pull_image" in ctx.node.properties:
         kwargs["always_pull_image"] = ctx.node.properties["always_pull_image"]
+
+    # Pick up location
+    kwargs["k8s_location"] = _get_location()
+
     returned_args = _create_and_start_container(service_component_name, image, **kwargs)
 
     # Verify that the k8s deployment is ready
@@ -586,6 +605,7 @@ def create_and_start_container(**kwargs):
     ctx.instance.runtime_properties[SERVICE_COMPONENT_NAME] = service_component_name
 
     image = ctx.node.properties["image"]
+    kwargs["k8s_location"] = _get_location()
 
     _create_and_start_container(service_component_name, image,**kwargs)
 
@@ -624,7 +644,7 @@ def scale(replicas, **kwargs):
         # Verify that the scaling took place as expected
         max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
         ctx.logger.info("Waiting up to {0} secs for {1} to scale and become ready".format(max_wait, service_component_name))
-        if _verify_k8s_deployment(service_component_name, max_wait):
+        if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
             ctx.logger.info("Scaling complete: {0} from {1} to {2} replica(s)".format(service_component_name, current_replicas, replicas))
 
     else:
@@ -647,7 +667,7 @@ def update_image(image, **kwargs):
         # Verify that the update took place as expected
         max_wait = kwargs.get("max_wait", DEFAULT_MAX_WAIT)
         ctx.logger.info("Waiting up to {0} secs for {1} to be updated and become ready".format(max_wait, service_component_name))
-        if _verify_k8s_deployment(service_component_name, max_wait):
+        if _verify_k8s_deployment(deployment_description["location"], service_component_name, max_wait):
             ctx.logger.info("Update complete: {0} from {1} to {2}".format(service_component_name, current_image, image))
 
     else:
index 3bde42e..22db83c 100644 (file)
@@ -28,7 +28,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property.
   <groupId>org.onap.dcaegen2.platform.plugins</groupId>
   <artifactId>k8s</artifactId>
   <name>k8s-plugin</name>
-  <version>1.4.8-SNAPSHOT</version>
+  <version>1.4.10-SNAPSHOT</version>
   <url>http://maven.apache.org</url>
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
index a64efc8..12f1f5e 100644 (file)
@@ -23,7 +23,7 @@ from setuptools import setup
 setup(
     name='k8splugin',
     description='Cloudify plugin for containerized components deployed using Kubernetes',
-    version="1.4.9",
+    version="1.4.10",
     author='J. F. Lucas, Michael Hwang, Tommy Carpenter',
     packages=['k8splugin','k8sclient','msb','configure'],
     zip_safe=False,
index 2511239..43939ad 100644 (file)
@@ -1,7 +1,7 @@
 # ============LICENSE_START=======================================================
 # org.onap.dcae
 # ================================================================================
-# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2018-2019 AT&T Intellectual Property. All rights reserved.
 # ================================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -205,7 +205,7 @@ def test_deploy(monkeypatch):
         monkeypatch.setattr(ext,"create_namespaced_deployment", pseudo_deploy)
         return ext
 
-    def pseudo_configure():
+    def pseudo_configure(loc):
         pass
 
     monkeypatch.setattr(k8sclient.k8sclient,"_configure_api", pseudo_configure)
index cf78860..d56a443 100644 (file)
@@ -1,7 +1,7 @@
 # ============LICENSE_START=======================================================
 # org.onap.dcae
 # ================================================================================
-# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
+# Copyright (c) 2017-2019 AT&T Intellectual Property. All rights reserved.
 # ================================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -194,21 +194,21 @@ def test_verify_container(monkeypatch, mockconfig):
     from k8splugin import tasks
     from k8splugin.exceptions import DockerPluginDeploymentError
 
-    def fake_is_available_success(ch, scn):
+    def fake_is_available_success(loc, ch, scn):
         return True
 
     monkeypatch.setattr(k8sclient, "is_available",
             fake_is_available_success)
 
-    assert tasks._verify_k8s_deployment("some-name", 3)
+    assert tasks._verify_k8s_deployment("some-location","some-name", 3)
 
-    def fake_is_available_never_good(ch, scn):
+    def fake_is_available_never_good(loc, ch, scn):
         return False
 
     monkeypatch.setattr(k8sclient, "is_available",
             fake_is_available_never_good)
 
-    assert not tasks._verify_k8s_deployment("some-name", 2)
+    assert not tasks._verify_k8s_deployment("some-location", "some-name", 2)
 
 
 def test_update_delivery_url(monkeypatch, mockconfig):