PyExecutor ResourceResoluton helper class. 99/103499/1
authorMichal Jagiello <michal.jagiello@t-mobile.pl>
Mon, 9 Mar 2020 14:34:14 +0000 (14:34 +0000)
committerMichal Jagiello <michal.jagiello@t-mobile.pl>
Wed, 11 Mar 2020 09:30:15 +0000 (09:30 +0000)
Create a class to call workflow execution requests to gRPC server.
Create an interceptor to use header authorization for gRPC calls.

Issue-ID: CCSDK-1989
Signed-off-by: Michal Jagiello <michal.jagiello@t-mobile.pl>
Change-Id: Ia449a089e02e7a12e31bee5e3b7debee506d8426

ms/py-executor/resource_resolution/README
ms/py-executor/resource_resolution/authorization.py [new file with mode: 0644]
ms/py-executor/resource_resolution/client.py
ms/py-executor/resource_resolution/resource_resolution.py [new file with mode: 0644]
ms/py-executor/resource_resolution/tests/authorization_interceptor_test.py [new file with mode: 0644]
ms/py-executor/resource_resolution/tests/resource_resolution_test.py [new file with mode: 0644]

index 222dae4..3536004 100644 (file)
@@ -77,4 +77,67 @@ if __name__ == "__main__":
             for response in client.process(generate_messages()):
                 print(response)
 
+```
+
+### Authorizarion header
+
+```
+from proto.BluePrintCommon_pb2 import ActionIdentifiers, CommonHeader
+from proto.BluePrintProcessing_pb2 import ExecutionServiceInput
+from resource_resolution.client import Client as ResourceResolutionClient
+
+
+def generate_messages():
+    commonHeader = CommonHeader()
+    commonHeader.requestId = "1234"
+    commonHeader.subRequestId = "1234-1"
+    commonHeader.originatorId = "CDS"
+
+    actionIdentifiers = ActionIdentifiers()
+    actionIdentifiers.blueprintName = "sample-cba"
+    actionIdentifiers.blueprintVersion = "1.0.0"
+    actionIdentifiers.actionName = "SampleScript"
+
+    input = ExecutionServiceInput(commonHeader=commonHeader, actionIdentifiers=actionIdentifiers)
+
+    commonHeader2 = CommonHeader()
+    commonHeader2.requestId = "1235"
+    commonHeader2.subRequestId = "1234-2"
+    commonHeader2.originatorId = "CDS"
+
+    input2 = ExecutionServiceInput(commonHeader=commonHeader2, actionIdentifiers=actionIdentifiers)
+
+    yield from [input, input2]
+
+
+if __name__ == "__main__":
+    with ResourceResolutionClient("127.0.0.1:9111", use_header_auth=True, header_auth_token="Token test") as client:
+        for response in client.process(generate_messages()):
+            print(response)
+
+```
+
+# ResourceResoulution helper class
+
+## How to use examples
+
+### Insecure channel
+
+```
+from resource_resolution.resource_resolution import ResourceResolution, WorkflowExecution, WorkflowExecutionResult
+
+
+if __name__ == "__main__":
+    with ResourceResolution(use_header_auth=True, header_auth_token="Basic token") as rr:
+        for response in rr.execute_workflows(  # type: WorkflowExecutionResult
+            WorkflowExecution(
+                blueprint_name="blueprintName",
+                blueprint_version="1.0",
+                workflow_name="resource-assignment"
+            )
+        ):
+            if response.has_error:
+                print(response.error_message)
+            else:
+                print(response.payload)
 ```
\ No newline at end of file
diff --git a/ms/py-executor/resource_resolution/authorization.py b/ms/py-executor/resource_resolution/authorization.py
new file mode 100644 (file)
index 0000000..ae5954e
--- /dev/null
@@ -0,0 +1,64 @@
+"""Copyright 2020 Deutsche Telekom.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+from collections import namedtuple
+from typing import Any, Callable, List
+
+from grpc import ClientCallDetails, StreamStreamClientInterceptor
+
+
+class NewClientCallDetails(
+    namedtuple("_ClientCallDetails", ("method", "timeout", "metadata", "credentials")), ClientCallDetails
+):
+    """Namedtuple class to store metadata.
+
+    It's impossible to change original metadata in ClientCallDetails object
+    passed as a parameter to intercept method, so this class is going to get
+    original metadata tuple and add the authorization one.
+    """
+
+    pass
+
+
+class AuthTokenInterceptor(StreamStreamClientInterceptor):
+    """Interceptor class to set authorization header.
+
+    Set authorization header (but it can be any header also) for a gRPC call.
+    """
+
+    def __init__(self, token: str, header: str = "authorization") -> None:
+        """Initialize interceptor.
+
+        Set token and header which should be set into call. By default header is "authorization".
+        Header have to be lowercase.
+
+        Args:
+            token (str): Token value to be set.
+            header (str, optional): Header name. It must be lowercase. Defaults to "authorization".
+        """
+        self.token: str = token
+        if not header.islower():
+            raise ValueError("Header must be lowercase.")
+        self.header: str = header
+
+    def intercept_stream_stream(
+        self, continuation: Callable, client_call_details: ClientCallDetails, request_iterator: Any
+    ) -> Any:
+        """Add header into metadata."""
+        metadata: List = list(client_call_details.metadata) if client_call_details.metadata is not None else []
+        metadata.append((self.header, self.token,))
+        new_client_call_details: NewClientCallDetails = NewClientCallDetails(
+            client_call_details.method, client_call_details.timeout, metadata, client_call_details.credentials
+        )
+        return continuation(new_client_call_details, request_iterator)
index 8908774..fee1686 100644 (file)
@@ -14,12 +14,20 @@ limitations under the License.
 """
 from logging import Logger, getLogger
 from types import TracebackType
-from typing import Iterable, List, Optional, Type
-
-from grpc import Channel, insecure_channel, secure_channel, ssl_channel_credentials
+from typing import Iterable, Optional, Type
+
+from grpc import (
+    Channel,
+    insecure_channel,
+    intercept_channel,
+    secure_channel,
+    ssl_channel_credentials,
+)
 from proto.BluePrintProcessing_pb2 import ExecutionServiceInput, ExecutionServiceOutput
 from proto.BluePrintProcessing_pb2_grpc import BluePrintProcessingServiceStub
 
+from .authorization import AuthTokenInterceptor
+
 
 class Client:
     """Resource resoulution client class."""
@@ -28,20 +36,29 @@ class Client:
         self,
         server_address: str,
         *,
+        # TLS/SSL configuration
         use_ssl: bool = False,
         root_certificates: bytes = None,
         private_key: bytes = None,
         certificate_chain: bytes = None,
+        # Authentication header configuration
+        use_header_auth: bool = False,
+        header_auth_token: str = None,
     ) -> None:
         """Client class initialization.
 
         :param server_address: Address to server to connect.
         :param use_ssl: Boolean flag to determine if secure channel should be created or not. Keyword argument.
         :param root_certificates: The PEM-encoded root certificates. None if it shouldn't be used. Keyword argument.
-        :param private_key: The PEM-encoded private key as a byte string, or None if no private key should be used. Keyword argument.
-        :param certificate_chain: The PEM-encoded certificate chain as a byte string to use or or None if no certificate chain should be used. Keyword argument.
+        :param private_key: The PEM-encoded private key as a byte string, or None if no private key should be used.
+            Keyword argument.
+        :param certificate_chain: The PEM-encoded certificate chain as a byte string to use or or None if
+            no certificate chain should be used. Keyword argument.
+        :param use_header_auth: Boolean flag to determine if authorization headed shoud be added for every call or not.
+            Keyword argument.
+        :param header_auth_token: Authorization token value. Keyword argument.
         """
-        self.logger = getLogger(__name__)
+        self.logger: Logger = getLogger(__name__)
         if use_ssl:
             self.channel: Channel = secure_channel(
                 server_address, ssl_channel_credentials(root_certificates, private_key, certificate_chain)
@@ -50,6 +67,8 @@ class Client:
         else:
             self.channel: Channel = insecure_channel(server_address)
             self.logger.debug(f"Create insecure channel to connect to {server_address}")
+        if use_header_auth:
+            self.channel: Channel = intercept_channel(self.channel, AuthTokenInterceptor(header_auth_token))
         self.stub: BluePrintProcessingServiceStub = BluePrintProcessingServiceStub(self.channel)
 
     def close(self) -> None:
diff --git a/ms/py-executor/resource_resolution/resource_resolution.py b/ms/py-executor/resource_resolution/resource_resolution.py
new file mode 100644 (file)
index 0000000..e4f162f
--- /dev/null
@@ -0,0 +1,294 @@
+"""Copyright 2020 Deutsche Telekom.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from enum import Enum, unique
+from logging import Logger, getLogger
+from types import TracebackType
+from typing import Any, Dict, Generator, Optional, Type
+
+from google.protobuf import json_format
+
+from proto.BluePrintProcessing_pb2 import ExecutionServiceInput, ExecutionServiceOutput
+
+from .client import Client
+
+
+@unique
+class WorkflowMode(Enum):
+    """Workflow mode enumerator.
+
+    Workflow can be executed in two modes: synchronously and asynchronously.
+    This enumerator stores valid values to set the mode: SYNC for synchronously mode and ASYNC for asynchronously.
+    """
+
+    SYNC = "sync"
+    ASYNC = "async"
+
+
+class WorkflowExecution:
+    """Wokflow execution class.
+
+    Describes workflow to call. Set blueprint name and version and workflow name to execute.
+    Workflow inputs are optional, by default set to empty directory.
+    Workflow mode is also optional. It is set by default to call workflow synchronously.
+    """
+
+    def __init__(
+        self,
+        blueprint_name: str,
+        blueprint_version: str,
+        workflow_name: str,
+        workflow_inputs: Dict[str, Any] = None,
+        workflow_mode: WorkflowMode = WorkflowMode.SYNC,
+    ) -> None:
+        """Initialize workflow execution.
+
+        Get all needed information to execute workflow.
+
+        Args:
+            blueprint_name (str): Blueprint name to execute workflow from.
+            blueprint_version (str): Blueprint version.
+            workflow_name (str): Name of the workflow to execute
+            workflow_inputs (Dict[str, Any], optional): Key-value workflow inputs. Defaults to None.
+            workflow_mode (WorkflowMode, optional): Workflow execution mode. It can be run synchronously or
+                asynchronously. Defaults to WorkflowMode.SYNC.
+        """
+        self.blueprint_name: str = blueprint_name
+        self.blueprint_version: str = blueprint_version
+        self.workflow_name: str = workflow_name
+        if workflow_inputs is None:
+            workflow_inputs = {}
+        self.workflow_inputs: Dict[str, Any] = workflow_inputs
+        self.workflow_mode: WorkflowMode = workflow_mode
+
+    @property
+    def message(self) -> ExecutionServiceInput:
+        """Workflow execution protobuf message.
+
+        This message is going to be sent to gRPC server to execute workflow.
+
+        Returns:
+            ExecutionServiceInput: Properly filled protobuf message.
+        """
+        execution_msg: ExecutionServiceInput = ExecutionServiceInput()
+        execution_msg.actionIdentifiers.mode = self.workflow_mode.value
+        execution_msg.actionIdentifiers.blueprintName = self.blueprint_name
+        execution_msg.actionIdentifiers.blueprintVersion = self.blueprint_version
+        execution_msg.actionIdentifiers.actionName = self.workflow_name
+        execution_msg.payload.update({f"{self.workflow_name}-request": self.workflow_inputs})
+        return execution_msg
+
+
+class WorkflowExecutionResult:
+    """Result of workflow execution.
+
+    Store both workflow data and the result returns by server.
+    """
+
+    def __init__(self, workflow_execution: WorkflowExecution, execution_output: ExecutionServiceOutput) -> None:
+        """Initialize workflow execution result object.
+
+        Stores workflow execution data and execution result.
+
+        Args:
+            workflow_execution (WorkflowExecution): WorkflowExecution object which was used to call request.
+            execution_output (ExecutionServiceOutput): gRPC server response.
+        """
+        self.workflow_execution: WorkflowExecution = workflow_execution
+        self.execution_output: ExecutionServiceOutput = execution_output
+
+    @property
+    def blueprint_name(self) -> str:
+        """Name of blueprint used to call workflow.
+
+        This value is taken from server response not request (should be the same).
+
+        Returns:
+            str: Blueprint name
+        """
+        return self.execution_output.actionIdentifiers.blueprintName
+
+    @property
+    def blueprint_version(self) -> str:
+        """Blueprint version.
+
+        This value is taken from server response not request (should be the same).
+
+        Returns:
+            str: Blueprint version
+        """
+        return self.execution_output.actionIdentifiers.blueprintVersion
+
+    @property
+    def workflow_name(self) -> str:
+        """Workflow name.
+
+        This value is taken from server response not request (should be the same).
+
+        Returns:
+            str: Workflow name
+        """
+        return self.execution_output.actionIdentifiers.actionName
+
+    @property
+    def has_error(self) -> bool:
+        """Returns bool if request returns error or not.
+
+        Returns:
+            bool: True if response has status code different than 200
+        """
+        return self.execution_output.status.code != 200
+
+    @property
+    def error_message(self) -> str:
+        """Error message.
+
+        This property is available only if response has error. Otherwise AttributeError will be raised.
+
+        Raises:
+            AttributeError: Response has 200 response code and hasn't error message.
+
+        Returns:
+            str: Error message returned by server
+        """
+        if self.has_error:
+            return self.execution_output.status.errorMessage
+        raise AttributeError("Execution does not finish with error")
+
+    @property
+    def payload(self) -> dict:
+        """Response payload.
+
+        Payload retured by the server is migrated to Python dict.
+
+        Returns:
+            dict: Response's payload.
+        """
+        return json_format.MessageToDict(self.execution_output.payload)
+
+
+class ResourceResolution:
+    """Resource resolution class.
+
+    Helper class to connect to blueprintprocessor's gRPC server, send request to execute workflow and parse responses.
+    Blueprint with workflow must be deployed before workflow request.
+    It's possible to create both secre or unsecure connection (without SSL/TLS).
+    """
+
+    def __init__(
+        self,
+        *,
+        server_address: str = "127.0.0.1",
+        server_port: int = "9111",
+        use_ssl: bool = False,
+        root_certificates: bytes = None,
+        private_key: bytes = None,
+        certificate_chain: bytes = None,
+        # Authentication header configuration
+        use_header_auth: bool = False,
+        header_auth_token: str = None,
+    ) -> None:
+        """Resource resolution object initialization.
+
+        Args:
+            server_address (str, optional): gRPC server address. Defaults to "127.0.0.1".
+            server_port (int, optional): gRPC server address port. Defaults to "9111".
+            use_ssl (bool, optional): Boolean flag to determine if secure channel should be created or not.
+                Defaults to False.
+            root_certificates (bytes, optional): The PEM-encoded root certificates. None if it shouldn't be used.
+                Defaults to None.
+            private_key (bytes, optional): The PEM-encoded private key as a byte string, or None if no private key
+                should be used. Defaults to None.
+            certificate_chain (bytes, optional): The PEM-encoded certificate chain as a byte string to use or or None if
+                no certificate chain should be used. Defaults to None.
+            use_header_auth (bool, optional): Boolean flag to determine if authorization headed shoud be added for
+                every call or not. Defaults to False.
+            header_auth_token (str, optional): Authorization token value. Defaults to None.
+        """
+        # Logger
+        self.logger: Logger = getLogger(__name__)
+        # Client settings
+        self.client_server_address: str = server_address
+        self.client_server_port: str = server_port
+        self.client_use_ssl: bool = use_ssl
+        self.client_root_certificates: bytes = root_certificates
+        self.client_private_key: bytes = private_key
+        self.client_certificate_chain: bytes = certificate_chain
+        self.client_use_header_auth: bool = use_header_auth
+        self.client_header_auth_token: str = header_auth_token
+        self.client: Client = None
+
+    def __enter__(self) -> "ResourceResolution":
+        """Enter ResourceResolution instance context.
+
+        Client connection is created.
+        """
+        self.client = Client(
+            server_address=f"{self.client_server_address}:{self.client_server_port}",
+            use_ssl=self.client_use_ssl,
+            root_certificates=self.client_root_certificates,
+            private_key=self.client_private_key,
+            certificate_chain=self.client_certificate_chain,
+            use_header_auth=self.client_use_header_auth,
+            header_auth_token=self.client_header_auth_token,
+        )
+        return self
+
+    def __exit__(
+        self,
+        unused_exc_type: Optional[Type[BaseException]],
+        unused_exc_value: Optional[BaseException],
+        unused_traceback: Optional[TracebackType],
+    ) -> None:
+        """Exit ResourceResolution instance context.
+
+        Client connection is closed.
+        """
+        self.client.close()
+
+    def execute_workflows(self, *workflows: WorkflowExecution) -> Generator[WorkflowExecutionResult, None, None]:
+        """Execute provided workflows.
+
+        Workflows are going to be execured using one gRPC API call. Depends of implementation that may has
+        some consequences. In some cases if any request fails all requests after that won't be called.
+
+        Responses and zipped with workflows and WorkflowExecutionResult object is initialized and yielded.
+
+        Raises:
+            AttributeError: Raises if client object is not created. It occurs only if you not uses context manager.
+                Then user have to create client instance for ResourceResolution object by himself calling:
+                ```
+                resource_resoulution.client = Client(
+                    server_address=f"{resource_resoulution.client_server_address}:{resource_resoulution.client_server_port}",
+                    use_ssl=resource_resoulution.client_use_ssl,
+                    root_certificates=resource_resoulution.client_root_certificates,
+                    private_key=resource_resoulution.client_private_key,
+                    certificate_chain=resource_resoulution.client_certificate_chain,
+                    use_header_auth=resource_resoulution.client_use_header_auth,
+                    header_auth_token=resource_resoulution.client_header_auth_token,
+                )
+                ```
+                Remeber also to close client connection.
+
+        Returns:
+            Generator[WorkflowExecutionResult, None, None]: WorkflowExecutionResult object
+                with both WorkflowExection object and server response for it's request.
+        """
+        self.logger.debug("Execute workflows")
+        if not self.client:
+            raise AttributeError("gRPC client not connected")
+
+        for response, workflow in zip(self.client.process((workflow.message for workflow in workflows)), workflows):
+            yield WorkflowExecutionResult(workflow, response)
diff --git a/ms/py-executor/resource_resolution/tests/authorization_interceptor_test.py b/ms/py-executor/resource_resolution/tests/authorization_interceptor_test.py
new file mode 100644 (file)
index 0000000..4b03f0b
--- /dev/null
@@ -0,0 +1,50 @@
+"""Copyright 2020 Deutsche Telekom.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from unittest.mock import MagicMock, _Call
+
+import pytest
+
+from resource_resolution.authorization import AuthTokenInterceptor, NewClientCallDetails
+
+
+def test_resource_resolution_auth_token_interceptor():
+    """Test AuthTokenInterceptor class.
+
+     - Checks if it's correctly set default value.
+     - Checks if it's correctly set passed values.
+     - Checks if it's correctly checked if all header characters are lowercase.
+     - Checks if continuation function is called with headers setted 
+    """
+    interceptor: AuthTokenInterceptor = AuthTokenInterceptor("test_token", header="header")
+    assert interceptor.token == "test_token"
+    assert interceptor.header == "header"
+
+    interceptor: AuthTokenInterceptor = AuthTokenInterceptor("test_token")
+    assert interceptor.token == "test_token"
+    assert interceptor.header == "authorization"
+
+    with pytest.raises(ValueError):
+        AuthTokenInterceptor("test_token", header="Auth")
+
+    continuation_mock: MagicMock = MagicMock()
+    client_call_details: MagicMock = MagicMock()
+    request_iterator: MagicMock = MagicMock()
+
+    interceptor.intercept_stream_stream(continuation_mock, client_call_details, request_iterator)
+    continuation_mock.assert_called_once()
+    client_call_details_argument: _Call = continuation_mock.call_args_list[0][0][0]  # Get NewClientCallDetails instance
+    assert isinstance(client_call_details_argument, NewClientCallDetails)
+    assert client_call_details_argument.metadata[0] == (interceptor.header, interceptor.token)
diff --git a/ms/py-executor/resource_resolution/tests/resource_resolution_test.py b/ms/py-executor/resource_resolution/tests/resource_resolution_test.py
new file mode 100644 (file)
index 0000000..8a41357
--- /dev/null
@@ -0,0 +1,105 @@
+"""Copyright 2020 Deutsche Telekom.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+
+from google.protobuf import json_format
+from pytest import raises
+
+from resource_resolution.resource_resolution import (
+    ExecutionServiceInput,
+    ExecutionServiceOutput,
+    WorkflowExecution,
+    WorkflowExecutionResult,
+    WorkflowMode,
+)
+
+
+def test_workflow_execution_class():
+    """Workflow execution class tests.
+
+    - Test initialization and default values
+    - Test request message formatting
+    """
+    # Without inputs
+    workflow_execution: WorkflowExecution = WorkflowExecution("test blueprint", "test version", "test workflow")
+    assert workflow_execution.blueprint_name == "test blueprint"
+    assert workflow_execution.blueprint_version == "test version"
+    assert workflow_execution.workflow_name == "test workflow"
+    assert workflow_execution.workflow_inputs == {}
+    assert workflow_execution.workflow_mode == WorkflowMode.SYNC
+
+    msg: ExecutionServiceInput = workflow_execution.message
+    msg_dict: dict = json_format.MessageToDict(msg)
+    assert msg_dict["actionIdentifiers"]["blueprintName"] == "test blueprint"
+    assert msg_dict["actionIdentifiers"]["blueprintVersion"] == "test version"
+    assert msg_dict["actionIdentifiers"]["actionName"] == "test workflow"
+    assert msg_dict["actionIdentifiers"]["mode"] == "sync"
+    assert list(msg_dict["payload"].keys())[0] == "test workflow-request"
+    assert msg_dict["payload"]["test workflow-request"] == {}
+
+    # With inputs
+    workflow_execution: WorkflowExecution = WorkflowExecution(
+        "test blueprint2",
+        "test version2",
+        "test workflow2",
+        workflow_inputs={"test": "test"},
+        workflow_mode=WorkflowMode.ASYNC,
+    )
+    assert workflow_execution.blueprint_name == "test blueprint2"
+    assert workflow_execution.blueprint_version == "test version2"
+    assert workflow_execution.workflow_name == "test workflow2"
+    assert workflow_execution.workflow_inputs == {"test": "test"}
+    assert workflow_execution.workflow_mode == WorkflowMode.ASYNC
+
+    msg: ExecutionServiceInput = workflow_execution.message
+    msg_dict: dict = json_format.MessageToDict(msg)
+    assert msg_dict["actionIdentifiers"]["blueprintName"] == "test blueprint2"
+    assert msg_dict["actionIdentifiers"]["blueprintVersion"] == "test version2"
+    assert msg_dict["actionIdentifiers"]["actionName"] == "test workflow2"
+    assert msg_dict["actionIdentifiers"]["mode"] == "async"
+    assert list(msg_dict["payload"].keys())[0] == "test workflow2-request"
+    assert msg_dict["payload"]["test workflow2-request"] == {"test": "test"}
+
+
+def test_workflow_execution_result_class():
+    """Workflow execution result class tests.
+
+    - Test initizalization and default values
+    - Test `has_error` property
+    - Test `error_message` property
+    - Test payload formatting
+    """
+    workflow_execution: WorkflowExecution = WorkflowExecution("test blueprint", "test version", "test workflow")
+    execution_output: ExecutionServiceOutput = ExecutionServiceOutput()
+    execution_output.actionIdentifiers.blueprintName = "test blueprint"
+    execution_output.actionIdentifiers.blueprintVersion = "test version"
+    execution_output.actionIdentifiers.actionName = "test workflow"
+    execution_output.status.code = 200
+
+    execution_result: WorkflowExecutionResult = WorkflowExecutionResult(workflow_execution, execution_output)
+    assert not execution_result.has_error
+    with raises(AttributeError):
+        execution_result.error_message
+    assert execution_result.payload == {}
+    assert execution_result.blueprint_name == "test blueprint"
+    assert execution_result.blueprint_version == "test version"
+    assert execution_result.workflow_name == "test workflow"
+
+    execution_output.payload.update({"test_key": "test_value"})
+    execution_result: WorkflowExecutionResult = WorkflowExecutionResult(workflow_execution, execution_output)
+    assert execution_result.payload == {"test_key": "test_value"}
+
+    execution_output.status.code = 500
+    assert execution_result.has_error
+    assert execution_result.error_message == ""