From: Fiete Ostkamp Date: Wed, 4 Mar 2026 13:48:39 +0000 (+0100) Subject: Increase command executor test coverage X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F47%2F143447%2F1;p=ccsdk%2Fcds.git Increase command executor test coverage - increase coverage from 0% (no tests) to 83% Issue-ID: CCSDK-4156 Change-Id: I171ddd1b9e13ee4603170bb6ed3302b098d664c5 Signed-off-by: Fiete Ostkamp --- diff --git a/.gitignore b/.gitignore index c3809a5fb..8b3e0c3d1 100644 --- a/.gitignore +++ b/.gitignore @@ -67,6 +67,7 @@ typings/ # dotenv environment variables file .env .env.test +.venv # parcel-bundler cache (https://parceljs.org/) .cache diff --git a/ms/command-executor/src/test/python/__init__.py b/ms/command-executor/src/test/python/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ms/command-executor/src/test/python/test_command_executor_handler.py b/ms/command-executor/src/test/python/test_command_executor_handler.py new file mode 100644 index 000000000..069d2c1fb --- /dev/null +++ b/ms/command-executor/src/test/python/test_command_executor_handler.py @@ -0,0 +1,678 @@ +# +# Copyright (C) 2026 Deutsche Telekom. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Tests for command_executor_handler.py — CommandExecutorHandler methods +covering blueprint upload, env preparation, command execution, venv +creation, and package installation. +""" + +import io +import json +import os +import tempfile +import zipfile +from subprocess import CalledProcessError, PIPE +from types import SimpleNamespace +from unittest.mock import patch, MagicMock, mock_open, PropertyMock + +import pytest + +import proto.CommandExecutor_pb2 as CommandExecutor_pb2 +import utils + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_identifiers(name="test-bp", version="1.0.0", uuid="uuid-1"): + return SimpleNamespace( + blueprintName=name, + blueprintVersion=version, + blueprintUUID=uuid, + ) + + +def _make_request(name="test-bp", version="1.0.0", uuid="uuid-1", + timeout=30, request_id="req-1", sub_request_id="sub-1", + originator_id="orig-1", correlation_id="corr-1", + command="", properties=None, packages=None, + archive_type="CBA_ZIP", bin_data=b""): + return SimpleNamespace( + identifiers=_make_identifiers(name, version, uuid), + timeOut=timeout, + requestId=request_id, + subRequestId=sub_request_id, + originatorId=originator_id, + correlationId=correlation_id, + command=command, + properties=properties, + packages=packages or [], + archiveType=archive_type, + binData=bin_data, + ) + + +def _create_valid_zip_bytes(): + """Create a valid in-memory zip file with a single entry.""" + buf = io.BytesIO() + with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf: + zf.writestr("test.txt", "hello world") + return buf.getvalue() + + +def _make_handler(request=None, skip_prometheus=True): + """Create a CommandExecutorHandler with prometheus mocked out.""" + if request is None: + request = _make_request() + + with patch("command_executor_handler.prometheus") as mock_prom, \ + patch.dict(os.environ, {}, clear=False): + # Mock prometheus so it doesn't try to start an HTTP server + mock_histogram = MagicMock() + mock_counter = MagicMock() + mock_prom.REGISTRY = MagicMock() + mock_prom.REGISTRY._command_executor_histogram = mock_histogram + mock_prom.REGISTRY._command_executor_counter = mock_counter + mock_prom.REGISTRY._command_executor_prometheus_server_started = True + mock_prom.Histogram.return_value = mock_histogram + mock_prom.Counter.return_value = mock_counter + + from command_executor_handler import CommandExecutorHandler + handler = CommandExecutorHandler(request) + + return handler + + +# =================================================================== +# __init__ and basic attribute tests +# =================================================================== + +class TestHandlerInit: + + def test_basic_attributes(self): + req = _make_request(name="bp-init", version="2.0.0", uuid="u-init", + timeout=60, request_id="r-init", sub_request_id="s-init") + handler = _make_handler(req) + assert handler.blueprint_name == "bp-init" + assert handler.blueprint_version == "2.0.0" + assert handler.uuid == "u-init" + assert handler.execution_timeout == 60 + assert handler.request_id == "r-init" + assert handler.sub_request_id == "s-init" + + def test_blueprint_dir_path(self): + req = _make_request(name="my-bp", version="1.0.0", uuid="u1") + handler = _make_handler(req) + expected = "/opt/app/onap/blueprints/deploy/my-bp/1.0.0/u1" + assert handler.blueprint_dir == expected + + def test_blueprint_name_version_uuid(self): + req = _make_request(name="n", version="v", uuid="u") + handler = _make_handler(req) + assert handler.blueprint_name_version_uuid == "n/v/u" + + def test_blueprint_name_version_legacy(self): + req = _make_request(name="n", version="v") + handler = _make_handler(req) + assert handler.blueprint_name_version == "n/v" + + +# =================================================================== +# is_installed / blueprint_dir_exists / blueprint_tosca_meta_file_exists +# =================================================================== + +class TestFileChecks: + + def test_is_installed_true(self): + handler = _make_handler() + with patch("os.path.exists", return_value=True): + assert handler.is_installed() is True + + def test_is_installed_false(self): + handler = _make_handler() + with patch("os.path.exists", return_value=False): + assert handler.is_installed() is False + + def test_blueprint_dir_exists(self): + handler = _make_handler() + with patch("os.path.exists", return_value=True): + assert handler.blueprint_dir_exists() is True + + def test_blueprint_tosca_meta_file_exists(self): + handler = _make_handler() + with patch("os.path.exists", return_value=False): + assert handler.blueprint_tosca_meta_file_exists() is False + + +# =================================================================== +# is_valid_archive_type +# =================================================================== + +class TestIsValidArchiveType: + + def test_cba_zip(self): + handler = _make_handler() + assert handler.is_valid_archive_type("CBA_ZIP") is True + + def test_cba_gzip(self): + handler = _make_handler() + assert handler.is_valid_archive_type("CBA_GZIP") is True + + def test_invalid_type(self): + handler = _make_handler() + assert handler.is_valid_archive_type("TAR") is False + + def test_empty_string(self): + handler = _make_handler() + assert handler.is_valid_archive_type("") is False + + +# =================================================================== +# err_exit +# =================================================================== + +class TestErrExit: + + def test_returns_failure_data(self): + handler = _make_handler() + result = handler.err_exit("something broke") + assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False + assert "something broke" in result[utils.ERR_MSG_KEY] + + +# =================================================================== +# uploadBlueprint +# =================================================================== + +class TestUploadBlueprint: + + def test_invalid_archive_type(self): + req = _make_request(archive_type="UNKNOWN") + handler = _make_handler(req) + upload_req = SimpleNamespace(archiveType="UNKNOWN", binData=b"", + **{'__str__': lambda s: "req"}) + # Use request object with the right fields + result = handler.uploadBlueprint( + SimpleNamespace(archiveType="UNKNOWN", binData=b"") + ) + assert result.status == CommandExecutor_pb2.FAILURE + + def test_successful_zip_upload(self): + zip_bytes = _create_valid_zip_bytes() + req = _make_request(archive_type="CBA_ZIP", bin_data=zip_bytes) + handler = _make_handler(req) + + with patch("os.makedirs") as mock_mkdirs: + upload_req = SimpleNamespace(archiveType="CBA_ZIP", binData=zip_bytes) + # Mock ZipFile to avoid writing to filesystem + with patch("command_executor_handler.ZipFile") as mock_zf: + mock_zf_instance = MagicMock() + mock_zf.return_value.__enter__ = MagicMock(return_value=mock_zf_instance) + mock_zf.return_value.__exit__ = MagicMock(return_value=False) + result = handler.uploadBlueprint(upload_req) + + assert result.status == CommandExecutor_pb2.SUCCESS + + def test_makedirs_failure(self): + req = _make_request() + handler = _make_handler(req) + + with patch("os.makedirs", side_effect=OSError("permission denied")): + upload_req = SimpleNamespace(archiveType="CBA_ZIP", binData=b"data") + result = handler.uploadBlueprint(upload_req) + + assert result.status == CommandExecutor_pb2.FAILURE + + def test_gzip_returns_failure_todo(self): + """CBA_GZIP is recognized but not yet implemented, should return FAILURE.""" + req = _make_request() + handler = _make_handler(req) + + with patch("os.makedirs"): + upload_req = SimpleNamespace(archiveType="CBA_GZIP", binData=b"data") + result = handler.uploadBlueprint(upload_req) + + assert result.status == CommandExecutor_pb2.FAILURE + + +# =================================================================== +# prepare_env +# =================================================================== + +class TestPrepareEnv: + + def test_blueprint_dir_not_found(self): + req = _make_request() + handler = _make_handler(req) + + with patch.object(handler, 'blueprint_dir_exists', return_value=False): + result = handler.prepare_env(req) + + assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False + assert result.get(utils.REUPLOAD_CBA_KEY) is True + + def test_tosca_meta_not_found(self): + req = _make_request() + handler = _make_handler(req) + + with patch.object(handler, 'blueprint_dir_exists', return_value=True), \ + patch.object(handler, 'blueprint_tosca_meta_file_exists', return_value=False): + result = handler.prepare_env(req) + + assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False + assert result.get(utils.REUPLOAD_CBA_KEY) is True + + def test_already_installed_reads_file(self): + req = _make_request() + handler = _make_handler(req) + + with patch.object(handler, 'blueprint_dir_exists', return_value=True), \ + patch.object(handler, 'blueprint_tosca_meta_file_exists', return_value=True), \ + patch.object(handler, 'is_installed', return_value=True), \ + patch("command_executor_handler.open", mock_open(read_data="previously installed packages")): + result = handler.prepare_env(req) + + assert result[utils.CDS_IS_SUCCESSFUL_KEY] is True + assert "previously installed packages" in result[utils.RESULTS_LOG_KEY][0] + + def test_already_installed_read_failure(self): + req = _make_request() + handler = _make_handler(req) + + with patch.object(handler, 'blueprint_dir_exists', return_value=True), \ + patch.object(handler, 'blueprint_tosca_meta_file_exists', return_value=True), \ + patch.object(handler, 'is_installed', return_value=True), \ + patch("command_executor_handler.open", side_effect=IOError("read error")): + result = handler.prepare_env(req) + + assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False + + def test_create_venv_failure(self): + req = _make_request() + handler = _make_handler(req) + + with patch.object(handler, 'blueprint_dir_exists', return_value=True), \ + patch.object(handler, 'blueprint_tosca_meta_file_exists', return_value=True), \ + patch.object(handler, 'is_installed', return_value=False), \ + patch.object(handler, 'create_venv', return_value=utils.build_ret_data(False, error="venv error")): + result = handler.prepare_env(req) + + assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False + + def test_pip_upgrade_failure(self): + req = _make_request() + handler = _make_handler(req) + + with patch.object(handler, 'blueprint_dir_exists', return_value=True), \ + patch.object(handler, 'blueprint_tosca_meta_file_exists', return_value=True), \ + patch.object(handler, 'is_installed', return_value=False), \ + patch.object(handler, 'create_venv', return_value=utils.build_ret_data(True)), \ + patch.object(handler, 'upgrade_pip', return_value=False): + result = handler.prepare_env(req) + + assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False + + +# =================================================================== +# execute_command +# =================================================================== + +class TestExecuteCommand: + + def test_successful_execution(self): + req = _make_request( + command="/opt/app/onap/blueprints/deploy/test-bp/1.0.0/uuid-1/Scripts/python/test.py arg1", + timeout=30, + ) + handler = _make_handler(req) + + mock_completed = MagicMock() + mock_completed.returncode = 0 + + with patch.object(handler, 'is_installed', return_value=True), \ + patch("os.utime"), \ + patch("subprocess.run", return_value=mock_completed), \ + patch("tempfile.TemporaryFile", return_value=tempfile.TemporaryFile(mode="w+")): + result = handler.execute_command(req) + + assert result[utils.CDS_IS_SUCCESSFUL_KEY] is True + + def test_failed_execution_nonzero_rc(self): + req = _make_request( + command="python /opt/app/onap/blueprints/deploy/test-bp/1.0.0/uuid-1/Scripts/python/test.py", + timeout=30, + ) + handler = _make_handler(req) + + mock_completed = MagicMock() + mock_completed.returncode = 1 + + with patch.object(handler, 'is_installed', return_value=True), \ + patch("os.utime"), \ + patch("subprocess.run", return_value=mock_completed), \ + patch("tempfile.TemporaryFile", return_value=tempfile.TemporaryFile(mode="w+")): + result = handler.execute_command(req) + + assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False + + def test_timeout_execution(self): + from subprocess import TimeoutExpired + + req = _make_request( + command="python /opt/app/onap/blueprints/deploy/test-bp/1.0.0/uuid-1/Scripts/python/test.py", + timeout=5, + ) + handler = _make_handler(req) + + with patch.object(handler, 'is_installed', return_value=True), \ + patch("os.utime"), \ + patch("subprocess.run", side_effect=TimeoutExpired("cmd", 5)), \ + patch("tempfile.TemporaryFile", return_value=tempfile.TemporaryFile(mode="w+")): + result = handler.execute_command(req) + + assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False + + def test_general_exception(self): + req = _make_request( + command="python /opt/app/onap/blueprints/deploy/test-bp/1.0.0/uuid-1/Scripts/python/test.py", + timeout=30, + ) + handler = _make_handler(req) + + with patch.object(handler, 'is_installed', return_value=True), \ + patch("os.utime", side_effect=Exception("unexpected")): + result = handler.execute_command(req) + + assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False + + def test_sr7_compatibility_path_rewrite(self): + """If request.command uses name/version but not UUID, UUID should be inserted.""" + req = _make_request( + name="my-bp", version="1.0.0", uuid="u-1", + command="python /opt/app/onap/blueprints/deploy/my-bp/1.0.0/Scripts/python/test.py", + timeout=30, + ) + handler = _make_handler(req) + + mock_completed = MagicMock() + mock_completed.returncode = 0 + captured_cmd = [] + + def capture_run(cmd, **kwargs): + captured_cmd.append(cmd) + return mock_completed + + with patch.object(handler, 'is_installed', return_value=True), \ + patch("os.utime"), \ + patch("subprocess.run", side_effect=capture_run), \ + patch("tempfile.TemporaryFile", return_value=tempfile.TemporaryFile(mode="w+")): + handler.execute_command(req) + + # The command should have been rewritten to include the UUID + assert "my-bp/1.0.0/u-1" in captured_cmd[0] + + def test_ansible_playbook_adds_interpreter(self): + """ansible-playbook commands should get ansible_python_interpreter set.""" + req = _make_request( + command="ansible-playbook /opt/app/onap/blueprints/deploy/test-bp/1.0.0/uuid-1/playbook.yml", + timeout=30, + ) + handler = _make_handler(req) + + mock_completed = MagicMock() + mock_completed.returncode = 0 + captured_cmd = [] + + def capture_run(cmd, **kwargs): + captured_cmd.append(cmd) + return mock_completed + + with patch.object(handler, 'is_installed', return_value=True), \ + patch("os.utime"), \ + patch("subprocess.run", side_effect=capture_run), \ + patch("tempfile.TemporaryFile", return_value=tempfile.TemporaryFile(mode="w+")): + handler.execute_command(req) + + assert "ansible_python_interpreter=" in captured_cmd[0] + + +# =================================================================== +# create_venv +# =================================================================== + +class TestCreateVenv: + + def test_successful_creation(self): + handler = _make_handler() + with patch("venv.create") as mock_create: + result = handler.create_venv() + + assert result[utils.CDS_IS_SUCCESSFUL_KEY] is True + mock_create.assert_called_once() + + def test_creation_failure(self): + handler = _make_handler() + with patch("venv.create", side_effect=Exception("venv failed")): + result = handler.create_venv() + + assert result[utils.CDS_IS_SUCCESSFUL_KEY] is False + assert "venv failed" in str(result.get(utils.ERR_MSG_KEY, "")) + + def test_system_site_packages_disabled(self): + handler = _make_handler() + with patch("venv.create") as mock_create, \ + patch.dict(os.environ, {"CREATE_VENV_DISABLE_SITE_PACKAGES": "1"}): + handler.create_venv() + + call_kwargs = mock_create.call_args + # system_site_packages should be False when env var is set + assert call_kwargs[1].get("system_site_packages") is False or \ + (len(call_kwargs[0]) > 0 and call_kwargs[1].get("system_site_packages") is False) + + def test_system_site_packages_enabled_by_default(self): + handler = _make_handler() + env = os.environ.copy() + env.pop("CREATE_VENV_DISABLE_SITE_PACKAGES", None) + with patch("venv.create") as mock_create, \ + patch.dict(os.environ, env, clear=True): + handler.create_venv() + + call_kwargs = mock_create.call_args + assert call_kwargs[1].get("system_site_packages") is True + + +# =================================================================== +# upgrade_pip +# =================================================================== + +class TestUpgradePip: + + def test_successful_upgrade(self): + handler = _make_handler() + results = [] + + mock_result = MagicMock() + mock_result.stdout = b"Successfully installed pip-23.0" + + with patch("subprocess.run", return_value=mock_result): + success = handler.upgrade_pip(results) + + assert success is True + assert "Successfully installed pip-23.0" in results[0] + + def test_failed_upgrade(self): + handler = _make_handler() + results = [] + + with patch("subprocess.run", side_effect=CalledProcessError( + 1, "pip", stderr=b"pip upgrade failed")): + success = handler.upgrade_pip(results) + + assert success is False + assert "pip upgrade failed" in results[0] + + +# =================================================================== +# install_python_packages +# =================================================================== + +class TestInstallPythonPackages: + + def test_successful_install(self): + handler = _make_handler() + results = [] + + mock_result = MagicMock() + mock_result.stdout = b"Successfully installed package-1.0" + + with patch("subprocess.run", return_value=mock_result): + success = handler.install_python_packages("some-package", results) + + assert success is True + + def test_failed_install(self): + handler = _make_handler() + results = [] + + with patch("subprocess.run", side_effect=CalledProcessError( + 1, "pip install", stderr=b"No matching distribution")): + success = handler.install_python_packages("bad-package", results) + + assert success is False + assert "No matching distribution" in results[0] + + def test_requirements_txt_uses_full_path(self): + handler = _make_handler() + results = [] + captured_cmd = [] + + mock_result = MagicMock() + mock_result.stdout = b"ok" + + def capture_run(cmd, **kwargs): + captured_cmd.append(cmd) + return mock_result + + with patch("subprocess.run", side_effect=capture_run): + handler.install_python_packages("requirements.txt", results) + + # Should use the full path to pip and requirements.txt + assert any("bin/pip" in str(c) for c in captured_cmd) + assert any("requirements.txt" in str(c) for c in captured_cmd) + + def test_utility_package_uses_cp(self): + handler = _make_handler() + results = [] + captured_cmd = [] + + mock_result = MagicMock() + mock_result.stdout = b"ok" + + def capture_run(cmd, **kwargs): + captured_cmd.append(cmd) + return mock_result + + with patch("subprocess.run", side_effect=capture_run): + handler.install_python_packages("UTILITY", results) + + assert captured_cmd[0][0] == "cp" + + def test_pip_install_user_flag(self): + handler = _make_handler() + results = [] + captured_cmd = [] + + mock_result = MagicMock() + mock_result.stdout = b"ok" + + def capture_run(cmd, **kwargs): + captured_cmd.append(cmd) + return mock_result + + with patch("subprocess.run", side_effect=capture_run), \ + patch.dict(os.environ, {"PIP_INSTALL_USER_FLAG": "1"}): + handler.install_python_packages("some-package", results) + + assert "--user" in captured_cmd[0] + + +# =================================================================== +# install_ansible_packages +# =================================================================== + +class TestInstallAnsiblePackages: + + def test_successful_install(self): + handler = _make_handler() + results = [] + + mock_result = MagicMock() + mock_result.stdout = b"Role installed successfully" + + with patch("subprocess.run", return_value=mock_result): + success = handler.install_ansible_packages("my-role", results) + + assert success is True + + def test_failed_install(self): + handler = _make_handler() + results = [] + + with patch("subprocess.run", side_effect=CalledProcessError( + 1, "ansible-galaxy", stderr=b"Role not found")): + success = handler.install_ansible_packages("bad-role", results) + + assert success is False + assert "Role not found" in results[0] + + def test_uses_ansible_galaxy_command(self): + handler = _make_handler() + results = [] + captured_cmd = [] + + mock_result = MagicMock() + mock_result.stdout = b"ok" + + def capture_run(cmd, **kwargs): + captured_cmd.append(cmd) + return mock_result + + with patch("subprocess.run", side_effect=capture_run): + handler.install_ansible_packages("some-role", results) + + assert captured_cmd[0][0] == "ansible-galaxy" + assert "install" in captured_cmd[0] + assert "some-role" in captured_cmd[0] + + def test_http_proxy_is_passed(self): + handler = _make_handler() + results = [] + captured_env = [] + + mock_result = MagicMock() + mock_result.stdout = b"ok" + + def capture_run(cmd, **kwargs): + captured_env.append(kwargs.get("env", {})) + return mock_result + + with patch("subprocess.run", side_effect=capture_run), \ + patch.dict(os.environ, {"http_proxy": "http://proxy:8080"}): + handler.install_ansible_packages("role", results) + + assert captured_env[0].get("https_proxy") == "http://proxy:8080" diff --git a/ms/command-executor/src/test/python/test_command_executor_server.py b/ms/command-executor/src/test/python/test_command_executor_server.py new file mode 100644 index 000000000..9d6db2815 --- /dev/null +++ b/ms/command-executor/src/test/python/test_command_executor_server.py @@ -0,0 +1,490 @@ +# +# Copyright (C) 2026 Deutsche Telekom. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Integration tests for command_executor_server.py — CommandExecutorServer gRPC servicer. + +Includes: + - Unit-level tests that mock CommandExecutorHandler to verify branching logic + - Full gRPC integration tests that start a real server, register the servicer, + and make real RPC calls through a channel/stub with mocked handler internals +""" + +import io +import json +import logging +import os +import zipfile +from concurrent import futures +from types import SimpleNamespace +from unittest.mock import patch, MagicMock + +import grpc +import pytest + +import proto.CommandExecutor_pb2 as pb2 +import proto.CommandExecutor_pb2_grpc as pb2_grpc +import utils +from command_executor_server import CommandExecutorServer + + +# --------------------------------------------------------------------------- +# Helpers — build real protobuf messages +# --------------------------------------------------------------------------- + +def _identifiers(name="test-bp", version="1.0.0", uuid="uuid-123"): + ids = pb2.Identifiers() + ids.blueprintName = name + ids.blueprintVersion = version + ids.blueprintUUID = uuid + return ids + + +def _upload_request(name="test-bp", version="1.0.0", uuid="uuid-123", + request_id="req-1", sub_request_id="sub-1", + originator_id="orig-1", archive_type="CBA_ZIP", + bin_data=b""): + req = pb2.UploadBlueprintInput() + req.identifiers.CopyFrom(_identifiers(name, version, uuid)) + req.requestId = request_id + req.subRequestId = sub_request_id + req.originatorId = originator_id + req.archiveType = archive_type + req.binData = bin_data + return req + + +def _prepare_env_request(name="test-bp", version="1.0.0", uuid="uuid-123", + request_id="req-2", sub_request_id="sub-2", + originator_id="orig-1", timeout=30): + req = pb2.PrepareEnvInput() + req.identifiers.CopyFrom(_identifiers(name, version, uuid)) + req.requestId = request_id + req.subRequestId = sub_request_id + req.originatorId = originator_id + req.timeOut = timeout + return req + + +def _execution_request(name="test-bp", version="1.0.0", uuid="uuid-123", + request_id="req-3", sub_request_id="sub-3", + originator_id="orig-1", command="python test.py", + timeout=30): + req = pb2.ExecutionInput() + req.identifiers.CopyFrom(_identifiers(name, version, uuid)) + req.requestId = request_id + req.subRequestId = sub_request_id + req.originatorId = originator_id + req.command = command + req.timeOut = timeout + return req + + +def _create_valid_zip_bytes(): + """Create a valid in-memory zip with a single entry.""" + buf = io.BytesIO() + with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf: + zf.writestr("test.txt", "hello world") + return buf.getvalue() + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def server_instance(): + """A plain CommandExecutorServer (no gRPC wiring).""" + return CommandExecutorServer() + + +@pytest.fixture +def mock_context(): + """A MagicMock standing in for gRPC ServicerContext.""" + return MagicMock() + + +# --------------------------------------------------------------------------- +# Unit tests — mock CommandExecutorHandler to test server branch logic +# --------------------------------------------------------------------------- + +class TestUploadBlueprint: + """uploadBlueprint delegates to handler.uploadBlueprint and returns result.""" + + @patch("command_executor_server.CommandExecutorHandler") + def test_returns_handler_result(self, MockHandler, server_instance, mock_context): + fake_response = pb2.UploadBlueprintOutput() + fake_response.requestId = "req-1" + fake_response.status = pb2.SUCCESS + MockHandler.return_value.uploadBlueprint.return_value = fake_response + + req = _upload_request() + result = server_instance.uploadBlueprint(req, mock_context) + + MockHandler.assert_called_once_with(req) + MockHandler.return_value.uploadBlueprint.assert_called_once_with(req) + assert result == fake_response + + @patch("command_executor_server.CommandExecutorHandler") + def test_passes_request_to_handler(self, MockHandler, server_instance, mock_context): + MockHandler.return_value.uploadBlueprint.return_value = pb2.UploadBlueprintOutput() + + req = _upload_request(name="my-bp", version="2.0.0", uuid="u-456") + server_instance.uploadBlueprint(req, mock_context) + + # Handler was constructed with the exact request + MockHandler.assert_called_once_with(req) + + +class TestPrepareEnv: + """prepareEnv delegates to handler.prepare_env, logs, and builds grpc response.""" + + @patch("command_executor_server.CommandExecutorHandler") + def test_successful_prepare_env(self, MockHandler, server_instance, mock_context): + MockHandler.return_value.prepare_env.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: True, + utils.RESULTS_LOG_KEY: ["pkg installed"], + utils.ERR_MSG_KEY: "", + } + + req = _prepare_env_request(request_id="req-ok") + result = server_instance.prepareEnv(req, mock_context) + + MockHandler.assert_called_once_with(req) + MockHandler.return_value.prepare_env.assert_called_once_with(req) + # Result is an ExecutionOutput + assert isinstance(result, pb2.ExecutionOutput) + assert result.requestId == "req-ok" + assert result.status == pb2.SUCCESS + + @patch("command_executor_server.CommandExecutorHandler") + def test_failed_prepare_env(self, MockHandler, server_instance, mock_context): + MockHandler.return_value.prepare_env.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: False, + utils.RESULTS_LOG_KEY: [], + utils.ERR_MSG_KEY: "pip install failed", + } + + req = _prepare_env_request(request_id="req-fail") + result = server_instance.prepareEnv(req, mock_context) + + assert isinstance(result, pb2.ExecutionOutput) + assert result.requestId == "req-fail" + assert result.status == pb2.FAILURE + + @patch("command_executor_server.CommandExecutorHandler") + def test_prepare_env_logs_success(self, MockHandler, server_instance, mock_context, caplog): + MockHandler.return_value.prepare_env.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: True, + utils.RESULTS_LOG_KEY: ["ok"], + utils.ERR_MSG_KEY: "", + } + + req = _prepare_env_request() + with caplog.at_level(logging.INFO, logger="CommandExecutorServer"): + server_instance.prepareEnv(req, mock_context) + + assert any("Package installation logs" in r.message for r in caplog.records) + + @patch("command_executor_server.CommandExecutorHandler") + def test_prepare_env_logs_failure(self, MockHandler, server_instance, mock_context, caplog): + MockHandler.return_value.prepare_env.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: False, + utils.RESULTS_LOG_KEY: [], + utils.ERR_MSG_KEY: "missing dep", + } + + req = _prepare_env_request() + with caplog.at_level(logging.INFO, logger="CommandExecutorServer"): + server_instance.prepareEnv(req, mock_context) + + assert any("Failed to prepare python environment" in r.message for r in caplog.records) + + +class TestExecuteCommand: + """executeCommand delegates to handler.execute_command, logs, builds grpc response.""" + + @patch("command_executor_server.CommandExecutorHandler") + def test_successful_execution(self, MockHandler, server_instance, mock_context): + MockHandler.return_value.execute_command.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: True, + utils.RESULTS_LOG_KEY: ["all ok"], + utils.ERR_MSG_KEY: "", + } + + req = _execution_request(request_id="req-exec") + result = server_instance.executeCommand(req, mock_context) + + MockHandler.assert_called_once_with(req) + MockHandler.return_value.execute_command.assert_called_once_with(req) + assert isinstance(result, pb2.ExecutionOutput) + assert result.requestId == "req-exec" + assert result.status == pb2.SUCCESS + + @patch("command_executor_server.CommandExecutorHandler") + def test_failed_execution_with_err_msg(self, MockHandler, server_instance, mock_context): + MockHandler.return_value.execute_command.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: False, + utils.RESULTS_LOG_KEY: ["step1"], + utils.ERR_MSG_KEY: "script crashed", + } + + req = _execution_request(request_id="req-fail") + result = server_instance.executeCommand(req, mock_context) + + assert result.status == pb2.FAILURE + + @patch("command_executor_server.CommandExecutorHandler") + def test_failed_execution_without_err_msg(self, MockHandler, server_instance, mock_context, caplog): + """When ERR_MSG_KEY is missing from the response dict, no error text is logged.""" + MockHandler.return_value.execute_command.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: False, + utils.RESULTS_LOG_KEY: ["partial output"], + } + + req = _execution_request() + with caplog.at_level(logging.INFO, logger="CommandExecutorServer"): + result = server_instance.executeCommand(req, mock_context) + + assert result.status == pb2.FAILURE + # The "Error returned:" substring should NOT appear since ERR_MSG_KEY is absent + failure_msgs = [r.message for r in caplog.records if "Failed to executeCommand" in r.message] + assert len(failure_msgs) == 1 + assert "Error returned:" not in failure_msgs[0] + + @patch("command_executor_server.CommandExecutorHandler") + def test_logs_success(self, MockHandler, server_instance, mock_context, caplog): + MockHandler.return_value.execute_command.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: True, + utils.RESULTS_LOG_KEY: [], + utils.ERR_MSG_KEY: "", + } + + req = _execution_request() + with caplog.at_level(logging.INFO, logger="CommandExecutorServer"): + server_instance.executeCommand(req, mock_context) + + assert any("Execution finished successfully" in r.message for r in caplog.records) + + @patch("command_executor_server.CommandExecutorHandler") + def test_logs_failure_with_error_detail(self, MockHandler, server_instance, mock_context, caplog): + MockHandler.return_value.execute_command.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: False, + utils.RESULTS_LOG_KEY: ["started"], + utils.ERR_MSG_KEY: "timeout!", + } + + req = _execution_request() + with caplog.at_level(logging.INFO, logger="CommandExecutorServer"): + server_instance.executeCommand(req, mock_context) + + failure_msgs = [r.message for r in caplog.records if "Failed to executeCommand" in r.message] + assert len(failure_msgs) == 1 + assert "Error returned: timeout!" in failure_msgs[0] + + @patch.dict(os.environ, {"CE_DEBUG": "true"}) + @patch("command_executor_server.CommandExecutorHandler") + def test_ce_debug_logs_request(self, MockHandler, server_instance, mock_context, caplog): + """When CE_DEBUG=true, the full request is logged.""" + MockHandler.return_value.execute_command.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: True, + utils.RESULTS_LOG_KEY: [], + utils.ERR_MSG_KEY: "", + } + + req = _execution_request() + with caplog.at_level(logging.INFO, logger="CommandExecutorServer"): + server_instance.executeCommand(req, mock_context) + + # At least 3 log entries: received + request dump + success + payload + assert len(caplog.records) >= 3 + + @patch.dict(os.environ, {"CE_DEBUG": "false"}) + @patch("command_executor_server.CommandExecutorHandler") + def test_ce_debug_off_does_not_log_request(self, MockHandler, server_instance, mock_context, caplog): + MockHandler.return_value.execute_command.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: True, + utils.RESULTS_LOG_KEY: [], + utils.ERR_MSG_KEY: "", + } + + req = _execution_request() + with caplog.at_level(logging.INFO, logger="CommandExecutorServer"): + server_instance.executeCommand(req, mock_context) + + # The request object itself should not appear in logs + logged_messages = " ".join(r.message for r in caplog.records if isinstance(r.message, str)) + assert "Received executeCommand" in logged_messages + + +# --------------------------------------------------------------------------- +# gRPC integration tests — real server + channel + stub +# --------------------------------------------------------------------------- + +class TestGrpcIntegration: + """ + Spin up a real gRPC server with CommandExecutorServer registered, + make RPC calls through a channel, and verify end-to-end message + serialization / deserialization with mocked handler internals. + """ + + @pytest.fixture(autouse=True) + def grpc_server(self): + """Start a gRPC server on a free port, yield a stub, then shut down.""" + self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=4)) + self.servicer = CommandExecutorServer() + pb2_grpc.add_CommandExecutorServiceServicer_to_server(self.servicer, self.server) + port = self.server.add_insecure_port("[::]:0") # OS assigns a free port + self.server.start() + + channel = grpc.insecure_channel(f"localhost:{port}") + self.stub = pb2_grpc.CommandExecutorServiceStub(channel) + + yield + + channel.close() + self.server.stop(grace=0) + + # -- uploadBlueprint over gRPC -- + + @patch("command_executor_server.CommandExecutorHandler") + def test_upload_blueprint_grpc_roundtrip(self, MockHandler): + expected = pb2.UploadBlueprintOutput() + expected.requestId = "req-grpc-1" + expected.status = pb2.SUCCESS + expected.payload = '{"result": "uploaded"}' + MockHandler.return_value.uploadBlueprint.return_value = expected + + req = _upload_request( + name="grpc-bp", version="3.0.0", uuid="grpc-uuid", + request_id="req-grpc-1", bin_data=_create_valid_zip_bytes(), + ) + response = self.stub.uploadBlueprint(req) + + assert response.requestId == "req-grpc-1" + assert response.status == pb2.SUCCESS + assert response.payload == '{"result": "uploaded"}' + + @patch("command_executor_server.CommandExecutorHandler") + def test_upload_blueprint_grpc_failure(self, MockHandler): + expected = pb2.UploadBlueprintOutput() + expected.requestId = "req-grpc-fail" + expected.status = pb2.FAILURE + expected.payload = '{"error": "bad archive"}' + MockHandler.return_value.uploadBlueprint.return_value = expected + + req = _upload_request(request_id="req-grpc-fail") + response = self.stub.uploadBlueprint(req) + + assert response.status == pb2.FAILURE + + # -- prepareEnv over gRPC -- + + @patch("command_executor_server.CommandExecutorHandler") + def test_prepare_env_grpc_success(self, MockHandler): + MockHandler.return_value.prepare_env.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: True, + utils.RESULTS_LOG_KEY: ["dep1 installed"], + utils.ERR_MSG_KEY: "", + } + + req = _prepare_env_request(request_id="req-prep-grpc") + response = self.stub.prepareEnv(req) + + assert response.requestId == "req-prep-grpc" + assert response.status == pb2.SUCCESS + + @patch("command_executor_server.CommandExecutorHandler") + def test_prepare_env_grpc_failure(self, MockHandler): + MockHandler.return_value.prepare_env.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: False, + utils.RESULTS_LOG_KEY: [], + utils.ERR_MSG_KEY: "network error during pip install", + } + + req = _prepare_env_request(request_id="req-prep-fail") + response = self.stub.prepareEnv(req) + + assert response.requestId == "req-prep-fail" + assert response.status == pb2.FAILURE + + # -- executeCommand over gRPC -- + + @patch("command_executor_server.CommandExecutorHandler") + def test_execute_command_grpc_success(self, MockHandler): + MockHandler.return_value.execute_command.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: True, + utils.RESULTS_LOG_KEY: ["output line"], + utils.ERR_MSG_KEY: "", + } + + req = _execution_request(request_id="req-exec-grpc", command="python run.py") + response = self.stub.executeCommand(req) + + assert response.requestId == "req-exec-grpc" + assert response.status == pb2.SUCCESS + + @patch("command_executor_server.CommandExecutorHandler") + def test_execute_command_grpc_failure(self, MockHandler): + MockHandler.return_value.execute_command.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: False, + utils.RESULTS_LOG_KEY: ["partial"], + utils.ERR_MSG_KEY: "script error", + } + + req = _execution_request(request_id="req-exec-fail") + response = self.stub.executeCommand(req) + + assert response.requestId == "req-exec-fail" + assert response.status == pb2.FAILURE + + @patch("command_executor_server.CommandExecutorHandler") + def test_execute_command_grpc_preserves_payload(self, MockHandler): + """Verify payload round-trips correctly through gRPC serialization.""" + payload_data = json.dumps({"key": "value", "number": 42}) + MockHandler.return_value.execute_command.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: True, + utils.RESULTS_LOG_KEY: [payload_data], + utils.ERR_MSG_KEY: "", + } + + req = _execution_request(request_id="req-payload") + response = self.stub.executeCommand(req) + + assert response.requestId == "req-payload" + assert response.status == pb2.SUCCESS + + @patch("command_executor_server.CommandExecutorHandler") + def test_request_fields_received_by_handler(self, MockHandler): + """Verify that the proto request fields survive serialization to the server.""" + MockHandler.return_value.execute_command.return_value = { + utils.CDS_IS_SUCCESSFUL_KEY: True, + utils.RESULTS_LOG_KEY: [], + utils.ERR_MSG_KEY: "", + } + + req = _execution_request( + name="field-bp", version="5.0.0", uuid="field-uuid", + request_id="field-req", command="python check.py", + ) + self.stub.executeCommand(req) + + # The handler was constructed with a request that has the right identifiers + call_args = MockHandler.call_args[0][0] + assert call_args.identifiers.blueprintName == "field-bp" + assert call_args.identifiers.blueprintVersion == "5.0.0" + assert call_args.requestId == "field-req" + assert call_args.command == "python check.py" diff --git a/ms/command-executor/src/test/python/test_payload_coder.py b/ms/command-executor/src/test/python/test_payload_coder.py new file mode 100644 index 000000000..ead4a374c --- /dev/null +++ b/ms/command-executor/src/test/python/test_payload_coder.py @@ -0,0 +1,181 @@ +# +# Copyright (C) 2026 Deutsche Telekom. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Tests for cds_utils/payload_coder.py — MIME payload and error message +encoding used by CBA scripts to communicate results back to the +command executor. +""" + +import json +import sys +from io import StringIO +from unittest.mock import patch + +from cds_utils.payload_coder import ( + send_response_data_payload, + send_response_err_msg, + send_response_err_msg_and_exit, +) + + +class TestSendResponseDataPayload: + + def test_output_contains_begin_end_markers(self): + """Output must be wrapped in BEGIN/END_EXTRA_PAYLOAD markers.""" + captured = StringIO() + with patch("sys.stdout", captured): + send_response_data_payload({"status": "ok"}) + + output = captured.getvalue() + assert "BEGIN_EXTRA_PAYLOAD" in output + assert "END_EXTRA_PAYLOAD" in output + + def test_payload_is_json_encoded(self): + """The JSON payload must appear between the markers.""" + captured = StringIO() + with patch("sys.stdout", captured): + send_response_data_payload({"key": "value", "count": 42}) + + output = captured.getvalue() + # The encoded JSON should be somewhere in the output + assert '"key"' in output + assert '"value"' in output + assert "42" in output + + def test_output_is_mime_formatted(self): + """Output should be MIME multipart form-data.""" + captured = StringIO() + with patch("sys.stdout", captured): + send_response_data_payload({"a": 1}) + + output = captured.getvalue() + assert "Content-Type:" in output + assert "form-data" in output + + def test_empty_payload(self): + """An empty dict should still produce valid markers and MIME.""" + captured = StringIO() + with patch("sys.stdout", captured): + send_response_data_payload({}) + + output = captured.getvalue() + assert "BEGIN_EXTRA_PAYLOAD" in output + assert "END_EXTRA_PAYLOAD" in output + assert "{}" in output + + def test_roundtrip_with_parse_cmd_exec_output(self): + """The output of send_response_data_payload should be parseable + by utils.parse_cmd_exec_output (integration-style test).""" + import tempfile + from unittest.mock import MagicMock + import utils + + payload_data = {"result": "success", "code": 200} + + captured = StringIO() + with patch("sys.stdout", captured): + send_response_data_payload(payload_data) + + output = captured.getvalue() + + with tempfile.TemporaryFile(mode="w+") as f: + f.write(output) + results_log = [] + payload_result = {} + err_msg_result = [] + utils.parse_cmd_exec_output( + f, MagicMock(), payload_result, err_msg_result, results_log, {} + ) + + assert payload_result["result"] == "success" + assert payload_result["code"] == 200 + + +class TestSendResponseErrMsg: + + def test_output_contains_begin_end_markers(self): + captured = StringIO() + with patch("sys.stdout", captured): + send_response_err_msg("something went wrong") + + output = captured.getvalue() + assert "BEGIN_EXTRA_RET_ERR_MSG" in output + assert "END_EXTRA_RET_ERR_MSG" in output + + def test_error_message_is_included(self): + captured = StringIO() + with patch("sys.stdout", captured): + send_response_err_msg("disk full") + + output = captured.getvalue() + assert "disk full" in output + + def test_roundtrip_with_parse_cmd_exec_output(self): + """The err msg output should be parseable by parse_cmd_exec_output.""" + import tempfile + from unittest.mock import MagicMock + import utils + + captured = StringIO() + with patch("sys.stdout", captured): + send_response_err_msg("custom error from script") + + output = captured.getvalue() + + with tempfile.TemporaryFile(mode="w+") as f: + f.write(output) + results_log = [] + payload_result = {} + err_msg_result = [] + utils.parse_cmd_exec_output( + f, MagicMock(), payload_result, err_msg_result, results_log, {} + ) + + assert len(err_msg_result) == 1 + assert "custom error from script" in err_msg_result[0] + + +class TestSendResponseErrMsgAndExit: + + def test_output_contains_markers(self): + captured = StringIO() + with patch("sys.stdout", captured), \ + pytest.raises(SystemExit) as exc_info: + send_response_err_msg_and_exit("fatal error") + + output = captured.getvalue() + assert "BEGIN_EXTRA_RET_ERR_MSG" in output + assert "END_EXTRA_RET_ERR_MSG" in output + assert "fatal error" in output + + def test_exits_with_default_code_1(self): + with patch("sys.stdout", StringIO()), \ + pytest.raises(SystemExit) as exc_info: + send_response_err_msg_and_exit("oops") + + assert exc_info.value.code == 1 + + def test_exits_with_custom_code(self): + with patch("sys.stdout", StringIO()), \ + pytest.raises(SystemExit) as exc_info: + send_response_err_msg_and_exit("oops", code=42) + + assert exc_info.value.code == 42 + + +# Need pytest import for raises +import pytest diff --git a/ms/command-executor/src/test/python/test_request_header_validator_interceptor.py b/ms/command-executor/src/test/python/test_request_header_validator_interceptor.py new file mode 100644 index 000000000..5fe530672 --- /dev/null +++ b/ms/command-executor/src/test/python/test_request_header_validator_interceptor.py @@ -0,0 +1,130 @@ +# +# Copyright (C) 2026 Deutsche Telekom. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Tests for request_header_validator_interceptor.py — gRPC server interceptor +that validates authorization headers on incoming requests. +""" + +from unittest.mock import MagicMock, patch + +import grpc + +from request_header_validator_interceptor import ( + RequestHeaderValidatorInterceptor, + _unary_unary_rpc_terminator, +) + + +# =================================================================== +# _unary_unary_rpc_terminator +# =================================================================== + +class TestUnaryUnaryRpcTerminator: + + def test_returns_rpc_method_handler(self): + handler = _unary_unary_rpc_terminator( + grpc.StatusCode.UNAUTHENTICATED, "Access denied" + ) + # grpc.unary_unary_rpc_method_handler returns an RpcMethodHandler + assert handler is not None + assert handler.unary_unary is not None + + def test_terminator_aborts_context(self): + handler = _unary_unary_rpc_terminator( + grpc.StatusCode.UNAUTHENTICATED, "No access" + ) + mock_context = MagicMock() + # Call the actual terminate function + handler.unary_unary(None, mock_context) + mock_context.abort.assert_called_once_with( + grpc.StatusCode.UNAUTHENTICATED, "No access" + ) + + +# =================================================================== +# RequestHeaderValidatorInterceptor +# =================================================================== + +class TestRequestHeaderValidatorInterceptor: + + def _make_interceptor(self, header="authorization", value="Basic abc123"): + return RequestHeaderValidatorInterceptor( + header, value, + grpc.StatusCode.UNAUTHENTICATED, "Access denied!" + ) + + def test_valid_header_continues(self): + interceptor = self._make_interceptor( + header="authorization", value="Basic abc123" + ) + + # Mock handler_call_details with matching metadata + mock_details = MagicMock() + mock_details.invocation_metadata = [ + ("authorization", "Basic abc123"), + ("other-header", "other-value"), + ] + + mock_continuation = MagicMock(return_value="continued_handler") + result = interceptor.intercept_service(mock_continuation, mock_details) + + mock_continuation.assert_called_once_with(mock_details) + assert result == "continued_handler" + + def test_missing_header_returns_terminator(self): + interceptor = self._make_interceptor( + header="authorization", value="Basic abc123" + ) + + mock_details = MagicMock() + mock_details.invocation_metadata = [ + ("other-header", "other-value"), + ] + + mock_continuation = MagicMock() + result = interceptor.intercept_service(mock_continuation, mock_details) + + # Continuation should NOT be called + mock_continuation.assert_not_called() + # Result should be the terminator (an RpcMethodHandler) + assert result is not None + + def test_wrong_value_returns_terminator(self): + interceptor = self._make_interceptor( + header="authorization", value="Basic abc123" + ) + + mock_details = MagicMock() + mock_details.invocation_metadata = [ + ("authorization", "Basic WRONG"), + ] + + mock_continuation = MagicMock() + result = interceptor.intercept_service(mock_continuation, mock_details) + + mock_continuation.assert_not_called() + + def test_empty_metadata_returns_terminator(self): + interceptor = self._make_interceptor() + + mock_details = MagicMock() + mock_details.invocation_metadata = [] + + mock_continuation = MagicMock() + result = interceptor.intercept_service(mock_continuation, mock_details) + + mock_continuation.assert_not_called() diff --git a/ms/command-executor/src/test/python/test_utils.py b/ms/command-executor/src/test/python/test_utils.py new file mode 100644 index 000000000..41800d7e4 --- /dev/null +++ b/ms/command-executor/src/test/python/test_utils.py @@ -0,0 +1,391 @@ +# +# Copyright (C) 2026 Deutsche Telekom. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Tests for utils.py — gRPC response building, request field extraction, +output parsing, and log truncation. +""" + +import json +import tempfile +from types import SimpleNamespace +from unittest.mock import patch, MagicMock + +import pytest + +import proto.CommandExecutor_pb2 as CommandExecutor_pb2 +import utils + + +# --------------------------------------------------------------------------- +# Helpers — lightweight request stub +# --------------------------------------------------------------------------- + +def _make_identifiers(name="bp1", version="1.0.0", uuid="abc-123"): + return SimpleNamespace(blueprintName=name, blueprintVersion=version, blueprintUUID=uuid) + + +def _make_request(name="bp1", version="1.0.0", uuid="abc-123", + timeout=30, request_id="req-1", sub_request_id="sub-1", + originator_id="orig-1"): + return SimpleNamespace( + identifiers=_make_identifiers(name, version, uuid), + timeOut=timeout, + requestId=request_id, + subRequestId=sub_request_id, + originatorId=originator_id, + ) + + +# =================================================================== +# Request field extraction +# =================================================================== + +class TestRequestFieldExtraction: + """Tests for the simple accessor functions at the top of utils.py.""" + + def test_blueprint_name_version_uuid(self): + req = _make_request(name="my-bp", version="2.0.0", uuid="u-1") + assert utils.blueprint_name_version_uuid(req) == "my-bp/2.0.0/u-1" + + def test_blueprint_name_version(self): + req = _make_request(name="my-bp", version="2.0.0") + assert utils.blueprint_name_version(req) == "my-bp/2.0.0" + + def test_get_blueprint_name(self): + req = _make_request(name="test-cba") + assert utils.get_blueprint_name(req) == "test-cba" + + def test_get_blueprint_version(self): + req = _make_request(version="3.1.0") + assert utils.get_blueprint_version(req) == "3.1.0" + + def test_get_blueprint_uuid(self): + req = _make_request(uuid="12345") + assert utils.get_blueprint_uuid(req) == "12345" + + def test_get_blueprint_timeout(self): + req = _make_request(timeout=120) + assert utils.get_blueprint_timeout(req) == 120 + + def test_get_blueprint_requestid(self): + req = _make_request(request_id="r-99") + assert utils.get_blueprint_requestid(req) == "r-99" + + def test_get_blueprint_subRequestId(self): + req = _make_request(sub_request_id="sr-42") + assert utils.get_blueprint_subRequestId(req) == "sr-42" + + +# =================================================================== +# getExtraLogData +# =================================================================== + +class TestGetExtraLogData: + + def test_with_request(self): + req = _make_request(request_id="r1", sub_request_id="s1", originator_id="o1") + extra = utils.getExtraLogData(req) + assert extra == { + 'request_id': 'r1', + 'subrequest_id': 's1', + 'originator_id': 'o1', + } + + def test_without_request(self): + extra = utils.getExtraLogData() + assert extra == { + 'request_id': '', + 'subrequest_id': '', + 'originator_id': '', + } + + def test_with_none(self): + extra = utils.getExtraLogData(None) + assert extra == { + 'request_id': '', + 'subrequest_id': '', + 'originator_id': '', + } + + +# =================================================================== +# build_ret_data +# =================================================================== + +class TestBuildRetData: + + def test_success_minimal(self): + ret = utils.build_ret_data(True) + assert ret[utils.CDS_IS_SUCCESSFUL_KEY] is True + assert ret[utils.RESULTS_LOG_KEY] == [] + assert utils.ERR_MSG_KEY not in ret + assert utils.REUPLOAD_CBA_KEY not in ret + + def test_failure_with_error(self): + ret = utils.build_ret_data(False, error="something broke") + assert ret[utils.CDS_IS_SUCCESSFUL_KEY] is False + assert ret[utils.ERR_MSG_KEY] == "something broke" + + def test_with_results_log(self): + logs = ["line1", "line2"] + ret = utils.build_ret_data(True, results_log=logs) + assert ret[utils.RESULTS_LOG_KEY] == logs + + def test_reupload_cba_flag(self): + ret = utils.build_ret_data(False, reupload_cba=True) + assert ret[utils.REUPLOAD_CBA_KEY] is True + + def test_reupload_cba_absent_when_false(self): + ret = utils.build_ret_data(False, reupload_cba=False) + assert utils.REUPLOAD_CBA_KEY not in ret + + +# =================================================================== +# build_grpc_response +# =================================================================== + +class TestBuildGrpcResponse: + + def test_success_response(self): + data = { + utils.CDS_IS_SUCCESSFUL_KEY: True, + utils.RESULTS_LOG_KEY: ["log line 1"], + } + resp = utils.build_grpc_response("req-1", data) + + assert isinstance(resp, CommandExecutor_pb2.ExecutionOutput) + assert resp.requestId == "req-1" + assert resp.status == CommandExecutor_pb2.SUCCESS + assert resp.errMsg == "" + # CDS_IS_SUCCESSFUL_KEY and RESULTS_LOG_KEY should be popped from data + payload = json.loads(resp.payload) + assert utils.CDS_IS_SUCCESSFUL_KEY not in payload + assert utils.RESULTS_LOG_KEY not in payload + + def test_failure_response_with_error(self): + data = { + utils.CDS_IS_SUCCESSFUL_KEY: False, + utils.RESULTS_LOG_KEY: ["log1"], + utils.ERR_MSG_KEY: ["Error line 1", "Error line 2"], + } + resp = utils.build_grpc_response("req-2", data) + + assert resp.status == CommandExecutor_pb2.FAILURE + assert "Error line 1" in resp.errMsg + assert "Error line 2" in resp.errMsg + + def test_response_contains_timestamp(self): + data = { + utils.CDS_IS_SUCCESSFUL_KEY: True, + utils.RESULTS_LOG_KEY: [], + } + resp = utils.build_grpc_response("req-3", data) + assert resp.timestamp.seconds > 0 + + def test_response_logs_preserved(self): + data = { + utils.CDS_IS_SUCCESSFUL_KEY: True, + utils.RESULTS_LOG_KEY: ["line A", "line B", "line C"], + } + resp = utils.build_grpc_response("req-4", data) + assert list(resp.response) == ["line A", "line B", "line C"] + + def test_extra_payload_fields_preserved(self): + data = { + utils.CDS_IS_SUCCESSFUL_KEY: True, + utils.RESULTS_LOG_KEY: [], + "custom_key": "custom_value", + } + resp = utils.build_grpc_response("req-5", data) + payload = json.loads(resp.payload) + assert payload["custom_key"] == "custom_value" + + +# =================================================================== +# build_grpc_blueprint_validation_response +# =================================================================== + +class TestBuildGrpcBlueprintValidationResponse: + + def test_success(self): + resp = utils.build_grpc_blueprint_validation_response( + "req-1", "sub-1", "uuid-1", success=True + ) + assert isinstance(resp, CommandExecutor_pb2.BlueprintValidationOutput) + assert resp.requestId == "req-1" + assert resp.subRequestId == "sub-1" + assert resp.cbaUUID == "uuid-1" + assert resp.status == CommandExecutor_pb2.SUCCESS + + def test_failure(self): + resp = utils.build_grpc_blueprint_validation_response( + "req-2", "sub-2", "uuid-2", success=False + ) + assert resp.status == CommandExecutor_pb2.FAILURE + + def test_has_timestamp(self): + resp = utils.build_grpc_blueprint_validation_response("r", "s", "u") + assert resp.timestamp.seconds > 0 + + +# =================================================================== +# build_grpc_blueprint_upload_response +# =================================================================== + +class TestBuildGrpcBlueprintUploadResponse: + + def test_success(self): + resp = utils.build_grpc_blueprint_upload_response("req-1", "sub-1", success=True) + assert isinstance(resp, CommandExecutor_pb2.UploadBlueprintOutput) + assert resp.status == CommandExecutor_pb2.SUCCESS + assert resp.requestId == "req-1" + assert resp.subRequestId == "sub-1" + + def test_failure_with_payload(self): + resp = utils.build_grpc_blueprint_upload_response( + "req-2", "sub-2", success=False, payload=["err1", "err2"] + ) + assert resp.status == CommandExecutor_pb2.FAILURE + payload = json.loads(resp.payload) + assert payload == ["err1", "err2"] + + def test_has_timestamp(self): + resp = utils.build_grpc_blueprint_upload_response("r", "s") + assert resp.timestamp.seconds > 0 + + +# =================================================================== +# truncate_execution_output +# =================================================================== + +class TestTruncateExecutionOutput: + + def test_small_output_not_truncated(self): + eo = CommandExecutor_pb2.ExecutionOutput(requestId="r1", payload="{}") + eo.response.append("short log line") + result = utils.truncate_execution_output(eo) + assert list(result.response) == ["short log line"] + + def test_large_output_is_truncated(self): + eo = CommandExecutor_pb2.ExecutionOutput(requestId="r1", payload="{}") + # Fill with enough data to exceed RESPONSE_MAX_SIZE (4MB) + large_line = "x" * 10000 + for _ in range(500): + eo.response.append(large_line) + + result = utils.truncate_execution_output(eo) + # The last entry should be the truncation message + assert result.response[-1].startswith("[...] TRUNCATED CHARS") + assert result.ByteSize() <= utils.RESPONSE_MAX_SIZE + 1000 # small overhead ok + + def test_truncation_message_contains_char_count(self): + eo = CommandExecutor_pb2.ExecutionOutput(requestId="r1", payload="{}") + large_line = "y" * 10000 + for _ in range(500): + eo.response.append(large_line) + + result = utils.truncate_execution_output(eo) + last_line = result.response[-1] + # Extract the number from "[...] TRUNCATED CHARS : NNN" + assert "TRUNCATED CHARS" in last_line + count_str = last_line.split(":")[-1].strip() + assert int(count_str) > 0 + + +# =================================================================== +# parse_cmd_exec_output +# =================================================================== + +class TestParseCmdExecOutput: + + def _make_logger(self): + logger = MagicMock() + return logger + + def test_plain_output(self): + """Lines without special markers go to results_log.""" + content = "line 1\nline 2\nline 3\n" + with tempfile.TemporaryFile(mode="w+") as f: + f.write(content) + results_log = [] + payload = {} + err_msg = [] + utils.parse_cmd_exec_output(f, self._make_logger(), payload, err_msg, results_log, {}) + + assert results_log == ["line 1", "line 2", "line 3"] + assert payload == {} + assert err_msg == [] + + def test_error_message_section(self): + """Lines between BEGIN/END_EXTRA_RET_ERR_MSG go to err_msg_result.""" + content = ( + "some output\n" + "BEGIN_EXTRA_RET_ERR_MSG\n" + "error detail 1\n" + "error detail 2\n" + "END_EXTRA_RET_ERR_MSG\n" + "more output\n" + ) + with tempfile.TemporaryFile(mode="w+") as f: + f.write(content) + results_log = [] + payload = {} + err_msg = [] + utils.parse_cmd_exec_output(f, self._make_logger(), payload, err_msg, results_log, {}) + + assert "error detail 1\nerror detail 2" in err_msg[0] + assert "some output" in results_log + assert "more output" in results_log + + def test_payload_section(self): + """Lines between BEGIN/END_EXTRA_PAYLOAD are parsed as MIME payload.""" + # Build the MIME payload the same way payload_coder does + from email.mime import multipart, text as mime_text + m = multipart.MIMEMultipart("form-data") + data = mime_text.MIMEText("response_payload", "json", "utf8") + data.set_payload(json.JSONEncoder().encode({"key": "value"})) + m.attach(data) + + content = ( + "output before\n" + "BEGIN_EXTRA_PAYLOAD\n" + + m.as_string() + "\n" + "END_EXTRA_PAYLOAD\n" + "output after\n" + ) + with tempfile.TemporaryFile(mode="w+") as f: + f.write(content) + results_log = [] + payload = {} + err_msg = [] + utils.parse_cmd_exec_output(f, self._make_logger(), payload, err_msg, results_log, {}) + + assert payload.get("key") == "value" + assert "output before" in results_log + assert "output after" in results_log + + def test_empty_file(self): + """Empty output produces empty results.""" + with tempfile.TemporaryFile(mode="w+") as f: + results_log = [] + payload = {} + err_msg = [] + utils.parse_cmd_exec_output(f, self._make_logger(), payload, err_msg, results_log, {}) + + assert results_log == [] + assert payload == {} + assert err_msg == []