07fbce80cfcaf0910e00d4fa8705f1df3ed83e8d
[policy/clamp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2023 Nordix Foundation.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.clamp.acm.participant.kserve.handler;
22
23 import io.kubernetes.client.openapi.ApiException;
24 import java.io.IOException;
25 import java.lang.invoke.MethodHandles;
26 import java.util.HashMap;
27 import java.util.Map;
28 import java.util.UUID;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import javax.validation.Validation;
34 import javax.validation.ValidationException;
35 import lombok.Getter;
36 import lombok.RequiredArgsConstructor;
37 import lombok.Setter;
38 import org.apache.http.HttpStatus;
39 import org.onap.policy.clamp.acm.participant.intermediary.api.AutomationCompositionElementListener;
40 import org.onap.policy.clamp.acm.participant.intermediary.api.ParticipantIntermediaryApi;
41 import org.onap.policy.clamp.acm.participant.kserve.exception.KserveException;
42 import org.onap.policy.clamp.acm.participant.kserve.k8s.InferenceServiceValidator;
43 import org.onap.policy.clamp.acm.participant.kserve.k8s.KserveClient;
44 import org.onap.policy.clamp.acm.participant.kserve.models.ConfigurationEntity;
45 import org.onap.policy.clamp.acm.participant.kserve.models.KserveInferenceEntity;
46 import org.onap.policy.clamp.models.acm.concepts.AcElementDeploy;
47 import org.onap.policy.clamp.models.acm.concepts.DeployState;
48 import org.onap.policy.clamp.models.acm.concepts.LockState;
49 import org.onap.policy.common.utils.coder.Coder;
50 import org.onap.policy.common.utils.coder.CoderException;
51 import org.onap.policy.common.utils.coder.StandardCoder;
52 import org.onap.policy.models.base.PfModelException;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55 import org.springframework.stereotype.Component;
56
57 /**
58  * This class handles implementation of automationCompositionElement updates.
59  */
60 @Component
61 @RequiredArgsConstructor
62 public class AutomationCompositionElementHandler implements AutomationCompositionElementListener {
63
64     private static final Coder CODER = new StandardCoder();
65
66     private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
67
68     private ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
69
70     @Setter
71     private ParticipantIntermediaryApi intermediaryApi;
72
73     private final KserveClient kserveClient;
74
75     @Getter
76     private static final Map<UUID, ConfigurationEntity> configRequestMap = new HashMap<>();
77
78
79     private static class ThreadConfig {
80
81         private int uninitializedToPassiveTimeout = 60;
82         private int statusCheckInterval = 30;
83     }
84
85     @Override
86     public void undeploy(UUID automationCompositionId, UUID automationCompositionElementId) {
87         var configurationEntity = configRequestMap.get(automationCompositionElementId);
88         if (configurationEntity != null) {
89             try {
90                 for (KserveInferenceEntity kserveInferenceEntity : configurationEntity.getKserveInferenceEntities()) {
91                     kserveClient.undeployInferenceService(kserveInferenceEntity.getNamespace(),
92                             kserveInferenceEntity.getName());
93                 }
94                 configRequestMap.remove(automationCompositionElementId);
95                 intermediaryApi.updateAutomationCompositionElementState(automationCompositionId,
96                         automationCompositionElementId, DeployState.UNDEPLOYED, LockState.NONE);
97             } catch (IOException | ApiException exception) {
98                 LOGGER.warn("Deletion of Inference service failed", exception);
99             }
100         }
101     }
102
103     /**
104      * Callback method to handle an update on an automation composition element.
105      *
106      * @param automationCompositionId the ID of the automation composition
107      * @param element the information on the automation composition element
108      * @param properties properties Map
109      */
110     @Override
111     public void deploy(UUID automationCompositionId, AcElementDeploy element, Map<String, Object> properties)
112             throws PfModelException {
113         try {
114             var configurationEntity = CODER.convert(properties, ConfigurationEntity.class);
115             var violations = Validation.buildDefaultValidatorFactory().getValidator().validate(configurationEntity);
116             if (violations.isEmpty()) {
117                 boolean isAllInferenceSvcDeployed = true;
118                 var config = CODER.convert(properties, ThreadConfig.class);
119                 for (KserveInferenceEntity kserveInferenceEntity : configurationEntity.getKserveInferenceEntities()) {
120                     kserveClient.deployInferenceService(kserveInferenceEntity.getNamespace(),
121                             kserveInferenceEntity.getPayload());
122
123                     if (!checkInferenceServiceStatus(kserveInferenceEntity.getName(),
124                             kserveInferenceEntity.getNamespace(), config.uninitializedToPassiveTimeout,
125                             config.statusCheckInterval)) {
126                         isAllInferenceSvcDeployed = false;
127                         break;
128                     }
129                 }
130                 if (isAllInferenceSvcDeployed) {
131                     configRequestMap.put(element.getId(), configurationEntity);
132                     intermediaryApi.updateAutomationCompositionElementState(automationCompositionId, element.getId(),
133                             DeployState.DEPLOYED, LockState.LOCKED);
134                 } else {
135                     LOGGER.error("Inference Service deployment failed");
136                 }
137             } else {
138                 LOGGER.error("Violations found in the config request parameters: {}", violations);
139                 throw new ValidationException("Constraint violations in the config request");
140             }
141         } catch (CoderException e) {
142             throw new KserveException(HttpStatus.SC_BAD_REQUEST, "Invalid inference service configuration", e);
143         } catch (InterruptedException e) {
144             Thread.currentThread().interrupt();
145             throw new KserveException("Interrupt in configuring the inference service", e);
146         } catch (IOException | ExecutionException | ApiException e) {
147             throw new KserveException("Failed to configure the inference service", e);
148         }
149     }
150
151     /**
152      * Check the status of Inference Service.
153      *
154      * @param inferenceServiceName name of the inference service
155      * @param namespace            kubernetes namespace
156      * @param timeout              Inference service time check
157      * @param statusCheckInterval  Status check time interval
158      * @return status of the inference service
159      * @throws ExecutionException   Exception on execution
160      * @throws InterruptedException Exception on inference service status check
161      */
162     public boolean checkInferenceServiceStatus(String inferenceServiceName, String namespace, int timeout,
163             int statusCheckInterval) throws ExecutionException, InterruptedException {
164         // Invoke runnable thread to check pod status
165         Future<String> result = executor.submit(
166                 new InferenceServiceValidator(inferenceServiceName, namespace, timeout, statusCheckInterval,
167                         kserveClient), "Done");
168         return (!result.get().isEmpty()) && result.isDone();
169     }
170 }