fixes for null pointer exceptions 93/54893/1
authorsunil unnava <su622b@att.com>
Thu, 14 Jun 2018 22:05:40 +0000 (18:05 -0400)
committersunil unnava <su622b@att.com>
Thu, 14 Jun 2018 22:07:36 +0000 (18:07 -0400)
Issue-ID: DMAAP-519
Change-Id: Ia32d0bd58c5f438b50e361221a96b988b00a1120
Signed-off-by: sunil unnava <su622b@att.com>
pom.xml
src/main/java/com/att/nsa/mr/client/MRClientFactory.java
src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index c903d66..d7930ff 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
        <groupId>org.onap.dmaap.messagerouter.dmaapclient</groupId>
        <artifactId>dmaapClient</artifactId>
        <packaging>jar</packaging>
-       <version>1.1.6-SNAPSHOT</version>
+       <version>1.1.7-SNAPSHOT</version>
        <name>dmaap-messagerouter-dmaapclient</name>
        <description>Client library for MR event routing API</description>
        <url>https://github.com/att/dmaap-framework</url>
@@ -29,7 +29,7 @@
                <!-- for the client library, we want to allow 1.6 or later -->
                <maven.compiler.target>1.7</maven.compiler.target>
                <maven.compiler.source>1.7</maven.compiler.source>
-               <jersey.version>2.22.1</jersey.version>
+               <jersey.version>2.26</jersey.version>
                <version.jackson.core>2.6.7.1</version.jackson.core>
                <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 
index 32b44e4..f237776 100644 (file)
@@ -39,6 +39,7 @@ import com.att.nsa.mr.client.impl.MRConsumerImpl;
 import com.att.nsa.mr.client.impl.MRMetaClient;
 import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
+import com.att.nsa.mr.tools.ValidatorUtil;
 
 /**
  * A factory for MR clients.<br/>
@@ -483,7 +484,7 @@ public class MRClientFactory {
                MRSimplerBatchPublisher pub;
                if (withResponse) {
                        pub = new MRSimplerBatchPublisher.Builder()
-                                       .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host")))
+                                       .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty("TransportType"))
                                        .onTopic(props.getProperty("topic"))
                                        .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
                                                        Integer.parseInt(props.getProperty("maxAgeMs").toString()))
@@ -492,7 +493,7 @@ public class MRClientFactory {
                                        .withResponse(withResponse).build();
                } else {
                        pub = new MRSimplerBatchPublisher.Builder()
-                                       .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host")))
+                                       .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty("TransportType"))
                                        .onTopic(props.getProperty("topic"))
                                        .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
                                                        Integer.parseInt(props.getProperty("maxAgeMs").toString()))
@@ -512,13 +513,16 @@ public class MRClientFactory {
                }
                pub.setProtocolFlag(props.getProperty("TransportType"));
                pub.setProps(props);
-               routeFilePath = props.getProperty("DME2preferredRouterFilePath");
-               routeReader = new FileReader(new File(routeFilePath));
                prop = new Properties();
-               File fo = new File(routeFilePath);
-               if (!fo.exists()) {
-                       routeWriter = new FileWriter(new File(routeFilePath));
+               if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) {
+                       routeFilePath = props.getProperty("DME2preferredRouterFilePath");
+                       routeReader = new FileReader(new File(routeFilePath));
+                       File fo = new File(routeFilePath);
+                       if (!fo.exists()) {
+                               routeWriter = new FileWriter(new File(routeFilePath));
+                       }
                }
+               // pub.setContentType(contentType);
                return pub;
        }
 
@@ -623,6 +627,7 @@ public class MRClientFactory {
 
        public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
                int timeout;
+               ValidatorUtil.validateSubscriber(props);
                if (props.getProperty("timeout") != null)
                        timeout = Integer.parseInt(props.getProperty("timeout"));
                else
index b54fedb..faa81ce 100644 (file)
@@ -127,6 +127,8 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
                                try {
                                        // getLog().info ( "Receiving msgs from: " +
                                        // url+subContextPath );
+                                       long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs)  ? dme2ReplyHandlerTimeoutMs :  (timeoutMs+ defaultDme2ReplyHandlerTimeoutMs); 
+                                       //String reply = sender.sendAndWait(timeout);   
                                        String reply = sender.sendAndWait(timeoutMs + 10000L);
                                        final JSONObject o = getResponseDataInJson(reply);
                                        // msgs.add(reply);
@@ -362,6 +364,11 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
        private HashMap<String, String> DMETimeOuts;
        private String handlers;
        public static final String routerFilePath = null;
+       private long dme2ReplyHandlerTimeoutMs;
+       private long longPollingMs;
+       private static final long defaultDme2PerEndPointTimeoutMs = 10000L;
+       private static final long defaultDme2ReplyHandlerTimeoutMs = 10000L;
+       
 
        public static String getRouterFilePath() {
                return routerFilePath;
@@ -388,6 +395,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
        }
 
        private void DMEConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
+               this.longPollingMs = timeoutMs;
                latitude = props.getProperty("Latitude");
                longitude = props.getProperty("Longitude");
                version = props.getProperty("Version");
@@ -459,7 +467,38 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
                System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
                // SSL changes
 
-               sender = new DME2Client(new URI(url), timeoutMs + 10000L);
+               long dme2PerEndPointTimeoutMs;
+               try {
+                       dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS"));
+                       //backward compatibility
+                       if ( dme2PerEndPointTimeoutMs <= 0) {
+                               dme2PerEndPointTimeoutMs =  timeoutMs + defaultDme2PerEndPointTimeoutMs;
+                       }
+               } catch (NumberFormatException nfe) {
+                       //backward compatibility
+                       dme2PerEndPointTimeoutMs =  timeoutMs + defaultDme2PerEndPointTimeoutMs;
+                       getLog().debug("DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + defaultDme2PerEndPointTimeoutMs);
+               }
+
+               try {
+                       dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS"));
+               } catch (NumberFormatException nfe) {
+                       try {
+                               long dme2EpReadTimeoutMs =  Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
+                               long dme2EpConnTimeoutMs =  Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+                               dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs;
+                               getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT " + dme2ReplyHandlerTimeoutMs);
+                       }catch (NumberFormatException e) {
+                               //backward compatibility
+                               dme2ReplyHandlerTimeoutMs =  timeoutMs + defaultDme2ReplyHandlerTimeoutMs;
+                               getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs);
+                       }
+               }
+               //backward compatibility
+               if ( dme2ReplyHandlerTimeoutMs <= 0) {
+                       dme2ReplyHandlerTimeoutMs =  timeoutMs + defaultDme2ReplyHandlerTimeoutMs;
+               }
+               sender = new DME2Client(new URI(url),dme2PerEndPointTimeoutMs);
                sender.setAllowAllHttpReturnCodes(true);
                sender.setMethod(methodType);
                sender.setSubContext(subContextPath);
@@ -469,7 +508,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
                sender.setHeaders(DMETimeOuts);
                sender.setPayload("");
 
-               if (handlers.equalsIgnoreCase("yes")) {
+               if (handlers!=null&&handlers.equalsIgnoreCase("yes")) {
                        sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
                                        props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
                        sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
@@ -594,7 +633,8 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
                        if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
                                DMEConfigure(timeoutMs, limit);
 
-                               String reply = sender.sendAndWait(timeoutMs + 10000L);
+                               long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs)  ? dme2ReplyHandlerTimeoutMs :  (timeoutMs+ defaultDme2ReplyHandlerTimeoutMs); 
+                               String reply = sender.sendAndWait(timeout);
 
                                final JSONObject o = getResponseDataInJsonWithResponseReturned(reply);
 
index 2f7680b..5bb5087 100644 (file)
@@ -67,6 +67,14 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                        fUrls = baseUrls;
                        return this;
                }
+               
+               public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype )            
+               {               
+                       fUrls = baseUrls;               
+                       fServiceName = serviceName;             
+                       fTransportype = transportype;           
+                       return this;            
+               }
 
                public Builder onTopic(String topic) {
                        fTopic = topic;
@@ -119,6 +127,8 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                }
 
                private Collection<String> fUrls;
+               private Collection<String> fServiceName;                
+               private String fTransportype;   
                private String fTopic;
                private int fMaxBatchSize = 100;
                private long fMaxBatchAgeMs = 1000;
@@ -278,7 +288,9 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
 
                final long nowMs = Clock.now();
 
-               host = this.fHostSelector.selectBaseHost();
+               if (this.fHostSelector != null) {
+                       host = this.fHostSelector.selectBaseHost();
+               }
 
                final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
                                props.getProperty("partition"));
@@ -796,7 +808,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                        sender.setSubContext(subContextPath);
                        sender.setCredentials(dmeuser, dmepassword);
                        sender.setHeaders(DMETimeOuts);
-                       if (handlers.equalsIgnoreCase("yes")) {
+                       if (handlers != null &&handlers.equalsIgnoreCase("yes")) {
                                sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
                                                props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
                                sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
diff --git a/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java b/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java
new file mode 100644 (file)
index 0000000..900c932
--- /dev/null
@@ -0,0 +1,176 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+package com.att.nsa.mr.tools;
+
+import java.util.Properties;
+
+import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
+
+public class ValidatorUtil {
+
+       public  static void validatePublisher(Properties props) {
+               String transportType = props.getProperty("TransportType");
+               if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(transportType)) {
+                       validateForDME2(props);
+               } else {
+                        validateForNonDME2(props);
+               }
+               String maxBatchSize  = props.getProperty("maxBatchSize");
+               if (maxBatchSize == null || maxBatchSize.isEmpty()) {
+                       throw new IllegalArgumentException ( "maxBatchSize is needed" );
+               }
+               String maxAgeMs  = props.getProperty("maxAgeMs");
+               if (maxAgeMs == null || maxAgeMs.isEmpty()) {
+                       throw new IllegalArgumentException ( "maxAgeMs is needed" );
+               }
+               String messageSentThreadOccurance  = props.getProperty("MessageSentThreadOccurance");
+               if (messageSentThreadOccurance == null || messageSentThreadOccurance.isEmpty()) {
+                       throw new IllegalArgumentException ( "MessageSentThreadOccurance is needed" );
+               }
+               
+       }
+
+       public  static void validateSubscriber(Properties props) {
+               String transportType = props.getProperty("TransportType");
+               if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(transportType)) {
+                       validateForDME2(props);
+               } else {
+                        validateForNonDME2(props);
+               }
+               String group  = props.getProperty("group");
+               if (group == null || group.isEmpty()) {
+                       throw new IllegalArgumentException ( "group is needed" );
+               }
+               String id  = props.getProperty("id");
+               if (id == null || id.isEmpty()) {
+                       throw new IllegalArgumentException ( "Consumer (Id)  is needed" );
+               }
+               String sessionstickinessrequired  = props.getProperty("sessionstickinessrequired");
+               if (sessionstickinessrequired == null || sessionstickinessrequired.isEmpty()) {
+                       throw new IllegalArgumentException ( "sessionstickinessrequired  is needed" );
+               }
+       }
+       
+       private  static void validateForDME2(Properties props) {
+               String serviceName  = props.getProperty("ServiceName");
+               if (serviceName == null || serviceName.isEmpty()) {
+                       throw new IllegalArgumentException ( "Servicename is needed" );
+               }
+               String topic  = props.getProperty("topic");
+               if (topic == null || topic.isEmpty()) {
+                       throw new IllegalArgumentException ( "topic is needed" );
+               }
+               String username  = props.getProperty("username");
+               if (username == null || username.isEmpty()) {
+                       throw new IllegalArgumentException ( "username is needed" );
+               }
+               String password  = props.getProperty("password");
+               if (password == null || password.isEmpty()) {
+                       throw new IllegalArgumentException ( "password is needed" );
+               }
+               String dME2preferredRouterFilePath  = props.getProperty("DME2preferredRouterFilePath");
+               if (dME2preferredRouterFilePath == null || dME2preferredRouterFilePath.isEmpty()) {
+                       throw new IllegalArgumentException ( "DME2preferredRouterFilePath is needed" );
+               }
+               String partner  = props.getProperty("Partner");
+               String routeOffer  = props.getProperty("routeOffer");
+               if ((partner == null || partner.isEmpty()) && (routeOffer == null || routeOffer.isEmpty())) {
+                       throw new IllegalArgumentException ( "Partner or  routeOffer is needed" );
+               }
+               String protocol  = props.getProperty("Protocol");
+               if (protocol == null || protocol.isEmpty()) {
+                       throw new IllegalArgumentException ( "Protocol is needed" );
+               }
+               String methodType  = props.getProperty("MethodType");
+               if (methodType == null || methodType.isEmpty()) {
+                       throw new IllegalArgumentException ( "MethodType is needed" );
+               }
+               String contenttype  = props.getProperty("contenttype");
+               if (contenttype == null || contenttype.isEmpty()) {
+                       throw new IllegalArgumentException ( "contenttype is needed" );
+               }
+               String latitude  = props.getProperty("Latitude");
+               if (latitude == null || latitude.isEmpty()) {
+                       throw new IllegalArgumentException ( "Latitude is needed" );
+               }
+               String longitude  = props.getProperty("Longitude");
+               if (longitude == null || longitude.isEmpty()) {
+                       throw new IllegalArgumentException ( "Longitude is needed" );
+               }
+               String aftEnv  = props.getProperty("AFT_ENVIRONMENT");
+               if (aftEnv == null || aftEnv.isEmpty()) {
+                       throw new IllegalArgumentException ( "AFT_ENVIRONMENT is needed" );
+               }
+               String version  = props.getProperty("Version");
+               if (version == null || version.isEmpty()) {
+                       throw new IllegalArgumentException ( "Version is needed" );
+               }
+               String environment  = props.getProperty("Environment");
+               if (environment == null || environment.isEmpty()) {
+                       throw new IllegalArgumentException ( "Environment is needed" );
+               }
+               String subContextPath  = props.getProperty("SubContextPath");
+               if (subContextPath == null || subContextPath.isEmpty()) {
+                       throw new IllegalArgumentException ( "SubContextPath is needed" );
+               }
+       }
+       
+       private  static void validateForNonDME2(Properties props) {
+               String transportType = props.getProperty("TransportType");
+               String host  = props.getProperty("host");
+               if (host == null || host.isEmpty()) {
+                       throw new IllegalArgumentException ( "Servicename is needed" );
+               }
+               String topic  = props.getProperty("topic");
+               if (topic == null || topic.isEmpty()) {
+                       throw new IllegalArgumentException ( "topic is needed" );
+               }
+               String methodType  = props.getProperty("MethodType");
+               if (methodType == null || methodType.isEmpty()) {
+                       throw new IllegalArgumentException ( "MethodType is needed" );
+               }
+               String contenttype  = props.getProperty("contenttype");
+               if (contenttype == null || contenttype.isEmpty()) {
+                       throw new IllegalArgumentException ( "contenttype is needed" );
+               }
+               String username  = props.getProperty("username");
+               if (username == null || username.isEmpty()) {
+                       throw new IllegalArgumentException ( "username is needed" );
+               }
+               String password  = props.getProperty("password");
+               if (password == null || password.isEmpty()) {
+                       throw new IllegalArgumentException ( "password is needed" );
+               }
+               if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(transportType)) {
+                       String authKey  = props.getProperty("authKey");
+                       if (authKey == null || authKey.isEmpty()) {
+                               throw new IllegalArgumentException ( "authKey is needed" );
+                       }
+                       String authDate  = props.getProperty("authDate");
+                       if (authDate == null || authDate.isEmpty()) {
+                               throw new IllegalArgumentException ( "password is needed" );
+                       }
+                       
+               }
+       }       
+       
+}