move plugins from ccsdk to dcaegen2
[dcaegen2/platform/plugins.git] / dcae-policy / tests / mock_setup.py
1 # ================================================================================
2 # Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
3 # ================================================================================
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #      http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ============LICENSE_END=========================================================
16 #
17
18 """unit tests for tasks in dcaepolicyplugin"""
19
20 import json
21 import logging
22 from datetime import datetime, timedelta
23
24 from tests.mock_cloudify_ctx import MockCloudifyContextFull
25
# path of the file that the shared test log handler (MonkeyedLogHandler) writes to
LOG_FILE = 'logs/test_dcaepolicyplugin.log'
# keys of the policy object returned by MonkeyedPolicyBody.create_policy
POLICY_ID = 'policy_id'
# keys of the fake policy-body built by MonkeyedPolicyBody.create_policy_body
POLICY_VERSION = "policyVersion"
POLICY_NAME = "policyName"
# key under which create_policy stores the policy-body
POLICY_BODY = 'policy_body'
# key of the config section inside the policy-body
POLICY_CONFIG = 'config'
# key inside the "matchingConditions" section of the policy-body
CONFIG_NAME = "ConfigName"
# NOTE(review): not referenced in this module - presumably the policy_id used by the tests; confirm
MONKEYED_POLICY_ID = 'monkeyed.Config_peach'

# naive UTC timestamp taken once at import; create_policy_body offsets it per policy version
RUN_TS = datetime.utcnow()
36
37
class MonkeyedLogHandler(object):
    """holder of the single file log handler that is shared by the test loggers"""
    _log_handler = None

    @staticmethod
    def add_handler_to(logger):
        """attach the shared file handler to the logger

        The handler is created on the first call: it writes to LOG_FILE at
        DEBUG level with a timestamped format, and is reused by later calls.
        """
        shared_handler = MonkeyedLogHandler._log_handler
        if not shared_handler:
            shared_handler = logging.FileHandler(LOG_FILE)
            # publish the handler right away so that it is created only once
            MonkeyedLogHandler._log_handler = shared_handler
            shared_handler.setLevel(logging.DEBUG)
            shared_handler.setFormatter(logging.Formatter(
                fmt='%(asctime)s.%(msecs)03d %(levelname)+8s '
                    '%(threadName)s %(name)s.%(funcName)s: %(message)s',
                datefmt='%Y%m%d_%H%M%S'))
        logger.addHandler(shared_handler)
54
55
class MonkeyedPolicyBody(object):
    """policy body that policy-engine returns"""

    @staticmethod
    def create_policy_body(policy_id, policy_version=1):
        """returns a fake policy-body

        The config section records the previous version (policy_version - 1),
        the current version, and a timestamp of RUN_TS shifted forward by one
        hour per previous version.

        :policy_id: identifier put into the policy name and into the config
        :policy_version: integer version of the policy, defaults to 1
        :returns: dict with the policy name, version, config and fixed
            matching conditions
        """
        prev_ver = policy_version - 1
        timestamp = RUN_TS + timedelta(hours=prev_ver)

        # strftime always emits all six microsecond digits, whereas isoformat()
        # omits the fraction when microsecond == 0 - slicing off three chars
        # would then cut into the seconds instead of trimming to milliseconds
        updated_ts = timestamp.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'

        this_ver = str(policy_version)
        config = {
            "policy_updated_from_ver": str(prev_ver),
            "policy_updated_to_ver": this_ver,
            "policy_hello": "world!",
            "policy_updated_ts": updated_ts,
            "updated_policy_id": policy_id
        }
        return {
            "policyConfigMessage": "Config Retrieved! ",
            "policyConfigStatus": "CONFIG_RETRIEVED",
            "type": "JSON",
            POLICY_NAME: "{0}.{1}.xml".format(policy_id, this_ver),
            POLICY_VERSION: this_ver,
            POLICY_CONFIG: config,
            "matchingConditions": {
                "ONAPName": "DCAE",
                CONFIG_NAME: "alex_config_name"
            },
            "responseAttributes": {},
            "property": None
        }

    @staticmethod
    def create_policy(policy_id, policy_version=1):
        """returns the whole policy object for policy_id and policy_version

        :returns: dict with the policy_id under POLICY_ID and the fake
            policy-body from create_policy_body under POLICY_BODY
        """
        return {
            POLICY_ID: policy_id,
            POLICY_BODY: MonkeyedPolicyBody.create_policy_body(policy_id, policy_version)
        }

    @staticmethod
    def is_the_same_dict(policy_body_1, policy_body_2):
        """check whether policy_body_2 matches every item of policy_body_1

        The check is one-directional: each key of policy_body_1 must be present
        in policy_body_2 with an equal value, while extra top-level keys of
        policy_body_2 are ignored.  Nested dict values are checked recursively
        and then also have to compare equal as a whole.

        :returns: False when either argument is not a dict or any item differs,
            True otherwise
        """
        if not (isinstance(policy_body_1, dict) and isinstance(policy_body_2, dict)):
            return False

        for key in policy_body_1:
            if key not in policy_body_2:
                return False

            val_1 = policy_body_1[key]
            val_2 = policy_body_2[key]
            if isinstance(val_1, dict) \
            and not MonkeyedPolicyBody.is_the_same_dict(val_1, val_2):
                return False
            # exactly one of the values being None is a mismatch, so is any inequality
            if (val_1 is None) != (val_2 is None) or val_1 != val_2:
                return False
        return True
115
116
class MonkeyedResponse(object):
    """fake http response that always reports success"""
    def __init__(self, full_path, headers=None, resp_json=None):
        """keep the path, headers and json payload of the fake response"""
        self.full_path = full_path
        self.resp_json = resp_json
        self.headers = headers if headers else {}
        self.status_code = 200
        # text is the serialized payload; a missing or empty payload is dumped as {}
        self.text = json.dumps(resp_json if resp_json else {})

    def json(self):
        """returns the payload exactly as it was passed in"""
        return self.resp_json

    def raise_for_status(self):
        """never raises - the response is always treated as successful"""
        return None
133
134
class MonkeyedNode(object):
    """fake cloudify node that carries its own mocked cloudify context"""
    BLUEPRINT_ID = 'test_dcae_policy_bp_id'
    DEPLOYMENT_ID = 'test_dcae_policy_dpl_id'
    EXECUTION_ID = 'test_dcae_policy_exe_id'

    def __init__(self, node_id, node_name, node_type, properties, relationships=None):
        """build the mocked context of the node and hook its logger to the shared log file"""
        self.node_id = node_id
        self.node_name = node_name

        # blueprint, deployment and execution ids are the same for every node
        ctx_settings = dict(
            node_id=self.node_id,
            node_name=self.node_name,
            node_type=node_type,
            blueprint_id=MonkeyedNode.BLUEPRINT_ID,
            deployment_id=MonkeyedNode.DEPLOYMENT_ID,
            execution_id=MonkeyedNode.EXECUTION_ID,
            properties=properties,
            relationships=relationships
        )
        self.ctx = MockCloudifyContextFull(**ctx_settings)
        MonkeyedLogHandler.add_handler_to(self.ctx.logger)