# Python
**/*.pyc
+**/*.egg-info/
**/.apt_generated
# Logs
# Generated dependency list
direct-dependencies.txt
+.coverage
--- /dev/null
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Tests for blueprints_grpc/blueprint_processing_server.py —
+AbstractScriptFunction and BluePrintProcessingServer.
+"""
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from google.protobuf import struct_pb2
+from proto.BluePrintProcessing_pb2 import (
+ ExecutionServiceInput,
+ ExecutionServiceOutput,
+)
+from proto.BluePrintCommon_pb2 import (
+ CommonHeader,
+ ActionIdentifiers,
+)
+
+from blueprints_grpc.blueprint_processing_server import (
+ AbstractScriptFunction,
+ BluePrintProcessingServer,
+)
+from blueprints_grpc.script_executor_configuration import ScriptExecutorConfiguration
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _make_input(bp_name="test-bp", bp_version="1.0.0", action_name="TestAction",
+ request_id="req-1"):
+ inp = ExecutionServiceInput()
+ inp.commonHeader.requestId = request_id
+ inp.commonHeader.subRequestId = "sub-1"
+ inp.commonHeader.originatorId = "CDS"
+ inp.actionIdentifiers.blueprintName = bp_name
+ inp.actionIdentifiers.blueprintVersion = bp_version
+ inp.actionIdentifiers.actionName = action_name
+ inp.payload.update({"key": "value"})
+ return inp
+
+
+# ---------------------------------------------------------------------------
+# AbstractScriptFunction
+# ---------------------------------------------------------------------------
+
+class TestAbstractScriptFunction:
+
+ def test_set_context(self):
+ func = AbstractScriptFunction()
+ mock_ctx = MagicMock()
+ func.set_context(mock_ctx)
+ assert func.context is mock_ctx
+
+ def test_process_returns_none(self):
+ func = AbstractScriptFunction()
+ assert func.process(MagicMock()) is None
+
+ def test_recover_returns_none(self):
+ func = AbstractScriptFunction()
+ assert func.recover(RuntimeError("test"), MagicMock()) is None
+
+
+# ---------------------------------------------------------------------------
+# BluePrintProcessingServer
+# ---------------------------------------------------------------------------
+
+class TestBluePrintProcessingServer:
+
+ @pytest.fixture
+ def config(self, tmp_path):
+ config_file = tmp_path / "config.ini"
+ config_file.write_text(
+ "[blueprintsprocessor]\nblueprintDeployPath=/opt/blueprints\n"
+ "[scriptExecutor]\nport=50052\n"
+ )
+ return ScriptExecutorConfiguration(str(config_file))
+
+ def test_init_stores_configuration(self, config):
+ server = BluePrintProcessingServer(config)
+ assert server.configuration is config
+
+ def test_init_creates_logger(self, config):
+ server = BluePrintProcessingServer(config)
+ assert server.logger is not None
+ assert server.logger.name == "BluePrintProcessingServer"
+
+ @patch("blueprints_grpc.blueprint_processing_server.instance_for_input")
+ def test_process_yields_from_instance(self, mock_instance_for_input, config):
+ """Verify process iterates over requests, creates instances, and yields responses."""
+ # Create a mock script function instance that yields two responses
+ mock_script = MagicMock(spec=AbstractScriptFunction)
+ response1 = ExecutionServiceOutput()
+ response1.commonHeader.requestId = "resp-1"
+ response2 = ExecutionServiceOutput()
+ response2.commonHeader.requestId = "resp-2"
+ mock_script.process.return_value = iter([response1, response2])
+ mock_instance_for_input.return_value = mock_script
+
+ server = BluePrintProcessingServer(config)
+ request = _make_input()
+ context = MagicMock()
+
+ responses = list(server.process(iter([request]), context))
+
+ mock_instance_for_input.assert_called_once_with(config, request)
+ mock_script.set_context.assert_called_once_with(context)
+ mock_script.process.assert_called_once_with(request)
+ assert len(responses) == 2
+ assert responses[0].commonHeader.requestId == "resp-1"
+ assert responses[1].commonHeader.requestId == "resp-2"
+
+ @patch("blueprints_grpc.blueprint_processing_server.instance_for_input")
+ def test_process_handles_multiple_requests(self, mock_instance_for_input, config):
+ """Multiple requests should each get their own instance."""
+ mock_script1 = MagicMock(spec=AbstractScriptFunction)
+ resp1 = ExecutionServiceOutput()
+ resp1.commonHeader.requestId = "r1"
+ mock_script1.process.return_value = iter([resp1])
+
+ mock_script2 = MagicMock(spec=AbstractScriptFunction)
+ resp2 = ExecutionServiceOutput()
+ resp2.commonHeader.requestId = "r2"
+ mock_script2.process.return_value = iter([resp2])
+
+ mock_instance_for_input.side_effect = [mock_script1, mock_script2]
+
+ server = BluePrintProcessingServer(config)
+ req1 = _make_input(request_id="req-1")
+ req2 = _make_input(request_id="req-2")
+ context = MagicMock()
+
+ responses = list(server.process(iter([req1, req2]), context))
+
+ assert len(responses) == 2
+ assert mock_instance_for_input.call_count == 2
+
+ @patch("blueprints_grpc.blueprint_processing_server.instance_for_input")
+ def test_process_empty_iterator(self, mock_instance_for_input, config):
+ """An empty request iterator should yield no responses."""
+ server = BluePrintProcessingServer(config)
+ context = MagicMock()
+
+ responses = list(server.process(iter([]), context))
+
+ assert responses == []
+ mock_instance_for_input.assert_not_called()
--- /dev/null
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Tests for blueprints_grpc/executor_utils.py — utility functions for building
+gRPC responses, loading dynamic script instances, and working with protobuf
+messages.
+"""
+
+import json
+import re
+import tempfile
+import os
+from unittest.mock import patch, MagicMock
+
+import pytest
+
+from proto.BluePrintCommon_pb2 import (
+ EVENT_COMPONENT_EXECUTED,
+ EVENT_COMPONENT_NOTIFICATION,
+ EVENT_COMPONENT_PROCESSING,
+ EVENT_COMPONENT_TRACE,
+ ActionIdentifiers,
+ CommonHeader,
+)
+from proto.BluePrintProcessing_pb2 import (
+ ExecutionServiceInput,
+ ExecutionServiceOutput,
+)
+from google.protobuf import json_format
+
+from blueprints_grpc.executor_utils import (
+ current_time,
+ blueprint_id,
+ blueprint_location,
+ instance_for_input,
+ log_response,
+ send_notification,
+ ack_response,
+ success_response,
+ failure_response,
+ create_response_payload_from_json,
+)
+from blueprints_grpc.script_executor_configuration import ScriptExecutorConfiguration
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _make_input(bp_name="test-bp", bp_version="1.0.0", action_name="TestAction",
+ request_id="req-1", sub_request_id="sub-1", originator_id="CDS"):
+ """Create a properly populated ExecutionServiceInput."""
+ inp = ExecutionServiceInput()
+ inp.commonHeader.requestId = request_id
+ inp.commonHeader.subRequestId = sub_request_id
+ inp.commonHeader.originatorId = originator_id
+ inp.actionIdentifiers.blueprintName = bp_name
+ inp.actionIdentifiers.blueprintVersion = bp_version
+ inp.actionIdentifiers.actionName = action_name
+ return inp
+
+
+# ---------------------------------------------------------------------------
+# current_time
+# ---------------------------------------------------------------------------
+
+class TestCurrentTime:
+
+ def test_returns_string(self):
+ result = current_time()
+ assert isinstance(result, str)
+
+ def test_format_is_iso_like(self):
+ result = current_time()
+ # Pattern: YYYY-MM-DDTHH:MM:SS.ffffffZ
+ assert re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z", result)
+
+
+# ---------------------------------------------------------------------------
+# blueprint_id
+# ---------------------------------------------------------------------------
+
+class TestBlueprintId:
+
+ def test_returns_name_slash_version(self):
+ inp = _make_input(bp_name="my-bp", bp_version="2.0.0")
+ assert blueprint_id(inp) == "my-bp/2.0.0"
+
+ def test_different_values(self):
+ inp = _make_input(bp_name="other", bp_version="3.1.0")
+ assert blueprint_id(inp) == "other/3.1.0"
+
+
+# ---------------------------------------------------------------------------
+# blueprint_location
+# ---------------------------------------------------------------------------
+
+class TestBlueprintLocation:
+
+ def test_returns_deploy_path_plus_name_version(self, tmp_path):
+ config_file = tmp_path / "config.ini"
+ config_file.write_text(
+ "[blueprintsprocessor]\nblueprintDeployPath=/opt/blueprints\n"
+ "[scriptExecutor]\nport=50052\n"
+ )
+ config = ScriptExecutorConfiguration(str(config_file))
+ inp = _make_input(bp_name="test-bp", bp_version="1.0.0")
+
+ result = blueprint_location(config, inp)
+ assert result == "/opt/blueprints/test-bp/1.0.0"
+
+
+# ---------------------------------------------------------------------------
+# instance_for_input
+# ---------------------------------------------------------------------------
+
+class TestInstanceForInput:
+
+ def test_loads_class_from_sample_cba(self):
+ """Use the sample-cba test resource to verify dynamic module loading."""
+ # The sample CBA is at test/resources/sample-cba/1.0.0/Scripts/python/__init__.py
+ # and the action class is SampleScript
+ test_resources_dir = os.path.join(
+ os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
+ "test", "resources"
+ )
+
+ config_file = os.path.join(
+ os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
+ "..", "configuration-local.ini"
+ )
+
+ if not os.path.exists(config_file):
+ pytest.skip("configuration-local.ini not found")
+
+ config = ScriptExecutorConfiguration(config_file)
+ inp = _make_input(bp_name="sample-cba", bp_version="1.0.0", action_name="SampleScript")
+
+ instance = instance_for_input(config, inp)
+ # It should have the process method from AbstractScriptFunction subclass
+ assert hasattr(instance, "process")
+ assert instance.__class__.__name__ == "SampleScript"
+
+ def test_raises_on_missing_module(self, tmp_path):
+ """If the script file doesn't exist, loading should fail."""
+ config_file = tmp_path / "config.ini"
+ config_file.write_text(
+ f"[blueprintsprocessor]\nblueprintDeployPath={tmp_path}\n"
+ "[scriptExecutor]\nport=50052\n"
+ )
+ config = ScriptExecutorConfiguration(str(config_file))
+ inp = _make_input(bp_name="nonexistent", bp_version="0.0.0", action_name="Missing")
+
+ with pytest.raises((FileNotFoundError, ModuleNotFoundError)):
+ instance_for_input(config, inp)
+
+ def test_raises_on_missing_action_class(self, tmp_path):
+ """If the script file exists but the action class doesn't, should raise AttributeError."""
+ bp_dir = tmp_path / "mybp" / "1.0.0" / "Scripts" / "python"
+ bp_dir.mkdir(parents=True)
+ (bp_dir / "__init__.py").write_text("class Exists:\n pass\n")
+
+ config_file = tmp_path / "config.ini"
+ config_file.write_text(
+ f"[blueprintsprocessor]\nblueprintDeployPath={tmp_path}\n"
+ "[scriptExecutor]\nport=50052\n"
+ )
+ config = ScriptExecutorConfiguration(str(config_file))
+ inp = _make_input(bp_name="mybp", bp_version="1.0.0", action_name="NonExistentClass")
+
+ with pytest.raises(AttributeError):
+ instance_for_input(config, inp)
+
+
+# ---------------------------------------------------------------------------
+# log_response
+# ---------------------------------------------------------------------------
+
+class TestLogResponse:
+
+ def test_returns_execution_output(self):
+ inp = _make_input()
+ result = log_response(inp, "test message")
+ assert isinstance(result, ExecutionServiceOutput)
+
+ def test_event_type_is_trace(self):
+ inp = _make_input()
+ result = log_response(inp, "trace msg")
+ assert result.status.eventType == EVENT_COMPONENT_TRACE
+
+ def test_payload_contains_message(self):
+ inp = _make_input()
+ result = log_response(inp, "hello world")
+ payload_dict = json_format.MessageToDict(result.payload)
+ assert payload_dict["message"] == "hello world"
+
+ def test_preserves_common_header(self):
+ inp = _make_input(request_id="req-99")
+ result = log_response(inp, "msg")
+ assert result.commonHeader.requestId == "req-99"
+
+ def test_preserves_action_identifiers(self):
+ inp = _make_input(bp_name="bp1", action_name="Act1")
+ result = log_response(inp, "msg")
+ assert result.actionIdentifiers.blueprintName == "bp1"
+ assert result.actionIdentifiers.actionName == "Act1"
+
+ def test_timestamp_is_set(self):
+ inp = _make_input()
+ result = log_response(inp, "msg")
+ assert result.status.timestamp != ""
+
+
+# ---------------------------------------------------------------------------
+# send_notification
+# ---------------------------------------------------------------------------
+
+class TestSendNotification:
+
+ def test_returns_execution_output(self):
+ inp = _make_input()
+ result = send_notification(inp, "notification msg")
+ assert isinstance(result, ExecutionServiceOutput)
+
+ def test_event_type_is_notification(self):
+ inp = _make_input()
+ result = send_notification(inp, "notify")
+ assert result.status.eventType == EVENT_COMPONENT_NOTIFICATION
+
+ def test_payload_contains_message(self):
+ inp = _make_input()
+ result = send_notification(inp, "alert!")
+ payload_dict = json_format.MessageToDict(result.payload)
+ assert payload_dict["message"] == "alert!"
+
+
+# ---------------------------------------------------------------------------
+# ack_response
+# ---------------------------------------------------------------------------
+
+class TestAckResponse:
+
+ def test_returns_execution_output(self):
+ inp = _make_input()
+ result = ack_response(inp)
+ assert isinstance(result, ExecutionServiceOutput)
+
+ def test_event_type_is_processing(self):
+ inp = _make_input()
+ result = ack_response(inp)
+ assert result.status.eventType == EVENT_COMPONENT_PROCESSING
+
+ def test_preserves_action_identifiers(self):
+ inp = _make_input(bp_name="ack-bp", bp_version="2.0.0")
+ result = ack_response(inp)
+ assert result.actionIdentifiers.blueprintName == "ack-bp"
+ assert result.actionIdentifiers.blueprintVersion == "2.0.0"
+
+ def test_timestamp_is_set(self):
+ inp = _make_input()
+ result = ack_response(inp)
+ assert result.status.timestamp != ""
+
+
+# ---------------------------------------------------------------------------
+# success_response
+# ---------------------------------------------------------------------------
+
+class TestSuccessResponse:
+
+ def test_returns_execution_output(self):
+ inp = _make_input()
+ result = success_response(inp, {"key": "val"}, 200)
+ assert isinstance(result, ExecutionServiceOutput)
+
+ def test_event_type_is_executed(self):
+ inp = _make_input()
+ result = success_response(inp, {}, 200)
+ assert result.status.eventType == EVENT_COMPONENT_EXECUTED
+
+ def test_status_code(self):
+ inp = _make_input()
+ result = success_response(inp, {}, 200)
+ assert result.status.code == 200
+
+ def test_status_message_is_success(self):
+ inp = _make_input()
+ result = success_response(inp, {}, 200)
+ assert result.status.message == "success"
+
+ def test_payload_contains_action_response(self):
+ inp = _make_input(action_name="MyAction")
+ result = success_response(inp, {"result": "ok"}, 200)
+ payload_dict = json_format.MessageToDict(result.payload)
+ assert "MyAction-response" in payload_dict
+ assert payload_dict["MyAction-response"]["result"] == "ok"
+
+ def test_custom_status_code(self):
+ inp = _make_input()
+ result = success_response(inp, {}, 201)
+ assert result.status.code == 201
+
+
+# ---------------------------------------------------------------------------
+# failure_response
+# ---------------------------------------------------------------------------
+
+class TestFailureResponse:
+
+ def test_returns_execution_output(self):
+ inp = _make_input()
+ result = failure_response(inp, {}, 500, "it broke")
+ assert isinstance(result, ExecutionServiceOutput)
+
+ def test_event_type_is_executed(self):
+ inp = _make_input()
+ result = failure_response(inp, {}, 500, "error")
+ assert result.status.eventType == EVENT_COMPONENT_EXECUTED
+
+ def test_status_code(self):
+ inp = _make_input()
+ result = failure_response(inp, {}, 503, "unavailable")
+ assert result.status.code == 503
+
+ def test_status_message_is_failure(self):
+ inp = _make_input()
+ result = failure_response(inp, {}, 500, "bad")
+ assert result.status.message == "failure"
+
+ def test_error_message(self):
+ inp = _make_input()
+ result = failure_response(inp, {}, 500, "disk full")
+ assert result.status.errorMessage == "disk full"
+
+ def test_payload_contains_action_response(self):
+ inp = _make_input(action_name="FailAction")
+ result = failure_response(inp, {"err": "details"}, 500, "error")
+ payload_dict = json_format.MessageToDict(result.payload)
+ assert "FailAction-response" in payload_dict
+ assert payload_dict["FailAction-response"]["err"] == "details"
+
+
+# ---------------------------------------------------------------------------
+# create_response_payload_from_json
+# ---------------------------------------------------------------------------
+
+class TestCreateResponsePayloadFromJson:
+
+ def test_returns_struct(self):
+ from google.protobuf import struct_pb2
+ result = create_response_payload_from_json("action", {"key": "val"})
+ assert isinstance(result, struct_pb2.Struct)
+
+ def test_wraps_in_action_response_key(self):
+ result = create_response_payload_from_json("DoThing", {"x": 1})
+ result_dict = json_format.MessageToDict(result)
+ assert "DoThing-response" in result_dict
+
+ def test_empty_properties(self):
+ result = create_response_payload_from_json("Empty", {})
+ result_dict = json_format.MessageToDict(result)
+ assert "Empty-response" in result_dict
+
+ def test_nested_properties(self):
+ props = {"outer": {"inner": "deep"}}
+ result = create_response_payload_from_json("Nested", props)
+ result_dict = json_format.MessageToDict(result)
+ assert result_dict["Nested-response"]["outer"]["inner"] == "deep"
--- /dev/null
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Tests for blueprints_grpc/request_header_validator_interceptor.py —
+gRPC server-side interceptor for authorization header validation.
+"""
+
+from unittest.mock import MagicMock
+
+import grpc
+import pytest
+
+from blueprints_grpc.request_header_validator_interceptor import (
+ _unary_unary_rpc_terminator,
+ RequestHeaderValidatorInterceptor,
+)
+
+
+# ---------------------------------------------------------------------------
+# _unary_unary_rpc_terminator
+# ---------------------------------------------------------------------------
+
+class TestUnaryUnaryRpcTerminator:
+
+ def test_returns_rpc_method_handler(self):
+ handler = _unary_unary_rpc_terminator(
+ grpc.StatusCode.UNAUTHENTICATED, "Access denied"
+ )
+ assert handler is not None
+
+ def test_handler_aborts_context(self):
+ handler = _unary_unary_rpc_terminator(
+ grpc.StatusCode.UNAUTHENTICATED, "No access"
+ )
+ context = MagicMock()
+ # The handler wraps a function; invoke the unary_unary handler
+ handler.unary_unary(MagicMock(), context)
+ context.abort.assert_called_once_with(
+ grpc.StatusCode.UNAUTHENTICATED, "No access"
+ )
+
+
+# ---------------------------------------------------------------------------
+# RequestHeaderValidatorInterceptor
+# ---------------------------------------------------------------------------
+
+class TestRequestHeaderValidatorInterceptor:
+
+ def test_init_stores_values(self):
+ interceptor = RequestHeaderValidatorInterceptor(
+ "authorization", "Bearer token123",
+ grpc.StatusCode.UNAUTHENTICATED, "Denied"
+ )
+ assert interceptor._header == "authorization"
+ assert interceptor._value == "Bearer token123"
+
+ def test_valid_header_continues(self):
+ interceptor = RequestHeaderValidatorInterceptor(
+ "authorization", "Bearer valid",
+ grpc.StatusCode.UNAUTHENTICATED, "Denied"
+ )
+ continuation = MagicMock()
+ handler_call_details = MagicMock()
+ handler_call_details.invocation_metadata = [
+ ("authorization", "Bearer valid"),
+ ("other-header", "other-value"),
+ ]
+
+ result = interceptor.intercept_service(continuation, handler_call_details)
+
+ continuation.assert_called_once_with(handler_call_details)
+ assert result == continuation.return_value
+
+ def test_missing_header_returns_terminator(self):
+ interceptor = RequestHeaderValidatorInterceptor(
+ "authorization", "Bearer valid",
+ grpc.StatusCode.UNAUTHENTICATED, "Denied"
+ )
+ continuation = MagicMock()
+ handler_call_details = MagicMock()
+ handler_call_details.invocation_metadata = [
+ ("other-header", "other-value"),
+ ]
+
+ result = interceptor.intercept_service(continuation, handler_call_details)
+
+ continuation.assert_not_called()
+ assert result is interceptor._terminator
+
+ def test_wrong_value_returns_terminator(self):
+ interceptor = RequestHeaderValidatorInterceptor(
+ "authorization", "Bearer correct",
+ grpc.StatusCode.UNAUTHENTICATED, "Denied"
+ )
+ continuation = MagicMock()
+ handler_call_details = MagicMock()
+ handler_call_details.invocation_metadata = [
+ ("authorization", "Bearer wrong"),
+ ]
+
+ result = interceptor.intercept_service(continuation, handler_call_details)
+
+ continuation.assert_not_called()
+ assert result is interceptor._terminator
+
+ def test_empty_metadata_returns_terminator(self):
+ interceptor = RequestHeaderValidatorInterceptor(
+ "authorization", "Bearer token",
+ grpc.StatusCode.UNAUTHENTICATED, "No creds"
+ )
+ continuation = MagicMock()
+ handler_call_details = MagicMock()
+ handler_call_details.invocation_metadata = []
+
+ result = interceptor.intercept_service(continuation, handler_call_details)
+
+ continuation.assert_not_called()
+ assert result is interceptor._terminator
--- /dev/null
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Tests for blueprints_grpc/script_executor_configuration.py —
+INI-based configuration reader for the script executor.
+"""
+
+import os
+import pytest
+
+from blueprints_grpc.script_executor_configuration import ScriptExecutorConfiguration
+
+
+# ---------------------------------------------------------------------------
+# ScriptExecutorConfiguration tests
+# ---------------------------------------------------------------------------
+
+class TestScriptExecutorConfiguration:
+
+ @pytest.fixture
+ def config_file(self, tmp_path):
+ """Create a minimal config file and return its path."""
+ p = tmp_path / "test-config.ini"
+ p.write_text(
+ "[scriptExecutor]\n"
+ "port=50052\n"
+ "authType=basic-auth\n"
+ "token=my-secret-token\n"
+ "logFile=app.log\n"
+ "maxWorkers=10\n"
+ "\n"
+ "[blueprintsprocessor]\n"
+ "blueprintDeployPath=/opt/blueprints/deploy\n"
+ "blueprintArchivePath=/opt/blueprints/archive\n"
+ )
+ return str(p)
+
+ def test_init_reads_file(self, config_file):
+ config = ScriptExecutorConfiguration(config_file)
+ assert config.config is not None
+
+ def test_get_section(self, config_file):
+ config = ScriptExecutorConfiguration(config_file)
+ section = config.get_section("scriptExecutor")
+ assert section["port"] == "50052"
+ assert section["authtype"] == "basic-auth"
+
+ def test_get_property(self, config_file):
+ config = ScriptExecutorConfiguration(config_file)
+ assert config.get_property("scriptExecutor", "port") == "50052"
+ assert config.get_property("blueprintsprocessor", "blueprintDeployPath") == "/opt/blueprints/deploy"
+
+ def test_script_executor_property(self, config_file):
+ config = ScriptExecutorConfiguration(config_file)
+ assert config.script_executor_property("port") == "50052"
+ assert config.script_executor_property("authType") == "basic-auth"
+ assert config.script_executor_property("token") == "my-secret-token"
+ assert config.script_executor_property("maxWorkers") == "10"
+
+ def test_blueprints_processor(self, config_file):
+ config = ScriptExecutorConfiguration(config_file)
+ assert config.blueprints_processor("blueprintDeployPath") == "/opt/blueprints/deploy"
+ assert config.blueprints_processor("blueprintArchivePath") == "/opt/blueprints/archive"
+
+ def test_missing_section_raises(self, config_file):
+ config = ScriptExecutorConfiguration(config_file)
+ with pytest.raises(KeyError):
+ config.get_section("nonexistent")
+
+ def test_missing_property_raises(self, config_file):
+ config = ScriptExecutorConfiguration(config_file)
+ from configparser import NoOptionError
+ with pytest.raises(NoOptionError):
+ config.get_property("scriptExecutor", "nonexistent_key")
+
+ def test_env_vars_interpolated(self, tmp_path, monkeypatch):
+ """ConfigParser with os.environ should interpolate env vars."""
+ monkeypatch.setenv("TEST_DEPLOY_PATH", "/custom/path")
+ p = tmp_path / "env-config.ini"
+ p.write_text(
+ "[blueprintsprocessor]\n"
+ "blueprintDeployPath=%(TEST_DEPLOY_PATH)s/blueprints\n"
+ "[scriptExecutor]\nport=50052\n"
+ )
+ config = ScriptExecutorConfiguration(str(p))
+ assert config.blueprints_processor("blueprintDeployPath") == "/custom/path/blueprints"
+
+ def test_nonexistent_file_results_in_empty_config(self, tmp_path):
+ """ConfigParser.read silently ignores missing files."""
+ config = ScriptExecutorConfiguration(str(tmp_path / "does-not-exist.ini"))
+ # Config is initialized but has no sections
+ assert config.config.sections() == []
--- /dev/null
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Extended tests for resource_resolution/grpc/client.py — covers secure channel,
+header auth, process method, close, and context manager protocol.
+"""
+
+from unittest.mock import MagicMock, patch, call
+
+import pytest
+
+from resource_resolution.grpc.client import Client
+
+
+class TestClientInit:
+
+ @patch("resource_resolution.grpc.client.insecure_channel")
+ def test_insecure_channel_created_by_default(self, mock_insecure):
+ mock_insecure.return_value = MagicMock()
+ client = Client("localhost:9111")
+ mock_insecure.assert_called_once_with("localhost:9111")
+
+ @patch("resource_resolution.grpc.client.ssl_channel_credentials")
+ @patch("resource_resolution.grpc.client.secure_channel")
+ def test_secure_channel_with_ssl(self, mock_secure, mock_ssl_creds):
+ mock_secure.return_value = MagicMock()
+ client = Client(
+ "localhost:9111",
+ use_ssl=True,
+ root_certificates=b"root-cert",
+ private_key=b"priv-key",
+ certificate_chain=b"cert-chain",
+ )
+ mock_ssl_creds.assert_called_once_with(b"root-cert", b"priv-key", b"cert-chain")
+ mock_secure.assert_called_once_with("localhost:9111", mock_ssl_creds.return_value)
+
+ @patch("resource_resolution.grpc.client.intercept_channel")
+ @patch("resource_resolution.grpc.client.insecure_channel")
+ def test_header_auth_intercepts_channel(self, mock_insecure, mock_intercept):
+ mock_insecure.return_value = MagicMock()
+ mock_intercept.return_value = MagicMock()
+ client = Client(
+ "localhost:9111",
+ use_header_auth=True,
+ header_auth_token="Bearer xyz",
+ )
+ mock_intercept.assert_called_once()
+ # The intercepted channel should be used
+ assert client.channel == mock_intercept.return_value
+
+ @patch("resource_resolution.grpc.client.insecure_channel")
+ def test_no_header_auth_by_default(self, mock_insecure):
+ mock_insecure.return_value = MagicMock()
+ with patch("resource_resolution.grpc.client.intercept_channel") as mock_intercept:
+ Client("localhost:9111")
+ mock_intercept.assert_not_called()
+
+
+class TestClientClose:
+
+ @patch("resource_resolution.grpc.client.insecure_channel")
+ def test_close_calls_channel_close(self, mock_insecure):
+ mock_channel = MagicMock()
+ mock_insecure.return_value = mock_channel
+ client = Client("localhost:9111")
+ client.close()
+ mock_channel.close.assert_called_once()
+
+
+class TestClientContextManager:
+
+ @patch("resource_resolution.grpc.client.insecure_channel")
+ def test_enter_returns_client(self, mock_insecure):
+ mock_insecure.return_value = MagicMock()
+ client = Client("localhost:9111")
+ result = client.__enter__()
+ assert result is client
+
+ @patch("resource_resolution.grpc.client.insecure_channel")
+ def test_exit_closes_channel(self, mock_insecure):
+ mock_channel = MagicMock()
+ mock_insecure.return_value = mock_channel
+ client = Client("localhost:9111")
+ client.__exit__(None, None, None)
+ mock_channel.close.assert_called_once()
+
+ @patch("resource_resolution.grpc.client.insecure_channel")
+ def test_with_statement(self, mock_insecure):
+ mock_channel = MagicMock()
+ mock_insecure.return_value = mock_channel
+ with Client("localhost:9111") as c:
+ assert isinstance(c, Client)
+ mock_channel.close.assert_called_once()
+
+
+class TestClientProcess:
+
+ @patch("resource_resolution.grpc.client.insecure_channel")
+ def test_process_yields_responses(self, mock_insecure):
+ mock_channel = MagicMock()
+ mock_insecure.return_value = mock_channel
+
+ client = Client("localhost:9111")
+
+ # Mock the stub's process method to return iterable responses
+ mock_response_1 = MagicMock()
+ mock_response_2 = MagicMock()
+ client.stub = MagicMock()
+ client.stub.process.return_value = iter([mock_response_1, mock_response_2])
+
+ messages = [MagicMock(), MagicMock()]
+ responses = list(client.process(messages))
+
+ assert len(responses) == 2
+ assert responses[0] is mock_response_1
+ assert responses[1] is mock_response_2
+
+ @patch("resource_resolution.grpc.client.insecure_channel")
+ def test_process_empty_messages(self, mock_insecure):
+ mock_insecure.return_value = MagicMock()
+ client = Client("localhost:9111")
+ client.stub = MagicMock()
+ client.stub.process.return_value = iter([])
+
+ responses = list(client.process(iter([])))
+ assert responses == []
--- /dev/null
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Extended tests for resource_resolution/http/client.py — covers auth
+combinations, protocol selection, URL construction, and send_request
+with various parameters.
+"""
+
+from unittest.mock import MagicMock, patch
+
+import pytest
+import requests
+
+from resource_resolution.http.client import Client
+
+
+class TestClientAuth:
+
+ def test_no_auth_when_both_none(self):
+ c = Client("127.0.0.1", 8080)
+ assert c.auth is None
+
+ def test_no_auth_when_only_user(self):
+ c = Client("127.0.0.1", 8080, auth_user="user")
+ assert c.auth is None
+
+ def test_no_auth_when_only_pass(self):
+ c = Client("127.0.0.1", 8080, auth_pass="pass")
+ assert c.auth is None
+
+ def test_auth_tuple_when_both_provided(self):
+ c = Client("127.0.0.1", 8080, auth_user="admin", auth_pass="secret")
+ assert c.auth == ("admin", "secret")
+
+
+class TestClientProtocol:
+
+ def test_http_by_default(self):
+ c = Client("127.0.0.1", 8080)
+ assert c.protocol == "http"
+
+ def test_https_when_ssl(self):
+ c = Client("127.0.0.1", 8443, use_ssl=True)
+ assert c.protocol == "https"
+
+
+class TestClientUrl:
+
+ def test_http_url(self):
+ c = Client("myhost", 9090)
+ assert c.url == "http://myhost:9090/api/v1"
+
+ def test_https_url(self):
+ c = Client("myhost", 9443, use_ssl=True)
+ assert c.url == "https://myhost:9443/api/v1"
+
+
+class TestClientSendRequest:
+
+ @patch("resource_resolution.http.client.request")
+ def test_get_request_without_auth(self, mock_request):
+ mock_response = MagicMock()
+ mock_response.raise_for_status = MagicMock()
+ mock_request.return_value = mock_response
+
+ c = Client("127.0.0.1", 8080)
+ result = c.send_request("GET", "test-endpoint")
+
+ mock_request.assert_called_once_with(
+ method="GET",
+ url="http://127.0.0.1:8080/api/v1/test-endpoint",
+ verify=False,
+ auth=None,
+ )
+ mock_response.raise_for_status.assert_called_once()
+ assert result is mock_response
+
+ @patch("resource_resolution.http.client.request")
+ def test_post_request_with_auth(self, mock_request):
+ mock_response = MagicMock()
+ mock_response.raise_for_status = MagicMock()
+ mock_request.return_value = mock_response
+
+ c = Client("127.0.0.1", 8080, auth_user="user", auth_pass="pass")
+ result = c.send_request(
+ "POST", "resource",
+ headers={"Content-Type": "application/json"},
+ data='{"key": "val"}',
+ )
+
+ mock_request.assert_called_once_with(
+ method="POST",
+ url="http://127.0.0.1:8080/api/v1/resource",
+ verify=False,
+ auth=("user", "pass"),
+ headers={"Content-Type": "application/json"},
+ data='{"key": "val"}',
+ )
+
+ @patch("resource_resolution.http.client.request")
+ def test_send_request_raises_on_http_error(self, mock_request):
+ """raise_for_status should propagate HTTPError."""
+ mock_response = MagicMock()
+ mock_response.raise_for_status.side_effect = requests.HTTPError("404 Not Found")
+ mock_request.return_value = mock_response
+
+ c = Client("127.0.0.1", 8080)
+ with pytest.raises(requests.HTTPError):
+ c.send_request("GET", "missing")
+
+ @patch("resource_resolution.http.client.request")
+ def test_send_request_with_params(self, mock_request):
+ mock_response = MagicMock()
+ mock_response.raise_for_status = MagicMock()
+ mock_request.return_value = mock_response
+
+ c = Client("127.0.0.1", 8080)
+ c.send_request("GET", "search", params={"q": "test"})
+
+ mock_request.assert_called_once_with(
+ method="GET",
+ url="http://127.0.0.1:8080/api/v1/search",
+ verify=False,
+ auth=None,
+ params={"q": "test"},
+ )
+
+ @patch("resource_resolution.http.client.request")
+ def test_ssl_url_in_request(self, mock_request):
+ mock_response = MagicMock()
+ mock_response.raise_for_status = MagicMock()
+ mock_request.return_value = mock_response
+
+ c = Client("secure-host", 443, use_ssl=True)
+ c.send_request("GET", "endpoint")
+
+ mock_request.assert_called_once_with(
+ method="GET",
+ url="https://secure-host:443/api/v1/endpoint",
+ verify=False,
+ auth=None,
+ )
--- /dev/null
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Extended tests for resource_resolution/resource_resolution.py —
+covers context manager, execute_workflows, Template.store,
+store_template/retrieve_template with resource_type/resource_id,
+and edge cases in _check_template_resolve_params.
+"""
+
+import json
+from unittest.mock import MagicMock, patch, PropertyMock
+
+import pytest
+
+from proto.BluePrintProcessing_pb2 import ExecutionServiceOutput
+
+from resource_resolution.resource_resolution import (
+ ResourceResolution,
+ Template,
+ WorkflowExecution,
+ WorkflowExecutionResult,
+ WorkflowMode,
+)
+
+
+# ---------------------------------------------------------------------------
+# ResourceResolution context manager
+# ---------------------------------------------------------------------------
+
+class TestResourceResolutionContextManager:
+
+ @patch("resource_resolution.resource_resolution.GrpcClient")
+ def test_enter_creates_grpc_client(self, MockGrpcClient):
+ rr = ResourceResolution(server_address="10.0.0.1", grpc_server_port=9111)
+ result = rr.__enter__()
+
+ MockGrpcClient.assert_called_once_with(
+ server_address="10.0.0.1:9111",
+ use_ssl=False,
+ root_certificates=None,
+ private_key=None,
+ certificate_chain=None,
+ use_header_auth=False,
+ header_auth_token=None,
+ )
+ assert result is rr
+ assert rr.grpc_client is MockGrpcClient.return_value
+
+ @patch("resource_resolution.resource_resolution.GrpcClient")
+ def test_exit_closes_grpc_client(self, MockGrpcClient):
+ rr = ResourceResolution()
+ rr.__enter__()
+ rr.__exit__(None, None, None)
+
+ MockGrpcClient.return_value.close.assert_called_once()
+
+ @patch("resource_resolution.resource_resolution.GrpcClient")
+ def test_with_statement(self, MockGrpcClient):
+ with ResourceResolution() as rr:
+ assert rr.grpc_client is MockGrpcClient.return_value
+ MockGrpcClient.return_value.close.assert_called_once()
+
+ @patch("resource_resolution.resource_resolution.GrpcClient")
+ def test_enter_with_ssl(self, MockGrpcClient):
+ rr = ResourceResolution(
+ server_address="secure.host",
+ grpc_server_port=9999,
+ use_ssl=True,
+ root_certificates=b"root",
+ private_key=b"key",
+ certificate_chain=b"chain",
+ )
+ rr.__enter__()
+
+ MockGrpcClient.assert_called_once_with(
+ server_address="secure.host:9999",
+ use_ssl=True,
+ root_certificates=b"root",
+ private_key=b"key",
+ certificate_chain=b"chain",
+ use_header_auth=False,
+ header_auth_token=None,
+ )
+
+ @patch("resource_resolution.resource_resolution.GrpcClient")
+ def test_enter_with_header_auth(self, MockGrpcClient):
+ rr = ResourceResolution(
+ use_header_auth=True,
+ header_auth_token="Bearer abc",
+ )
+ rr.__enter__()
+
+ call_kwargs = MockGrpcClient.call_args[1]
+ assert call_kwargs["use_header_auth"] is True
+ assert call_kwargs["header_auth_token"] == "Bearer abc"
+
+
+# ---------------------------------------------------------------------------
+# ResourceResolution.execute_workflows
+# ---------------------------------------------------------------------------
+
+class TestExecuteWorkflows:
+
+ @patch("resource_resolution.resource_resolution.GrpcClient")
+ def test_executes_single_workflow(self, MockGrpcClient):
+ mock_response = ExecutionServiceOutput()
+ mock_response.status.code = 200
+ MockGrpcClient.return_value.process.return_value = iter([mock_response])
+
+ workflow = WorkflowExecution("bp", "1.0", "wf")
+
+ with ResourceResolution() as rr:
+ results = list(rr.execute_workflows(workflow))
+
+ assert len(results) == 1
+ assert isinstance(results[0], WorkflowExecutionResult)
+ assert results[0].workflow_execution is workflow
+ assert results[0].execution_output is mock_response
+
+ @patch("resource_resolution.resource_resolution.GrpcClient")
+ def test_executes_multiple_workflows(self, MockGrpcClient):
+ resp1 = ExecutionServiceOutput()
+ resp1.status.code = 200
+ resp2 = ExecutionServiceOutput()
+ resp2.status.code = 500
+
+ MockGrpcClient.return_value.process.return_value = iter([resp1, resp2])
+
+ wf1 = WorkflowExecution("bp1", "1.0", "wf1")
+ wf2 = WorkflowExecution("bp2", "2.0", "wf2")
+
+ with ResourceResolution() as rr:
+ results = list(rr.execute_workflows(wf1, wf2))
+
+ assert len(results) == 2
+ assert results[0].workflow_execution is wf1
+ assert results[1].workflow_execution is wf2
+ assert not results[0].has_error
+ assert results[1].has_error
+
+ def test_raises_without_client(self):
+ rr = ResourceResolution()
+ workflow = WorkflowExecution("bp", "1.0", "wf")
+
+ with pytest.raises(AttributeError, match="gRPC client not connected"):
+ list(rr.execute_workflows(workflow))
+
+
+# ---------------------------------------------------------------------------
+# _check_template_resolve_params edge cases
+# ---------------------------------------------------------------------------
+
+class TestCheckTemplateResolveParams:
+
+ def test_resolution_key_only_is_valid(self):
+ rr = ResourceResolution()
+ # Should not raise
+ rr._check_template_resolve_params(resolution_key="key1")
+
+ def test_resource_type_and_id_is_valid(self):
+ rr = ResourceResolution()
+ rr._check_template_resolve_params(resource_type="vnf", resource_id="123")
+
+ def test_no_params_raises(self):
+ rr = ResourceResolution()
+ with pytest.raises(AttributeError):
+ rr._check_template_resolve_params()
+
+ def test_resource_type_only_raises(self):
+ rr = ResourceResolution()
+ with pytest.raises(AttributeError):
+ rr._check_template_resolve_params(resource_type="vnf")
+
+ def test_resource_id_only_raises(self):
+ rr = ResourceResolution()
+ with pytest.raises(AttributeError):
+ rr._check_template_resolve_params(resource_id="123")
+
+
+# ---------------------------------------------------------------------------
+# store_template with resource_type/resource_id
+# ---------------------------------------------------------------------------
+
+class TestStoreTemplate:
+
+ def test_store_with_resolution_key(self):
+ rr = ResourceResolution()
+ rr.http_client = MagicMock()
+ rr.store_template(
+ blueprint_name="bp",
+ blueprint_version="1.0",
+ artifact_name="art",
+ resolution_key="key1",
+ result="template_result",
+ )
+ rr.http_client.send_request.assert_called_once_with(
+ "POST",
+ "template/bp/1.0/art/key1",
+ headers={"Content-Type": "application/json"},
+ data=json.dumps({"result": "template_result"}),
+ )
+
+ def test_store_with_resource_type_and_id(self):
+ rr = ResourceResolution()
+ rr.http_client = MagicMock()
+ rr.store_template(
+ blueprint_name="bp",
+ blueprint_version="2.0",
+ artifact_name="art",
+ resource_type="vnf",
+ resource_id="vnf-001",
+ result="some result",
+ )
+ rr.http_client.send_request.assert_called_once_with(
+ "POST",
+ "template/bp/2.0/vnf/vnf-001",
+ headers={"Content-Type": "application/json"},
+ data=json.dumps({"result": "some result"}),
+ )
+
+ def test_store_raises_without_valid_params(self):
+ rr = ResourceResolution()
+ rr.http_client = MagicMock()
+ with pytest.raises(AttributeError):
+ rr.store_template(
+ blueprint_name="bp",
+ blueprint_version="1.0",
+ artifact_name="art",
+ result="data",
+ )
+
+
+# ---------------------------------------------------------------------------
+# retrieve_template with resource_type/resource_id
+# ---------------------------------------------------------------------------
+
+class TestRetrieveTemplate:
+
+ def test_retrieve_with_resolution_key(self):
+ rr = ResourceResolution()
+ rr.http_client = MagicMock()
+ mock_response = MagicMock()
+ mock_response.json.return_value = {"result": "template data"}
+ rr.http_client.send_request.return_value = mock_response
+
+ template = rr.retrieve_template(
+ blueprint_name="bp",
+ blueprint_version="1.0",
+ artifact_name="art",
+ resolution_key="key1",
+ )
+
+ rr.http_client.send_request.assert_called_once_with(
+ "GET",
+ "template",
+ headers={"Accept": "application/json"},
+ params={
+ "bpName": "bp",
+ "bpVersion": "1.0",
+ "artifactName": "art",
+ "resolutionKey": "key1",
+ },
+ )
+ assert isinstance(template, Template)
+ assert template.result == "template data"
+ assert template.blueprint_name == "bp"
+ assert template.resolution_key == "key1"
+
+ def test_retrieve_with_resource_type_and_id(self):
+ rr = ResourceResolution()
+ rr.http_client = MagicMock()
+ mock_response = MagicMock()
+ mock_response.json.return_value = {"result": "type+id data"}
+ rr.http_client.send_request.return_value = mock_response
+
+ template = rr.retrieve_template(
+ blueprint_name="bp",
+ blueprint_version="2.0",
+ artifact_name="art",
+ resource_type="pnf",
+ resource_id="pnf-99",
+ )
+
+ call_kwargs = rr.http_client.send_request.call_args
+ params = call_kwargs[1]["params"]
+ assert "resourceType" in params
+ assert params["resourceType"] == "pnf"
+ assert params["resourceId"] == "pnf-99"
+ assert "resolutionKey" not in params
+
+ assert template.resource_type == "pnf"
+ assert template.resource_id == "pnf-99"
+
+
+# ---------------------------------------------------------------------------
+# Template.store
+# ---------------------------------------------------------------------------
+
+class TestTemplateStore:
+
+ def test_store_calls_resource_resolution_store_template(self):
+ mock_rr = MagicMock(spec=ResourceResolution)
+ template = Template(
+ resource_resolution=mock_rr,
+ blueprint_name="bp",
+ blueprint_version="1.0",
+ artifact_name="art",
+ result="stored result",
+ resolution_key="key1",
+ )
+
+ template.store()
+
+ mock_rr.store_template.assert_called_once_with(
+ blueprint_name="bp",
+ blueprint_version="1.0",
+ artifact_name="art",
+ result="stored result",
+ resolution_key="key1",
+ resource_type=None,
+ resource_id=None,
+ )
+
+ def test_store_with_resource_type_id(self):
+ mock_rr = MagicMock(spec=ResourceResolution)
+ template = Template(
+ resource_resolution=mock_rr,
+ blueprint_name="bp",
+ blueprint_version="2.0",
+ artifact_name="art",
+ result="data",
+ resource_type="vnf",
+ resource_id="vnf-1",
+ )
+
+ template.store()
+
+ mock_rr.store_template.assert_called_once_with(
+ blueprint_name="bp",
+ blueprint_version="2.0",
+ artifact_name="art",
+ result="data",
+ resolution_key=None,
+ resource_type="vnf",
+ resource_id="vnf-1",
+ )
+
+
+# ---------------------------------------------------------------------------
+# WorkflowExecution edge cases
+# ---------------------------------------------------------------------------
+
+class TestWorkflowExecutionEdgeCases:
+
+ def test_none_inputs_default_to_empty_dict(self):
+ wf = WorkflowExecution("bp", "1.0", "wf", workflow_inputs=None)
+ assert wf.workflow_inputs == {}
+
+ def test_async_mode(self):
+ wf = WorkflowExecution("bp", "1.0", "wf", workflow_mode=WorkflowMode.ASYNC)
+ msg = wf.message
+ assert msg.actionIdentifiers.mode == "async"
+
+ def test_sync_mode_default(self):
+ wf = WorkflowExecution("bp", "1.0", "wf")
+ msg = wf.message
+ assert msg.actionIdentifiers.mode == "sync"
+
+
+# ---------------------------------------------------------------------------
+# WorkflowExecutionResult edge cases
+# ---------------------------------------------------------------------------
+
+class TestWorkflowExecutionResultEdgeCases:
+
+ def test_has_error_false_for_200(self):
+ wf = WorkflowExecution("bp", "1.0", "wf")
+ output = ExecutionServiceOutput()
+ output.status.code = 200
+ result = WorkflowExecutionResult(wf, output)
+ assert not result.has_error
+
+ def test_has_error_true_for_non_200(self):
+ wf = WorkflowExecution("bp", "1.0", "wf")
+ output = ExecutionServiceOutput()
+ output.status.code = 500
+ result = WorkflowExecutionResult(wf, output)
+ assert result.has_error
+
+ def test_error_message_raises_when_no_error(self):
+ wf = WorkflowExecution("bp", "1.0", "wf")
+ output = ExecutionServiceOutput()
+ output.status.code = 200
+ result = WorkflowExecutionResult(wf, output)
+ with pytest.raises(AttributeError, match="Execution does not finish with error"):
+ _ = result.error_message
+
+ def test_error_message_returns_string_when_error(self):
+ wf = WorkflowExecution("bp", "1.0", "wf")
+ output = ExecutionServiceOutput()
+ output.status.code = 500
+ output.status.errorMessage = "Something broke"
+ result = WorkflowExecutionResult(wf, output)
+ assert result.error_message == "Something broke"
+
+ def test_payload_returns_dict(self):
+ wf = WorkflowExecution("bp", "1.0", "wf")
+ output = ExecutionServiceOutput()
+ output.payload.update({"data": "value"})
+ result = WorkflowExecutionResult(wf, output)
+ assert result.payload == {"data": "value"}
+
+ def test_blueprint_name_from_output(self):
+ wf = WorkflowExecution("bp", "1.0", "wf")
+ output = ExecutionServiceOutput()
+ output.actionIdentifiers.blueprintName = "server-bp"
+ result = WorkflowExecutionResult(wf, output)
+ assert result.blueprint_name == "server-bp"
+
+ def test_blueprint_version_from_output(self):
+ wf = WorkflowExecution("bp", "1.0", "wf")
+ output = ExecutionServiceOutput()
+ output.actionIdentifiers.blueprintVersion = "3.0"
+ result = WorkflowExecutionResult(wf, output)
+ assert result.blueprint_version == "3.0"
+
+ def test_workflow_name_from_output(self):
+ wf = WorkflowExecution("bp", "1.0", "wf")
+ output = ExecutionServiceOutput()
+ output.actionIdentifiers.actionName = "server-wf"
+ result = WorkflowExecutionResult(wf, output)
+ assert result.workflow_name == "server-wf"
+
+
+# ---------------------------------------------------------------------------
+# ResourceResolution init with env vars
+# ---------------------------------------------------------------------------
+
+class TestResourceResolutionInit:
+
+ @patch.dict("os.environ", {"AUTH_TOKEN": "env-token", "API_USERNAME": "env-user", "API_PASSWORD": "env-pass"})
+ def test_env_vars_used_when_no_explicit_values(self):
+ rr = ResourceResolution()
+ assert rr.grpc_client_header_auth_token == "env-token"
+ assert rr.http_client.auth_user == "env-user"
+ assert rr.http_client.auth_pass == "env-pass"
+
+ def test_explicit_values_override_env(self):
+ rr = ResourceResolution(
+ header_auth_token="explicit-token",
+ http_auth_user="explicit-user",
+ http_auth_pass="explicit-pass",
+ )
+ assert rr.grpc_client_header_auth_token == "explicit-token"
+ assert rr.http_client.auth_user == "explicit-user"
+ assert rr.http_client.auth_pass == "explicit-pass"
+
+ def test_default_values(self):
+ rr = ResourceResolution()
+ assert rr.grpc_client_server_address == "127.0.0.1"
+ assert rr.grpc_client_server_port == 9111
+ assert rr.grpc_client_use_ssl is False
+ assert rr.grpc_client_use_header_auth is False