Seed policysync container code 69/117769/11
authorcluckenbaugh <cl5597@att.com>
Thu, 11 Feb 2021 23:26:30 +0000 (18:26 -0500)
committercluckenbaugh <cl5597@att.com>
Wed, 17 Feb 2021 22:45:30 +0000 (17:45 -0500)
For use by helm microservices to receive policy

Issue-ID: DCAEGEN2-2556
Change-Id: I2d9cb92ab480a90c63a9d8e6242848f7ca2df0f3
Signed-off-by: cluckenbaugh <cl5597@att.com>
20 files changed:
dcae-services-policy-sync/Dockerfile [new file with mode: 0644]
dcae-services-policy-sync/README.md [new file with mode: 0644]
dcae-services-policy-sync/policysync/__init__.py [new file with mode: 0644]
dcae-services-policy-sync/policysync/clients.py [new file with mode: 0644]
dcae-services-policy-sync/policysync/cmd.py [new file with mode: 0644]
dcae-services-policy-sync/policysync/coroutines.py [new file with mode: 0644]
dcae-services-policy-sync/policysync/inventory.py [new file with mode: 0644]
dcae-services-policy-sync/policysync/metrics.py [new file with mode: 0644]
dcae-services-policy-sync/policysync/util.py [new file with mode: 0644]
dcae-services-policy-sync/pom.xml [new file with mode: 0644]
dcae-services-policy-sync/setup.py [new file with mode: 0644]
dcae-services-policy-sync/tests/mocks.py [new file with mode: 0644]
dcae-services-policy-sync/tests/test_client_v0.py [new file with mode: 0644]
dcae-services-policy-sync/tests/test_client_v1.py [new file with mode: 0644]
dcae-services-policy-sync/tests/test_cmd.py [new file with mode: 0644]
dcae-services-policy-sync/tests/test_coroutines.py [new file with mode: 0644]
dcae-services-policy-sync/tests/test_inventory.py [new file with mode: 0644]
dcae-services-policy-sync/tox.ini [new file with mode: 0644]
mvn-phase-script.sh
pom.xml

diff --git a/dcae-services-policy-sync/Dockerfile b/dcae-services-policy-sync/Dockerfile
new file mode 100644 (file)
index 0000000..f317a85
--- /dev/null
@@ -0,0 +1,46 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+FROM nexus3.onap.org:10001/onap/integration-python:7.0.1 as build
+
+
+USER root
+
+RUN python3 -m venv /policysync
+# Need GCC, musl and associated dependencies to compile dependencies against musl
+RUN apk add --no-cache --virtual .build-deps gcc musl-dev
+
+WORKDIR /app
+
+
+# Install dependencies first to speed up builds 
+ADD setup.py setup.py
+RUN /policysync/bin/pip install -e .
+
+# Add the code now
+ADD policysync policysync 
+RUN /policysync/bin/pip install .
+
+FROM nexus3.onap.org:10001/onap/integration-python:7.0.1 as runtime
+
+COPY --from=build /policysync /policysync
+
+USER onap
+ENTRYPOINT [ "/policysync/bin/policysync" ]
+
+
+
+
diff --git a/dcae-services-policy-sync/README.md b/dcae-services-policy-sync/README.md
new file mode 100644 (file)
index 0000000..519aea6
--- /dev/null
@@ -0,0 +1,150 @@
+# Policy Sync\r
+This page serves as an implementation for the Policy sync container described in the [wiki](https://wiki.onap.org/display/DW/Policy+function+as+Sidecar)\r
+\r
+\r
+Policy Sync utility is a python based utility that interfaces with the ONAP/ECOMP policy websocket and REST APIs. It is designed to keep a local listing of policies in sync with an environment's policy distribution point (PDP). It functions well as a Kubernetes sidecar container which can pull down the latest policies for consumption by an application container. \r
+\r
+The sync utility primarily utilizes the PDP's websocket notification API to receive policy update notifications. It also includes a periodic check of the  PDP for resilliency purposes in the event of websocket API issues. \r
+\r
+\r
+## Build and Run\r
+Easiest way to use is via docker by building the provided docker file\r
+\r
+```bash\r
+docker build . -t policy-puller\r
+```\r
+\r
+If you want to run it in a non containerized environment, an easy way is to use python virtual environments.\r
+```bash\r
+# Create a virtual environment in venv folder and activate it\r
+python3 -m venv venv\r
+source venv/bin/activate\r
+\r
+# install the utility\r
+pip install .\r
+\r
+# Utility is now installed and usable in your virtual environment. Test it with:\r
+policysync -h \r
+```\r
+\r
+## Configuration\r
+\r
+Configuration is currently done via either env variables or by flag. Flags take precedence env variables, env variables take precedence over default\r
+\r
+### General configuration\r
+General configuration that is used regardless of which PDP API you are using. \r
+\r
+| ENV Variable              | Flag               | Description                                  | Default                           |\r
+| --------------------------| -------------------|----------------------------------------------|-----------------------------------|\r
+| POLICY_SYNC_PDP_URL       | --pdp-url          | PDP URL to query                             | None (must be set in env or flag) |\r
+| POLICY_SYNC_FILTER        | --filters          | yaml list of regex of policies to match      | []                                |\r
+| POLICY_SYNC_ID            | --ids              | yaml list of ids of policies to match        | []                                |\r
+| POLICY_SYNC_DURATION      | --duration         | duration in seconds for periodic checks      | 2600                              |\r
+| POLICY_SYNC_OUTFILE       | --outfile          | File to output policies to                   | ./policies.json                   |\r
+| POLICY_SYNC_PDP_USER      | --pdp-user         | Set user if you need basic auth for PDP      | None                              |\r
+| POLICY_SYNC_PDP_PASS      | --pdp-password     | Set pass if you need basic auth for PDP      | None                              |\r
+| POLICY_SYNC_HTTP_METRICS  | --http-metrics     | Whether to expose prometheus metrics         | True                              |  \r
+| POLICY_SYNC_HTTP_BIND     | --http-bind        | host:port for exporting prometheus metrics   | localhost:8000                    |\r
+| POLICY_SYNC_LOGGING_CONFIG| --logging-config   | Path to a python formatted logging file      | None (logs will write to stderr)  |\r
+| POLICY_SYNC_V0_ENABLE     | --use-v0         | Set to true to enable usage of legacy v0 API   | False                             |\r
+\r
+### V1 Specific Configuration (Used as of the Dublin release)\r
+Configurable variables used for the V1 API used in the ONAP Dublin Release.\r
+\r
+Note: Policy filters are not currently supported in the current policy release but will be eventually. \r
+\r
+| ENV Variable                     | Flag                   | Description                            | Default                      |\r
+| ---------------------------------|------------------------|----------------------------------------|------------------------------|\r
+| POLICY_SYNC_V1_DECISION_ENDPOINT | --v1-decision-endpoint | Endpoint to query for PDP decisions    | policy/pdpx/v1/decision      |\r
+| POLICY_SYNC_V1_DMAAP_URL         | --v1-dmaap-topic       | Dmaap url with topic for notifications | None                         |\r
+| POLICY_SYNC_V1_DMAAP_USER        | --v1-dmaap-user        | User to use for DMaaP notifications    | None                         |\r
+| POLICY_SYNC_V1_DMAAP_PASS        | --v1-dmaap-pass        | Password to use for DMaaP notifications| None                         |\r
+\r
+\r
+\r
+### V0 Specific Configuration (Legacy Policy API)\r
+Configurable variables used for the legacy V0 API Prior to the ONAP release. Only valid when --use-v0 is set to True\r
+\r
+\r
+| ENV Variable                     | Flag                   | Description                            | Default                      |\r
+| ---------------------------------|------------------------|----------------------------------------|------------------------------|\r
+| POLICY_SYNC_V0_NOTIFIY_ENDPOINT  | --v0-notifiy-endpoint  | websock endpoint for pdp notifications |  pdp/notifications           |\r
+| POLICY_SYNC_V0_DECISION_ENDPOINT | --v0-decision-endpoint | rest endpoint for pdp decisions        |  pdp/api                     |\r
+\r
+## Usage\r
+\r
+You can run in a pure docker setup:\r
+```bash\r
+# Run the container\r
+docker run \r
+    --env POLICY_SYNC_PDP_USER=<username> \\r
+    --env POLICY_SYNC_PDP_PASS=<password> \\r
+    --env POLICY_SYNC_PDP_URL=<path_to_pdp> \\r
+    --env POLICY_SYNC_V1_DMAAP_URL='https://<dmaap_host>:3905/events/<dmaap_topic>' \\r
+    --env POLICY_SYNC_V1_DMAAP_PASS='<user>' \\r
+    --env POLICY_SYNC_V1_DMAAP_USER='<pass>' \\r
+    --env POLICY_SYNC_ID=['DCAE.Config_MS_AGING_UVERSE_PROD'] \\r
+    -v $(pwd)/policy-volume:/etc/policy \\r
+    nexus3.onap.org:10001/onap/org.onap.dcaegen2.deployments.policy-sync:1.0.0\r
+```\r
+\r
+Or on Kubernetes: \r
+```yaml\r
+# policy-config-map\r
+apiVersion: v1\r
+kind: policy-config-map\r
+metadata:\r
+  name: special-config\r
+  namespace: default\r
+data:\r
+  POLICY_SYNC_PDP_USER: myusername\r
+  POLICY_SYNC_PDP_PASS: mypassword\r
+  POLICY_SYNC_PDP_URL: <path_to_pdp>\r
+  POLICY_SYNC_V1_DMAAP_URL: 'https://<dmaap_host>:3905/events/<dmaap_topic>' \\r
+  POLICY_SYNC_V1_DMAAP_PASS: '<user>' \\r
+  POLICY_SYNC_V1_DMAAP_USER: '<pass>' \\r
+  POLICY_SYNC_FILTER: '["DCAE.Config_MS_AGING_UVERSE_PROD"]'\r
+  \r
+  \r
+---\r
+\r
+apiVersion: v1\r
+kind: Pod\r
+metadata:\r
+  name: Sidecar sample app\r
+spec:\r
+  restartPolicy: Never\r
\r
\r
+  # The shared volume that the two containers use to communicate...empty dir for simplicity\r
+  volumes:\r
+  - name: policy-shared\r
+    emptyDir: {}\r
\r
+  containers:\r
\r
+  # Sample app that uses inotifyd (part of busybox/alpine). For demonstration purposes only...\r
+  - name: main\r
+    image: nexus3.onap.org:10001/onap/org.onap.dcaegen2.deployments.policy-sync:1.0.0\r
+    volumeMounts:\r
+    - name: policy-shared\r
+      mountPath: /etc/policies.json\r
+      subPath: policies.json\r
+    # For details on what this does see: https://wiki.alpinelinux.org/wiki/Inotifyd\r
+    # you can replace '-' arg below with a shell script to do more interesting\r
+    cmd: [ "inotifyd", "-", "/etc/policies.json:c" ]\r
\r
\r
+    # The sidecar app which keeps the policies in sync\r
+  - name: policy-sync\r
+    image: nexus3.onap.org:10001/onap/org.onap.dcaegen2.deployments.policy-sync:1.0.0\r
+    envFrom:\r
+      - configMapRef:\r
+          name: special-config\r
+    \r
+    volumeMounts:\r
+    - name: policy-shared\r
+      mountPath: /etc/policies\r
+```\r
+\r
+\r
diff --git a/dcae-services-policy-sync/policysync/__init__.py b/dcae-services-policy-sync/policysync/__init__.py
new file mode 100644 (file)
index 0000000..7c04dfd
--- /dev/null
@@ -0,0 +1,16 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+""" nothing here """
diff --git a/dcae-services-policy-sync/policysync/clients.py b/dcae-services-policy-sync/policysync/clients.py
new file mode 100644 (file)
index 0000000..698bc86
--- /dev/null
@@ -0,0 +1,512 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Clients for communicating with both the post dublin and pre dublin APIs"""
+import json
+import re
+import base64
+import uuid
+import asyncio
+import aiohttp
+import policysync.metrics as metrics
+from .util import get_module_logger
+
+logger = get_module_logger(__name__)
+
+# Websocket config
+WS_HEARTBEAT = 60
+WS_NOTIFICATIONS_ENDPOINT = "pdp/notifications"
+# REST config
+V1_DECISION_ENDPOINT = "policy/pdpx/v1/decision"
+V0_DECISION_ENDPOINT = "pdp/api"
+
+APPLICATION_JSON = "application/json"
+
+
+def get_single_regex(filters, ids):
+    """given a list of filters and ids returns a single regex for matching"""
+    filters = [] if filters is None else filters
+    ids = [] if ids is None else ["{}[.][0-9]+[.]xml".format(x) for x in ids]
+    return "|".join(filters + ids) if filters is not None else ""
+
+
+class BasePolicyClient:
+    """ Base policy client that is pluggable into inventory """
+    def __init__(self, pdp_url, headers=None):
+        self.headers = {} if headers is None else headers
+        self.session = None
+        self.pdp_url = pdp_url
+
+    def _init_rest_session(self):
+        """
+        initialize an aiohttp rest session
+        :returns: an aiohttp rest session
+        """
+        if self.session is None:
+            self.session = aiohttp.ClientSession(
+                headers=self.headers, raise_for_status=True
+            )
+
+        return self.session
+
+    async def _run_request(self, endpoint, request_data):
+        """
+        execute a particular REST request
+        :param endpoint: str rest endpoint to query
+        :param request_data: dictionary request data
+        :returns:  dictionary response data
+        """
+        session = self._init_rest_session()
+        async with session.post(
+            "{0}/{1}".format(self.pdp_url, endpoint), json=request_data
+        ) as resp:
+            data = await resp.read()
+            return json.loads(data)
+
+    def supports_notifications(self):
+        """
+        does this particular client support real time notifictions
+        :returns: True
+        """
+        # in derived classes we may use self
+        # pylint: disable=no-self-use
+        return True
+
+    async def list_policies(self, filters=None, ids=None):
+        """
+        used to get a list of policies matching a particular ID
+        :param filters: list of regex filter strings for matching
+        :param ids: list of id strings for matching
+        :returns: List of policies matching filters or ids
+        """
+        raise NotImplementedError
+
+    async def get_config(self, filters=None, ids=None):
+        """
+        used to get a list of policies matching a particular ID
+        :returns: List of policies matching filters or ids
+        """
+        raise NotImplementedError
+
+    async def notificationhandler(self, callback, ids=None, filters=None):
+        """
+        Clients should implement this to support real time notifications
+        :param callback: func to execute when a matching notification is found
+        :param ids: list of id strings for matching
+        """
+        raise NotImplementedError
+
+    async def close(self):
+        """ close the policy client """
+        logger.info("closing websocket clients...")
+        if self.session:
+            await self.session.close()
+
+
+class PolicyClientV0(BasePolicyClient):
+    """
+    Supports the legacy v0 policy API use prior to ONAP Dublin
+    """
+    async def close(self):
+        """ close the policy client """
+        await super().close()
+        if self.ws_session is not None:
+            await self.ws_session.close()
+
+    def __init__(
+        self,
+        headers,
+        pdp_url,
+        decision_endpoint=V0_DECISION_ENDPOINT,
+        ws_endpoint=WS_NOTIFICATIONS_ENDPOINT
+    ):
+        """
+        Initialize a v0 policy client
+        :param headers: Headers to use for policy rest api
+        :param pdp_url: URL of the PDP
+        :param decision_endpoint: root for the decison API
+        :param websocket_endpoint: root of the websocket endpoint
+        """
+        super().__init__(pdp_url, headers=headers)
+        self.ws_session = None
+        self.session = None
+        self.decision_endpoint = decision_endpoint
+        self.ws_endpoint = ws_endpoint
+        self._ws = None
+
+    def _init_ws_session(self):
+        """initialize a websocket session for notifications"""
+        if self.ws_session is None:
+            self.ws_session = aiohttp.ClientSession()
+
+        return self.ws_session
+
+    @metrics.list_policy_exceptions.count_exceptions()
+    async def list_policies(self, filters=None, ids=None):
+        """
+        used to get a list of policies matching a particular ID
+        :param filters: list of regex filter strings for matching
+        :param ids: list of id strings for matching
+        :returns: List of policies matching filters or ids
+        """
+        request_data = self._prepare_request(filters, ids)
+        policies = await self._run_request(
+            f"{self.decision_endpoint}/listPolicy", request_data
+        )
+        return set(policies)
+
+    @classmethod
+    def _prepare_request(cls, filters, ids):
+        """prepare the request body for the v0 api"""
+        regex = get_single_regex(filters, ids)
+        return {"policyName": regex}
+
+    @metrics.get_config_exceptions.count_exceptions()
+    async def get_config(self, filters=None, ids=None):
+        """
+        Used to get the actual policy configuration from PDP
+        :return: the policy objects that are currently active
+        for the given set of filters
+        """
+        request_data = self._prepare_request(filters, ids)
+        policies = await self._run_request(
+            f"{self.decision_endpoint}/getConfig", request_data)
+
+        for policy in policies:
+            try:
+                policy["config"] = json.loads(policy["config"])
+            except json.JSONDecodeError:
+                pass
+
+        return policies
+
+    @classmethod
+    def _needs_update(cls, update, ids=None, filters=None):
+        """
+        Expect something like this
+        {
+            "removedPolicies": [{
+                "policyName": "xyz.45.xml",
+                "versionNo": "45"
+            }],
+            "loadedPolicies": [{
+                "policyName": "xyz.46.xml",
+                "versionNo": "46",
+                "matches": {
+                    "ONAPName": "DCAE",
+                    "ConfigName": "DCAE_HighlandPark_AgingConfig",
+                    "service": "DCAE_HighlandPark_AgingConfig",
+                    "guard": "false",
+                    "location": " Edge",
+                    "TTLDate": "NA",
+                    "uuid": "TestUUID",
+                    "RiskLevel": "5",
+                    "RiskType": "default"
+                },
+                "updateType": "UPDATE"
+            }],
+            "notificationType": "BOTH"
+        }
+        """
+        for policy in update.get("removedPolicies", []) + update.get(
+            "loadedPolicies", []
+        ):
+            if (
+                re.match(get_single_regex(filters, ids), policy["policyName"])
+                is not None
+            ):
+                return True
+
+        return False
+
+    async def notificationhandler(self, callback, ids=None, filters=None):
+        """
+        websocket based notification handler for
+        :param callback: function to execute when
+        a matching notification is found
+        :param ids: list of id strings for matching
+        """
+
+        url = self.pdp_url.replace("https", "wss")
+
+        # The websocket we start here will periodically
+        # send heartbeat (ping frames) to policy
+        # this ensures that we are never left hanging
+        # with our communication with policy.
+        session = self._init_ws_session()
+        try:
+            websocket = await session.ws_connect(
+                "{0}/{1}".format(url, self.ws_endpoint), heartbeat=WS_HEARTBEAT
+            )
+            logger.info("websock with policy established")
+            async for msg in websocket:
+                # check for websocket errors
+                #  break out of this async for loop. to attempt reconnection
+                if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
+                    break
+
+                if msg.type is (aiohttp.WSMsgType.TEXT):
+                    if self._needs_update(
+                        json.loads(msg.data),
+                        ids=ids,
+                        filters=filters
+                    ):
+                        logger.debug(
+                            "notification received from pdp websocket -> %s", msg
+                        )
+                        await callback()
+                else:
+                    logger.warning(
+                        "unexpected websocket message type received %s", msg.type
+                    )
+        except aiohttp.ClientError:
+            logger.exception("Received connection error with websocket")
+
+
+class PolicyClientV1(BasePolicyClient):
+    """
+    Supports the v1 policy API introduced in ONAP's dublin release
+    """
+
+    async def close(self):
+        """ close the policy client """
+        await super().close()
+        if self.dmaap_session is not None:
+            await self.dmaap_session.close()
+
+    def _init_dmaap_session(self):
+        """ initialize a dmaap session for notifications """
+        if self.dmaap_session is None:
+            self.dmaap_session = aiohttp.ClientSession(
+                    headers=self.dmaap_headers,
+                    raise_for_status=True
+                )
+
+        return self.dmaap_session
+
+    def __init__(
+        self,
+        headers,
+        pdp_url,
+        **kwargs,
+    ):
+        super().__init__(pdp_url, headers=headers)
+        self._ws = None
+        self.audit_uuid = str(uuid.uuid4())
+        self.dmaap_url = kwargs.get('dmaap_url')
+        self.dmaap_timeout = 15000
+        self.dmaap_session = None
+        self.dmaap_headers = kwargs.get('dmaap_headers', {})
+        self.decision = kwargs.get('v1_decision', V1_DECISION_ENDPOINT)
+
+    async def list_policies(self, filters=None, ids=None):
+        """
+        ONAP has no real equivalent to this.
+        :returns: None
+        """
+        # in derived classes we may use self
+        # pylint: disable=no-self-use
+        return None
+
+    @classmethod
+    def convert_to_policy(cls, policy_body):
+        """
+        Convert raw policy to format expected by microservices
+        :param policy_body: raw dictionary output from pdp
+        :returns: data in proper formatting
+        """
+        pdp_metadata = policy_body.get("metadata", {})
+        policy_id = pdp_metadata.get("policy-id")
+        policy_version = policy_body.get("version")
+        if not policy_id or policy_version is None:
+            logger.warning("Malformed policy is missing policy-id and version")
+            return None
+
+        policy_body["policyName"] = "{}.{}.xml".format(
+            policy_id, str(policy_version.replace(".", "-"))
+        )
+        policy_body["policyVersion"] = str(policy_version)
+        if "properties" in policy_body:
+            policy_body["config"] = policy_body["properties"]
+            del policy_body["properties"]
+
+        return policy_body
+
+    @metrics.get_config_exceptions.count_exceptions()
+    async def get_config(self, filters=None, ids=None):
+        """
+        Used to get the actual policy configuration from PDP
+        :returns: the policy objects that are currently active
+        for the given set of filters
+        """
+        if ids is None:
+            ids = []
+
+        request_data = {
+                "ONAPName": "DCAE",
+                "ONAPComponent": "policy-sync",
+                "ONAPInstance": self.audit_uuid,
+                "action": "configure",
+                "resource": {"policy-id": ids}
+        }
+
+        data = await self._run_request(self.decision, request_data)
+        out = []
+        for policy_body in data["policies"].values():
+            policy = self.convert_to_policy(policy_body)
+            if policy is not None:
+                out.append(policy)
+
+        return out
+
+    def supports_notifications(self):
+        """
+        Does this policy client support real time notifications
+        :returns: True if the dmaap url is set else return false
+        """
+        return self.dmaap_url is not None
+
+    @classmethod
+    def _needs_update(cls, update, ids):
+        """
+        expect something like this
+        {
+            "deployed-policies": [
+                {
+                    "policy-type": "onap.policies.monitoring.tcagen2",
+                    "policy-type-version": "1.0.0",
+                    "policy-id": "onap.scaleout.tca",
+                    "policy-version": "2.0.0",
+                    "success-count": 3,
+                    "failure-count": 0
+                }
+            ],
+            "undeployed-policies": [
+                {
+                    "policy-type": "onap.policies.monitoring.tcagen2",
+                    "policy-type-version": "1.0.0",
+                    "policy-id": "onap.firewall.tca",
+                    "policy-version": "6.0.0",
+                    "success-count": 3,
+                    "failure-count": 0
+                }
+            ]
+        }
+        """
+        for policy in update.get("deployed-policies", []) + update.get(
+            "undeployed-policies", []
+        ):
+            if policy.get("policy-id") in ids:
+                return True
+
+        return False
+
+    async def poll_dmaap(self, callback, ids=None):
+        """
+        one GET request to dmaap
+        :param callback: function to execute when a
+        matching notification is found
+        :param ids: list of id strings for matching
+        """
+        query = f"?timeout={self.dmaap_timeout}"
+        url = f"{self.dmaap_url}/{self.audit_uuid}/0{query}"
+        logger.info("polling topic: %s", url)
+        session = self._init_dmaap_session()
+        try:
+            async with session.get(url) as response:
+                messages = await response.read()
+
+                for msg in json.loads(messages):
+                    if self._needs_update(json.loads(msg), ids):
+                        logger.info(
+                            "notification received from dmaap -> %s", msg
+                        )
+                        await callback()
+        except aiohttp.ClientError:
+            logger.exception('received connection error from dmaap topic')
+            # wait some time
+            await asyncio.sleep(30)
+
+    async def notificationhandler(self, callback, ids=None, filters=None):
+        """
+        dmaap based notification handler for
+        :param callback: function to execute when a
+        matching notification is found
+        :param ids: list of id strings for matching
+        """
+        if filters is not None:
+            logger.warning("filters are not supported with pdp v1..ignoring")
+        while True:
+            await self.poll_dmaap(callback, ids=ids)
+
+
+def get_client(
+    pdp_url,
+    use_v0=False,
+    **kwargs
+):
+    """
+    get a particular policy client
+    :param use_v0: whether this should be a v0 client or
+    :return: A policy client
+    """
+    if pdp_url is None:
+        raise ValueError("POLICY_SYNC_PDP_URL set or --pdp flag not set")
+
+    pdp_headers = {
+        "Accept": APPLICATION_JSON,
+        "Content-Type": APPLICATION_JSON
+    }
+
+    if 'pdp_user' in kwargs and 'pdp_password' in kwargs:
+        auth = base64.b64encode(
+            "{}:{}".format(
+                    kwargs.get('pdp_user'),
+                    kwargs.get('pdp_password')
+                ).encode("utf-8")
+        )
+        pdp_headers["Authorization"] = "Basic {}".format(auth.decode("utf-8"))
+
+    dmaap_headers = {
+        "Accept": APPLICATION_JSON,
+        "Content-Type": APPLICATION_JSON
+    }
+
+    logger.info(kwargs.get('dmaap_password'))
+    if 'dmaap_user' in kwargs and 'dmaap_password' in kwargs:
+        auth = base64.b64encode(
+            "{}:{}".format(
+                    kwargs.get('dmaap_user'),
+                    kwargs.get('dmaap_password')
+                ).encode("utf-8")
+        ).decode("utf-8")
+        dmaap_headers["Authorization"] = f"Basic {auth}"
+
+    # Create client (either v0 or v1) based on arguments)
+    if use_v0:
+        return PolicyClientV0(
+            pdp_headers,
+            pdp_url,
+            decision_endpoint=kwargs.get('v0_decision'),
+            ws_endpoint=kwargs.get('v0_notifications')
+        )
+
+    return PolicyClientV1(
+        pdp_headers,
+        pdp_url,
+        v1_decision=kwargs.get('v1_decision'),
+        dmaap_url=kwargs.get('dmaap_url'),
+        dmaap_headers=dmaap_headers
+    )
diff --git a/dcae-services-policy-sync/policysync/cmd.py b/dcae-services-policy-sync/policysync/cmd.py
new file mode 100644 (file)
index 0000000..9055674
--- /dev/null
@@ -0,0 +1,234 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""
+CLI parsing for the sync utility.
+convert flags/env variables to configuration
+"""
+import argparse
+import collections
+import os
+import sys
+import logging
+import logging.config
+from urllib.parse import urlsplit
+import yaml
+import policysync.clients as clients
+import policysync.coroutines
+from .util import get_module_logger
+
+
+logger = get_module_logger(__name__)
+
+APPLICATION_JSON = "application/json"
+
+
+Config = collections.namedtuple(
+    'Config', ['out_file', 'check_period', 'filters', 'ids', 'client', 'bind'])
+
+
+def parsecmd(args):
+    """
+    Parse the command into a config object
+    :param args: arguments list for parsing
+    :returns: Config for the policy sync
+    """
+    parser = argparse.ArgumentParser(
+        description="Keeps a file updated with policies matching a filter.",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+    )
+
+    parser.add_argument(
+        "--out",
+        type=str,
+        default=os.environ.get("POLICY_SYNC_OUTFILE", "policies.json"),
+        help="Output file to dump to",
+    )
+
+    parser.add_argument(
+        "--duration",
+        type=int,
+        default=os.environ.get("POLICY_SYNC_DURATION", 1200),
+        help="frequency (in seconds) to conduct periodic check",
+    )
+
+    parser.add_argument(
+        "--filters",
+        type=str,
+        default=os.environ.get("POLICY_SYNC_FILTER", "[]"),
+        help="Regex of policies that you are interested in.",
+    )
+    parser.add_argument(
+        "--ids",
+        type=str,
+        default=os.environ.get("POLICY_SYNC_ID", "[]"),
+        help="Specific names of policies you are interested in.",
+    )
+
+    parser.add_argument(
+        "--pdp-user",
+        type=str,
+        default=os.environ.get("POLICY_SYNC_PDP_USER", None),
+        help="PDP basic auth username",
+    )
+    parser.add_argument(
+        "--pdp-pass",
+        type=str,
+        default=os.environ.get("POLICY_SYNC_PDP_PASS", None),
+        help="PDP basic auth password",
+    )
+
+    parser.add_argument(
+        "--pdp-url",
+        type=str,
+        default=os.environ.get("POLICY_SYNC_PDP_URL", None),
+        help="PDP to connect to",
+    )
+
+    parser.add_argument(
+        "--http-bind",
+        type=str,
+        default=os.environ.get("POLICY_SYNC_HTTP_BIND", "localhost:8000"),
+        help="The bind address for container metrics",
+    )
+
+    parser.add_argument(
+        "--http-metrics",
+        type=bool,
+        default=os.environ.get("POLICY_SYNC_HTTP_METRICS", True),
+        help="turn on or off the prometheus metrics",
+    )
+
+    parser.add_argument(
+        "--use-v0",
+        type=bool,
+        default=os.environ.get("POLICY_SYNC_V0_ENABLE", False),
+        help="Turn on usage of the legacy v0 policy API",
+    )
+
+    parser.add_argument(
+        "--logging-config",
+        type=str,
+        default=os.environ.get("POLICY_SYNC_LOGGING_CONFIG", None),
+        help="Python formatted logging configuration file",
+    )
+
+    # V0 API specific configuration
+    parser.add_argument(
+        "--v0-notify-endpoint",
+        type=str,
+        default=os.environ.get(
+            "POLICY_SYNC_V0_NOTIFIY_ENDPOINT", "pdp/notifications"
+        ),
+        help="Path of the v0 websocket notification",
+    )
+
+    parser.add_argument(
+        "--v0-decision-endpoint",
+        type=str,
+        default=os.environ.get("POLICY_SYNC_V0_DECISION_ENDPOINT", "pdp/api"),
+        help="path of the v0 decision endpoint",
+    )
+
+    # V1 API specific configuration
+    parser.add_argument(
+        "--v1-dmaap-topic",
+        type=str,
+        default=os.environ.get("POLICY_SYNC_V1_DMAAP_URL", None),
+        help="URL of the dmaap topic used in v1 api for notifications",
+    )
+
+    parser.add_argument(
+        "--v1-dmaap-user",
+        type=str,
+        default=os.environ.get("POLICY_SYNC_V1_DMAAP_USER", None),
+        help="User to use with with the dmaap topic"
+    )
+
+    parser.add_argument(
+        "--v1-dmaap-pass",
+        type=str,
+        default=os.environ.get("POLICY_SYNC_V1_DMAAP_PASS", None),
+        help="Password to use with the dmaap topic"
+    )
+
+    parser.add_argument(
+        "--v1-decision-endpoint",
+        type=str,
+        default=os.environ.get(
+            "POLICY_SYNC_V1_PDP_DECISION_ENDPOINT",
+            "policy/pdpx/v1/decision"
+        ),
+        help="Decision endpoint used in the v1 api for notifications",
+    )
+
+    args = parser.parse_args(args)
+
+    if args.logging_config:
+        logging.config.fileConfig(
+            args.logging_config,
+            disable_existing_loggers=False
+        )
+    else:
+        handler = logging.StreamHandler()
+        formatter = logging.Formatter(
+            "[%(asctime)s][%(levelname)-5s]%(message)s"
+        )
+        root = logging.getLogger()
+        handler.setFormatter(formatter)
+        root.addHandler(handler)
+        root.setLevel(logging.INFO)
+
+    bind = args.http_bind if args.http_metrics else None
+
+    client = clients.get_client(
+        args.pdp_url,
+        pdp_user=args.pdp_user,
+        pdp_password=args.pdp_pass,
+        use_v0=args.use_v0,
+        v0_decision=args.v0_decision_endpoint,
+        v0_notifications=args.v0_notify_endpoint,
+        v1_decision=args.v1_decision_endpoint,
+        dmaap_url=args.v1_dmaap_topic,
+        dmaap_user=args.v1_dmaap_user,
+        dmaap_password=args.v1_dmaap_pass
+    )
+
+    if bind is not None:
+        bind = urlsplit("//" + bind)
+
+    return Config(
+        out_file=args.out,
+        check_period=args.duration,
+        filters=yaml.safe_load(args.filters),
+        ids=yaml.safe_load(args.ids),
+        client=client,
+        bind=bind,
+    )
+
+
+def main():
+    """
+    Parse the arguments passed in via the command line and start the app
+    """
+    try:
+        config = parsecmd(sys.argv[1:])
+    except ValueError:
+        logger.error(
+            "There was no POLICY_SYNC_PDP_URL set or --pdp flag set"
+        )
+        return -1
+    policysync.coroutines.start_event_loop(config)
+    return 0
diff --git a/dcae-services-policy-sync/policysync/coroutines.py b/dcae-services-policy-sync/policysync/coroutines.py
new file mode 100644 (file)
index 0000000..236d6f2
--- /dev/null
@@ -0,0 +1,182 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""
+Asyncio coroutine setup for both periodic and real time notification tasks """
+import signal
+import asyncio
+from prometheus_client import start_http_server
+from .inventory import Inventory
+from .util import get_module_logger
+
+SLEEP_ON_ERROR = 10
+logger = get_module_logger(__name__)
+
+
+async def notify_task(inventory, sleep):
+    """
+    start the notification task
+    :param inventory: Inventory
+    :param sleep: how long to wait on error in seconds
+    """
+
+    logger.info("opening notificationhandler for policy...")
+    await inventory.client.notificationhandler(
+        inventory.check_and_update,
+        ids=inventory.policy_ids,
+        filters=inventory.policy_filters,
+    )
+    logger.warning("websocket closed or errored...will attempt reconnection")
+    await asyncio.sleep(sleep)
+
+
+async def periodic_task(inventory, sleep):
+    """
+    start the periodic task
+    :param inventory: Inventory
+    :param sleep: how long to wait between periodic checks
+    """
+    await asyncio.sleep(sleep)
+    logger.info("Executing periodic check of PDP policies")
+    await inventory.update()
+
+
+async def task_runner(inventory, sleep, task, should_run):
+    """
+    Runs a task in an event loop
+    :param inventory: Inventory
+    :param sleep: how long to wait between loop iterations
+    :param task: coroutine to run
+    :param should_run: function for should this task continue to run
+    """
+    # pylint: disable=broad-except
+    while should_run():
+        try:
+            await task(inventory, sleep)
+        except asyncio.CancelledError:
+            break
+        except Exception:
+            logger.exception("Received exception")
+
+
+async def shutdown(loop, tasks, inventory):
+    """
+    shutdown the event loop and cancel all tasks
+    :param loop: Asyncio eventloop
+    :param tasks: list of asyncio tasks
+    :param inventory: the inventory object
+    """
+
+    logger.info("caught signal")
+    # Stop the websocket routines
+    for task in tasks:
+        task.cancel()
+        await task
+
+    # Close the client
+    await inventory.close()
+    loop.stop()
+
+
+def _setup_coroutines(
+    loop,
+    inventory,
+    shutdown_handler,
+    task_r,
+    **kwargs
+):
+    """ sets up the application coroutines"""
+    # Task runner takes a function for stop condition
+    # (for testing purposes) but should always run in practice
+    # pylint: disable=broad-except
+    def infinite_condition():
+        return True
+
+    logger.info("Starting gather of all policies...")
+    try:
+        loop.run_until_complete(inventory.gather())
+    except Exception:
+        logger.exception('received exception on initial gather')
+
+    # websocket and the periodic check of policies
+    tasks = [
+        loop.create_task(
+            task_r(
+                inventory,
+                kwargs.get('check_period', 2400),
+                periodic_task,
+                infinite_condition
+            )
+        )
+    ]
+
+    if inventory.client.supports_notifications():
+        tasks.append(
+            loop.create_task(
+                task_r(
+                    inventory,
+                    SLEEP_ON_ERROR,
+                    notify_task,
+                    infinite_condition
+                )
+            )
+        )
+    else:
+        logger.warning(
+            "Defaulting to polling... Provide a dmaap url to receive faster updates"
+        )
+
+    # Add shutdown handlers for sigint and sigterm
+    for signame in ("SIGINT", "SIGTERM"):
+        sig = getattr(signal, signame)
+        loop.add_signal_handler(
+            sig,
+            lambda: asyncio.ensure_future(
+                shutdown_handler(loop, tasks, inventory)
+            ),
+        )
+
+    # Start prometheus server daemonthread for metrics/healthchecking
+    if 'bind' in kwargs:
+        metrics_server = kwargs.get('metrics_server', start_http_server)
+        metrics_server(kwargs['bind'].port, addr=kwargs['bind'].hostname)
+
+
+def start_event_loop(config):
+    """
+    start the event loop that runs the application
+    :param config: Config object for the application
+    """
+    loop = asyncio.get_event_loop()
+    inventory = Inventory(
+        config.filters,
+        config.ids,
+        config.out_file,
+        config.client
+    )
+
+    _setup_coroutines(
+        loop,
+        inventory,
+        shutdown,
+        task_runner,
+        metrics_server=start_http_server,
+        bind=config.bind,
+        check_period=config.check_period
+    )
+
+    loop.run_forever()
+    loop.close()
+    logger.info("shutdown complete")
diff --git a/dcae-services-policy-sync/policysync/inventory.py b/dcae-services-policy-sync/policysync/inventory.py
new file mode 100644 (file)
index 0000000..0eb91b5
--- /dev/null
@@ -0,0 +1,169 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+""" In memory data store for policies which are currently used by a mS """
+import asyncio
+import json
+import uuid
+import os
+import tempfile
+import aiohttp
+from datetime import datetime
+from .util import get_module_logger
+
+logger = get_module_logger(__name__)
+
+ACTION_GATHERED = "gathered"
+ACTION_UPDATED = "updated"
+OUTFILE_INDENT = 4
+
+
+class Inventory:
+    """ In memory data store for policies which are currently used by a mS """
+    def __init__(self, filters, ids, outfile, client):
+        self.policy_filters = filters
+        self.policy_ids = ids
+        self.hp_active_inventory = set()
+        self.get_lock = asyncio.Lock()
+        self.file = outfile
+        self.queue = asyncio.Queue()
+        self.client = client
+
+    async def gather(self):
+        """
+        Run at startup to gather an initial inventory of policies
+        """
+        return await self._sync_inventory(ACTION_GATHERED)
+
+    async def update(self):
+        """
+        Run to update an inventory of policies on the fly
+        """
+        return await self._sync_inventory(ACTION_UPDATED)
+
+    async def check_and_update(self):
+        """ check and update the policy inventory """
+        return await self.update()
+
+    async def close(self):
+        """ close the policy inventory and its associated client """
+        await self.client.close()
+
+    def _atomic_dump(self, data):
+        """ atomically dump the policy content to a file by rename """
+        try:
+            temp_file = tempfile.NamedTemporaryFile(
+                delete=False,
+                dir=os.path.dirname(self.file),
+                prefix=os.path.basename(self.file),
+                mode="w",
+            )
+            try:
+                temp_file.write(data)
+            finally:
+                # fsync the file so its on disk
+                temp_file.flush()
+                os.fsync(temp_file.fileno())
+        finally:
+            temp_file.close()
+
+        os.rename(temp_file.name, os.path.abspath(self.file))
+
+    async def get_policy_content(self, action=ACTION_UPDATED):
+        """
+        get the policy content off the PDP
+        :param action: what action to present
+        :returns: True/False depending on if update was successful
+        """
+        logger.info("Starting policy update process...")
+        try:
+            policy_bodies = await self.client.get_config(
+                filters=self.policy_filters, ids=self.policy_ids
+            )
+        except aiohttp.ClientError:
+            logger.exception('Conncection Error while connecting to PDP')
+            return False
+        
+        # match the format a bit of the Config Binding Service
+        out = {
+            "policies": {"items": policy_bodies},
+            "event": {
+                "action": action,
+                "timestamp": (datetime.utcnow().isoformat()[:-3] + "Z"),
+                "update_id": str(uuid.uuid4()),
+                "policies_count": len(policy_bodies),
+            },
+        }
+
+        # Atomically dump the file to disk
+        tmp = {
+            x.get("policyName") for x in policy_bodies if "policyName" in x
+        }
+
+        if tmp != self.hp_active_inventory:
+            data = json.dumps(out)
+            loop = asyncio.get_event_loop()
+            await loop.run_in_executor(None, self._atomic_dump, data)
+            logger.info(
+                "Update complete. Policies dumped to: %s", self.file
+            )
+            self.hp_active_inventory = tmp
+            return True
+        else:
+            logger.info("No updates needed for now")
+            return False
+
+    async def _sync_inventory(self, action):
+        """
+        Pull an inventory of policies. Commit changes if there is a change.
+        return: boolean to represent whether changes were commited
+        """
+        try:
+            pdp_inventory = await self.client.list_policies(
+                filters=self.policy_filters, ids=self.policy_ids
+            )
+        except aiohttp.ClientError:
+            logger.exception("Inventory sync failed due to a connection error")
+            return False
+
+        logger.debug("pdp_inventory -> %s", pdp_inventory)
+
+        # Below needs to be under a lock because of
+        # the call to getConfig being awaited.
+        async with self.get_lock:
+            if self.hp_active_inventory != pdp_inventory or \
+                 pdp_inventory is None:
+
+                # Log a delta of what has changed related to this policy update
+                if pdp_inventory is not None and \
+                     self.hp_active_inventory is not None:
+                    msg = {
+                            "removed": list(
+                                self.hp_active_inventory - pdp_inventory
+                            ),
+                            "added": list(
+                                pdp_inventory - self.hp_active_inventory
+                            ),
+                    }
+                    logger.info(
+                        "PDP indicates the following changes: %s ", msg
+                    )
+
+                return await self.get_policy_content(action)
+
+            logger.info(
+                "local matches pdp. no update required for now"
+            )
+            return False
diff --git a/dcae-services-policy-sync/policysync/metrics.py b/dcae-services-policy-sync/policysync/metrics.py
new file mode 100644 (file)
index 0000000..7f825fc
--- /dev/null
@@ -0,0 +1,38 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+""" counters and gaugages for various metrics """
+from prometheus_client import Counter, Gauge
+
+
+policy_updates_counter = Counter(
+    "policy_updates", "Number of total policy updates commited"
+)
+websock_closures = Counter(
+    "websocket_errors_and_closures", "Number of websocket closures or errors"
+)
+list_policy_exceptions = Counter(
+    "list_policy_exception",
+    "Exceptions that have occured as a result of calling listPolicy",
+)
+get_config_exceptions = Counter(
+    "get_config_exception",
+    "Exceptions that have occured as a result of calling getConfig",
+)
+
+active_policies_gauge = Gauge(
+    "active_policies",
+    "Number of policies that have been retrieved off the PDP"
+)
diff --git a/dcae-services-policy-sync/policysync/util.py b/dcae-services-policy-sync/policysync/util.py
new file mode 100644 (file)
index 0000000..1bbac5a
--- /dev/null
@@ -0,0 +1,10 @@
+""" utility functions (currenlty just for logging) """
+import logging
+
+
+def get_module_logger(mod_name):
+    """
+    To use this, do logger = get_module_logger(__name__)
+    """
+    logger = logging.getLogger(mod_name)
+    return logger
diff --git a/dcae-services-policy-sync/pom.xml b/dcae-services-policy-sync/pom.xml
new file mode 100644 (file)
index 0000000..f5d38d8
--- /dev/null
@@ -0,0 +1,172 @@
+<?xml version="1.0"?>
+<!--
+================================================================================
+Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+================================================================================
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+============LICENSE_END=========================================================
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.onap.dcaegen2.deployments</groupId>
+    <artifactId>deployments</artifactId>
+    <version>1.2.0-SNAPSHOT</version>
+  </parent>
+  <groupId>org.onap.dcaegen2.deployments</groupId>
+  <artifactId>dcae-services-policy-sync</artifactId>
+  <name>dcaegen2-deployments-dcae-services-policy-sync</name>
+  <version>1.0.0</version>
+  <url>http://maven.apache.org</url>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <sonar.skip>true</sonar.skip>
+    <sonar.sources>.</sonar.sources>
+    <!-- customize the SONARQUBE URL -->
+    <!-- sonar.host.url>http://localhost:9000</sonar.host.url -->
+    <!-- below are language dependent -->
+    <!-- for Python -->
+    <sonar.language>py</sonar.language>
+    <sonar.pluginName>Python</sonar.pluginName>
+    <sonar.inclusions>**/*.py</sonar.inclusions>
+    <!-- for JavaScaript -->
+    <!--
+    <sonar.language>js</sonar.language>
+    <sonar.pluginName>JS</sonar.pluginName>
+    <sonar.inclusions>**/*.js</sonar.inclusions>
+    -->
+  </properties>
+  <build>
+    <finalName>${project.artifactId}-${project.version}</finalName>
+    <plugins>
+      <!-- plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.4.1</version>
+        <configuration>
+          <descriptors>
+            <descriptor>assembly/dep.xml</descriptor>
+          </descriptors>
+        </configuration>
+        <executions>
+          <execution>
+            <id>make-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin -->
+      <!-- now we configure custom action (calling a script) at various lifecycle phases -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2.1</version>
+        <executions>
+          <execution>
+            <id>clean phase script</id>
+            <phase>clean</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <arguments>
+                <argument>${project.artifactId}</argument>
+                <argument>clean</argument>
+              </arguments>
+            </configuration>
+          </execution>
+          <execution>
+            <id>generate-sources script</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <arguments>
+                <argument>${project.artifactId}</argument>
+                <argument>generate-sources</argument>
+              </arguments>
+            </configuration>
+          </execution>
+          <execution>
+            <id>compile script</id>
+            <phase>compile</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <arguments>
+                <argument>${project.artifactId}</argument>
+                <argument>compile</argument>
+              </arguments>
+            </configuration>
+          </execution>
+          <execution>
+            <id>package script</id>
+            <phase>package</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <arguments>
+                <argument>${project.artifactId}</argument>
+                <argument>package</argument>
+              </arguments>
+            </configuration>
+          </execution>
+          <execution>
+            <id>test script</id>
+            <phase>test</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <arguments>
+                <argument>${project.artifactId}</argument>
+                <argument>test</argument>
+              </arguments>
+            </configuration>
+          </execution>
+          <execution>
+            <id>install script</id>
+            <phase>install</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <arguments>
+                <argument>${project.artifactId}</argument>
+                <argument>install</argument>
+              </arguments>
+            </configuration>
+          </execution>
+          <execution>
+            <id>deploy script</id>
+            <phase>deploy</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <arguments>
+                <argument>${project.artifactId}</argument>
+                <argument>deploy</argument>
+              </arguments>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file
diff --git a/dcae-services-policy-sync/setup.py b/dcae-services-policy-sync/setup.py
new file mode 100644 (file)
index 0000000..f5de9a2
--- /dev/null
@@ -0,0 +1,30 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+from setuptools import setup, find_packages
+
+setup(
+    name="policysync",
+    author="Chris Luckenbaugh",
+    version="1.0.0",
+    packages=find_packages(),
+    include_package_data=True,
+    install_requires=["aiohttp>=2.3", "PyYAML", "prometheus_client"],
+    entry_points="""
+        [console_scripts]
+        policysync=policysync.cmd:main
+    """,
+)
diff --git a/dcae-services-policy-sync/tests/mocks.py b/dcae-services-policy-sync/tests/mocks.py
new file mode 100644 (file)
index 0000000..9a3d6cd
--- /dev/null
@@ -0,0 +1,191 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+from urllib.parse import urlsplit
+import asyncio, aiohttp
+
+
+class MockConfig:
+    def __init__(self):
+        self.check_period = 60
+        self.quiet_period = 0
+        self.bind = urlsplit("//localhost:8080")
+
+
+class MockFileDumper:
+    def __init__(self):
+        self.closed = False
+
+    async def close(self):
+        self.closed = True
+
+
+class MockInventory:
+    def __init__(self, queue=None):
+        self.was_updated = False
+        self.was_gathered = False
+        self.client = MockClient()
+        self.queue = queue
+        self.quiet = 0
+        self.updates = []
+        self.policy_filters = []
+        self.policy_ids = []
+
+    async def update(self):
+        self.was_updated = True
+        return True
+
+    async def gather(self):
+        self.was_gathered = True
+        print("got here GATHERED")
+        return True
+
+    async def close(self):
+        self.client.closed = True
+
+    async def check_and_update(self):
+        await self.update()
+
+    async def get_policy_content(self, action="UPDATED"):
+        self.updates.append(action)
+
+
+class MockClient:
+    def __init__(self, raise_on_listpolicies=False, raise_on_getconfig=False):
+        self.closed = False
+        self.opened = False
+        self.raise_on_listpolicies = raise_on_listpolicies
+        self.raise_on_getconfig = raise_on_getconfig
+
+    async def close(self):
+        self.closed = True
+
+    async def notificationhandler(self, callback, ids=[], filters=[]):
+        await callback()
+
+    def supports_notifications(self):
+        return True
+
+    async def list_policies(self, filters=[], ids=[]):
+        if self.raise_on_listpolicies:
+            raise aiohttp.ClientError 
+
+        return set(
+            [
+                "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml"
+            ]
+        )
+
+    async def get_config(self, filters=[], ids=[]):
+        if self.raise_on_getconfig:
+            raise aiohttp.ClientError
+
+        return [
+            {
+                "policyConfigMessage": "Config Retrieved!",
+                "policyConfigStatus": "CONFIG_RETRIEVED",
+                "type": "JSON",
+                "config": {
+                    "service": "DCAE_HighlandPark_AgingConfig",
+                    "location": " Edge",
+                    "uuid": "TestUUID",
+                    "policyName": "DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
+                    "configName": "DCAE_HighlandPark_AgingConfig",
+                    "templateVersion": "1607",
+                    "priority": "4",
+                    "version": 11.0,
+                    "policyScope": "resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8",
+                    "riskType": "test",
+                    "riskLevel": "2",
+                    "guard": "False",
+                    "content": {
+                        "signature": {
+                            "filter_clause": "event.faultFields.alarmCondition LIKE('%chrisluckenbaugh%')"
+                        },
+                        "name": "testAging",
+                        "context": ["PROD"],
+                        "priority": 1,
+                        "prePublishAging": 40,
+                        "preCorrelationAging": 20,
+                    },
+                    "policyNameWithPrefix": "DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
+                },
+                "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
+                "policyType": "MicroService",
+                "policyVersion": "78",
+                "matchingConditions": {
+                    "ECOMPName": "DCAE",
+                    "ONAPName": "DCAE",
+                    "ConfigName": "DCAE_HighlandPark_AgingConfig",
+                    "service": "DCAE_HighlandPark_AgingConfig",
+                    "uuid": "TestUUID",
+                    "Location": " Edge",
+                },
+                "responseAttributes": {},
+                "property": None,
+            },
+            {
+                "policyConfigMessage": "Config Retrieved! ",
+                "policyConfigStatus": "CONFIG_RETRIEVED",
+                "type": "JSON",
+                "config": "adlskjfadslkjf",
+                "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
+                "policyType": "MicroService",
+                "policyVersion": "78",
+                "matchingConditions": {
+                    "ECOMPName": "DCAE",
+                    "ONAPName": "DCAE",
+                    "ConfigName": "DCAE_HighlandPark_AgingConfig",
+                    "service": "DCAE_HighlandPark_AgingConfig",
+                    "uuid": "TestUUID",
+                    "Location": " Edge",
+                },
+                "responseAttributes": {},
+                "property": None,
+            },
+        ]
+
+
+class MockLoop:
+    def __init__(self):
+        self.stopped = False
+        self.handlers = []
+        self.tasks = []
+
+    def stop(self):
+        self.stopped = True
+
+    def add_signal_handler(self, signal, handler):
+        self.handlers.append(signal)
+
+    def create_task(self, task):
+        self.tasks.append(task)
+
+    def run_until_complete(self, task):
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        loop.run_until_complete(task)
+
+
+class MockTask:
+    def __init__(self):
+        self.canceled = False
+
+    def cancel(self):
+        self.canceled = True
+
+    def __await__(self):
+        return iter([])
diff --git a/dcae-services-policy-sync/tests/test_client_v0.py b/dcae-services-policy-sync/tests/test_client_v0.py
new file mode 100644 (file)
index 0000000..6ca590e
--- /dev/null
@@ -0,0 +1,191 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+from aiohttp import web, WSMsgType
+import json, pytest, re
+from policysync.clients import (
+    PolicyClientV0 as PolicyClient,
+    WS_HEARTBEAT
+)
+
+
+async def listpolicy(request):
+    return web.json_response(["hello"])
+
+
+async def getconfig(request):
+    j = [
+        {
+            "policyConfigMessage": "Config Retrieved!",
+            "policyConfigStatus": "CONFIG_RETRIEVED",
+            "type": "JSON",
+            "config": '{"service":"DCAE_HighlandPark_AgingConfig","location":" Edge","uuid":"TestUUID","policyName":"DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging","configName":"DCAE_HighlandPark_AgingConfig","templateVersion":"1607","priority":"4","version":11.0,"policyScope":"resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8","riskType":"test","riskLevel":"2","guard":"False","content":{"signature":{"filter_clause":"event.faultFields.alarmCondition LIKE(\'%chrisluckenbaugh%\')"},"name":"testAging","context":["PROD"],"priority":1,"prePublishAging":40,"preCorrelationAging":20},"policyNameWithPrefix":"DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging"}',
+            "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
+            "policyType": "MicroService",
+            "policyVersion": "78",
+            "matchingConditions": {
+                "ECOMPName": "DCAE",
+                "ONAPName": "DCAE",
+                "ConfigName": "DCAE_HighlandPark_AgingConfig",
+                "service": "DCAE_HighlandPark_AgingConfig",
+                "uuid": "TestUUID",
+                "Location": " Edge",
+            },
+            "responseAttributes": {},
+            "property": None,
+        },
+        {
+            "policyConfigMessage": "Config Retrieved! ",
+            "policyConfigStatus": "CONFIG_RETRIEVED",
+            "type": "JSON",
+            "config": "adlskjfadslkjf",
+            "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
+            "policyType": "MicroService",
+            "policyVersion": "78",
+            "matchingConditions": {
+                "ECOMPName": "DCAE",
+                "ONAPName": "DCAE",
+                "ConfigName": "DCAE_HighlandPark_AgingConfig",
+                "service": "DCAE_HighlandPark_AgingConfig",
+                "uuid": "TestUUID",
+                "Location": " Edge",
+            },
+            "responseAttributes": {},
+            "property": None,
+        },
+    ]
+
+    return web.json_response(j)
+
+
+async def wshandler(request):
+    resp = web.WebSocketResponse()
+    available = resp.can_prepare(request)
+    await resp.prepare(request)
+    await resp.send_str('{ "loadedPolicies": [{ "policyName": "bar"}] }')
+    await resp.send_bytes(b"bar!!!")
+    await resp.close("closed")
+
+
+@pytest.fixture
+def policyclient(aiohttp_client, loop):
+    app = web.Application()
+    app.router.add_route("POST", "/pdp/api/listPolicy", listpolicy)
+    app.router.add_route("POST", "/pdp/api/getConfig", getconfig)
+    app.router.add_get("/pdp/notifications", wshandler)
+    fake_client = loop.run_until_complete(aiohttp_client(app))
+    server = "{}://{}:{}".format("http", fake_client.host, fake_client.port)
+    return PolicyClient({}, server)
+
+
+async def test_listpolicies(policyclient):
+    j = await policyclient.list_policies(filters=["bar"])
+    assert j == set(["hello"])
+    await policyclient.close()
+    assert policyclient.session.closed
+
+
+async def test_getconfig(policyclient):
+    j = await policyclient.get_config(filters=["bar"])
+
+    assert j == [
+        {
+            "policyConfigMessage": "Config Retrieved!",
+            "policyConfigStatus": "CONFIG_RETRIEVED",
+            "type": "JSON",
+            "config": {
+                "service": "DCAE_HighlandPark_AgingConfig",
+                "location": " Edge",
+                "uuid": "TestUUID",
+                "policyName": "DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
+                "configName": "DCAE_HighlandPark_AgingConfig",
+                "templateVersion": "1607",
+                "priority": "4",
+                "version": 11.0,
+                "policyScope": "resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8",
+                "riskType": "test",
+                "riskLevel": "2",
+                "guard": "False",
+                "content": {
+                    "signature": {
+                        "filter_clause": "event.faultFields.alarmCondition LIKE('%chrisluckenbaugh%')"
+                    },
+                    "name": "testAging",
+                    "context": ["PROD"],
+                    "priority": 1,
+                    "prePublishAging": 40,
+                    "preCorrelationAging": 20,
+                },
+                "policyNameWithPrefix": "DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
+            },
+            "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
+            "policyType": "MicroService",
+            "policyVersion": "78",
+            "matchingConditions": {
+                "ECOMPName": "DCAE",
+                "ONAPName": "DCAE",
+                "ConfigName": "DCAE_HighlandPark_AgingConfig",
+                "service": "DCAE_HighlandPark_AgingConfig",
+                "uuid": "TestUUID",
+                "Location": " Edge",
+            },
+            "responseAttributes": {},
+            "property": None,
+        },
+        {
+            "policyConfigMessage": "Config Retrieved! ",
+            "policyConfigStatus": "CONFIG_RETRIEVED",
+            "type": "JSON",
+            "config": "adlskjfadslkjf",
+            "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
+            "policyType": "MicroService",
+            "policyVersion": "78",
+            "matchingConditions": {
+                "ECOMPName": "DCAE",
+                "ONAPName": "DCAE",
+                "ConfigName": "DCAE_HighlandPark_AgingConfig",
+                "service": "DCAE_HighlandPark_AgingConfig",
+                "uuid": "TestUUID",
+                "Location": " Edge",
+            },
+            "responseAttributes": {},
+            "property": None,
+        },
+    ]
+    await policyclient.close()
+
+
+async def test_supports_notifications(policyclient):
+    assert policyclient.supports_notifications()
+
+
+async def test_needs_update(policyclient):
+    assert policyclient._needs_update(
+        {"loadedPolicies": [{"policyName": "bar"}]}, [], ["bar"] 
+    )
+    assert not policyclient._needs_update(
+        {"loadedPolicies": [{"policyName": "bar"}]}, [], ["foo"]
+    )
+
+
+async def test_ws(policyclient):
+    async def ws_callback():
+        assert True
+
+    await policyclient.notificationhandler(ws_callback, filters=["bar"])
+    await policyclient.close()
+
+    assert policyclient.ws_session.closed
diff --git a/dcae-services-policy-sync/tests/test_client_v1.py b/dcae-services-policy-sync/tests/test_client_v1.py
new file mode 100644 (file)
index 0000000..6994a6f
--- /dev/null
@@ -0,0 +1,216 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+from aiohttp import web, WSMsgType
+import json, pytest, re
+from policysync.clients import PolicyClientV1 as PolicyClient 
+
+DECISION_ENDPOINT = 'policy/pdpx/v1/decision'
+async def get_decision(request):
+    req_data = await request.json()
+    assert req_data['ONAPName'] == 'DCAE'
+    assert req_data['ONAPComponent'] == 'policy-sync'
+    assert req_data['action'] == 'configure'
+    assert req_data['resource'] == {
+        'policy-id': [
+            'onap.scaleout.tca', 
+            'onap.restart.tca'
+        ]
+    }
+
+
+    j = {
+        "policies": {
+            "onap.scaleout.tca": {
+                "type": "onap.policies.monitoring.cdap.tca.hi.lo.app",
+                "version": "1.0.0",
+                "metadata": {"policy-id": "onap.scaleout.tca"},
+                "properties": {
+                    "tca_policy": {
+                        "domain": "measurementsForVfScaling",
+                        "metricsPerEventName": [
+                            {
+                                "eventName": "vLoadBalancer",
+                                "controlLoopSchemaType": "VNF",
+                                "policyScope": "type=configuration",
+                                "policyName": "onap.scaleout.tca",
+                                "policyVersion": "v0.0.1",
+                                "thresholds": [
+                                    {
+                                        "closedLoopControlName": "ControlLoop-vDNS-6f37f56d-a87d-4b85-b6a9-cc953cf779b3",
+                                        "closedLoopEventStatus": "ONSET",
+                                        "version": "1.0.2",
+                                        "fieldPath": "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated",
+                                        "thresholdValue": 500,
+                                        "direction": "LESS_OR_EQUAL",
+                                        "severity": "MAJOR",
+                                    },
+                                    {
+                                        "closedLoopControlName": "ControlLoop-vDNS-6f37f56d-a87d-4b85-b6a9-cc953cf779b3",
+                                        "closedLoopEventStatus": "ONSET",
+                                        "version": "1.0.2",
+                                        "fieldPath": "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated",
+                                        "thresholdValue": 5000,
+                                        "direction": "GREATER_OR_EQUAL",
+                                        "severity": "CRITICAL",
+                                    },
+                                ],
+                            }
+                        ],
+                    }
+                },
+            },
+            "onap.restart.tca": {
+                "type": "onap.policies.monitoring.cdap.tca.hi.lo.app",
+                "version": "1.0.0",
+                "metadata": {"policy-id": "onap.restart.tca", "policy-version": 1},
+                "properties": {
+                    "tca_policy": {
+                        "domain": "measurementsForVfScaling",
+                        "metricsPerEventName": [
+                            {
+                                "eventName": "Measurement_vGMUX",
+                                "controlLoopSchemaType": "VNF",
+                                "policyScope": "DCAE",
+                                "policyName": "DCAE.Config_tca-hi-lo",
+                                "policyVersion": "v0.0.1",
+                                "thresholds": [
+                                    {
+                                        "closedLoopControlName": "ControlLoop-vCPE-48f0c2c3-a172-4192-9ae3-052274181b6e",
+                                        "version": "1.0.2",
+                                        "fieldPath": "$.event.measurementsForVfScalingFields.additionalMeasurements[*].arrayOfFields[0].value",
+                                        "thresholdValue": 0,
+                                        "direction": "EQUAL",
+                                        "severity": "MAJOR",
+                                        "closedLoopEventStatus": "ABATED",
+                                    },
+                                    {
+                                        "closedLoopControlName": "ControlLoop-vCPE-48f0c2c3-a172-4192-9ae3-052274181b6e",
+                                        "version": "1.0.2",
+                                        "fieldPath": "$.event.measurementsForVfScalingFields.additionalMeasurements[*].arrayOfFields[0].value",
+                                        "thresholdValue": 0,
+                                        "direction": "GREATER",
+                                        "severity": "CRITICAL",
+                                        "closedLoopEventStatus": "ONSET",
+                                    },
+                                ],
+                            }
+                        ],
+                    }
+                },
+            },
+        }
+    }
+
+    return web.json_response(j)
+
+
+@pytest.fixture
+def policyclient(aiohttp_client, loop):
+    app = web.Application()
+    app.router.add_route("POST", "/" + DECISION_ENDPOINT, get_decision)
+    fake_client = loop.run_until_complete(aiohttp_client(app))
+    server = "{}://{}:{}".format("http", fake_client.host, fake_client.port)
+    return PolicyClient({}, server)
+
+
+async def test_getconfig(policyclient):
+    j = await policyclient.get_config(ids=['onap.scaleout.tca', 'onap.restart.tca' ])
+    assert j == [{
+        "type": "onap.policies.monitoring.cdap.tca.hi.lo.app",
+        "version": "1.0.0",
+        "metadata": {
+            "policy-id": "onap.scaleout.tca"
+        },
+        "policyName": "onap.scaleout.tca.1-0-0.xml",
+        "policyVersion": "1.0.0",
+        "config": {
+            "tca_policy": {
+                "domain": "measurementsForVfScaling",
+                "metricsPerEventName": [{
+                    "eventName": "vLoadBalancer",
+                    "controlLoopSchemaType": "VNF",
+                    "policyScope": "type=configuration",
+                    "policyName": "onap.scaleout.tca",
+                    "policyVersion": "v0.0.1",
+                    "thresholds": [{
+                            "closedLoopControlName": "ControlLoop-vDNS-6f37f56d-a87d-4b85-b6a9-cc953cf779b3",
+                            "closedLoopEventStatus": "ONSET",
+                            "version": "1.0.2",
+                            "fieldPath": "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated",
+                            "thresholdValue": 500,
+                            "direction": "LESS_OR_EQUAL",
+                            "severity": "MAJOR"
+                        },
+                        {
+                            "closedLoopControlName": "ControlLoop-vDNS-6f37f56d-a87d-4b85-b6a9-cc953cf779b3",
+                            "closedLoopEventStatus": "ONSET",
+                            "version": "1.0.2",
+                            "fieldPath": "$.event.measurementsForVfScalingFields.vNicPerformanceArray[*].receivedBroadcastPacketsAccumulated",
+                            "thresholdValue": 5000,
+                            "direction": "GREATER_OR_EQUAL",
+                            "severity": "CRITICAL"
+                        }
+                    ]
+                }]
+            }
+        }
+    }, {
+        "type": "onap.policies.monitoring.cdap.tca.hi.lo.app",
+        "version": "1.0.0",
+        "metadata": {
+            "policy-id": "onap.restart.tca",
+            "policy-version": 1
+        },
+        "policyName": "onap.restart.tca.1-0-0.xml",
+        "policyVersion": "1.0.0",
+        "config": {
+            "tca_policy": {
+                "domain": "measurementsForVfScaling",
+                "metricsPerEventName": [{
+                    "eventName": "Measurement_vGMUX",
+                    "controlLoopSchemaType": "VNF",
+                    "policyScope": "DCAE",
+                    "policyName": "DCAE.Config_tca-hi-lo",
+                    "policyVersion": "v0.0.1",
+                    "thresholds": [{
+                            "closedLoopControlName": "ControlLoop-vCPE-48f0c2c3-a172-4192-9ae3-052274181b6e",
+                            "version": "1.0.2",
+                            "fieldPath": "$.event.measurementsForVfScalingFields.additionalMeasurements[*].arrayOfFields[0].value",
+                            "thresholdValue": 0,
+                            "direction": "EQUAL",
+                            "severity": "MAJOR",
+                            "closedLoopEventStatus": "ABATED"
+                        },
+                        {
+                            "closedLoopControlName": "ControlLoop-vCPE-48f0c2c3-a172-4192-9ae3-052274181b6e",
+                            "version": "1.0.2",
+                            "fieldPath": "$.event.measurementsForVfScalingFields.additionalMeasurements[*].arrayOfFields[0].value",
+                            "thresholdValue": 0,
+                            "direction": "GREATER",
+                            "severity": "CRITICAL",
+                            "closedLoopEventStatus": "ONSET"
+                        }
+                    ]
+                }]
+            }
+        }
+    }]
+    await policyclient.close()
+
+
+async def test_supports_notifications(policyclient):
+    assert not policyclient.supports_notifications()
diff --git a/dcae-services-policy-sync/tests/test_cmd.py b/dcae-services-policy-sync/tests/test_cmd.py
new file mode 100644 (file)
index 0000000..3c061c0
--- /dev/null
@@ -0,0 +1,79 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+import pytest, json, sys, logging, logging.config
+from policysync.cmd import Config, main, parsecmd
+import policysync.coroutines
+
+
+class TestConfig:
+    def test_parse_args(self):
+        args = [
+            "--out",
+            "out",
+            "--pdp-user",
+            "chris",
+            "--pdp-pass",
+            "notapassword",
+            "--pdp-url",
+            "blah",
+            "--duration",
+            "60",
+            "--filters",
+            "[blah]",
+        ]
+
+        c = parsecmd(args)
+
+        assert c.filters == ["blah"]
+        assert c.check_period == 60
+        assert c.out_file == "out"
+
+    def test_parse_args_no_auth(self):
+        c = parsecmd(
+            ["--out", "out", "--pdp-url", "blah", "--duration", "60", "--filters", "[blah]"]
+        )
+
+        assert c.client.pdp_url == "blah"
+        assert c.filters == ["blah"]
+        assert c.check_period == 60
+        assert c.out_file == "out"
+
+    def test_parse_args_no_pdp(self):
+        args = []
+        with pytest.raises(ValueError):
+            parsecmd(args)
+
+    def test_parse_bad_bind(self):
+        args = [
+            "--out",
+            "out",
+            "--pdp-user",
+            "chris",
+            "--pdp-pass",
+            "notapassword",
+            "--pdp-url",
+            "blah",
+            "--duration",
+            "60",
+            "--filters",
+            "[blah]",
+            "--http-bind",
+            "l[ocalhost:100",
+        ]
+
+        with pytest.raises(ValueError):
+            parsecmd(args)
diff --git a/dcae-services-policy-sync/tests/test_coroutines.py b/dcae-services-policy-sync/tests/test_coroutines.py
new file mode 100644 (file)
index 0000000..4c90ae8
--- /dev/null
@@ -0,0 +1,142 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+import pytest, json, sys, asyncio, signal
+from tests.mocks import (
+    MockClient,
+    MockTask,
+    MockLoop,
+    MockInventory,
+    MockConfig,
+    MockFileDumper,
+)
+from policysync.coroutines import (
+    shutdown,
+    periodic_task,
+    notify_task,
+    task_runner,
+    _setup_coroutines,
+    SLEEP_ON_ERROR,
+)
+import policysync.coroutines as coroutines
+
+
+async def test_shutdownhandler():
+    client = MockClient()
+    tasks = [MockTask()]
+    loop = MockLoop()
+    inventory = MockInventory()
+
+    await shutdown( loop, tasks, inventory)
+
+    # Assert that a shutdown results in all tasks in the loop being canceled
+    for x in tasks:
+        assert x.canceled
+
+    # ... And the the PDP client is closed
+    assert inventory.client.closed
+
+    # ... And that the event loop is stopped
+    assert loop.stopped
+
+
+async def test_periodic():
+    inventory = MockInventory()
+    await periodic_task(inventory, 1)
+    assert inventory.was_updated
+
+
+async def test_ws():
+    inventory = MockInventory()
+    await notify_task(inventory, 1)
+    assert inventory.was_updated
+
+
+async def test_task_runner():
+    def should_run():
+        if should_run.counter == 0:
+            should_run.counter += 1
+            return True
+        else:
+            return False
+
+    should_run.counter = 0
+
+    def mocktask(inventory):
+        assert True
+
+    await task_runner(MockInventory(), 1, mocktask, should_run)
+
+
+async def test_task_runner_cancel():
+    def should_run():
+        if should_run.counter == 0:
+            should_run.counter += 1
+            return True
+        elif should_run.counter == 1:
+            # If we get here then fail the test
+            assert False, "Task runner should have broken out of loop before this"
+            return False
+
+    should_run.counter = 0
+
+    # We create a mock task that raises a cancellation error (sent when a asyncio task is canceled)
+    def mocktask(inventory, sleep):
+        raise asyncio.CancelledError
+
+    await task_runner(MockInventory(), 1, mocktask, should_run)
+
+
+def test_setup_coroutines():
+    loop = MockLoop()
+
+    def fake_task_runner(inventory, sleep, task, should_run):
+        return (sleep, task)
+
+    def fake_shutdown(sig, loop, tasks, client):
+        return sig
+
+    def fake_metrics_server(port, addr=None):
+        fake_metrics_server.started = True
+
+    fake_metrics_server.started = False
+
+    inventory = MockInventory()
+    client = MockClient()
+    config = MockConfig()
+
+    _setup_coroutines(
+        loop,
+        inventory,
+        fake_shutdown,
+        fake_task_runner,
+        metrics_server=fake_metrics_server,
+        check_period=config.check_period,
+        bind=config.bind,
+    )
+
+    # By the end of setup coroutines we should have...
+
+    # Gathered initial set of policies
+    assert inventory.was_gathered
+
+    # started the websocket and periodic task running
+    assert (SLEEP_ON_ERROR, notify_task) in loop.tasks
+    assert (config.check_period, periodic_task) in loop.tasks
+
+    # Signal handlers for SIGINT and SIGTERM
+    assert signal.SIGINT in loop.handlers
+    assert signal.SIGTERM in loop.handlers
diff --git a/dcae-services-policy-sync/tests/test_inventory.py b/dcae-services-policy-sync/tests/test_inventory.py
new file mode 100644 (file)
index 0000000..5b6f21b
--- /dev/null
@@ -0,0 +1,153 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+import pytest, json, aiohttp, asyncio
+from policysync.inventory import (
+    Inventory,
+    ACTION_GATHERED,
+    ACTION_UPDATED,
+)
+from tests.mocks import MockClient
+
+
+class MockMessage:
+    def __init__(self, type, data):
+        self.type = type
+        self.data = data
+
+
+@pytest.fixture()
+def inventory(request, tmpdir):
+    f1 = tmpdir.mkdir("sub").join("myfile")
+    print(f1)
+    return Inventory(["DCAE.Config_MS_AGING_UVERSE_PROD_.*"], [], f1, MockClient())
+
+
+class TestInventory:
+    @pytest.mark.asyncio
+    async def test_close(self, inventory):
+        await inventory.close()
+        assert inventory.client.closed
+
+    @pytest.mark.asyncio
+    async def test_get_policy_content(self, inventory):
+        await inventory.get_policy_content()
+        with open(inventory.file) as f:
+            data = json.load(f)
+
+        assert data["policies"] == {
+            "items": [
+                {
+                    "policyConfigMessage": "Config Retrieved!",
+                    "policyConfigStatus": "CONFIG_RETRIEVED",
+                    "type": "JSON",
+                    "config": {
+                        "service": "DCAE_HighlandPark_AgingConfig",
+                        "location": " Edge",
+                        "uuid": "TestUUID",
+                        "policyName": "DCAE.AGING_UVERS_PROD_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
+                        "configName": "DCAE_HighlandPark_AgingConfig",
+                        "templateVersion": "1607",
+                        "priority": "4",
+                        "version": 11.0,
+                        "policyScope": "resource=Test1,service=vSCP,type=configuration,closedLoopControlName=vSCP_F5_Firewall_d925ed73_7831_4d02_9545_db4e101f88f8",
+                        "riskType": "test",
+                        "riskLevel": "2",
+                        "guard": "False",
+                        "content": {
+                            "signature": {
+                                "filter_clause": "event.faultFields.alarmCondition LIKE('%chrisluckenbaugh%')"
+                            },
+                            "name": "testAging",
+                            "context": ["PROD"],
+                            "priority": 1,
+                            "prePublishAging": 40,
+                            "preCorrelationAging": 20,
+                        },
+                        "policyNameWithPrefix": "DCAE.AGING_UVERSE_PSL_Tosca_HP_GOC_Model_cl55973_IT64_testAging",
+                    },
+                    "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
+                    "policyType": "MicroService",
+                    "policyVersion": "78",
+                    "matchingConditions": {
+                        "ECOMPName": "DCAE",
+                        "ONAPName": "DCAE",
+                        "ConfigName": "DCAE_HighlandPark_AgingConfig",
+                        "service": "DCAE_HighlandPark_AgingConfig",
+                        "uuid": "TestUUID",
+                        "Location": " Edge",
+                    },
+                    "responseAttributes": {},
+                    "property": None,
+                },
+                {
+                    "policyConfigMessage": "Config Retrieved! ",
+                    "policyConfigStatus": "CONFIG_RETRIEVED",
+                    "type": "JSON",
+                    "config": "adlskjfadslkjf",
+                    "policyName": "DCAE.Config_MS_AGING_UVERSE_PROD_Tosca_HP_AGING_Model_cl55973_IT64_testAging.78.xml",
+                    "policyType": "MicroService",
+                    "policyVersion": "78",
+                    "matchingConditions": {
+                        "ECOMPName": "DCAE",
+                        "ONAPName": "DCAE",
+                        "ConfigName": "DCAE_HighlandPark_AgingConfig",
+                        "service": "DCAE_HighlandPark_AgingConfig",
+                        "uuid": "TestUUID",
+                        "Location": " Edge",
+                    },
+                    "responseAttributes": {},
+                    "property": None,
+                },
+            ]
+        }
+
+        assert data["event"]["action"] == ACTION_UPDATED
+
+    @pytest.mark.asyncio
+    async def test_update(self, inventory):
+        await inventory.update()
+        assert len(inventory.hp_active_inventory) == 1
+
+        assert not await inventory.update()
+
+    @pytest.mark.asyncio
+    async def test_update_listpolicies_exception(self, inventory):
+        inventory.client.raise_on_listpolicies = True
+        assert not await inventory.update()
+
+    @pytest.mark.asyncio
+    async def test_update_getconfig_exception(self, inventory):
+        inventory.client.raise_on_getconfig = True
+        await inventory.get_policy_content()
+
+    @pytest.mark.asyncio
+    async def test_gather(self, inventory):
+        await inventory.gather()
+
+        # We should gather one policy
+        assert len(inventory.hp_active_inventory) == 1
+
+        # type in event should be gather
+        with open(inventory.file) as f:
+            data = json.load(f)
+
+        assert data["event"]["action"] == ACTION_GATHERED
+
+    @pytest.mark.asyncio
+    async def test_ws_text(self, inventory):
+        result = await inventory.check_and_update()
+        assert result == True
diff --git a/dcae-services-policy-sync/tox.ini b/dcae-services-policy-sync/tox.ini
new file mode 100644 (file)
index 0000000..17235c7
--- /dev/null
@@ -0,0 +1,42 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+
+# content of: tox.ini , put in same dir as setup.py
+[tox]
+envlist = py36, py38
+
+[testenv]
+deps=
+    pytest
+    coverage
+    pytest-cov
+    pytest-asyncio
+    pytest-aiohttp
+setenv =
+    PYTHONPATH={toxinidir}
+    REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
+recreate = True
+commands=
+    python --version
+    pytest -s --cov policysync --cov-report=xml --cov-report=term tests --verbose
+
+[testenv:lint]
+deps =
+    flake8
+    pylint
+commands =
+    flake8 policysync
+    pylint policysync
\ No newline at end of file
index 0e9c8f4..71df21a 100755 (executable)
@@ -62,6 +62,27 @@ compile)
   ;;
 test)
   echo "==> test phase script"
+  case $MVN_PROJECT_MODULEID in
+  dcae-services-policy-sync)
+    set -e -x
+    CURDIR=$(pwd)
+    TOXINIS=$(find . -name "tox.ini")
+    for TOXINI in "${TOXINIS[@]}"; do
+      DIR=$(echo "$TOXINI" | rev | cut -f2- -d'/' | rev)
+      cd "${CURDIR}/${DIR}"
+      rm -rf ./venv-tox ./.tox
+      virtualenv ./venv-tox
+      source ./venv-tox/bin/activate
+      pip install pip==20.3.4
+      pip install --upgrade argparse
+      pip install tox
+      pip freeze
+      tox
+      deactivate
+      rm -rf ./venv-tox ./.tox
+    done 
+    ;;
+  esac
   ;;
 package)
   echo "==> package phase script"
diff --git a/pom.xml b/pom.xml
index f27ea85..5130161 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -40,6 +40,7 @@ limitations under the License.
      <module>healthcheck-container</module>
      <module>tls-init-container</module>
      <module>consul-loader-container</module>
+     <module>dcae-services-policy-sync</module>
      <!--<module>multisite-init-container</module> -->
      <module>dcae-k8s-cleanup-container</module>
   </modules>