Move DR subscriber creation before container start 66/91566/2 elalto 5.0.1-ONAP 5.0.2-ONAP
authorJack Lucas <jflucas@research.att.com>
Wed, 3 Jul 2019 15:13:25 +0000 (11:13 -0400)
committerJack Lucas <jflucas@research.att.com>
Wed, 17 Jul 2019 14:07:18 +0000 (10:07 -0400)
Also update Kubernetes client library version
Fix bug that allowed generating a too-long k8s deployment name.

Issue-ID: DCAEGEN2-1651
Issue-ID: DCAEGEN2-1653
Issue-ID: DCAEGEN2-1667
Change-Id: Ied859073fb01d8623278cf9e58c1dcc26fed1712
Signed-off-by: Jack Lucas <jflucas@research.att.com>
k8s/k8s-node-type.yaml
k8s/k8sclient/k8sclient.py
k8s/k8splugin/__init__.py
k8s/k8splugin/tasks.py
k8s/pom.xml
k8s/requirements.txt
k8s/setup.py
k8s/tests/test_tasks.py

index c32d834..86fdb2e 100644 (file)
@@ -22,7 +22,7 @@ plugins:
   k8s:
     executor: 'central_deployment_agent'
     package_name: k8splugin
-    package_version: 1.5.0
+    package_version: 1.6.0
 
 data_types:
 
@@ -94,7 +94,7 @@ data_types:
 
 node_types:
     dcae.nodes.ContainerizedComponent:
-    # Bese type for all containerized components
+    # Base type for all containerized components
     # Captures common properties and interfaces
         derived_from: cloudify.nodes.Root
         properties:
@@ -112,7 +112,7 @@ node_types:
             docker_config:
                 default: {}
                 description: >
-                  This is what is the auxilary portion of the component spec that contains things
+                  Copied from the auxiliary portion of the component spec that contains things
                   like healthcheck definitions for the Docker component. Health checks are
                   optional.
 
@@ -202,9 +202,9 @@ node_types:
                 type: string
                 description: >
                     Manually override and set the name for this Docker container node. If this
-                    is set, then the name will not be auto-generated. Platform services are the
-                    specific use cases for using this parameter because they have static
-                    names for example the CDAP broker.
+                    is set, then the name will not be auto-generated.  Using this feature provides
+                    a service component with a fixed name that's known in advance, but care must be taken
+                    to avoid attempting to deploy two components with the same name.
                 default: Null
 
         interfaces:
@@ -278,11 +278,8 @@ node_types:
         interfaces:
             cloudify.interfaces.lifecycle:
                 create:
-                    # Generate service component name and populate config into Consul
+                    # Generate service component name, populate config into Consul, set up runtime properties for DMaaP plugin
                     implementation: k8s.k8splugin.create_for_components_with_streams
-                start:
-                    # Create Docker container and start
-                    implementation: k8s.k8splugin.create_and_start_container_for_components_with_streams
 
     # ContainerizedPlatformComponent is intended for DCAE platform services.  Unlike the components,
     # platform services have well-known names and well-known ports.
index 273b9f3..9a01536 100644 (file)
@@ -43,10 +43,10 @@ FACTORS = {None: 1, "s": 1, "m": 60, "h": 3600}
 PORTS = re.compile("^([0-9]+)(/(udp|UDP|tcp|TCP))?:([0-9]+)$")
 
 def _create_deployment_name(component_name):
-    return "dep-{0}".format(component_name)
+    return "dep-{0}".format(component_name)[:63]
 
 def _create_service_name(component_name):
-    return "{0}".format(component_name)
+    return "{0}".format(component_name)[:63]
 
 def _create_exposed_service_name(component_name):
     return ("x{0}".format(component_name))[:63]
@@ -545,7 +545,7 @@ def deploy(namespace, component_name, image, replicas, always_pull, k8sconfig, r
             core.delete_namespaced_service(_create_service_name(component_name), namespace)
         # If the deployment was created but not the service, delete the deployment
         if deployment_ok:
-            client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, client.V1DeleteOptions())
+            client.ExtensionsV1beta1Api().delete_namespaced_deployment(_create_deployment_name(component_name), namespace, body=client.V1DeleteOptions())
         raise e
 
     return dep, deployment_description
@@ -561,7 +561,7 @@ def undeploy(deployment_description):
 
     # Have k8s delete the underlying pods and replicaset when deleting the deployment.
     options = client.V1DeleteOptions(propagation_policy="Foreground")
-    client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, options)
+    client.ExtensionsV1beta1Api().delete_namespaced_deployment(deployment_description["deployment"], namespace, body=options)
 
 def is_available(location, namespace, component_name):
     _configure_api(location)
index 7f721b2..aa4ceda 100644 (file)
@@ -24,7 +24,6 @@
 # __version__ = '0.1.0'
 
 from .tasks import create_for_components, create_for_components_with_streams, \
-        create_and_start_container_for_components_with_streams, \
         create_for_platforms, create_and_start_container, \
         create_and_start_container_for_components, create_and_start_container_for_platforms, \
         stop_and_remove_container, cleanup_discovery, policy_update, scale, update_image
\ No newline at end of file
index ecd3ffa..108cf31 100644 (file)
@@ -129,8 +129,9 @@ def create_for_components(**create_inputs):
     """
     _done_for_create(
             **_setup_for_discovery(
-                **_generate_component_name(
-                    **create_inputs)))
+                **_enhance_docker_params(
+                    **_generate_component_name(
+                        **create_inputs))))
 
 
 def _parse_streams(**kwargs):
@@ -144,13 +145,32 @@ def _parse_streams(**kwargs):
 
     def setup_subscribes(s):
         if s["type"] == "data_router":
-            # If username and password has been provided then generate it. The
-            # DMaaP plugin doesn't generate for subscribers. The generation code
-            # and length of username password has been lifted from the DMaaP
-            # plugin.
 
             # Don't want to mutate the source
             s = copy.deepcopy(s)
+
+            # Set up the delivery URL
+            # Using service_component_name as the host name in the subscriber URL
+            # will work in a single-cluster ONAP deployment.  Whether it will also work
+            # in a multi-cluster ONAP deployment--with a central location and one or
+            # more remote ("edge") locations depends on how networking and DNS is set
+            # up in a multi-cluster deployment
+            service_component_name = kwargs["name"]
+            ports,_ = k8sclient.parse_ports(kwargs["ports"])
+            (dport, _) = ports[0]
+            subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport)
+
+            scheme = s["scheme"] if "scheme" in s else DEFAULT_SCHEME
+            if "route" not in s:
+                raise NonRecoverableError("'route' key missing from data router subscriber")
+            path = s["route"]
+            s["delivery_url"] = "{scheme}://{host}/{path}".format(
+                    scheme=scheme, host=subscriber_host, path=path)
+
+            # If username and password has not been provided then generate it. The
+            # DMaaP plugin doesn't generate for subscribers. The generation code
+            # and length of username password has been lifted from the DMaaP
+            # plugin.
             if not s.get("username", None):
                 s["username"] = utils.random_string(8)
             if not s.get("password", None):
@@ -158,45 +178,10 @@ def _parse_streams(**kwargs):
 
         kwargs[s["name"]] = s
 
-    # NOTE: That the delivery url is constructed and setup in the start operation
     map(setup_subscribes, kwargs["streams_subscribes"])
 
     return kwargs
 
-def _setup_for_discovery_streams(**kwargs):
-    """Setup for discovery of streams
-
-    Specifically, there's a race condition this call addresses for data router
-    subscriber case. The component needs its feed subscriber information but the
-    DMaaP plugin doesn't provide this until after the docker plugin start
-    operation.
-    """
-    dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
-            if s["type"] == "data_router"]
-
-    if dr_subs:
-        dmaap_kv_key = "{0}:dmaap".format(kwargs["name"])
-        conn = dis.create_kv_conn(CONSUL_HOST)
-
-        def add_feed(dr_sub):
-            # delivery url and subscriber id will be fill by the dmaap plugin later
-            v = { "location": dr_sub["location"], "delivery_url": None,
-                    "username": dr_sub["username"], "password": dr_sub["password"],
-                    "subscriber_id": None }
-            return dis.add_to_entry(conn, dmaap_kv_key, dr_sub["name"], v) != None
-
-        try:
-            if all(map(add_feed, dr_subs)):
-                return kwargs
-        except Exception as e:
-            raise NonRecoverableError(e)
-
-        # You should never get here
-        raise NonRecoverableError("Failure updating feed streams in Consul")
-    else:
-        return kwargs
-
-
 @merge_inputs_for_create
 @monkeypatch_loggers
 @Policies.gather_policies_to_node()
@@ -209,16 +194,14 @@ def create_for_components_with_streams(**create_inputs):
     1. Generating service component name
     2. Setup runtime properties for DMaaP plugin
     3. Populating application config into Consul
-    4. Populating DMaaP config for data router subscribers in Consul
     """
     _done_for_create(
             **_setup_for_discovery(
-                **_setup_for_discovery_streams(
-                    **_parse_streams(
+                **_parse_streams(
+                    **_enhance_docker_params(
                         **_generate_component_name(
                             **create_inputs)))))
 
-
 @merge_inputs_for_create
 @monkeypatch_loggers
 @operation
@@ -470,58 +453,7 @@ def create_and_start_container_for_components(**start_inputs):
     _done_for_start(
             **_verify_component(
                 **_create_and_start_component(
-                    **_enhance_docker_params(
-                        **_parse_cloudify_context(**start_inputs)))))
-
-
-def _update_delivery_url(**kwargs):
-    """Update the delivery url for data router subscribers"""
-    dr_subs = [kwargs[s["name"]] for s in kwargs["streams_subscribes"] \
-            if s["type"] == "data_router"]
-
-    if dr_subs:
-        service_component_name = kwargs[SERVICE_COMPONENT_NAME]
-        # TODO: Should NOT be setting up the delivery url with ip addresses
-        # because in the https case, this will not work because data router does
-        # a certificate validation using the fqdn.
-        ports,_ = k8sclient.parse_ports(kwargs["ports"])
-        (dport, _) = ports[0]
-        # Using service_component_name as the host name in the subscriber URL
-        # will work in a single-cluster ONAP deployment.  Whether it will also work
-        # in a multi-cluster ONAP deployment--with a central location and one or
-        # more remote ("edge") locations depends on how networking and DNS is set
-        # up in a multi-cluster deployment
-        subscriber_host = "{host}:{port}".format(host=service_component_name, port=dport)
-
-        for dr_sub in dr_subs:
-            scheme = dr_sub["scheme"] if "scheme" in dr_sub else DEFAULT_SCHEME
-            if "route" not in dr_sub:
-                raise NonRecoverableError("'route' key missing from data router subscriber")
-            path = dr_sub["route"]
-            dr_sub["delivery_url"] = "{scheme}://{host}/{path}".format(
-                    scheme=scheme, host=subscriber_host, path=path)
-            kwargs[dr_sub["name"]] = dr_sub
-
-    return kwargs
-
-@wrap_error_handling_start
-@merge_inputs_for_start
-@monkeypatch_loggers
-@operation
-def create_and_start_container_for_components_with_streams(**start_inputs):
-    """Initiate Kubernetes deployment for service components that have streams
-
-    This operation method is to be used with the ContainerizedServiceComponentUsingDmaap
-    node type. After initiating the Kubernetes deployment, the plugin will verify with
-    Kubernetes that the app is up and responding successfully to readiness probes.
-    """
-    _done_for_start(
-            **_update_delivery_url(
-                **_verify_component(
-                    **_create_and_start_component(
-                        **_enhance_docker_params(
-                            **_parse_cloudify_context(**start_inputs))))))
-
+                    **_parse_cloudify_context(**start_inputs))))
 
 @wrap_error_handling_start
 @monkeypatch_loggers
index d96ebe1..5b66d5a 100644 (file)
@@ -28,7 +28,7 @@ ECOMP is a trademark and service mark of AT&T Intellectual Property.
   <groupId>org.onap.dcaegen2.platform.plugins</groupId>
   <artifactId>k8s</artifactId>
   <name>k8s-plugin</name>
-  <version>1.5.0-SNAPSHOT</version>
+  <version>1.6.0-SNAPSHOT</version>
   <url>http://maven.apache.org</url>
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
index 12d9fea..b2c0986 100644 (file)
@@ -1,5 +1,5 @@
 python-consul>=0.6.0,<1.0.0
 uuid==1.30
 onap-dcae-dcaepolicy-lib>=2.4.1,<3.0.0
-kubernetes==4.0.0
+kubernetes==9.0.0
 cloudify-plugins-common==3.4
index 5d27438..b0e43f6 100644 (file)
@@ -23,7 +23,7 @@ from setuptools import setup
 setup(
     name='k8splugin',
     description='Cloudify plugin for containerized components deployed using Kubernetes',
-    version="1.5.0",
+    version="1.6.0",
     author='J. F. Lucas, Michael Hwang, Tommy Carpenter',
     packages=['k8splugin','k8sclient','msb','configure'],
     zip_safe=False,
@@ -33,6 +33,6 @@ setup(
         "onap-dcae-dcaepolicy-lib>=2.4.1,<3.0.0",
         "cloudify-plugins-common==3.4",
         "cloudify-python-importer==0.1.0",
-        "kubernetes==4.0.0"
+        "kubernetes==9.0.0"
     ]
 )
index 933753a..c6781bb 100644 (file)
@@ -53,38 +53,42 @@ def test_parse_streams(monkeypatch, mockconfig):
     assert expected == tasks._parse_streams(**test_input)
 
     # Good case for streams_subscribes (password provided)
-    test_input = { "streams_publishes": {},
+    test_input = { "ports": ["1919:0", "1920:0"],"name": "testcomponent",
+            "streams_publishes": {},
             "streams_subscribes": [{"name": "topic01", "type": "message_router"},
                 {"name": "feed01", "type": "data_router", "username": "hero",
-                    "password": "123456"}] }
+                    "password": "123456", "route":"test/v0"}] }
 
-    expected = {'feed01': {'type': 'data_router', 'name': 'feed01',
-                    'username': 'hero', 'password': '123456'},
-            'streams_publishes': {},
-            'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'},
+    expected = {'ports': ['1919:0', '1920:0'], 'name': 'testcomponent',
+                'feed01': {'type': 'data_router', 'name': 'feed01',
+                    'username': 'hero', 'password': '123456', 'route': 'test/v0', 'delivery_url':'http://testcomponent:1919/test/v0'},
+                'streams_publishes': {},
+                'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'},
                 {'type': 'data_router', 'name': 'feed01', 'username': 'hero',
-                    'password': '123456'}],
-            'topic01': {'type': 'message_router', 'name': 'topic01'}}
+                    'password': '123456', 'route':'test/v0'}],
+                'topic01': {'type': 'message_router', 'name': 'topic01'}}
 
     assert expected == tasks._parse_streams(**test_input)
 
     # Good case for streams_subscribes (password generated)
-    test_input = { "streams_publishes": {},
-            "streams_subscribes": [{"name": "topic01", "type": "message_router"},
+    test_input = { "ports": ["1919:0", "1920:0"],"name": "testcomponent",
+        "streams_publishes": {},
+        "streams_subscribes": [{"name": "topic01", "type": "message_router"},
                 {"name": "feed01", "type": "data_router", "username": None,
-                    "password": None}] }
+                    "password": None, "route": "test/v0"}] }
 
     def not_so_random(n):
         return "nosurprise"
 
     monkeypatch.setattr(k8splugin.utils, "random_string", not_so_random)
 
-    expected = {'feed01': {'type': 'data_router', 'name': 'feed01',
-                    'username': 'nosurprise', 'password': 'nosurprise'},
+    expected = { 'ports': ['1919:0', '1920:0'], 'name': 'testcomponent',
+             'feed01': {'type': 'data_router', 'name': 'feed01',
+                    'username': 'nosurprise', 'password': 'nosurprise', 'route':'test/v0', 'delivery_url':'http://testcomponent:1919/test/v0'},
             'streams_publishes': {},
             'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'},
                 {'type': 'data_router', 'name': 'feed01', 'username': None,
-                    'password': None}],
+                    'password': None, 'route': 'test/v0'}],
             'topic01': {'type': 'message_router', 'name': 'topic01'}}
 
     assert expected == tasks._parse_streams(**test_input)
@@ -114,66 +118,6 @@ def test_setup_for_discovery(monkeypatch, mockconfig):
     with pytest.raises(RecoverableError):
         tasks._setup_for_discovery(**test_input)
 
-
-def test_setup_for_discovery_streams(monkeypatch, mockconfig):
-    import k8splugin
-    from k8splugin import tasks
-    test_input = {'feed01': {'type': 'data_router', 'name': 'feed01',
-                'username': 'hero', 'password': '123456', 'location': 'Bedminster'},
-            'streams_publishes': {},
-            'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'},
-                {'type': 'data_router', 'name': 'feed01', 'username': 'hero',
-                    'password': '123456', 'location': 'Bedminster'}],
-            'topic01': {'type': 'message_router', 'name': 'topic01'}}
-    test_input["name"] = "some-foo-service-component"
-
-    # Good case
-    def fake_add_to_entry(conn, key, add_name, add_value):
-        """
-        This fake method will check all the pieces that are used to make store
-        details in Consul
-        """
-        if key != test_input["name"] + ":dmaap":
-            return None
-        if add_name != "feed01":
-            return None
-        if add_value != {"location": "Bedminster", "delivery_url": None,
-                "username": "hero", "password": "123456", "subscriber_id": None}:
-            return None
-
-        return "SUCCESS!"
-
-    monkeypatch.setattr(k8splugin.discovery, "add_to_entry",
-            fake_add_to_entry)
-
-    assert tasks._setup_for_discovery_streams(**test_input) == test_input
-
-    # Good case - no data router subscribers
-    test_input = {"streams_publishes": [{"name": "topic00", "type": "message_router"}],
-            'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'}]}
-    test_input["name"] = "some-foo-service-component"
-
-    assert tasks._setup_for_discovery_streams(**test_input) == test_input
-
-    # Bad case - something happened from the Consul call
-    test_input = {'feed01': {'type': 'data_router', 'name': 'feed01',
-                'username': 'hero', 'password': '123456', 'location': 'Bedminster'},
-            'streams_publishes': {},
-            'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'},
-                {'type': 'data_router', 'name': 'feed01', 'username': 'hero',
-                    'password': '123456', 'location': 'Bedminster'}],
-            'topic01': {'type': 'message_router', 'name': 'topic01'}}
-    test_input["name"] = "some-foo-service-component"
-
-    def barf(conn, key, add_name, add_value):
-        raise RuntimeError("Barf")
-
-    monkeypatch.setattr(k8splugin.discovery, "add_to_entry",
-            barf)
-
-    with pytest.raises(NonRecoverableError):
-        tasks._setup_for_discovery_streams(**test_input)
-
 def test_verify_container(monkeypatch, mockconfig):
     import k8sclient
     from k8splugin import tasks
@@ -195,28 +139,6 @@ def test_verify_container(monkeypatch, mockconfig):
 
     assert not tasks._verify_k8s_deployment("some-location", "some-name", 2)
 
-
-def test_update_delivery_url(monkeypatch, mockconfig):
-    import k8splugin
-    from k8splugin import tasks
-    test_input = {'feed01': {'type': 'data_router', 'name': 'feed01',
-                'username': 'hero', 'password': '123456', 'location': 'Bedminster',
-                'route': 'some-path'},
-            'streams_publishes': {},
-            'streams_subscribes': [{'type': 'message_router', 'name': 'topic01'},
-                {'type': 'data_router', 'name': 'feed01', 'username': 'hero',
-                    'password': '123456', 'location': 'Bedminster',
-                    'route': 'some-path'}],
-            'topic01': {'type': 'message_router', 'name': 'topic01'},
-            'ports': ['8080/tcp:0']}
-    test_input["service_component_name"] = "some-foo-service-component"
-
-    expected = copy.deepcopy(test_input)
-    expected["feed01"]["delivery_url"] = "http://some-foo-service-component:8080/some-path"
-
-    assert tasks._update_delivery_url(**test_input) == expected
-
-
 def test_enhance_docker_params(mockconfig):
     from k8splugin import tasks
     # Good - Test empty docker config