37ca7ce3ea420abaf1183e3c4d1ae863fcd3c94c
[appc.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright (C) 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22
23 package org.openecomp.appc.adapter.messaging.dmaap.impl;
24
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import org.openecomp.sdnc.sli.SvcLogicContext;
28
29 import java.util.*;
30 import java.util.concurrent.ConcurrentHashMap;
31
32 import org.openecomp.appc.adapter.message.EventSender;
33 import org.openecomp.appc.adapter.message.MessageDestination;
34 import org.openecomp.appc.adapter.message.Producer;
35 import org.openecomp.appc.adapter.message.event.EventHeader;
36 import org.openecomp.appc.adapter.message.event.EventMessage;
37 import org.openecomp.appc.adapter.message.event.EventStatus;
38 import org.openecomp.appc.adapter.messaging.dmaap.impl.DmaapProducerImpl;
39 import org.openecomp.appc.configuration.Configuration;
40 import org.openecomp.appc.configuration.ConfigurationFactory;
41 import org.openecomp.appc.exceptions.APPCException;
42
43 public class EventSenderDmaapImpl implements EventSender
44 {
45     private static final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderDmaapImpl.class);
46     public static final String EVENT_TOPIC_WRITE = "dmaap.event.topic.write";
47     public static final String DMAAP_USERNAME = "dmaap.appc.username";
48     public static final String DMAAP_PASSWORD = "dmaap.appc.password";
49     public static final String EVENT_POOL_MEMBERS = "dmaap.event.pool.members";
50
51     private static Configuration configuration = ConfigurationFactory.getConfiguration();
52
53     private Map<String,Producer> producerMap = new ConcurrentHashMap<>();
54
55     public Map<String, Producer> getProducerMap() {
56         return producerMap;
57     }
58
59     public void setProducerMap(Map<String, Producer> producerMap) {
60         this.producerMap = producerMap;
61     }
62
63     public EventSenderDmaapImpl(){
64
65     }
66
67     public void initialize(){
68         Properties properties = configuration.getProperties();
69         String writeTopic;
70         String username;
71         String password;
72         final List<String> pool = new ArrayList<>();
73
74         for(MessageDestination destination: MessageDestination.values()){
75             writeTopic = properties.getProperty(destination + "." +  EVENT_TOPIC_WRITE);
76             username = properties.getProperty(destination + "." + DMAAP_USERNAME);
77             password = properties.getProperty(destination + "." + DMAAP_PASSWORD);
78             String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS);
79
80             if (hostNames != null && !hostNames.isEmpty()) {
81                 LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS));
82                 Collections.addAll(pool, hostNames.split(","));
83             }
84
85             LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS));
86             LOG.debug(String.format("writeTopic = %s, taken from property: %s", writeTopic, destination + "." + EVENT_TOPIC_WRITE));
87             LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME));
88             Producer producer = new DmaapProducerImpl(pool, writeTopic,username, password);
89
90             for (String url : pool) {
91                 if (url.contains("3905") || url.contains("https")) {
92                     LOG.debug("Producer should use HTTPS");
93                     producer.useHttps(true);
94                     break;
95                 }
96             }
97             producerMap.put(destination.toString(),producer);
98         }
99
100     }
101
102     @Override
103     public boolean sendEvent(MessageDestination destination, EventMessage msg) {
104         String jsonStr = msg.toJson();
105         String id = msg.getEventHeader().getEventId();
106         LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr));
107         Producer producer = producerMap.get(destination.toString());
108         return producer.post(id, jsonStr);
109     }
110
111     @Override
112     public boolean sendEvent(MessageDestination destination, EventMessage msg, String eventTopicName) {
113         String jsonStr = msg.toJson();
114         String id = msg.getEventHeader().getEventId();
115         LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr));
116         Producer producer = createProducer(destination, eventTopicName);
117         return producer.post(id, jsonStr);
118     }
119     
120     private Producer createProducer(MessageDestination destination, String eventTopicName) {
121         Properties properties = configuration.getProperties();
122         final List<String> pool = new ArrayList<>();
123         String username = properties.getProperty(destination + "." + DMAAP_USERNAME);
124         String password = properties.getProperty(destination + "." + DMAAP_PASSWORD);
125         String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS);
126
127         if (hostNames != null && !hostNames.isEmpty()) {
128             LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS));
129             Collections.addAll(pool, hostNames.split(","));
130         }
131
132         LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS));
133         LOG.debug(String.format("writeTopic = %s, taken from property: %s", eventTopicName, destination + "." + EVENT_TOPIC_WRITE));
134         LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME));
135         Producer producer = new DmaapProducerImpl(pool, eventTopicName,username, password);
136
137         for (String url : pool) {
138             if (url.contains("3905") || url.contains("https")) {
139                 LOG.debug("Producer should use HTTPS");
140                 producer.useHttps(true);
141                 break;
142             }
143         }
144         return producer;
145     }
146
147     @Override
148     public boolean sendEvent(MessageDestination destination, Map<String, String> params, SvcLogicContext ctx) throws APPCException {
149
150         if (params == null) {
151             String message = "Parameters map is empty (null)";
152             LOG.error(message);
153             throw new APPCException(message);
154         }
155         String eventTime = new Date(System.currentTimeMillis()).toString();
156         String apiVer = params.get("apiVer");
157         String eventId = params.get("eventId");
158         String reason = params.get("reason");
159         String entityId=params.get("entityId");
160         if(entityId!=null){
161             reason=reason+"("+entityId+")";
162         }
163         Integer code = Integer.getInteger(params.get("code"), 500);
164
165         if (eventTime == null || apiVer == null || eventId == null || reason == null) {
166             String message = String.format("Missing input parameters: %s", params);
167             LOG.error(message);
168             throw new APPCException(message);
169         }
170         EventMessage eventMessage = new EventMessage(
171                         new EventHeader(eventTime, apiVer, eventId),
172                         new EventStatus(code, reason));
173
174         return sendEvent(destination,eventMessage);
175     }
176 }