-
Notifications
You must be signed in to change notification settings - Fork 1
/
watch.py
executable file
·135 lines (106 loc) · 4.11 KB
/
watch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import sys
import time
import os
import glob
import matplotlib.pyplot as plt
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from watchdog.events import FileCreatedEvent
from watchdog.utils import unicode_paths
from pathtools.patterns import match_any_paths
from astropy.io import fits
class Watcher:
# Class to watch files in a directory using watchdog.
#You can use this on a command line in a terminal it just outputs file created.
DIRECTORY_TO_WATCH = "."
event_handler = None
usels = True
last_file = ""
def __init__(self,dirName=".",ls=True):
self.observer = Observer()
self.DIRECTORY_TO_WATCH = dirName
self.usels = ls
self.event_handler=Handler()
if (self.usels):
print("Using ls to raise event." )
print("Monitoring - %s."% os.path.abspath(self.DIRECTORY_TO_WATCH))
def run(self):
if self.usels == 0:
print("Regestering handler - %s." % self.event_handler)
self.observer.schedule(self.event_handler, self.DIRECTORY_TO_WATCH, recursive=True)
self.observer.start()
try:
while True:
time.sleep(5)
if (self.usels) :
try:
self.checkNew()
except Exception as e:
print(e)
except Exception as ex:
if self.usels == 0:
self.observer.stop()
print(ex)
if self.usels == 0:
self.observer.join()
def checkNew(self):
list_of_files = glob.glob(os.path.abspath(self.DIRECTORY_TO_WATCH)+'/*fit*') # * means all if need specific format then *.csv
list_of_files.sort()
#print(list_of_files)
if (len(list_of_files) >= 1):
latest_file = list_of_files[len(list_of_files)-1]
if (self.last_file != latest_file) :
self.last_file = latest_file
#print("New file - %s." % self.last_file)
event = FileCreatedEvent(self.last_file)
self.event_handler.on_any_event(event)
class Handler(FileSystemEventHandler):
@staticmethod
def on_any_event(event):
if event.is_directory:
return None
elif event.event_type == 'created':
# Take any action here when a file is first created.
print("Received created event - %s." % event.src_path)
class FitsWatcher(Watcher):
#Class to watch for FITS files ina adirectory.
#This will try to render the nth file - so if its not in a notebook it will not work.
NUM_TO_SKIP = 0
def __init__(self,dirName=".",numToSkip=2):
super().__init__(dirName)
self.NUM_TO_SKIP=numToSkip
self.event_handler=FitsHandler(self.NUM_TO_SKIP)
print("will render each %d."% (self.NUM_TO_SKIP))
class FitsHandler(FileSystemEventHandler):
COUNT=0
NUM_TO_SKIP=0
def __init__(self,skip):
FitsHandler.COUNT=0
FitsHandler.NUM_TO_SKIP=skip
@staticmethod
def on_any_event(event):
if event.is_directory:
return None
elif event.event_type == 'created':
# Count fits files and render some of them ..
print("FithsHandler Received - %s." % event.src_path)
if (event.src_path.endswith(".fits") or event.src_path.endswith(".fits.gz") or event.src_path.endswith(".fits.fz")):
FitsHandler.COUNT = FitsHandler.COUNT + 1
if ((FitsHandler.COUNT % FitsHandler.NUM_TO_SKIP == 0) or Watcher.usels):
FitsHandler.render(event.src_path)
else:
print("Skipping - %s." % event.src_path)
@staticmethod
def render(filePath):
print("Rendering - %s." % filePath)
try:
image_data = fits.getdata(filePath)
plt.rcParams['figure.figsize'] = [10, 10]
plt.imshow(image_data, cmap='gray')
plt.colorbar()
plt.show()
except Exception as ex:
print("Can not load fits - %s." % ex)
if __name__ == '__main__':
w = Watcher(sys.argv[1])
w.run()