1 # ================================================================================
2 # Copyright (c) 2019 Wipro Limited Intellectual Property. All rights reserved.
3 # Copyright (c) 2020 AT&T Intellectual Property. All rights reserved.
4 # ================================================================================
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ============LICENSE_END=========================================================
19 """unit tests for tasks in clamppolicyplugin"""
23 from datetime import datetime, timedelta
25 from tests.mock_cloudify_ctx import MockCloudifyContextFull
27 LOG_FILE = 'logs/test_clamppolicyplugin.log'
28 POLICY_MODEL_ID = 'policy_model_id'
29 POLICY_ID = 'policy_id'
30 POLICY_VERSION = "policyVersion"
31 POLICY_NAME = "policyName"
32 POLICY_BODY = 'policy_body'
33 POLICY_CONFIG = 'config'
34 CONFIG_NAME = "ConfigName"
35 MONKEYED_POLICY_ID = 'monkeyed.Config_peach'
37 RUN_TS = datetime.utcnow()
40 class MonkeyedLogHandler(object):
41 """keep the shared logger handler here"""
45 def add_handler_to(logger):
46 """adds the local handler to the logger"""
47 if not MonkeyedLogHandler._log_handler:
48 MonkeyedLogHandler._log_handler = logging.FileHandler(LOG_FILE)
49 MonkeyedLogHandler._log_handler.setLevel(logging.DEBUG)
50 formatter = logging.Formatter(
51 fmt='%(asctime)s.%(msecs)03d %(levelname)+8s ' +
52 '%(threadName)s %(name)s.%(funcName)s: %(message)s',
53 datefmt='%Y%m%d_%H%M%S')
54 MonkeyedLogHandler._log_handler.setFormatter(formatter)
55 logger.addHandler(MonkeyedLogHandler._log_handler)
58 class MonkeyedPolicyBody(object):
59 """policy body that policy-engine returns"""
61 def create_policy_body(policy_id, policy_version=1):
62 """returns a fake policy-body"""
63 prev_ver = policy_version - 1
64 timestamp = RUN_TS + timedelta(hours=prev_ver)
66 prev_ver = str(prev_ver)
67 this_ver = str(policy_version)
69 "policy_updated_from_ver": prev_ver,
70 "policy_updated_to_ver": this_ver,
71 "policy_hello": "world!",
72 "policy_updated_ts": timestamp.isoformat()[:-3] + 'Z',
73 "updated_policy_id": policy_id
76 "policyConfigMessage": "Config Retrieved! ",
77 "policyConfigStatus": "CONFIG_RETRIEVED",
79 POLICY_NAME: "{0}.{1}.xml".format(policy_id, this_ver),
80 POLICY_VERSION: this_ver,
81 POLICY_CONFIG: config,
82 "matchingConditions": {
84 CONFIG_NAME: "VigneshK_config_name"
86 "responseAttributes": {},
91 def create_policy(policy_id, policy_version=1):
92 """returns the whole policy object for policy_id and policy_version"""
95 POLICY_BODY: MonkeyedPolicyBody.create_policy_body(policy_id, policy_version)
99 def is_the_same_dict(policy_body_1, policy_body_2):
100 """check whether both policy_body objects are the same"""
101 if not isinstance(policy_body_1, dict) or not isinstance(policy_body_2, dict):
103 for key in policy_body_1.keys():
104 if key not in policy_body_2:
107 val_1 = policy_body_1[key]
108 val_2 = policy_body_2[key]
109 if isinstance(val_1, dict) \
110 and not MonkeyedPolicyBody.is_the_same_dict(val_1, val_2):
112 if (val_1 is None and val_2 is not None) \
113 or (val_1 is not None and val_2 is None) \
119 class MonkeyedResponse(object):
120 """Monkey response"""
121 def __init__(self, full_path, headers=None, resp_json=None):
122 self.full_path = full_path
123 self.status_code = 200
124 self.headers = headers or {}
125 self.resp_json = resp_json
126 self.text = json.dumps(resp_json or {})
129 """returns json of response"""
130 return self.resp_json
132 def raise_for_status(self):
137 class MonkeyedNode(object):
138 """node in cloudify"""
139 BLUEPRINT_ID = 'test_clamp_policy_bp_id'
140 DEPLOYMENT_ID = 'test_clamp_policy_dpl_id'
141 EXECUTION_ID = 'test_clamp_policy_exe_id'
143 def __init__(self, node_id, node_name, node_type, properties, relationships=None):
144 self.node_id = node_id
145 self.node_name = node_name
146 self.ctx = MockCloudifyContextFull(
147 node_id=self.node_id,
148 node_name=self.node_name,
150 blueprint_id=MonkeyedNode.BLUEPRINT_ID,
151 deployment_id=MonkeyedNode.DEPLOYMENT_ID,
152 execution_id=MonkeyedNode.EXECUTION_ID,
153 properties=properties,
154 relationships=relationships
156 MonkeyedLogHandler.add_handler_to(self.ctx.logger)