2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal.impl;
23 import com.att.nsa.mr.client.MRClientFactory;
24 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
26 import java.net.MalformedURLException;
27 import java.util.List;
29 import java.util.Properties;
31 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSinkFactory;
32 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
36 public class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
38 private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
40 private final Properties props;
42 public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
43 String dme2Login, String dme2Password, String consumerGroup, String consumerInstance, int fetchTimeout,
44 int fetchLimit, String environment, String aftEnvironment, String dme2Partner, String latitude,
45 String longitude, Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException {
49 super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, consumerInstance, fetchTimeout,
53 final String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
55 if (environment == null || environment.isEmpty()) {
56 throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX);
58 if (aftEnvironment == null || aftEnvironment.isEmpty()) {
59 throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX);
61 if (latitude == null || latitude.isEmpty()) {
62 throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX);
64 if (longitude == null || longitude.isEmpty()) {
65 throw parmException(topic, PolicyEndPointProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX);
68 if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
69 throw new IllegalArgumentException(
70 "Must provide at least " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
71 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
72 + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
73 + PolicyEndPointProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
76 final String serviceName = servers.get(0);
78 this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
80 this.consumer.setUsername(dme2Login);
81 this.consumer.setPassword(dme2Password);
83 props = new Properties();
85 props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
87 props.setProperty("username", dme2Login);
88 props.setProperty("password", dme2Password);
90 /* These are required, no defaults */
91 props.setProperty("topic", topic);
93 props.setProperty("Environment", environment);
94 props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
96 if (dme2Partner != null) {
97 props.setProperty("Partner", dme2Partner);
99 if (dme2RouteOffer != null) {
100 props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
103 props.setProperty("Latitude", latitude);
104 props.setProperty("Longitude", longitude);
106 /* These are optional, will default to these values if not set in additionalProps */
107 props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
108 props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
109 props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
110 props.setProperty("Version", "1.0");
111 props.setProperty("SubContextPath", "/");
112 props.setProperty("sessionstickinessrequired", "no");
114 /* These should not change */
115 props.setProperty("TransportType", "DME2");
116 props.setProperty("MethodType", "GET");
119 props.setProperty(PROTOCOL_PROP, "https");
122 props.setProperty(PROTOCOL_PROP, "http");
125 props.setProperty("contenttype", "application/json");
127 if (additionalProps != null) {
128 for (Map.Entry<String, String> entry : additionalProps.entrySet()) {
129 props.put(entry.getKey(), entry.getValue());
133 MRClientFactory.prop = props;
134 this.consumer.setProps(props);
136 logger.info("{}: CREATION", this);
139 private IllegalArgumentException parmException(String topic, String propnm) {
140 return new IllegalArgumentException("Missing " + PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "."
141 + topic + propnm + " property for DME2 in DMaaP");