bd140cd76e95fd4aba9b2d89f572774782af1619
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / impl / MRSimplerBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.mr.client.impl;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.net.URI;
29 import java.net.URISyntaxException;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Properties;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.zip.GZIPOutputStream;
40
41 import javax.ws.rs.core.MultivaluedMap;
42
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import org.apache.http.HttpException;
47 import org.apache.http.HttpStatus;
48 import org.json.JSONArray;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
51
52 import com.att.aft.dme2.api.DME2Client;
53 import com.att.aft.dme2.api.DME2Exception;
54 import org.onap.dmaap.mr.client.HostSelector;
55 import org.onap.dmaap.mr.client.MRBatchingPublisher;
56 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
57 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
58
59 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
60         private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
61
62         private static final String PROPS_PROTOCOL = "Protocol";
63         private static final String PROPS_PARTITION = "partition";
64         private static final String PROPS_CONTENT_TYPE = "contenttype";
65
66         private static final String CONTENT_TYPE_CAMBRIA_ZIP = "application/cambria-zip";
67         private static final String CONTENT_TYPE_CAMBRIA = "application/cambria";
68         private static final String CONTENT_TYPE_JSON = "application/json";
69         private static final String CONTENT_TYPE_TEXT = "text/plain";
70
71         private static final String JSON_STATUS = "status";
72
73         public static class Builder {
74
75                 public Builder againstUrls(Collection<String> baseUrls) {
76                         fUrls = baseUrls;
77                         return this;
78                 }
79                 
80                 public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype )            
81                 {               
82                         fUrls = baseUrls;               
83                         fServiceName = serviceName;             
84                         fTransportype = transportype;           
85                         return this;            
86                 }
87
88                 public Builder onTopic(String topic) {
89                         fTopic = topic;
90                         return this;
91                 }
92
93                 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
94                         fMaxBatchSize = maxBatchSize;
95                         fMaxBatchAgeMs = maxBatchAgeMs;
96                         return this;
97                 }
98
99                 public Builder compress(boolean compress) {
100                         fCompress = compress;
101                         return this;
102                 }
103
104                 public Builder httpThreadTime(int threadOccuranceTime) {
105                         this.threadOccuranceTime = threadOccuranceTime;
106                         return this;
107                 }
108
109                 public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
110                         fAllowSelfSignedCerts = allowSelfSignedCerts;
111                         return this;
112                 }
113
114                 public Builder withResponse(boolean withResponse) {
115                         fWithResponse = withResponse;
116                         return this;
117                 }
118
119                 public MRSimplerBatchPublisher build() {
120                         if (!fWithResponse) {
121                                 try {
122                                         return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
123                                                         fAllowSelfSignedCerts, threadOccuranceTime);
124                                 } catch (MalformedURLException e) {
125                                         throw new IllegalArgumentException(e);
126                                 }
127                         } else {
128                                 try {
129                                         return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
130                                                         fAllowSelfSignedCerts, fMaxBatchSize);
131                                 } catch (MalformedURLException e) {
132                                         throw new IllegalArgumentException(e);
133                                 }
134                         }
135
136                 }
137
138                 private Collection<String> fUrls;
139                 private Collection<String> fServiceName;                
140                 private String fTransportype;   
141                 private String fTopic;
142                 private int fMaxBatchSize = 100;
143                 
144                 private long fMaxBatchAgeMs = 1000;
145                 private boolean fCompress = false;
146                 private int threadOccuranceTime = 50;
147                 private boolean fAllowSelfSignedCerts = false;
148                 private boolean fWithResponse = false;
149
150         };
151
152         @Override
153         public int send(String partition, String msg) {
154                 return send(new message(partition, msg));
155         }
156
157         @Override
158         public int send(String msg) {
159                 return send(new message(null, msg));
160         }
161
162         @Override
163         public int send(message msg) {
164                 final LinkedList<message> list = new LinkedList<>();
165                 list.add(msg);
166                 return send(list);
167         }
168
169         @Override
170         public synchronized int send(Collection<message> msgs) {
171                 if (fClosed) {
172                         throw new IllegalStateException("The publisher was closed.");
173                 }
174
175                 for (message userMsg : msgs) {
176                         fPending.add(new TimestampedMessage(userMsg));
177                 }
178                 return getPendingMessageCount();
179         }
180
181         @Override
182         public synchronized int getPendingMessageCount() {
183                 return fPending.size();
184         }
185
186         @Override
187         public void close() {
188                 try {
189                         final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
190                         if (remains.isEmpty()) {
191                                 getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.",
192                                         remains.size());
193                         }
194                 } catch (InterruptedException e) {
195                         getLog().warn("Possible message loss. " + e.getMessage(), e);
196                         Thread.currentThread().interrupt();
197                 } catch (IOException e) {
198                         getLog().warn("Possible message loss. " + e.getMessage(), e);
199                 }
200         }
201
202         @Override
203         public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
204                 synchronized (this) {
205                         fClosed = true;
206
207                         // stop the background sender
208                         fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
209                         fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
210                         fExec.shutdown();
211                 }
212
213                 final long now = Clock.now();
214                 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
215                 final long timeoutAtMs = now + waitInMs;
216
217                 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
218                         send(true);
219                         Thread.sleep(250);
220                 }
221
222                 synchronized (this) {
223                         final LinkedList<message> result = new LinkedList<>();
224                         fPending.drainTo(result);
225                         return result;
226                 }
227         }
228
229         /**
230          * Possibly send a batch to the MR server. This is called by the background
231          * thread and the close() method
232          * 
233          * @param force
234          */
235         private synchronized void send(boolean force) {
236                 if ((force || shouldSendNow()) && !sendBatch()) {
237                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
238
239                                 // note the time for back-off
240                                 fDontSendUntilMs = sfWaitAfterError + Clock.now();
241                 }
242         }
243
244         private synchronized boolean shouldSendNow() {
245                 boolean shouldSend = false;
246                 if (fPending.size()>0) {
247                         final long nowMs = Clock.now();
248
249                         shouldSend = (fPending.size() >= fMaxBatchSize);
250                         if (!shouldSend) {
251                                 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
252                                 shouldSend = sendAtMs <= nowMs;
253                         }
254
255                         // however, wait after an error
256                         shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
257                 }
258                 return shouldSend;
259         }
260
261         /**
262          * Method to parse published JSON Objects and Arrays
263          * 
264          * @return JSONArray
265          */
266         private JSONArray parseJSON() {
267                 JSONArray jsonArray = new JSONArray();
268                 for (TimestampedMessage m : fPending) {
269                         JSONTokener jsonTokener = new JSONTokener(m.fMsg);
270                         JSONObject jsonObject = null;
271                         JSONArray tempjsonArray = null;
272                         final char firstChar = jsonTokener.next();
273                         jsonTokener.back();
274                         if ('[' == firstChar) {
275                                 tempjsonArray = new JSONArray(jsonTokener);
276                                 if (null != tempjsonArray) {
277                                         for (int i = 0; i < tempjsonArray.length(); i++) {
278                                                 jsonArray.put(tempjsonArray.getJSONObject(i));
279                                         }
280                                 }
281                         } else {
282                                 jsonObject = new JSONObject(jsonTokener);
283                                 jsonArray.put(jsonObject);
284                         }
285
286                 }
287                 return jsonArray;
288         }
289
290         private void logTime(long startMs, String dmeResponse) {
291                 if (getLog().isInfoEnabled()) {
292                         getLog().info("MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse);
293                 }
294         }
295
296         private synchronized boolean sendBatch() {
297                 // it's possible for this call to be made with an empty list. in this
298                 // case, just return.
299                 if (fPending.isEmpty()) {
300                         return true;
301                 }
302
303                 final long nowMs = Clock.now();
304
305                 if (this.fHostSelector != null) {
306                         host = this.fHostSelector.selectBaseHost();
307                 }
308
309                 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL),
310                                 props.getProperty(PROPS_PARTITION));
311
312                 try {
313                         
314                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
315                         OutputStream os = baseStream;
316                         final String contentType = props.getProperty(PROPS_CONTENT_TYPE);
317                         if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) {
318                                 JSONArray jsonArray = parseJSON();
319                                 os.write(jsonArray.toString().getBytes());
320                                 os.close();
321
322                         } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
323                                 for (TimestampedMessage m : fPending) {
324                                         os.write(m.fMsg.getBytes());
325                                         os.write('\n');
326                                 }
327                                 os.close();
328                         } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA)
329                                         || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) {
330                                 if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) {
331                                         os = new GZIPOutputStream(baseStream);
332                                 }
333                                 for (TimestampedMessage m : fPending) {
334
335                                         os.write(("" + m.fPartition.length()).getBytes());
336                                         os.write('.');
337                                         os.write(("" + m.fMsg.length()).getBytes());
338                                         os.write('.');
339                                         os.write(m.fPartition.getBytes());
340                                         os.write(m.fMsg.getBytes());
341                                         os.write('\n');
342                                 }
343                                 os.close();
344                         } else {
345                                 for (TimestampedMessage m : fPending) {
346                                         os.write(m.fMsg.getBytes());
347
348                                 }
349                                 os.close();
350                         }
351
352                         final long startMs = Clock.now();
353                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
354
355                                 DME2Configue();
356
357                                 this.wait(5);
358                                 getLog().info(String
359                                         .format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath,
360                                                 nowMs - fPending.peek().timestamp));
361                                 sender.setPayload(os.toString());
362                                 String dmeResponse = sender.sendAndWait(5000L);
363
364                                 logTime(startMs, dmeResponse);
365                                 fPending.clear();
366                                 return true;
367                         }
368
369             if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
370                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
371                                         nowMs - fPending.peek().timestamp);
372                 final JSONObject result =
373                         postAuth(new PostAuthDataObject().setPath(httpurl).setData(baseStream.toByteArray())
374                                 .setContentType(contentType).setAuthKey(authKey).setAuthDate(authDate)
375                                 .setUsername(username).setPassword(password).setProtocolFlag(protocolFlag));
376                 // Here we are checking for error response. If HTTP status
377                 // code is not within the http success response code
378                 // then we consider this as error and return false
379                 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
380                     return false;
381                 }
382                                 logTime(startMs, result.toString());
383                 fPending.clear();
384                 return true;
385             }
386
387                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
388                                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
389                                         nowMs - fPending.peek().timestamp);
390                                 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
391                                                 protocolFlag);
392
393                                 // Here we are checking for error response. If HTTP status
394                                 // code is not within the http success response code
395                                 // then we consider this as error and return false
396                                 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
397                                         return false;
398                                 }
399                                 logTime(startMs, result.toString());
400                                 fPending.clear();
401                                 return true;
402                         }
403                         
404                         if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
405                                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
406                                         nowMs - fPending.peek().timestamp);
407                                 final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
408
409                                 // Here we are checking for error response. If HTTP status
410                                 // code is not within the http success response code
411                                 // then we consider this as error and return false
412                                 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
413                                         return false;
414                                 }
415                                 logTime(startMs, result.toString());
416                                 fPending.clear();
417                                 return true;
418                         }
419                 } catch (Exception x) {
420                         getLog().warn(x.getMessage(), x);
421                 }
422                 return false;
423         }
424
425         public synchronized MRPublisherResponse sendBatchWithResponse() {
426                 // it's possible for this call to be made with an empty list. in this
427                 // case, just return.
428                 if (fPending.isEmpty()) {
429                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
430                         pubResponse.setResponseMessage("No Messages to send");
431                         return pubResponse;
432                 }
433
434                 final long nowMs = Clock.now();
435
436                 host = this.fHostSelector.selectBaseHost();
437
438                 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(PROPS_PROTOCOL),
439                                 props.getProperty(PROPS_PARTITION));
440                 OutputStream os = null;
441                 try {
442
443                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
444                         os = baseStream;
445                         final String contentType = props.getProperty(PROPS_CONTENT_TYPE);
446                         if (contentType.equalsIgnoreCase(CONTENT_TYPE_JSON)) {
447                                 JSONArray jsonArray = parseJSON();
448                                 os.write(jsonArray.toString().getBytes());
449                         } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
450                                 for (TimestampedMessage m : fPending) {
451                                         os.write(m.fMsg.getBytes());
452                                         os.write('\n');
453                                 }
454                         } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA)
455                                         || (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP))) {
456                                 if (contentType.equalsIgnoreCase(CONTENT_TYPE_CAMBRIA_ZIP)) {
457                                         os = new GZIPOutputStream(baseStream);
458                                 }
459                                 for (TimestampedMessage m : fPending) {
460
461                                         os.write(("" + m.fPartition.length()).getBytes());
462                                         os.write('.');
463                                         os.write(("" + m.fMsg.length()).getBytes());
464                                         os.write('.');
465                                         os.write(m.fPartition.getBytes());
466                                         os.write(m.fMsg.getBytes());
467                                         os.write('\n');
468                                 }
469                                 os.close();
470                         } else {
471                                 for (TimestampedMessage m : fPending) {
472                                         os.write(m.fMsg.getBytes());
473
474                                 }
475                         }
476
477                         final long startMs = Clock.now();
478                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
479
480                                 try {
481                                         DME2Configue();
482
483                                         this.wait(5);
484                                         getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath,
485                                                         nowMs - fPending.peek().timestamp);
486                                         sender.setPayload(os.toString());
487
488                                         String dmeResponse = sender.sendAndWait(5000L);
489
490                                         pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
491
492                                         if (Integer.valueOf(pubResponse.getResponseCode()) < 200
493                                                         || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
494
495                                                 return pubResponse;
496                                         }
497                                         final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
498                                         getLog().info(logLine);
499                                         fPending.clear();
500
501                                 } catch (DME2Exception x) {
502                                         getLog().warn(x.getMessage(), x);
503                                         pubResponse.setResponseCode(x.getErrorCode());
504                                         pubResponse.setResponseMessage(x.getErrorMessage());
505                                 } catch (URISyntaxException x) {
506
507                                         getLog().warn(x.getMessage(), x);
508                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
509                                         pubResponse.setResponseMessage(x.getMessage());
510                                 } catch (Exception x) {
511
512                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
513                                         pubResponse.setResponseMessage(x.getMessage());
514                                         logger.error("exception: ", x);
515
516                                 }
517
518                                 return pubResponse;
519                         }
520
521                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
522                                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
523                                         nowMs - fPending.peek().timestamp);
524                                 final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
525                                                 authDate, username, password, protocolFlag);
526                                 // Here we are checking for error response. If HTTP status
527                                 // code is not within the http success response code
528                                 // then we consider this as error and return false
529
530                                 pubResponse = createMRPublisherResponse(result, pubResponse);
531
532                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
533                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
534
535                                         return pubResponse;
536                                 }
537
538                                 logTime(startMs, result);
539                                 fPending.clear();
540                                 return pubResponse;
541                         }
542
543                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
544                                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
545                                         nowMs - fPending.peek().timestamp);
546                                 final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
547                                                 password, protocolFlag);
548
549                                 // Here we are checking for error response. If HTTP status
550                                 // code is not within the http success response code
551                                 // then we consider this as error and return false
552                                 pubResponse = createMRPublisherResponse(result, pubResponse);
553
554                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
555                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
556
557                                         return pubResponse;
558                                 }
559
560                                 final String logLine = String.valueOf((Clock.now() - startMs));
561                                 getLog().info(logLine);
562                                 fPending.clear();
563                                 return pubResponse;
564                         }
565                         
566                         if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
567                                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
568                                         nowMs - fPending.peek().timestamp);
569                                 final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
570
571                                 // Here we are checking for error response. If HTTP status
572                                 // code is not within the http success response code
573                                 // then we consider this as error and return false
574                                 pubResponse = createMRPublisherResponse(result, pubResponse);
575
576                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
577                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
578
579                                         return pubResponse;
580                                 }
581
582                                 final String logLine = String.valueOf((Clock.now() - startMs));
583                                 getLog().info(logLine);
584                                 fPending.clear();
585                                 return pubResponse;
586                         }
587                 } catch (IllegalArgumentException | HttpException x) {
588                         getLog().warn(x.getMessage(), x);
589                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
590                         pubResponse.setResponseMessage(x.getMessage());
591
592                 } catch (IOException x) {
593                         getLog().warn(x.getMessage(), x);
594                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
595                         pubResponse.setResponseMessage(x.getMessage());
596                 } catch (Exception x) {
597                         getLog().warn(x.getMessage(), x);
598
599                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
600                         pubResponse.setResponseMessage(x.getMessage());
601
602                 }
603
604                 finally {
605                         if (!fPending.isEmpty()) {
606                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
607                                 pubResponse.setPendingMsgs(fPending.size());
608                         }
609                         if (os != null) {
610                                 try {
611                                         os.close();
612                                 } catch (Exception x) {
613                                         getLog().warn(x.getMessage(), x);
614                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
615                                         pubResponse.setResponseMessage("Error in closing Output Stream");
616                                 }
617                         }
618                 }
619
620                 return pubResponse;
621         }
622
623         public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
624
625                 if (reply.isEmpty()) {
626
627                         mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
628                         mrPubResponse.setResponseMessage("Please verify the Producer properties");
629                 } else if (reply.startsWith("{")) {
630                         JSONObject jObject = new JSONObject(reply);
631                         if (jObject.has("message") && jObject.has(JSON_STATUS)) {
632                                 String message = jObject.getString("message");
633                                 if (null != message) {
634                                         mrPubResponse.setResponseMessage(message);
635                                 }
636                                 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS)));
637                         } else {
638                                 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
639                                 mrPubResponse.setResponseMessage(reply);
640                         }
641                 } else if (reply.startsWith("<")) {
642                         String responseCode = getHTTPErrorResponseCode(reply);
643                         if (responseCode.contains("403")) {
644                                 responseCode = "403";
645                         }
646                         mrPubResponse.setResponseCode(responseCode);
647                         mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
648                 }
649
650                 return mrPubResponse;
651         }
652
653         private final String fTopic;
654         private final int fMaxBatchSize;
655         private final long fMaxBatchAgeMs;
656         private final boolean fCompress;
657         private int threadOccuranceTime;
658         private boolean fClosed;
659         private String username;
660         private String password;
661         private String host;
662
663         // host selector
664         private HostSelector fHostSelector = null;
665
666         private final LinkedBlockingQueue<TimestampedMessage> fPending;
667         private long fDontSendUntilMs;
668         private final ScheduledThreadPoolExecutor fExec;
669
670         private String latitude;
671         private String longitude;
672         private String version;
673         private String serviceName;
674         private String env;
675         private String partner;
676         private String routeOffer;
677         private String subContextPath;
678         private String protocol;
679         private String methodType;
680         private String url;
681         private String dmeuser;
682         private String dmepassword;
683         private String contentType;
684         private static final long sfWaitAfterError = 10000;
685         private HashMap<String, String> DMETimeOuts;
686         private DME2Client sender;
687         public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
688         private String authKey;
689         private String authDate;
690         private String handlers;
691         private Properties props;
692         public static String routerFilePath;
693         protected static final Map<String, String> headers = new HashMap<String, String>();
694         public static MultivaluedMap<String, Object> headersMap;
695
696         private MRPublisherResponse pubResponse;
697
698         public MRPublisherResponse getPubResponse() {
699                 return pubResponse;
700         }
701
702         public void setPubResponse(MRPublisherResponse pubResponse) {
703                 this.pubResponse = pubResponse;
704         }
705
706         public static String getRouterFilePath() {
707                 return routerFilePath;
708         }
709
710         public static void setRouterFilePath(String routerFilePath) {
711                 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
712         }
713
714         public Properties getProps() {
715                 return props;
716         }
717
718         public void setProps(Properties props) {
719                 this.props = props;
720                 setClientConfig(DmaapClientUtil.getClientConfig(props));
721         }
722
723         public String getProtocolFlag() {
724                 return protocolFlag;
725         }
726
727         public void setProtocolFlag(String protocolFlag) {
728                 this.protocolFlag = protocolFlag;
729         }
730
731         private void DME2Configue() throws Exception {
732                 try {
733
734                         latitude = props.getProperty("Latitude");
735                         longitude = props.getProperty("Longitude");
736                         version = props.getProperty("Version");
737                         serviceName = props.getProperty("ServiceName");
738                         env = props.getProperty("Environment");
739                         partner = props.getProperty("Partner");
740                         routeOffer = props.getProperty("routeOffer");
741                         subContextPath = props.getProperty("SubContextPath") + fTopic;
742                         
743                         protocol = props.getProperty(PROPS_PROTOCOL);
744                         methodType = props.getProperty("MethodType");
745                         dmeuser = props.getProperty("username");
746                         dmepassword = props.getProperty("password");
747                         contentType = props.getProperty(PROPS_CONTENT_TYPE);
748                         handlers = props.getProperty("sessionstickinessrequired");
749                         routerFilePath = props.getProperty("DME2preferredRouterFilePath");
750
751                         /**
752                          * Changes to DME2Client url to use Partner for auto failover
753                          * between data centers When Partner value is not provided use the
754                          * routeOffer value for auto failover within a cluster
755                          */
756
757                         String partitionKey = props.getProperty(PROPS_PARTITION);
758
759                         if (partner != null && !partner.isEmpty()) {
760                                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
761                                                 + partner;
762                                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
763                                         url = url + "&partitionKey=" + partitionKey;
764                                 }
765                         } else if (routeOffer != null && !routeOffer.isEmpty()) {
766                                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
767                                                 + routeOffer;
768                                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
769                                         url = url + "&partitionKey=" + partitionKey;
770                                 }
771                         }
772
773                         DMETimeOuts = new HashMap<String, String>();
774                         DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
775                         DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
776                         DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
777                         DMETimeOuts.put("Content-Type", contentType);
778                         System.setProperty("AFT_LATITUDE", latitude);
779                         System.setProperty("AFT_LONGITUDE", longitude);
780                         System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
781                         // System.setProperty("DME2.DEBUG", "true");
782
783                         // SSL changes
784                         // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
785                         
786                         System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
787                         System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
788                         System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
789
790                         // SSL changes
791
792                         sender = new DME2Client(new URI(url), 5000L);
793
794                         sender.setAllowAllHttpReturnCodes(true);
795                         sender.setMethod(methodType);
796                         sender.setSubContext(subContextPath);
797                         sender.setCredentials(dmeuser, dmepassword);
798                         sender.setHeaders(DMETimeOuts);
799                         if (handlers != null &&handlers.equalsIgnoreCase("yes")) {
800                                 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
801                                                 props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
802                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
803                                                 props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
804                                 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
805                         } else {
806                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
807                         }
808                 } catch (DME2Exception x) {
809                         getLog().warn(x.getMessage(), x);
810                         throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
811                 } catch (URISyntaxException x) {
812
813                         getLog().warn(x.getMessage(), x);
814                         throw new URISyntaxException(url, x.getMessage());
815                 } catch (Exception x) {
816
817                         getLog().warn(x.getMessage(), x);
818                         throw new IllegalArgumentException(x.getMessage());
819                 }
820         }
821
822         private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
823                         boolean compress) throws MalformedURLException {
824                 super(hosts);
825
826                 if (topic == null || topic.length() < 1) {
827                         throw new IllegalArgumentException("A topic must be provided.");
828                 }
829
830                 fHostSelector = new HostSelector(hosts, null);
831                 fClosed = false;
832                 fTopic = topic;
833                 fMaxBatchSize = maxBatchSize;
834                 fMaxBatchAgeMs = maxBatchAgeMs;
835                 fCompress = compress;
836
837                 fPending = new LinkedBlockingQueue<>();
838                 fDontSendUntilMs = 0;
839                 fExec = new ScheduledThreadPoolExecutor(1);
840                 pubResponse = new MRPublisherResponse();
841
842         }
843
844         private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
845                         boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
846                 super(hosts);
847
848                 if (topic == null || topic.length() < 1) {
849                         throw new IllegalArgumentException("A topic must be provided.");
850                 }
851
852                 fHostSelector = new HostSelector(hosts, null);
853                 fClosed = false;
854                 fTopic = topic;
855                 fMaxBatchSize = maxBatchSize;
856                 fMaxBatchAgeMs = maxBatchAgeMs;
857                 fCompress = compress;
858                 threadOccuranceTime = httpThreadOccurnace;
859                 fPending = new LinkedBlockingQueue<>();
860                 fDontSendUntilMs = 0;
861                 fExec = new ScheduledThreadPoolExecutor(1);
862                 fExec.scheduleAtFixedRate(new Runnable() {
863                         @Override
864                         public void run() {
865                                 send(false);
866                         }
867                 }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
868                 pubResponse = new MRPublisherResponse();
869         }
870
871         private static class TimestampedMessage extends message {
872                 public TimestampedMessage(message m) {
873                         super(m);
874                         timestamp = Clock.now();
875                 }
876
877                 public final long timestamp;
878         }
879
880         public String getUsername() {
881                 return username;
882         }
883
884         public void setUsername(String username) {
885                 this.username = username;
886         }
887
888         public String getPassword() {
889                 return password;
890         }
891
892         public void setPassword(String password) {
893                 this.password = password;
894         }
895
896         public String getHost() {
897                 return host;
898         }
899
900         public void setHost(String host) {
901                 this.host = host;
902         }
903
904         public String getContentType() {
905                 return contentType;
906         }
907
908         public void setContentType(String contentType) {
909                 this.contentType = contentType;
910         }
911
912         public String getAuthKey() {
913                 return authKey;
914         }
915
916         public void setAuthKey(String authKey) {
917                 this.authKey = authKey;
918         }
919
920         public String getAuthDate() {
921                 return authDate;
922         }
923
924         public void setAuthDate(String authDate) {
925                 this.authDate = authDate;
926         }
927
928 }