-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path ies_track.py
161 lines (135 loc) · 9.1 KB
/
ies_track.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#LICENSE = GPL3 (Gnu Public License v3)
#Produced under contract to Dstl
from rdflib import Graph, plugin, URIRef, BNode, Literal
from rdflib.namespace import DC, DCAT, DCTERMS, OWL, RDF, RDFS, XMLNS, XSD
from rdflib.serializer import Serializer
import uuid
import dateutil.parser #required to do earliest, latest date time comparison
import ies_functions as ies
import sqlite3
from time import sleep
import geohash_hilbert as ghh
#--- Run configuration -------------------------------------------------------
aisDataFile="./2ferries4weeks.csv" #NOTE(review): defined but never referenced in this file - confirm it can be removed
aisDB="../GeoData/nyc.db" #This file can be downloaded from https://ais-sqlite-db.s3.eu-west-2.amazonaws.com/nyc.db - change your path as necessary
pingDelay = 0.1 #forced delay (in seconds) between reading off individual IES pings
sampleSize=20 #The total number of pings to process - NOTE(review): not referenced anywhere below; confirm intent
trackSize=4 #The number of location pings to accrue before pushing out a track
#Set this to True to use Kafka
useKafka = False
if useKafka:
    iesOutput = "kafka"
    kafkaBroker = ies.initialiseKafka(kHost="localhost:9092") #change this if Kafka is not on local machine
    iesKafkaTopic = "ies"
else:
    #kafkaBroker / iesKafkaTopic are NOT defined in this branch, so iesOutput must stay "file"
    iesOutput = "file"
#def createEpsgLocationObservation(iesGraph,observedEntity,coordinates,epsgCode,timestamp):
#Simple function to process a line of AIS in IES. It expects mmsi, timestamp (in ISO8601 format), lat, lon
def createLocationObservation(iesGraph,ping,obs=None,transponder=None,measures=None):
    """Process one AIS ping into an IES LocationObservation.

    Parameters:
        iesGraph    - the rdflib graph the IES statements are added to
        ping        - sequence of (mmsi, ISO8601 timestamp, lat, lon[, sog, cog, ...])
        obs         - optional parent (track) observation; if given, the new
                      LocationObservation is made part of it
        transponder - optional pre-created LocationTransponder; if None a new
                      one is minted from the ping's mmsi
        measures    - optional dict with keys sogClass, cogClass, knots, degTN;
                      when given, speed/course over ground are also recorded
    """
    mmsi = str(ping[0])
    timestamp = str(ping[1])
    lat = float(ping[2])
    lon = float(ping[3])
    print(lat,lon)  #debug trace of each processed position
    if transponder is None:  #fixed: identity comparison with None (was ==)
        transponder = ies.createLocationTransponder(iesGraph=iesGraph,mmsi=mmsi)
    lo = ies.instantiate(iesGraph=iesGraph,_class=ies.locationObservation)
    #If track emulation is not required, obs will be None. If it's not None, make the LocationObservation (lo) part of the overall track observation
    if obs:
        ies.addToGraph(iesGraph=iesGraph,subject=lo,predicate=ies.ipao,obj=obs)
    #...and the ParticularPeriod in which the observation occurred
    ies.putInPeriod(iesGraph=iesGraph,item=lo,timeString=timestamp)
    #And involve the transponder in that location observation
    ltPart = ies.instantiate(iesGraph=iesGraph,_class=ies.observedTarget)
    ies.addToGraph(iesGraph=iesGraph,subject=ltPart,predicate=ies.ipo,obj=transponder) #participation of the transponder
    ies.addToGraph(iesGraph=iesGraph,subject=ltPart,predicate=ies.ipi,obj=lo) #participation in the LocationObservation
    #Now the observed location, a geopoint with a lat and long - the lat,lon pair gives each point a repeatable URI so identical points merge
    gp = URIRef(ies.dataUri+"latlong"+str(lat)+","+str(lon))
    ies.instantiate(iesGraph=iesGraph,_class=ies.geoPoint,instance=gp)
    #Add the lat and long values as identifiers of the geopoint...firstly creating repeatable URIs for them so they don't overwrite
    latObj = URIRef(gp.toPython()+"_lat")
    lonObj = URIRef(gp.toPython()+"_lon")
    ies.instantiate(iesGraph=iesGraph, _class=ies.latitude,instance=latObj)
    ies.instantiate(iesGraph=iesGraph, _class=ies.longitude,instance=lonObj)
    ies.addToGraph(iesGraph=iesGraph,subject=gp,predicate=ies.iib,obj=latObj)
    ies.addToGraph(iesGraph=iesGraph,subject=gp,predicate=ies.iib,obj=lonObj)
    #Add the representation values to the lat and lon objects
    #NOTE(review): values are floats but serialised with datatype xsd:string - presumably deliberate; confirm downstream consumers expect strings
    ies.addToGraph(iesGraph=iesGraph,subject=latObj,predicate=ies.rv,obj=Literal(lat, datatype=XSD.string))
    ies.addToGraph(iesGraph=iesGraph,subject=lonObj,predicate=ies.rv,obj=Literal(lon, datatype=XSD.string))
    #Now the participation of the GeoPoint in the Observation
    gpPart = ies.instantiate(iesGraph=iesGraph,_class=ies.observedLocation)
    ies.addToGraph(iesGraph=iesGraph,subject=gpPart,predicate=ies.ipo,obj=gp) #participation of the GeoPoint
    ies.addToGraph(iesGraph=iesGraph,subject=gpPart,predicate=ies.ipi,obj=lo) #participation in the LocationObservation
    #Fires only when the measure classes etc. have been provided - records speed and course over ground from ping columns 4 and 5
    if measures:
        sogVal = float(ping[4])
        cogVal = float(ping[5])
        sog = ies.addMeasure(iesGraph=iesGraph,measureClass=measures["sogClass"],value=sogVal,uom=measures["knots"])
        cog = ies.addMeasure(iesGraph=iesGraph,measureClass=measures["cogClass"],value=cogVal,uom=measures["degTN"])
        ies.addToGraph(iesGraph=iesGraph,subject=ltPart,predicate=ies.och,obj=sog)
        ies.addToGraph(iesGraph=iesGraph,subject=ltPart,predicate=ies.och,obj=cog)
#A simple parent observation to group the others into track
def createParentObservation(iesGraph):
    """Create and return a bare ies:Observation used to group the
    individual location observations into a single track."""
    parentObs = ies.instantiate(iesGraph=iesGraph,_class=ies.observation)
    return parentObs
def exportTrack(iesGraph,track,output="file", epsgCode = "4326"):
    """Serialise one batch of pings for a single transponder as an IES track.

    Parameters:
        track    - dict with keys id, pings, minDateTime, maxDateTime, counter;
                   min/max timestamps must already be populated
        output   - "file" writes a turtle file under ./data/; "kafka" publishes
                   to the module-level kafkaBroker/iesKafkaTopic (those globals
                   only exist when useKafka is True - see top of file)
        epsgCode - currently unused; kept for interface compatibility
    """
    ies.initialiseGraph(iesGraph=iesGraph)
    #Add a parent observation bounded by the batch's time window
    obs = ies.instantiate(iesGraph=iesGraph,_class=ies.observation)
    ies.startsIn(iesGraph=iesGraph,item=obs,timeString=track["minDateTime"].isoformat())
    ies.endsIn(iesGraph=iesGraph,item=obs,timeString=track["maxDateTime"].isoformat())
    measures = {}  #fixed: dict({}) was a redundant copy of an empty dict
    #Now add the measure classes for Speed Over Ground and Course Over Ground...and knots for the Unit Of Measure
    measures["sogClass"] = ies.instantiate(iesGraph=iesGraph,_class=ies.classOfMeasure,instance=URIRef(ies.dataUri+"SpeedOverGround"))
    measures["cogClass"] = ies.instantiate(iesGraph=iesGraph,_class=ies.classOfMeasure,instance=URIRef(ies.dataUri+"CourseOverGround"))
    measures["knots"] = ies.instantiate(iesGraph=iesGraph,_class=ies.unitOfMeasure,instance=URIRef(ies.dataUri+"Knots"))
    measures["degTN"] = ies.instantiate(iesGraph=iesGraph,_class=ies.unitOfMeasure,instance=URIRef(ies.dataUri+"DegreesTrueNorth"))
    ies.addName(iesGraph=iesGraph,item=measures["sogClass"],nameString="Speed Over Ground")
    ies.addName(iesGraph=iesGraph,item=measures["cogClass"],nameString="Course Over Ground")
    ies.addName(iesGraph=iesGraph,item=measures["knots"],nameString="knots")
    ies.addName(iesGraph=iesGraph,item=measures["degTN"],nameString="degrees true North")
    #add the location transponder - We don't know this is necessarily a vessel. All we know is that we have a LocationTransponder.
    trackId = str(track["id"])  #fixed: mmsi may come out of sqlite as an int; str() keeps the path concatenation below safe
    lt = ies.createLocationTransponder(iesGraph=iesGraph,mmsi=trackId)
    obsvd = ies.instantiate(iesGraph=iesGraph,_class=ies.observedTarget)
    ies.addToGraph(iesGraph=iesGraph,subject=obsvd,predicate=ies.ipo,obj=lt)
    ies.addToGraph(iesGraph=iesGraph,subject=obsvd,predicate=ies.ipi,obj=obs)
    #now go through the individual location observations and add those...
    for ping in track["pings"]:
        createLocationObservation(iesGraph=iesGraph,ping=ping,transponder=lt,obs=obs,measures=measures)
    if output == "kafka":
        ies.sendToKafka(iesGraph=iesGraph,kProducer=kafkaBroker,kTopic=iesKafkaTopic)
    else:
        ies.saveRdf(iesGraph,'./data/track'+trackId+'-'+str(track["counter"])+'.ies.ttl')
#This runs through a sqlite3 database of IES data (see download instructions at start of this file) and creates tracks
#If output is set to "file" they're exported as turtle files, indexed by track number and mmsi
#If output is set to "kafka" they're exported to kafka (see global variables at top of file for Kafka settings)
def processDatabase(dbFileName,delay,batchSize,output="file"):
    """Stream AIS position rows from a sqlite database and emit IES tracks.

    Rows are read in BaseDateTime order and accumulated per mmsi; every time a
    vessel accrues batchSize pings, exportTrack() is called and the accumulator
    for that vessel is reset.

    Parameters:
        dbFileName - path to the sqlite database (must contain a `position` table)
        delay      - seconds to sleep after each row (simulates a live feed)
        batchSize  - number of pings per exported track
        output     - passed through to exportTrack ("file" or "kafka")

    NOTE: relies on the module-level `graph` created at the bottom of this file.
    """
    trackDict = {}  #per-mmsi record of accumulated pings and their time bounds
    conn = sqlite3.connect(dbFileName)
    try:
        cursor = conn.cursor()
        cursor.execute("""select * from position ORDER BY BaseDateTime""")
        #iterate the cursor directly - fetchall() needlessly materialised every row in memory
        for row in cursor:
            mmsi = row[0]
            dt = dateutil.parser.parse(row[1])
            if mmsi in trackDict:
                entry = trackDict[mmsi]  #hoist the repeated dict lookup
                entry["pings"].append(row)
                if entry["maxDateTime"] is None or dt > entry["maxDateTime"]:
                    entry["maxDateTime"] = dt
                if entry["minDateTime"] is None or dt < entry["minDateTime"]:
                    entry["minDateTime"] = dt
                if len(entry["pings"]) >= batchSize:
                    #batch size for track has been reached, time to export a track
                    exportTrack(graph,entry,output)
                    entry["minDateTime"] = None
                    entry["maxDateTime"] = None
                    entry["pings"] = []
                    entry["counter"] = entry["counter"] + 1
            else:
                trackDict[mmsi] = {"id":mmsi,"pings":[row],"minDateTime":dt,"maxDateTime":dt,"counter":0}
            sleep(delay)
        #NOTE(review): any partial batch (< batchSize pings) still in trackDict
        #when the data runs out is silently dropped - confirm that is intended
    finally:
        conn.close()  #fixed: the connection was never closed (resource leak)
#Set up the rdf graph - this module-level global is referenced by name inside
#processDatabase(), so it must be created before that call
graph=ies.initialiseGraph(None)
#Now process the data in the database (each row comes out as a tuple)
#Column layout of each row in the `position` table:
# 0     1             2    3    4    5    6        7           8    9         10          11      12      13     14     15
# MMSI, BaseDateTime, LAT, LON, SOG, COG, Heading, VesselName, IMO, CallSign, VesselType, Status, Length, Width, Draft, Cargo
#The important thing here is whether or not you use Kafka. Change the global useKafka variable at the top of the script to change this.
processDatabase(dbFileName=aisDB,delay=pingDelay,batchSize=trackSize,output=iesOutput)