2 * Copyright © 2018-2019 AT&T Intellectual Property.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onap.ccsdk.cds.blueprintsprocessor.message.kafka
20 import org.apache.kafka.streams.processor.ProcessorContext
21 import org.apache.kafka.streams.processor.StateStore
22 import org.apache.kafka.streams.state.StoreBuilder
23 import org.apache.kafka.streams.state.StoreSupplier
24 import org.onap.ccsdk.cds.blueprintsprocessor.db.BlueprintDBLibGenericService
25 import org.onap.ccsdk.cds.blueprintsprocessor.db.primaryDBLibGenericService
26 import org.onap.ccsdk.cds.controllerblueprints.core.logger
27 import org.onap.ccsdk.cds.controllerblueprints.core.service.BlueprintDependencyService
31 class KafkaJDBCKeyStoreSupplier(private val name: String) : StoreSupplier<KafkaJDBCStore> {
33 override fun get(): KafkaJDBCStore {
34 // Get the DBLibGenericService Instance
35 val bluePrintDBLibGenericService = BlueprintDependencyService.primaryDBLibGenericService()
36 return KafkaJDBCStoreImpl(name, bluePrintDBLibGenericService)
39 override fun name(): String {
43 override fun metricsScope(): String {
48 class KafkaJDBCKeyStoreBuilder(private val storeSupplier: KafkaJDBCKeyStoreSupplier)
49 : StoreBuilder<KafkaJDBCStore> {
51 private var logConfig: MutableMap<String, String> = HashMap()
52 private var enableCaching: Boolean = false
53 private var enableLogging = true
55 override fun logConfig(): MutableMap<String, String> {
59 override fun withCachingDisabled(): StoreBuilder<KafkaJDBCStore> {
64 override fun loggingEnabled(): Boolean {
68 override fun withLoggingDisabled(): StoreBuilder<KafkaJDBCStore> {
73 override fun withCachingEnabled(): StoreBuilder<KafkaJDBCStore> {
78 override fun withLoggingEnabled(config: MutableMap<String, String>?): StoreBuilder<KafkaJDBCStore> {
83 override fun name(): String {
84 return "KafkaJDBCKeyStoreBuilder"
87 override fun build(): KafkaJDBCStore {
88 return storeSupplier.get()
92 interface KafkaJDBCStore : StateStore {
94 suspend fun query(sql: String, params: Map<String, Any>): List<Map<String, Any>>
96 suspend fun update(sql: String, params: Map<String, Any>): Int
100 class KafkaJDBCStoreImpl(private val name: String,
101 private val bluePrintDBLibGenericService: BlueprintDBLibGenericService)
104 private val log = logger(KafkaJDBCStoreImpl::class)
106 override fun isOpen(): Boolean {
107 log.info("isOpen...")
111 override fun init(context: ProcessorContext, root: StateStore) {
115 override fun flush() {
119 override fun close() {
123 override fun name(): String {
127 override fun persistent(): Boolean {
131 override suspend fun query(sql: String, params: Map<String, Any>): List<Map<String, Any>> {
132 log.info("Query : $sql")
133 log.info("Params : $params")
134 return bluePrintDBLibGenericService.query(sql, params)
137 override suspend fun update(sql: String, params: Map<String, Any>): Int {
138 log.info("Query : $sql")
139 log.info("Params : $params")
140 return bluePrintDBLibGenericService.update(sql, params)