0fcb86b4bef4afa9c0ce824f86125b73d29b4284
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.net.MalformedURLException;
25 import java.util.Map;
26
27 import org.onap.policy.common.endpoints.event.comm.Topic;
28 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * This topic reader implementation specializes in reading messages over DMAAP topic and notifying
34  * its listeners.
35  */
36 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable {
37
38     private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
39
40
41     protected final String userName;
42     protected final String password;
43
44     protected String environment = null;
45     protected String aftEnvironment = null;
46     protected String partner = null;
47     protected String latitude = null;
48     protected String longitude = null;
49
50     protected Map<String, String> additionalProps = null;
51
52
53     /**
54      * 
55      * @param busTopicParams Parameters object containing all the required inputs     *
56      * @throws IllegalArgumentException An invalid parameter passed in
57      */
58     public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) {
59
60         super(busTopicParams);
61
62         this.userName = busTopicParams.getUserName();
63         this.password = busTopicParams.getPassword();
64
65         this.environment = busTopicParams.getEnvironment();
66         this.aftEnvironment = busTopicParams.getAftEnvironment();
67         this.partner = busTopicParams.getPartner();
68
69         this.latitude = busTopicParams.getLatitude();
70         this.longitude = busTopicParams.getLongitude();
71
72         this.additionalProps = busTopicParams.getAdditionalProps();
73         try {
74             this.init();
75         } catch (Exception e) {
76             logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}", 
77                             topic, e.getMessage(), e);
78             throw new IllegalArgumentException(e);
79         }
80     }
81
82
83     /**
84      * Initialize the Cambria or MR Client.
85      */
86     @Override
87     public void init() throws MalformedURLException {
88         if (anyNullOrEmpty(this.userName, this.password)) {
89             this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
90                     .servers(this.servers)
91                     .topic(this.topic)
92                     .apiKey(this.apiKey)
93                     .apiSecret(this.apiSecret)
94                     .consumerGroup(this.consumerGroup)
95                     .consumerInstance(this.consumerInstance)
96                     .fetchTimeout(this.fetchTimeout)
97                     .fetchLimit(this.fetchLimit)
98                     .useHttps(this.useHttps)
99                     .allowSelfSignedCerts(this.allowSelfSignedCerts)
100                     .build());
101         } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
102             this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
103                     .servers(this.servers)
104                     .topic(this.topic)
105                     .apiKey(this.apiKey)
106                     .apiSecret(this.apiSecret)
107                     .userName(this.userName)
108                     .password(this.password)
109                     .consumerGroup(this.consumerGroup)
110                     .consumerInstance(this.consumerInstance)
111                     .fetchTimeout(this.fetchTimeout)
112                     .fetchLimit(this.fetchLimit)
113                     .useHttps(this.useHttps)
114                     .allowSelfSignedCerts(this.allowSelfSignedCerts)
115                     .build());
116         } else {
117             this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(BusTopicParams.builder()
118                     .servers(this.servers)
119                     .topic(this.topic)
120                     .apiKey(this.apiKey)
121                     .apiSecret(this.apiSecret)
122                     .userName(this.userName)
123                     .password(this.password)
124                     .consumerGroup(this.consumerGroup)
125                     .consumerInstance(this.consumerInstance)
126                     .fetchTimeout(this.fetchTimeout)
127                     .fetchLimit(this.fetchLimit)
128                     .environment(this.environment)
129                     .aftEnvironment(this.aftEnvironment)
130                     .partner(this.partner)
131                     .latitude(this.latitude)
132                     .longitude(this.longitude)
133                     .additionalProps(this.additionalProps)
134                     .useHttps(this.useHttps).build());
135         }
136
137         logger.info("{}: INITTED", this);
138     }
139
140     /**
141      * {@inheritDoc}
142      */
143     @Override
144     public CommInfrastructure getTopicCommInfrastructure() {
145         return Topic.CommInfrastructure.DMAAP;
146     }
147
148     @Override
149     public String toString() {
150         StringBuilder builder = new StringBuilder();
151         builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
152                 .append((password == null || password.isEmpty()) ? "-" : password.length())
153                 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=")
154                 .append(super.toString()).append("]");
155         return builder.toString();
156     }
157
158
159 }