From: Fiete Ostkamp Date: Wed, 4 Mar 2026 14:38:54 +0000 (+0100) Subject: Increase py-executor test coverage X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=447c36019676e220a02c43f3c1f966e14e05e6ed;p=ccsdk%2Fcds.git Increase py-executor test coverage Issue-ID: CCSDK-4158 Change-Id: I9602a1c40b5d8574f829388003dc9baf2ea8dd72 Signed-off-by: Fiete Ostkamp --- diff --git a/.gitignore b/.gitignore index 8b3e0c3d1..a25b18906 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ # Python **/*.pyc +**/*.egg-info/ **/.apt_generated # Logs @@ -161,3 +162,4 @@ MacOS # Generated dependency list direct-dependencies.txt +.coverage diff --git a/ms/py-executor/blueprints_grpc/tests/__init__.py b/ms/py-executor/blueprints_grpc/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ms/py-executor/blueprints_grpc/tests/test_blueprint_processing_server.py b/ms/py-executor/blueprints_grpc/tests/test_blueprint_processing_server.py new file mode 100644 index 000000000..09ace3fb4 --- /dev/null +++ b/ms/py-executor/blueprints_grpc/tests/test_blueprint_processing_server.py @@ -0,0 +1,164 @@ +# +# Copyright (C) 2026 Deutsche Telekom. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Tests for blueprints_grpc/blueprint_processing_server.py — +AbstractScriptFunction and BluePrintProcessingServer. +""" + +from unittest.mock import MagicMock, patch + +import pytest + +from google.protobuf import struct_pb2 +from proto.BluePrintProcessing_pb2 import ( + ExecutionServiceInput, + ExecutionServiceOutput, +) +from proto.BluePrintCommon_pb2 import ( + CommonHeader, + ActionIdentifiers, +) + +from blueprints_grpc.blueprint_processing_server import ( + AbstractScriptFunction, + BluePrintProcessingServer, +) +from blueprints_grpc.script_executor_configuration import ScriptExecutorConfiguration + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_input(bp_name="test-bp", bp_version="1.0.0", action_name="TestAction", + request_id="req-1"): + inp = ExecutionServiceInput() + inp.commonHeader.requestId = request_id + inp.commonHeader.subRequestId = "sub-1" + inp.commonHeader.originatorId = "CDS" + inp.actionIdentifiers.blueprintName = bp_name + inp.actionIdentifiers.blueprintVersion = bp_version + inp.actionIdentifiers.actionName = action_name + inp.payload.update({"key": "value"}) + return inp + + +# --------------------------------------------------------------------------- +# AbstractScriptFunction +# --------------------------------------------------------------------------- + +class TestAbstractScriptFunction: + + def test_set_context(self): + func = AbstractScriptFunction() + mock_ctx = MagicMock() + func.set_context(mock_ctx) + assert func.context is mock_ctx + + def test_process_returns_none(self): + func = AbstractScriptFunction() + assert func.process(MagicMock()) is None + + def test_recover_returns_none(self): + func = AbstractScriptFunction() + assert func.recover(RuntimeError("test"), MagicMock()) is None + + +# --------------------------------------------------------------------------- +# BluePrintProcessingServer +# --------------------------------------------------------------------------- + +class TestBluePrintProcessingServer: + + @pytest.fixture + def config(self, tmp_path): + config_file = tmp_path / "config.ini" + config_file.write_text( + "[blueprintsprocessor]\nblueprintDeployPath=/opt/blueprints\n" + "[scriptExecutor]\nport=50052\n" + ) + return ScriptExecutorConfiguration(str(config_file)) + + def test_init_stores_configuration(self, config): + server = BluePrintProcessingServer(config) + assert server.configuration is config + + def test_init_creates_logger(self, config): + server = BluePrintProcessingServer(config) + assert server.logger is not None + assert server.logger.name == "BluePrintProcessingServer" + + @patch("blueprints_grpc.blueprint_processing_server.instance_for_input") + def test_process_yields_from_instance(self, mock_instance_for_input, config): + """Verify process iterates over requests, creates instances, and yields responses.""" + # Create a mock script function instance that yields two responses + mock_script = MagicMock(spec=AbstractScriptFunction) + response1 = ExecutionServiceOutput() + response1.commonHeader.requestId = "resp-1" + response2 = ExecutionServiceOutput() + response2.commonHeader.requestId = "resp-2" + mock_script.process.return_value = iter([response1, response2]) + mock_instance_for_input.return_value = mock_script + + server = BluePrintProcessingServer(config) + request = _make_input() + context = MagicMock() + + responses = list(server.process(iter([request]), context)) + + mock_instance_for_input.assert_called_once_with(config, request) + mock_script.set_context.assert_called_once_with(context) + mock_script.process.assert_called_once_with(request) + assert len(responses) == 2 + assert responses[0].commonHeader.requestId == "resp-1" + assert responses[1].commonHeader.requestId == "resp-2" + + @patch("blueprints_grpc.blueprint_processing_server.instance_for_input") + def test_process_handles_multiple_requests(self, mock_instance_for_input, config): + """Multiple requests should each get their own instance.""" + mock_script1 = MagicMock(spec=AbstractScriptFunction) + resp1 = ExecutionServiceOutput() + resp1.commonHeader.requestId = "r1" + mock_script1.process.return_value = iter([resp1]) + + mock_script2 = MagicMock(spec=AbstractScriptFunction) + resp2 = ExecutionServiceOutput() + resp2.commonHeader.requestId = "r2" + mock_script2.process.return_value = iter([resp2]) + + mock_instance_for_input.side_effect = [mock_script1, mock_script2] + + server = BluePrintProcessingServer(config) + req1 = _make_input(request_id="req-1") + req2 = _make_input(request_id="req-2") + context = MagicMock() + + responses = list(server.process(iter([req1, req2]), context)) + + assert len(responses) == 2 + assert mock_instance_for_input.call_count == 2 + + @patch("blueprints_grpc.blueprint_processing_server.instance_for_input") + def test_process_empty_iterator(self, mock_instance_for_input, config): + """An empty request iterator should yield no responses.""" + server = BluePrintProcessingServer(config) + context = MagicMock() + + responses = list(server.process(iter([]), context)) + + assert responses == [] + mock_instance_for_input.assert_not_called() diff --git a/ms/py-executor/blueprints_grpc/tests/test_executor_utils.py b/ms/py-executor/blueprints_grpc/tests/test_executor_utils.py new file mode 100644 index 000000000..c4adb5d8f --- /dev/null +++ b/ms/py-executor/blueprints_grpc/tests/test_executor_utils.py @@ -0,0 +1,382 @@ +# +# Copyright (C) 2026 Deutsche Telekom. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Tests for blueprints_grpc/executor_utils.py — utility functions for building +gRPC responses, loading dynamic script instances, and working with protobuf +messages. +""" + +import json +import re +import tempfile +import os +from unittest.mock import patch, MagicMock + +import pytest + +from proto.BluePrintCommon_pb2 import ( + EVENT_COMPONENT_EXECUTED, + EVENT_COMPONENT_NOTIFICATION, + EVENT_COMPONENT_PROCESSING, + EVENT_COMPONENT_TRACE, + ActionIdentifiers, + CommonHeader, +) +from proto.BluePrintProcessing_pb2 import ( + ExecutionServiceInput, + ExecutionServiceOutput, +) +from google.protobuf import json_format + +from blueprints_grpc.executor_utils import ( + current_time, + blueprint_id, + blueprint_location, + instance_for_input, + log_response, + send_notification, + ack_response, + success_response, + failure_response, + create_response_payload_from_json, +) +from blueprints_grpc.script_executor_configuration import ScriptExecutorConfiguration + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_input(bp_name="test-bp", bp_version="1.0.0", action_name="TestAction", + request_id="req-1", sub_request_id="sub-1", originator_id="CDS"): + """Create a properly populated ExecutionServiceInput.""" + inp = ExecutionServiceInput() + inp.commonHeader.requestId = request_id + inp.commonHeader.subRequestId = sub_request_id + inp.commonHeader.originatorId = originator_id + inp.actionIdentifiers.blueprintName = bp_name + inp.actionIdentifiers.blueprintVersion = bp_version + inp.actionIdentifiers.actionName = action_name + return inp + + +# --------------------------------------------------------------------------- +# current_time +# --------------------------------------------------------------------------- + +class TestCurrentTime: + + def test_returns_string(self): + result = current_time() + assert isinstance(result, str) + + def test_format_is_iso_like(self): + result = current_time() + # Pattern: YYYY-MM-DDTHH:MM:SS.ffffffZ + assert re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z", result) + + +# --------------------------------------------------------------------------- +# blueprint_id +# --------------------------------------------------------------------------- + +class TestBlueprintId: + + def test_returns_name_slash_version(self): + inp = _make_input(bp_name="my-bp", bp_version="2.0.0") + assert blueprint_id(inp) == "my-bp/2.0.0" + + def test_different_values(self): + inp = _make_input(bp_name="other", bp_version="3.1.0") + assert blueprint_id(inp) == "other/3.1.0" + + +# --------------------------------------------------------------------------- +# blueprint_location +# --------------------------------------------------------------------------- + +class TestBlueprintLocation: + + def test_returns_deploy_path_plus_name_version(self, tmp_path): + config_file = tmp_path / "config.ini" + config_file.write_text( + "[blueprintsprocessor]\nblueprintDeployPath=/opt/blueprints\n" + "[scriptExecutor]\nport=50052\n" + ) + config = ScriptExecutorConfiguration(str(config_file)) + inp = _make_input(bp_name="test-bp", bp_version="1.0.0") + + result = blueprint_location(config, inp) + assert result == "/opt/blueprints/test-bp/1.0.0" + + +# --------------------------------------------------------------------------- +# instance_for_input +# --------------------------------------------------------------------------- + +class TestInstanceForInput: + + def test_loads_class_from_sample_cba(self): + """Use the sample-cba test resource to verify dynamic module loading.""" + # The sample CBA is at test/resources/sample-cba/1.0.0/Scripts/python/__init__.py + # and the action class is SampleScript + test_resources_dir = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(__file__))), + "test", "resources" + ) + + config_file = os.path.join( + os.path.dirname(os.path.dirname(os.path.dirname(__file__))), + "..", "configuration-local.ini" + ) + + if not os.path.exists(config_file): + pytest.skip("configuration-local.ini not found") + + config = ScriptExecutorConfiguration(config_file) + inp = _make_input(bp_name="sample-cba", bp_version="1.0.0", action_name="SampleScript") + + instance = instance_for_input(config, inp) + # It should have the process method from AbstractScriptFunction subclass + assert hasattr(instance, "process") + assert instance.__class__.__name__ == "SampleScript" + + def test_raises_on_missing_module(self, tmp_path): + """If the script file doesn't exist, loading should fail.""" + config_file = tmp_path / "config.ini" + config_file.write_text( + f"[blueprintsprocessor]\nblueprintDeployPath={tmp_path}\n" + "[scriptExecutor]\nport=50052\n" + ) + config = ScriptExecutorConfiguration(str(config_file)) + inp = _make_input(bp_name="nonexistent", bp_version="0.0.0", action_name="Missing") + + with pytest.raises((FileNotFoundError, ModuleNotFoundError)): + instance_for_input(config, inp) + + def test_raises_on_missing_action_class(self, tmp_path): + """If the script file exists but the action class doesn't, should raise AttributeError.""" + bp_dir = tmp_path / "mybp" / "1.0.0" / "Scripts" / "python" + bp_dir.mkdir(parents=True) + (bp_dir / "__init__.py").write_text("class Exists:\n pass\n") + + config_file = tmp_path / "config.ini" + config_file.write_text( + f"[blueprintsprocessor]\nblueprintDeployPath={tmp_path}\n" + "[scriptExecutor]\nport=50052\n" + ) + config = ScriptExecutorConfiguration(str(config_file)) + inp = _make_input(bp_name="mybp", bp_version="1.0.0", action_name="NonExistentClass") + + with pytest.raises(AttributeError): + instance_for_input(config, inp) + + +# --------------------------------------------------------------------------- +# log_response +# --------------------------------------------------------------------------- + +class TestLogResponse: + + def test_returns_execution_output(self): + inp = _make_input() + result = log_response(inp, "test message") + assert isinstance(result, ExecutionServiceOutput) + + def test_event_type_is_trace(self): + inp = _make_input() + result = log_response(inp, "trace msg") + assert result.status.eventType == EVENT_COMPONENT_TRACE + + def test_payload_contains_message(self): + inp = _make_input() + result = log_response(inp, "hello world") + payload_dict = json_format.MessageToDict(result.payload) + assert payload_dict["message"] == "hello world" + + def test_preserves_common_header(self): + inp = _make_input(request_id="req-99") + result = log_response(inp, "msg") + assert result.commonHeader.requestId == "req-99" + + def test_preserves_action_identifiers(self): + inp = _make_input(bp_name="bp1", action_name="Act1") + result = log_response(inp, "msg") + assert result.actionIdentifiers.blueprintName == "bp1" + assert result.actionIdentifiers.actionName == "Act1" + + def test_timestamp_is_set(self): + inp = _make_input() + result = log_response(inp, "msg") + assert result.status.timestamp != "" + + +# --------------------------------------------------------------------------- +# send_notification +# --------------------------------------------------------------------------- + +class TestSendNotification: + + def test_returns_execution_output(self): + inp = _make_input() + result = send_notification(inp, "notification msg") + assert isinstance(result, ExecutionServiceOutput) + + def test_event_type_is_notification(self): + inp = _make_input() + result = send_notification(inp, "notify") + assert result.status.eventType == EVENT_COMPONENT_NOTIFICATION + + def test_payload_contains_message(self): + inp = _make_input() + result = send_notification(inp, "alert!") + payload_dict = json_format.MessageToDict(result.payload) + assert payload_dict["message"] == "alert!" + + +# --------------------------------------------------------------------------- +# ack_response +# --------------------------------------------------------------------------- + +class TestAckResponse: + + def test_returns_execution_output(self): + inp = _make_input() + result = ack_response(inp) + assert isinstance(result, ExecutionServiceOutput) + + def test_event_type_is_processing(self): + inp = _make_input() + result = ack_response(inp) + assert result.status.eventType == EVENT_COMPONENT_PROCESSING + + def test_preserves_action_identifiers(self): + inp = _make_input(bp_name="ack-bp", bp_version="2.0.0") + result = ack_response(inp) + assert result.actionIdentifiers.blueprintName == "ack-bp" + assert result.actionIdentifiers.blueprintVersion == "2.0.0" + + def test_timestamp_is_set(self): + inp = _make_input() + result = ack_response(inp) + assert result.status.timestamp != "" + + +# --------------------------------------------------------------------------- +# success_response +# --------------------------------------------------------------------------- + +class TestSuccessResponse: + + def test_returns_execution_output(self): + inp = _make_input() + result = success_response(inp, {"key": "val"}, 200) + assert isinstance(result, ExecutionServiceOutput) + + def test_event_type_is_executed(self): + inp = _make_input() + result = success_response(inp, {}, 200) + assert result.status.eventType == EVENT_COMPONENT_EXECUTED + + def test_status_code(self): + inp = _make_input() + result = success_response(inp, {}, 200) + assert result.status.code == 200 + + def test_status_message_is_success(self): + inp = _make_input() + result = success_response(inp, {}, 200) + assert result.status.message == "success" + + def test_payload_contains_action_response(self): + inp = _make_input(action_name="MyAction") + result = success_response(inp, {"result": "ok"}, 200) + payload_dict = json_format.MessageToDict(result.payload) + assert "MyAction-response" in payload_dict + assert payload_dict["MyAction-response"]["result"] == "ok" + + def test_custom_status_code(self): + inp = _make_input() + result = success_response(inp, {}, 201) + assert result.status.code == 201 + + +# --------------------------------------------------------------------------- +# failure_response +# --------------------------------------------------------------------------- + +class TestFailureResponse: + + def test_returns_execution_output(self): + inp = _make_input() + result = failure_response(inp, {}, 500, "it broke") + assert isinstance(result, ExecutionServiceOutput) + + def test_event_type_is_executed(self): + inp = _make_input() + result = failure_response(inp, {}, 500, "error") + assert result.status.eventType == EVENT_COMPONENT_EXECUTED + + def test_status_code(self): + inp = _make_input() + result = failure_response(inp, {}, 503, "unavailable") + assert result.status.code == 503 + + def test_status_message_is_failure(self): + inp = _make_input() + result = failure_response(inp, {}, 500, "bad") + assert result.status.message == "failure" + + def test_error_message(self): + inp = _make_input() + result = failure_response(inp, {}, 500, "disk full") + assert result.status.errorMessage == "disk full" + + def test_payload_contains_action_response(self): + inp = _make_input(action_name="FailAction") + result = failure_response(inp, {"err": "details"}, 500, "error") + payload_dict = json_format.MessageToDict(result.payload) + assert "FailAction-response" in payload_dict + assert payload_dict["FailAction-response"]["err"] == "details" + + +# --------------------------------------------------------------------------- +# create_response_payload_from_json +# --------------------------------------------------------------------------- + +class TestCreateResponsePayloadFromJson: + + def test_returns_struct(self): + from google.protobuf import struct_pb2 + result = create_response_payload_from_json("action", {"key": "val"}) + assert isinstance(result, struct_pb2.Struct) + + def test_wraps_in_action_response_key(self): + result = create_response_payload_from_json("DoThing", {"x": 1}) + result_dict = json_format.MessageToDict(result) + assert "DoThing-response" in result_dict + + def test_empty_properties(self): + result = create_response_payload_from_json("Empty", {}) + result_dict = json_format.MessageToDict(result) + assert "Empty-response" in result_dict + + def test_nested_properties(self): + props = {"outer": {"inner": "deep"}} + result = create_response_payload_from_json("Nested", props) + result_dict = json_format.MessageToDict(result) + assert result_dict["Nested-response"]["outer"]["inner"] == "deep" diff --git a/ms/py-executor/blueprints_grpc/tests/test_request_header_validator_interceptor.py b/ms/py-executor/blueprints_grpc/tests/test_request_header_validator_interceptor.py new file mode 100644 index 000000000..1562adf0c --- /dev/null +++ b/ms/py-executor/blueprints_grpc/tests/test_request_header_validator_interceptor.py @@ -0,0 +1,132 @@ +# +# Copyright (C) 2026 Deutsche Telekom. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Tests for blueprints_grpc/request_header_validator_interceptor.py — +gRPC server-side interceptor for authorization header validation. +""" + +from unittest.mock import MagicMock + +import grpc +import pytest + +from blueprints_grpc.request_header_validator_interceptor import ( + _unary_unary_rpc_terminator, + RequestHeaderValidatorInterceptor, +) + + +# --------------------------------------------------------------------------- +# _unary_unary_rpc_terminator +# --------------------------------------------------------------------------- + +class TestUnaryUnaryRpcTerminator: + + def test_returns_rpc_method_handler(self): + handler = _unary_unary_rpc_terminator( + grpc.StatusCode.UNAUTHENTICATED, "Access denied" + ) + assert handler is not None + + def test_handler_aborts_context(self): + handler = _unary_unary_rpc_terminator( + grpc.StatusCode.UNAUTHENTICATED, "No access" + ) + context = MagicMock() + # The handler wraps a function; invoke the unary_unary handler + handler.unary_unary(MagicMock(), context) + context.abort.assert_called_once_with( + grpc.StatusCode.UNAUTHENTICATED, "No access" + ) + + +# --------------------------------------------------------------------------- +# RequestHeaderValidatorInterceptor +# --------------------------------------------------------------------------- + +class TestRequestHeaderValidatorInterceptor: + + def test_init_stores_values(self): + interceptor = RequestHeaderValidatorInterceptor( + "authorization", "Bearer token123", + grpc.StatusCode.UNAUTHENTICATED, "Denied" + ) + assert interceptor._header == "authorization" + assert interceptor._value == "Bearer token123" + + def test_valid_header_continues(self): + interceptor = RequestHeaderValidatorInterceptor( + "authorization", "Bearer valid", + grpc.StatusCode.UNAUTHENTICATED, "Denied" + ) + continuation = MagicMock() + handler_call_details = MagicMock() + handler_call_details.invocation_metadata = [ + ("authorization", "Bearer valid"), + ("other-header", "other-value"), + ] + + result = interceptor.intercept_service(continuation, handler_call_details) + + continuation.assert_called_once_with(handler_call_details) + assert result == continuation.return_value + + def test_missing_header_returns_terminator(self): + interceptor = RequestHeaderValidatorInterceptor( + "authorization", "Bearer valid", + grpc.StatusCode.UNAUTHENTICATED, "Denied" + ) + continuation = MagicMock() + handler_call_details = MagicMock() + handler_call_details.invocation_metadata = [ + ("other-header", "other-value"), + ] + + result = interceptor.intercept_service(continuation, handler_call_details) + + continuation.assert_not_called() + assert result is interceptor._terminator + + def test_wrong_value_returns_terminator(self): + interceptor = RequestHeaderValidatorInterceptor( + "authorization", "Bearer correct", + grpc.StatusCode.UNAUTHENTICATED, "Denied" + ) + continuation = MagicMock() + handler_call_details = MagicMock() + handler_call_details.invocation_metadata = [ + ("authorization", "Bearer wrong"), + ] + + result = interceptor.intercept_service(continuation, handler_call_details) + + continuation.assert_not_called() + assert result is interceptor._terminator + + def test_empty_metadata_returns_terminator(self): + interceptor = RequestHeaderValidatorInterceptor( + "authorization", "Bearer token", + grpc.StatusCode.UNAUTHENTICATED, "No creds" + ) + continuation = MagicMock() + handler_call_details = MagicMock() + handler_call_details.invocation_metadata = [] + + result = interceptor.intercept_service(continuation, handler_call_details) + + continuation.assert_not_called() + assert result is interceptor._terminator diff --git a/ms/py-executor/blueprints_grpc/tests/test_script_executor_configuration.py b/ms/py-executor/blueprints_grpc/tests/test_script_executor_configuration.py new file mode 100644 index 000000000..286da1928 --- /dev/null +++ b/ms/py-executor/blueprints_grpc/tests/test_script_executor_configuration.py @@ -0,0 +1,106 @@ +# +# Copyright (C) 2026 Deutsche Telekom. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Tests for blueprints_grpc/script_executor_configuration.py — +INI-based configuration reader for the script executor. +""" + +import os +import pytest + +from blueprints_grpc.script_executor_configuration import ScriptExecutorConfiguration + + +# --------------------------------------------------------------------------- +# ScriptExecutorConfiguration tests +# --------------------------------------------------------------------------- + +class TestScriptExecutorConfiguration: + + @pytest.fixture + def config_file(self, tmp_path): + """Create a minimal config file and return its path.""" + p = tmp_path / "test-config.ini" + p.write_text( + "[scriptExecutor]\n" + "port=50052\n" + "authType=basic-auth\n" + "token=my-secret-token\n" + "logFile=app.log\n" + "maxWorkers=10\n" + "\n" + "[blueprintsprocessor]\n" + "blueprintDeployPath=/opt/blueprints/deploy\n" + "blueprintArchivePath=/opt/blueprints/archive\n" + ) + return str(p) + + def test_init_reads_file(self, config_file): + config = ScriptExecutorConfiguration(config_file) + assert config.config is not None + + def test_get_section(self, config_file): + config = ScriptExecutorConfiguration(config_file) + section = config.get_section("scriptExecutor") + assert section["port"] == "50052" + assert section["authtype"] == "basic-auth" + + def test_get_property(self, config_file): + config = ScriptExecutorConfiguration(config_file) + assert config.get_property("scriptExecutor", "port") == "50052" + assert config.get_property("blueprintsprocessor", "blueprintDeployPath") == "/opt/blueprints/deploy" + + def test_script_executor_property(self, config_file): + config = ScriptExecutorConfiguration(config_file) + assert config.script_executor_property("port") == "50052" + assert config.script_executor_property("authType") == "basic-auth" + assert config.script_executor_property("token") == "my-secret-token" + assert config.script_executor_property("maxWorkers") == "10" + + def test_blueprints_processor(self, config_file): + config = ScriptExecutorConfiguration(config_file) + assert config.blueprints_processor("blueprintDeployPath") == "/opt/blueprints/deploy" + assert config.blueprints_processor("blueprintArchivePath") == "/opt/blueprints/archive" + + def test_missing_section_raises(self, config_file): + config = ScriptExecutorConfiguration(config_file) + with pytest.raises(KeyError): + config.get_section("nonexistent") + + def test_missing_property_raises(self, config_file): + config = ScriptExecutorConfiguration(config_file) + from configparser import NoOptionError + with pytest.raises(NoOptionError): + config.get_property("scriptExecutor", "nonexistent_key") + + def test_env_vars_interpolated(self, tmp_path, monkeypatch): + """ConfigParser with os.environ should interpolate env vars.""" + monkeypatch.setenv("TEST_DEPLOY_PATH", "/custom/path") + p = tmp_path / "env-config.ini" + p.write_text( + "[blueprintsprocessor]\n" + "blueprintDeployPath=%(TEST_DEPLOY_PATH)s/blueprints\n" + "[scriptExecutor]\nport=50052\n" + ) + config = ScriptExecutorConfiguration(str(p)) + assert config.blueprints_processor("blueprintDeployPath") == "/custom/path/blueprints" + + def test_nonexistent_file_results_in_empty_config(self, tmp_path): + """ConfigParser.read silently ignores missing files.""" + config = ScriptExecutorConfiguration(str(tmp_path / "does-not-exist.ini")) + # Config is initialized but has no sections + assert config.config.sections() == [] diff --git a/ms/py-executor/resource_resolution/tests/grpc_client_extended_test.py b/ms/py-executor/resource_resolution/tests/grpc_client_extended_test.py new file mode 100644 index 000000000..07a616753 --- /dev/null +++ b/ms/py-executor/resource_resolution/tests/grpc_client_extended_test.py @@ -0,0 +1,140 @@ +# +# Copyright (C) 2026 Deutsche Telekom. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Extended tests for resource_resolution/grpc/client.py — covers secure channel, +header auth, process method, close, and context manager protocol. +""" + +from unittest.mock import MagicMock, patch, call + +import pytest + +from resource_resolution.grpc.client import Client + + +class TestClientInit: + + @patch("resource_resolution.grpc.client.insecure_channel") + def test_insecure_channel_created_by_default(self, mock_insecure): + mock_insecure.return_value = MagicMock() + client = Client("localhost:9111") + mock_insecure.assert_called_once_with("localhost:9111") + + @patch("resource_resolution.grpc.client.ssl_channel_credentials") + @patch("resource_resolution.grpc.client.secure_channel") + def test_secure_channel_with_ssl(self, mock_secure, mock_ssl_creds): + mock_secure.return_value = MagicMock() + client = Client( + "localhost:9111", + use_ssl=True, + root_certificates=b"root-cert", + private_key=b"priv-key", + certificate_chain=b"cert-chain", + ) + mock_ssl_creds.assert_called_once_with(b"root-cert", b"priv-key", b"cert-chain") + mock_secure.assert_called_once_with("localhost:9111", mock_ssl_creds.return_value) + + @patch("resource_resolution.grpc.client.intercept_channel") + @patch("resource_resolution.grpc.client.insecure_channel") + def test_header_auth_intercepts_channel(self, mock_insecure, mock_intercept): + mock_insecure.return_value = MagicMock() + mock_intercept.return_value = MagicMock() + client = Client( + "localhost:9111", + use_header_auth=True, + header_auth_token="Bearer xyz", + ) + mock_intercept.assert_called_once() + # The intercepted channel should be used + assert client.channel == mock_intercept.return_value + + @patch("resource_resolution.grpc.client.insecure_channel") + def test_no_header_auth_by_default(self, mock_insecure): + mock_insecure.return_value = MagicMock() + with patch("resource_resolution.grpc.client.intercept_channel") as mock_intercept: + Client("localhost:9111") + mock_intercept.assert_not_called() + + +class TestClientClose: + + @patch("resource_resolution.grpc.client.insecure_channel") + def test_close_calls_channel_close(self, mock_insecure): + mock_channel = MagicMock() + mock_insecure.return_value = mock_channel + client = Client("localhost:9111") + client.close() + mock_channel.close.assert_called_once() + + +class TestClientContextManager: + + @patch("resource_resolution.grpc.client.insecure_channel") + def test_enter_returns_client(self, mock_insecure): + mock_insecure.return_value = MagicMock() + client = Client("localhost:9111") + result = client.__enter__() + assert result is client + + @patch("resource_resolution.grpc.client.insecure_channel") + def test_exit_closes_channel(self, mock_insecure): + mock_channel = MagicMock() + mock_insecure.return_value = mock_channel + client = Client("localhost:9111") + client.__exit__(None, None, None) + mock_channel.close.assert_called_once() + + @patch("resource_resolution.grpc.client.insecure_channel") + def test_with_statement(self, mock_insecure): + mock_channel = MagicMock() + mock_insecure.return_value = mock_channel + with Client("localhost:9111") as c: + assert isinstance(c, Client) + mock_channel.close.assert_called_once() + + +class TestClientProcess: + + @patch("resource_resolution.grpc.client.insecure_channel") + def test_process_yields_responses(self, mock_insecure): + mock_channel = MagicMock() + mock_insecure.return_value = mock_channel + + client = Client("localhost:9111") + + # Mock the stub's process method to return iterable responses + mock_response_1 = MagicMock() + mock_response_2 = MagicMock() + client.stub = MagicMock() + client.stub.process.return_value = iter([mock_response_1, mock_response_2]) + + messages = [MagicMock(), MagicMock()] + responses = list(client.process(messages)) + + assert len(responses) == 2 + assert responses[0] is mock_response_1 + assert responses[1] is mock_response_2 + + @patch("resource_resolution.grpc.client.insecure_channel") + def test_process_empty_messages(self, mock_insecure): + mock_insecure.return_value = MagicMock() + client = Client("localhost:9111") + client.stub = MagicMock() + client.stub.process.return_value = iter([]) + + responses = list(client.process(iter([]))) + assert responses == [] diff --git a/ms/py-executor/resource_resolution/tests/http_client_extended_test.py b/ms/py-executor/resource_resolution/tests/http_client_extended_test.py new file mode 100644 index 000000000..60f7673b8 --- /dev/null +++ b/ms/py-executor/resource_resolution/tests/http_client_extended_test.py @@ -0,0 +1,156 @@ +# +# Copyright (C) 2026 Deutsche Telekom. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Extended tests for resource_resolution/http/client.py — covers auth +combinations, protocol selection, URL construction, and send_request +with various parameters. +""" + +from unittest.mock import MagicMock, patch + +import pytest +import requests + +from resource_resolution.http.client import Client + + +class TestClientAuth: + + def test_no_auth_when_both_none(self): + c = Client("127.0.0.1", 8080) + assert c.auth is None + + def test_no_auth_when_only_user(self): + c = Client("127.0.0.1", 8080, auth_user="user") + assert c.auth is None + + def test_no_auth_when_only_pass(self): + c = Client("127.0.0.1", 8080, auth_pass="pass") + assert c.auth is None + + def test_auth_tuple_when_both_provided(self): + c = Client("127.0.0.1", 8080, auth_user="admin", auth_pass="secret") + assert c.auth == ("admin", "secret") + + +class TestClientProtocol: + + def test_http_by_default(self): + c = Client("127.0.0.1", 8080) + assert c.protocol == "http" + + def test_https_when_ssl(self): + c = Client("127.0.0.1", 8443, use_ssl=True) + assert c.protocol == "https" + + +class TestClientUrl: + + def test_http_url(self): + c = Client("myhost", 9090) + assert c.url == "http://myhost:9090/api/v1" + + def test_https_url(self): + c = Client("myhost", 9443, use_ssl=True) + assert c.url == "https://myhost:9443/api/v1" + + +class TestClientSendRequest: + + @patch("resource_resolution.http.client.request") + def test_get_request_without_auth(self, mock_request): + mock_response = MagicMock() + mock_response.raise_for_status = MagicMock() + mock_request.return_value = mock_response + + c = Client("127.0.0.1", 8080) + result = c.send_request("GET", "test-endpoint") + + mock_request.assert_called_once_with( + method="GET", + url="http://127.0.0.1:8080/api/v1/test-endpoint", + verify=False, + auth=None, + ) + mock_response.raise_for_status.assert_called_once() + assert result is mock_response + + @patch("resource_resolution.http.client.request") + def test_post_request_with_auth(self, mock_request): + mock_response = MagicMock() + mock_response.raise_for_status = MagicMock() + mock_request.return_value = mock_response + + c = Client("127.0.0.1", 8080, auth_user="user", auth_pass="pass") + result = c.send_request( + "POST", "resource", + headers={"Content-Type": "application/json"}, + data='{"key": "val"}', + ) + + mock_request.assert_called_once_with( + method="POST", + url="http://127.0.0.1:8080/api/v1/resource", + verify=False, + auth=("user", "pass"), + headers={"Content-Type": "application/json"}, + data='{"key": "val"}', + ) + + @patch("resource_resolution.http.client.request") + def test_send_request_raises_on_http_error(self, mock_request): + """raise_for_status should propagate HTTPError.""" + mock_response = MagicMock() + mock_response.raise_for_status.side_effect = requests.HTTPError("404 Not Found") + mock_request.return_value = mock_response + + c = Client("127.0.0.1", 8080) + with pytest.raises(requests.HTTPError): + c.send_request("GET", "missing") + + @patch("resource_resolution.http.client.request") + def test_send_request_with_params(self, mock_request): + mock_response = MagicMock() + mock_response.raise_for_status = MagicMock() + mock_request.return_value = mock_response + + c = Client("127.0.0.1", 8080) + c.send_request("GET", "search", params={"q": "test"}) + + mock_request.assert_called_once_with( + method="GET", + url="http://127.0.0.1:8080/api/v1/search", + verify=False, + auth=None, + params={"q": "test"}, + ) + + @patch("resource_resolution.http.client.request") + def test_ssl_url_in_request(self, mock_request): + mock_response = MagicMock() + mock_response.raise_for_status = MagicMock() + mock_request.return_value = mock_response + + c = Client("secure-host", 443, use_ssl=True) + c.send_request("GET", "endpoint") + + mock_request.assert_called_once_with( + method="GET", + url="https://secure-host:443/api/v1/endpoint", + verify=False, + auth=None, + ) diff --git a/ms/py-executor/resource_resolution/tests/resource_resolution_extended_test.py b/ms/py-executor/resource_resolution/tests/resource_resolution_extended_test.py new file mode 100644 index 000000000..76a688e42 --- /dev/null +++ b/ms/py-executor/resource_resolution/tests/resource_resolution_extended_test.py @@ -0,0 +1,477 @@ +# +# Copyright (C) 2026 Deutsche Telekom. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Extended tests for resource_resolution/resource_resolution.py — +covers context manager, execute_workflows, Template.store, +store_template/retrieve_template with resource_type/resource_id, +and edge cases in _check_template_resolve_params. +""" + +import json +from unittest.mock import MagicMock, patch, PropertyMock + +import pytest + +from proto.BluePrintProcessing_pb2 import ExecutionServiceOutput + +from resource_resolution.resource_resolution import ( + ResourceResolution, + Template, + WorkflowExecution, + WorkflowExecutionResult, + WorkflowMode, +) + + +# --------------------------------------------------------------------------- +# ResourceResolution context manager +# --------------------------------------------------------------------------- + +class TestResourceResolutionContextManager: + + @patch("resource_resolution.resource_resolution.GrpcClient") + def test_enter_creates_grpc_client(self, MockGrpcClient): + rr = ResourceResolution(server_address="10.0.0.1", grpc_server_port=9111) + result = rr.__enter__() + + MockGrpcClient.assert_called_once_with( + server_address="10.0.0.1:9111", + use_ssl=False, + root_certificates=None, + private_key=None, + certificate_chain=None, + use_header_auth=False, + header_auth_token=None, + ) + assert result is rr + assert rr.grpc_client is MockGrpcClient.return_value + + @patch("resource_resolution.resource_resolution.GrpcClient") + def test_exit_closes_grpc_client(self, MockGrpcClient): + rr = ResourceResolution() + rr.__enter__() + rr.__exit__(None, None, None) + + MockGrpcClient.return_value.close.assert_called_once() + + @patch("resource_resolution.resource_resolution.GrpcClient") + def test_with_statement(self, MockGrpcClient): + with ResourceResolution() as rr: + assert rr.grpc_client is MockGrpcClient.return_value + MockGrpcClient.return_value.close.assert_called_once() + + @patch("resource_resolution.resource_resolution.GrpcClient") + def test_enter_with_ssl(self, MockGrpcClient): + rr = ResourceResolution( + server_address="secure.host", + grpc_server_port=9999, + use_ssl=True, + root_certificates=b"root", + private_key=b"key", + certificate_chain=b"chain", + ) + rr.__enter__() + + MockGrpcClient.assert_called_once_with( + server_address="secure.host:9999", + use_ssl=True, + root_certificates=b"root", + private_key=b"key", + certificate_chain=b"chain", + use_header_auth=False, + header_auth_token=None, + ) + + @patch("resource_resolution.resource_resolution.GrpcClient") + def test_enter_with_header_auth(self, MockGrpcClient): + rr = ResourceResolution( + use_header_auth=True, + header_auth_token="Bearer abc", + ) + rr.__enter__() + + call_kwargs = MockGrpcClient.call_args[1] + assert call_kwargs["use_header_auth"] is True + assert call_kwargs["header_auth_token"] == "Bearer abc" + + +# --------------------------------------------------------------------------- +# ResourceResolution.execute_workflows +# --------------------------------------------------------------------------- + +class TestExecuteWorkflows: + + @patch("resource_resolution.resource_resolution.GrpcClient") + def test_executes_single_workflow(self, MockGrpcClient): + mock_response = ExecutionServiceOutput() + mock_response.status.code = 200 + MockGrpcClient.return_value.process.return_value = iter([mock_response]) + + workflow = WorkflowExecution("bp", "1.0", "wf") + + with ResourceResolution() as rr: + results = list(rr.execute_workflows(workflow)) + + assert len(results) == 1 + assert isinstance(results[0], WorkflowExecutionResult) + assert results[0].workflow_execution is workflow + assert results[0].execution_output is mock_response + + @patch("resource_resolution.resource_resolution.GrpcClient") + def test_executes_multiple_workflows(self, MockGrpcClient): + resp1 = ExecutionServiceOutput() + resp1.status.code = 200 + resp2 = ExecutionServiceOutput() + resp2.status.code = 500 + + MockGrpcClient.return_value.process.return_value = iter([resp1, resp2]) + + wf1 = WorkflowExecution("bp1", "1.0", "wf1") + wf2 = WorkflowExecution("bp2", "2.0", "wf2") + + with ResourceResolution() as rr: + results = list(rr.execute_workflows(wf1, wf2)) + + assert len(results) == 2 + assert results[0].workflow_execution is wf1 + assert results[1].workflow_execution is wf2 + assert not results[0].has_error + assert results[1].has_error + + def test_raises_without_client(self): + rr = ResourceResolution() + workflow = WorkflowExecution("bp", "1.0", "wf") + + with pytest.raises(AttributeError, match="gRPC client not connected"): + list(rr.execute_workflows(workflow)) + + +# --------------------------------------------------------------------------- +# _check_template_resolve_params edge cases +# --------------------------------------------------------------------------- + +class TestCheckTemplateResolveParams: + + def test_resolution_key_only_is_valid(self): + rr = ResourceResolution() + # Should not raise + rr._check_template_resolve_params(resolution_key="key1") + + def test_resource_type_and_id_is_valid(self): + rr = ResourceResolution() + rr._check_template_resolve_params(resource_type="vnf", resource_id="123") + + def test_no_params_raises(self): + rr = ResourceResolution() + with pytest.raises(AttributeError): + rr._check_template_resolve_params() + + def test_resource_type_only_raises(self): + rr = ResourceResolution() + with pytest.raises(AttributeError): + rr._check_template_resolve_params(resource_type="vnf") + + def test_resource_id_only_raises(self): + rr = ResourceResolution() + with pytest.raises(AttributeError): + rr._check_template_resolve_params(resource_id="123") + + +# --------------------------------------------------------------------------- +# store_template with resource_type/resource_id +# --------------------------------------------------------------------------- + +class TestStoreTemplate: + + def test_store_with_resolution_key(self): + rr = ResourceResolution() + rr.http_client = MagicMock() + rr.store_template( + blueprint_name="bp", + blueprint_version="1.0", + artifact_name="art", + resolution_key="key1", + result="template_result", + ) + rr.http_client.send_request.assert_called_once_with( + "POST", + "template/bp/1.0/art/key1", + headers={"Content-Type": "application/json"}, + data=json.dumps({"result": "template_result"}), + ) + + def test_store_with_resource_type_and_id(self): + rr = ResourceResolution() + rr.http_client = MagicMock() + rr.store_template( + blueprint_name="bp", + blueprint_version="2.0", + artifact_name="art", + resource_type="vnf", + resource_id="vnf-001", + result="some result", + ) + rr.http_client.send_request.assert_called_once_with( + "POST", + "template/bp/2.0/vnf/vnf-001", + headers={"Content-Type": "application/json"}, + data=json.dumps({"result": "some result"}), + ) + + def test_store_raises_without_valid_params(self): + rr = ResourceResolution() + rr.http_client = MagicMock() + with pytest.raises(AttributeError): + rr.store_template( + blueprint_name="bp", + blueprint_version="1.0", + artifact_name="art", + result="data", + ) + + +# --------------------------------------------------------------------------- +# retrieve_template with resource_type/resource_id +# --------------------------------------------------------------------------- + +class TestRetrieveTemplate: + + def test_retrieve_with_resolution_key(self): + rr = ResourceResolution() + rr.http_client = MagicMock() + mock_response = MagicMock() + mock_response.json.return_value = {"result": "template data"} + rr.http_client.send_request.return_value = mock_response + + template = rr.retrieve_template( + blueprint_name="bp", + blueprint_version="1.0", + artifact_name="art", + resolution_key="key1", + ) + + rr.http_client.send_request.assert_called_once_with( + "GET", + "template", + headers={"Accept": "application/json"}, + params={ + "bpName": "bp", + "bpVersion": "1.0", + "artifactName": "art", + "resolutionKey": "key1", + }, + ) + assert isinstance(template, Template) + assert template.result == "template data" + assert template.blueprint_name == "bp" + assert template.resolution_key == "key1" + + def test_retrieve_with_resource_type_and_id(self): + rr = ResourceResolution() + rr.http_client = MagicMock() + mock_response = MagicMock() + mock_response.json.return_value = {"result": "type+id data"} + rr.http_client.send_request.return_value = mock_response + + template = rr.retrieve_template( + blueprint_name="bp", + blueprint_version="2.0", + artifact_name="art", + resource_type="pnf", + resource_id="pnf-99", + ) + + call_kwargs = rr.http_client.send_request.call_args + params = call_kwargs[1]["params"] + assert "resourceType" in params + assert params["resourceType"] == "pnf" + assert params["resourceId"] == "pnf-99" + assert "resolutionKey" not in params + + assert template.resource_type == "pnf" + assert template.resource_id == "pnf-99" + + +# --------------------------------------------------------------------------- +# Template.store +# --------------------------------------------------------------------------- + +class TestTemplateStore: + + def test_store_calls_resource_resolution_store_template(self): + mock_rr = MagicMock(spec=ResourceResolution) + template = Template( + resource_resolution=mock_rr, + blueprint_name="bp", + blueprint_version="1.0", + artifact_name="art", + result="stored result", + resolution_key="key1", + ) + + template.store() + + mock_rr.store_template.assert_called_once_with( + blueprint_name="bp", + blueprint_version="1.0", + artifact_name="art", + result="stored result", + resolution_key="key1", + resource_type=None, + resource_id=None, + ) + + def test_store_with_resource_type_id(self): + mock_rr = MagicMock(spec=ResourceResolution) + template = Template( + resource_resolution=mock_rr, + blueprint_name="bp", + blueprint_version="2.0", + artifact_name="art", + result="data", + resource_type="vnf", + resource_id="vnf-1", + ) + + template.store() + + mock_rr.store_template.assert_called_once_with( + blueprint_name="bp", + blueprint_version="2.0", + artifact_name="art", + result="data", + resolution_key=None, + resource_type="vnf", + resource_id="vnf-1", + ) + + +# --------------------------------------------------------------------------- +# WorkflowExecution edge cases +# --------------------------------------------------------------------------- + +class TestWorkflowExecutionEdgeCases: + + def test_none_inputs_default_to_empty_dict(self): + wf = WorkflowExecution("bp", "1.0", "wf", workflow_inputs=None) + assert wf.workflow_inputs == {} + + def test_async_mode(self): + wf = WorkflowExecution("bp", "1.0", "wf", workflow_mode=WorkflowMode.ASYNC) + msg = wf.message + assert msg.actionIdentifiers.mode == "async" + + def test_sync_mode_default(self): + wf = WorkflowExecution("bp", "1.0", "wf") + msg = wf.message + assert msg.actionIdentifiers.mode == "sync" + + +# --------------------------------------------------------------------------- +# WorkflowExecutionResult edge cases +# --------------------------------------------------------------------------- + +class TestWorkflowExecutionResultEdgeCases: + + def test_has_error_false_for_200(self): + wf = WorkflowExecution("bp", "1.0", "wf") + output = ExecutionServiceOutput() + output.status.code = 200 + result = WorkflowExecutionResult(wf, output) + assert not result.has_error + + def test_has_error_true_for_non_200(self): + wf = WorkflowExecution("bp", "1.0", "wf") + output = ExecutionServiceOutput() + output.status.code = 500 + result = WorkflowExecutionResult(wf, output) + assert result.has_error + + def test_error_message_raises_when_no_error(self): + wf = WorkflowExecution("bp", "1.0", "wf") + output = ExecutionServiceOutput() + output.status.code = 200 + result = WorkflowExecutionResult(wf, output) + with pytest.raises(AttributeError, match="Execution does not finish with error"): + _ = result.error_message + + def test_error_message_returns_string_when_error(self): + wf = WorkflowExecution("bp", "1.0", "wf") + output = ExecutionServiceOutput() + output.status.code = 500 + output.status.errorMessage = "Something broke" + result = WorkflowExecutionResult(wf, output) + assert result.error_message == "Something broke" + + def test_payload_returns_dict(self): + wf = WorkflowExecution("bp", "1.0", "wf") + output = ExecutionServiceOutput() + output.payload.update({"data": "value"}) + result = WorkflowExecutionResult(wf, output) + assert result.payload == {"data": "value"} + + def test_blueprint_name_from_output(self): + wf = WorkflowExecution("bp", "1.0", "wf") + output = ExecutionServiceOutput() + output.actionIdentifiers.blueprintName = "server-bp" + result = WorkflowExecutionResult(wf, output) + assert result.blueprint_name == "server-bp" + + def test_blueprint_version_from_output(self): + wf = WorkflowExecution("bp", "1.0", "wf") + output = ExecutionServiceOutput() + output.actionIdentifiers.blueprintVersion = "3.0" + result = WorkflowExecutionResult(wf, output) + assert result.blueprint_version == "3.0" + + def test_workflow_name_from_output(self): + wf = WorkflowExecution("bp", "1.0", "wf") + output = ExecutionServiceOutput() + output.actionIdentifiers.actionName = "server-wf" + result = WorkflowExecutionResult(wf, output) + assert result.workflow_name == "server-wf" + + +# --------------------------------------------------------------------------- +# ResourceResolution init with env vars +# --------------------------------------------------------------------------- + +class TestResourceResolutionInit: + + @patch.dict("os.environ", {"AUTH_TOKEN": "env-token", "API_USERNAME": "env-user", "API_PASSWORD": "env-pass"}) + def test_env_vars_used_when_no_explicit_values(self): + rr = ResourceResolution() + assert rr.grpc_client_header_auth_token == "env-token" + assert rr.http_client.auth_user == "env-user" + assert rr.http_client.auth_pass == "env-pass" + + def test_explicit_values_override_env(self): + rr = ResourceResolution( + header_auth_token="explicit-token", + http_auth_user="explicit-user", + http_auth_pass="explicit-pass", + ) + assert rr.grpc_client_header_auth_token == "explicit-token" + assert rr.http_client.auth_user == "explicit-user" + assert rr.http_client.auth_pass == "explicit-pass" + + def test_default_values(self): + rr = ResourceResolution() + assert rr.grpc_client_server_address == "127.0.0.1" + assert rr.grpc_client_server_port == 9111 + assert rr.grpc_client_use_ssl is False + assert rr.grpc_client_use_header_auth is False