2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Copyright (C) 2017 Amdocs
8 * ================================================================================
9 * Modifications Copyright (C) 2019 Ericsson
10 * =============================================================================
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
15 * http://www.apache.org/licenses/LICENSE-2.0
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
23 * ============LICENSE_END=========================================================
26 package org.onap.appc.adapter.messaging.dmaap.impl;
28 import com.att.eelf.configuration.EELFLogger;
29 import com.att.eelf.configuration.EELFManager;
30 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
31 import org.onap.appc.adapter.message.EventSender;
32 import org.onap.appc.adapter.message.MessageDestination;
33 import org.onap.appc.adapter.message.Producer;
34 import org.onap.appc.adapter.message.event.EventHeader;
35 import org.onap.appc.adapter.message.event.EventMessage;
36 import org.onap.appc.adapter.message.event.EventStatus;
37 import org.onap.appc.configuration.Configuration;
38 import org.onap.appc.configuration.ConfigurationFactory;
39 import org.onap.appc.exceptions.APPCException;
40 import java.util.ArrayList;
41 import java.util.Collections;
42 import java.util.Date;
43 import java.util.List;
45 import java.util.Properties;
46 import java.util.concurrent.ConcurrentHashMap;
49 public class EventSenderDmaapImpl implements EventSender
51 private final EELFLogger LOG = EELFManager.getInstance().getLogger(EventSenderDmaapImpl.class);
52 public static final String EVENT_TOPIC_WRITE = "dmaap.event.topic.write";
53 public static final String DMAAP_USERNAME = "dmaap.appc.username";
54 public static final String DMAAP_PASSWORD = "dmaap.appc.password";
55 public static final String EVENT_POOL_MEMBERS = "dmaap.event.pool.members";
57 private static Configuration configuration = ConfigurationFactory.getConfiguration();
59 private Map<String, Producer> producerMap = new ConcurrentHashMap<>();
61 public Map<String, Producer> getProducerMap() {
65 public void setProducerMap(Map<String, Producer> producerMap) {
66 this.producerMap = producerMap;
69 public EventSenderDmaapImpl(){
73 public void initialize(){
74 Properties properties = configuration.getProperties();
78 final List<String> pool = new ArrayList<>();
80 for(MessageDestination destination: MessageDestination.values()){
81 writeTopic = properties.getProperty(destination + "." + EVENT_TOPIC_WRITE);
82 username = properties.getProperty(destination + "." + DMAAP_USERNAME);
83 password = properties.getProperty(destination + "." + DMAAP_PASSWORD);
84 String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS);
86 if (hostNames != null && !hostNames.isEmpty()) {
87 LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS));
88 Collections.addAll(pool, hostNames.split(","));
91 LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS));
92 LOG.debug(String.format("writeTopic = %s, taken from property: %s", writeTopic, destination + "." + EVENT_TOPIC_WRITE));
93 LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME));
94 Producer producer = new DmaapProducerImpl(pool, writeTopic,username, password);
96 for (String url : pool) {
97 if (url.contains("3905") || url.contains("https")) {
98 LOG.debug("Producer should use HTTPS");
99 producer.useHttps(true);
103 producerMap.put(destination.toString(), producer);
109 public boolean sendEvent(MessageDestination destination, EventMessage msg) {
110 String jsonStr = msg.toJson();
111 String id = msg.getEventHeader().getEventId();
112 LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr));
113 Producer producer = producerMap.get(destination.toString());
114 return producer.post(id, jsonStr);
118 public boolean sendEvent(MessageDestination destination, EventMessage msg, String eventTopicName) {
119 String jsonStr = msg.toJson();
120 String id = msg.getEventHeader().getEventId();
121 LOG.info(String.format("Posting Message [%s - %s]", id, jsonStr));
122 Producer producer = createProducer(destination, eventTopicName);
123 return producer.post(id, jsonStr);
126 private Producer createProducer(MessageDestination destination, String eventTopicName) {
127 Properties properties = configuration.getProperties();
128 final List<String> pool = new ArrayList<>();
129 String username = properties.getProperty(destination + "." + DMAAP_USERNAME);
130 String password = properties.getProperty(destination + "." + DMAAP_PASSWORD);
131 String hostNames = properties.getProperty(destination + "." + EVENT_POOL_MEMBERS);
133 if (hostNames != null && !hostNames.isEmpty()) {
134 LOG.debug(String.format("hostNames = %s, taken from property: %s", hostNames, destination + "." + EVENT_POOL_MEMBERS));
135 Collections.addAll(pool, hostNames.split(","));
138 LOG.debug(String.format("pool = %s, taken from property: %s", pool, destination + "." + EVENT_POOL_MEMBERS));
139 LOG.debug(String.format("writeTopic = %s, taken from property: %s", eventTopicName, destination + "." + EVENT_TOPIC_WRITE));
140 LOG.debug(String.format("username = %s, taken from property: %s", username, destination + "." + DMAAP_USERNAME));
141 Producer producer = new DmaapProducerImpl(pool, eventTopicName, username, password);
143 for (String url : pool) {
144 if (url.contains("3905") || url.contains("https")) {
145 LOG.debug("Producer should use HTTPS");
146 producer.useHttps(true);
154 public boolean sendEvent(MessageDestination destination, Map<String, String> params, SvcLogicContext ctx) throws APPCException {
156 if (params == null) {
157 String message = "Parameters map is empty (null)";
159 throw new APPCException(message);
161 String eventTime = new Date(System.currentTimeMillis()).toString();
162 String apiVer = params.get("apiVer");
163 String eventId = params.get("eventId");
164 String reason = params.get("reason");
165 String entityId = params.get("entityId");
166 if(entityId != null){
167 reason += "(" + entityId + ")";
169 Integer code = Integer.getInteger(params.get("code"), 500);
171 if (eventTime == null || apiVer == null || eventId == null || reason == null) {
172 String message = String.format("Missing input parameters: %s", params);
174 throw new APPCException(message);
176 EventMessage eventMessage = new EventMessage(
177 new EventHeader(eventTime, apiVer, eventId),
178 new EventStatus(code, reason));
180 return sendEvent(destination, eventMessage);