e65d44a78b8da33c77f01dcb7ef0b37e3ae6fe18
[policy/drools-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * policy-endpoints
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.policy.drools.event.comm.bus.internal;
22
23 import java.util.List;
24
25 import org.openecomp.policy.drools.event.comm.Topic;
26 import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSource;
27 import org.openecomp.policy.common.logging.eelf.PolicyLogger;
28
29 /**
30  * This topic reader implementation specializes in reading messages
31  * over DMAAP topic and notifying its listeners
32  */
33 public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
34                                             implements DmaapTopicSource, Runnable {
35         
36         protected final String userName;
37         protected final String password;
38         private String className = SingleThreadedDmaapTopicSource.class.getName();
39
40         /**
41          * 
42          * @param servers DMaaP servers
43          * @param topic DMaaP Topic to be monitored
44          * @param apiKey DMaaP API Key (optional)
45          * @param apiSecret DMaaP API Secret (optional)
46          * @param consumerGroup DMaaP Reader Consumer Group
47          * @param consumerInstance DMaaP Reader Instance
48          * @param fetchTimeout DMaaP fetch timeout
49          * @param fetchLimit DMaaP fetch limit
50          * @throws IllegalArgumentException An invalid parameter passed in
51          */
52         public SingleThreadedDmaapTopicSource(List<String> servers, String topic, 
53                                                       String apiKey, String apiSecret,
54                                                       String userName, String password,
55                                                       String consumerGroup, String consumerInstance, 
56                                                       int fetchTimeout, int fetchLimit)
57                         throws IllegalArgumentException {
58                 
59                 
60                 super(servers, topic, apiKey, apiSecret, 
61                           consumerGroup, consumerInstance, 
62                           fetchTimeout, fetchLimit);
63                 
64                 this.userName = userName;
65                 this.password = password;               
66                 
67                 try {
68                         this.init();
69                 } catch (Exception e) {
70                         e.printStackTrace();
71                         throw new IllegalArgumentException(e);
72                 }
73         }
74         
75
76         /**
77          * Initialize the Cambria or MR Client
78          */
79         @Override
80         public void init() throws Exception {
81                 
82                 if (this.userName == null || this.userName.isEmpty() || 
83                         this.password == null || this.password.isEmpty()) {
84                         this.consumer =
85                                         new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, 
86                                                                                    this.apiKey, this.apiSecret,
87                                                                                    this.consumerGroup, this.consumerInstance,
88                                                                                    this.fetchTimeout, this.fetchLimit);                 
89                 } else {
90                         this.consumer =
91                                         new BusConsumer.DmaapConsumerWrapper(this.servers, this.topic, 
92                                                                                     this.apiKey, this.apiSecret,
93                                                                                     this.userName, this.password,
94                                                                                     this.consumerGroup, this.consumerInstance,
95                                                                                     this.fetchTimeout, this.fetchLimit);
96                 }
97                         
98                 PolicyLogger.info(className, "CREATION: " + this);
99         }
100         
101         /**
102          * {@inheritDoc}
103          */
104         @Override
105         public CommInfrastructure getTopicCommInfrastructure() {
106                 return Topic.CommInfrastructure.DMAAP;
107         }
108
109         @Override
110         public String toString() {
111                 StringBuilder builder = new StringBuilder();
112                 builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
113                                 .append((password == null || password.isEmpty()) ? "-" : password.length())
114                                 .append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure())
115                                 .append(", toString()=").append(super.toString()).append("]");
116                 return builder.toString();
117         }
118
119
120 }