396580e99c84b8ec3e70bfb5baf116313b14c3d3
[policy/common.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal.impl;
22
23 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
24 import com.att.nsa.mr.client.response.MRPublisherResponse;
25 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
26
27 import java.util.ArrayList;
28 import java.util.List;
29 import java.util.Properties;
30 import java.util.concurrent.TimeUnit;
31
32 import org.onap.policy.common.endpoints.event.comm.bus.internal.BusPublisher;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * DmaapClient library wrapper
38  */
39 public abstract class DmaapPublisherWrapper implements BusPublisher {
40
41     private static Logger logger = LoggerFactory.getLogger(DmaapPublisherWrapper.class);
42
43     /**
44      * MR based Publisher
45      */
46     protected MRSimplerBatchPublisher publisher;
47     protected Properties props;
48
49     /**
50      * MR Publisher Wrapper
51      *
52      * @param servers messaging bus hosts
53      * @param topic topic
54      * @param username AAF or DME2 Login
55      * @param password AAF or DME2 Password
56      */
57     public DmaapPublisherWrapper(ProtocolTypeConstants protocol, List<String> servers, String topic, String username,
58             String password, boolean useHttps) {
59
60
61         if (topic == null || topic.isEmpty()) {
62             throw new IllegalArgumentException("No topic for DMaaP");
63         }
64
65
66         if (protocol == ProtocolTypeConstants.AAF_AUTH) {
67             if (servers == null || servers.isEmpty()) {
68                 throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided");
69             }
70
71             ArrayList<String> dmaapServers = new ArrayList<>();
72             if (useHttps) {
73                 for (String server : servers) {
74                     dmaapServers.add(server + ":3905");
75                 }
76
77             } else {
78                 for (String server : servers) {
79                     dmaapServers.add(server + ":3904");
80                 }
81             }
82
83
84             this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
85
86             this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
87         } else if (protocol == ProtocolTypeConstants.DME2) {
88             ArrayList<String> dmaapServers = new ArrayList<>();
89             dmaapServers.add("0.0.0.0:3904");
90
91             this.publisher = new MRSimplerBatchPublisher.Builder().againstUrls(dmaapServers).onTopic(topic).build();
92
93             this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
94         }
95
96         this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName()));
97
98         this.publisher.setUsername(username);
99         this.publisher.setPassword(password);
100
101         props = new Properties();
102
103         if (useHttps) {
104             props.setProperty("Protocol", "https");
105         } else {
106             props.setProperty("Protocol", "http");
107         }
108
109         props.setProperty("contenttype", "application/json");
110         props.setProperty("username", username);
111         props.setProperty("password", password);
112
113         props.setProperty("topic", topic);
114
115         this.publisher.setProps(props);
116
117         if (protocol == ProtocolTypeConstants.AAF_AUTH) {
118             this.publisher.setHost(servers.get(0));
119         }
120
121         logger.info("{}: CREATION: using protocol {}", this, protocol.getValue());
122     }
123
124     /**
125      * {@inheritDoc}
126      */
127     @Override
128     public void close() {
129         logger.info("{}: CLOSE", this);
130
131         try {
132             this.publisher.close(1, TimeUnit.SECONDS);
133         } catch (Exception e) {
134             logger.warn("{}: CLOSE FAILED because of {}", this, e.getMessage(), e);
135         }
136     }
137
138     /**
139      * {@inheritDoc}
140      */
141     @Override
142     public boolean send(String partitionId, String message) {
143         if (message == null) {
144             throw new IllegalArgumentException("No message provided");
145         }
146
147         this.publisher.setPubResponse(new MRPublisherResponse());
148         this.publisher.send(partitionId, message);
149         MRPublisherResponse response = this.publisher.sendBatchWithResponse();
150         if (response != null) {
151             logger.debug("DMaaP publisher received {} : {}", response.getResponseCode(), response.getResponseMessage());
152         }
153
154         return true;
155     }
156
157     @Override
158     public String toString() {
159         return "DmaapPublisherWrapper [" + "publisher.getAuthDate()=" + publisher.getAuthDate()
160                 + ", publisher.getAuthKey()=" + publisher.getAuthKey() + ", publisher.getHost()=" + publisher.getHost()
161                 + ", publisher.getProtocolFlag()=" + publisher.getProtocolFlag() + ", publisher.getUsername()="
162                 + publisher.getUsername() + "]";
163     }
164 }