72341ee56d68990e1add88575267acfa5a27c62c
[policy/distribution.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2018 Intel Corp. All rights reserved.
4  *  Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.distribution.reception.handling.file;
23
24 import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
25
26 import java.io.File;
27 import java.io.IOException;
28 import java.nio.file.FileSystems;
29 import java.nio.file.Path;
30 import java.nio.file.Paths;
31 import java.nio.file.WatchEvent;
32 import java.nio.file.WatchKey;
33 import java.nio.file.WatchService;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.TimeUnit;
37 import java.util.zip.ZipFile;
38
39 import org.onap.policy.common.parameters.ParameterService;
40 import org.onap.policy.distribution.model.Csar;
41 import org.onap.policy.distribution.reception.decoding.PolicyDecodingException;
42 import org.onap.policy.distribution.reception.handling.AbstractReceptionHandler;
43 import org.onap.policy.distribution.reception.statistics.DistributionStatisticsManager;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * Handles reception of inputs from File System which can be used to decode policies.
49  */
50 public class FileSystemReceptionHandler extends AbstractReceptionHandler {
51
52     private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemReceptionHandler.class);
53     private boolean running = false;
54
55     /**
56      * {@inheritDoc}.
57      */
58     @Override
59     protected void initializeReception(final String parameterGroupName) {
60         LOGGER.debug("FileSystemReceptionHandler init...");
61         try {
62             final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters =
63                     ParameterService.get(parameterGroupName);
64             final FileClientHandler fileClientHandler = new FileClientHandler(this,
65                     handlerParameters.getWatchPath(),
66                     handlerParameters.getMaxThread());
67             final Thread fileWatcherThread = new Thread(fileClientHandler);
68             fileWatcherThread.start();
69         } catch (final Exception ex) {
70             LOGGER.error("FileSystemReceptionHandler initialization failed", ex);
71         }
72     }
73
74     /**
75      * {@inheritDoc}.
76      */
77     @Override
78     public void destroy() {
79         running = false;
80     }
81
82     /**
83      * Method to check the running status of file watcher thread.
84      *
85      * @return the running status
86      */
87     public boolean isRunning() {
88         return running;
89     }
90
91     /**
92      * Initialize the file watcher thread.
93      *
94      * @param watchPath Path to watch
95      */
96     public void initFileWatcher(final String watchPath, final int maxThread) throws IOException {
97         try (final WatchService watcher = FileSystems.getDefault().newWatchService()) {
98             final Path dir = Paths.get(watchPath);
99             dir.register(watcher, ENTRY_CREATE);
100             LOGGER.debug("Watch Service registered for dir: {}", dir.getFileName());
101             startWatchService(watcher, dir, maxThread);
102         } catch (final Exception ex) {
103             LOGGER.error("FileWatcher initialization failed", ex);
104             Thread.currentThread().interrupt();
105         }
106     }
107
108     /**
109      * Method to keep watching the given path for any new file created.
110      *
111      * @param watcher the watcher
112      * @param dir the watch directory
113      * @param maxThread the max thread number
114      * @throws InterruptedException if it occurs
115      */
116     @SuppressWarnings("unchecked")
117     protected void startWatchService(final WatchService watcher,
118             final Path dir, int maxThread) throws InterruptedException {
119         WatchKey key;
120         ExecutorService pool = Executors.newFixedThreadPool(maxThread);
121
122         try {
123             running = true;
124             while (running) {
125                 key = watcher.take();
126
127                 for (final WatchEvent<?> event : key.pollEvents()) {
128                     final WatchEvent<Path> ev = (WatchEvent<Path>) event;
129                     final Path fileName = ev.context();
130                     pool.execute(() -> {
131                         LOGGER.debug("new CSAR found: {}", fileName);
132                         DistributionStatisticsManager.updateTotalDistributionCount();
133                         final String fullFilePath = dir.toString() + File.separator + fileName.toString();
134                         try {
135                             waitForFileToBeReady(fullFilePath);
136                             createPolicyInputAndCallHandler(fullFilePath);
137                             LOGGER.debug("CSAR complete: {}", fileName);
138                         } catch (InterruptedException e) {
139                             LOGGER.error("waitForFileToBeReady interrupted", e);
140                             Thread.currentThread().interrupt();
141                         }
142                     });
143                 }
144                 final boolean valid = key.reset();
145                 if (!valid) {
146                     LOGGER.error("Watch key no longer valid!");
147                     break;
148                 }
149             }
150         } finally {
151             pool.shutdown();
152         }
153     }
154
155     /**
156      * Method to create policy input & call policy handlers.
157      *
158      * @param fileName the filename
159      */
160     protected void createPolicyInputAndCallHandler(final String fileName) {
161         try {
162             final Csar csarObject = new Csar(fileName);
163             DistributionStatisticsManager.updateTotalDownloadCount();
164             inputReceived(csarObject);
165             DistributionStatisticsManager.updateDownloadSuccessCount();
166             DistributionStatisticsManager.updateDistributionSuccessCount();
167         } catch (final PolicyDecodingException ex) {
168             DistributionStatisticsManager.updateDownloadFailureCount();
169             DistributionStatisticsManager.updateDistributionFailureCount();
170             LOGGER.error("Policy creation failed", ex);
171         }
172     }
173
174     private void waitForFileToBeReady(final String fullFilePath) throws InterruptedException {
175         boolean flag = true;
176         while (flag) {
177             TimeUnit.MILLISECONDS.sleep(100);
178             try (ZipFile zipFile = new ZipFile(fullFilePath)) {
179                 flag = false;
180             } catch (final IOException exp) {
181                 LOGGER.error("file is not ready for reading, wait for sometime and try again", exp);
182             }
183         }
184     }
185 }