diff --git a/src/main/java/org/lightcouch/Changes.java b/src/main/java/org/lightcouch/Changes.java index a544d46..b844fd7 100644 --- a/src/main/java/org/lightcouch/Changes.java +++ b/src/main/java/org/lightcouch/Changes.java @@ -23,6 +23,8 @@ import org.apache.commons.codec.Charsets; import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpUriRequest; import org.lightcouch.ChangesResult.Row; import com.google.gson.Gson; @@ -56,6 +58,14 @@ * JsonObject doc = feed.getDoc(); * // changes.stop(); // stop continuous feed * } + * + * Selector filter: + * ChangesResult changeResult = dbClient.changes() + * .since(since) + * .limit(10) + * .selector("{\"selector":{\"_deleted\":true}}") + * .getChanges(); + * * * @see ChangesResult * @since 0.0.2 @@ -64,7 +74,7 @@ public class Changes { private BufferedReader reader; - private HttpGet httpGet; + private HttpUriRequest httpRequest; private Row nextRow; private boolean stop; @@ -72,6 +82,8 @@ public class Changes { private Gson gson; private URIBuilder uriBuilder; + private String selector; + Changes(CouchDbClientBase dbc) { this.dbc = dbc; this.gson = dbc.getGson(); @@ -85,10 +97,19 @@ public class Changes { */ public Changes continuousChanges() { final URI uri = uriBuilder.query("feed", "continuous").build(); - httpGet = new HttpGet(uri); - final InputStream in = dbc.get(httpGet); - final InputStreamReader is = new InputStreamReader(in, Charsets.UTF_8); - setReader(new BufferedReader(is)); + if (selector == null) { + final HttpGet get = new HttpGet(uri); + httpRequest = get; + final InputStream in = dbc.get(get); + final InputStreamReader is = new InputStreamReader(in, Charsets.UTF_8); + setReader(new BufferedReader(is)); + } else { + final HttpPost post = new HttpPost(uri); + httpRequest = post; + final InputStream in = dbc.post(post, selector); + final InputStreamReader is = new InputStreamReader(in, Charsets.UTF_8); + setReader(new BufferedReader(is)); + } return this; } @@ -121,7 +142,11 @@ public void stop() { */ public ChangesResult getChanges() { final URI uri = uriBuilder.query("feed", "normal").build(); - return dbc.get(uri, ChangesResult.class); + if (selector == null) { + return dbc.get(uri, ChangesResult.class); + } else { + return dbc.post(uri, selector, ChangesResult.class); + } } // Query Params @@ -151,6 +176,12 @@ public Changes filter(String filter) { return this; } + public Changes selector(String json) { + uriBuilder.query("filter", "_selector"); + this.selector = json; + return this; + } + public Changes includeDocs(boolean includeDocs) { uriBuilder.query("include_docs", includeDocs); return this; @@ -206,7 +237,7 @@ private void setNextRow(Row nextRow) { } private void terminate() { - httpGet.abort(); + httpRequest.abort(); CouchDbUtil.close(getReader()); } } diff --git a/src/main/java/org/lightcouch/CouchDbClient.java b/src/main/java/org/lightcouch/CouchDbClient.java index 5694e92..14378b8 100644 --- a/src/main/java/org/lightcouch/CouchDbClient.java +++ b/src/main/java/org/lightcouch/CouchDbClient.java @@ -229,7 +229,6 @@ public void shutdown() { HttpClientUtils.closeQuietly(this.httpClient); } - @Override public void close() throws IOException { shutdown(); } diff --git a/src/main/java/org/lightcouch/CouchDbClientBase.java b/src/main/java/org/lightcouch/CouchDbClientBase.java index 956eb85..46c9356 100644 --- a/src/main/java/org/lightcouch/CouchDbClientBase.java +++ b/src/main/java/org/lightcouch/CouchDbClientBase.java @@ -621,6 +621,34 @@ HttpResponse post(URI uri, String json) { return executeRequest(post); } + /** + * Performs a HTTP POST request. + * + * @return {@link HttpResponse} + */ + InputStream post(HttpPost post, String json) { + setEntity(post, json); + HttpResponse resp = executeRequest(post); + return getStream(resp); + } + + /** + * Performs a HTTP POST request. + * + * @return An object of type T + */ + T post(URI uri, String json, Class classType) { + InputStream in = null; + try { + in = getStream(post(uri, json)); + return getGson().fromJson(new InputStreamReader(in, "UTF-8"), classType); + } catch (UnsupportedEncodingException e) { + throw new CouchDbException(e); + } finally { + close(in); + } + } + /** * Performs a HTTP DELETE request. * @return {@link Response} diff --git a/src/test/java/org/lightcouch/tests/ChangeNotificationsTest.java b/src/test/java/org/lightcouch/tests/ChangeNotificationsTest.java index 66e1704..abe8366 100644 --- a/src/test/java/org/lightcouch/tests/ChangeNotificationsTest.java +++ b/src/test/java/org/lightcouch/tests/ChangeNotificationsTest.java @@ -73,6 +73,28 @@ public void changes_normalFeed() { assertThat(rows.size(), is(1)); } + @Test + public void changes_normalFeed_selector() { + dbClient.save(new Foo()); + + ChangesResult changes = dbClient.changes().includeDocs(true).limit(1) + .selector("{\"selector\":{\"_id\": {\"$gt\": null}}}").getChanges(); + + List rows = changes.getResults(); + + for (Row row : rows) { + List revs = row.getChanges(); + String docId = row.getId(); + JsonObject doc = row.getDoc(); + + assertNotNull(revs); + assertNotNull(docId); + assertNotNull(doc); + } + + assertThat(rows.size(), is(1)); + } + @Test public void changes_continuousFeed() { dbClient.save(new Foo()); @@ -99,4 +121,29 @@ public void changes_continuousFeed() { changes.stop(); } } + + @Test + public void changes_continuousFeed_selector() { + dbClient.save(new Foo()); + + CouchDbInfo dbInfo = dbClient.context().info(); + String since = dbInfo.getUpdateSeq(); + + Changes changes = dbClient.changes().includeDocs(true).since(since).heartBeat(1000) + .selector("{\"selector\":{\"_id\": {\"$gt\": null}}}").continuousChanges(); + + Response response = dbClient.save(new Foo()); + + while (changes.hasNext()) { + ChangesResult.Row feed = changes.next(); + final JsonObject feedObject = feed.getDoc(); + final String docId = feed.getId(); + System.out.println("next()=" + docId); + + assertEquals(response.getId(), docId); + assertNotNull(feedObject); + + changes.stop(); + } + } }