Does elaster use bulk indexing? #4

Open
syllogismos opened this issue Dec 6, 2014 · 13 comments
@syllogismos

Does elaster use bulk indexing, or does it index each document separately?

Also, will it be possible to ignore errors? Because of some rogue documents with wrong mappings, the entire export gets cancelled.

@syllogismos

I managed to continue the import even with errors by commenting out the returns in the err blocks.

@alexbeletsky

You might introduce a special flag, something like ignoreErrors, for that.

As for bulk, it's not supported at the moment, but it would be something nice to have.
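
A minimal sketch of how such a flag could work, based on the workaround above (the ignoreErrors option and the handleIndexError helper are hypothetical names, not part of elaster today):

function handleIndexError(err, options, done) {
    if (!options.ignoreErrors) {
        return done(err);               // current behaviour: abort the whole export
    }
    console.error('skipping rogue document:', err.message);
    done();                             // log the error and keep streaming
}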

@syllogismos

Thanks for replying. I will try to implement the bulk thing and send you a pull request. It's taking very long to import; it took me around 5 hours for 8 million documents. I think it can be faster.

@alexbeletsky

I was actually thinking about this quite a long time ago, but since I never really needed it, I never found the time to finally make it happen.

For exactly those purposes, I've added this to highland, which seems to me perfect for the job. I will be happy if you pick it up from here. Ready to discuss if you have any questions.

@syllogismos

I am just about to start implementing the bulk thing. I want a config option (with a default of 1000) for the number of docs to index in a single bulk operation, and tada!

I'm a bit of a Node.js newbie, so I will have to hack my way through promises/streams etc. If you have any suggestions, please share.

Thank you.
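
For reference, the option could look something like this (a sketch only; the bulkSize name and the config shape are assumptions, not elaster's actual config):

module.exports = {
    elastic: {
        bulkSize: 1000      // docs per bulk request; override as needed
    }
};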

@alexbeletsky

Sure. As I said, highland looks like a good library for the job. The issue I mentioned above already contains some simple code examples, so you can start from there.
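
As a standalone illustration of what highland's batch() does, independent of elaster's pipeline:

var _ = require('highland');

_([1, 2, 3, 4, 5])
    .batch(2)                   // group items into arrays of up to 2
    .each(function(batch) {
        console.log(batch);     // [ 1, 2 ], then [ 3, 4 ], then [ 5 ]
    });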

@syllogismos

So this code block:

var stream = collection
        .find(query)
        .sort({_id: 1})
        .pipe(takeFields)
        .pipe(postToElastic)
        .pipe(progress());

changes to

var stream = collection
        .find(query)
        .sort({_id: 1})
        .pipe(takeFields).batch(1000)
        .pipe(postToElastic)
        .pipe(progress());

and then update the postToElastic function to handle the bulk request?

@alexbeletsky

I'm not sure it would be that straightforward, but the direction is right.

@syllogismos

So cool. All I have to do is this:

var stream = collection
        .find(query)
        .sort({_id: 1})
        .pipe(highland(takeFields).batch(1000))
        .pipe(postToElastic)
        .pipe(progress());

@alexbeletsky

You probably also need to change postToElastic() to use bulk insert. Putting the 1000 into config would be great as well.

Since the project doesn't have any unit tests, please make sure it works by manual testing. Ideally, you should test with and without batching and have benchmarks, so we can really see what the performance boost from bulk() is :)

@syllogismos

Hello, here are the updates I made: syllogismos@aa5d31a

And it is not working properly.

Here are the things I did.

  1. Updated postToElastic to accept an array of mongo documents and then do a bulk request using the bulk API. The way the bulk API works, you pass an "action and metadata" line and then an optional "source" document, and so on. In our case that is an index action followed by our doc, repeated. So we need to interleave index actions with their metadata into our array of mongo documents (see the sketch after this list).
  2. The bulk request doesn't return an error if one of the documents fails to index; instead it gives you a response JSON that says whether all the index operations succeeded or not.
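
To make both points concrete, here is a sketch, assuming the official elasticsearch Node.js client (the index/type names and the toBulkBody/postBatch helpers are placeholders for illustration, not elaster's actual code):

var elasticsearch = require('elasticsearch');
var client = new elasticsearch.Client({ host: 'localhost:9200' });

// interleave an "action and metadata" line before each source document
function toBulkBody(docs) {
    var body = [];
    docs.forEach(function(doc) {
        body.push({ index: { _index: 'myindex', _type: 'mytype', _id: doc._id } });
        body.push(doc);
    });
    return body;
}

function postBatch(docs, done) {
    client.bulk({ body: toBulkBody(docs) }, function(err, resp) {
        if (err) return done(err);      // transport-level failure only
        if (resp.errors) {
            // per-document failures show up in resp.items, not in err
            resp.items.forEach(function(item) {
                if (item.index && item.index.error) {
                    console.error('failed to index', item.index._id, item.index.error);
                }
            });
        }
        done();
    });
}

The pipeline change I tried first: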
var stream = collection
        .find(query)
        .sort({_id: 1})
        .pipe(highland(takeFields).batch(1000))
        .pipe(postToElastic)
        .pipe(progress());

If I do the above, each item does not pass through the takeFields function, so I introduced a new temp function that does nothing, like this:

var temp = through(function(item) {
    this.queue(item);   // pass-through: re-emit each item unchanged
});

and then changed the stream like below:

var stream = collection
        .find(query)
        .sort({_id: 1})
        .pipe(takeFields)
        .pipe(highland(temp).batch(1000))
        .pipe(postToElastic)
        .pipe(progress());

I have no idea how dumb what I did is, but I wanted to check whether it works or not. 💃
With just a console.log() inside postToElastic instead of the bulk request, I'm able to print the document arrays to my console every time postToElastic is called. But when I do the actual bulk request, it stops after indexing the first batch. I have no idea what is wrong.

  3. And lastly, I queue item.length at the end of postToElastic so that I can pass it to progress() to update the count. I don't know if I can do that (see the sketch below).
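
For what it's worth, here is a sketch of a bulk-posting stage that queues the count and keeps the stream flowing, reusing the hypothetical client and toBulkBody from the sketch above. If the bulk callback never queues anything or never resumes a paused stream, a piped pipeline can stall after the first batch, which would match the symptom:

var through = require('through');

var postToElastic = through(function(items) {
    var stream = this;
    stream.pause();                     // hold the pipeline while the bulk call is in flight
    client.bulk({ body: toBulkBody(items) }, function(err, resp) {
        if (err || resp.errors) {
            console.error('bulk problems:', err || 'see resp.items');
        }
        stream.queue(items.length);     // hand the batch size to progress()
        stream.resume();                // let the next batch through
    });
});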

@syllogismos

So my current implementation just stops after indexing the first batch, and it just waits.
