Increase command executor test coverage 47/143447/1
authorFiete Ostkamp <fiete.ostkamp@telekom.de>
Wed, 4 Mar 2026 13:48:39 +0000 (14:48 +0100)
committerFiete Ostkamp <fiete.ostkamp@telekom.de>
Wed, 4 Mar 2026 13:48:39 +0000 (14:48 +0100)
- increase coverage from 0% (no tests) to 83%

Issue-ID: CCSDK-4156
Change-Id: I171ddd1b9e13ee4603170bb6ed3302b098d664c5
Signed-off-by: Fiete Ostkamp <fiete.ostkamp@telekom.de>
.gitignore
ms/command-executor/src/test/python/__init__.py [new file with mode: 0644]
ms/command-executor/src/test/python/test_command_executor_handler.py [new file with mode: 0644]
ms/command-executor/src/test/python/test_command_executor_server.py [new file with mode: 0644]
ms/command-executor/src/test/python/test_payload_coder.py [new file with mode: 0644]
ms/command-executor/src/test/python/test_request_header_validator_interceptor.py [new file with mode: 0644]
ms/command-executor/src/test/python/test_utils.py [new file with mode: 0644]

index c3809a5..8b3e0c3 100644 (file)
@@ -67,6 +67,7 @@ typings/
 # dotenv environment variables file
 .env
 .env.test
+.venv
 
 # parcel-bundler cache (https://parceljs.org/)
 .cache
diff --git a/ms/command-executor/src/test/python/__init__.py b/ms/command-executor/src/test/python/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/ms/command-executor/src/test/python/test_command_executor_handler.py b/ms/command-executor/src/test/python/test_command_executor_handler.py
new file mode 100644 (file)
index 0000000..069d2c1
--- /dev/null
@@ -0,0 +1,678 @@
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Tests for command_executor_handler.py — CommandExecutorHandler methods
+covering blueprint upload, env preparation, command execution, venv
+creation, and package installation.
+"""
+
+import io
+import json
+import os
+import tempfile
+import zipfile
+from subprocess import CalledProcessError, PIPE
+from types import SimpleNamespace
+from unittest.mock import patch, MagicMock, mock_open, PropertyMock
+
+import pytest
+
+import proto.CommandExecutor_pb2 as CommandExecutor_pb2
+import utils
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _make_identifiers(name="test-bp", version="1.0.0", uuid="uuid-1"):
+    return SimpleNamespace(
+        blueprintName=name,
+        blueprintVersion=version,
+        blueprintUUID=uuid,
+    )
+
+
+def _make_request(name="test-bp", version="1.0.0", uuid="uuid-1",
+                  timeout=30, request_id="req-1", sub_request_id="sub-1",
+                  originator_id="orig-1", correlation_id="corr-1",
+                  command="", properties=None, packages=None,
+                  archive_type="CBA_ZIP", bin_data=b""):
+    return SimpleNamespace(
+        identifiers=_make_identifiers(name, version, uuid),
+        timeOut=timeout,
+        requestId=request_id,
+        subRequestId=sub_request_id,
+        originatorId=originator_id,
+        correlationId=correlation_id,
+        command=command,
+        properties=properties,
+        packages=packages or [],
+        archiveType=archive_type,
+        binData=bin_data,
+    )
+
+
+def _create_valid_zip_bytes():
+    """Create a valid in-memory zip file with a single entry."""
+    buf = io.BytesIO()
+    with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf:
+        zf.writestr("test.txt", "hello world")
+    return buf.getvalue()
+
+
+def _make_handler(request=None, skip_prometheus=True):
+    """Create a CommandExecutorHandler with prometheus mocked out."""
+    if request is None:
+        request = _make_request()
+
+    with patch("command_executor_handler.prometheus") as mock_prom, \
+         patch.dict(os.environ, {}, clear=False):
+        # Mock prometheus so it doesn't try to start an HTTP server
+        mock_histogram = MagicMock()
+        mock_counter = MagicMock()
+        mock_prom.REGISTRY = MagicMock()
+        mock_prom.REGISTRY._command_executor_histogram = mock_histogram
+        mock_prom.REGISTRY._command_executor_counter = mock_counter
+        mock_prom.REGISTRY._command_executor_prometheus_server_started = True
+        mock_prom.Histogram.return_value = mock_histogram
+        mock_prom.Counter.return_value = mock_counter
+
+        from command_executor_handler import CommandExecutorHandler
+        handler = CommandExecutorHandler(request)
+
+    return handler
+
+
+# ===================================================================
+# __init__ and basic attribute tests
+# ===================================================================
+
+class TestHandlerInit:
+
+    def test_basic_attributes(self):
+        req = _make_request(name="bp-init", version="2.0.0", uuid="u-init",
+                            timeout=60, request_id="r-init", sub_request_id="s-init")
+        handler = _make_handler(req)
+        assert handler.blueprint_name == "bp-init"
+        assert handler.blueprint_version == "2.0.0"
+        assert handler.uuid == "u-init"
+        assert handler.execution_timeout == 60
+        assert handler.request_id == "r-init"
+        assert handler.sub_request_id == "s-init"
+
+    def test_blueprint_dir_path(self):
+        req = _make_request(name="my-bp", version="1.0.0", uuid="u1")
+        handler = _make_handler(req)
+        expected = "/opt/app/onap/blueprints/deploy/my-bp/1.0.0/u1"
+        assert handler.blueprint_dir == expected
+
+    def test_blueprint_name_version_uuid(self):
+        req = _make_request(name="n", version="v", uuid="u")
+        handler = _make_handler(req)
+        assert handler.blueprint_name_version_uuid == "n/v/u"
+
+    def test_blueprint_name_version_legacy(self):
+        req = _make_request(name="n", version="v")
+        handler = _make_handler(req)
+        assert handler.blueprint_name_version == "n/v"
+
+
+# ===================================================================
+# is_installed / blueprint_dir_exists / blueprint_tosca_meta_file_exists
+# ===================================================================
+
+class TestFileChecks:
+
+    def test_is_installed_true(self):
+        handler = _make_handler()
+        with patch("os.path.exists", return_value=True):
+            assert handler.is_installed() is True
+
+    def test_is_installed_false(self):
+        handler = _make_handler()
+        with patch("os.path.exists", return_value=False):
+            assert handler.is_installed() is False
+
+    def test_blueprint_dir_exists(self):
+        handler = _make_handler()
+        with patch("os.path.exists", return_value=True):
+            assert handler.blueprint_dir_exists() is True
+
+    def test_blueprint_tosca_meta_file_exists(self):
+        handler = _make_handler()
+        with patch("os.path.exists", return_value=False):
+            assert handler.blueprint_tosca_meta_file_exists() is False
+
+
+# ===================================================================
+# is_valid_archive_type
+# ===================================================================
+
+class TestIsValidArchiveType:
+
+    def test_cba_zip(self):
+        handler = _make_handler()
+        assert handler.is_valid_archive_type("CBA_ZIP") is True
+
+    def test_cba_gzip(self):
+        handler = _make_handler()
+        assert handler.is_valid_archive_type("CBA_GZIP") is True
+
+    def test_invalid_type(self):
+        handler = _make_handler()
+        assert handler.is_valid_archive_type("TAR") is False
+
+    def test_empty_string(self):
+        handler = _make_handler()
+        assert handler.is_valid_archive_type("") is False
+
+
+# ===================================================================
+# err_exit
+# ===================================================================
+
+class TestErrExit:
+
+    def test_returns_failure_data(self):
+        handler = _make_handler()
+        result = handler.err_exit("something broke")
+        assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+        assert "something broke" in result[utils.ERR_MSG_KEY]
+
+
+# ===================================================================
+# uploadBlueprint
+# ===================================================================
+
+class TestUploadBlueprint:
+
+    def test_invalid_archive_type(self):
+        req = _make_request(archive_type="UNKNOWN")
+        handler = _make_handler(req)
+        upload_req = SimpleNamespace(archiveType="UNKNOWN", binData=b"",
+                                     **{'__str__': lambda s: "req"})
+        # Use request object with the right fields
+        result = handler.uploadBlueprint(
+            SimpleNamespace(archiveType="UNKNOWN", binData=b"")
+        )
+        assert result.status == CommandExecutor_pb2.FAILURE
+
+    def test_successful_zip_upload(self):
+        zip_bytes = _create_valid_zip_bytes()
+        req = _make_request(archive_type="CBA_ZIP", bin_data=zip_bytes)
+        handler = _make_handler(req)
+
+        with patch("os.makedirs") as mock_mkdirs:
+            upload_req = SimpleNamespace(archiveType="CBA_ZIP", binData=zip_bytes)
+            # Mock ZipFile to avoid writing to filesystem
+            with patch("command_executor_handler.ZipFile") as mock_zf:
+                mock_zf_instance = MagicMock()
+                mock_zf.return_value.__enter__ = MagicMock(return_value=mock_zf_instance)
+                mock_zf.return_value.__exit__ = MagicMock(return_value=False)
+                result = handler.uploadBlueprint(upload_req)
+
+        assert result.status == CommandExecutor_pb2.SUCCESS
+
+    def test_makedirs_failure(self):
+        req = _make_request()
+        handler = _make_handler(req)
+
+        with patch("os.makedirs", side_effect=OSError("permission denied")):
+            upload_req = SimpleNamespace(archiveType="CBA_ZIP", binData=b"data")
+            result = handler.uploadBlueprint(upload_req)
+
+        assert result.status == CommandExecutor_pb2.FAILURE
+
+    def test_gzip_returns_failure_todo(self):
+        """CBA_GZIP is recognized but not yet implemented, should return FAILURE."""
+        req = _make_request()
+        handler = _make_handler(req)
+
+        with patch("os.makedirs"):
+            upload_req = SimpleNamespace(archiveType="CBA_GZIP", binData=b"data")
+            result = handler.uploadBlueprint(upload_req)
+
+        assert result.status == CommandExecutor_pb2.FAILURE
+
+
+# ===================================================================
+# prepare_env
+# ===================================================================
+
+class TestPrepareEnv:
+
+    def test_blueprint_dir_not_found(self):
+        req = _make_request()
+        handler = _make_handler(req)
+
+        with patch.object(handler, 'blueprint_dir_exists', return_value=False):
+            result = handler.prepare_env(req)
+
+        assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+        assert result.get(utils.REUPLOAD_CBA_KEY) is True
+
+    def test_tosca_meta_not_found(self):
+        req = _make_request()
+        handler = _make_handler(req)
+
+        with patch.object(handler, 'blueprint_dir_exists', return_value=True), \
+             patch.object(handler, 'blueprint_tosca_meta_file_exists', return_value=False):
+            result = handler.prepare_env(req)
+
+        assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+        assert result.get(utils.REUPLOAD_CBA_KEY) is True
+
+    def test_already_installed_reads_file(self):
+        req = _make_request()
+        handler = _make_handler(req)
+
+        with patch.object(handler, 'blueprint_dir_exists', return_value=True), \
+             patch.object(handler, 'blueprint_tosca_meta_file_exists', return_value=True), \
+             patch.object(handler, 'is_installed', return_value=True), \
+             patch("command_executor_handler.open", mock_open(read_data="previously installed packages")):
+            result = handler.prepare_env(req)
+
+        assert result[utils.CDS_IS_SUCCESSFUL_KEY] is True
+        assert "previously installed packages" in result[utils.RESULTS_LOG_KEY][0]
+
+    def test_already_installed_read_failure(self):
+        req = _make_request()
+        handler = _make_handler(req)
+
+        with patch.object(handler, 'blueprint_dir_exists', return_value=True), \
+             patch.object(handler, 'blueprint_tosca_meta_file_exists', return_value=True), \
+             patch.object(handler, 'is_installed', return_value=True), \
+             patch("command_executor_handler.open", side_effect=IOError("read error")):
+            result = handler.prepare_env(req)
+
+        assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+
+    def test_create_venv_failure(self):
+        req = _make_request()
+        handler = _make_handler(req)
+
+        with patch.object(handler, 'blueprint_dir_exists', return_value=True), \
+             patch.object(handler, 'blueprint_tosca_meta_file_exists', return_value=True), \
+             patch.object(handler, 'is_installed', return_value=False), \
+             patch.object(handler, 'create_venv', return_value=utils.build_ret_data(False, error="venv error")):
+            result = handler.prepare_env(req)
+
+        assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+
+    def test_pip_upgrade_failure(self):
+        req = _make_request()
+        handler = _make_handler(req)
+
+        with patch.object(handler, 'blueprint_dir_exists', return_value=True), \
+             patch.object(handler, 'blueprint_tosca_meta_file_exists', return_value=True), \
+             patch.object(handler, 'is_installed', return_value=False), \
+             patch.object(handler, 'create_venv', return_value=utils.build_ret_data(True)), \
+             patch.object(handler, 'upgrade_pip', return_value=False):
+            result = handler.prepare_env(req)
+
+        assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+
+
+# ===================================================================
+# execute_command
+# ===================================================================
+
+class TestExecuteCommand:
+
+    def test_successful_execution(self):
+        req = _make_request(
+            command="/opt/app/onap/blueprints/deploy/test-bp/1.0.0/uuid-1/Scripts/python/test.py arg1",
+            timeout=30,
+        )
+        handler = _make_handler(req)
+
+        mock_completed = MagicMock()
+        mock_completed.returncode = 0
+
+        with patch.object(handler, 'is_installed', return_value=True), \
+             patch("os.utime"), \
+             patch("subprocess.run", return_value=mock_completed), \
+             patch("tempfile.TemporaryFile", return_value=tempfile.TemporaryFile(mode="w+")):
+            result = handler.execute_command(req)
+
+        assert result[utils.CDS_IS_SUCCESSFUL_KEY] is True
+
+    def test_failed_execution_nonzero_rc(self):
+        req = _make_request(
+            command="python /opt/app/onap/blueprints/deploy/test-bp/1.0.0/uuid-1/Scripts/python/test.py",
+            timeout=30,
+        )
+        handler = _make_handler(req)
+
+        mock_completed = MagicMock()
+        mock_completed.returncode = 1
+
+        with patch.object(handler, 'is_installed', return_value=True), \
+             patch("os.utime"), \
+             patch("subprocess.run", return_value=mock_completed), \
+             patch("tempfile.TemporaryFile", return_value=tempfile.TemporaryFile(mode="w+")):
+            result = handler.execute_command(req)
+
+        assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+
+    def test_timeout_execution(self):
+        from subprocess import TimeoutExpired
+
+        req = _make_request(
+            command="python /opt/app/onap/blueprints/deploy/test-bp/1.0.0/uuid-1/Scripts/python/test.py",
+            timeout=5,
+        )
+        handler = _make_handler(req)
+
+        with patch.object(handler, 'is_installed', return_value=True), \
+             patch("os.utime"), \
+             patch("subprocess.run", side_effect=TimeoutExpired("cmd", 5)), \
+             patch("tempfile.TemporaryFile", return_value=tempfile.TemporaryFile(mode="w+")):
+            result = handler.execute_command(req)
+
+        assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+
+    def test_general_exception(self):
+        req = _make_request(
+            command="python /opt/app/onap/blueprints/deploy/test-bp/1.0.0/uuid-1/Scripts/python/test.py",
+            timeout=30,
+        )
+        handler = _make_handler(req)
+
+        with patch.object(handler, 'is_installed', return_value=True), \
+             patch("os.utime", side_effect=Exception("unexpected")):
+            result = handler.execute_command(req)
+
+        assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+
+    def test_sr7_compatibility_path_rewrite(self):
+        """If request.command uses name/version but not UUID, UUID should be inserted."""
+        req = _make_request(
+            name="my-bp", version="1.0.0", uuid="u-1",
+            command="python /opt/app/onap/blueprints/deploy/my-bp/1.0.0/Scripts/python/test.py",
+            timeout=30,
+        )
+        handler = _make_handler(req)
+
+        mock_completed = MagicMock()
+        mock_completed.returncode = 0
+        captured_cmd = []
+
+        def capture_run(cmd, **kwargs):
+            captured_cmd.append(cmd)
+            return mock_completed
+
+        with patch.object(handler, 'is_installed', return_value=True), \
+             patch("os.utime"), \
+             patch("subprocess.run", side_effect=capture_run), \
+             patch("tempfile.TemporaryFile", return_value=tempfile.TemporaryFile(mode="w+")):
+            handler.execute_command(req)
+
+        # The command should have been rewritten to include the UUID
+        assert "my-bp/1.0.0/u-1" in captured_cmd[0]
+
+    def test_ansible_playbook_adds_interpreter(self):
+        """ansible-playbook commands should get ansible_python_interpreter set."""
+        req = _make_request(
+            command="ansible-playbook /opt/app/onap/blueprints/deploy/test-bp/1.0.0/uuid-1/playbook.yml",
+            timeout=30,
+        )
+        handler = _make_handler(req)
+
+        mock_completed = MagicMock()
+        mock_completed.returncode = 0
+        captured_cmd = []
+
+        def capture_run(cmd, **kwargs):
+            captured_cmd.append(cmd)
+            return mock_completed
+
+        with patch.object(handler, 'is_installed', return_value=True), \
+             patch("os.utime"), \
+             patch("subprocess.run", side_effect=capture_run), \
+             patch("tempfile.TemporaryFile", return_value=tempfile.TemporaryFile(mode="w+")):
+            handler.execute_command(req)
+
+        assert "ansible_python_interpreter=" in captured_cmd[0]
+
+
+# ===================================================================
+# create_venv
+# ===================================================================
+
+class TestCreateVenv:
+
+    def test_successful_creation(self):
+        handler = _make_handler()
+        with patch("venv.create") as mock_create:
+            result = handler.create_venv()
+
+        assert result[utils.CDS_IS_SUCCESSFUL_KEY] is True
+        mock_create.assert_called_once()
+
+    def test_creation_failure(self):
+        handler = _make_handler()
+        with patch("venv.create", side_effect=Exception("venv failed")):
+            result = handler.create_venv()
+
+        assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+        assert "venv failed" in str(result.get(utils.ERR_MSG_KEY, ""))
+
+    def test_system_site_packages_disabled(self):
+        handler = _make_handler()
+        with patch("venv.create") as mock_create, \
+             patch.dict(os.environ, {"CREATE_VENV_DISABLE_SITE_PACKAGES": "1"}):
+            handler.create_venv()
+
+        call_kwargs = mock_create.call_args
+        # system_site_packages should be False when env var is set
+        assert call_kwargs[1].get("system_site_packages") is False or \
+               (len(call_kwargs[0]) > 0 and call_kwargs[1].get("system_site_packages") is False)
+
+    def test_system_site_packages_enabled_by_default(self):
+        handler = _make_handler()
+        env = os.environ.copy()
+        env.pop("CREATE_VENV_DISABLE_SITE_PACKAGES", None)
+        with patch("venv.create") as mock_create, \
+             patch.dict(os.environ, env, clear=True):
+            handler.create_venv()
+
+        call_kwargs = mock_create.call_args
+        assert call_kwargs[1].get("system_site_packages") is True
+
+
+# ===================================================================
+# upgrade_pip
+# ===================================================================
+
+class TestUpgradePip:
+
+    def test_successful_upgrade(self):
+        handler = _make_handler()
+        results = []
+
+        mock_result = MagicMock()
+        mock_result.stdout = b"Successfully installed pip-23.0"
+
+        with patch("subprocess.run", return_value=mock_result):
+            success = handler.upgrade_pip(results)
+
+        assert success is True
+        assert "Successfully installed pip-23.0" in results[0]
+
+    def test_failed_upgrade(self):
+        handler = _make_handler()
+        results = []
+
+        with patch("subprocess.run", side_effect=CalledProcessError(
+                1, "pip", stderr=b"pip upgrade failed")):
+            success = handler.upgrade_pip(results)
+
+        assert success is False
+        assert "pip upgrade failed" in results[0]
+
+
+# ===================================================================
+# install_python_packages
+# ===================================================================
+
+class TestInstallPythonPackages:
+
+    def test_successful_install(self):
+        handler = _make_handler()
+        results = []
+
+        mock_result = MagicMock()
+        mock_result.stdout = b"Successfully installed package-1.0"
+
+        with patch("subprocess.run", return_value=mock_result):
+            success = handler.install_python_packages("some-package", results)
+
+        assert success is True
+
+    def test_failed_install(self):
+        handler = _make_handler()
+        results = []
+
+        with patch("subprocess.run", side_effect=CalledProcessError(
+                1, "pip install", stderr=b"No matching distribution")):
+            success = handler.install_python_packages("bad-package", results)
+
+        assert success is False
+        assert "No matching distribution" in results[0]
+
+    def test_requirements_txt_uses_full_path(self):
+        handler = _make_handler()
+        results = []
+        captured_cmd = []
+
+        mock_result = MagicMock()
+        mock_result.stdout = b"ok"
+
+        def capture_run(cmd, **kwargs):
+            captured_cmd.append(cmd)
+            return mock_result
+
+        with patch("subprocess.run", side_effect=capture_run):
+            handler.install_python_packages("requirements.txt", results)
+
+        # Should use the full path to pip and requirements.txt
+        assert any("bin/pip" in str(c) for c in captured_cmd)
+        assert any("requirements.txt" in str(c) for c in captured_cmd)
+
+    def test_utility_package_uses_cp(self):
+        handler = _make_handler()
+        results = []
+        captured_cmd = []
+
+        mock_result = MagicMock()
+        mock_result.stdout = b"ok"
+
+        def capture_run(cmd, **kwargs):
+            captured_cmd.append(cmd)
+            return mock_result
+
+        with patch("subprocess.run", side_effect=capture_run):
+            handler.install_python_packages("UTILITY", results)
+
+        assert captured_cmd[0][0] == "cp"
+
+    def test_pip_install_user_flag(self):
+        handler = _make_handler()
+        results = []
+        captured_cmd = []
+
+        mock_result = MagicMock()
+        mock_result.stdout = b"ok"
+
+        def capture_run(cmd, **kwargs):
+            captured_cmd.append(cmd)
+            return mock_result
+
+        with patch("subprocess.run", side_effect=capture_run), \
+             patch.dict(os.environ, {"PIP_INSTALL_USER_FLAG": "1"}):
+            handler.install_python_packages("some-package", results)
+
+        assert "--user" in captured_cmd[0]
+
+
+# ===================================================================
+# install_ansible_packages
+# ===================================================================
+
+class TestInstallAnsiblePackages:
+
+    def test_successful_install(self):
+        handler = _make_handler()
+        results = []
+
+        mock_result = MagicMock()
+        mock_result.stdout = b"Role installed successfully"
+
+        with patch("subprocess.run", return_value=mock_result):
+            success = handler.install_ansible_packages("my-role", results)
+
+        assert success is True
+
+    def test_failed_install(self):
+        handler = _make_handler()
+        results = []
+
+        with patch("subprocess.run", side_effect=CalledProcessError(
+                1, "ansible-galaxy", stderr=b"Role not found")):
+            success = handler.install_ansible_packages("bad-role", results)
+
+        assert success is False
+        assert "Role not found" in results[0]
+
+    def test_uses_ansible_galaxy_command(self):
+        handler = _make_handler()
+        results = []
+        captured_cmd = []
+
+        mock_result = MagicMock()
+        mock_result.stdout = b"ok"
+
+        def capture_run(cmd, **kwargs):
+            captured_cmd.append(cmd)
+            return mock_result
+
+        with patch("subprocess.run", side_effect=capture_run):
+            handler.install_ansible_packages("some-role", results)
+
+        assert captured_cmd[0][0] == "ansible-galaxy"
+        assert "install" in captured_cmd[0]
+        assert "some-role" in captured_cmd[0]
+
+    def test_http_proxy_is_passed(self):
+        handler = _make_handler()
+        results = []
+        captured_env = []
+
+        mock_result = MagicMock()
+        mock_result.stdout = b"ok"
+
+        def capture_run(cmd, **kwargs):
+            captured_env.append(kwargs.get("env", {}))
+            return mock_result
+
+        with patch("subprocess.run", side_effect=capture_run), \
+             patch.dict(os.environ, {"http_proxy": "http://proxy:8080"}):
+            handler.install_ansible_packages("role", results)
+
+        assert captured_env[0].get("https_proxy") == "http://proxy:8080"
diff --git a/ms/command-executor/src/test/python/test_command_executor_server.py b/ms/command-executor/src/test/python/test_command_executor_server.py
new file mode 100644 (file)
index 0000000..9d6db28
--- /dev/null
@@ -0,0 +1,490 @@
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration tests for command_executor_server.py — CommandExecutorServer gRPC servicer.
+
+Includes:
+  - Unit-level tests that mock CommandExecutorHandler to verify branching logic
+  - Full gRPC integration tests that start a real server, register the servicer,
+    and make real RPC calls through a channel/stub with mocked handler internals
+"""
+
+import io
+import json
+import logging
+import os
+import zipfile
+from concurrent import futures
+from types import SimpleNamespace
+from unittest.mock import patch, MagicMock
+
+import grpc
+import pytest
+
+import proto.CommandExecutor_pb2 as pb2
+import proto.CommandExecutor_pb2_grpc as pb2_grpc
+import utils
+from command_executor_server import CommandExecutorServer
+
+
+# ---------------------------------------------------------------------------
+# Helpers — build real protobuf messages
+# ---------------------------------------------------------------------------
+
+def _identifiers(name="test-bp", version="1.0.0", uuid="uuid-123"):
+    ids = pb2.Identifiers()
+    ids.blueprintName = name
+    ids.blueprintVersion = version
+    ids.blueprintUUID = uuid
+    return ids
+
+
+def _upload_request(name="test-bp", version="1.0.0", uuid="uuid-123",
+                    request_id="req-1", sub_request_id="sub-1",
+                    originator_id="orig-1", archive_type="CBA_ZIP",
+                    bin_data=b""):
+    req = pb2.UploadBlueprintInput()
+    req.identifiers.CopyFrom(_identifiers(name, version, uuid))
+    req.requestId = request_id
+    req.subRequestId = sub_request_id
+    req.originatorId = originator_id
+    req.archiveType = archive_type
+    req.binData = bin_data
+    return req
+
+
+def _prepare_env_request(name="test-bp", version="1.0.0", uuid="uuid-123",
+                         request_id="req-2", sub_request_id="sub-2",
+                         originator_id="orig-1", timeout=30):
+    req = pb2.PrepareEnvInput()
+    req.identifiers.CopyFrom(_identifiers(name, version, uuid))
+    req.requestId = request_id
+    req.subRequestId = sub_request_id
+    req.originatorId = originator_id
+    req.timeOut = timeout
+    return req
+
+
+def _execution_request(name="test-bp", version="1.0.0", uuid="uuid-123",
+                       request_id="req-3", sub_request_id="sub-3",
+                       originator_id="orig-1", command="python test.py",
+                       timeout=30):
+    req = pb2.ExecutionInput()
+    req.identifiers.CopyFrom(_identifiers(name, version, uuid))
+    req.requestId = request_id
+    req.subRequestId = sub_request_id
+    req.originatorId = originator_id
+    req.command = command
+    req.timeOut = timeout
+    return req
+
+
+def _create_valid_zip_bytes():
+    """Create a valid in-memory zip with a single entry."""
+    buf = io.BytesIO()
+    with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf:
+        zf.writestr("test.txt", "hello world")
+    return buf.getvalue()
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+@pytest.fixture
+def server_instance():
+    """A plain CommandExecutorServer (no gRPC wiring)."""
+    return CommandExecutorServer()
+
+
+@pytest.fixture
+def mock_context():
+    """A MagicMock standing in for gRPC ServicerContext."""
+    return MagicMock()
+
+
+# ---------------------------------------------------------------------------
+# Unit tests — mock CommandExecutorHandler to test server branch logic
+# ---------------------------------------------------------------------------
+
+class TestUploadBlueprint:
+    """uploadBlueprint delegates to handler.uploadBlueprint and returns result."""
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_returns_handler_result(self, MockHandler, server_instance, mock_context):
+        fake_response = pb2.UploadBlueprintOutput()
+        fake_response.requestId = "req-1"
+        fake_response.status = pb2.SUCCESS
+        MockHandler.return_value.uploadBlueprint.return_value = fake_response
+
+        req = _upload_request()
+        result = server_instance.uploadBlueprint(req, mock_context)
+
+        MockHandler.assert_called_once_with(req)
+        MockHandler.return_value.uploadBlueprint.assert_called_once_with(req)
+        assert result == fake_response
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_passes_request_to_handler(self, MockHandler, server_instance, mock_context):
+        MockHandler.return_value.uploadBlueprint.return_value = pb2.UploadBlueprintOutput()
+
+        req = _upload_request(name="my-bp", version="2.0.0", uuid="u-456")
+        server_instance.uploadBlueprint(req, mock_context)
+
+        # Handler was constructed with the exact request
+        MockHandler.assert_called_once_with(req)
+
+
+class TestPrepareEnv:
+    """prepareEnv delegates to handler.prepare_env, logs, and builds grpc response."""
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_successful_prepare_env(self, MockHandler, server_instance, mock_context):
+        MockHandler.return_value.prepare_env.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: True,
+            utils.RESULTS_LOG_KEY: ["pkg installed"],
+            utils.ERR_MSG_KEY: "",
+        }
+
+        req = _prepare_env_request(request_id="req-ok")
+        result = server_instance.prepareEnv(req, mock_context)
+
+        MockHandler.assert_called_once_with(req)
+        MockHandler.return_value.prepare_env.assert_called_once_with(req)
+        # Result is an ExecutionOutput
+        assert isinstance(result, pb2.ExecutionOutput)
+        assert result.requestId == "req-ok"
+        assert result.status == pb2.SUCCESS
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_failed_prepare_env(self, MockHandler, server_instance, mock_context):
+        MockHandler.return_value.prepare_env.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: False,
+            utils.RESULTS_LOG_KEY: [],
+            utils.ERR_MSG_KEY: "pip install failed",
+        }
+
+        req = _prepare_env_request(request_id="req-fail")
+        result = server_instance.prepareEnv(req, mock_context)
+
+        assert isinstance(result, pb2.ExecutionOutput)
+        assert result.requestId == "req-fail"
+        assert result.status == pb2.FAILURE
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_prepare_env_logs_success(self, MockHandler, server_instance, mock_context, caplog):
+        MockHandler.return_value.prepare_env.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: True,
+            utils.RESULTS_LOG_KEY: ["ok"],
+            utils.ERR_MSG_KEY: "",
+        }
+
+        req = _prepare_env_request()
+        with caplog.at_level(logging.INFO, logger="CommandExecutorServer"):
+            server_instance.prepareEnv(req, mock_context)
+
+        assert any("Package installation logs" in r.message for r in caplog.records)
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_prepare_env_logs_failure(self, MockHandler, server_instance, mock_context, caplog):
+        MockHandler.return_value.prepare_env.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: False,
+            utils.RESULTS_LOG_KEY: [],
+            utils.ERR_MSG_KEY: "missing dep",
+        }
+
+        req = _prepare_env_request()
+        with caplog.at_level(logging.INFO, logger="CommandExecutorServer"):
+            server_instance.prepareEnv(req, mock_context)
+
+        assert any("Failed to prepare python environment" in r.message for r in caplog.records)
+
+
+class TestExecuteCommand:
+    """executeCommand delegates to handler.execute_command, logs, builds grpc response."""
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_successful_execution(self, MockHandler, server_instance, mock_context):
+        MockHandler.return_value.execute_command.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: True,
+            utils.RESULTS_LOG_KEY: ["all ok"],
+            utils.ERR_MSG_KEY: "",
+        }
+
+        req = _execution_request(request_id="req-exec")
+        result = server_instance.executeCommand(req, mock_context)
+
+        MockHandler.assert_called_once_with(req)
+        MockHandler.return_value.execute_command.assert_called_once_with(req)
+        assert isinstance(result, pb2.ExecutionOutput)
+        assert result.requestId == "req-exec"
+        assert result.status == pb2.SUCCESS
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_failed_execution_with_err_msg(self, MockHandler, server_instance, mock_context):
+        MockHandler.return_value.execute_command.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: False,
+            utils.RESULTS_LOG_KEY: ["step1"],
+            utils.ERR_MSG_KEY: "script crashed",
+        }
+
+        req = _execution_request(request_id="req-fail")
+        result = server_instance.executeCommand(req, mock_context)
+
+        assert result.status == pb2.FAILURE
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_failed_execution_without_err_msg(self, MockHandler, server_instance, mock_context, caplog):
+        """When ERR_MSG_KEY is missing from the response dict, no error text is logged."""
+        MockHandler.return_value.execute_command.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: False,
+            utils.RESULTS_LOG_KEY: ["partial output"],
+        }
+
+        req = _execution_request()
+        with caplog.at_level(logging.INFO, logger="CommandExecutorServer"):
+            result = server_instance.executeCommand(req, mock_context)
+
+        assert result.status == pb2.FAILURE
+        # The "Error returned:" substring should NOT appear since ERR_MSG_KEY is absent
+        failure_msgs = [r.message for r in caplog.records if "Failed to executeCommand" in r.message]
+        assert len(failure_msgs) == 1
+        assert "Error returned:" not in failure_msgs[0]
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_logs_success(self, MockHandler, server_instance, mock_context, caplog):
+        MockHandler.return_value.execute_command.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: True,
+            utils.RESULTS_LOG_KEY: [],
+            utils.ERR_MSG_KEY: "",
+        }
+
+        req = _execution_request()
+        with caplog.at_level(logging.INFO, logger="CommandExecutorServer"):
+            server_instance.executeCommand(req, mock_context)
+
+        assert any("Execution finished successfully" in r.message for r in caplog.records)
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_logs_failure_with_error_detail(self, MockHandler, server_instance, mock_context, caplog):
+        MockHandler.return_value.execute_command.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: False,
+            utils.RESULTS_LOG_KEY: ["started"],
+            utils.ERR_MSG_KEY: "timeout!",
+        }
+
+        req = _execution_request()
+        with caplog.at_level(logging.INFO, logger="CommandExecutorServer"):
+            server_instance.executeCommand(req, mock_context)
+
+        failure_msgs = [r.message for r in caplog.records if "Failed to executeCommand" in r.message]
+        assert len(failure_msgs) == 1
+        assert "Error returned: timeout!" in failure_msgs[0]
+
+    @patch.dict(os.environ, {"CE_DEBUG": "true"})
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_ce_debug_logs_request(self, MockHandler, server_instance, mock_context, caplog):
+        """When CE_DEBUG=true, the full request is logged."""
+        MockHandler.return_value.execute_command.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: True,
+            utils.RESULTS_LOG_KEY: [],
+            utils.ERR_MSG_KEY: "",
+        }
+
+        req = _execution_request()
+        with caplog.at_level(logging.INFO, logger="CommandExecutorServer"):
+            server_instance.executeCommand(req, mock_context)
+
+        # At least 3 log entries: received + request dump + success + payload
+        assert len(caplog.records) >= 3
+
+    @patch.dict(os.environ, {"CE_DEBUG": "false"})
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_ce_debug_off_does_not_log_request(self, MockHandler, server_instance, mock_context, caplog):
+        MockHandler.return_value.execute_command.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: True,
+            utils.RESULTS_LOG_KEY: [],
+            utils.ERR_MSG_KEY: "",
+        }
+
+        req = _execution_request()
+        with caplog.at_level(logging.INFO, logger="CommandExecutorServer"):
+            server_instance.executeCommand(req, mock_context)
+
+        # The request object itself should not appear in logs
+        logged_messages = " ".join(r.message for r in caplog.records if isinstance(r.message, str))
+        assert "Received executeCommand" in logged_messages
+
+
+# ---------------------------------------------------------------------------
+# gRPC integration tests — real server + channel + stub
+# ---------------------------------------------------------------------------
+
+class TestGrpcIntegration:
+    """
+    Spin up a real gRPC server with CommandExecutorServer registered,
+    make RPC calls through a channel, and verify end-to-end message
+    serialization / deserialization with mocked handler internals.
+    """
+
+    @pytest.fixture(autouse=True)
+    def grpc_server(self):
+        """Start a gRPC server on a free port, yield a stub, then shut down."""
+        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
+        self.servicer = CommandExecutorServer()
+        pb2_grpc.add_CommandExecutorServiceServicer_to_server(self.servicer, self.server)
+        port = self.server.add_insecure_port("[::]:0")  # OS assigns a free port
+        self.server.start()
+
+        channel = grpc.insecure_channel(f"localhost:{port}")
+        self.stub = pb2_grpc.CommandExecutorServiceStub(channel)
+
+        yield
+
+        channel.close()
+        self.server.stop(grace=0)
+
+    # -- uploadBlueprint over gRPC --
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_upload_blueprint_grpc_roundtrip(self, MockHandler):
+        expected = pb2.UploadBlueprintOutput()
+        expected.requestId = "req-grpc-1"
+        expected.status = pb2.SUCCESS
+        expected.payload = '{"result": "uploaded"}'
+        MockHandler.return_value.uploadBlueprint.return_value = expected
+
+        req = _upload_request(
+            name="grpc-bp", version="3.0.0", uuid="grpc-uuid",
+            request_id="req-grpc-1", bin_data=_create_valid_zip_bytes(),
+        )
+        response = self.stub.uploadBlueprint(req)
+
+        assert response.requestId == "req-grpc-1"
+        assert response.status == pb2.SUCCESS
+        assert response.payload == '{"result": "uploaded"}'
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_upload_blueprint_grpc_failure(self, MockHandler):
+        expected = pb2.UploadBlueprintOutput()
+        expected.requestId = "req-grpc-fail"
+        expected.status = pb2.FAILURE
+        expected.payload = '{"error": "bad archive"}'
+        MockHandler.return_value.uploadBlueprint.return_value = expected
+
+        req = _upload_request(request_id="req-grpc-fail")
+        response = self.stub.uploadBlueprint(req)
+
+        assert response.status == pb2.FAILURE
+
+    # -- prepareEnv over gRPC --
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_prepare_env_grpc_success(self, MockHandler):
+        MockHandler.return_value.prepare_env.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: True,
+            utils.RESULTS_LOG_KEY: ["dep1 installed"],
+            utils.ERR_MSG_KEY: "",
+        }
+
+        req = _prepare_env_request(request_id="req-prep-grpc")
+        response = self.stub.prepareEnv(req)
+
+        assert response.requestId == "req-prep-grpc"
+        assert response.status == pb2.SUCCESS
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_prepare_env_grpc_failure(self, MockHandler):
+        MockHandler.return_value.prepare_env.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: False,
+            utils.RESULTS_LOG_KEY: [],
+            utils.ERR_MSG_KEY: "network error during pip install",
+        }
+
+        req = _prepare_env_request(request_id="req-prep-fail")
+        response = self.stub.prepareEnv(req)
+
+        assert response.requestId == "req-prep-fail"
+        assert response.status == pb2.FAILURE
+
+    # -- executeCommand over gRPC --
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_execute_command_grpc_success(self, MockHandler):
+        MockHandler.return_value.execute_command.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: True,
+            utils.RESULTS_LOG_KEY: ["output line"],
+            utils.ERR_MSG_KEY: "",
+        }
+
+        req = _execution_request(request_id="req-exec-grpc", command="python run.py")
+        response = self.stub.executeCommand(req)
+
+        assert response.requestId == "req-exec-grpc"
+        assert response.status == pb2.SUCCESS
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_execute_command_grpc_failure(self, MockHandler):
+        MockHandler.return_value.execute_command.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: False,
+            utils.RESULTS_LOG_KEY: ["partial"],
+            utils.ERR_MSG_KEY: "script error",
+        }
+
+        req = _execution_request(request_id="req-exec-fail")
+        response = self.stub.executeCommand(req)
+
+        assert response.requestId == "req-exec-fail"
+        assert response.status == pb2.FAILURE
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_execute_command_grpc_preserves_payload(self, MockHandler):
+        """Verify payload round-trips correctly through gRPC serialization."""
+        payload_data = json.dumps({"key": "value", "number": 42})
+        MockHandler.return_value.execute_command.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: True,
+            utils.RESULTS_LOG_KEY: [payload_data],
+            utils.ERR_MSG_KEY: "",
+        }
+
+        req = _execution_request(request_id="req-payload")
+        response = self.stub.executeCommand(req)
+
+        assert response.requestId == "req-payload"
+        assert response.status == pb2.SUCCESS
+
+    @patch("command_executor_server.CommandExecutorHandler")
+    def test_request_fields_received_by_handler(self, MockHandler):
+        """Verify that the proto request fields survive serialization to the server."""
+        MockHandler.return_value.execute_command.return_value = {
+            utils.CDS_IS_SUCCESSFUL_KEY: True,
+            utils.RESULTS_LOG_KEY: [],
+            utils.ERR_MSG_KEY: "",
+        }
+
+        req = _execution_request(
+            name="field-bp", version="5.0.0", uuid="field-uuid",
+            request_id="field-req", command="python check.py",
+        )
+        self.stub.executeCommand(req)
+
+        # The handler was constructed with a request that has the right identifiers
+        call_args = MockHandler.call_args[0][0]
+        assert call_args.identifiers.blueprintName == "field-bp"
+        assert call_args.identifiers.blueprintVersion == "5.0.0"
+        assert call_args.requestId == "field-req"
+        assert call_args.command == "python check.py"
diff --git a/ms/command-executor/src/test/python/test_payload_coder.py b/ms/command-executor/src/test/python/test_payload_coder.py
new file mode 100644 (file)
index 0000000..ead4a37
--- /dev/null
@@ -0,0 +1,181 @@
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Tests for cds_utils/payload_coder.py — MIME payload and error message
+encoding used by CBA scripts to communicate results back to the
+command executor.
+"""
+
+import json
+import sys
+from io import StringIO
+from unittest.mock import patch
+
+from cds_utils.payload_coder import (
+    send_response_data_payload,
+    send_response_err_msg,
+    send_response_err_msg_and_exit,
+)
+
+
+class TestSendResponseDataPayload:
+
+    def test_output_contains_begin_end_markers(self):
+        """Output must be wrapped in BEGIN/END_EXTRA_PAYLOAD markers."""
+        captured = StringIO()
+        with patch("sys.stdout", captured):
+            send_response_data_payload({"status": "ok"})
+
+        output = captured.getvalue()
+        assert "BEGIN_EXTRA_PAYLOAD" in output
+        assert "END_EXTRA_PAYLOAD" in output
+
+    def test_payload_is_json_encoded(self):
+        """The JSON payload must appear between the markers."""
+        captured = StringIO()
+        with patch("sys.stdout", captured):
+            send_response_data_payload({"key": "value", "count": 42})
+
+        output = captured.getvalue()
+        # The encoded JSON should be somewhere in the output
+        assert '"key"' in output
+        assert '"value"' in output
+        assert "42" in output
+
+    def test_output_is_mime_formatted(self):
+        """Output should be MIME multipart form-data."""
+        captured = StringIO()
+        with patch("sys.stdout", captured):
+            send_response_data_payload({"a": 1})
+
+        output = captured.getvalue()
+        assert "Content-Type:" in output
+        assert "form-data" in output
+
+    def test_empty_payload(self):
+        """An empty dict should still produce valid markers and MIME."""
+        captured = StringIO()
+        with patch("sys.stdout", captured):
+            send_response_data_payload({})
+
+        output = captured.getvalue()
+        assert "BEGIN_EXTRA_PAYLOAD" in output
+        assert "END_EXTRA_PAYLOAD" in output
+        assert "{}" in output
+
+    def test_roundtrip_with_parse_cmd_exec_output(self):
+        """The output of send_response_data_payload should be parseable
+        by utils.parse_cmd_exec_output (integration-style test)."""
+        import tempfile
+        from unittest.mock import MagicMock
+        import utils
+
+        payload_data = {"result": "success", "code": 200}
+
+        captured = StringIO()
+        with patch("sys.stdout", captured):
+            send_response_data_payload(payload_data)
+
+        output = captured.getvalue()
+
+        with tempfile.TemporaryFile(mode="w+") as f:
+            f.write(output)
+            results_log = []
+            payload_result = {}
+            err_msg_result = []
+            utils.parse_cmd_exec_output(
+                f, MagicMock(), payload_result, err_msg_result, results_log, {}
+            )
+
+        assert payload_result["result"] == "success"
+        assert payload_result["code"] == 200
+
+
+class TestSendResponseErrMsg:
+
+    def test_output_contains_begin_end_markers(self):
+        captured = StringIO()
+        with patch("sys.stdout", captured):
+            send_response_err_msg("something went wrong")
+
+        output = captured.getvalue()
+        assert "BEGIN_EXTRA_RET_ERR_MSG" in output
+        assert "END_EXTRA_RET_ERR_MSG" in output
+
+    def test_error_message_is_included(self):
+        captured = StringIO()
+        with patch("sys.stdout", captured):
+            send_response_err_msg("disk full")
+
+        output = captured.getvalue()
+        assert "disk full" in output
+
+    def test_roundtrip_with_parse_cmd_exec_output(self):
+        """The err msg output should be parseable by parse_cmd_exec_output."""
+        import tempfile
+        from unittest.mock import MagicMock
+        import utils
+
+        captured = StringIO()
+        with patch("sys.stdout", captured):
+            send_response_err_msg("custom error from script")
+
+        output = captured.getvalue()
+
+        with tempfile.TemporaryFile(mode="w+") as f:
+            f.write(output)
+            results_log = []
+            payload_result = {}
+            err_msg_result = []
+            utils.parse_cmd_exec_output(
+                f, MagicMock(), payload_result, err_msg_result, results_log, {}
+            )
+
+        assert len(err_msg_result) == 1
+        assert "custom error from script" in err_msg_result[0]
+
+
+class TestSendResponseErrMsgAndExit:
+
+    def test_output_contains_markers(self):
+        captured = StringIO()
+        with patch("sys.stdout", captured), \
+             pytest.raises(SystemExit) as exc_info:
+            send_response_err_msg_and_exit("fatal error")
+
+        output = captured.getvalue()
+        assert "BEGIN_EXTRA_RET_ERR_MSG" in output
+        assert "END_EXTRA_RET_ERR_MSG" in output
+        assert "fatal error" in output
+
+    def test_exits_with_default_code_1(self):
+        with patch("sys.stdout", StringIO()), \
+             pytest.raises(SystemExit) as exc_info:
+            send_response_err_msg_and_exit("oops")
+
+        assert exc_info.value.code == 1
+
+    def test_exits_with_custom_code(self):
+        with patch("sys.stdout", StringIO()), \
+             pytest.raises(SystemExit) as exc_info:
+            send_response_err_msg_and_exit("oops", code=42)
+
+        assert exc_info.value.code == 42
+
+
+# Need pytest import for raises
+import pytest
diff --git a/ms/command-executor/src/test/python/test_request_header_validator_interceptor.py b/ms/command-executor/src/test/python/test_request_header_validator_interceptor.py
new file mode 100644 (file)
index 0000000..5fe5306
--- /dev/null
@@ -0,0 +1,130 @@
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Tests for request_header_validator_interceptor.py — gRPC server interceptor
+that validates authorization headers on incoming requests.
+"""
+
+from unittest.mock import MagicMock, patch
+
+import grpc
+
+from request_header_validator_interceptor import (
+    RequestHeaderValidatorInterceptor,
+    _unary_unary_rpc_terminator,
+)
+
+
+# ===================================================================
+# _unary_unary_rpc_terminator
+# ===================================================================
+
+class TestUnaryUnaryRpcTerminator:
+
+    def test_returns_rpc_method_handler(self):
+        handler = _unary_unary_rpc_terminator(
+            grpc.StatusCode.UNAUTHENTICATED, "Access denied"
+        )
+        # grpc.unary_unary_rpc_method_handler returns an RpcMethodHandler
+        assert handler is not None
+        assert handler.unary_unary is not None
+
+    def test_terminator_aborts_context(self):
+        handler = _unary_unary_rpc_terminator(
+            grpc.StatusCode.UNAUTHENTICATED, "No access"
+        )
+        mock_context = MagicMock()
+        # Call the actual terminate function
+        handler.unary_unary(None, mock_context)
+        mock_context.abort.assert_called_once_with(
+            grpc.StatusCode.UNAUTHENTICATED, "No access"
+        )
+
+
+# ===================================================================
+# RequestHeaderValidatorInterceptor
+# ===================================================================
+
+class TestRequestHeaderValidatorInterceptor:
+
+    def _make_interceptor(self, header="authorization", value="Basic abc123"):
+        return RequestHeaderValidatorInterceptor(
+            header, value,
+            grpc.StatusCode.UNAUTHENTICATED, "Access denied!"
+        )
+
+    def test_valid_header_continues(self):
+        interceptor = self._make_interceptor(
+            header="authorization", value="Basic abc123"
+        )
+
+        # Mock handler_call_details with matching metadata
+        mock_details = MagicMock()
+        mock_details.invocation_metadata = [
+            ("authorization", "Basic abc123"),
+            ("other-header", "other-value"),
+        ]
+
+        mock_continuation = MagicMock(return_value="continued_handler")
+        result = interceptor.intercept_service(mock_continuation, mock_details)
+
+        mock_continuation.assert_called_once_with(mock_details)
+        assert result == "continued_handler"
+
+    def test_missing_header_returns_terminator(self):
+        interceptor = self._make_interceptor(
+            header="authorization", value="Basic abc123"
+        )
+
+        mock_details = MagicMock()
+        mock_details.invocation_metadata = [
+            ("other-header", "other-value"),
+        ]
+
+        mock_continuation = MagicMock()
+        result = interceptor.intercept_service(mock_continuation, mock_details)
+
+        # Continuation should NOT be called
+        mock_continuation.assert_not_called()
+        # Result should be the terminator (an RpcMethodHandler)
+        assert result is not None
+
+    def test_wrong_value_returns_terminator(self):
+        interceptor = self._make_interceptor(
+            header="authorization", value="Basic abc123"
+        )
+
+        mock_details = MagicMock()
+        mock_details.invocation_metadata = [
+            ("authorization", "Basic WRONG"),
+        ]
+
+        mock_continuation = MagicMock()
+        result = interceptor.intercept_service(mock_continuation, mock_details)
+
+        mock_continuation.assert_not_called()
+
+    def test_empty_metadata_returns_terminator(self):
+        interceptor = self._make_interceptor()
+
+        mock_details = MagicMock()
+        mock_details.invocation_metadata = []
+
+        mock_continuation = MagicMock()
+        result = interceptor.intercept_service(mock_continuation, mock_details)
+
+        mock_continuation.assert_not_called()
diff --git a/ms/command-executor/src/test/python/test_utils.py b/ms/command-executor/src/test/python/test_utils.py
new file mode 100644 (file)
index 0000000..41800d7
--- /dev/null
@@ -0,0 +1,391 @@
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Tests for utils.py — gRPC response building, request field extraction,
+output parsing, and log truncation.
+"""
+
+import json
+import tempfile
+from types import SimpleNamespace
+from unittest.mock import patch, MagicMock
+
+import pytest
+
+import proto.CommandExecutor_pb2 as CommandExecutor_pb2
+import utils
+
+
+# ---------------------------------------------------------------------------
+# Helpers — lightweight request stub
+# ---------------------------------------------------------------------------
+
+def _make_identifiers(name="bp1", version="1.0.0", uuid="abc-123"):
+    return SimpleNamespace(blueprintName=name, blueprintVersion=version, blueprintUUID=uuid)
+
+
+def _make_request(name="bp1", version="1.0.0", uuid="abc-123",
+                  timeout=30, request_id="req-1", sub_request_id="sub-1",
+                  originator_id="orig-1"):
+    return SimpleNamespace(
+        identifiers=_make_identifiers(name, version, uuid),
+        timeOut=timeout,
+        requestId=request_id,
+        subRequestId=sub_request_id,
+        originatorId=originator_id,
+    )
+
+
+# ===================================================================
+# Request field extraction
+# ===================================================================
+
+class TestRequestFieldExtraction:
+    """Tests for the simple accessor functions at the top of utils.py."""
+
+    def test_blueprint_name_version_uuid(self):
+        req = _make_request(name="my-bp", version="2.0.0", uuid="u-1")
+        assert utils.blueprint_name_version_uuid(req) == "my-bp/2.0.0/u-1"
+
+    def test_blueprint_name_version(self):
+        req = _make_request(name="my-bp", version="2.0.0")
+        assert utils.blueprint_name_version(req) == "my-bp/2.0.0"
+
+    def test_get_blueprint_name(self):
+        req = _make_request(name="test-cba")
+        assert utils.get_blueprint_name(req) == "test-cba"
+
+    def test_get_blueprint_version(self):
+        req = _make_request(version="3.1.0")
+        assert utils.get_blueprint_version(req) == "3.1.0"
+
+    def test_get_blueprint_uuid(self):
+        req = _make_request(uuid="12345")
+        assert utils.get_blueprint_uuid(req) == "12345"
+
+    def test_get_blueprint_timeout(self):
+        req = _make_request(timeout=120)
+        assert utils.get_blueprint_timeout(req) == 120
+
+    def test_get_blueprint_requestid(self):
+        req = _make_request(request_id="r-99")
+        assert utils.get_blueprint_requestid(req) == "r-99"
+
+    def test_get_blueprint_subRequestId(self):
+        req = _make_request(sub_request_id="sr-42")
+        assert utils.get_blueprint_subRequestId(req) == "sr-42"
+
+
+# ===================================================================
+# getExtraLogData
+# ===================================================================
+
+class TestGetExtraLogData:
+
+    def test_with_request(self):
+        req = _make_request(request_id="r1", sub_request_id="s1", originator_id="o1")
+        extra = utils.getExtraLogData(req)
+        assert extra == {
+            'request_id': 'r1',
+            'subrequest_id': 's1',
+            'originator_id': 'o1',
+        }
+
+    def test_without_request(self):
+        extra = utils.getExtraLogData()
+        assert extra == {
+            'request_id': '',
+            'subrequest_id': '',
+            'originator_id': '',
+        }
+
+    def test_with_none(self):
+        extra = utils.getExtraLogData(None)
+        assert extra == {
+            'request_id': '',
+            'subrequest_id': '',
+            'originator_id': '',
+        }
+
+
+# ===================================================================
+# build_ret_data
+# ===================================================================
+
+class TestBuildRetData:
+
+    def test_success_minimal(self):
+        ret = utils.build_ret_data(True)
+        assert ret[utils.CDS_IS_SUCCESSFUL_KEY] is True
+        assert ret[utils.RESULTS_LOG_KEY] == []
+        assert utils.ERR_MSG_KEY not in ret
+        assert utils.REUPLOAD_CBA_KEY not in ret
+
+    def test_failure_with_error(self):
+        ret = utils.build_ret_data(False, error="something broke")
+        assert ret[utils.CDS_IS_SUCCESSFUL_KEY] is False
+        assert ret[utils.ERR_MSG_KEY] == "something broke"
+
+    def test_with_results_log(self):
+        logs = ["line1", "line2"]
+        ret = utils.build_ret_data(True, results_log=logs)
+        assert ret[utils.RESULTS_LOG_KEY] == logs
+
+    def test_reupload_cba_flag(self):
+        ret = utils.build_ret_data(False, reupload_cba=True)
+        assert ret[utils.REUPLOAD_CBA_KEY] is True
+
+    def test_reupload_cba_absent_when_false(self):
+        ret = utils.build_ret_data(False, reupload_cba=False)
+        assert utils.REUPLOAD_CBA_KEY not in ret
+
+
+# ===================================================================
+# build_grpc_response
+# ===================================================================
+
+class TestBuildGrpcResponse:
+
+    def test_success_response(self):
+        data = {
+            utils.CDS_IS_SUCCESSFUL_KEY: True,
+            utils.RESULTS_LOG_KEY: ["log line 1"],
+        }
+        resp = utils.build_grpc_response("req-1", data)
+
+        assert isinstance(resp, CommandExecutor_pb2.ExecutionOutput)
+        assert resp.requestId == "req-1"
+        assert resp.status == CommandExecutor_pb2.SUCCESS
+        assert resp.errMsg == ""
+        # CDS_IS_SUCCESSFUL_KEY and RESULTS_LOG_KEY should be popped from data
+        payload = json.loads(resp.payload)
+        assert utils.CDS_IS_SUCCESSFUL_KEY not in payload
+        assert utils.RESULTS_LOG_KEY not in payload
+
+    def test_failure_response_with_error(self):
+        data = {
+            utils.CDS_IS_SUCCESSFUL_KEY: False,
+            utils.RESULTS_LOG_KEY: ["log1"],
+            utils.ERR_MSG_KEY: ["Error line 1", "Error line 2"],
+        }
+        resp = utils.build_grpc_response("req-2", data)
+
+        assert resp.status == CommandExecutor_pb2.FAILURE
+        assert "Error line 1" in resp.errMsg
+        assert "Error line 2" in resp.errMsg
+
+    def test_response_contains_timestamp(self):
+        data = {
+            utils.CDS_IS_SUCCESSFUL_KEY: True,
+            utils.RESULTS_LOG_KEY: [],
+        }
+        resp = utils.build_grpc_response("req-3", data)
+        assert resp.timestamp.seconds > 0
+
+    def test_response_logs_preserved(self):
+        data = {
+            utils.CDS_IS_SUCCESSFUL_KEY: True,
+            utils.RESULTS_LOG_KEY: ["line A", "line B", "line C"],
+        }
+        resp = utils.build_grpc_response("req-4", data)
+        assert list(resp.response) == ["line A", "line B", "line C"]
+
+    def test_extra_payload_fields_preserved(self):
+        data = {
+            utils.CDS_IS_SUCCESSFUL_KEY: True,
+            utils.RESULTS_LOG_KEY: [],
+            "custom_key": "custom_value",
+        }
+        resp = utils.build_grpc_response("req-5", data)
+        payload = json.loads(resp.payload)
+        assert payload["custom_key"] == "custom_value"
+
+
+# ===================================================================
+# build_grpc_blueprint_validation_response
+# ===================================================================
+
+class TestBuildGrpcBlueprintValidationResponse:
+
+    def test_success(self):
+        resp = utils.build_grpc_blueprint_validation_response(
+            "req-1", "sub-1", "uuid-1", success=True
+        )
+        assert isinstance(resp, CommandExecutor_pb2.BlueprintValidationOutput)
+        assert resp.requestId == "req-1"
+        assert resp.subRequestId == "sub-1"
+        assert resp.cbaUUID == "uuid-1"
+        assert resp.status == CommandExecutor_pb2.SUCCESS
+
+    def test_failure(self):
+        resp = utils.build_grpc_blueprint_validation_response(
+            "req-2", "sub-2", "uuid-2", success=False
+        )
+        assert resp.status == CommandExecutor_pb2.FAILURE
+
+    def test_has_timestamp(self):
+        resp = utils.build_grpc_blueprint_validation_response("r", "s", "u")
+        assert resp.timestamp.seconds > 0
+
+
+# ===================================================================
+# build_grpc_blueprint_upload_response
+# ===================================================================
+
+class TestBuildGrpcBlueprintUploadResponse:
+
+    def test_success(self):
+        resp = utils.build_grpc_blueprint_upload_response("req-1", "sub-1", success=True)
+        assert isinstance(resp, CommandExecutor_pb2.UploadBlueprintOutput)
+        assert resp.status == CommandExecutor_pb2.SUCCESS
+        assert resp.requestId == "req-1"
+        assert resp.subRequestId == "sub-1"
+
+    def test_failure_with_payload(self):
+        resp = utils.build_grpc_blueprint_upload_response(
+            "req-2", "sub-2", success=False, payload=["err1", "err2"]
+        )
+        assert resp.status == CommandExecutor_pb2.FAILURE
+        payload = json.loads(resp.payload)
+        assert payload == ["err1", "err2"]
+
+    def test_has_timestamp(self):
+        resp = utils.build_grpc_blueprint_upload_response("r", "s")
+        assert resp.timestamp.seconds > 0
+
+
+# ===================================================================
+# truncate_execution_output
+# ===================================================================
+
+class TestTruncateExecutionOutput:
+
+    def test_small_output_not_truncated(self):
+        eo = CommandExecutor_pb2.ExecutionOutput(requestId="r1", payload="{}")
+        eo.response.append("short log line")
+        result = utils.truncate_execution_output(eo)
+        assert list(result.response) == ["short log line"]
+
+    def test_large_output_is_truncated(self):
+        eo = CommandExecutor_pb2.ExecutionOutput(requestId="r1", payload="{}")
+        # Fill with enough data to exceed RESPONSE_MAX_SIZE (4MB)
+        large_line = "x" * 10000
+        for _ in range(500):
+            eo.response.append(large_line)
+
+        result = utils.truncate_execution_output(eo)
+        # The last entry should be the truncation message
+        assert result.response[-1].startswith("[...] TRUNCATED CHARS")
+        assert result.ByteSize() <= utils.RESPONSE_MAX_SIZE + 1000  # small overhead ok
+
+    def test_truncation_message_contains_char_count(self):
+        eo = CommandExecutor_pb2.ExecutionOutput(requestId="r1", payload="{}")
+        large_line = "y" * 10000
+        for _ in range(500):
+            eo.response.append(large_line)
+
+        result = utils.truncate_execution_output(eo)
+        last_line = result.response[-1]
+        # Extract the number from "[...] TRUNCATED CHARS : NNN"
+        assert "TRUNCATED CHARS" in last_line
+        count_str = last_line.split(":")[-1].strip()
+        assert int(count_str) > 0
+
+
+# ===================================================================
+# parse_cmd_exec_output
+# ===================================================================
+
+class TestParseCmdExecOutput:
+
+    def _make_logger(self):
+        logger = MagicMock()
+        return logger
+
+    def test_plain_output(self):
+        """Lines without special markers go to results_log."""
+        content = "line 1\nline 2\nline 3\n"
+        with tempfile.TemporaryFile(mode="w+") as f:
+            f.write(content)
+            results_log = []
+            payload = {}
+            err_msg = []
+            utils.parse_cmd_exec_output(f, self._make_logger(), payload, err_msg, results_log, {})
+
+        assert results_log == ["line 1", "line 2", "line 3"]
+        assert payload == {}
+        assert err_msg == []
+
+    def test_error_message_section(self):
+        """Lines between BEGIN/END_EXTRA_RET_ERR_MSG go to err_msg_result."""
+        content = (
+            "some output\n"
+            "BEGIN_EXTRA_RET_ERR_MSG\n"
+            "error detail 1\n"
+            "error detail 2\n"
+            "END_EXTRA_RET_ERR_MSG\n"
+            "more output\n"
+        )
+        with tempfile.TemporaryFile(mode="w+") as f:
+            f.write(content)
+            results_log = []
+            payload = {}
+            err_msg = []
+            utils.parse_cmd_exec_output(f, self._make_logger(), payload, err_msg, results_log, {})
+
+        assert "error detail 1\nerror detail 2" in err_msg[0]
+        assert "some output" in results_log
+        assert "more output" in results_log
+
+    def test_payload_section(self):
+        """Lines between BEGIN/END_EXTRA_PAYLOAD are parsed as MIME payload."""
+        # Build the MIME payload the same way payload_coder does
+        from email.mime import multipart, text as mime_text
+        m = multipart.MIMEMultipart("form-data")
+        data = mime_text.MIMEText("response_payload", "json", "utf8")
+        data.set_payload(json.JSONEncoder().encode({"key": "value"}))
+        m.attach(data)
+
+        content = (
+            "output before\n"
+            "BEGIN_EXTRA_PAYLOAD\n"
+            + m.as_string() + "\n"
+            "END_EXTRA_PAYLOAD\n"
+            "output after\n"
+        )
+        with tempfile.TemporaryFile(mode="w+") as f:
+            f.write(content)
+            results_log = []
+            payload = {}
+            err_msg = []
+            utils.parse_cmd_exec_output(f, self._make_logger(), payload, err_msg, results_log, {})
+
+        assert payload.get("key") == "value"
+        assert "output before" in results_log
+        assert "output after" in results_log
+
+    def test_empty_file(self):
+        """Empty output produces empty results."""
+        with tempfile.TemporaryFile(mode="w+") as f:
+            results_log = []
+            payload = {}
+            err_msg = []
+            utils.parse_cmd_exec_output(f, self._make_logger(), payload, err_msg, results_log, {})
+
+        assert results_log == []
+        assert payload == {}
+        assert err_msg == []