/*-
 * ============LICENSE_START=======================================================
 *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
 *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */
22 package org.onap.policy.apex.plugins.context.schema.avro;
24 import com.google.gson.Gson;
25 import com.google.gson.GsonBuilder;
26 import com.google.gson.JsonElement;
27 import java.io.ByteArrayOutputStream;
28 import java.util.LinkedHashSet;
30 import org.apache.avro.Schema;
31 import org.apache.avro.Schema.Field;
32 import org.apache.avro.Schema.Type;
33 import org.apache.avro.generic.GenericDatumReader;
34 import org.apache.avro.generic.GenericDatumWriter;
35 import org.apache.avro.generic.GenericRecord;
36 import org.apache.avro.io.DatumWriter;
37 import org.apache.avro.io.DecoderFactory;
38 import org.apache.avro.io.EncoderFactory;
39 import org.apache.avro.io.JsonDecoder;
40 import org.apache.avro.io.JsonEncoder;
41 import org.onap.policy.apex.context.ContextRuntimeException;
42 import org.onap.policy.apex.context.impl.schema.AbstractSchemaHelper;
43 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
44 import org.onap.policy.apex.model.basicmodel.concepts.AxKey;
45 import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchema;
46 import org.slf4j.ext.XLogger;
47 import org.slf4j.ext.XLoggerFactory;
/**
 * This class is the implementation of the {@link org.onap.policy.apex.context.SchemaHelper} interface for Avro
 * schemas.
 *
 * @author Liam Fallon (liam.fallon@ericsson.com)
 */
54 public class AvroSchemaHelper extends AbstractSchemaHelper {
55 // Get a reference to the logger
56 private static final XLogger LOGGER = XLoggerFactory.getXLogger(AvroSchemaHelper.class);
58 // Recurring string constants
59 private static final String OBJECT_TAG = ": object \"";
61 // The Avro schema for this context schema
62 private Schema avroSchema;
64 // The mapper that translates between Java and Avro objects
65 private AvroObjectMapper avroObjectMapper;
68 public void init(final AxKey userKey, final AxContextSchema schema) {
69 super.init(userKey, schema);
71 // Configure the Avro schema
73 avroSchema = new Schema.Parser().parse(schema.getSchema());
74 } catch (final Exception e) {
75 final String resultSting = userKey.getId() + ": avro context schema \"" + schema.getId()
76 + "\" schema is invalid: " + e.getMessage() + ", schema: " + schema.getSchema();
77 LOGGER.warn(resultSting, e);
78 throw new ContextRuntimeException(resultSting);
81 // Get the object mapper for the schema type to a Java class
82 avroObjectMapper = new AvroObjectMapperFactory().get(userKey, avroSchema);
84 // Get the Java type for this schema, if it is a primitive type then we can do direct
86 setSchemaClass(avroObjectMapper.getJavaClass());
90 * Getter to get the Avro schema.
92 * @return the Avro schema
94 public Schema getAvroSchema() {
99 public Object getSchemaObject() {
100 return getAvroSchema();
104 public Object createNewInstance() {
105 // Create a new instance using the Avro object mapper
106 final Object newInstance = avroObjectMapper.createNewInstance(avroSchema);
108 // If no new instance is created, use default schema handler behaviour
109 if (newInstance != null) {
112 return super.createNewInstance();
117 public Object createNewInstance(final String stringValue) {
118 return unmarshal(stringValue);
122 public Object createNewInstance(final Object incomingObject) {
123 if (incomingObject instanceof JsonElement) {
124 final Gson gson = new GsonBuilder().serializeNulls().create();
125 final String elementJsonString = gson.toJson((JsonElement) incomingObject);
127 return createNewInstance(elementJsonString);
129 final String returnString =
130 getUserKey().getId() + ": the object \"" + incomingObject + "\" is not an instance of JsonObject";
131 LOGGER.warn(returnString);
132 throw new ContextRuntimeException(returnString);
137 public Object createNewSubInstance(final String subInstanceType) {
138 final Set<String> foundTypes = new LinkedHashSet<>();
140 Object subInstance = createNewSubInstance(avroSchema, subInstanceType, foundTypes);
142 if (subInstance != null) {
145 final String returnString = getUserKey().getId() + ": the schema \"" + avroSchema.getName()
146 + "\" does not have a subtype of type \"" + subInstanceType + "\"";
147 LOGGER.warn(returnString);
148 throw new ContextRuntimeException(returnString);
153 * Create an instance of a sub type of this type.
155 * @param schema the Avro schema of the the type
156 * @param subInstanceType the sub type
157 * @param foundTypes types we have already found
158 * @return the sub type schema or null if it is not created
160 private Object createNewSubInstance(Schema schema, String subInstanceType, final Set<String> foundTypes) {
161 // Try Array element types
162 if (Type.ARRAY == schema.getType()) {
163 Object newInstance = instantiateSubInstance(subInstanceType, schema.getElementType(), foundTypes);
164 if (newInstance != null) {
169 if (Type.MAP == schema.getType()) {
170 Object newInstance = instantiateSubInstance(subInstanceType, schema.getValueType(), foundTypes);
171 if (newInstance != null) {
176 if (Type.RECORD == schema.getType()) {
177 for (Field field : schema.getFields()) {
178 Object newInstance = instantiateSubInstance(subInstanceType, field.schema(), foundTypes);
179 if (newInstance != null) {
189 * Instantiate a sub instance of a type.
191 * @param subInstanceType the type of the sub instance to create
192 * @param subSchema the sub schema we have received
193 * @param foundTypes types we have already found
194 * @return an instance of the type or null if it is the incorrect type
196 private Object instantiateSubInstance(final String subInstanceType, final Schema subSchema,
197 final Set<String> foundTypes) {
198 if (subSchema == null) {
202 // Check for recursive use of field names in records, if we have already checked a field name
203 // skip it this time.
204 if (foundTypes.contains(subSchema.getName())) {
208 foundTypes.add(subSchema.getName());
210 if (subSchema.getName().equals(subInstanceType)) {
211 return new AvroObjectMapperFactory().get(AxArtifactKey.getNullKey(), subSchema)
212 .createNewInstance(subSchema);
214 return createNewSubInstance(subSchema, subInstanceType, foundTypes);
218 public Object unmarshal(final Object object) {
219 // If an object is already in the correct format, just carry on
220 if (passThroughObject(object)) {
224 String objectString = getStringObject(object);
226 // Translate illegal characters in incoming JSON keys to legal Avro values
227 objectString = AvroSchemaKeyTranslationUtilities.translateIllegalKeys(objectString, false);
230 Object decodedObject;
232 final JsonDecoder jsonDecoder = DecoderFactory.get().jsonDecoder(avroSchema, objectString);
233 decodedObject = new GenericDatumReader<GenericRecord>(avroSchema).read(null, jsonDecoder);
234 } catch (final Exception e) {
235 final String returnString = getUserKey().getId() + OBJECT_TAG + objectString
236 + "\" Avro unmarshalling failed: " + e.getMessage();
237 LOGGER.warn(returnString, e);
238 throw new ContextRuntimeException(returnString, e);
241 // Now map the decoded object into something we can handle
242 return avroObjectMapper.mapFromAvro(decodedObject);
246 * Check that the incoming object is a string, the incoming object must be a string containing Json.
248 * @param object incoming object
249 * @return object as String
251 private String getStringObject(final Object object) {
253 if (isObjectString(object)) {
254 return getObjectString(object);
256 return (String) object;
258 } catch (final ClassCastException e) {
259 final String returnString = getUserKey().getId() + OBJECT_TAG + object + "\" of type \""
260 + (object != null ? object.getClass().getName() : "null") + "\" must be assignable to \""
261 + getSchemaClass().getName() + "\" or be a Json string representation of it for Avro unmarshalling";
262 LOGGER.warn(returnString, e);
263 throw new ContextRuntimeException(returnString);
268 * Get a string object.
270 * @param object the string object
273 private String getObjectString(final Object object) {
274 String objectString = object.toString().trim();
275 if (objectString.length() == 0) {
277 } else if (objectString.length() == 1) {
278 return "\"" + objectString + "\"";
280 // All strings must be quoted for decoding
281 if (objectString.charAt(0) != '"') {
282 objectString = '"' + objectString;
284 if (objectString.charAt(objectString.length() - 1) != '"') {
291 private boolean isObjectString(final Object object) {
292 return object != null && avroSchema.getType().equals(Schema.Type.STRING);
296 public String marshal2String(final Object object) {
297 // Condition the object for Avro encoding
298 final Object conditionedObject = avroObjectMapper.mapToAvro(object);
300 final String jsonString = getJsonString(object, conditionedObject);
302 return AvroSchemaKeyTranslationUtilities.translateIllegalKeys(jsonString, true);
305 private String getJsonString(final Object object, final Object conditionedObject) {
307 try (final ByteArrayOutputStream output = new ByteArrayOutputStream()) {
308 final DatumWriter<Object> writer = new GenericDatumWriter<>(avroSchema);
309 final JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(avroSchema, output, true);
310 writer.write(conditionedObject, jsonEncoder);
312 return new String(output.toByteArray());
313 } catch (final Exception e) {
314 final String returnString =
315 getUserKey().getId() + OBJECT_TAG + object + "\" Avro marshalling failed: " + e.getMessage();
316 LOGGER.warn(returnString);
317 throw new ContextRuntimeException(returnString, e);
322 public JsonElement marshal2Object(final Object schemaObject) {
323 // Get the object as a Json string
324 final String schemaObjectAsString = marshal2String(schemaObject);
326 // Get a Gson instance to convert the Json string to an object created by Json
327 final Gson gson = new Gson();
329 // Convert the Json string into an object
330 final Object schemaObjectAsObject = gson.fromJson(schemaObjectAsString, Object.class);
332 return gson.toJsonTree(schemaObjectAsObject);
336 * Check if we can pass this object straight through encoding or decoding, is it an object native to the schema.
338 * @param object the object to check
339 * @return true if it's a straight pass through
341 private boolean passThroughObject(final Object object) {
342 if (object == null || getSchemaClass() == null) {
346 // All strings must be mapped
347 if (object instanceof String) {
351 // Now, check if the object is native
352 return getSchemaClass().isAssignableFrom(object.getClass());