CSIT Fix for SDC-2585
[sdc.git] / asdctool / src / main / resources / es-resources / audit_migration_1602.py
1 import itertools
2 import string
3 import json
4 from datetime import datetime
5 from elasticsearch import Elasticsearch
6 import elasticsearch
7 import elasticsearch.helpers
8 from elasticsearch.client import IndicesClient
9 import sys, os
10 from index_ops import createIndex, deleteIndex, copyIndex
11 from config_properties import getGlobalVar 
12 from file_utils import readFileToJson
13
14 def updateFieldNames(client, queryFrom, fromIndex, destIndex, addUTC):
15     typesDir="types"
16     typeFields = {}
17     for filename in os.listdir(typesDir):
18        print filename
19        fieldNames=readFileToJson(typesDir+os.sep+filename)
20        
21        type=filename.split(".")[0]
22        typeFields[type] = fieldNames
23    
24     client.indices.refresh(index=fromIndex)
25     res = elasticsearch.helpers.scan(client, query=queryFrom, index=fromIndex)
26        
27     actions = []
28     for i in res:
29        res_type = i['_type']
30        fieldNames = typeFields.get(res_type)
31        if (fieldNames != None):
32          action={}
33          for field in i['_source']:
34              updatedName=fieldNames.get(field)
35              if (updatedName != None):        
36                  if (field == 'timestamp' and addUTC == True):
37                      value+=" UTC"
38                  value=i['_source'].get(field)   
39                  action[updatedName]=value
40              else:
41                  action[field]=i['_source'].get(field)
42          i['_source']=action
43        
44        i['_index']=destIndex
45        i.pop('_id', None)
46        actions.append(i)
47
48     bulk_res = elasticsearch.helpers.bulk(client, actions)
49     print "bulk response: ", bulk_res
50
51
52
53 def updateAllrecordsWithUTC(client, queryFrom, fromIndex, destIndex):
54
55     #scan indices
56     client.indices.refresh(index=fromIndex)
57     res = elasticsearch.helpers.scan(client, query=queryFrom, index=fromIndex)
58
59     actions = []
60     for i in res:
61         print i
62         i['_index']=destIndex
63         i['_source']['TIMESTAMP']+=" UTC"
64         actions.append(i)
65
66     bulk_res = elasticsearch.helpers.bulk(client, actions)
67     print "bulk response: ", bulk_res
68
69
70 def printQueryResults(client, myQuery, indexName):
71     client.indices.refresh(index=indexName)
72     res = elasticsearch.helpers.scan(client, query=myQuery, index=indexName)
73     for i in res:
74        print i
75
76 def main():
77    print "start script for changing fields"
78    print "================================="
79    
80    # initialize es
81    es = Elasticsearch([getGlobalVar('host')])
82
83    try:
84     mapping=readFileToJson(getGlobalVar('mappingFileName'))
85     res = createIndex(es, getGlobalVar('tempIndexName'), mapping)
86     if (res != 0):
87       print "script results in error"
88       sys.exit(1)
89
90     print "scan audit index and manipulate data"
91     print "===================================="
92
93     print "start time: ", datetime.now().time()
94     updateFieldNames(es, getGlobalVar('matchAllQuery'), getGlobalVar('origIndexName'), getGlobalVar('tempIndexName'), getGlobalVar('addUTC'))
95    
96     print "re-create original index"
97     print "========================="
98     res = createIndex(es, getGlobalVar('origIndexName'), mapping)
99     if (res != 0):
100       print "script results in error"
101       sys.exit(1)
102    
103     print "copy data from temp index to original"
104     print "======================================="
105     res = copyIndex(es, getGlobalVar('tempIndexName'), getGlobalVar('origIndexName'))
106     if (res != 0):
107       print "script results in error"
108       sys.exit(1)
109    
110     print "delete temp index"
111     print "=================="
112     res = deleteIndex(es, getGlobalVar('tempIndexName'))
113     if (res != 0):
114       print "script results in error"
115       sys.exit(1)
116    
117    
118     print "end time: ", datetime.now().time()
119
120    except Exception, error:
121       print "An exception was thrown!"
122       print str(error)
123       return 2
124   
125
126 if __name__ == "__main__":
127         main()
128
129
130
131
132