Merge pull request #38 from a-mma/vec_queue_optimizations
vector queue added
freakeinstein authored Nov 23, 2019
2 parents 3181261 + a704472 commit 302ac39
Showing 3 changed files with 184 additions and 66 deletions.
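
In both Python backends, this change replaces synchronous indexing with a producer-consumer pattern: addVectors now only enqueues incoming documents, and a daemon thread periodically drains the queue, rebuilds the index while holding a lock, and persists it to disk. A minimal, self-contained sketch of that pattern (class and attribute names here are illustrative, not taken from this commit):

import queue
import threading
import time

class BatchIndexer:
    def __init__(self, drain_interval_sec=5, maxsize=10100):
        self.pipeline = queue.Queue(maxsize=maxsize)
        self.lock = threading.Lock()
        self.running = True
        self.drain_interval_sec = drain_interval_sec
        self.items = []  # stand-in for the real Annoy/FAISS index
        self.thread = threading.Thread(target=self._process, daemon=True)
        self.thread.start()

    def add(self, item):
        # producer side: return immediately, indexing happens later
        self.pipeline.put_nowait(item)

    def _process(self):
        # consumer side: sleep, then drain everything currently queued
        while self.running:
            time.sleep(self.drain_interval_sec)
            if self.pipeline.qsize() == 0:
                continue
            batch = []
            while not self.pipeline.empty():
                batch.append(self.pipeline.get_nowait())
            with self.lock:  # block searches while the index is rebuilt
                self.items.extend(batch)

Searches in the committed code take the same lock, so readers never see a half-rebuilt index; the trade-off is that writes only become visible after the next drain pass (up to process_timeout_sec seconds later).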
1 change: 1 addition & 0 deletions src/core/faissclient/index.js
@@ -354,6 +354,7 @@ module.exports = {
keys: vec_ids_
},
function(err, resp) {
console.log(err, resp)
if (!err) {
var doc_ids_ = [];
for (let i = 0; i < resp.rows.length; i++) {
133 changes: 94 additions & 39 deletions src/hannoy/index.py
@@ -3,11 +3,20 @@
import yaml
import os

import threading
import queue
import time

model_location = '/data/model_ha'

class Annoy:
def __init__(self):
self.total = 0
# to keep the thread & queue running
self.process_flag = True
self.q_maxsize = 10100
self.process_thread = None
self._lock = threading.Lock()
self.process_timeout_sec = 5 # seconds
# this is to keep track of all vectors inserted
# for saving into disk and retrieve later
self.index_disk = None
@@ -21,53 +30,97 @@ def __init__(self):
except Exception as e:
print('Error initializing Annoy: ', e)

# spawn process thread
self.spawn()

def __del__(self):
self.process_flag = False
if self.process_thread:
self.process_thread.join()

def spawn(self):
# create pipeline to add documents
self.pipeline = queue.Queue(maxsize=self.q_maxsize)
# create process thread
self.process_thread = threading.Thread(target=self.process, args=(), daemon=True)
# start process thread
self.process_thread.start()
# return self.pipeline

def initAnnoy(self):
# only do if no index loaded from disk
if not self.modelLoaded:
print('Annoy init index')
self.a_index = AnnoyIndex(self.dim, self.sim_metric)

# build index
build_ = self.a_index.build(self.n_trees)
# Lock index read/write until it is built
with self._lock:
# build index
build_ = self.a_index.build(self.n_trees)

if build_:
self.modelLoaded = self.saveModelToDisk()

if build_:
self.modelLoaded = self.saveModelToDisk()
return self.modelLoaded

def addVectors(self, documents):
# unbuild index first
self.a_index.unbuild()
self.total = self.total + len(documents)
ids = []
# add vectors
for document in documents:
_id = document._id
vec = document.vector
ids.append(_id)
vector_e = vec.e
vector_e_l = len(vector_e)
# if the vector length is below the dimension limit,
# pad the vector with zeros up to the dimension
if vector_e_l < self.dim:
vector_e.extend([0]*(self.dim-vector_e_l))
# make sure vector length doesn't exceed dimension limit
vector_e = vector_e[:self.dim]

# add vector
self.a_index.add_item(int(_id), vector_e)
# keep a copy for disk storage
list_ = vector_e
list_.append(int(_id))
if self.index_disk is None:
self.index_disk = np.array([list_], dtype=float)
else:
self.index_disk = np.append(self.index_disk, [list_], axis=0)

# build index
build_ = self.a_index.build(self.n_trees)
if build_:
self.modelLoaded = self.saveModelToDisk()
return self.modelLoaded, ids
# add document to queue
self.pipeline.put_nowait(document)
ids.append(document._id)
return True, ids

def process(self):
while (self.process_flag):
# print(list(self.pipeline.queue))

# wait before the next indexing pass
time.sleep(self.process_timeout_sec)

# check if queue is not empty
if self.pipeline.qsize() > 0:
# Lock index read/write while the index is rebuilt
with self._lock:

# unbuild index first
self.a_index.unbuild()

# fetch all currently available documents from queue
while not self.pipeline.empty():
# extract document & contents
document = self.pipeline.get_nowait()
_id = document._id
vec = document.vector
vector_e = vec.e

# resize vectors
vector_e_l = len(vector_e)
# if the vector length is below the dimension limit,
# pad the vector with zeros up to the dimension
if vector_e_l < self.dim:
vector_e.extend([0]*(self.dim-vector_e_l))
# make sure vector length doesn't exceed dimension limit
vector_e = vector_e[:self.dim]

# add vector to index
self.a_index.add_item(int(_id), vector_e)
# keep a copy for disk storage
list_ = vector_e
list_.append(int(_id))
# append to disk proxy
if self.index_disk is None:
self.index_disk = np.array([list_], dtype=float)
else:
self.index_disk = np.append(self.index_disk, [list_], axis=0)

# build index
build_ = self.a_index.build(self.n_trees)

# write to disk
if build_:
self.modelLoaded = self.saveModelToDisk()

def deleteVectors(self, ids):

@@ -77,10 +130,12 @@ def getNearest(self, matrix, k):
ids = []
dists = []

for vec_data in matrix:
_id, _dist = self.a_index.get_nns_by_vector(vec_data, k, include_distances=True)
ids.append(_id)
dists.append(_dist)
# Lock index read/write during nearest neighbor search
with self._lock:
for vec_data in matrix:
_id, _dist = self.a_index.get_nns_by_vector(vec_data, k, include_distances=True)
ids.append(_id)
dists.append(_dist)

return True, ids, dists
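
The pad-and-truncate step above is duplicated in both backends; factored out, it amounts to the following helper (the name fit_to_dim is illustrative, not from this commit):

def fit_to_dim(vector_e, dim):
    # pad with zeros up to dim, then drop anything beyond it
    vector_e = list(vector_e)
    if len(vector_e) < dim:
        vector_e.extend([0] * (dim - len(vector_e)))
    return vector_e[:dim]

For example, fit_to_dim([1, 2], 4) returns [1, 2, 0, 0], and fit_to_dim([1, 2, 3], 2) returns [1, 2].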

116 changes: 89 additions & 27 deletions src/hfaiss/index.py
@@ -1,6 +1,10 @@
import numpy as np
import faiss

import threading
import queue
import time

model_location = '/data/model_hf'

class Faiss:
@@ -13,6 +17,30 @@ def __init__(self):
self.modelLoaded = self.loadModelFromDisk(model_location)
self.is_initiated = self.modelLoaded

# to keep the thread & queue running
self.process_flag = True
self.q_maxsize = 10100
self.process_thread = None
self._lock = threading.Lock()
self.process_timeout_sec = 5 # seconds

# spawn process thread
self.spawn()

def __del__(self):
self.process_flag = False
if self.process_thread:
self.process_thread.join()

def spawn(self):
# create pipeline to add documents
self.pipeline = queue.Queue(maxsize=self.q_maxsize)
# create process thread
self.process_thread = threading.Thread(target=self.process, args=(), daemon=True)
# start process thread
self.process_thread.start()
# return self.pipeline

def initFaiss(self, nlist, nprobe, bytesPerVec, bytesPerSubVec, dim, matrix):
self.nlist = nlist
self.nprobe = nprobe
@@ -23,14 +51,18 @@ def initFaiss(self, nlist, nprobe, bytesPerVec, bytesPerSubVec, dim, matrix):
self.train_data = np.matrix(matrix).astype('float32')
print('FAISS init quantizer', self.train_data, self.train_data.shape)
self.f_quantizer = faiss.IndexFlatL2(self.dim)
print('FAISS init index')
self.f_index = faiss.IndexIVFPQ(self.f_quantizer, self.dim, self.nlist, self.bytesPerVec, self.bytesPerSubVec)
print('FAISS train index')
self.f_index.train(self.train_data)
print('FAISS train index finished')
# Lock index read/write until it is built
with self._lock:
print('FAISS init index')
self.f_index = faiss.IndexIVFPQ(self.f_quantizer, self.dim, self.nlist, self.bytesPerVec, self.bytesPerSubVec)
print('FAISS train index')
self.f_index.train(self.train_data)
print('FAISS train index finished')

self.modelLoaded = self.saveModelToDisk(model_location, self.f_index)
# write index to disk
self.modelLoaded = self.saveModelToDisk(model_location, self.f_index)
self.is_initiated = self.modelLoaded

return self.is_initiated

def isInitiated(self):
@@ -39,11 +71,11 @@ def isInitiated(self):
def loadModelFromDisk(self, location):
try:
# read index
self.f_index = read_index(location)
self.f_index = faiss.read_index(location)
print('FAISS index loading success')
return True
except:
print('FAISS index loading failed')
except Exception as e:
print('FAISS index loading failed', e)
return False

def saveModelToDisk(self, location, index):
@@ -58,32 +90,62 @@ def saveModelToDisk(self, location, index):

def addVectors(self, documents):
ids = []
vecs = []
# add vectors
for document in documents:
_id = document._id
vec = document.vector
ids.append(_id)
vector_e = vec.e
vector_e_l = len(vector_e)
# if the vector length is below the dimension limit,
# pad the vector with zeros up to the dimension
if vector_e_l < self.dim:
vector_e.extend([0]*(self.dim-vector_e_l))
# make sure vector length doesn't exceed dimension limit
vecs.append(vector_e[:self.dim])
# convert to np matrix
vec_data = np.matrix(vecs).astype('float32')
id_data = np.array(ids).astype('int')
# add vector
self.f_index.add_with_ids(vec_data, id_data)
# add document to queue
self.pipeline.put_nowait(document)
ids.append(document._id)
return True, ids

def process(self):
while (self.process_flag):
# print(list(self.pipeline.queue))

# wait before the next indexing pass
time.sleep(self.process_timeout_sec)

# check if queue is not empty
if self.pipeline.qsize() > 0:
ids = []
vecs = []

# fetch all currently available documents from queue
while not self.pipeline.empty():
# extract document & contents
document = self.pipeline.get_nowait()
_id = document._id
vec = document.vector
ids.append(_id)
vector_e = vec.e
vector_e_l = len(vector_e)
# if the vector length is below the dimension limit,
# pad the vector with zeros up to the dimension
if vector_e_l < self.dim:
vector_e.extend([0]*(self.dim-vector_e_l))
# make sure vector length doesn't exceed dimension limit
vecs.append(vector_e[:self.dim])

# convert to np matrix
vec_data = np.matrix(vecs).astype('float32')
id_data = np.array(ids).astype('int')

# Lock index read/write while new vectors are added
with self._lock:
# add vector
self.f_index.add_with_ids(vec_data, id_data)

# write to disk
self.saveModelToDisk(model_location, self.f_index)

def deleteVectors(self, ids):

return True, ids

def getNearest(self, matrix, k):
# convert to np matrix
vec_data = np.matrix(matrix).astype('float32')
D, I = self.f_index.search(vec_data, k)

# Lock index read/write during nearest neighbor search
with self._lock:
D, I = self.f_index.search(vec_data, k)
return True, I.tolist(), D.tolist()
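
Taken together, initFaiss, process, and getNearest implement the standard FAISS IVFPQ train/add/search sequence. A minimal standalone sketch of that flow (sizes and values are illustrative; bytesPerVec maps to the number of subquantizers, bytesPerSubVec to bits per sub-code):

import numpy as np
import faiss

dim, nlist = 64, 16
bytes_per_vec, bytes_per_sub_vec = 8, 8

train_data = np.random.rand(4096, dim).astype('float32')

quantizer = faiss.IndexFlatL2(dim)                # coarse quantizer
index = faiss.IndexIVFPQ(quantizer, dim, nlist,
                         bytes_per_vec, bytes_per_sub_vec)
index.train(train_data)                           # must train before adding

ids = np.arange(len(train_data)).astype('int64')  # add_with_ids needs int64 ids
index.add_with_ids(train_data, ids)

index.nprobe = 4                                  # lists probed per query
D, I = index.search(train_data[:2], 5)            # distances and ids of 5 nearest
print(I)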
