745213529626d7f528309088c36226b848a065f5
[policy/engine.git] / PolicyEngineUtils / src / main / java / org / onap / policy / utils / BusPublisher.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * PolicyEngineUtils
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.utils;
22
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Properties;
26 import java.util.concurrent.TimeUnit;
27 import org.onap.dmaap.mr.client.impl.MRSimplerBatchPublisher;
28 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
29 import org.onap.policy.common.logging.eelf.PolicyLogger;
30 import org.onap.policy.common.logging.flexlogger.FlexLogger;
31 import org.onap.policy.common.logging.flexlogger.Logger;
32
33 public interface BusPublisher {
34         
35         /**
36          * sends a message
37          * 
38          * @param partition id
39          * @param message the message
40          * @return true if success, false otherwise
41          * @throws IllegalArgumentException if no message provided
42          */
43         public boolean send(String partitionId, String message);
44         
45         /**
46          * closes the publisher
47          */
48         public void close();
49         
50         /**
51          * DmaapClient library wrapper
52          */
53         public static class DmaapPublisherWrapper implements BusPublisher {
54             private static Logger logger = FlexLogger.getLogger(DmaapPublisherWrapper.class);
55                 /**
56                  * MR based Publisher
57                  */             
58                 protected MRSimplerBatchPublisher publisher;
59                 
60                 public DmaapPublisherWrapper(List<String> servers, String topic,
61                                                      String aafLogin,
62                                                      String aafPassword) {
63                         
64                         ArrayList<String> dmaapServers = new ArrayList<>();
65                         for (String server: servers) {
66                                 dmaapServers.add(server + ":3904");
67                         }
68                                         
69                         this.publisher = 
70                                 new MRSimplerBatchPublisher.Builder().
71                                                         againstUrls(dmaapServers).
72                                                         onTopic(topic).
73                                                         build();
74                         
75                         this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
76                         
77                         this.publisher.setUsername(aafLogin);
78                         this.publisher.setPassword(aafPassword);  
79                         
80                         Properties props = new Properties();
81                         props.setProperty("Protocol", "http");
82                         props.setProperty("contenttype", "application/json");
83                         
84                         this.publisher.setProps(props);
85                         
86                         this.publisher.setHost(servers.get(0));
87                         
88                         if (PolicyLogger.isInfoEnabled())
89                                 PolicyLogger.info(DmaapPublisherWrapper.class.getName(), 
90                                                           "CREATION: " + this);
91                 }
92
93                 /**
94                  * {@inheritDoc}
95                  */
96                 @Override
97                 public void close() {
98                         if (logger.isInfoEnabled())
99                             logger.info(DmaapPublisherWrapper.class.getName()+ 
100                                                   "CREATION: " + this);
101                         
102                         try {
103                                 this.publisher.close(1, TimeUnit.SECONDS);
104                         } catch (Exception e) {
105                             logger.warn(DmaapPublisherWrapper.class.getName()+ 
106                                                   "CLOSE: " + this + " because of " + 
107                                                                   e.getMessage(), e);
108                         }
109                 }
110                 
111                 /**
112                  * {@inheritDoc}
113                  */
114                 @Override
115                 public boolean send(String partitionId, String message){
116                         if (message == null)
117                                 throw new IllegalArgumentException("No message provided");
118                         
119                         this.publisher.send(partitionId, message);
120                         return true;
121                         
122                 }
123                 
124                 @Override
125                 public String toString() {
126                         StringBuilder builder = new StringBuilder();
127                         builder.append("DmaapPublisherWrapper [").
128                         append("publisher.getAuthDate()=").append(publisher.getAuthDate()).
129                         append(", publisher.getAuthKey()=").append(publisher.getAuthKey()).
130                         append(", publisher.getHost()=").append(publisher.getHost()).
131                         append(", publisher.getProtocolFlag()=").append(publisher.getProtocolFlag()).
132                         append(", publisher.getUsername()=").append(publisher.getUsername()).
133                         append(", publisher.getPendingMessageCount()=").append(publisher.getPendingMessageCount()).
134                         append("]");
135                         return builder.toString();
136                 }
137         }
138
139 }