Sonar major issues
[dmaap/messagerouter/dmaapclient.git] / src / main / java / com / att / nsa / mr / client / impl / MRSimplerBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.net.URI;
29 import java.net.URISyntaxException;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Properties;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.zip.GZIPOutputStream;
40
41 import javax.ws.rs.core.MultivaluedMap;
42
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import org.apache.http.HttpException;
47 import org.apache.http.HttpStatus;
48 import org.json.JSONArray;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
51
52 import com.att.aft.dme2.api.DME2Client;
53 import com.att.aft.dme2.api.DME2Exception;
54 import com.att.nsa.mr.client.HostSelector;
55 import com.att.nsa.mr.client.MRBatchingPublisher;
56 import com.att.nsa.mr.client.response.MRPublisherResponse;
57 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
58
59 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
60         private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
61
62         public static class Builder {
63                 public Builder() {
64                 }
65
66                 public Builder againstUrls(Collection<String> baseUrls) {
67                         fUrls = baseUrls;
68                         return this;
69                 }
70                 
71                 public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype )            
72                 {               
73                         fUrls = baseUrls;               
74                         fServiceName = serviceName;             
75                         fTransportype = transportype;           
76                         return this;            
77                 }
78
79                 public Builder onTopic(String topic) {
80                         fTopic = topic;
81                         return this;
82                 }
83
84                 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
85                         fMaxBatchSize = maxBatchSize;
86                         fMaxBatchAgeMs = maxBatchAgeMs;
87                         return this;
88                 }
89
90                 public Builder compress(boolean compress) {
91                         fCompress = compress;
92                         return this;
93                 }
94
95                 public Builder httpThreadTime(int threadOccuranceTime) {
96                         this.threadOccuranceTime = threadOccuranceTime;
97                         return this;
98                 }
99
100                 public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
101                         fAllowSelfSignedCerts = allowSelfSignedCerts;
102                         return this;
103                 }
104
105                 public Builder withResponse(boolean withResponse) {
106                         fWithResponse = withResponse;
107                         return this;
108                 }
109
110                 public MRSimplerBatchPublisher build() {
111                         if (!fWithResponse) {
112                                 try {
113                                         return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
114                                                         fAllowSelfSignedCerts, threadOccuranceTime);
115                                 } catch (MalformedURLException e) {
116                                         throw new IllegalArgumentException(e);
117                                 }
118                         } else {
119                                 try {
120                                         return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
121                                                         fAllowSelfSignedCerts, fMaxBatchSize);
122                                 } catch (MalformedURLException e) {
123                                         throw new IllegalArgumentException(e);
124                                 }
125                         }
126
127                 }
128
129                 private Collection<String> fUrls;
130                 private Collection<String> fServiceName;                
131                 private String fTransportype;   
132                 private String fTopic;
133                 private int fMaxBatchSize = 100;
134                 private long fMaxBatchAgeMs = 1000;
135                 private boolean fCompress = false;
136                 private int threadOccuranceTime = 50;
137                 private boolean fAllowSelfSignedCerts = false;
138                 private boolean fWithResponse = false;
139
140         };
141
142         @Override
143         public int send(String partition, String msg) {
144                 return send(new message(partition, msg));
145         }
146
147         @Override
148         public int send(String msg) {
149                 return send(new message(null, msg));
150         }
151
152         @Override
153         public int send(message msg) {
154                 final LinkedList<message> list = new LinkedList<message>();
155                 list.add(msg);
156                 return send(list);
157         }
158
159         @Override
160         public synchronized int send(Collection<message> msgs) {
161                 if (fClosed) {
162                         throw new IllegalStateException("The publisher was closed.");
163                 }
164
165                 for (message userMsg : msgs) {
166                         fPending.add(new TimestampedMessage(userMsg));
167                 }
168                 return getPendingMessageCount();
169         }
170
171         @Override
172         public synchronized int getPendingMessageCount() {
173                 return fPending.size();
174         }
175
176         @Override
177         public void close() {
178                 try {
179                         final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
180                         if (remains.isEmpty()) {
181                                 getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
182                                                 + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
183                         }
184                 } catch (InterruptedException e) {
185                         getLog().warn("Possible message loss. " + e.getMessage(), e);
186                         Thread.currentThread().interrupt();
187                 } catch (IOException e) {
188                         getLog().warn("Possible message loss. " + e.getMessage(), e);
189                 }
190         }
191
192         @Override
193         public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
194                 synchronized (this) {
195                         fClosed = true;
196
197                         // stop the background sender
198                         fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
199                         fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
200                         fExec.shutdown();
201                 }
202
203                 final long now = Clock.now();
204                 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
205                 final long timeoutAtMs = now + waitInMs;
206
207                 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
208                         send(true);
209                         Thread.sleep(250);
210                 }
211
212                 synchronized (this) {
213                         final LinkedList<message> result = new LinkedList<message>();
214                         fPending.drainTo(result);
215                         return result;
216                 }
217         }
218
219         /**
220          * Possibly send a batch to the MR server. This is called by the background
221          * thread and the close() method
222          * 
223          * @param force
224          */
225         private synchronized void send(boolean force) {
226                 if (force || shouldSendNow()) {
227                         if (!sendBatch()) {
228                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
229
230                                 // note the time for back-off
231                                 fDontSendUntilMs = sfWaitAfterError + Clock.now();
232                         }
233                 }
234         }
235
236         private synchronized boolean shouldSendNow() {
237                 boolean shouldSend = false;
238                 if (fPending.size() > 0) {
239                         final long nowMs = Clock.now();
240
241                         shouldSend = (fPending.size() >= fMaxBatchSize);
242                         if (!shouldSend) {
243                                 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
244                                 shouldSend = sendAtMs <= nowMs;
245                         }
246
247                         // however, wait after an error
248                         shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
249                 }
250                 return shouldSend;
251         }
252
253         /**
254          * Method to parse published JSON Objects and Arrays
255          * 
256          * @return JSONArray
257          */
258         private JSONArray parseJSON() {
259                 JSONArray jsonArray = new JSONArray();
260                 for (TimestampedMessage m : fPending) {
261                         JSONTokener jsonTokener = new JSONTokener(m.fMsg);
262                         JSONObject jsonObject = null;
263                         JSONArray tempjsonArray = null;
264                         final char firstChar = jsonTokener.next();
265                         jsonTokener.back();
266                         if ('[' == firstChar) {
267                                 tempjsonArray = new JSONArray(jsonTokener);
268                                 if (null != tempjsonArray) {
269                                         for (int i = 0; i < tempjsonArray.length(); i++) {
270                                                 jsonArray.put(tempjsonArray.getJSONObject(i));
271                                         }
272                                 }
273                         } else {
274                                 jsonObject = new JSONObject(jsonTokener);
275                                 jsonArray.put(jsonObject);
276                         }
277
278                 }
279                 return jsonArray;
280         }
281
282         private synchronized boolean sendBatch() {
283                 // it's possible for this call to be made with an empty list. in this
284                 // case, just return.
285                 if (fPending.size() < 1) {
286                         return true;
287                 }
288
289                 final long nowMs = Clock.now();
290
291                 if (this.fHostSelector != null) {
292                         host = this.fHostSelector.selectBaseHost();
293                 }
294
295                 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
296                                 props.getProperty("partition"));
297
298                 try {
299                         /*
300                          * final String contentType = fCompress ?
301                          * MRFormat.CAMBRIA_ZIP.toString () : MRFormat.CAMBRIA.toString () ;
302                          */
303
304                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
305                         OutputStream os = baseStream;
306                         final String contentType = props.getProperty("contenttype");
307                         if (contentType.equalsIgnoreCase("application/json")) {
308                                 JSONArray jsonArray = parseJSON();
309                                 os.write(jsonArray.toString().getBytes());
310                                 os.close();
311
312                         } else if (contentType.equalsIgnoreCase("text/plain")) {
313                                 for (TimestampedMessage m : fPending) {
314                                         os.write(m.fMsg.getBytes());
315                                         os.write('\n');
316                                 }
317                                 os.close();
318                         } else if (contentType.equalsIgnoreCase("application/cambria")
319                                         || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
320                                 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
321                                         os = new GZIPOutputStream(baseStream);
322                                 }
323                                 for (TimestampedMessage m : fPending) {
324
325                                         os.write(("" + m.fPartition.length()).getBytes());
326                                         os.write('.');
327                                         os.write(("" + m.fMsg.length()).getBytes());
328                                         os.write('.');
329                                         os.write(m.fPartition.getBytes());
330                                         os.write(m.fMsg.getBytes());
331                                         os.write('\n');
332                                 }
333                                 os.close();
334                         } else {
335                                 for (TimestampedMessage m : fPending) {
336                                         os.write(m.fMsg.getBytes());
337
338                                 }
339                                 os.close();
340                         }
341
342                         final long startMs = Clock.now();
343                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
344
345                                 DME2Configue();
346
347                                 Thread.sleep(5);
348                                 getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
349                                                 + (nowMs - fPending.peek().timestamp) + " ms");
350                                 sender.setPayload(os.toString());
351                                 String dmeResponse = sender.sendAndWait(5000L);
352
353                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString();
354                                 getLog().info(logLine);
355                                 fPending.clear();
356                                 return true;
357                         }
358
359                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
360                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
361                                                 + (nowMs - fPending.peek().timestamp) + " ms");
362                                 final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate,
363                                                 username, password, protocolFlag);
364                                 // Here we are checking for error response. If HTTP status
365                                 // code is not within the http success response code
366                                 // then we consider this as error and return false
367                                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
368                                         return false;
369                                 }
370                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
371                                 getLog().info(logLine);
372                                 fPending.clear();
373                                 return true;
374                         }
375
376                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
377                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
378                                                 + (nowMs - fPending.peek().timestamp) + " ms");
379                                 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
380                                                 protocolFlag);
381
382                                 // Here we are checking for error response. If HTTP status
383                                 // code is not within the http success response code
384                                 // then we consider this as error and return false
385                                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
386                                         return false;
387                                 }
388                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
389                                 getLog().info(logLine);
390                                 fPending.clear();
391                                 return true;
392                         }
393                         
394                         if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
395                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
396                                                 + (nowMs - fPending.peek().timestamp) + " ms");
397                                 final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
398
399                                 // Here we are checking for error response. If HTTP status
400                                 // code is not within the http success response code
401                                 // then we consider this as error and return false
402                                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
403                                         return false;
404                                 }
405                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
406                                 getLog().info(logLine);
407                                 fPending.clear();
408                                 return true;
409                         }
410                 } catch (IllegalArgumentException x) {
411                         getLog().warn(x.getMessage(), x);
412                 } catch (IOException x) {
413                         getLog().warn(x.getMessage(), x);
414                 } catch (HttpException x) {
415                         getLog().warn(x.getMessage(), x);
416                 } catch (Exception x) {
417                         getLog().warn(x.getMessage(), x);
418                 }
419                 return false;
420         }
421
422         public synchronized MRPublisherResponse sendBatchWithResponse() {
423                 // it's possible for this call to be made with an empty list. in this
424                 // case, just return.
425                 if (fPending.size() < 1) {
426                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
427                         pubResponse.setResponseMessage("No Messages to send");
428                         return pubResponse;
429                 }
430
431                 final long nowMs = Clock.now();
432
433                 host = this.fHostSelector.selectBaseHost();
434
435                 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
436                                 props.getProperty("partition"));
437                 OutputStream os = null;
438                 try {
439
440                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
441                         os = baseStream;
442                         final String contentType = props.getProperty("contenttype");
443                         if (contentType.equalsIgnoreCase("application/json")) {
444                                 JSONArray jsonArray = parseJSON();
445                                 os.write(jsonArray.toString().getBytes());
446                         } else if (contentType.equalsIgnoreCase("text/plain")) {
447                                 for (TimestampedMessage m : fPending) {
448                                         os.write(m.fMsg.getBytes());
449                                         os.write('\n');
450                                 }
451                         } else if (contentType.equalsIgnoreCase("application/cambria")
452                                         || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
453                                 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
454                                         os = new GZIPOutputStream(baseStream);
455                                 }
456                                 for (TimestampedMessage m : fPending) {
457
458                                         os.write(("" + m.fPartition.length()).getBytes());
459                                         os.write('.');
460                                         os.write(("" + m.fMsg.length()).getBytes());
461                                         os.write('.');
462                                         os.write(m.fPartition.getBytes());
463                                         os.write(m.fMsg.getBytes());
464                                         os.write('\n');
465                                 }
466                                 os.close();
467                         } else {
468                                 for (TimestampedMessage m : fPending) {
469                                         os.write(m.fMsg.getBytes());
470
471                                 }
472                         }
473
474                         final long startMs = Clock.now();
475                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
476
477                                 try {
478                                         DME2Configue();
479
480                                         Thread.sleep(5);
481                                         getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
482                                                         + (nowMs - fPending.peek().timestamp) + " ms");
483                                         sender.setPayload(os.toString());
484
485                                         String dmeResponse = sender.sendAndWait(5000L);
486
487                                         pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
488
489                                         if (Integer.valueOf(pubResponse.getResponseCode()) < 200
490                                                         || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
491
492                                                 return pubResponse;
493                                         }
494                                         final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
495                                         getLog().info(logLine);
496                                         fPending.clear();
497
498                                 } catch (DME2Exception x) {
499                                         getLog().warn(x.getMessage(), x);
500                                         pubResponse.setResponseCode(x.getErrorCode());
501                                         pubResponse.setResponseMessage(x.getErrorMessage());
502                                 } catch (URISyntaxException x) {
503
504                                         getLog().warn(x.getMessage(), x);
505                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
506                                         pubResponse.setResponseMessage(x.getMessage());
507                                 } catch (Exception x) {
508
509                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
510                                         pubResponse.setResponseMessage(x.getMessage());
511                                         logger.error("exception: ", x);
512
513                                 }
514
515                                 return pubResponse;
516                         }
517
518                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
519                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
520                                                 + (nowMs - fPending.peek().timestamp) + " ms");
521                                 final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
522                                                 authDate, username, password, protocolFlag);
523                                 // Here we are checking for error response. If HTTP status
524                                 // code is not within the http success response code
525                                 // then we consider this as error and return false
526
527                                 pubResponse = createMRPublisherResponse(result, pubResponse);
528
529                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
530                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
531
532                                         return pubResponse;
533                                 }
534
535                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
536                                 getLog().info(logLine);
537                                 fPending.clear();
538                                 return pubResponse;
539                         }
540
541                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
542                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
543                                                 + (nowMs - fPending.peek().timestamp) + " ms");
544                                 final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
545                                                 password, protocolFlag);
546
547                                 // Here we are checking for error response. If HTTP status
548                                 // code is not within the http success response code
549                                 // then we consider this as error and return false
550                                 pubResponse = createMRPublisherResponse(result, pubResponse);
551
552                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
553                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
554
555                                         return pubResponse;
556                                 }
557
558                                 final String logLine = String.valueOf((Clock.now() - startMs));
559                                 getLog().info(logLine);
560                                 fPending.clear();
561                                 return pubResponse;
562                         }
563                         
564                         if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
565                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
566                                                 + (nowMs - fPending.peek().timestamp) + " ms");
567                                 final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
568
569                                 // Here we are checking for error response. If HTTP status
570                                 // code is not within the http success response code
571                                 // then we consider this as error and return false
572                                 pubResponse = createMRPublisherResponse(result, pubResponse);
573
574                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
575                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
576
577                                         return pubResponse;
578                                 }
579
580                                 final String logLine = String.valueOf((Clock.now() - startMs));
581                                 getLog().info(logLine);
582                                 fPending.clear();
583                                 return pubResponse;
584                         }
585                 } catch (IllegalArgumentException x) {
586                         getLog().warn(x.getMessage(), x);
587                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
588                         pubResponse.setResponseMessage(x.getMessage());
589
590                 } catch (IOException x) {
591                         getLog().warn(x.getMessage(), x);
592                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
593                         pubResponse.setResponseMessage(x.getMessage());
594
595                 } catch (HttpException x) {
596                         getLog().warn(x.getMessage(), x);
597                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
598                         pubResponse.setResponseMessage(x.getMessage());
599
600                 } catch (Exception x) {
601                         getLog().warn(x.getMessage(), x);
602
603                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
604                         pubResponse.setResponseMessage(x.getMessage());
605
606                 }
607
608                 finally {
609                         if (fPending.size() > 0) {
610                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
611                                 pubResponse.setPendingMsgs(fPending.size());
612                         }
613                         if (os != null) {
614                                 try {
615                                         os.close();
616                                 } catch (Exception x) {
617                                         getLog().warn(x.getMessage(), x);
618                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
619                                         pubResponse.setResponseMessage("Error in closing Output Stream");
620                                 }
621                         }
622                 }
623
624                 return pubResponse;
625         }
626
627         public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
628
629                 if (reply.isEmpty()) {
630
631                         mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
632                         mrPubResponse.setResponseMessage("Please verify the Producer properties");
633                 } else if (reply.startsWith("{")) {
634                         JSONObject jObject = new JSONObject(reply);
635                         if (jObject.has("message") && jObject.has("status")) {
636                                 String message = jObject.getString("message");
637                                 if (null != message) {
638                                         mrPubResponse.setResponseMessage(message);
639                                 }
640                                 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
641                         } else {
642                                 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
643                                 mrPubResponse.setResponseMessage(reply);
644                         }
645                 } else if (reply.startsWith("<")) {
646                         String responseCode = getHTTPErrorResponseCode(reply);
647                         if (responseCode.contains("403")) {
648                                 responseCode = "403";
649                         }
650                         mrPubResponse.setResponseCode(responseCode);
651                         mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
652                 }
653
654                 return mrPubResponse;
655         }
656
657         private final String fTopic;
658         private final int fMaxBatchSize;
659         private final long fMaxBatchAgeMs;
660         private final boolean fCompress;
661         private int threadOccuranceTime;
662         private boolean fClosed;
663         private String username;
664         private String password;
665         private String host;
666
667         // host selector
668         private HostSelector fHostSelector = null;
669
670         private final LinkedBlockingQueue<TimestampedMessage> fPending;
671         private long fDontSendUntilMs;
672         private final ScheduledThreadPoolExecutor fExec;
673
674         private String latitude;
675         private String longitude;
676         private String version;
677         private String serviceName;
678         private String env;
679         private String partner;
680         private String routeOffer;
681         private String subContextPath;
682         private String protocol;
683         private String methodType;
684         private String url;
685         private String dmeuser;
686         private String dmepassword;
687         private String contentType;
688         private static final long sfWaitAfterError = 10000;
689         private HashMap<String, String> DMETimeOuts;
690         private DME2Client sender;
691         public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
692         private String authKey;
693         private String authDate;
694         private String handlers;
695         private Properties props;
696         public static String routerFilePath;
697         protected static final Map<String, String> headers = new HashMap<String, String>();
698         public static MultivaluedMap<String, Object> headersMap;
699
700         private MRPublisherResponse pubResponse;
701
702         public MRPublisherResponse getPubResponse() {
703                 return pubResponse;
704         }
705
706         public void setPubResponse(MRPublisherResponse pubResponse) {
707                 this.pubResponse = pubResponse;
708         }
709
710         public static String getRouterFilePath() {
711                 return routerFilePath;
712         }
713
714         public static void setRouterFilePath(String routerFilePath) {
715                 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
716         }
717
718         public Properties getProps() {
719                 return props;
720         }
721
722         public void setProps(Properties props) {
723                 this.props = props;
724         }
725
726         public String getProtocolFlag() {
727                 return protocolFlag;
728         }
729
730         public void setProtocolFlag(String protocolFlag) {
731                 this.protocolFlag = protocolFlag;
732         }
733
734         private void DME2Configue() throws Exception {
735                 try {
736
737                         /*
738                          * FileReader reader = new FileReader(new File (producerFilePath));
739                          * Properties props = new Properties(); props.load(reader);
740                          */
741                         latitude = props.getProperty("Latitude");
742                         longitude = props.getProperty("Longitude");
743                         version = props.getProperty("Version");
744                         serviceName = props.getProperty("ServiceName");
745                         env = props.getProperty("Environment");
746                         partner = props.getProperty("Partner");
747                         routeOffer = props.getProperty("routeOffer");
748                         subContextPath = props.getProperty("SubContextPath") + fTopic;
749                         /*
750                          * if(props.getProperty("partition")!=null &&
751                          * !props.getProperty("partition").equalsIgnoreCase("")){
752                          * subContextPath=subContextPath+"?partitionKey="+props.getProperty(
753                          * "partition"); }
754                          */
755                         protocol = props.getProperty("Protocol");
756                         methodType = props.getProperty("MethodType");
757                         dmeuser = props.getProperty("username");
758                         dmepassword = props.getProperty("password");
759                         contentType = props.getProperty("contenttype");
760                         handlers = props.getProperty("sessionstickinessrequired");
761                         routerFilePath = props.getProperty("DME2preferredRouterFilePath");
762
763                         /**
764                          * Changes to DME2Client url to use Partner for auto failover
765                          * between data centers When Partner value is not provided use the
766                          * routeOffer value for auto failover within a cluster
767                          */
768
769                         String partitionKey = props.getProperty("partition");
770
771                         if (partner != null && !partner.isEmpty()) {
772                                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
773                                                 + partner;
774                                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
775                                         url = url + "&partitionKey=" + partitionKey;
776                                 }
777                         } else if (routeOffer != null && !routeOffer.isEmpty()) {
778                                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
779                                                 + routeOffer;
780                                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
781                                         url = url + "&partitionKey=" + partitionKey;
782                                 }
783                         }
784
785                         DMETimeOuts = new HashMap<String, String>();
786                         DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
787                         DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
788                         DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
789                         DMETimeOuts.put("Content-Type", contentType);
790                         System.setProperty("AFT_LATITUDE", latitude);
791                         System.setProperty("AFT_LONGITUDE", longitude);
792                         System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
793                         // System.setProperty("DME2.DEBUG", "true");
794
795                         // SSL changes
796                         // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
797                         // "SSLv3,TLSv1,TLSv1.1");
798                         System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
799                         System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
800                         System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
801
802                         // SSL changes
803
804                         sender = new DME2Client(new URI(url), 5000L);
805
806                         sender.setAllowAllHttpReturnCodes(true);
807                         sender.setMethod(methodType);
808                         sender.setSubContext(subContextPath);
809                         sender.setCredentials(dmeuser, dmepassword);
810                         sender.setHeaders(DMETimeOuts);
811                         if (handlers != null &&handlers.equalsIgnoreCase("yes")) {
812                                 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
813                                                 props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
814                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
815                                                 props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
816                                 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
817                         } else {
818                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
819                         }
820                 } catch (DME2Exception x) {
821                         getLog().warn(x.getMessage(), x);
822                         throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
823                 } catch (URISyntaxException x) {
824
825                         getLog().warn(x.getMessage(), x);
826                         throw new URISyntaxException(url, x.getMessage());
827                 } catch (Exception x) {
828
829                         getLog().warn(x.getMessage(), x);
830                         throw new IllegalArgumentException(x.getMessage());
831                 }
832         }
833
834         private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
835                         boolean compress) throws MalformedURLException {
836                 super(hosts);
837
838                 if (topic == null || topic.length() < 1) {
839                         throw new IllegalArgumentException("A topic must be provided.");
840                 }
841
842                 fHostSelector = new HostSelector(hosts, null);
843                 fClosed = false;
844                 fTopic = topic;
845                 fMaxBatchSize = maxBatchSize;
846                 fMaxBatchAgeMs = maxBatchAgeMs;
847                 fCompress = compress;
848
849                 fPending = new LinkedBlockingQueue<TimestampedMessage>();
850                 fDontSendUntilMs = 0;
851                 fExec = new ScheduledThreadPoolExecutor(1);
852                 pubResponse = new MRPublisherResponse();
853
854         }
855
856         private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
857                         boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
858                 super(hosts);
859
860                 if (topic == null || topic.length() < 1) {
861                         throw new IllegalArgumentException("A topic must be provided.");
862                 }
863
864                 fHostSelector = new HostSelector(hosts, null);
865                 fClosed = false;
866                 fTopic = topic;
867                 fMaxBatchSize = maxBatchSize;
868                 fMaxBatchAgeMs = maxBatchAgeMs;
869                 fCompress = compress;
870                 threadOccuranceTime = httpThreadOccurnace;
871                 fPending = new LinkedBlockingQueue<TimestampedMessage>();
872                 fDontSendUntilMs = 0;
873                 fExec = new ScheduledThreadPoolExecutor(1);
874                 fExec.scheduleAtFixedRate(new Runnable() {
875                         @Override
876                         public void run() {
877                                 send(false);
878                         }
879                 }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
880         }
881
882         private static class TimestampedMessage extends message {
883                 public TimestampedMessage(message m) {
884                         super(m);
885                         timestamp = Clock.now();
886                 }
887
888                 public final long timestamp;
889         }
890
891         public String getUsername() {
892                 return username;
893         }
894
895         public void setUsername(String username) {
896                 this.username = username;
897         }
898
899         public String getPassword() {
900                 return password;
901         }
902
903         public void setPassword(String password) {
904                 this.password = password;
905         }
906
907         public String getHost() {
908                 return host;
909         }
910
911         public void setHost(String host) {
912                 this.host = host;
913         }
914
915         public String getContentType() {
916                 return contentType;
917         }
918
919         public void setContentType(String contentType) {
920                 this.contentType = contentType;
921         }
922
923         public String getAuthKey() {
924                 return authKey;
925         }
926
927         public void setAuthKey(String authKey) {
928                 this.authKey = authKey;
929         }
930
931         public String getAuthDate() {
932                 return authDate;
933         }
934
935         public void setAuthDate(String authDate) {
936                 this.authDate = authDate;
937         }
938
939 }