Random error message when op_failed
[clamp.git] / extra / docker / elk / tools / DMaaPServiceMocker / ds_mocker.py
1 #!/usr/bin/env python3
2 import os
3 import json
4 import copy
5 import random
6 import requests
7 import uuid
8 import time
9 from datetime import datetime
10
def luck(n=2):
    """Return a random boolean that is False 1 time out of n (default: 2).

    The draw is ``random.randint(0, n - 1)`` converted to ``bool``: only the
    value 0 maps to False, so the result is True with probability (n-1)/n.
    Callers wanting a "1 chance out of n" event use ``not luck(n)``.

    Raises:
        ValueError: if ``n`` is not greater than 1.
    """
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if not n > 1:
        raise ValueError("n must be greater than 1, got %r" % (n,))
    return bool(random.randint(0, n - 1))
def now_dmaap_timestamp():
    """Return the current time as a string of epoch milliseconds.

    The value is computed arithmetically rather than by deleting the "."
    from the float's text form: the text form drops trailing zeros
    (e.g. ``1541430000.5``), which used to produce a string with too few
    digits and therefore a timestamp of the wrong magnitude.
    """
    return str(int(datetime.now().timestamp() * 1000))
def now_notification_time():
    """Return the current UTC time as ``YYYY-MM-DD HH:MM:SS.ffffff+00:00``.

    The format hard-codes a ``+00:00`` offset, so the clock reading must
    really be UTC; a naive local ``datetime.now()`` would be mislabelled on
    any host whose timezone is not UTC.
    """
    # Local import: the module only imports the `datetime` class at file level.
    from datetime import timezone

    return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f+00:00")
19
# Pool of control loop names; each CLStatus picks one at random and sends it
# as the `closedLoopControlName` of its messages.
CONTROL_LOOP_NAMES = [
    'CL-vCPE-d925ed73',
    'CL-vCPE-37b1c91e',
    'CL-vCPE-c2597657',
    'CL-vCPE-a11318ba',
    'CL-vCPE-5321c558',
]
27
# Maps each template name to its JSON file, which is always "<name>.json".
# The file names are later replaced in place by the parsed JSON documents.
TEMPLATES = {
    template_name: template_name + '.json'
    for template_name in (
        'event_abated',
        'event_onset',
        'notification_active',
        'notification_final_failed',
        'notification_final_open',
        'notification_final_success',
        'notification_operation_failure',
        'notification_operation',
        'notification_operation_success',
        'notification_rejected_disabled',
        'notification_rejected_missing',
    )
}
41
# Possible failure descriptions as (actor, message, operation) tuples; one is
# picked at random and written into the last `history` entry of a
# "final failed" notification.
ERROR_MESSAGES = [
    ('APPC', 'APPC1 : timeout on restart','RESTART'),
    ('APPC', 'APPC2 : cannot restart','RESTART'),
    ('SO', 'SO1 : scale up failed', 'SCALEUP'),
    ('SO', 'SO2 : scale down failed', 'SCALEDOWN'),
]
48
# Replace every template file name in TEMPLATES by the parsed JSON document.
# Files are looked up next to this script first, so the mocker also works when
# launched from another working directory; the original lookup relative to the
# current directory is kept as a fallback.
_TEMPLATE_DIR = os.path.dirname(os.path.abspath(__file__))
for key, filename in list(TEMPLATES.items()):
    path = os.path.join(_TEMPLATE_DIR, filename)
    if not os.path.isfile(path):
        path = filename
    with open(path, encoding="utf-8") as f:
        TEMPLATES[key] = json.load(f)
53
54
class DMaaPMessage(dict):
    """A DMaaP message: a ``dict`` payload that can be POSTed to a topic.

    Connection settings are class attributes shared by every message.
    """

    # Base URL of the DMaaP service; a trailing "/" is tolerated.
    dmaap_host_url = "http://dmaap.host.url:9200/"
    # Optional basic-auth credentials; used only when both are set.
    dmaap_username = None
    dmaap_password = None

    @classmethod
    def from_template(cls, tmpl, **kwargs):
        """Build a message from a deep copy of the template named *tmpl*.

        Each keyword argument overrides one value of the copy. A dotted
        name (``"a.b.c"``, passed through ``**{...}``) addresses a nested
        value: every component but the last selects an existing sub-mapping
        and the last component is the key that receives the value.

        Raises:
            KeyError: if *tmpl* is unknown or an intermediate path component
                does not exist in the template.
        """
        obj = cls()
        obj.update(copy.deepcopy(TEMPLATES[tmpl]))
        for path, value in kwargs.items():
            # Walk down to the parent of the leaf, then assign the LAST
            # component (the previous code re-used the parent's own key).
            *parents, leaf = path.split(".")
            node = obj
            for parent in parents:
                node = node[parent]
            node[leaf] = value
        return obj

    def publish(self, topic):
        """POST this message as JSON to *topic* and return the HTTP status code."""
        # Strip trailing slashes so the default host URL does not yield "//events/".
        url = "%s/events/%s" % (self.dmaap_host_url.rstrip("/"), topic)
        auth = None
        if self.dmaap_username and self.dmaap_password:
            auth = (self.dmaap_username, self.dmaap_password)
        response = requests.post(url, data=json.dumps(self), auth=auth)
        return response.status_code
82
class Event(DMaaPMessage):
    """Control-loop event message, published on the class-level ``topic``."""

    topic = "DCAE-CL-EVENT"

    @staticmethod
    def onset(**overrides):
        """Build an Event from the 'event_onset' template with the given overrides."""
        return Event.from_template('event_onset', **overrides)

    @staticmethod
    def abated(**overrides):
        """Build an Event from the 'event_abated' template with the given overrides."""
        return Event.from_template('event_abated', **overrides)

    def publish(self):
        """Publish this event on ``self.topic`` and return the HTTP status code."""
        destination = self.topic
        return super(Event, self).publish(destination)
97
98
class Notification(DMaaPMessage):
    """Policy notification message, published on the class-level ``topic``.

    ``final()`` and ``rejected()`` return a helper class whose static methods
    build the concrete notification; ``operation()`` returns an "operation"
    notification whose class also offers ``success``/``failure`` builders.
    """

    topic = "POLICY-CL-MGT"

    @classmethod
    def from_template(cls, tmpl, **kwargs):
        """Build a notification from *tmpl*, always stamping ``notificationTime`` with the current time."""
        stamped = dict(kwargs, notificationTime=now_notification_time())
        return super().from_template(tmpl, **stamped)

    @staticmethod
    def active(**kwargs):
        """Build a notification from the 'notification_active' template."""
        return Notification.from_template('notification_active', **kwargs)

    @staticmethod
    def final(**kwargs):
        """Return a class exposing ``success``, ``failed`` and ``open`` builders (*kwargs* is ignored)."""
        class FinalNotification(Notification):
            @staticmethod
            def success(**fields):
                return FinalNotification.from_template('notification_final_success', **fields)

            @staticmethod
            def failed(**fields):
                notification = FinalNotification.from_template('notification_final_failed', **fields)
                # Describe the failure in the last history entry with a random error.
                actor, message, operation = random.choice(ERROR_MESSAGES)
                last_entry = notification['history'][-1]
                last_entry['actor'] = actor
                last_entry['message'] = message
                last_entry['operation'] = operation
                return notification

            @staticmethod
            def open(**fields):
                return FinalNotification.from_template('notification_final_open', **fields)

        return FinalNotification

    @staticmethod
    def operation(**kwargs):
        """Build a 'notification_operation' message; its class also offers ``success``/``failure`` builders."""
        class OperationNotification(Notification):
            @staticmethod
            def success(**fields):
                return OperationNotification.from_template('notification_operation_success', **fields)

            @staticmethod
            def failure(**fields):
                return OperationNotification.from_template('notification_operation_failure', **fields)

        return OperationNotification.from_template('notification_operation', **kwargs)

    @staticmethod
    def rejected(**kwargs):
        """Return a class exposing ``disabled`` and ``missing_fields`` builders (*kwargs* is ignored)."""
        class RejectedNotification(Notification):
            @staticmethod
            def disabled(**fields):
                return RejectedNotification.from_template('notification_rejected_disabled', **fields)

            @staticmethod
            def missing_fields(**fields):
                return RejectedNotification.from_template('notification_rejected_missing', **fields)

        return RejectedNotification

    def publish(self):
        """Publish this notification on ``self.topic`` and return the HTTP status code."""
        destination = self.topic
        return super(Notification, self).publish(destination)
155
156
157
class CLStatus(object):
    """One simulated control-loop run, iterable as the messages it emits.

    Iterating yields, in order: the onset event, then either a single
    "rejected" notification (missing fields or disabled), or the active and
    operation notifications followed by the failure path (operation failure,
    final failed) or the success path (operation success, abated event,
    final success). Once a run has emitted its last message, iterating the
    same object again yields nothing.
    """

    def __init__(self, dmaap_url=None,
                 missing=None, disabled=None, op_failure=None):
        """Create a run with a fresh request ID and a random control loop name.

        Args:
            dmaap_url: accepted for compatibility; not used.
            missing: emit a "rejected / missing fields" notification.
            disabled: emit a "rejected / disabled" notification
                (only considered when ``missing`` is false).
            op_failure: make the operation fail instead of succeed.

        Any flag left to ``None`` is drawn at random as ``not luck(10)``.
        """
        self._stopped = False

        def maybe(thing):
            if thing is None:
                thing = not luck(10)
            return thing

        self._missing = maybe(missing)
        self._disabled = maybe(disabled)
        self._op_failure = maybe(op_failure)
        self._config = dict(
            requestID=str(uuid.uuid4()),
            closedLoopControlName=random.choice(CONTROL_LOOP_NAMES),
        )

    def __iter__(self):
        """Return a generator over the messages of this run."""
        return self._messages()

    def __next__(self):
        """Return the message generator.

        Kept for backward compatibility: ``next(status)`` has always returned
        the generator itself rather than a single message.
        """
        return self._messages()

    def _messages(self):
        """Generate the messages of this run (see the class docstring for the order)."""
        # A generator must finish with `return`: since PEP 479 (Python 3.7)
        # a StopIteration raised inside a generator is turned into RuntimeError.
        if self._stopped:
            return
        # Deliberately the same dict as self._config: timestamps added here
        # stay visible on the instance after the run.
        config = self._config
        config.update(dict(closedLoopAlarmStart=now_dmaap_timestamp()))
        yield Event.onset(**config)
        if self._missing:
            self._stopped = True
            yield Notification.rejected().missing_fields(**config)
            return
        elif self._disabled:
            self._stopped = True
            yield Notification.rejected().disabled(**config)
            return

        yield Notification.active(**config)
        yield Notification.operation(**config)

        config['closedLoopAlarmEnd'] = now_dmaap_timestamp()
        if self._op_failure:
            yield Notification.operation().failure(**config)
            self._stopped = True
            yield Notification.final().failed(**config)
        else:
            yield Notification.operation().success(**config)
            yield Event.abated(**config)
            self._stopped = True
            yield Notification.final().success(**config)
207
def print_usage():
    """Print the command-line synopsis, then terminate the process.

    Raises:
        SystemExit: always, with no exit code (i.e. exit status 0).
    """
    # `sys.exit` instead of the `exit` helper: that helper is injected by the
    # `site` module for interactive use and is not guaranteed to exist.
    # Local import because `sys` is only imported under the __main__ guard.
    import sys

    print("""
    ./ds_mocker.py <DMAAP_URL> <EVENT_TOPIC> [NOTIFICATION_TOPIC [REQUEST_TOPIC]]
    """)
    sys.exit()
213
def push(test_datas):
    """Publish every message of every scenario in *test_datas*.

    A random pause of 0 to 3 seconds is taken before each scenario and after
    each published message. After a scenario completes, a summary line with
    its index, flags and configuration is printed.

    Args:
        test_datas: iterable of scenarios; each scenario is iterated for
            messages exposing ``publish()`` and must provide the
            ``_missing``, ``_disabled``, ``_op_failure`` and ``_config``
            attributes used in the summary line.

    Raises:
        SystemExit: with status 1 as soon as a publish does not return 200.
    """
    # `sys.exit` instead of the `site`-injected `exit` helper; local import
    # because `sys` is only imported under the __main__ guard.
    import sys

    for current_i, status in enumerate(test_datas):
        time.sleep(random.randint(0, 3))
        for s in status:
            status_code = s.publish()
            if status_code != 200:
                print("Error when publishing : status_code={}".format(status_code))
                sys.exit(1)
            time.sleep(random.randint(0, 3))
        print("%03d,missing:%5s,disabled:%5s,op_failure:%5s - %s" % (current_i, status._missing, status._disabled, status._op_failure, status._config))
225
226
227
def generate_dataset_1():
    """Return a shuffled list of 323 runs: 300 nominal, 5 with missing fields, 6 disabled, 12 with an operation failure."""
    # (how many runs, flags passed to CLStatus), created in this order.
    scenario_counts = (
        (300, dict(missing=False, disabled=False, op_failure=False)),
        (5, dict(missing=True, disabled=False, op_failure=False)),
        (6, dict(missing=False, disabled=True, op_failure=False)),
        (12, dict(missing=False, disabled=False, op_failure=True)),
    )
    test_datas = []
    for count, flags in scenario_counts:
        for _ in range(count):
            test_datas.append(CLStatus(**flags))
    random.shuffle(test_datas)
    return test_datas
235
def generate_error_dataset_1():
    """Return a shuffled list of 60 runs that all end with an operation failure."""
    failing_runs = []
    for _ in range(60):
        failing_runs.append(CLStatus(missing=False, disabled=False, op_failure=True))
    random.shuffle(failing_runs)
    return failing_runs
240
241
# Available datasets by name; each value is a zero-argument function that
# returns a shuffled list of CLStatus runs.
DATASETS = {
    'dataset_1': generate_dataset_1,
    'op_failure_1': generate_error_dataset_1,
}
246
if __name__ == "__main__":
    import sys

    # At least <DMAAP_URL> and <EVENT_TOPIC> are required.
    if len(sys.argv) < 3:
        print_usage()

    DMaaPMessage.dmaap_host_url = sys.argv[1]
    Event.topic = sys.argv[2]
    # Notifications go to the third argument when it is given and non-empty,
    # otherwise to the event topic.
    if len(sys.argv) > 3 and sys.argv[3]:
        Notification.topic = sys.argv[3]
    else:
        Notification.topic = sys.argv[2]
    # Request.topic = len(sys.argv) > 4 or Notification.topic

    # Alternative dataset containing only failing runs:
    #push(DATASETS['op_failure_1']())
    push(DATASETS['dataset_1']())