e4064c5dfb02c6a347fe112b5884d1ad37f35e17
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2018-2019 Samsung Electronics Co., Ltd.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23
24 import java.net.MalformedURLException;
25 import java.util.Map;
26
27 import org.onap.policy.common.endpoints.event.comm.Topic;
28 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * This topic reader implementation specializes in reading messages over DMAAP topic and notifying
34  * its listeners.
35  */
36 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable {
37
38     private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
39
40
41     protected final String userName;
42     protected final String password;
43
44     protected String environment = null;
45     protected String aftEnvironment = null;
46     protected String partner = null;
47     protected String latitude = null;
48     protected String longitude = null;
49
50     protected Map<String, String> additionalProps = null;
51
52
53     /**
54      * Constructor.
55      *
56      * @param busTopicParams Parameters object containing all the required inputs
57      *
58      * @throws IllegalArgumentException An invalid parameter passed in
59      */
60     public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) {
61
62         super(busTopicParams);
63
64         this.userName = busTopicParams.getUserName();
65         this.password = busTopicParams.getPassword();
66
67         this.environment = busTopicParams.getEnvironment();
68         this.aftEnvironment = busTopicParams.getAftEnvironment();
69         this.partner = busTopicParams.getPartner();
70
71         this.latitude = busTopicParams.getLatitude();
72         this.longitude = busTopicParams.getLongitude();
73
74         this.additionalProps = busTopicParams.getAdditionalProps();
75         try {
76             this.init();
77         } catch (Exception e) {
78             logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}",
79                             topic, e.getMessage(), e);
80             throw new IllegalArgumentException(e);
81         }
82     }
83
84
85     /**
86      * Initialize the Cambria or MR Client.
87      */
88     @Override
89     public void init() throws MalformedURLException {
90         BusTopicParams.TopicParamsBuilder builder = BusTopicParams.builder()
91             .servers(this.servers)
92             .topic(this.topic)
93             .apiKey(this.apiKey)
94             .apiSecret(this.apiSecret)
95             .consumerGroup(this.consumerGroup)
96             .consumerInstance(this.consumerInstance)
97             .fetchTimeout(this.fetchTimeout)
98             .fetchLimit(this.fetchLimit)
99             .useHttps(this.useHttps);
100
101         if (anyNullOrEmpty(this.userName, this.password)) {
102             this.consumer = new BusConsumer.CambriaConsumerWrapper(builder
103                     .allowSelfSignedCerts(this.allowSelfSignedCerts)
104                     .build());
105         } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
106             this.consumer = new BusConsumer.CambriaConsumerWrapper(builder
107                     .userName(this.userName)
108                     .password(this.password)
109                     .allowSelfSignedCerts(this.allowSelfSignedCerts)
110                     .build());
111         } else {
112             this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(builder
113                     .userName(this.userName)
114                     .password(this.password)
115                     .environment(this.environment)
116                     .aftEnvironment(this.aftEnvironment)
117                     .partner(this.partner)
118                     .latitude(this.latitude)
119                     .longitude(this.longitude)
120                     .additionalProps(this.additionalProps)
121                     .build());
122         }
123
124         logger.info("{}: INITTED", this);
125     }
126
127     @Override
128     public CommInfrastructure getTopicCommInfrastructure() {
129         return Topic.CommInfrastructure.DMAAP;
130     }
131
132     @Override
133     public String toString() {
134         StringBuilder builder = new StringBuilder();
135         builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
136                 .append((password == null || password.isEmpty()) ? "-" : password.length())
137                 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=")
138                 .append(super.toString()).append("]");
139         return builder.toString();
140     }
141
142
143 }