d405f0adb8b804b9ab1b0955944255d7c08e3359
[policy/distribution.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2018 Intel Corp. All rights reserved.
4  *  Copyright (C) 2019 Nordix Foundation.
5  *  Modifications Copyright (C) 2019 AT&T Intellectual Property.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.distribution.reception.handling.file;
24
25 import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
26
27 import java.io.File;
28 import java.io.IOException;
29 import java.nio.file.FileSystems;
30 import java.nio.file.Path;
31 import java.nio.file.Paths;
32 import java.nio.file.WatchEvent;
33 import java.nio.file.WatchKey;
34 import java.nio.file.WatchService;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.TimeUnit;
38 import java.util.zip.ZipFile;
39 import org.onap.policy.common.parameters.ParameterService;
40 import org.onap.policy.distribution.model.Csar;
41 import org.onap.policy.distribution.reception.decoding.PolicyDecodingException;
42 import org.onap.policy.distribution.reception.handling.AbstractReceptionHandler;
43 import org.onap.policy.distribution.reception.statistics.DistributionStatisticsManager;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * Handles reception of inputs from File System which can be used to decode policies.
49  */
50 public class FileSystemReceptionHandler extends AbstractReceptionHandler {
51
52     private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemReceptionHandler.class);
53     private boolean running = false;
54
55     /**
56      * {@inheritDoc}.
57      */
58     @Override
59     protected void initializeReception(final String parameterGroupName) {
60         LOGGER.debug("FileSystemReceptionHandler init...");
61         try {
62             final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters =
63                     ParameterService.get(parameterGroupName);
64             final FileClientHandler fileClientHandler =
65                     new FileClientHandler(this, handlerParameters.getWatchPath(), handlerParameters.getMaxThread());
66             final Thread fileWatcherThread = new Thread(fileClientHandler);
67             fileWatcherThread.start();
68         } catch (final Exception ex) {
69             LOGGER.error("FileSystemReceptionHandler initialization failed", ex);
70         }
71     }
72
73     /**
74      * {@inheritDoc}.
75      */
76     @Override
77     public void destroy() {
78         running = false;
79     }
80
81     /**
82      * Method to check the running status of file watcher thread.
83      *
84      * @return the running status
85      */
86     public boolean isRunning() {
87         return running;
88     }
89
90     /**
91      * Initialize the file watcher thread.
92      *
93      * @param watchPath Path to watch
94      */
95     public void initFileWatcher(final String watchPath, final int maxThread) throws IOException {
96         try (final WatchService watcher = FileSystems.getDefault().newWatchService()) {
97             final Path dir = Paths.get(watchPath);
98             dir.register(watcher, ENTRY_CREATE);
99             LOGGER.debug("Watch Service registered for dir: {}", dir.getFileName());
100             startWatchService(watcher, dir, maxThread);
101         } catch (final Exception ex) {
102             LOGGER.error("FileWatcher initialization failed", ex);
103             Thread.currentThread().interrupt();
104         }
105     }
106
107     /**
108      * Method to keep watching the given path for any new file created.
109      *
110      * @param watcher the watcher
111      * @param dir the watch directory
112      * @param maxThread the max thread number
113      * @throws InterruptedException if it occurs
114      */
115     protected void startWatchService(final WatchService watcher, final Path dir, final int maxThread)
116             throws InterruptedException {
117         WatchKey key;
118         final ExecutorService pool = Executors.newFixedThreadPool(maxThread);
119
120         try {
121             running = true;
122             while (running) {
123                 key = watcher.take();
124                 processFileEvents(dir, key, pool);
125                 final boolean valid = key.reset();
126                 if (!valid) {
127                     LOGGER.error("Watch key no longer valid!");
128                     break;
129                 }
130             }
131         } finally {
132             pool.shutdown();
133         }
134     }
135
136     private void processFileEvents(final Path dir, final WatchKey key, final ExecutorService pool) {
137         for (final WatchEvent<?> event : key.pollEvents()) {
138             @SuppressWarnings("unchecked")
139             final WatchEvent<Path> ev = (WatchEvent<Path>) event;
140             final Path fileName = ev.context();
141             pool.execute(() -> {
142                 LOGGER.debug("new CSAR found: {}", fileName);
143                 DistributionStatisticsManager.updateTotalDistributionCount();
144                 final String fullFilePath = dir.toString() + File.separator + fileName.toString();
145                 try {
146                     waitForFileToBeReady(fullFilePath);
147                     createPolicyInputAndCallHandler(fullFilePath);
148                     LOGGER.debug("CSAR complete: {}", fileName);
149                 } catch (final InterruptedException e) {
150                     LOGGER.error("waitForFileToBeReady interrupted", e);
151                     Thread.currentThread().interrupt();
152                 }
153             });
154         }
155     }
156
157     /**
158      * Method to create policy input & call policy handlers.
159      *
160      * @param fileName the filename
161      */
162     protected void createPolicyInputAndCallHandler(final String fileName) {
163         try {
164             final Csar csarObject = new Csar(fileName);
165             DistributionStatisticsManager.updateTotalDownloadCount();
166             inputReceived(csarObject);
167             DistributionStatisticsManager.updateDownloadSuccessCount();
168             DistributionStatisticsManager.updateDistributionSuccessCount();
169         } catch (final PolicyDecodingException ex) {
170             DistributionStatisticsManager.updateDownloadFailureCount();
171             DistributionStatisticsManager.updateDistributionFailureCount();
172             LOGGER.error("Policy creation failed", ex);
173         }
174     }
175
176     private void waitForFileToBeReady(final String fullFilePath) throws InterruptedException {
177         boolean flag = true;
178         while (flag) {
179             TimeUnit.MILLISECONDS.sleep(100);
180             try (ZipFile zipFile = new ZipFile(fullFilePath)) {
181                 flag = false;
182             } catch (final IOException exp) {
183                 LOGGER.error("file is not ready for reading, wait for sometime and try again", exp);
184             }
185         }
186     }
187 }