2 * ===============================LICENSE_START======================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============================LICENSE_END===========================================
21 package org.onap.dcae.apod.analytics.it.dmaap;
23 import com.google.inject.Inject;
24 import com.google.inject.name.Named;
25 import org.onap.dcae.apod.analytics.dmaap.DMaaPMRFactory;
26 import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRPublisherConfig;
27 import org.onap.dcae.apod.analytics.dmaap.domain.config.DMaaPMRSubscriberConfig;
28 import org.onap.dcae.apod.analytics.dmaap.service.publisher.DMaaPMRPublisher;
29 import org.onap.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
31 import java.util.HashMap;
35 * @author Rajiv Singla . Creation Date: 2/1/2017.
37 public class DMaaPMRCreatorImpl implements DMaaPMRCreator {
39 private final String subscriberHostName;
41 private final Integer subscriberHostPort;
43 private final String subscriberTopicName;
45 private final String subscriberProtocol;
47 private final String subscriberUserName;
49 private final String subscriberUserPassword;
51 private final String subscriberContentType;
53 private final String subscriberConsumerId;
55 private final String subscriberConsumerGroup;
57 private final Integer subscriberTimeoutMS;
59 private final Integer subscriberMessageLimit;
61 private final Integer subscriberPollingInterval;
63 // publisher preferences
64 private final String publisherHostName;
66 private final Integer publisherHostPort;
68 private final String publisherTopicName;
70 private final String publisherProtocol;
72 private final String publisherUserName;
74 private final String publisherUserPassword;
76 private final String publisherContentType;
78 private final Integer publisherPollingInterval;
80 private final Integer publisherMaxBatchSize;
82 private final Integer publisherMaxRecoveryQueueSize;
85 private final DMaaPMRFactory dMaaPMRFactory;
88 public DMaaPMRCreatorImpl(@Named("dmaap.mr.subscriber.hostname") String subscriberHostName,
89 @Named("dmaap.mr.subscriber.portNumber") Integer subscriberHostPort,
90 @Named("dmaap.mr.subscriber.topicName") String subscriberTopicName,
91 @Named("dmaap.mr.subscriber.protocol") String subscriberProtocol,
92 @Named("dmaap.mr.subscriber.username") String subscriberUserName,
93 @Named("dmaap.mr.subscriber.userPassword") String subscriberUserPassword,
94 @Named("dmaap.mr.subscriber.contentType") String subscriberContentType,
95 @Named("dmaap.mr.subscriber.consumerId") String subscriberConsumerId,
96 @Named("dmaap.mr.subscriber.consumerGroup") String subscriberConsumerGroup,
97 @Named("dmaap.mr.subscriber.timeoutMS") Integer subscriberTimeoutMS,
98 @Named("dmaap.mr.subscriber.messageLimit") Integer subscriberMessageLimit,
99 @Named("dmaap.mr.subscriber.pollingInterval") Integer subscriberPollingInterval,
100 @Named("dmaap.mr.publisher.hostname") String publisherHostName,
101 @Named("dmaap.mr.publisher.portNumber") Integer publisherHostPort,
102 @Named("dmaap.mr.publisher.topicName") String publisherTopicName,
103 @Named("dmaap.mr.publisher.protocol") String publisherProtocol,
104 @Named("dmaap.mr.publisher.username") String publisherUserName,
105 @Named("dmaap.mr.publisher.userPassword") String publisherUserPassword,
106 @Named("dmaap.mr.publisher.contentType") String publisherContentType,
107 @Named("dmaap.mr.publisher.pollingInterval") Integer publisherPollingInterval,
108 @Named("dmaap.mr.publisher.maxBatchSize") Integer publisherMaxBatchSize,
109 @Named("dmaap.mr.publisher.maxRecoveryQueueSize") Integer publisherMaxRecoveryQueueSize) {
110 this.subscriberHostName = subscriberHostName;
111 this.subscriberHostPort = subscriberHostPort;
112 this.subscriberTopicName = subscriberTopicName;
113 this.subscriberProtocol = subscriberProtocol;
114 this.subscriberUserName = subscriberUserName;
115 this.subscriberUserPassword = subscriberUserPassword;
116 this.subscriberContentType = subscriberContentType;
117 this.subscriberConsumerId = subscriberConsumerId;
118 this.subscriberConsumerGroup = subscriberConsumerGroup;
119 this.subscriberTimeoutMS = subscriberTimeoutMS;
120 this.subscriberMessageLimit = subscriberMessageLimit;
121 this.subscriberPollingInterval = subscriberPollingInterval;
122 this.publisherHostName = publisherHostName;
123 this.publisherHostPort = publisherHostPort;
124 this.publisherTopicName = publisherTopicName;
125 this.publisherProtocol = publisherProtocol;
126 this.publisherUserName = publisherUserName;
127 this.publisherUserPassword = publisherUserPassword;
128 this.publisherContentType = publisherContentType;
129 this.publisherPollingInterval = publisherPollingInterval;
130 this.publisherMaxBatchSize = publisherMaxBatchSize;
131 this.publisherMaxRecoveryQueueSize = publisherMaxRecoveryQueueSize;
133 this.dMaaPMRFactory = DMaaPMRFactory.create();
138 public DMaaPMRSubscriber getDMaaPMRSubscriber() {
139 final DMaaPMRSubscriberConfig subscriberConfig =
140 new DMaaPMRSubscriberConfig.Builder(subscriberHostName, subscriberTopicName)
141 .setPortNumber(subscriberHostPort)
142 .setProtocol(subscriberProtocol)
143 .setUserName(subscriberUserName)
144 .setUserPassword(subscriberUserPassword)
145 .setContentType(subscriberContentType)
146 .setMessageLimit(subscriberMessageLimit)
147 .setTimeoutMS(subscriberTimeoutMS)
148 .setConsumerId(subscriberConsumerId)
149 .setConsumerGroup(subscriberConsumerGroup)
151 return dMaaPMRFactory.createSubscriber(subscriberConfig);
155 public DMaaPMRPublisher getDMaaPMRPublisher() {
156 final DMaaPMRPublisherConfig publisherConfig =
157 new DMaaPMRPublisherConfig.Builder(publisherHostName, publisherTopicName)
158 .setPortNumber(publisherHostPort)
159 .setProtocol(publisherProtocol)
160 .setUserName(publisherUserName)
161 .setUserPassword(publisherUserPassword)
162 .setContentType(publisherContentType)
163 .setMaxBatchSize(publisherMaxBatchSize)
164 .setMaxRecoveryQueueSize(publisherMaxRecoveryQueueSize)
166 return dMaaPMRFactory.createPublisher(publisherConfig);
170 public DMaaPMRSubscriber getDMaaPMRSubscriberWithTopicName(String subscriberTopicName) {
171 final DMaaPMRSubscriberConfig subscriberConfig =
172 new DMaaPMRSubscriberConfig.Builder(subscriberHostName, subscriberTopicName)
173 .setPortNumber(subscriberHostPort)
174 .setProtocol(subscriberProtocol)
175 .setUserName(subscriberUserName)
176 .setUserPassword(subscriberUserPassword)
177 .setContentType(subscriberContentType)
178 .setMessageLimit(subscriberMessageLimit)
179 .setTimeoutMS(subscriberTimeoutMS)
180 .setConsumerId(subscriberConsumerId)
181 .setConsumerGroup(subscriberConsumerGroup)
183 return dMaaPMRFactory.createSubscriber(subscriberConfig);
188 public DMaaPMRPublisher getDMaaPMRPublisherWithTopicName(String publisherTopicName) {
189 final DMaaPMRPublisherConfig publisherConfig =
190 new DMaaPMRPublisherConfig.Builder(publisherHostName, publisherTopicName)
191 .setPortNumber(publisherHostPort)
192 .setProtocol(publisherProtocol)
193 .setUserName(publisherUserName)
194 .setUserPassword(publisherUserPassword)
195 .setContentType(publisherContentType)
196 .setMaxBatchSize(publisherMaxBatchSize)
197 .setMaxRecoveryQueueSize(publisherMaxRecoveryQueueSize)
199 return dMaaPMRFactory.createPublisher(publisherConfig);
203 public Map<String, String> getDMaaPMRSubscriberConfig() {
204 Map<String, String> sourceConfigurationMap = new HashMap<>();
205 sourceConfigurationMap.put("referenceName", "source-referenceName");
206 sourceConfigurationMap.put("hostName", subscriberHostName);
207 sourceConfigurationMap.put("portNumber", subscriberHostPort.toString());
208 sourceConfigurationMap.put("topicName", subscriberTopicName);
209 sourceConfigurationMap.put("pollingInterval", subscriberPollingInterval.toString());
210 sourceConfigurationMap.put("protocol", subscriberProtocol);
211 sourceConfigurationMap.put("userName", subscriberUserName);
212 sourceConfigurationMap.put("userPassword", subscriberUserPassword);
213 sourceConfigurationMap.put("contentType", subscriberContentType);
214 sourceConfigurationMap.put("consumerId", subscriberConsumerId);
215 sourceConfigurationMap.put("consumerGroup", subscriberConsumerGroup);
216 sourceConfigurationMap.put("timeoutMS", subscriberTimeoutMS.toString());
217 sourceConfigurationMap.put("messageLimit", subscriberMessageLimit.toString());
218 return sourceConfigurationMap;
222 public Map<String, String> getDMaaPMRPublisherConfig() {