4 from datetime import datetime
5 from elasticsearch import Elasticsearch
7 import elasticsearch.helpers
8 from elasticsearch.client import IndicesClient
10 from index_ops import createIndex, deleteIndex, copyIndex
11 from config_properties import getGlobalVar
12 from file_utils import readFileToJson
14 def updateFieldNames(client, queryFrom, fromIndex, destIndex, addUTC):
17 for filename in os.listdir(typesDir):
19 fieldNames=readFileToJson(typesDir+os.sep+filename)
21 type=filename.split(".")[0]
22 typeFields[type] = fieldNames
24 client.indices.refresh(index=fromIndex)
25 res = elasticsearch.helpers.scan(client, query=queryFrom, index=fromIndex)
30 fieldNames = typeFields.get(res_type)
31 if (fieldNames != None):
33 for field in i['_source']:
34 updatedName=fieldNames.get(field)
35 if (updatedName != None):
36 if (field == 'timestamp' and addUTC == True):
38 value=i['_source'].get(field)
39 action[updatedName]=value
41 action[field]=i['_source'].get(field)
48 bulk_res = elasticsearch.helpers.bulk(client, actions)
49 print "bulk response: ", bulk_res
53 def updateAllrecordsWithUTC(client, queryFrom, fromIndex, destIndex):
56 client.indices.refresh(index=fromIndex)
57 res = elasticsearch.helpers.scan(client, query=queryFrom, index=fromIndex)
63 i['_source']['TIMESTAMP']+=" UTC"
66 bulk_res = elasticsearch.helpers.bulk(client, actions)
67 print "bulk response: ", bulk_res
70 def printQueryResults(client, myQuery, indexName):
71 client.indices.refresh(index=indexName)
72 res = elasticsearch.helpers.scan(client, query=myQuery, index=indexName)
77 print "start script for changing fields"
78 print "================================="
81 es = Elasticsearch([getGlobalVar('host')])
84 mapping=readFileToJson(getGlobalVar('mappingFileName'))
85 res = createIndex(es, getGlobalVar('tempIndexName'), mapping)
87 print "script results in error"
90 print "scan audit index and manipulate data"
91 print "===================================="
93 print "start time: ", datetime.now().time()
94 updateFieldNames(es, getGlobalVar('matchAllQuery'), getGlobalVar('origIndexName'), getGlobalVar('tempIndexName'), getGlobalVar('addUTC'))
96 print "re-create original index"
97 print "========================="
98 res = createIndex(es, getGlobalVar('origIndexName'), mapping)
100 print "script results in error"
103 print "copy data from temp index to original"
104 print "======================================="
105 res = copyIndex(es, getGlobalVar('tempIndexName'), getGlobalVar('origIndexName'))
107 print "script results in error"
110 print "delete temp index"
111 print "=================="
112 res = deleteIndex(es, getGlobalVar('tempIndexName'))
114 print "script results in error"
118 print "end time: ", datetime.now().time()
120 except Exception, error:
121 print "An exception was thrown!"
126 if __name__ == "__main__":