Currently workers can only process documents -- the broker gets the document information from MongoDB and passes it to the worker's wrapper (which then calls the worker's `main` function, passing the document as a parameter).
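For reference, a minimal sketch of that document flow, assuming hypothetical names for everything except `main`, `__meta__` and the `from = 'document'` key mentioned below (the `requires`/`provides` keys and the broker/wrapper glue are made up for illustration):

```python
# Minimal sketch of the current document-only flow; the wrapper and the
# 'requires'/'provides' keys are hypothetical.
class tokenizer(object):
    __meta__ = {'from': 'document', 'requires': ['text'], 'provides': ['tokens']}

    @staticmethod
    def main(document):
        return {'tokens': document['text'].split()}


def wrapper(worker_module, document):
    # The broker has already fetched `document` from MongoDB; the wrapper
    # just calls the worker's main function with it.
    return worker_module.main(document)


if __name__ == '__main__':
    print(wrapper(tokenizer, {'text': 'a small example document'}))
```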
With the current approach we can't create a worker that, for example, performs a corpus wordcloud analysis (or any analysis that needs to process an entire corpus instead of a single document).
We could change the code a little so the broker can get an entire corpus from MongoDB and pass it to the worker's wrapper, but there is a problem with this simple approach: a corpus is much larger than a document (since it is a collection of documents, each with its own analyses), and it is not a good idea to pass an entire corpus from the broker process to the worker process (`multiprocessing` uses `pickle` for this job, with temporary files to save the pickled objects).
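To make the cost concrete, here is a rough sketch of what serializing a whole corpus for inter-process transfer implies (the corpus layout is made up for illustration; the point is only that the pickled payload grows with the corpus, not with one document):

```python
import pickle

# A made-up corpus: many documents, each carrying its previous analyses.
corpus = [
    {'text': ('document %d ' % i) * 100,
     'tokens': ['document', str(i)] * 100,
     'freqdist': {'document': 100, str(i): 100}}
    for i in range(1000)
]
single_document = corpus[0]

# Everything sent between broker and worker processes goes through pickle,
# so the transfer cost is proportional to the whole corpus.
print('one document:', len(pickle.dumps(single_document)), 'bytes')
print('whole corpus:', len(pickle.dumps(corpus)), 'bytes')
```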
So, the best way to do it is to get the data from MongoDB inside the worker process, but it is not a good idea to give the worker itself direct MongoDB access. So I think we need a solution like this (see the sketch after this list):

- The broker should pass MongoDB access information to `workers.wrapper` when the worker needs to work on a corpus (key `from = 'document'` in the worker's `__meta__`).
- `workers.wrapper` should connect to MongoDB, get the entire corpus in a lazy way and pass this lazy object to the worker's `main` function.
- `workers.wrapper` should also pass corpus-specific information, like it does for documents (for example, so the worker knows the results of previous analyses, as in the `freqdist` worker: we need the key `tokens`, which is the output of the `tokenizer` worker).
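A rough sketch of what the corpus branch of `workers.wrapper` could look like, assuming pymongo and made-up collection/field names (`analysis`, `corpus_id`); the "lazy object" here is simply the pymongo cursor, which only fetches documents from MongoDB as the worker iterates over it:

```python
import pymongo


def wrapper_for_corpus(worker_module, mongo_config, corpus_id):
    """Hypothetical corpus branch of workers.wrapper."""
    client = pymongo.MongoClient(mongo_config['host'], mongo_config['port'])
    collection = client[mongo_config['database']]['analysis']

    # Only project the keys the worker declared it needs (e.g. 'tokens',
    # produced by the tokenizer worker, for a corpus-level freqdist), so we
    # don't drag every previous analysis over the wire.
    requires = worker_module.__meta__.get('requires', [])
    projection = {key: 1 for key in requires}

    # The cursor is lazy: documents are fetched as the worker iterates over
    # it, never materialized all at once in the worker process.
    lazy_corpus = collection.find({'corpus_id': corpus_id}, projection)

    return worker_module.main(lazy_corpus)
```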
There is a problem when we allow workers to do corpus analysis: if a corpus changes (a document is added, modified or deleted), we need to re-run all the corpus analyses. We must create a way to re-schedule the corpus pipeline when a job that adds/modifies/deletes a document of that corpus finishes (we'll probably need a heuristic so we don't schedule 100 corpus pipelines for the same corpus when we add 100 documents to it).
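One possible heuristic, sketched here with made-up scheduler hooks: mark the corpus as dirty whenever one of its document jobs finishes, and only enqueue the corpus pipeline once no document jobs for that corpus are still pending, so a batch of 100 additions triggers a single re-run:

```python
import collections


class CorpusRescheduler(object):
    """Hypothetical helper that coalesces corpus pipeline re-scheduling."""

    def __init__(self, schedule_corpus_pipeline):
        self.schedule_corpus_pipeline = schedule_corpus_pipeline
        self.pending_document_jobs = collections.Counter()
        self.dirty_corpora = set()

    def document_job_started(self, corpus_id):
        self.pending_document_jobs[corpus_id] += 1

    def document_job_finished(self, corpus_id):
        self.pending_document_jobs[corpus_id] -= 1
        self.dirty_corpora.add(corpus_id)
        # Only re-run the corpus pipeline when no other document jobs for
        # this corpus are still queued or running.
        if self.pending_document_jobs[corpus_id] <= 0:
            self.dirty_corpora.discard(corpus_id)
            self.schedule_corpus_pipeline(corpus_id)
```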
Note: maybe a map-reduce approach would be better, for example: passing each document to a `worker.map` function and then all the resulting information to `worker.reduce`.
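To illustrate the idea, here is a corpus-level freqdist written in that hypothetical map/reduce style (only the `map`/`reduce` names come from the suggestion above; everything else is made up):

```python
import collections


class corpus_freqdist(object):
    """Hypothetical corpus worker in the map/reduce style."""

    @staticmethod
    def map(document):
        # Runs once per document; only needs the 'tokens' key produced by
        # the tokenizer worker.
        return collections.Counter(document['tokens'])

    @staticmethod
    def reduce(partial_results):
        # Receives every per-document result and merges them into a single
        # corpus-wide frequency distribution.
        total = collections.Counter()
        for partial in partial_results:
            total.update(partial)
        return {'freqdist': total.most_common()}


if __name__ == '__main__':
    documents = [{'tokens': ['a', 'b', 'a']}, {'tokens': ['b', 'c']}]
    partials = [corpus_freqdist.map(doc) for doc in documents]
    print(corpus_freqdist.reduce(partials))
```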