operation = OPERATIONS.get(prop.get('operator'))
attribute_value = candidate.get(attribute)
- if not operation(attribute_value, threshold):
+ if not attribute_value or not operation(attribute_value, threshold):
conflict_list.append(candidate)
- continue
+ break
filtered_candidates = [c for c in _candidate_list if c not in conflict_list]
- return filtered_candidates
+ return filtered_candidates
\ No newline at end of file
"area_traffic_cap_ul":null,
"inventory_provider":"aai",
"exp_data_rate_ul":100,
+ "exp_data_rate_dl":100,
"max_number_of_ues":0,
"ue_mobility_level":"stationary",
"candidate_type":"nssi",
"traffic_density":0,
"payload_size":0,
- "exp_data_rate_dl":100,
"jitter":0,
"survival_time":0,
"resource_sharing_level":"0",
candidates_file = './conductor/tests/unit/data/plugins/inventory_provider/nssi_candidate.json'
candidates = json.loads(open(candidates_file).read())
-
+ # test 1
properties = {'evaluate':
[{'attribute': 'latency', 'threshold': 30, 'operator': 'lte'},
{'attribute': 'exp_data_rate_ul', 'threshold': 70, 'operator': 'gte'}]}
self.assertEqual(candidates, threshold_obj.solve(decision_path, candidates, None))
+ # test 2
properties = {'evaluate':
[{'attribute': 'latency', 'threshold': 10, 'operator': 'lte'},
{'attribute': 'exp_data_rate_ul', 'threshold': 120, 'operator': 'gte'}]}
self.assertEqual([], threshold_obj.solve(decision_path, candidates, None))
+ # test 3
+ properties = {'evaluate':
+ [{'attribute': 'latency', 'threshold': 10, 'operator': 'lte'},
+ {'attribute': 'area_traffic_cap', 'threshold': 120, 'operator': 'gte'}]}
+
+ threshold_obj = Threshold('urllc_threshold', 'threshold', ['URLLC'], _priority=0,
+ _properties=properties)
+
+ self.assertEqual([], threshold_obj.solve(decision_path, candidates, None))
+
if __name__ == "__main__":
unittest.main()