Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pluck transformer #31

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions lib/src/pluck.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
part of stream_transformers;

/// Returns the value of a specified nested property
/// from all elements in the sequence. An onError function can be passed
/// to yield a default value if the property couldn't be resolved.
///
/// **Example:**
///
/// var source = new Stream.fromIterable([{'foo': {'bar': {'baz': 'plucked!'}}}]);
/// var stream = source.transform(new Pluck('foo.bar.baz'));
/// stream.listen(print);
///
/// // 'plucked!'
class Pluck<T> implements StreamTransformer<T, T> {

final String _path;
final List<String> _segments;
final Function _onError;

Pluck(String path, {T onError(dynamic data)}) :
_path = path,
_segments = path.split('.'),
_onError = onError;

Stream<T> bind(Stream<T> stream) {
return _bindStream(like: stream, onListen: (EventSink<T> sink) {

void onData(dynamic data) {
bool hasFailedToResolve = false;
dynamic currentValue = data;

for (int i=0, len=_segments.length; i<len; i++) {
try {
currentValue = currentValue[_segments[i]];
} catch (error) {
if (_onError != null) {
sink.add(_onError(data));
hasFailedToResolve = true;
break;
}
else sink.addError(new ArgumentError('Unable to resolve "$_path"'));
}
}

if (!hasFailedToResolve) sink.add(currentValue);
}

return stream.listen(onData, onError: sink.addError, onDone: sink.close);
});
}

}
1 change: 1 addition & 0 deletions lib/stream_transformers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ part 'src/flat_map.dart';
part 'src/flat_map_latest.dart';
part 'src/merge.dart';
part 'src/merge_all.dart';
part 'src/pluck.dart';
part 'src/sample_on.dart';
part 'src/sample_periodically.dart';
part 'src/select_first.dart';
Expand Down
2 changes: 2 additions & 0 deletions test/all_tests.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import 'flat_map_test.dart' as flat_map;
import 'flat_map_latest_test.dart' as flat_map_latest;
import 'merge_all_test.dart' as merge_all;
import 'merge_test.dart' as merge;
import 'pluck_test.dart' as pluck;
import 'sample_on_test.dart' as sample_on;
import 'sample_periodically_test.dart' as sample_periodically;
import 'scan_test.dart' as scan;
Expand All @@ -31,6 +32,7 @@ void main() {
flat_map_latest.main();
merge.main();
merge_all.main();
pluck.main();
sample_on.main();
sample_periodically.main();
scan.main();
Expand Down
78 changes: 78 additions & 0 deletions test/pluck_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
library pluck_test;

import 'dart:async';
import 'package:guinness/guinness.dart';
import 'package:stream_transformers/stream_transformers.dart';
import 'util.dart';

void main() => describe("Pluck", () {
describe("with single subscription stream", () {
testWithStreamController(() => new StreamController());
});

describe("with broadcast stream", () {
testWithStreamController(() => new StreamController.broadcast());
});
});

void testWithStreamController(StreamController provider()) {
String expectedValue = 'plucked!';
Map targetData = {'foo': {'bar': {'baz': expectedValue}}};
String pathThatResolves = 'foo.bar.baz';
String pathThatFails = 'foo.nonExistingProp.baz';
StreamController controller;

beforeEach(() {
controller = provider();
});

afterEach(() {
controller.close();
});

it("resolves the value using a path from the source stream", () {
return testStream(controller.stream.transform(new Pluck<String>(pathThatResolves)),
behavior: () => controller.add(targetData),
expectation: (values) {
expect(values).toEqual([expectedValue]);
});
});

it("yields null if it fails to resolve the value using a path from the source stream", () {
return testStream(controller.stream.transform(new Pluck(pathThatFails, onError: (_) => null)),
behavior: () => controller.add(targetData),
expectation: (values) {
expect(values).toEqual([null]);
});
});

it("forwards errors from source stream if onError is not defined, and resolving the path fails", () {
return testErrorsAreForwarded(
controller.stream.transform(new Pluck(pathThatFails)),
behavior: () => controller.add(targetData),
expectation: (errors) => expect(errors.first).toBeAnInstanceOf(ArgumentError));
});

it("closes transformed stream when source stream is done", () {
var stream = controller.stream.transform(new Pluck(pathThatResolves));
var result = stream.toList();
controller..add(targetData)..close();
return result.then((values) {
expect(values).toEqual([expectedValue]);
});
});

it("cancels input stream when transformed stream is cancelled", () {
var completerA = new Completer();
var controller = new StreamController(onCancel: completerA.complete);

return testStream(
controller.stream.transform(new Pluck(pathThatResolves)),
expectation: (_) => completerA.future);
});

it("returns a stream of the same type", () {
var stream = controller.stream.transform(new Pluck(pathThatResolves));
expect(stream.isBroadcast).toBe(controller.stream.isBroadcast);
});
}