# dotenv environment variables file
.env
.env.test
+.venv
# parcel-bundler cache (https://parceljs.org/)
.cache
--- /dev/null
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Tests for command_executor_handler.py — CommandExecutorHandler methods
+covering blueprint upload, env preparation, command execution, venv
+creation, and package installation.
+"""
+
+import io
+import json
+import os
+import tempfile
+import zipfile
+from subprocess import CalledProcessError, PIPE
+from types import SimpleNamespace
+from unittest.mock import patch, MagicMock, mock_open, PropertyMock
+
+import pytest
+
+import proto.CommandExecutor_pb2 as CommandExecutor_pb2
+import utils
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _make_identifiers(name="test-bp", version="1.0.0", uuid="uuid-1"):
+ return SimpleNamespace(
+ blueprintName=name,
+ blueprintVersion=version,
+ blueprintUUID=uuid,
+ )
+
+
+def _make_request(name="test-bp", version="1.0.0", uuid="uuid-1",
+ timeout=30, request_id="req-1", sub_request_id="sub-1",
+ originator_id="orig-1", correlation_id="corr-1",
+ command="", properties=None, packages=None,
+ archive_type="CBA_ZIP", bin_data=b""):
+ return SimpleNamespace(
+ identifiers=_make_identifiers(name, version, uuid),
+ timeOut=timeout,
+ requestId=request_id,
+ subRequestId=sub_request_id,
+ originatorId=originator_id,
+ correlationId=correlation_id,
+ command=command,
+ properties=properties,
+ packages=packages or [],
+ archiveType=archive_type,
+ binData=bin_data,
+ )
+
+
+def _create_valid_zip_bytes():
+ """Create a valid in-memory zip file with a single entry."""
+ buf = io.BytesIO()
+ with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf:
+ zf.writestr("test.txt", "hello world")
+ return buf.getvalue()
+
+
+def _make_handler(request=None, skip_prometheus=True):
+ """Create a CommandExecutorHandler with prometheus mocked out."""
+ if request is None:
+ request = _make_request()
+
+ with patch("command_executor_handler.prometheus") as mock_prom, \
+ patch.dict(os.environ, {}, clear=False):
+ # Mock prometheus so it doesn't try to start an HTTP server
+ mock_histogram = MagicMock()
+ mock_counter = MagicMock()
+ mock_prom.REGISTRY = MagicMock()
+ mock_prom.REGISTRY._command_executor_histogram = mock_histogram
+ mock_prom.REGISTRY._command_executor_counter = mock_counter
+ mock_prom.REGISTRY._command_executor_prometheus_server_started = True
+ mock_prom.Histogram.return_value = mock_histogram
+ mock_prom.Counter.return_value = mock_counter
+
+ from command_executor_handler import CommandExecutorHandler
+ handler = CommandExecutorHandler(request)
+
+ return handler
+
+
+# ===================================================================
+# __init__ and basic attribute tests
+# ===================================================================
+
+class TestHandlerInit:
+
+ def test_basic_attributes(self):
+ req = _make_request(name="bp-init", version="2.0.0", uuid="u-init",
+ timeout=60, request_id="r-init", sub_request_id="s-init")
+ handler = _make_handler(req)
+ assert handler.blueprint_name == "bp-init"
+ assert handler.blueprint_version == "2.0.0"
+ assert handler.uuid == "u-init"
+ assert handler.execution_timeout == 60
+ assert handler.request_id == "r-init"
+ assert handler.sub_request_id == "s-init"
+
+ def test_blueprint_dir_path(self):
+ req = _make_request(name="my-bp", version="1.0.0", uuid="u1")
+ handler = _make_handler(req)
+ expected = "/opt/app/onap/blueprints/deploy/my-bp/1.0.0/u1"
+ assert handler.blueprint_dir == expected
+
+ def test_blueprint_name_version_uuid(self):
+ req = _make_request(name="n", version="v", uuid="u")
+ handler = _make_handler(req)
+ assert handler.blueprint_name_version_uuid == "n/v/u"
+
+ def test_blueprint_name_version_legacy(self):
+ req = _make_request(name="n", version="v")
+ handler = _make_handler(req)
+ assert handler.blueprint_name_version == "n/v"
+
+
+# ===================================================================
+# is_installed / blueprint_dir_exists / blueprint_tosca_meta_file_exists
+# ===================================================================
+
+class TestFileChecks:
+
+ def test_is_installed_true(self):
+ handler = _make_handler()
+ with patch("os.path.exists", return_value=True):
+ assert handler.is_installed() is True
+
+ def test_is_installed_false(self):
+ handler = _make_handler()
+ with patch("os.path.exists", return_value=False):
+ assert handler.is_installed() is False
+
+ def test_blueprint_dir_exists(self):
+ handler = _make_handler()
+ with patch("os.path.exists", return_value=True):
+ assert handler.blueprint_dir_exists() is True
+
+ def test_blueprint_tosca_meta_file_exists(self):
+ handler = _make_handler()
+ with patch("os.path.exists", return_value=False):
+ assert handler.blueprint_tosca_meta_file_exists() is False
+
+
+# ===================================================================
+# is_valid_archive_type
+# ===================================================================
+
+class TestIsValidArchiveType:
+
+ def test_cba_zip(self):
+ handler = _make_handler()
+ assert handler.is_valid_archive_type("CBA_ZIP") is True
+
+ def test_cba_gzip(self):
+ handler = _make_handler()
+ assert handler.is_valid_archive_type("CBA_GZIP") is True
+
+ def test_invalid_type(self):
+ handler = _make_handler()
+ assert handler.is_valid_archive_type("TAR") is False
+
+ def test_empty_string(self):
+ handler = _make_handler()
+ assert handler.is_valid_archive_type("") is False
+
+
+# ===================================================================
+# err_exit
+# ===================================================================
+
+class TestErrExit:
+
+ def test_returns_failure_data(self):
+ handler = _make_handler()
+ result = handler.err_exit("something broke")
+ assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+ assert "something broke" in result[utils.ERR_MSG_KEY]
+
+
+# ===================================================================
+# uploadBlueprint
+# ===================================================================
+
+class TestUploadBlueprint:
+
+ def test_invalid_archive_type(self):
+ req = _make_request(archive_type="UNKNOWN")
+ handler = _make_handler(req)
+ upload_req = SimpleNamespace(archiveType="UNKNOWN", binData=b"",
+ **{'__str__': lambda s: "req"})
+ # Use request object with the right fields
+ result = handler.uploadBlueprint(
+ SimpleNamespace(archiveType="UNKNOWN", binData=b"")
+ )
+ assert result.status == CommandExecutor_pb2.FAILURE
+
+ def test_successful_zip_upload(self):
+ zip_bytes = _create_valid_zip_bytes()
+ req = _make_request(archive_type="CBA_ZIP", bin_data=zip_bytes)
+ handler = _make_handler(req)
+
+ with patch("os.makedirs") as mock_mkdirs:
+ upload_req = SimpleNamespace(archiveType="CBA_ZIP", binData=zip_bytes)
+ # Mock ZipFile to avoid writing to filesystem
+ with patch("command_executor_handler.ZipFile") as mock_zf:
+ mock_zf_instance = MagicMock()
+ mock_zf.return_value.__enter__ = MagicMock(return_value=mock_zf_instance)
+ mock_zf.return_value.__exit__ = MagicMock(return_value=False)
+ result = handler.uploadBlueprint(upload_req)
+
+ assert result.status == CommandExecutor_pb2.SUCCESS
+
+ def test_makedirs_failure(self):
+ req = _make_request()
+ handler = _make_handler(req)
+
+ with patch("os.makedirs", side_effect=OSError("permission denied")):
+ upload_req = SimpleNamespace(archiveType="CBA_ZIP", binData=b"data")
+ result = handler.uploadBlueprint(upload_req)
+
+ assert result.status == CommandExecutor_pb2.FAILURE
+
+ def test_gzip_returns_failure_todo(self):
+ """CBA_GZIP is recognized but not yet implemented, should return FAILURE."""
+ req = _make_request()
+ handler = _make_handler(req)
+
+ with patch("os.makedirs"):
+ upload_req = SimpleNamespace(archiveType="CBA_GZIP", binData=b"data")
+ result = handler.uploadBlueprint(upload_req)
+
+ assert result.status == CommandExecutor_pb2.FAILURE
+
+
+# ===================================================================
+# prepare_env
+# ===================================================================
+
+class TestPrepareEnv:
+
+ def test_blueprint_dir_not_found(self):
+ req = _make_request()
+ handler = _make_handler(req)
+
+ with patch.object(handler, 'blueprint_dir_exists', return_value=False):
+ result = handler.prepare_env(req)
+
+ assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+ assert result.get(utils.REUPLOAD_CBA_KEY) is True
+
+ def test_tosca_meta_not_found(self):
+ req = _make_request()
+ handler = _make_handler(req)
+
+ with patch.object(handler, 'blueprint_dir_exists', return_value=True), \
+ patch.object(handler, 'blueprint_tosca_meta_file_exists', return_value=False):
+ result = handler.prepare_env(req)
+
+ assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+ assert result.get(utils.REUPLOAD_CBA_KEY) is True
+
+ def test_already_installed_reads_file(self):
+ req = _make_request()
+ handler = _make_handler(req)
+
+ with patch.object(handler, 'blueprint_dir_exists', return_value=True), \
+ patch.object(handler, 'blueprint_tosca_meta_file_exists', return_value=True), \
+ patch.object(handler, 'is_installed', return_value=True), \
+ patch("command_executor_handler.open", mock_open(read_data="previously installed packages")):
+ result = handler.prepare_env(req)
+
+ assert result[utils.CDS_IS_SUCCESSFUL_KEY] is True
+ assert "previously installed packages" in result[utils.RESULTS_LOG_KEY][0]
+
+ def test_already_installed_read_failure(self):
+ req = _make_request()
+ handler = _make_handler(req)
+
+ with patch.object(handler, 'blueprint_dir_exists', return_value=True), \
+ patch.object(handler, 'blueprint_tosca_meta_file_exists', return_value=True), \
+ patch.object(handler, 'is_installed', return_value=True), \
+ patch("command_executor_handler.open", side_effect=IOError("read error")):
+ result = handler.prepare_env(req)
+
+ assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+
+ def test_create_venv_failure(self):
+ req = _make_request()
+ handler = _make_handler(req)
+
+ with patch.object(handler, 'blueprint_dir_exists', return_value=True), \
+ patch.object(handler, 'blueprint_tosca_meta_file_exists', return_value=True), \
+ patch.object(handler, 'is_installed', return_value=False), \
+ patch.object(handler, 'create_venv', return_value=utils.build_ret_data(False, error="venv error")):
+ result = handler.prepare_env(req)
+
+ assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+
+ def test_pip_upgrade_failure(self):
+ req = _make_request()
+ handler = _make_handler(req)
+
+ with patch.object(handler, 'blueprint_dir_exists', return_value=True), \
+ patch.object(handler, 'blueprint_tosca_meta_file_exists', return_value=True), \
+ patch.object(handler, 'is_installed', return_value=False), \
+ patch.object(handler, 'create_venv', return_value=utils.build_ret_data(True)), \
+ patch.object(handler, 'upgrade_pip', return_value=False):
+ result = handler.prepare_env(req)
+
+ assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+
+
+# ===================================================================
+# execute_command
+# ===================================================================
+
+class TestExecuteCommand:
+
+ def test_successful_execution(self):
+ req = _make_request(
+ command="/opt/app/onap/blueprints/deploy/test-bp/1.0.0/uuid-1/Scripts/python/test.py arg1",
+ timeout=30,
+ )
+ handler = _make_handler(req)
+
+ mock_completed = MagicMock()
+ mock_completed.returncode = 0
+
+ with patch.object(handler, 'is_installed', return_value=True), \
+ patch("os.utime"), \
+ patch("subprocess.run", return_value=mock_completed), \
+ patch("tempfile.TemporaryFile", return_value=tempfile.TemporaryFile(mode="w+")):
+ result = handler.execute_command(req)
+
+ assert result[utils.CDS_IS_SUCCESSFUL_KEY] is True
+
+ def test_failed_execution_nonzero_rc(self):
+ req = _make_request(
+ command="python /opt/app/onap/blueprints/deploy/test-bp/1.0.0/uuid-1/Scripts/python/test.py",
+ timeout=30,
+ )
+ handler = _make_handler(req)
+
+ mock_completed = MagicMock()
+ mock_completed.returncode = 1
+
+ with patch.object(handler, 'is_installed', return_value=True), \
+ patch("os.utime"), \
+ patch("subprocess.run", return_value=mock_completed), \
+ patch("tempfile.TemporaryFile", return_value=tempfile.TemporaryFile(mode="w+")):
+ result = handler.execute_command(req)
+
+ assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+
+ def test_timeout_execution(self):
+ from subprocess import TimeoutExpired
+
+ req = _make_request(
+ command="python /opt/app/onap/blueprints/deploy/test-bp/1.0.0/uuid-1/Scripts/python/test.py",
+ timeout=5,
+ )
+ handler = _make_handler(req)
+
+ with patch.object(handler, 'is_installed', return_value=True), \
+ patch("os.utime"), \
+ patch("subprocess.run", side_effect=TimeoutExpired("cmd", 5)), \
+ patch("tempfile.TemporaryFile", return_value=tempfile.TemporaryFile(mode="w+")):
+ result = handler.execute_command(req)
+
+ assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+
+ def test_general_exception(self):
+ req = _make_request(
+ command="python /opt/app/onap/blueprints/deploy/test-bp/1.0.0/uuid-1/Scripts/python/test.py",
+ timeout=30,
+ )
+ handler = _make_handler(req)
+
+ with patch.object(handler, 'is_installed', return_value=True), \
+ patch("os.utime", side_effect=Exception("unexpected")):
+ result = handler.execute_command(req)
+
+ assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+
+ def test_sr7_compatibility_path_rewrite(self):
+ """If request.command uses name/version but not UUID, UUID should be inserted."""
+ req = _make_request(
+ name="my-bp", version="1.0.0", uuid="u-1",
+ command="python /opt/app/onap/blueprints/deploy/my-bp/1.0.0/Scripts/python/test.py",
+ timeout=30,
+ )
+ handler = _make_handler(req)
+
+ mock_completed = MagicMock()
+ mock_completed.returncode = 0
+ captured_cmd = []
+
+ def capture_run(cmd, **kwargs):
+ captured_cmd.append(cmd)
+ return mock_completed
+
+ with patch.object(handler, 'is_installed', return_value=True), \
+ patch("os.utime"), \
+ patch("subprocess.run", side_effect=capture_run), \
+ patch("tempfile.TemporaryFile", return_value=tempfile.TemporaryFile(mode="w+")):
+ handler.execute_command(req)
+
+ # The command should have been rewritten to include the UUID
+ assert "my-bp/1.0.0/u-1" in captured_cmd[0]
+
+ def test_ansible_playbook_adds_interpreter(self):
+ """ansible-playbook commands should get ansible_python_interpreter set."""
+ req = _make_request(
+ command="ansible-playbook /opt/app/onap/blueprints/deploy/test-bp/1.0.0/uuid-1/playbook.yml",
+ timeout=30,
+ )
+ handler = _make_handler(req)
+
+ mock_completed = MagicMock()
+ mock_completed.returncode = 0
+ captured_cmd = []
+
+ def capture_run(cmd, **kwargs):
+ captured_cmd.append(cmd)
+ return mock_completed
+
+ with patch.object(handler, 'is_installed', return_value=True), \
+ patch("os.utime"), \
+ patch("subprocess.run", side_effect=capture_run), \
+ patch("tempfile.TemporaryFile", return_value=tempfile.TemporaryFile(mode="w+")):
+ handler.execute_command(req)
+
+ assert "ansible_python_interpreter=" in captured_cmd[0]
+
+
+# ===================================================================
+# create_venv
+# ===================================================================
+
+class TestCreateVenv:
+
+ def test_successful_creation(self):
+ handler = _make_handler()
+ with patch("venv.create") as mock_create:
+ result = handler.create_venv()
+
+ assert result[utils.CDS_IS_SUCCESSFUL_KEY] is True
+ mock_create.assert_called_once()
+
+ def test_creation_failure(self):
+ handler = _make_handler()
+ with patch("venv.create", side_effect=Exception("venv failed")):
+ result = handler.create_venv()
+
+ assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False
+ assert "venv failed" in str(result.get(utils.ERR_MSG_KEY, ""))
+
+ def test_system_site_packages_disabled(self):
+ handler = _make_handler()
+ with patch("venv.create") as mock_create, \
+ patch.dict(os.environ, {"CREATE_VENV_DISABLE_SITE_PACKAGES": "1"}):
+ handler.create_venv()
+
+ call_kwargs = mock_create.call_args
+ # system_site_packages should be False when env var is set
+ assert call_kwargs[1].get("system_site_packages") is False or \
+ (len(call_kwargs[0]) > 0 and call_kwargs[1].get("system_site_packages") is False)
+
+ def test_system_site_packages_enabled_by_default(self):
+ handler = _make_handler()
+ env = os.environ.copy()
+ env.pop("CREATE_VENV_DISABLE_SITE_PACKAGES", None)
+ with patch("venv.create") as mock_create, \
+ patch.dict(os.environ, env, clear=True):
+ handler.create_venv()
+
+ call_kwargs = mock_create.call_args
+ assert call_kwargs[1].get("system_site_packages") is True
+
+
+# ===================================================================
+# upgrade_pip
+# ===================================================================
+
+class TestUpgradePip:
+
+ def test_successful_upgrade(self):
+ handler = _make_handler()
+ results = []
+
+ mock_result = MagicMock()
+ mock_result.stdout = b"Successfully installed pip-23.0"
+
+ with patch("subprocess.run", return_value=mock_result):
+ success = handler.upgrade_pip(results)
+
+ assert success is True
+ assert "Successfully installed pip-23.0" in results[0]
+
+ def test_failed_upgrade(self):
+ handler = _make_handler()
+ results = []
+
+ with patch("subprocess.run", side_effect=CalledProcessError(
+ 1, "pip", stderr=b"pip upgrade failed")):
+ success = handler.upgrade_pip(results)
+
+ assert success is False
+ assert "pip upgrade failed" in results[0]
+
+
+# ===================================================================
+# install_python_packages
+# ===================================================================
+
+class TestInstallPythonPackages:
+
+ def test_successful_install(self):
+ handler = _make_handler()
+ results = []
+
+ mock_result = MagicMock()
+ mock_result.stdout = b"Successfully installed package-1.0"
+
+ with patch("subprocess.run", return_value=mock_result):
+ success = handler.install_python_packages("some-package", results)
+
+ assert success is True
+
+ def test_failed_install(self):
+ handler = _make_handler()
+ results = []
+
+ with patch("subprocess.run", side_effect=CalledProcessError(
+ 1, "pip install", stderr=b"No matching distribution")):
+ success = handler.install_python_packages("bad-package", results)
+
+ assert success is False
+ assert "No matching distribution" in results[0]
+
+ def test_requirements_txt_uses_full_path(self):
+ handler = _make_handler()
+ results = []
+ captured_cmd = []
+
+ mock_result = MagicMock()
+ mock_result.stdout = b"ok"
+
+ def capture_run(cmd, **kwargs):
+ captured_cmd.append(cmd)
+ return mock_result
+
+ with patch("subprocess.run", side_effect=capture_run):
+ handler.install_python_packages("requirements.txt", results)
+
+ # Should use the full path to pip and requirements.txt
+ assert any("bin/pip" in str(c) for c in captured_cmd)
+ assert any("requirements.txt" in str(c) for c in captured_cmd)
+
+ def test_utility_package_uses_cp(self):
+ handler = _make_handler()
+ results = []
+ captured_cmd = []
+
+ mock_result = MagicMock()
+ mock_result.stdout = b"ok"
+
+ def capture_run(cmd, **kwargs):
+ captured_cmd.append(cmd)
+ return mock_result
+
+ with patch("subprocess.run", side_effect=capture_run):
+ handler.install_python_packages("UTILITY", results)
+
+ assert captured_cmd[0][0] == "cp"
+
+ def test_pip_install_user_flag(self):
+ handler = _make_handler()
+ results = []
+ captured_cmd = []
+
+ mock_result = MagicMock()
+ mock_result.stdout = b"ok"
+
+ def capture_run(cmd, **kwargs):
+ captured_cmd.append(cmd)
+ return mock_result
+
+ with patch("subprocess.run", side_effect=capture_run), \
+ patch.dict(os.environ, {"PIP_INSTALL_USER_FLAG": "1"}):
+ handler.install_python_packages("some-package", results)
+
+ assert "--user" in captured_cmd[0]
+
+
+# ===================================================================
+# install_ansible_packages
+# ===================================================================
+
+class TestInstallAnsiblePackages:
+
+ def test_successful_install(self):
+ handler = _make_handler()
+ results = []
+
+ mock_result = MagicMock()
+ mock_result.stdout = b"Role installed successfully"
+
+ with patch("subprocess.run", return_value=mock_result):
+ success = handler.install_ansible_packages("my-role", results)
+
+ assert success is True
+
+ def test_failed_install(self):
+ handler = _make_handler()
+ results = []
+
+ with patch("subprocess.run", side_effect=CalledProcessError(
+ 1, "ansible-galaxy", stderr=b"Role not found")):
+ success = handler.install_ansible_packages("bad-role", results)
+
+ assert success is False
+ assert "Role not found" in results[0]
+
+ def test_uses_ansible_galaxy_command(self):
+ handler = _make_handler()
+ results = []
+ captured_cmd = []
+
+ mock_result = MagicMock()
+ mock_result.stdout = b"ok"
+
+ def capture_run(cmd, **kwargs):
+ captured_cmd.append(cmd)
+ return mock_result
+
+ with patch("subprocess.run", side_effect=capture_run):
+ handler.install_ansible_packages("some-role", results)
+
+ assert captured_cmd[0][0] == "ansible-galaxy"
+ assert "install" in captured_cmd[0]
+ assert "some-role" in captured_cmd[0]
+
+ def test_http_proxy_is_passed(self):
+ handler = _make_handler()
+ results = []
+ captured_env = []
+
+ mock_result = MagicMock()
+ mock_result.stdout = b"ok"
+
+ def capture_run(cmd, **kwargs):
+ captured_env.append(kwargs.get("env", {}))
+ return mock_result
+
+ with patch("subprocess.run", side_effect=capture_run), \
+ patch.dict(os.environ, {"http_proxy": "http://proxy:8080"}):
+ handler.install_ansible_packages("role", results)
+
+ assert captured_env[0].get("https_proxy") == "http://proxy:8080"
--- /dev/null
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration tests for command_executor_server.py — CommandExecutorServer gRPC servicer.
+
+Includes:
+ - Unit-level tests that mock CommandExecutorHandler to verify branching logic
+ - Full gRPC integration tests that start a real server, register the servicer,
+ and make real RPC calls through a channel/stub with mocked handler internals
+"""
+
+import io
+import json
+import logging
+import os
+import zipfile
+from concurrent import futures
+from types import SimpleNamespace
+from unittest.mock import patch, MagicMock
+
+import grpc
+import pytest
+
+import proto.CommandExecutor_pb2 as pb2
+import proto.CommandExecutor_pb2_grpc as pb2_grpc
+import utils
+from command_executor_server import CommandExecutorServer
+
+
+# ---------------------------------------------------------------------------
+# Helpers — build real protobuf messages
+# ---------------------------------------------------------------------------
+
+def _identifiers(name="test-bp", version="1.0.0", uuid="uuid-123"):
+ ids = pb2.Identifiers()
+ ids.blueprintName = name
+ ids.blueprintVersion = version
+ ids.blueprintUUID = uuid
+ return ids
+
+
+def _upload_request(name="test-bp", version="1.0.0", uuid="uuid-123",
+ request_id="req-1", sub_request_id="sub-1",
+ originator_id="orig-1", archive_type="CBA_ZIP",
+ bin_data=b""):
+ req = pb2.UploadBlueprintInput()
+ req.identifiers.CopyFrom(_identifiers(name, version, uuid))
+ req.requestId = request_id
+ req.subRequestId = sub_request_id
+ req.originatorId = originator_id
+ req.archiveType = archive_type
+ req.binData = bin_data
+ return req
+
+
+def _prepare_env_request(name="test-bp", version="1.0.0", uuid="uuid-123",
+ request_id="req-2", sub_request_id="sub-2",
+ originator_id="orig-1", timeout=30):
+ req = pb2.PrepareEnvInput()
+ req.identifiers.CopyFrom(_identifiers(name, version, uuid))
+ req.requestId = request_id
+ req.subRequestId = sub_request_id
+ req.originatorId = originator_id
+ req.timeOut = timeout
+ return req
+
+
+def _execution_request(name="test-bp", version="1.0.0", uuid="uuid-123",
+ request_id="req-3", sub_request_id="sub-3",
+ originator_id="orig-1", command="python test.py",
+ timeout=30):
+ req = pb2.ExecutionInput()
+ req.identifiers.CopyFrom(_identifiers(name, version, uuid))
+ req.requestId = request_id
+ req.subRequestId = sub_request_id
+ req.originatorId = originator_id
+ req.command = command
+ req.timeOut = timeout
+ return req
+
+
+def _create_valid_zip_bytes():
+ """Create a valid in-memory zip with a single entry."""
+ buf = io.BytesIO()
+ with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf:
+ zf.writestr("test.txt", "hello world")
+ return buf.getvalue()
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+@pytest.fixture
+def server_instance():
+ """A plain CommandExecutorServer (no gRPC wiring)."""
+ return CommandExecutorServer()
+
+
+@pytest.fixture
+def mock_context():
+ """A MagicMock standing in for gRPC ServicerContext."""
+ return MagicMock()
+
+
+# ---------------------------------------------------------------------------
+# Unit tests — mock CommandExecutorHandler to test server branch logic
+# ---------------------------------------------------------------------------
+
+class TestUploadBlueprint:
+ """uploadBlueprint delegates to handler.uploadBlueprint and returns result."""
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_returns_handler_result(self, MockHandler, server_instance, mock_context):
+ fake_response = pb2.UploadBlueprintOutput()
+ fake_response.requestId = "req-1"
+ fake_response.status = pb2.SUCCESS
+ MockHandler.return_value.uploadBlueprint.return_value = fake_response
+
+ req = _upload_request()
+ result = server_instance.uploadBlueprint(req, mock_context)
+
+ MockHandler.assert_called_once_with(req)
+ MockHandler.return_value.uploadBlueprint.assert_called_once_with(req)
+ assert result == fake_response
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_passes_request_to_handler(self, MockHandler, server_instance, mock_context):
+ MockHandler.return_value.uploadBlueprint.return_value = pb2.UploadBlueprintOutput()
+
+ req = _upload_request(name="my-bp", version="2.0.0", uuid="u-456")
+ server_instance.uploadBlueprint(req, mock_context)
+
+ # Handler was constructed with the exact request
+ MockHandler.assert_called_once_with(req)
+
+
+class TestPrepareEnv:
+ """prepareEnv delegates to handler.prepare_env, logs, and builds grpc response."""
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_successful_prepare_env(self, MockHandler, server_instance, mock_context):
+ MockHandler.return_value.prepare_env.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: True,
+ utils.RESULTS_LOG_KEY: ["pkg installed"],
+ utils.ERR_MSG_KEY: "",
+ }
+
+ req = _prepare_env_request(request_id="req-ok")
+ result = server_instance.prepareEnv(req, mock_context)
+
+ MockHandler.assert_called_once_with(req)
+ MockHandler.return_value.prepare_env.assert_called_once_with(req)
+ # Result is an ExecutionOutput
+ assert isinstance(result, pb2.ExecutionOutput)
+ assert result.requestId == "req-ok"
+ assert result.status == pb2.SUCCESS
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_failed_prepare_env(self, MockHandler, server_instance, mock_context):
+ MockHandler.return_value.prepare_env.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: False,
+ utils.RESULTS_LOG_KEY: [],
+ utils.ERR_MSG_KEY: "pip install failed",
+ }
+
+ req = _prepare_env_request(request_id="req-fail")
+ result = server_instance.prepareEnv(req, mock_context)
+
+ assert isinstance(result, pb2.ExecutionOutput)
+ assert result.requestId == "req-fail"
+ assert result.status == pb2.FAILURE
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_prepare_env_logs_success(self, MockHandler, server_instance, mock_context, caplog):
+ MockHandler.return_value.prepare_env.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: True,
+ utils.RESULTS_LOG_KEY: ["ok"],
+ utils.ERR_MSG_KEY: "",
+ }
+
+ req = _prepare_env_request()
+ with caplog.at_level(logging.INFO, logger="CommandExecutorServer"):
+ server_instance.prepareEnv(req, mock_context)
+
+ assert any("Package installation logs" in r.message for r in caplog.records)
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_prepare_env_logs_failure(self, MockHandler, server_instance, mock_context, caplog):
+ MockHandler.return_value.prepare_env.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: False,
+ utils.RESULTS_LOG_KEY: [],
+ utils.ERR_MSG_KEY: "missing dep",
+ }
+
+ req = _prepare_env_request()
+ with caplog.at_level(logging.INFO, logger="CommandExecutorServer"):
+ server_instance.prepareEnv(req, mock_context)
+
+ assert any("Failed to prepare python environment" in r.message for r in caplog.records)
+
+
+class TestExecuteCommand:
+ """executeCommand delegates to handler.execute_command, logs, builds grpc response."""
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_successful_execution(self, MockHandler, server_instance, mock_context):
+ MockHandler.return_value.execute_command.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: True,
+ utils.RESULTS_LOG_KEY: ["all ok"],
+ utils.ERR_MSG_KEY: "",
+ }
+
+ req = _execution_request(request_id="req-exec")
+ result = server_instance.executeCommand(req, mock_context)
+
+ MockHandler.assert_called_once_with(req)
+ MockHandler.return_value.execute_command.assert_called_once_with(req)
+ assert isinstance(result, pb2.ExecutionOutput)
+ assert result.requestId == "req-exec"
+ assert result.status == pb2.SUCCESS
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_failed_execution_with_err_msg(self, MockHandler, server_instance, mock_context):
+ MockHandler.return_value.execute_command.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: False,
+ utils.RESULTS_LOG_KEY: ["step1"],
+ utils.ERR_MSG_KEY: "script crashed",
+ }
+
+ req = _execution_request(request_id="req-fail")
+ result = server_instance.executeCommand(req, mock_context)
+
+ assert result.status == pb2.FAILURE
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_failed_execution_without_err_msg(self, MockHandler, server_instance, mock_context, caplog):
+ """When ERR_MSG_KEY is missing from the response dict, no error text is logged."""
+ MockHandler.return_value.execute_command.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: False,
+ utils.RESULTS_LOG_KEY: ["partial output"],
+ }
+
+ req = _execution_request()
+ with caplog.at_level(logging.INFO, logger="CommandExecutorServer"):
+ result = server_instance.executeCommand(req, mock_context)
+
+ assert result.status == pb2.FAILURE
+ # The "Error returned:" substring should NOT appear since ERR_MSG_KEY is absent
+ failure_msgs = [r.message for r in caplog.records if "Failed to executeCommand" in r.message]
+ assert len(failure_msgs) == 1
+ assert "Error returned:" not in failure_msgs[0]
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_logs_success(self, MockHandler, server_instance, mock_context, caplog):
+ MockHandler.return_value.execute_command.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: True,
+ utils.RESULTS_LOG_KEY: [],
+ utils.ERR_MSG_KEY: "",
+ }
+
+ req = _execution_request()
+ with caplog.at_level(logging.INFO, logger="CommandExecutorServer"):
+ server_instance.executeCommand(req, mock_context)
+
+ assert any("Execution finished successfully" in r.message for r in caplog.records)
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_logs_failure_with_error_detail(self, MockHandler, server_instance, mock_context, caplog):
+ MockHandler.return_value.execute_command.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: False,
+ utils.RESULTS_LOG_KEY: ["started"],
+ utils.ERR_MSG_KEY: "timeout!",
+ }
+
+ req = _execution_request()
+ with caplog.at_level(logging.INFO, logger="CommandExecutorServer"):
+ server_instance.executeCommand(req, mock_context)
+
+ failure_msgs = [r.message for r in caplog.records if "Failed to executeCommand" in r.message]
+ assert len(failure_msgs) == 1
+ assert "Error returned: timeout!" in failure_msgs[0]
+
+ @patch.dict(os.environ, {"CE_DEBUG": "true"})
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_ce_debug_logs_request(self, MockHandler, server_instance, mock_context, caplog):
+ """When CE_DEBUG=true, the full request is logged."""
+ MockHandler.return_value.execute_command.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: True,
+ utils.RESULTS_LOG_KEY: [],
+ utils.ERR_MSG_KEY: "",
+ }
+
+ req = _execution_request()
+ with caplog.at_level(logging.INFO, logger="CommandExecutorServer"):
+ server_instance.executeCommand(req, mock_context)
+
+ # At least 3 log entries: received + request dump + success + payload
+ assert len(caplog.records) >= 3
+
+ @patch.dict(os.environ, {"CE_DEBUG": "false"})
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_ce_debug_off_does_not_log_request(self, MockHandler, server_instance, mock_context, caplog):
+ MockHandler.return_value.execute_command.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: True,
+ utils.RESULTS_LOG_KEY: [],
+ utils.ERR_MSG_KEY: "",
+ }
+
+ req = _execution_request()
+ with caplog.at_level(logging.INFO, logger="CommandExecutorServer"):
+ server_instance.executeCommand(req, mock_context)
+
+ # The request object itself should not appear in logs
+ logged_messages = " ".join(r.message for r in caplog.records if isinstance(r.message, str))
+ assert "Received executeCommand" in logged_messages
+
+
+# ---------------------------------------------------------------------------
+# gRPC integration tests — real server + channel + stub
+# ---------------------------------------------------------------------------
+
+class TestGrpcIntegration:
+ """
+ Spin up a real gRPC server with CommandExecutorServer registered,
+ make RPC calls through a channel, and verify end-to-end message
+ serialization / deserialization with mocked handler internals.
+ """
+
+ @pytest.fixture(autouse=True)
+ def grpc_server(self):
+ """Start a gRPC server on a free port, yield a stub, then shut down."""
+ self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
+ self.servicer = CommandExecutorServer()
+ pb2_grpc.add_CommandExecutorServiceServicer_to_server(self.servicer, self.server)
+ port = self.server.add_insecure_port("[::]:0") # OS assigns a free port
+ self.server.start()
+
+ channel = grpc.insecure_channel(f"localhost:{port}")
+ self.stub = pb2_grpc.CommandExecutorServiceStub(channel)
+
+ yield
+
+ channel.close()
+ self.server.stop(grace=0)
+
+ # -- uploadBlueprint over gRPC --
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_upload_blueprint_grpc_roundtrip(self, MockHandler):
+ expected = pb2.UploadBlueprintOutput()
+ expected.requestId = "req-grpc-1"
+ expected.status = pb2.SUCCESS
+ expected.payload = '{"result": "uploaded"}'
+ MockHandler.return_value.uploadBlueprint.return_value = expected
+
+ req = _upload_request(
+ name="grpc-bp", version="3.0.0", uuid="grpc-uuid",
+ request_id="req-grpc-1", bin_data=_create_valid_zip_bytes(),
+ )
+ response = self.stub.uploadBlueprint(req)
+
+ assert response.requestId == "req-grpc-1"
+ assert response.status == pb2.SUCCESS
+ assert response.payload == '{"result": "uploaded"}'
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_upload_blueprint_grpc_failure(self, MockHandler):
+ expected = pb2.UploadBlueprintOutput()
+ expected.requestId = "req-grpc-fail"
+ expected.status = pb2.FAILURE
+ expected.payload = '{"error": "bad archive"}'
+ MockHandler.return_value.uploadBlueprint.return_value = expected
+
+ req = _upload_request(request_id="req-grpc-fail")
+ response = self.stub.uploadBlueprint(req)
+
+ assert response.status == pb2.FAILURE
+
+ # -- prepareEnv over gRPC --
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_prepare_env_grpc_success(self, MockHandler):
+ MockHandler.return_value.prepare_env.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: True,
+ utils.RESULTS_LOG_KEY: ["dep1 installed"],
+ utils.ERR_MSG_KEY: "",
+ }
+
+ req = _prepare_env_request(request_id="req-prep-grpc")
+ response = self.stub.prepareEnv(req)
+
+ assert response.requestId == "req-prep-grpc"
+ assert response.status == pb2.SUCCESS
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_prepare_env_grpc_failure(self, MockHandler):
+ MockHandler.return_value.prepare_env.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: False,
+ utils.RESULTS_LOG_KEY: [],
+ utils.ERR_MSG_KEY: "network error during pip install",
+ }
+
+ req = _prepare_env_request(request_id="req-prep-fail")
+ response = self.stub.prepareEnv(req)
+
+ assert response.requestId == "req-prep-fail"
+ assert response.status == pb2.FAILURE
+
+ # -- executeCommand over gRPC --
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_execute_command_grpc_success(self, MockHandler):
+ MockHandler.return_value.execute_command.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: True,
+ utils.RESULTS_LOG_KEY: ["output line"],
+ utils.ERR_MSG_KEY: "",
+ }
+
+ req = _execution_request(request_id="req-exec-grpc", command="python run.py")
+ response = self.stub.executeCommand(req)
+
+ assert response.requestId == "req-exec-grpc"
+ assert response.status == pb2.SUCCESS
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_execute_command_grpc_failure(self, MockHandler):
+ MockHandler.return_value.execute_command.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: False,
+ utils.RESULTS_LOG_KEY: ["partial"],
+ utils.ERR_MSG_KEY: "script error",
+ }
+
+ req = _execution_request(request_id="req-exec-fail")
+ response = self.stub.executeCommand(req)
+
+ assert response.requestId == "req-exec-fail"
+ assert response.status == pb2.FAILURE
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_execute_command_grpc_preserves_payload(self, MockHandler):
+ """Verify payload round-trips correctly through gRPC serialization."""
+ payload_data = json.dumps({"key": "value", "number": 42})
+ MockHandler.return_value.execute_command.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: True,
+ utils.RESULTS_LOG_KEY: [payload_data],
+ utils.ERR_MSG_KEY: "",
+ }
+
+ req = _execution_request(request_id="req-payload")
+ response = self.stub.executeCommand(req)
+
+ assert response.requestId == "req-payload"
+ assert response.status == pb2.SUCCESS
+
+ @patch("command_executor_server.CommandExecutorHandler")
+ def test_request_fields_received_by_handler(self, MockHandler):
+ """Verify that the proto request fields survive serialization to the server."""
+ MockHandler.return_value.execute_command.return_value = {
+ utils.CDS_IS_SUCCESSFUL_KEY: True,
+ utils.RESULTS_LOG_KEY: [],
+ utils.ERR_MSG_KEY: "",
+ }
+
+ req = _execution_request(
+ name="field-bp", version="5.0.0", uuid="field-uuid",
+ request_id="field-req", command="python check.py",
+ )
+ self.stub.executeCommand(req)
+
+ # The handler was constructed with a request that has the right identifiers
+ call_args = MockHandler.call_args[0][0]
+ assert call_args.identifiers.blueprintName == "field-bp"
+ assert call_args.identifiers.blueprintVersion == "5.0.0"
+ assert call_args.requestId == "field-req"
+ assert call_args.command == "python check.py"
--- /dev/null
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Tests for cds_utils/payload_coder.py — MIME payload and error message
+encoding used by CBA scripts to communicate results back to the
+command executor.
+"""
+
+import json
+import sys
+from io import StringIO
+from unittest.mock import patch
+
+from cds_utils.payload_coder import (
+ send_response_data_payload,
+ send_response_err_msg,
+ send_response_err_msg_and_exit,
+)
+
+
+class TestSendResponseDataPayload:
+
+ def test_output_contains_begin_end_markers(self):
+ """Output must be wrapped in BEGIN/END_EXTRA_PAYLOAD markers."""
+ captured = StringIO()
+ with patch("sys.stdout", captured):
+ send_response_data_payload({"status": "ok"})
+
+ output = captured.getvalue()
+ assert "BEGIN_EXTRA_PAYLOAD" in output
+ assert "END_EXTRA_PAYLOAD" in output
+
+ def test_payload_is_json_encoded(self):
+ """The JSON payload must appear between the markers."""
+ captured = StringIO()
+ with patch("sys.stdout", captured):
+ send_response_data_payload({"key": "value", "count": 42})
+
+ output = captured.getvalue()
+ # The encoded JSON should be somewhere in the output
+ assert '"key"' in output
+ assert '"value"' in output
+ assert "42" in output
+
+ def test_output_is_mime_formatted(self):
+ """Output should be MIME multipart form-data."""
+ captured = StringIO()
+ with patch("sys.stdout", captured):
+ send_response_data_payload({"a": 1})
+
+ output = captured.getvalue()
+ assert "Content-Type:" in output
+ assert "form-data" in output
+
+ def test_empty_payload(self):
+ """An empty dict should still produce valid markers and MIME."""
+ captured = StringIO()
+ with patch("sys.stdout", captured):
+ send_response_data_payload({})
+
+ output = captured.getvalue()
+ assert "BEGIN_EXTRA_PAYLOAD" in output
+ assert "END_EXTRA_PAYLOAD" in output
+ assert "{}" in output
+
+ def test_roundtrip_with_parse_cmd_exec_output(self):
+ """The output of send_response_data_payload should be parseable
+ by utils.parse_cmd_exec_output (integration-style test)."""
+ import tempfile
+ from unittest.mock import MagicMock
+ import utils
+
+ payload_data = {"result": "success", "code": 200}
+
+ captured = StringIO()
+ with patch("sys.stdout", captured):
+ send_response_data_payload(payload_data)
+
+ output = captured.getvalue()
+
+ with tempfile.TemporaryFile(mode="w+") as f:
+ f.write(output)
+ results_log = []
+ payload_result = {}
+ err_msg_result = []
+ utils.parse_cmd_exec_output(
+ f, MagicMock(), payload_result, err_msg_result, results_log, {}
+ )
+
+ assert payload_result["result"] == "success"
+ assert payload_result["code"] == 200
+
+
+class TestSendResponseErrMsg:
+
+ def test_output_contains_begin_end_markers(self):
+ captured = StringIO()
+ with patch("sys.stdout", captured):
+ send_response_err_msg("something went wrong")
+
+ output = captured.getvalue()
+ assert "BEGIN_EXTRA_RET_ERR_MSG" in output
+ assert "END_EXTRA_RET_ERR_MSG" in output
+
+ def test_error_message_is_included(self):
+ captured = StringIO()
+ with patch("sys.stdout", captured):
+ send_response_err_msg("disk full")
+
+ output = captured.getvalue()
+ assert "disk full" in output
+
+ def test_roundtrip_with_parse_cmd_exec_output(self):
+ """The err msg output should be parseable by parse_cmd_exec_output."""
+ import tempfile
+ from unittest.mock import MagicMock
+ import utils
+
+ captured = StringIO()
+ with patch("sys.stdout", captured):
+ send_response_err_msg("custom error from script")
+
+ output = captured.getvalue()
+
+ with tempfile.TemporaryFile(mode="w+") as f:
+ f.write(output)
+ results_log = []
+ payload_result = {}
+ err_msg_result = []
+ utils.parse_cmd_exec_output(
+ f, MagicMock(), payload_result, err_msg_result, results_log, {}
+ )
+
+ assert len(err_msg_result) == 1
+ assert "custom error from script" in err_msg_result[0]
+
+
+class TestSendResponseErrMsgAndExit:
+
+ def test_output_contains_markers(self):
+ captured = StringIO()
+ with patch("sys.stdout", captured), \
+ pytest.raises(SystemExit) as exc_info:
+ send_response_err_msg_and_exit("fatal error")
+
+ output = captured.getvalue()
+ assert "BEGIN_EXTRA_RET_ERR_MSG" in output
+ assert "END_EXTRA_RET_ERR_MSG" in output
+ assert "fatal error" in output
+
+ def test_exits_with_default_code_1(self):
+ with patch("sys.stdout", StringIO()), \
+ pytest.raises(SystemExit) as exc_info:
+ send_response_err_msg_and_exit("oops")
+
+ assert exc_info.value.code == 1
+
+ def test_exits_with_custom_code(self):
+ with patch("sys.stdout", StringIO()), \
+ pytest.raises(SystemExit) as exc_info:
+ send_response_err_msg_and_exit("oops", code=42)
+
+ assert exc_info.value.code == 42
+
+
+# Need pytest import for raises
+import pytest
--- /dev/null
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Tests for request_header_validator_interceptor.py — gRPC server interceptor
+that validates authorization headers on incoming requests.
+"""
+
+from unittest.mock import MagicMock, patch
+
+import grpc
+
+from request_header_validator_interceptor import (
+ RequestHeaderValidatorInterceptor,
+ _unary_unary_rpc_terminator,
+)
+
+
+# ===================================================================
+# _unary_unary_rpc_terminator
+# ===================================================================
+
+class TestUnaryUnaryRpcTerminator:
+
+ def test_returns_rpc_method_handler(self):
+ handler = _unary_unary_rpc_terminator(
+ grpc.StatusCode.UNAUTHENTICATED, "Access denied"
+ )
+ # grpc.unary_unary_rpc_method_handler returns an RpcMethodHandler
+ assert handler is not None
+ assert handler.unary_unary is not None
+
+ def test_terminator_aborts_context(self):
+ handler = _unary_unary_rpc_terminator(
+ grpc.StatusCode.UNAUTHENTICATED, "No access"
+ )
+ mock_context = MagicMock()
+ # Call the actual terminate function
+ handler.unary_unary(None, mock_context)
+ mock_context.abort.assert_called_once_with(
+ grpc.StatusCode.UNAUTHENTICATED, "No access"
+ )
+
+
+# ===================================================================
+# RequestHeaderValidatorInterceptor
+# ===================================================================
+
+class TestRequestHeaderValidatorInterceptor:
+
+ def _make_interceptor(self, header="authorization", value="Basic abc123"):
+ return RequestHeaderValidatorInterceptor(
+ header, value,
+ grpc.StatusCode.UNAUTHENTICATED, "Access denied!"
+ )
+
+ def test_valid_header_continues(self):
+ interceptor = self._make_interceptor(
+ header="authorization", value="Basic abc123"
+ )
+
+ # Mock handler_call_details with matching metadata
+ mock_details = MagicMock()
+ mock_details.invocation_metadata = [
+ ("authorization", "Basic abc123"),
+ ("other-header", "other-value"),
+ ]
+
+ mock_continuation = MagicMock(return_value="continued_handler")
+ result = interceptor.intercept_service(mock_continuation, mock_details)
+
+ mock_continuation.assert_called_once_with(mock_details)
+ assert result == "continued_handler"
+
+ def test_missing_header_returns_terminator(self):
+ interceptor = self._make_interceptor(
+ header="authorization", value="Basic abc123"
+ )
+
+ mock_details = MagicMock()
+ mock_details.invocation_metadata = [
+ ("other-header", "other-value"),
+ ]
+
+ mock_continuation = MagicMock()
+ result = interceptor.intercept_service(mock_continuation, mock_details)
+
+ # Continuation should NOT be called
+ mock_continuation.assert_not_called()
+ # Result should be the terminator (an RpcMethodHandler)
+ assert result is not None
+
+ def test_wrong_value_returns_terminator(self):
+ interceptor = self._make_interceptor(
+ header="authorization", value="Basic abc123"
+ )
+
+ mock_details = MagicMock()
+ mock_details.invocation_metadata = [
+ ("authorization", "Basic WRONG"),
+ ]
+
+ mock_continuation = MagicMock()
+ result = interceptor.intercept_service(mock_continuation, mock_details)
+
+ mock_continuation.assert_not_called()
+
+ def test_empty_metadata_returns_terminator(self):
+ interceptor = self._make_interceptor()
+
+ mock_details = MagicMock()
+ mock_details.invocation_metadata = []
+
+ mock_continuation = MagicMock()
+ result = interceptor.intercept_service(mock_continuation, mock_details)
+
+ mock_continuation.assert_not_called()
--- /dev/null
+#
+# Copyright (C) 2026 Deutsche Telekom.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Tests for utils.py — gRPC response building, request field extraction,
+output parsing, and log truncation.
+"""
+
+import json
+import tempfile
+from types import SimpleNamespace
+from unittest.mock import patch, MagicMock
+
+import pytest
+
+import proto.CommandExecutor_pb2 as CommandExecutor_pb2
+import utils
+
+
+# ---------------------------------------------------------------------------
+# Helpers — lightweight request stub
+# ---------------------------------------------------------------------------
+
+def _make_identifiers(name="bp1", version="1.0.0", uuid="abc-123"):
+ return SimpleNamespace(blueprintName=name, blueprintVersion=version, blueprintUUID=uuid)
+
+
+def _make_request(name="bp1", version="1.0.0", uuid="abc-123",
+ timeout=30, request_id="req-1", sub_request_id="sub-1",
+ originator_id="orig-1"):
+ return SimpleNamespace(
+ identifiers=_make_identifiers(name, version, uuid),
+ timeOut=timeout,
+ requestId=request_id,
+ subRequestId=sub_request_id,
+ originatorId=originator_id,
+ )
+
+
+# ===================================================================
+# Request field extraction
+# ===================================================================
+
+class TestRequestFieldExtraction:
+ """Tests for the simple accessor functions at the top of utils.py."""
+
+ def test_blueprint_name_version_uuid(self):
+ req = _make_request(name="my-bp", version="2.0.0", uuid="u-1")
+ assert utils.blueprint_name_version_uuid(req) == "my-bp/2.0.0/u-1"
+
+ def test_blueprint_name_version(self):
+ req = _make_request(name="my-bp", version="2.0.0")
+ assert utils.blueprint_name_version(req) == "my-bp/2.0.0"
+
+ def test_get_blueprint_name(self):
+ req = _make_request(name="test-cba")
+ assert utils.get_blueprint_name(req) == "test-cba"
+
+ def test_get_blueprint_version(self):
+ req = _make_request(version="3.1.0")
+ assert utils.get_blueprint_version(req) == "3.1.0"
+
+ def test_get_blueprint_uuid(self):
+ req = _make_request(uuid="12345")
+ assert utils.get_blueprint_uuid(req) == "12345"
+
+ def test_get_blueprint_timeout(self):
+ req = _make_request(timeout=120)
+ assert utils.get_blueprint_timeout(req) == 120
+
+ def test_get_blueprint_requestid(self):
+ req = _make_request(request_id="r-99")
+ assert utils.get_blueprint_requestid(req) == "r-99"
+
+ def test_get_blueprint_subRequestId(self):
+ req = _make_request(sub_request_id="sr-42")
+ assert utils.get_blueprint_subRequestId(req) == "sr-42"
+
+
+# ===================================================================
+# getExtraLogData
+# ===================================================================
+
+class TestGetExtraLogData:
+
+ def test_with_request(self):
+ req = _make_request(request_id="r1", sub_request_id="s1", originator_id="o1")
+ extra = utils.getExtraLogData(req)
+ assert extra == {
+ 'request_id': 'r1',
+ 'subrequest_id': 's1',
+ 'originator_id': 'o1',
+ }
+
+ def test_without_request(self):
+ extra = utils.getExtraLogData()
+ assert extra == {
+ 'request_id': '',
+ 'subrequest_id': '',
+ 'originator_id': '',
+ }
+
+ def test_with_none(self):
+ extra = utils.getExtraLogData(None)
+ assert extra == {
+ 'request_id': '',
+ 'subrequest_id': '',
+ 'originator_id': '',
+ }
+
+
+# ===================================================================
+# build_ret_data
+# ===================================================================
+
+class TestBuildRetData:
+
+ def test_success_minimal(self):
+ ret = utils.build_ret_data(True)
+ assert ret[utils.CDS_IS_SUCCESSFUL_KEY] is True
+ assert ret[utils.RESULTS_LOG_KEY] == []
+ assert utils.ERR_MSG_KEY not in ret
+ assert utils.REUPLOAD_CBA_KEY not in ret
+
+ def test_failure_with_error(self):
+ ret = utils.build_ret_data(False, error="something broke")
+ assert ret[utils.CDS_IS_SUCCESSFUL_KEY] is False
+ assert ret[utils.ERR_MSG_KEY] == "something broke"
+
+ def test_with_results_log(self):
+ logs = ["line1", "line2"]
+ ret = utils.build_ret_data(True, results_log=logs)
+ assert ret[utils.RESULTS_LOG_KEY] == logs
+
+ def test_reupload_cba_flag(self):
+ ret = utils.build_ret_data(False, reupload_cba=True)
+ assert ret[utils.REUPLOAD_CBA_KEY] is True
+
+ def test_reupload_cba_absent_when_false(self):
+ ret = utils.build_ret_data(False, reupload_cba=False)
+ assert utils.REUPLOAD_CBA_KEY not in ret
+
+
+# ===================================================================
+# build_grpc_response
+# ===================================================================
+
+class TestBuildGrpcResponse:
+
+ def test_success_response(self):
+ data = {
+ utils.CDS_IS_SUCCESSFUL_KEY: True,
+ utils.RESULTS_LOG_KEY: ["log line 1"],
+ }
+ resp = utils.build_grpc_response("req-1", data)
+
+ assert isinstance(resp, CommandExecutor_pb2.ExecutionOutput)
+ assert resp.requestId == "req-1"
+ assert resp.status == CommandExecutor_pb2.SUCCESS
+ assert resp.errMsg == ""
+ # CDS_IS_SUCCESSFUL_KEY and RESULTS_LOG_KEY should be popped from data
+ payload = json.loads(resp.payload)
+ assert utils.CDS_IS_SUCCESSFUL_KEY not in payload
+ assert utils.RESULTS_LOG_KEY not in payload
+
+ def test_failure_response_with_error(self):
+ data = {
+ utils.CDS_IS_SUCCESSFUL_KEY: False,
+ utils.RESULTS_LOG_KEY: ["log1"],
+ utils.ERR_MSG_KEY: ["Error line 1", "Error line 2"],
+ }
+ resp = utils.build_grpc_response("req-2", data)
+
+ assert resp.status == CommandExecutor_pb2.FAILURE
+ assert "Error line 1" in resp.errMsg
+ assert "Error line 2" in resp.errMsg
+
+ def test_response_contains_timestamp(self):
+ data = {
+ utils.CDS_IS_SUCCESSFUL_KEY: True,
+ utils.RESULTS_LOG_KEY: [],
+ }
+ resp = utils.build_grpc_response("req-3", data)
+ assert resp.timestamp.seconds > 0
+
+ def test_response_logs_preserved(self):
+ data = {
+ utils.CDS_IS_SUCCESSFUL_KEY: True,
+ utils.RESULTS_LOG_KEY: ["line A", "line B", "line C"],
+ }
+ resp = utils.build_grpc_response("req-4", data)
+ assert list(resp.response) == ["line A", "line B", "line C"]
+
+ def test_extra_payload_fields_preserved(self):
+ data = {
+ utils.CDS_IS_SUCCESSFUL_KEY: True,
+ utils.RESULTS_LOG_KEY: [],
+ "custom_key": "custom_value",
+ }
+ resp = utils.build_grpc_response("req-5", data)
+ payload = json.loads(resp.payload)
+ assert payload["custom_key"] == "custom_value"
+
+
+# ===================================================================
+# build_grpc_blueprint_validation_response
+# ===================================================================
+
+class TestBuildGrpcBlueprintValidationResponse:
+
+ def test_success(self):
+ resp = utils.build_grpc_blueprint_validation_response(
+ "req-1", "sub-1", "uuid-1", success=True
+ )
+ assert isinstance(resp, CommandExecutor_pb2.BlueprintValidationOutput)
+ assert resp.requestId == "req-1"
+ assert resp.subRequestId == "sub-1"
+ assert resp.cbaUUID == "uuid-1"
+ assert resp.status == CommandExecutor_pb2.SUCCESS
+
+ def test_failure(self):
+ resp = utils.build_grpc_blueprint_validation_response(
+ "req-2", "sub-2", "uuid-2", success=False
+ )
+ assert resp.status == CommandExecutor_pb2.FAILURE
+
+ def test_has_timestamp(self):
+ resp = utils.build_grpc_blueprint_validation_response("r", "s", "u")
+ assert resp.timestamp.seconds > 0
+
+
+# ===================================================================
+# build_grpc_blueprint_upload_response
+# ===================================================================
+
+class TestBuildGrpcBlueprintUploadResponse:
+
+ def test_success(self):
+ resp = utils.build_grpc_blueprint_upload_response("req-1", "sub-1", success=True)
+ assert isinstance(resp, CommandExecutor_pb2.UploadBlueprintOutput)
+ assert resp.status == CommandExecutor_pb2.SUCCESS
+ assert resp.requestId == "req-1"
+ assert resp.subRequestId == "sub-1"
+
+ def test_failure_with_payload(self):
+ resp = utils.build_grpc_blueprint_upload_response(
+ "req-2", "sub-2", success=False, payload=["err1", "err2"]
+ )
+ assert resp.status == CommandExecutor_pb2.FAILURE
+ payload = json.loads(resp.payload)
+ assert payload == ["err1", "err2"]
+
+ def test_has_timestamp(self):
+ resp = utils.build_grpc_blueprint_upload_response("r", "s")
+ assert resp.timestamp.seconds > 0
+
+
+# ===================================================================
+# truncate_execution_output
+# ===================================================================
+
+class TestTruncateExecutionOutput:
+
+ def test_small_output_not_truncated(self):
+ eo = CommandExecutor_pb2.ExecutionOutput(requestId="r1", payload="{}")
+ eo.response.append("short log line")
+ result = utils.truncate_execution_output(eo)
+ assert list(result.response) == ["short log line"]
+
+ def test_large_output_is_truncated(self):
+ eo = CommandExecutor_pb2.ExecutionOutput(requestId="r1", payload="{}")
+ # Fill with enough data to exceed RESPONSE_MAX_SIZE (4MB)
+ large_line = "x" * 10000
+ for _ in range(500):
+ eo.response.append(large_line)
+
+ result = utils.truncate_execution_output(eo)
+ # The last entry should be the truncation message
+ assert result.response[-1].startswith("[...] TRUNCATED CHARS")
+ assert result.ByteSize() <= utils.RESPONSE_MAX_SIZE + 1000 # small overhead ok
+
+ def test_truncation_message_contains_char_count(self):
+ eo = CommandExecutor_pb2.ExecutionOutput(requestId="r1", payload="{}")
+ large_line = "y" * 10000
+ for _ in range(500):
+ eo.response.append(large_line)
+
+ result = utils.truncate_execution_output(eo)
+ last_line = result.response[-1]
+ # Extract the number from "[...] TRUNCATED CHARS : NNN"
+ assert "TRUNCATED CHARS" in last_line
+ count_str = last_line.split(":")[-1].strip()
+ assert int(count_str) > 0
+
+
+# ===================================================================
+# parse_cmd_exec_output
+# ===================================================================
+
+class TestParseCmdExecOutput:
+
+ def _make_logger(self):
+ logger = MagicMock()
+ return logger
+
+ def test_plain_output(self):
+ """Lines without special markers go to results_log."""
+ content = "line 1\nline 2\nline 3\n"
+ with tempfile.TemporaryFile(mode="w+") as f:
+ f.write(content)
+ results_log = []
+ payload = {}
+ err_msg = []
+ utils.parse_cmd_exec_output(f, self._make_logger(), payload, err_msg, results_log, {})
+
+ assert results_log == ["line 1", "line 2", "line 3"]
+ assert payload == {}
+ assert err_msg == []
+
+ def test_error_message_section(self):
+ """Lines between BEGIN/END_EXTRA_RET_ERR_MSG go to err_msg_result."""
+ content = (
+ "some output\n"
+ "BEGIN_EXTRA_RET_ERR_MSG\n"
+ "error detail 1\n"
+ "error detail 2\n"
+ "END_EXTRA_RET_ERR_MSG\n"
+ "more output\n"
+ )
+ with tempfile.TemporaryFile(mode="w+") as f:
+ f.write(content)
+ results_log = []
+ payload = {}
+ err_msg = []
+ utils.parse_cmd_exec_output(f, self._make_logger(), payload, err_msg, results_log, {})
+
+ assert "error detail 1\nerror detail 2" in err_msg[0]
+ assert "some output" in results_log
+ assert "more output" in results_log
+
+ def test_payload_section(self):
+ """Lines between BEGIN/END_EXTRA_PAYLOAD are parsed as MIME payload."""
+ # Build the MIME payload the same way payload_coder does
+ from email.mime import multipart, text as mime_text
+ m = multipart.MIMEMultipart("form-data")
+ data = mime_text.MIMEText("response_payload", "json", "utf8")
+ data.set_payload(json.JSONEncoder().encode({"key": "value"}))
+ m.attach(data)
+
+ content = (
+ "output before\n"
+ "BEGIN_EXTRA_PAYLOAD\n"
+ + m.as_string() + "\n"
+ "END_EXTRA_PAYLOAD\n"
+ "output after\n"
+ )
+ with tempfile.TemporaryFile(mode="w+") as f:
+ f.write(content)
+ results_log = []
+ payload = {}
+ err_msg = []
+ utils.parse_cmd_exec_output(f, self._make_logger(), payload, err_msg, results_log, {})
+
+ assert payload.get("key") == "value"
+ assert "output before" in results_log
+ assert "output after" in results_log
+
+ def test_empty_file(self):
+ """Empty output produces empty results."""
+ with tempfile.TemporaryFile(mode="w+") as f:
+ results_log = []
+ payload = {}
+ err_msg = []
+ utils.parse_cmd_exec_output(f, self._make_logger(), payload, err_msg, results_log, {})
+
+ assert results_log == []
+ assert payload == {}
+ assert err_msg == []