c6bd5568df11ff28dc56cdfeb4777fe3de2295be
[policy/common.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
22
23 import java.net.MalformedURLException;
24 import java.util.Map;
25
26 import org.onap.policy.common.endpoints.event.comm.Topic;
27 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSource;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * This topic reader implementation specializes in reading messages over DMAAP topic and notifying
33  * its listeners
34  */
35 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable {
36
37     private static Logger logger = LoggerFactory.getLogger(SingleThreadedDmaapTopicSource.class);
38
39
40     protected final String userName;
41     protected final String password;
42
43     protected String environment = null;
44     protected String aftEnvironment = null;
45     protected String partner = null;
46     protected String latitude = null;
47     protected String longitude = null;
48
49     protected Map<String, String> additionalProps = null;
50
51
52     /**
53      * 
54      * @param busTopicParams Parameters object containing all the required inputs     *
55      * @throws IllegalArgumentException An invalid parameter passed in
56      */
57     public SingleThreadedDmaapTopicSource(BusTopicParams busTopicParams) {
58
59         super(busTopicParams);
60
61         this.userName = busTopicParams.getUserName();
62         this.password = busTopicParams.getPassword();
63
64         this.environment = busTopicParams.getEnvironment();
65         this.aftEnvironment = busTopicParams.getAftEnvironment();
66         this.partner = busTopicParams.getPartner();
67
68         this.latitude = busTopicParams.getLatitude();
69         this.longitude = busTopicParams.getLongitude();
70
71         this.additionalProps = busTopicParams.getAdditionalProps();
72         try {
73             this.init();
74         } catch (Exception e) {
75             logger.error("ERROR during init in dmaap-source: cannot create topic {} because of {}", topic, e.getMessage(), e);
76             throw new IllegalArgumentException(e);
77         }
78     }
79
80
81     /**
82      * Initialize the Cambria or MR Client
83      */
84     @Override
85     public void init() throws MalformedURLException {
86         if (anyNullOrEmpty(this.userName, this.password)) {
87             this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey,
88                     this.apiSecret, this.consumerGroup, this.consumerInstance, this.fetchTimeout, this.fetchLimit,
89                     this.useHttps, this.allowSelfSignedCerts);
90         } else if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
91             this.consumer = new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey,
92                     this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance,
93                     this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts);
94         } else {
95             this.consumer = new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, this.apiKey,
96                     this.apiSecret, this.userName, this.password, this.consumerGroup, this.consumerInstance,
97                     this.fetchTimeout, this.fetchLimit, this.environment, this.aftEnvironment, this.partner,
98                     this.latitude, this.longitude, this.additionalProps, this.useHttps);
99         }
100
101         logger.info("{}: INITTED", this);
102     }
103
104     /**
105      * {@inheritDoc}
106      */
107     @Override
108     public CommInfrastructure getTopicCommInfrastructure() {
109         return Topic.CommInfrastructure.DMAAP;
110     }
111
112     @Override
113     public String toString() {
114         StringBuilder builder = new StringBuilder();
115         builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
116                 .append((password == null || password.isEmpty()) ? "-" : password.length())
117                 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=")
118                 .append(super.toString()).append("]");
119         return builder.toString();
120     }
121
122
123 }