+def make_iso_timestamp():
+ """
+    Creates a timestamp in ISO 8601 format with a UTC timezone. Used for JSON output.
+ """
+ now = datetime.datetime.utcnow()
+    # datetime.replace returns a new object; re-assign so ``now`` is timezone-aware.
+    now = now.replace(tzinfo=datetime.timezone.utc)
+ return now.isoformat()
+
+
+def aggregate_requirement_adherence(r_id, collection_failures, test_results):
+ """
+ Examines all tests associated with a given requirement and determines
+ the aggregate result (PASS, FAIL, ERROR, or SKIP) for the requirement.
+
+    * ERROR - At least one ERROR occurred
+    * PASS - At least one PASS and no FAILs or ERRORs
+    * FAIL - At least one FAIL occurred (and no ERRORs)
+    * SKIP - All tests were SKIP
+
+    :param r_id: Requirement ID to examine
+ :param collection_failures: Errors that occurred during test setup.
+ :param test_results: List of TestResult
+ :return: 'PASS', 'FAIL', 'SKIP', or 'ERROR'
+ """
+ errors = any(r_id in f["requirements"] for f in collection_failures)
+ outcomes = set(r.outcome for r in test_results if r_id in r.requirement_ids)
+ return aggregate_results(errors, outcomes, r_id)
+
+
+def aggregate_results(has_errors, outcomes, r_id=None):
+ """
+ Determines the aggregate result for the conditions provided. Assumes the
+ results have been filtered and collected for analysis.
+
+ :param has_errors: True if collection failures occurred for the tests being
+ analyzed.
+ :param outcomes: set of outcomes from the TestResults
+ :param r_id: Optional requirement ID if known
+ :return: 'ERROR', 'PASS', 'FAIL', or 'SKIP'
+ (see aggregate_requirement_adherence for more detail)
+ """
+ if has_errors:
+ return "ERROR"
+
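+    # No outcomes means no executed tests were mapped to this requirement;
+    # the requirement is treated as adherent by default.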
+ if not outcomes:
+ return "PASS"
+ elif "FAIL" in outcomes:
+ return "FAIL"
+ elif "PASS" in outcomes:
+ return "PASS"
+ elif {"SKIP"} == outcomes:
+ return "SKIP"
+ else:
+        # pytest.warns() is an assertion context manager, not a way to emit a
+        # warning; use the standard library instead (requires ``import warnings``).
+        warnings.warn(
+            "Unexpected error aggregating outcomes ({}) for requirement {}".format(
+                outcomes, r_id
+            )
+        )
+ return "ERROR"
+
+
+def aggregate_run_results(collection_failures, test_results):
+ """
+ Determines overall status of run based on all failures and results.
+
+ * 'ERROR' - At least one collection failure occurred during the run.
+ * 'FAIL' - Template failed at least one test
+ * 'PASS' - All tests executed properly and no failures were detected
+
+    :param collection_failures: failures occurring during test setup
+    :param test_results: list of all test execution results
+ :return: one of 'ERROR', 'FAIL', or 'PASS'
+ """
+ if collection_failures:
+ return "ERROR"
+ elif any(r.is_failed for r in test_results):
+ return "FAIL"
+ else:
+ return "PASS"
+
+
+def error(failure_or_result):
+ """
+ Extracts the error message from a collection failure or test result
+ :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
+ :return: Error message as string
+ """
+ if isinstance(failure_or_result, TestResult):
+ return failure_or_result.error_message
+ else:
+ return failure_or_result["error"]
+
+
+def req_ids(failure_or_result):
+ """
+ Extracts the requirement IDs from a collection failure or test result
+ :param failure_or_result: Entry from COLLECTION_FAILURE or a TestResult
+ :return: set of Requirement IDs. If no requirements mapped, then an empty set
+ """
+ if isinstance(failure_or_result, TestResult):
+ return set(failure_or_result.requirement_ids)
+ else:
+ return set(failure_or_result["requirements"])
+
+
+def collect_errors(r_id, collection_failures, test_results):
+    """
+    Creates a list of error messages from the collection failures and
+    test results. If r_id is provided, then it collects the error messages
+    where the failure or test is associated with that requirement ID. If
+    r_id is None, then it collects all errors from failures and results
+    that are not mapped to any requirement.
+
+    :param r_id: Requirement ID to collect errors for, or None for unmapped errors
+    :param collection_failures: entries from COLLECTION_FAILURES
+    :param test_results: list of TestResult
+    :return: list of non-empty error messages
+    """
+
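+    # Select items mapped to r_id when it is given; otherwise select items
+    # that are not mapped to any requirement.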
+ def selector(item):
+ if r_id:
+ return r_id in req_ids(item)
+ else:
+ return not req_ids(item)
+
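+    # Gather error text from both collection failures and test results that
+    # match the selector, then drop empty messages.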
+    errors = (error(x) for x in chain(collection_failures, test_results) if selector(x))
+ return [e for e in errors if e]
+
+
+def relative_paths(base_dir, paths):
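+    """Converts the given paths to paths relative to base_dir."""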
+ return [os.path.relpath(p, base_dir) for p in paths]
+
+
+def generate_json(outpath, template_path, categories):
+ """
+ Creates a JSON summary of the entire test run.
+ """
+ reqs = load_current_requirements()
+ data = {
+ "version": "dublin",
+ "template_directory": os.path.splitdrive(template_path)[1].replace(
+ os.path.sep, "/"
+ ),
+ "timestamp": make_iso_timestamp(),
+ "checksum": hash_directory(template_path),
+ "categories": categories,
+ "outcome": aggregate_run_results(COLLECTION_FAILURES, ALL_RESULTS),
+ "tests": [],
+ "requirements": [],
+ }
+
+ results = data["tests"]
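+    # Report each collection failure as an ERROR entry; no files are associated
+    # because the test never ran.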
+ for result in COLLECTION_FAILURES:
+ results.append(
+ {
+ "files": [],
+ "test_module": result["module"],
+ "test_case": result["test"],
+ "result": "ERROR",
+ "error": result["error"],
+ "requirements": result["requirements"],
+ }
+ )
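+    # Report each executed test case along with the files it examined.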
+ for result in ALL_RESULTS:
+ results.append(
+ {
+ "files": relative_paths(template_path, result.files),
+ "test_module": result.test_module,
+ "test_case": result.test_case,
+ "result": result.outcome,
+ "error": result.error_message if result.is_failed else "",
+ "requirements": result.requirements_metadata(reqs),
+ }
+ )
+
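+    # Aggregate an overall result and its associated errors for every known requirement.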
+ requirements = data["requirements"]
+ for r_id, r_data in reqs.items():
+ result = aggregate_requirement_adherence(r_id, COLLECTION_FAILURES, ALL_RESULTS)
+ if result:
+ requirements.append(
+ {
+ "id": r_id,
+ "text": r_data["description"],
+ "keyword": r_data["keyword"],
+ "result": result,
+ "errors": collect_errors(r_id, COLLECTION_FAILURES, ALL_RESULTS),
+ }
+ )
+ # If there are tests that aren't mapped to a requirement, then we'll
+ # map them to a special entry so the results are coherent.
+ unmapped_outcomes = {r.outcome for r in ALL_RESULTS if not r.requirement_ids}
+ has_errors = any(not f["requirements"] for f in COLLECTION_FAILURES)
+ if unmapped_outcomes or has_errors:
+ requirements.append(