DMAAP-MR - Merge MR repos
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / resources / streamReaders / CambriaJsonStreamReader.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.resources.streamReaders;
23
24 import org.json.JSONException;
25 import org.json.JSONObject;
26 import org.json.JSONTokener;
27 import org.onap.dmaap.dmf.mr.CambriaApiException;
28 import org.onap.dmaap.dmf.mr.backends.Publisher.message;
29 import org.onap.dmaap.dmf.mr.beans.LogDetails;
30 import org.onap.dmaap.dmf.mr.resources.CambriaEventSet.reader;
31
32 import javax.servlet.http.HttpServletResponse;
33 import java.io.InputStream;
34
35 /**
36  * 
37  * @author anowarul.islam
38  *
39  */
40 public class CambriaJsonStreamReader implements reader {
41         private final JSONTokener fTokens;
42         private final boolean fIsList;
43         private long fCount;
44         private final String fDefPart;
45         public static final String kKeyField = "cambria.partition";
46
47         /**
48          * 
49          * @param is
50          * @param defPart
51          * @throws CambriaApiException
52          */
53         public CambriaJsonStreamReader(InputStream is, String defPart) throws CambriaApiException {
54                 try {
55                         fTokens = new JSONTokener(is);
56                         fCount = 0;
57                         fDefPart = defPart;
58
59                         final int c = fTokens.next();
60                         if (c == '[') {
61                                 fIsList = true;
62                         } else if (c == '{') {
63                                 fTokens.back();
64                                 fIsList = false;
65                         } else {
66                                 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expecting an array or an object.");
67                         }
68                 } catch (JSONException e) {
69                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
70                 }
71         }
72
73         @Override
74         public message next() throws CambriaApiException {
75                 try {
76                         if (!fTokens.more()) {
77                                 return null;
78                         }
79
80                         final int c = fTokens.next();
81                         
82                         
83                         if (fIsList) {
84                                 if (c == ']' || (fCount > 0 && c == 10))
85                                         return null;
86
87
88                                 if (fCount > 0 && c != ',' && c!= 10) {
89                                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
90                                                         "Expected ',' or closing ']' after last object.");
91                                 }
92
93                                 if (fCount == 0 && c != '{' && c!= 10  && c!=32) {
94                                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected { to start an object.");
95                                 }
96                         } else if (fCount != 0 || c != '{') {
97                                 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected '{' to start an object.");
98                         }
99
100                         if (c == '{') {
101                                 fTokens.back();
102                         }
103                         final JSONObject o = new JSONObject(fTokens);
104                         fCount++;
105                         return new msg(o);
106                 } catch (JSONException e) {
107                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
108
109                 }
110         }
111
112         private class msg implements message {
113                 private final String fKey;
114                 private  String fMsg;
115                 private LogDetails logDetails;
116                 private boolean transactionEnabled;
117
118                 /**
119                  * constructor
120                  * 
121                  * @param o
122                  */
123                 
124                 
125                 
126                 public msg(JSONObject o) {
127                         String key = o.optString(kKeyField, fDefPart);
128                         if (key == null) {
129                                 key = "" + System.currentTimeMillis();
130                         }
131                         fKey = key;
132                                         
133                                 fMsg = o.toString().trim();
134                         
135                 }
136
137                 @Override
138                 public String getKey() {
139                         return fKey;
140                 }
141
142                 @Override
143                 public String getMessage() {
144                         return fMsg;
145                 }
146
147                 @Override
148                 public boolean isTransactionEnabled() {
149                         return transactionEnabled;
150                 }
151
152                 @Override
153                 public void setTransactionEnabled(boolean transactionEnabled) {
154                         this.transactionEnabled = transactionEnabled;
155                 }
156
157                 @Override
158                 public void setLogDetails(LogDetails logDetails) {
159                         this.logDetails = logDetails;
160                 }
161
162                 @Override
163                 public LogDetails getLogDetails() {
164                         return logDetails;
165                 }
166         }
167 }