+def make_iso_timestamp():
+ """
+ Creates a timestamp in ISO 8601 format in UTC format. Used for JSON output.
+ """
+ now = datetime.datetime.utcnow()
+ now.replace(tzinfo=datetime.timezone.utc)
+ return now.isoformat()
+
+
+def aggregate_requirement_adherence(r_id, collection_failures, test_results):
+ """
+ Examines all tests associated with a given requirement and determines
+ the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
+
+ * ERROR - At least one ERROR occurred
+ * PASS - At least one PASS and no FAIL or ERRORs.
+ * FAIL - At least one FAIL occurred (no ERRORs)
+ * SKIP - All tests were SKIP
+
+
+ :param r_id: Requirement ID to examing
+ :param collection_failures: Errors that occurred during test setup.
+ :param test_results: List of TestResult
+ :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
+ """
+ errors = any(r_id in f["requirements"] for f in collection_failures)
+ outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
+ return aggregate_results(errors, outcomes, r_id)
+
+
+def aggregate_results(has_errors, outcomes, r_id=None):
+ """
+ Determines the aggregate result for the conditions provided. Assumes the
+ results have been filtered and collected for analysis.
+
+ :param has_errors: True if collection failures occurred for the tests being
+ analyzed.
+ :param outcomes: set of outcomes from the TestResults
+ :param r_id: Optional requirement ID if known
+ :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
+ (see aggregate_requirement_adherence for more detail)
+ """
+ if has_errors:
+ return "ERROR"
+
+ if not outcomes:
+ return "PASS"
+ elif "FAIL" in outcomes:
+ return "FAIL"
+ elif "PASS" in outcomes:
+ return "PASS"
+ elif {"SKIP"} == outcomes:
+ return "SKIP"
+ else:
+ pytest.warns(
+ "Unexpected error aggregating outcomes ({}) for requirement {}".format(
+ outcomes, r_id
+ )
+ )
+ return "ERROR"
+
+
+def aggregate_run_results(collection_failures, test_results):
+ """
+ Determines overall status of run based on all failures and results.
+
+ * 'ERROR' - At least one collection failure occurred during the run.
+ * 'FAIL' - Template failed at least one test
+ * 'PASS' - All tests executed properly and no failures were detected
+
+ :param collection_failures: failures occuring during test setup
+ :param test_results: list of all test executuion results
+ :return: one of 'ERROR', 'FAIL', or 'PASS'
+ """
+ if collection_failures:
+ return "ERROR"
+ elif any(r.is_failed for r in test_results):
+ return "FAIL"
+ else:
+ return "PASS"
+
+
+def error(failure_or_result):
+ """
+ Extracts the error message from a collection failure or test result
+ :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
+ :return: Error message as string
+ """
+ if isinstance(failure_or_result, TestResult):
+ return failure_or_result.error_message
+ else:
+ return failure_or_result["error"]
+
+
+def req_ids(failure_or_result):
+ """
+ Extracts the requirement IDs from a collection failure or test result
+ :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
+ :return: set of Requirement IDs. If no requirements mapped, then an empty set
+ """
+ if isinstance(failure_or_result, TestResult):
+ return set(failure_or_result.requirement_ids)
+ else:
+ return set(failure_or_result["requirements"])
+
+
+def collect_errors(r_id, collection_failures, test_result):
+ """
+ Creates a list of error messages from the collection failures and
+ test results. If r_id is provided, then it collects the error messages
+ where the failure or test is associated with that requirement ID. If
+ r_id is None, then it collects all errors that occur on failures and
+ results that are not mapped to requirements
+ """
+
+ def selector(item):
+ if r_id:
+ return r_id in req_ids(item)
+ else:
+ return not req_ids(item)
+
+ errors = (error(x) for x in chain(collection_failures, test_result) if selector(x))
+ return [e for e in errors if e]
+
+
+def generate_json(outpath, template_path, categories):
+ """
+ Creates a JSON summary of the entire test run.
+ """
+ reqs = load_current_requirements()
+ data = {
+ "version": "dublin",
+ "template_directory": template_path,
+ "timestamp": make_iso_timestamp(),
+ "checksum": hash_directory(template_path),
+ "categories": categories,
+ "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
+ "tests": [],
+ "requirements": [],
+ }
+
+ results = data["tests"]
+ for result in COLLECTION_FAILURES:
+ results.append(