1 """Copyright 2020 Deutsche Telekom.
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
7 http://www.apache.org/licenses/LICENSE-2.0
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
17 from dataclasses import dataclass, field
18 from enum import Enum, unique
19 from logging import Logger, getLogger
21 from types import TracebackType
22 from typing import Any, Dict, Generator, Optional, Type
24 from google.protobuf import json_format
26 from proto.BluePrintProcessing_pb2 import ExecutionServiceInput, ExecutionServiceOutput
28 from .grpc import Client as GrpcClient
29 from .http import Client as HttpClient
33 class WorkflowMode(Enum):
34 """Workflow mode enumerator.
36 Workflow can be executed in two modes: synchronously and asynchronously.
37 This enumerator stores valid values to set the mode: SYNC for synchronously mode and ASYNC for asynchronously.
44 class WorkflowExecution:
45 """Wokflow execution class.
47 Describes workflow to call. Set blueprint name and version and workflow name to execute.
48 Workflow inputs are optional, by default set to empty directory.
49 Workflow mode is also optional. It is set by default to call workflow synchronously.
55 blueprint_version: str,
57 workflow_inputs: Dict[str, Any] = None,
58 workflow_mode: WorkflowMode = WorkflowMode.SYNC,
60 """Initialize workflow execution.
62 Get all needed information to execute workflow.
65 blueprint_name (str): Blueprint name to execute workflow from.
66 blueprint_version (str): Blueprint version.
67 workflow_name (str): Name of the workflow to execute
68 workflow_inputs (Dict[str, Any], optional): Key-value workflow inputs. Defaults to None.
69 workflow_mode (WorkflowMode, optional): Workflow execution mode. It can be run synchronously or
70 asynchronously. Defaults to WorkflowMode.SYNC.
72 self.blueprint_name: str = blueprint_name
73 self.blueprint_version: str = blueprint_version
74 self.workflow_name: str = workflow_name
75 if workflow_inputs is None:
77 self.workflow_inputs: Dict[str, Any] = workflow_inputs
78 self.workflow_mode: WorkflowMode = workflow_mode
81 def message(self) -> ExecutionServiceInput:
82 """Workflow execution protobuf message.
84 This message is going to be sent to gRPC server to execute workflow.
87 ExecutionServiceInput: Properly filled protobuf message.
89 execution_msg: ExecutionServiceInput = ExecutionServiceInput()
90 execution_msg.actionIdentifiers.mode = self.workflow_mode.value
91 execution_msg.actionIdentifiers.blueprintName = self.blueprint_name
92 execution_msg.actionIdentifiers.blueprintVersion = self.blueprint_version
93 execution_msg.actionIdentifiers.actionName = self.workflow_name
94 execution_msg.payload.update({f"{self.workflow_name}-request": self.workflow_inputs})
98 class WorkflowExecutionResult:
99 """Result of workflow execution.
101 Store both workflow data and the result returns by server.
104 def __init__(self, workflow_execution: WorkflowExecution, execution_output: ExecutionServiceOutput) -> None:
105 """Initialize workflow execution result object.
107 Stores workflow execution data and execution result.
110 workflow_execution (WorkflowExecution): WorkflowExecution object which was used to call request.
111 execution_output (ExecutionServiceOutput): gRPC server response.
113 self.workflow_execution: WorkflowExecution = workflow_execution
114 self.execution_output: ExecutionServiceOutput = execution_output
117 def blueprint_name(self) -> str:
118 """Name of blueprint used to call workflow.
120 This value is taken from server response not request (should be the same).
125 return self.execution_output.actionIdentifiers.blueprintName
128 def blueprint_version(self) -> str:
129 """Blueprint version.
131 This value is taken from server response not request (should be the same).
134 str: Blueprint version
136 return self.execution_output.actionIdentifiers.blueprintVersion
139 def workflow_name(self) -> str:
142 This value is taken from server response not request (should be the same).
147 return self.execution_output.actionIdentifiers.actionName
150 def has_error(self) -> bool:
151 """Returns bool if request returns error or not.
154 bool: True if response has status code different than 200
156 return self.execution_output.status.code != 200
159 def error_message(self) -> str:
162 This property is available only if response has error. Otherwise AttributeError will be raised.
165 AttributeError: Response has 200 response code and hasn't error message.
168 str: Error message returned by server
171 return self.execution_output.status.errorMessage
172 raise AttributeError("Execution does not finish with error")
175 def payload(self) -> dict:
178 Payload retured by the server is migrated to Python dict.
181 dict: Response's payload.
183 return json_format.MessageToDict(self.execution_output.payload)
188 """Template dataclass.
190 Store resolved template data.
191 It keeps also ResourceResolution object to call `store_template` method.
194 resource_resolution: "ResourceResolution" = field(repr=False)
196 blueprint_version: str
197 artifact_name: str = None
199 resolution_key: str = None
200 resource_type: str = None
201 resource_id: str = None
203 def store(self) -> None:
204 """Store template using blueprintprocessor HTTP API.
206 It uses ResourceResolution `store_template` method.
208 self.resource_resolution.store_template(
209 blueprint_name=self.blueprint_name,
210 blueprint_version=self.blueprint_version,
211 artifact_name=self.artifact_name,
213 resolution_key=self.resolution_key,
214 resource_type=self.resource_type,
215 resource_id=self.resource_id,
219 class ResourceResolution:
220 """Resource resolution class.
222 Helper class to connect to blueprintprocessor's gRPC server, send request to execute workflow and parse responses.
223 Blueprint with workflow must be deployed before workflow request.
224 It's possible to create both secre or unsecure connection (without SSL/TLS).
230 server_address: str = "127.0.0.1",
231 # GRPC client configuration
232 grpc_server_port: int = 9111,
233 use_ssl: bool = False,
234 root_certificates: bytes = None,
235 private_key: bytes = None,
236 certificate_chain: bytes = None,
237 # Authentication header configuration for GRPC client
238 use_header_auth: bool = False,
239 header_auth_token: str = None,
240 # HTTP client configuration
241 http_server_port: int = 8080,
242 http_auth_user: str = None,
243 http_auth_pass: str = None,
244 http_use_ssl: bool = True,
246 """Resource resolution object initialization.
249 server_address (str, optional): gRPC server address. Defaults to "127.0.0.1".
250 grpc_server_port (int, optional): gRPC server address port. Defaults to 9111.
251 use_ssl (bool, optional): Boolean flag to determine if secure channel should be created or not.
253 root_certificates (bytes, optional): The PEM-encoded root certificates. None if it shouldn't be used.
255 private_key (bytes, optional): The PEM-encoded private key as a byte string, or None if no private key
256 should be used. Defaults to None.
257 certificate_chain (bytes, optional): The PEM-encoded certificate chain as a byte string to use or or None if
258 no certificate chain should be used. Defaults to None.
259 use_header_auth (bool, optional): Boolean flag to determine if authorization headed shoud be added for
260 every call or not. Defaults to False.
261 header_auth_token (str, optional): Authorization token value. Defaults to None.
262 If no value is provided "AUTH_TOKEN" environment variable will be used.
263 http_server_port (int, optional): HTTP server address port. Defaults to 8080.
264 http_auth_user (str, optional): Username used for HTTP requests authorization. Defaults to None.
265 If no value is provided "API_USERNAME" environment variable will be used.
266 http_auth_pass (str, optional): Password used for HTTP requests authorization. Defaults to None.
267 If no value is provided "API_PASSWORD" environment variable will be used.
268 http_use_ssl (bool, optional): Determines if secure connection should be used for HTTP requests.
272 self.logger: Logger = getLogger(__name__)
273 # GrpcClient settings
274 self.grpc_client_server_address: str = server_address
275 self.grpc_client_server_port: str = grpc_server_port
276 self.grpc_client_use_ssl: bool = use_ssl
277 self.grpc_client_root_certificates: bytes = root_certificates
278 self.grpc_client_private_key: bytes = private_key
279 self.grpc_client_certificate_chain: bytes = certificate_chain
280 self.grpc_client_use_header_auth: bool = use_header_auth
281 self.grpc_client_header_auth_token: str = header_auth_token or getenv("AUTH_TOKEN")
282 self.grpc_client: GrpcClient = None
283 # HttpClient settings
284 self.http_client: HttpClient = HttpClient(
286 server_port=http_server_port,
287 auth_user=http_auth_user or getenv("API_USERNAME"),
288 auth_pass=http_auth_pass or getenv("API_PASSWORD"),
289 use_ssl=http_use_ssl,
292 def __enter__(self) -> "ResourceResolution":
293 """Enter ResourceResolution instance context.
295 GrpcClient connection is created.
297 self.grpc_client = GrpcClient(
298 server_address=f"{self.grpc_client_server_address}:{self.grpc_client_server_port}",
299 use_ssl=self.grpc_client_use_ssl,
300 root_certificates=self.grpc_client_root_certificates,
301 private_key=self.grpc_client_private_key,
302 certificate_chain=self.grpc_client_certificate_chain,
303 use_header_auth=self.grpc_client_use_header_auth,
304 header_auth_token=self.grpc_client_header_auth_token,
310 unused_exc_type: Optional[Type[BaseException]],
311 unused_exc_value: Optional[BaseException],
312 unused_traceback: Optional[TracebackType],
314 """Exit ResourceResolution instance context.
316 GrpcClient connection is closed.
318 self.grpc_client.close()
320 def execute_workflows(self, *workflows: WorkflowExecution) -> Generator[WorkflowExecutionResult, None, None]:
321 """Execute provided workflows.
323 Workflows are going to be execured using one gRPC API call. Depends of implementation that may has
324 some consequences. In some cases if any request fails all requests after that won't be called.
326 Responses and zipped with workflows and WorkflowExecutionResult object is initialized and yielded.
329 AttributeError: Raises if client object is not created. It occurs only if you not uses context manager.
330 Then user have to create client instance for ResourceResolution object by himself calling:
332 resource_resoulution.client = GrpcClient(
333 server_address=f"{resource_resoulution.client_server_address}:{resource_resoulution.client_server_port}",
334 use_ssl=resource_resoulution.client_use_ssl,
335 root_certificates=resource_resoulution.client_root_certificates,
336 private_key=resource_resoulution.client_private_key,
337 certificate_chain=resource_resoulution.client_certificate_chain,
338 use_header_auth=resource_resoulution.client_use_header_auth,
339 header_auth_token=resource_resoulution.client_header_auth_token,
342 Remeber also to close client connection.
345 Generator[WorkflowExecutionResult, None, None]: WorkflowExecutionResult object
346 with both WorkflowExection object and server response for it's request.
348 self.logger.debug("Execute workflows")
349 if not self.grpc_client:
350 raise AttributeError("gRPC client not connected")
352 for response, workflow in zip(
353 self.grpc_client.process((workflow.message for workflow in workflows)), workflows
355 yield WorkflowExecutionResult(workflow, response)
357 def _check_template_resolve_params(
358 self, resolution_key: str = None, resource_type: str = None, resource_id: str = None
360 """Check template API request parameters.
362 It's possible to store/retrieve templates using pair of artifact name and resolution key OR
363 resource type and resource id. This method checks if valid combination of parameters were used.
366 resolution_key (str, optional): resolutionKey HTTP request parameter value. Defaults to None.
367 resource_type (str, optional): resourceType HTTP request parameter value. Defaults to None.
368 resource_id (str, optional): resourceId HTTP request parameter value. Defaults to None.
371 AttributeError: Invalid combination of parametes used
373 if not any([resolution_key, all([resource_type, resource_id])]):
374 raise AttributeError(
375 "To store/retrieve template resolution_key and artifact_name or both resource_type and resource_id have to be provided"
381 blueprint_version: str,
384 resolution_key: str = None,
385 resource_type: str = None,
386 resource_id: str = None,
388 """Store template using blueprintprocessor HTTP API.
390 Prepare and send a request to store resolved template using blueprint name, blueprint version
391 and pair of artifact name and resolution key OR resource type and resource id.
393 Method returns Template dataclass, which stores all template data and can be used to update
397 blueprint_name (str): Blueprint name
398 blueprint_version (str): Blueprint version
399 result (str): Template result
400 artifact_name (str): Artifact name
401 resolution_key (str, optional): Resolution key. Defaults to None.
402 resource_type (str, optional): Resource type. Defaults to None.
403 resource_id (str, optional): Resource ID. Defaults to None.
405 self.logger.debug("Store template")
406 self._check_template_resolve_params(resolution_key, resource_type, resource_id)
407 base_endpoint: str = f"template/{blueprint_name}/{blueprint_version}"
408 if resolution_key and artifact_name:
409 endpoint: str = f"{base_endpoint}/{artifact_name}/{resolution_key}"
411 endpoint: str = f"{base_endpoint}/{resource_type}/{resource_id}"
412 response = self.http_client.send_request(
413 "POST", endpoint, headers={"Content-Type": "application/json"}, data=json.dumps({"result": result})
416 def retrieve_template(
419 blueprint_version: str,
421 resolution_key: str = None,
422 resource_type: str = None,
423 resource_id: str = None,
425 """Get stored template using blueprintprocessor's HTTP API.
427 Prepare and send a request to retrieve resolved template using blueprint name, blueprint version
428 and pair of artifact name and resolution key OR resource type and resource id.
431 blueprint_name (str): Blueprint name
432 blueprint_version (str): Blueprint version
433 artifact_name (str): Artifact name
434 resolution_key (str, optional): Resolution key. Defaults to None.
435 resource_type (str, optional): Resource type. Defaults to None.
436 resource_id (str, optional): Resource ID. Defaults to None.
438 self.logger.debug("Retrieve template")
439 self._check_template_resolve_params(resolution_key, resource_type, resource_id)
440 params: dict = {"bpName": blueprint_name, "bpVersion": blueprint_version, "artifactName": artifact_name}
442 params.update({"resolutionKey": resolution_key})
444 params.update({"resourceType": resource_type, "resourceId": resource_id})
445 response = self.http_client.send_request(
446 "GET", "template", headers={"Accept": "application/json"}, params=params
449 resource_resolution=self,
450 blueprint_name=blueprint_name,
451 blueprint_version=blueprint_version,
452 artifact_name=artifact_name,
453 resolution_key=resolution_key,
454 resource_type=resource_type,
455 resource_id=resource_id,
456 result=response.json()["result"],