Updates for ODL Neon
[ccsdk/distribution.git] / dgbuilder / red / nodes / flows.js
1 /**
2  * Copyright 2014 IBM Corp.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  **/
16
17 var util = require("util");
18 var when = require("when");
19
20 var typeRegistry = require("./registry");
21 var credentials = require("./credentials");
22 var log = require("../log");
23 var events = require("../events");
24
25 var storage = null;
26
27 var nodes = {};
28 var activeConfig = [];
29 var missingTypes = [];
30
31 events.on('type-registered',function(type) {
32         if (missingTypes.length > 0) {
33             var i = missingTypes.indexOf(type);
34             if (i != -1) {
35                 missingTypes.splice(i,1);
36                 util.log("[red] Missing type registered: "+type);
37                 if (missingTypes.length === 0) {
38                     parseConfig();
39                 }
40             }
41         }
42 });
43
44 /**
45  * Parses the current activeConfig and creates the required node instances
46  */ 
47 function parseConfig() {
48     var i;
49     var nt;
50     missingTypes = [];
51     
52     // Scan the configuration for any unknown node types
53     for (i=0;i<activeConfig.length;i++) {
54         var type = activeConfig[i].type;
55         // TODO: remove workspace in next release+1
56         if (type != "workspace" && type != "tab") {
57             nt = typeRegistry.get(type);
58             if (!nt && missingTypes.indexOf(type) == -1) {
59                 missingTypes.push(type);
60             }
61         }
62     }
63     // Abort if there are any missing types
64     if (missingTypes.length > 0) {
65         util.log("[red] Waiting for missing types to be registered:");
66         for (i=0;i<missingTypes.length;i++) {
67             util.log("[red]  - "+missingTypes[i]);
68         }
69         return;
70     }
71
72     util.log("[red] Starting flows");
73     events.emit("nodes-starting");
74     
75     // Instantiate each node in the flow
76     for (i=0;i<activeConfig.length;i++) {
77         var nn = null;
78         // TODO: remove workspace in next release+1
79         if (activeConfig[i].type != "workspace" && activeConfig[i].type != "tab") {
80             nt = typeRegistry.get(activeConfig[i].type);
81             if (nt) {
82                 try {
83                     nn = new nt(activeConfig[i]);
84                 }
85                 catch (err) {
86                     util.log("[red] "+activeConfig[i].type+" : "+err);
87                 }
88             }
89             // console.log(nn);
90             if (nn === null) {
91                 util.log("[red] unknown type: "+activeConfig[i].type);
92             }
93         }
94     }
95     // Clean up any orphaned credentials
96     credentials.clean(flowNodes.get);
97     events.emit("nodes-started");
98 }
99
100 /**
101  * Stops the current activeConfig
102  */
103 function stopFlows() {
104     if (activeConfig&&activeConfig.length > 0) {
105         util.log("[red] Stopping flows");
106     }
107     return flowNodes.clear();
108 }
109
110 var flowNodes = module.exports = {
111     init: function(_storage) {
112         storage = _storage;
113     },
114     
115     /**
116      * Load the current activeConfig from storage and start it running
117      * @return a promise for the loading of the config
118      */
119     load: function() {
120         return storage.getFlows().then(function(flows) {
121             return credentials.load().then(function() {
122                 activeConfig = flows;
123                 if (activeConfig && activeConfig.length > 0) {
124                     parseConfig();
125                 }
126             });
127         }).otherwise(function(err) {
128             util.log("[red] Error loading flows : "+err);
129         });
130     },
131     
132     /**
133      * Add a node to the current active set
134      * @param n the node to add
135      */
136     add: function(n) {
137         nodes[n.id] = n;
138         n.on("log",log.log);
139     },
140     
141     /**
142      * Get a node
143      * @param i the node id
144      * @return the node
145      */
146     get: function(i) {
147         return nodes[i];
148     },
149     
150     /**
151      * Stops all active nodes and clears the active set
152      * @return a promise for the stopping of all active nodes
153      */
154     clear: function() {
155         return when.promise(function(resolve) {
156             events.emit("nodes-stopping");
157             var promises = [];
158             for (var n in nodes) {
159                 if (nodes.hasOwnProperty(n)) {
160                     try {
161                         var p = nodes[n].close();
162                         if (p) {
163                             promises.push(p);
164                         }
165                     } catch(err) {
166                         nodes[n].error(err);
167                     }
168                 }
169             }
170             when.settle(promises).then(function() {
171                 events.emit("nodes-stopped");
172                 nodes = {};
173                 resolve();
174             });
175         });
176     },
177     
178     /**
179      * Provides an iterator over the active set of nodes
180      * @param cb a function to be called for each node in the active set
181      */
182     each: function(cb) {
183         for (var n in nodes) {
184             if (nodes.hasOwnProperty(n)) {
185                 cb(nodes[n]);
186             }
187         }
188     },
189
190     /**
191      * @return the active configuration
192      */
193     getFlows: function() {
194         return activeConfig;
195     },
196     
197     /**
198      * Sets the current active config.
199      * @param config the configuration to enable
200      * @return a promise for the starting of the new flow
201      */
202     setFlows: function (config) {
203         // Extract any credential updates
204         for (var i=0; i<config.length; i++) {
205             var node = config[i];
206             if (node.credentials) {
207                 credentials.extract(node);
208                 delete node.credentials;
209             }
210         }
211         return credentials.save()
212             .then(function() { return storage.saveFlows(config);})
213             .then(function() { return stopFlows();})
214             .then(function () {
215                 activeConfig = config;
216                 parseConfig();
217             });
218     },
219     stopFlows: stopFlows
220 };