45c0b6fc5a4a75e84a3282d358595bd04d2136af
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modified Copyright (C) 2018 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  * 
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  * 
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.net.MalformedURLException;
25 import java.util.Map;
26
27 import org.onap.policy.common.endpoints.event.comm.Topic;
28 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * This topic reader implementation specializes in reading messages over DMAAP topic and notifying
34  * its listeners.
35  */
36 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable {
37
38     private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
39
40
41     protected final String userName;
42     protected final String password;
43
44     protected String environment = null;
45     protected String aftEnvironment = null;
46     protected String partner = null;
47     protected String latitude = null;
48     protected String longitude = null;
49
50     protected Map<String, String> additionalProps = null;
51
52
53     /**
54      * Constructor.
55      * 
56      * @param busTopicParams Parameters object containing all the required inputs
57      * 
58      * @throws IllegalArgumentException An invalid parameter passed in
59      */
60     public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) {
61
62         super(busTopicParams);
63
64         this.userName = busTopicParams.getUserName();
65         this.password = busTopicParams.getPassword();
66
67         this.environment = busTopicParams.getEnvironment();
68         this.aftEnvironment = busTopicParams.getAftEnvironment();
69         this.partner = busTopicParams.getPartner();
70
71         this.latitude = busTopicParams.getLatitude();
72         this.longitude = busTopicParams.getLongitude();
73
74         this.additionalProps = busTopicParams.getAdditionalProps();
75         try {
76             this.init();
77         } catch (Exception e) {
78             logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}", 
79                             topic, e.getMessage(), e);
80             throw new IllegalArgumentException(e);
81         }
82     }
83
84
85     /**
86      * Initialize the Cambria or MR Client.
87      */
88     @Override
89     public void init() throws MalformedURLException {
90         if (anyNullOrEmpty(this.userName, this.password)) {
91             this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
92                     .servers(this.servers)
93                     .topic(this.topic)
94                     .apiKey(this.apiKey)
95                     .apiSecret(this.apiSecret)
96                     .consumerGroup(this.consumerGroup)
97                     .consumerInstance(this.consumerInstance)
98                     .fetchTimeout(this.fetchTimeout)
99                     .fetchLimit(this.fetchLimit)
100                     .useHttps(this.useHttps)
101                     .allowSelfSignedCerts(this.allowSelfSignedCerts)
102                     .build());
103         } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
104             this.consumer = new BusConsumer.CambriaConsumerWrapper(BusTopicParams.builder()
105                     .servers(this.servers)
106                     .topic(this.topic)
107                     .apiKey(this.apiKey)
108                     .apiSecret(this.apiSecret)
109                     .userName(this.userName)
110                     .password(this.password)
111                     .consumerGroup(this.consumerGroup)
112                     .consumerInstance(this.consumerInstance)
113                     .fetchTimeout(this.fetchTimeout)
114                     .fetchLimit(this.fetchLimit)
115                     .useHttps(this.useHttps)
116                     .allowSelfSignedCerts(this.allowSelfSignedCerts)
117                     .build());
118         } else {
119             this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(BusTopicParams.builder()
120                     .servers(this.servers)
121                     .topic(this.topic)
122                     .apiKey(this.apiKey)
123                     .apiSecret(this.apiSecret)
124                     .userName(this.userName)
125                     .password(this.password)
126                     .consumerGroup(this.consumerGroup)
127                     .consumerInstance(this.consumerInstance)
128                     .fetchTimeout(this.fetchTimeout)
129                     .fetchLimit(this.fetchLimit)
130                     .environment(this.environment)
131                     .aftEnvironment(this.aftEnvironment)
132                     .partner(this.partner)
133                     .latitude(this.latitude)
134                     .longitude(this.longitude)
135                     .additionalProps(this.additionalProps)
136                     .useHttps(this.useHttps).build());
137         }
138
139         logger.info("{}: INITTED", this);
140     }
141
142     /**
143      * {@inheritDoc}
144      */
145     @Override
146     public CommInfrastructure getTopicCommInfrastructure() {
147         return Topic.CommInfrastructure.DMAAP;
148     }
149
150     @Override
151     public String toString() {
152         StringBuilder builder = new StringBuilder();
153         builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
154                 .append((password == null || password.isEmpty()) ? "-" : password.length())
155                 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=")
156                 .append(super.toString()).append("]");
157         return builder.toString();
158     }
159
160
161 }