[SDC] Add kafka native messaging
[sdc.git] / catalog-be / src / main / java / org / openecomp / sdc / be / components / distribution / engine / DistributionNotificationSender.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20 package org.openecomp.sdc.be.components.distribution.engine;
21
22 import org.openecomp.sdc.be.components.kafka.KafkaHandler;
23 import org.openecomp.sdc.be.config.ConfigurationManager;
24 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
25 import org.openecomp.sdc.be.dao.api.ActionStatus;
26 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
27 import org.openecomp.sdc.be.impl.ComponentsUtils;
28 import org.openecomp.sdc.be.model.Service;
29 import org.openecomp.sdc.be.model.User;
30 import org.openecomp.sdc.common.log.wrappers.Logger;
31 import org.springframework.stereotype.Component;
32
33 @Component("distributionNotificationSender")
34 public class DistributionNotificationSender {
35
36     protected static final String DISTRIBUTION_NOTIFICATION_SENDING = "distributionNotificationSending";
37     private static final Logger logger = Logger.getLogger(DistributionNotificationSender.class.getName());
38     @javax.annotation.Resource
39     protected ComponentsUtils componentUtils;
40     private final CambriaHandler cambriaHandler = new CambriaHandler();
41
42     private final KafkaHandler kafkaHandler = new KafkaHandler();
43
44 //
45     private final DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
46
47     public ActionStatus sendNotification(String topicName, String distributionId, EnvironmentMessageBusData messageBusData,
48                                          INotificationData notificationData, Service service, User modifier) {
49         long startTime = System.currentTimeMillis();
50         CambriaErrorResponse status;
51         if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
52             status = cambriaHandler
53                 .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(),
54                     messageBusData.getDmaaPuebEndpoints(), notificationData,
55                     deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds());
56         }
57         else{
58             status = kafkaHandler.sendNotification(topicName, notificationData);
59         }
60         logger.info("After publishing service {} of version {}. Status is {}", service.getName(), service.getVersion(), status.getHttpCode());
61         auditDistributionNotification(
62             new AuditDistributionNotificationBuilder().setTopicName(topicName).setDistributionId(distributionId).setStatus(status).setService(service)
63                 .setEnvId(messageBusData.getEnvId()).setModifier(modifier).setWorkloadContext(notificationData.getWorkloadContext())
64                 .setTenant(messageBusData.getTenant()));
65         long endTime = System.currentTimeMillis();
66         if (logger.isDebugEnabled()) {
67             logger.debug("After building and publishing artifacts object. Total took {} milliseconds", endTime - startTime);
68         }
69         return convertCambriaResponse(status);
70     }
71
72     private void auditDistributionNotification(AuditDistributionNotificationBuilder builder) {
73         if (this.componentUtils != null) {
74             Integer httpCode = builder.getStatus().getHttpCode();
75             String httpCodeStr = String.valueOf(httpCode);
76             String desc = getDescriptionFromErrorResponse(builder.getStatus());
77             this.componentUtils.auditDistributionNotification(builder.getService().getUUID(), builder.getService().getName(), "Service",
78                 builder.getService().getVersion(), builder.getModifier(), builder.getEnvId(), builder.getService().getLifecycleState().name(),
79                 builder.getTopicName(), builder.getDistributionId(), desc, httpCodeStr, builder.getWorkloadContext(), builder.getTenant());
80         }
81     }
82
83     private String getDescriptionFromErrorResponse(CambriaErrorResponse status) {
84         CambriaOperationStatus operationStatus = status.getOperationStatus();
85         switch (operationStatus) {
86             case OK:
87                 return "OK";
88             case AUTHENTICATION_ERROR:
89                 return "Error: Authentication problem towards U-EB server";
90             case INTERNAL_SERVER_ERROR:
91                 return "Error: Internal U-EB server error";
92             case UNKNOWN_HOST_ERROR:
93                 return "Error: Cannot reach U-EB server host";
94             case CONNNECTION_ERROR:
95                 return "Error: Cannot connect to U-EB server";
96             case OBJECT_NOT_FOUND:
97                 return "Error: object not found in U-EB server";
98             default:
99                 return "Error: Internal Cambria server problem";
100         }
101     }
102
103     private ActionStatus convertCambriaResponse(CambriaErrorResponse status) {
104         CambriaOperationStatus operationStatus = status.getOperationStatus();
105         switch (operationStatus) {
106             case OK:
107                 return ActionStatus.OK;
108             case AUTHENTICATION_ERROR:
109                 return ActionStatus.AUTHENTICATION_ERROR;
110             case INTERNAL_SERVER_ERROR:
111                 return ActionStatus.GENERAL_ERROR;
112             case UNKNOWN_HOST_ERROR:
113                 return ActionStatus.UNKNOWN_HOST;
114             case CONNNECTION_ERROR:
115                 return ActionStatus.CONNNECTION_ERROR;
116             case OBJECT_NOT_FOUND:
117                 return ActionStatus.OBJECT_NOT_FOUND;
118             default:
119                 return ActionStatus.GENERAL_ERROR;
120         }
121     }
122 }