Merge "Adding delete catalog"
[ccsdk/cds.git] / ms / py-executor / resource_resolution / resource_resolution.py
1 """Copyright 2020 Deutsche Telekom.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7     http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 """
15
16 from enum import Enum, unique
17 from logging import Logger, getLogger
18 from types import TracebackType
19 from typing import Any, Dict, Generator, Optional, Type
20
21 from google.protobuf import json_format
22
23 from proto.BluePrintProcessing_pb2 import ExecutionServiceInput, ExecutionServiceOutput
24
25 from .client import Client
26
27
28 @unique
29 class WorkflowMode(Enum):
30     """Workflow mode enumerator.
31
32     Workflow can be executed in two modes: synchronously and asynchronously.
33     This enumerator stores valid values to set the mode: SYNC for synchronously mode and ASYNC for asynchronously.
34     """
35
36     SYNC = "sync"
37     ASYNC = "async"
38
39
40 class WorkflowExecution:
41     """Wokflow execution class.
42
43     Describes workflow to call. Set blueprint name and version and workflow name to execute.
44     Workflow inputs are optional, by default set to empty directory.
45     Workflow mode is also optional. It is set by default to call workflow synchronously.
46     """
47
48     def __init__(
49         self,
50         blueprint_name: str,
51         blueprint_version: str,
52         workflow_name: str,
53         workflow_inputs: Dict[str, Any] = None,
54         workflow_mode: WorkflowMode = WorkflowMode.SYNC,
55     ) -> None:
56         """Initialize workflow execution.
57
58         Get all needed information to execute workflow.
59
60         Args:
61             blueprint_name (str): Blueprint name to execute workflow from.
62             blueprint_version (str): Blueprint version.
63             workflow_name (str): Name of the workflow to execute
64             workflow_inputs (Dict[str, Any], optional): Key-value workflow inputs. Defaults to None.
65             workflow_mode (WorkflowMode, optional): Workflow execution mode. It can be run synchronously or
66                 asynchronously. Defaults to WorkflowMode.SYNC.
67         """
68         self.blueprint_name: str = blueprint_name
69         self.blueprint_version: str = blueprint_version
70         self.workflow_name: str = workflow_name
71         if workflow_inputs is None:
72             workflow_inputs = {}
73         self.workflow_inputs: Dict[str, Any] = workflow_inputs
74         self.workflow_mode: WorkflowMode = workflow_mode
75
76     @property
77     def message(self) -> ExecutionServiceInput:
78         """Workflow execution protobuf message.
79
80         This message is going to be sent to gRPC server to execute workflow.
81
82         Returns:
83             ExecutionServiceInput: Properly filled protobuf message.
84         """
85         execution_msg: ExecutionServiceInput = ExecutionServiceInput()
86         execution_msg.actionIdentifiers.mode = self.workflow_mode.value
87         execution_msg.actionIdentifiers.blueprintName = self.blueprint_name
88         execution_msg.actionIdentifiers.blueprintVersion = self.blueprint_version
89         execution_msg.actionIdentifiers.actionName = self.workflow_name
90         execution_msg.payload.update({f"{self.workflow_name}-request": self.workflow_inputs})
91         return execution_msg
92
93
94 class WorkflowExecutionResult:
95     """Result of workflow execution.
96
97     Store both workflow data and the result returns by server.
98     """
99
100     def __init__(self, workflow_execution: WorkflowExecution, execution_output: ExecutionServiceOutput) -> None:
101         """Initialize workflow execution result object.
102
103         Stores workflow execution data and execution result.
104
105         Args:
106             workflow_execution (WorkflowExecution): WorkflowExecution object which was used to call request.
107             execution_output (ExecutionServiceOutput): gRPC server response.
108         """
109         self.workflow_execution: WorkflowExecution = workflow_execution
110         self.execution_output: ExecutionServiceOutput = execution_output
111
112     @property
113     def blueprint_name(self) -> str:
114         """Name of blueprint used to call workflow.
115
116         This value is taken from server response not request (should be the same).
117
118         Returns:
119             str: Blueprint name
120         """
121         return self.execution_output.actionIdentifiers.blueprintName
122
123     @property
124     def blueprint_version(self) -> str:
125         """Blueprint version.
126
127         This value is taken from server response not request (should be the same).
128
129         Returns:
130             str: Blueprint version
131         """
132         return self.execution_output.actionIdentifiers.blueprintVersion
133
134     @property
135     def workflow_name(self) -> str:
136         """Workflow name.
137
138         This value is taken from server response not request (should be the same).
139
140         Returns:
141             str: Workflow name
142         """
143         return self.execution_output.actionIdentifiers.actionName
144
145     @property
146     def has_error(self) -> bool:
147         """Returns bool if request returns error or not.
148
149         Returns:
150             bool: True if response has status code different than 200
151         """
152         return self.execution_output.status.code != 200
153
154     @property
155     def error_message(self) -> str:
156         """Error message.
157
158         This property is available only if response has error. Otherwise AttributeError will be raised.
159
160         Raises:
161             AttributeError: Response has 200 response code and hasn't error message.
162
163         Returns:
164             str: Error message returned by server
165         """
166         if self.has_error:
167             return self.execution_output.status.errorMessage
168         raise AttributeError("Execution does not finish with error")
169
170     @property
171     def payload(self) -> dict:
172         """Response payload.
173
174         Payload retured by the server is migrated to Python dict.
175
176         Returns:
177             dict: Response's payload.
178         """
179         return json_format.MessageToDict(self.execution_output.payload)
180
181
182 class ResourceResolution:
183     """Resource resolution class.
184
185     Helper class to connect to blueprintprocessor's gRPC server, send request to execute workflow and parse responses.
186     Blueprint with workflow must be deployed before workflow request.
187     It's possible to create both secre or unsecure connection (without SSL/TLS).
188     """
189
190     def __init__(
191         self,
192         *,
193         server_address: str = "127.0.0.1",
194         server_port: int = "9111",
195         use_ssl: bool = False,
196         root_certificates: bytes = None,
197         private_key: bytes = None,
198         certificate_chain: bytes = None,
199         # Authentication header configuration
200         use_header_auth: bool = False,
201         header_auth_token: str = None,
202     ) -> None:
203         """Resource resolution object initialization.
204
205         Args:
206             server_address (str, optional): gRPC server address. Defaults to "127.0.0.1".
207             server_port (int, optional): gRPC server address port. Defaults to "9111".
208             use_ssl (bool, optional): Boolean flag to determine if secure channel should be created or not.
209                 Defaults to False.
210             root_certificates (bytes, optional): The PEM-encoded root certificates. None if it shouldn't be used.
211                 Defaults to None.
212             private_key (bytes, optional): The PEM-encoded private key as a byte string, or None if no private key
213                 should be used. Defaults to None.
214             certificate_chain (bytes, optional): The PEM-encoded certificate chain as a byte string to use or or None if
215                 no certificate chain should be used. Defaults to None.
216             use_header_auth (bool, optional): Boolean flag to determine if authorization headed shoud be added for
217                 every call or not. Defaults to False.
218             header_auth_token (str, optional): Authorization token value. Defaults to None.
219         """
220         # Logger
221         self.logger: Logger = getLogger(__name__)
222         # Client settings
223         self.client_server_address: str = server_address
224         self.client_server_port: str = server_port
225         self.client_use_ssl: bool = use_ssl
226         self.client_root_certificates: bytes = root_certificates
227         self.client_private_key: bytes = private_key
228         self.client_certificate_chain: bytes = certificate_chain
229         self.client_use_header_auth: bool = use_header_auth
230         self.client_header_auth_token: str = header_auth_token
231         self.client: Client = None
232
233     def __enter__(self) -> "ResourceResolution":
234         """Enter ResourceResolution instance context.
235
236         Client connection is created.
237         """
238         self.client = Client(
239             server_address=f"{self.client_server_address}:{self.client_server_port}",
240             use_ssl=self.client_use_ssl,
241             root_certificates=self.client_root_certificates,
242             private_key=self.client_private_key,
243             certificate_chain=self.client_certificate_chain,
244             use_header_auth=self.client_use_header_auth,
245             header_auth_token=self.client_header_auth_token,
246         )
247         return self
248
249     def __exit__(
250         self,
251         unused_exc_type: Optional[Type[BaseException]],
252         unused_exc_value: Optional[BaseException],
253         unused_traceback: Optional[TracebackType],
254     ) -> None:
255         """Exit ResourceResolution instance context.
256
257         Client connection is closed.
258         """
259         self.client.close()
260
261     def execute_workflows(self, *workflows: WorkflowExecution) -> Generator[WorkflowExecutionResult, None, None]:
262         """Execute provided workflows.
263
264         Workflows are going to be execured using one gRPC API call. Depends of implementation that may has
265         some consequences. In some cases if any request fails all requests after that won't be called.
266
267         Responses and zipped with workflows and WorkflowExecutionResult object is initialized and yielded.
268
269         Raises:
270             AttributeError: Raises if client object is not created. It occurs only if you not uses context manager.
271                 Then user have to create client instance for ResourceResolution object by himself calling:
272                 ```
273                 resource_resoulution.client = Client(
274                     server_address=f"{resource_resoulution.client_server_address}:{resource_resoulution.client_server_port}",
275                     use_ssl=resource_resoulution.client_use_ssl,
276                     root_certificates=resource_resoulution.client_root_certificates,
277                     private_key=resource_resoulution.client_private_key,
278                     certificate_chain=resource_resoulution.client_certificate_chain,
279                     use_header_auth=resource_resoulution.client_use_header_auth,
280                     header_auth_token=resource_resoulution.client_header_auth_token,
281                 )
282                 ```
283                 Remeber also to close client connection.
284
285         Returns:
286             Generator[WorkflowExecutionResult, None, None]: WorkflowExecutionResult object
287                 with both WorkflowExection object and server response for it's request.
288         """
289         self.logger.debug("Execute workflows")
290         if not self.client:
291             raise AttributeError("gRPC client not connected")
292
293         for response, workflow in zip(self.client.process((workflow.message for workflow in workflows)), workflows):
294             yield WorkflowExecutionResult(workflow, response)