-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathatdwmapper.py
executable file
·47 lines (45 loc) · 1.54 KB
/
atdwmapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from pymongo import MongoClient
import pymongo
from pytz import timezone
import pytz
import json
import time
from databridge import process_profile
import argparse
def _src_variables(latest_update_ts):
    """Build the profile options dict that carries ``latest_update_ts``.

    The value is spliced verbatim into a JSON object literal, so it must
    already render (via ``str``) as a valid JSON token. The ``--deltaDate``
    help text describes it as a unix timestamp.
    """
    return {"srcVariables": '{"latest_update_ts":' + str(latest_update_ts) + '}'}


def _latest_product_update_ts():
    """Return the highest ``update_ts`` among ``type == "PRODUCT"`` documents.

    Connects to ``args.host`` and reads ``args.database`` / ``args.collection``.
    Returns ``None`` when no matching document exists or the newest one has no
    (or a null) ``update_ts``. Connection and query errors propagate to the
    caller.
    """
    conn = MongoClient(args.host)
    try:
        collection = conn[args.database][args.collection]
        newest = collection.find_one(
            filter={"type": "PRODUCT"},
            projection={"_id": False, "update_ts": 1},
            sort=[("update_ts", pymongo.DESCENDING)],
        )
    finally:
        # Always release the client's sockets, even when the lookup fails.
        conn.close()
    if newest is None:
        return None
    return newest.get("update_ts")


def start_mapping():
    """Run the ATDW accommodation mapping profile.

    Reads the module-level ``args`` namespace (set by the ``__main__`` block).
    If ``args.deltaDate`` is set to anything other than ``"last"``, that value
    is passed to the profile as ``latest_update_ts``. Otherwise the newest
    ``update_ts`` of the PRODUCT documents in MongoDB is looked up and used.

    The lookup is best-effort: on any failure, or when no timestamp is found,
    the reason is printed and the profile is still run with empty options.
    """
    profile_path = "profiles/atdw-mapping-accomm.json"
    ops = {}
    # An explicit delta date (anything but the default "last") wins.
    if hasattr(args, "deltaDate") and args.deltaDate != "last":
        ops = _src_variables(args.deltaDate)
    else:
        # No explicit date: derive it from the most recently updated product.
        try:
            latest_ts = _latest_product_update_ts()
        except Exception as e:
            # Deliberately broad: a failed lookup must not stop the mapper.
            print(e)
        else:
            if latest_ts is None:
                print("No PRODUCT document with an update_ts was found; "
                      "running the profile without srcVariables")
            else:
                ops = _src_variables(latest_ts)
    print(ops)
    process_profile(profile_path, ops)
if __name__ == "__main__":
    # Script entry point: parse the command line into the module-level
    # ``args`` namespace (read by start_mapping) and kick off the mapping.
    parser = argparse.ArgumentParser(description="ATDW nightly mapper")

    # (flags, default, help) for each option, registered in this order.
    cli_options = (
        (("-dd", "--deltaDate"), "last",
         "Update timestamp (unixtimestamp) for the mapper. "),
        (("-db", "--database"), None,
         "Database name"),
        (("-u", "--host"), "mongodb://localhost:27017",
         "Mongo host, can be just localhost or the full host uri"),
        (("-c", "--collection"), None,
         "Which collection to target. "),
    )
    for flags, default_value, help_text in cli_options:
        parser.add_argument(*flags, default=default_value, help=help_text)

    args = parser.parse_args()
    start_mapping()