Sonar major issues
[dmaap/messagerouter/dmaapclient.git] / src / main / java / com / att / nsa / mr / client / impl / MRSimplerBatchPublisher.java
index db982ec..ef0d13a 100644 (file)
@@ -67,6 +67,14 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                        fUrls = baseUrls;
                        return this;
                }
+               
+               public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype )            
+               {               
+                       fUrls = baseUrls;               
+                       fServiceName = serviceName;             
+                       fTransportype = transportype;           
+                       return this;            
+               }
 
                public Builder onTopic(String topic) {
                        fTopic = topic;
@@ -105,20 +113,22 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                                        return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
                                                        fAllowSelfSignedCerts, threadOccuranceTime);
                                } catch (MalformedURLException e) {
-                                       throw new RuntimeException(e);
+                                       throw new IllegalArgumentException(e);
                                }
                        } else {
                                try {
                                        return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
                                                        fAllowSelfSignedCerts, fMaxBatchSize);
                                } catch (MalformedURLException e) {
-                                       throw new RuntimeException(e);
+                                       throw new IllegalArgumentException(e);
                                }
                        }
 
                }
 
                private Collection<String> fUrls;
+               private Collection<String> fServiceName;                
+               private String fTransportype;   
                private String fTopic;
                private int fMaxBatchSize = 100;
                private long fMaxBatchAgeMs = 1000;
@@ -167,12 +177,13 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
        public void close() {
                try {
                        final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-                       if (remains.size() > 0) {
+                       if (remains.isEmpty()) {
                                getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
                                                + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
                        }
                } catch (InterruptedException e) {
                        getLog().warn("Possible message loss. " + e.getMessage(), e);
+                       Thread.currentThread().interrupt();
                } catch (IOException e) {
                        getLog().warn("Possible message loss. " + e.getMessage(), e);
                }
@@ -224,7 +235,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
 
        private synchronized boolean shouldSendNow() {
                boolean shouldSend = false;
-               if (fPending.size() > 0) {
+               if (fPending.isEmpty()) {
                        final long nowMs = Clock.now();
 
                        shouldSend = (fPending.size() >= fMaxBatchSize);
@@ -277,17 +288,15 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
 
                final long nowMs = Clock.now();
 
-               host = this.fHostSelector.selectBaseHost();
+               if (this.fHostSelector != null) {
+                       host = this.fHostSelector.selectBaseHost();
+               }
 
                final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
                                props.getProperty("partition"));
 
                try {
-                       /*
-                        * final String contentType = fCompress ?
-                        * MRFormat.CAMBRIA_ZIP.toString () : MRFormat.CAMBRIA.toString () ;
-                        */
-
+                       
                        final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
                        OutputStream os = baseStream;
                        final String contentType = props.getProperty("contenttype");
@@ -377,6 +386,23 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                                fPending.clear();
                                return true;
                        }
+                       
+                       if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
+
+                               // Here we are checking for error response. If HTTP status
+                               // code is not within the http success response code
+                               // then we consider this as error and return false
+                               if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+                                       return false;
+                               }
+                               final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+                               getLog().info(logLine);
+                               fPending.clear();
+                               return true;
+                       }
                } catch (IllegalArgumentException x) {
                        getLog().warn(x.getMessage(), x);
                } catch (IOException x) {
@@ -530,6 +556,28 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                                fPending.clear();
                                return pubResponse;
                        }
+                       
+                       if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+                               getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+                                               + (nowMs - fPending.peek().timestamp) + " ms");
+                               final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
+
+                               // Here we are checking for error response. If HTTP status
+                               // code is not within the http success response code
+                               // then we consider this as error and return false
+                               pubResponse = createMRPublisherResponse(result, pubResponse);
+
+                               if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+                                               || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
+                                       return pubResponse;
+                               }
+
+                               final String logLine = String.valueOf((Clock.now() - startMs));
+                               getLog().info(logLine);
+                               fPending.clear();
+                               return pubResponse;
+                       }
                } catch (IllegalArgumentException x) {
                        getLog().warn(x.getMessage(), x);
                        pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
@@ -572,7 +620,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                return pubResponse;
        }
 
-       private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
+       public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
 
                if (reply.isEmpty()) {
 
@@ -682,10 +730,6 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
        private void DME2Configue() throws Exception {
                try {
 
-                       /*
-                        * FileReader reader = new FileReader(new File (producerFilePath));
-                        * Properties props = new Properties(); props.load(reader);
-                        */
                        latitude = props.getProperty("Latitude");
                        longitude = props.getProperty("Longitude");
                        version = props.getProperty("Version");
@@ -694,12 +738,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                        partner = props.getProperty("Partner");
                        routeOffer = props.getProperty("routeOffer");
                        subContextPath = props.getProperty("SubContextPath") + fTopic;
-                       /*
-                        * if(props.getProperty("partition")!=null &&
-                        * !props.getProperty("partition").equalsIgnoreCase("")){
-                        * subContextPath=subContextPath+"?partitionKey="+props.getProperty(
-                        * "partition"); }
-                        */
+                       
                        protocol = props.getProperty("Protocol");
                        methodType = props.getProperty("MethodType");
                        dmeuser = props.getProperty("username");
@@ -742,7 +781,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
 
                        // SSL changes
                        // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
-                       // "SSLv3,TLSv1,TLSv1.1");
+                       
                        System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
                        System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
                        System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
@@ -756,7 +795,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                        sender.setSubContext(subContextPath);
                        sender.setCredentials(dmeuser, dmepassword);
                        sender.setHeaders(DMETimeOuts);
-                       if (handlers.equalsIgnoreCase("yes")) {
+                       if (handlers != null &&handlers.equalsIgnoreCase("yes")) {
                                sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
                                                props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
                                sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
@@ -775,7 +814,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP
                } catch (Exception x) {
 
                        getLog().warn(x.getMessage(), x);
-                       throw new Exception(x.getMessage());
+                       throw new IllegalArgumentException(x.getMessage());
                }
        }