Skip to content

Commit

Permalink
RxKotlin 2.0 Release Candidate (#95)
Browse files Browse the repository at this point in the history
* Dependencies updated

* Initial implementation of rxKotlin for rxJava2

* Code cleaning.
Code review related fixes.

* Whitespaces removed.

* gitignore update

* subscribers replaced by named args extension

* subscribeBy non-null params

* empty* methods removed
flowable extensions added
minor fixes

* dependencies updated

* subscribeBy method for Flowable

* tests updated

* minor tests refactoring

* * JoinToString method and tests ported from rxKotlin 1.0
* Subjects removed
* Operators added

* Minor formatting fix

* More formatting fixes

* Refactor "subscription" references to "disposable"

* refactor `flowable.kt` and `observable.kt` to match 1.x changes

* add `Flowable` support for operators

* update `single` to match 1.x changes

* rid Subject functions

* update readme to reflect `onComplete`

* Fix 2.x extension tests

* add Array<T>.toFlowable()

* update and fix compile errors for tests in 2.x

* move swtichOnNext()

* git commit -m 'refactor package domain to io.reactivex.rxkotlin'

* optimize imports
  • Loading branch information
thomasnield authored Mar 11, 2017
1 parent 97f0555 commit 3e005e6
Show file tree
Hide file tree
Showing 23 changed files with 245 additions and 178 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ fun main(args: Array<String>) {
.subscribeBy( // named arguments for lambda Subscribers
onNext = { println(it) },
onError = { it.printStackTrace() },
onCompleted = { println("Done!") }
onComplete = { println("Done!") }
)

}
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
buildscript {
ext.kotlin_version = '1.0.6'
ext.kotlin_version = '1.1.0'
repositories { jcenter() }
dependencies {
classpath 'com.netflix.nebula:gradle-rxjava-project-plugin:4.+',
Expand Down
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Sun Feb 12 16:33:43 SGT 2017
#Sun Mar 05 07:47:21 SGT 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-all.zip
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package rx.lang.kotlin.examples
package io.reactivex.rxkotlin.examples

import io.reactivex.Observable
import io.reactivex.disposables.CompositeDisposable
import rx.lang.kotlin.addTo
import rx.lang.kotlin.combineLatest
import rx.lang.kotlin.observable
import rx.lang.kotlin.plusAssign
import rx.lang.kotlin.subscribeBy
import rx.lang.kotlin.toObservable
import rx.lang.kotlin.zip
import io.reactivex.rxkotlin.*
import io.reactivex.rxkotlin.combineLatest
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable
import io.reactivex.rxkotlin.zip
import java.net.URL
import java.util.Scanner
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -50,37 +48,37 @@ fun main(args: Array<String>) {
addToCompositeSubscription()
}

private fun URL.toScannerObservable() = observable<String> { s ->
private fun URL.toScannerObservable() = Observable.create<String> { s ->
this.openStream().use { stream ->
Scanner(stream).useDelimiter("\\A")
.toObservable()
.subscribe { s.onNext(it) }
}
}

fun syncObservable(): Observable<String> = observable { subscriber ->
fun syncObservable(): Observable<String> = Observable.create { subscriber ->
(0..75).toObservable()
.map { "Sync value_$it" }
.subscribe { subscriber.onNext(it) }
}

fun asyncObservable(): Observable<String> = observable { subscriber ->
fun asyncObservable(): Observable<String> = Observable.create { subscriber ->
thread {
(0..75).toObservable()
.map { "Async value_$it" }
.subscribe { subscriber.onNext(it) }
}
}

fun asyncWiki(vararg articleNames: String): Observable<String> = observable { subscriber ->
fun asyncWiki(vararg articleNames: String): Observable<String> = Observable.create { subscriber ->
thread {
articleNames.toObservable()
.flatMapMaybe { name -> URL("http://en.wikipedia.org/wiki/$name").toScannerObservable().firstElement() }
.subscribe { subscriber.onNext(it) }
}
}

fun asyncWikiWithErrorHandling(vararg articleNames: String): Observable<String> = observable { subscriber ->
fun asyncWikiWithErrorHandling(vararg articleNames: String): Observable<String> = Observable.create { subscriber ->
thread {
articleNames.toObservable()
.flatMapMaybe { name -> URL("http://en.wikipedia.org/wiki/$name").toScannerObservable().firstElement() }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rx.lang.kotlin.examples.retrofit
package io.reactivex.rxkotlin.examples.retrofit

import io.reactivex.Observable
import retrofit.RestAdapter
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rx.lang.kotlin
package io.reactivex.rxkotlin

import io.reactivex.Completable
import io.reactivex.functions.Action
Expand Down
19 changes: 19 additions & 0 deletions src/main/kotlin/io/reactivex/rxkotlin/disposable.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.reactivex.rxkotlin

import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable

/**
* disposable += observable.subscribe()
*/
operator fun CompositeDisposable.plusAssign(disposable: Disposable) {
add(disposable)
}

/**
* Add the subscription to a CompositeSubscription.
* @param compositeDisposable CompositeDisposable to add this subscription to
* @return this instance
*/
fun Disposable.addTo(compositeDisposable: CompositeDisposable): Disposable
= apply { compositeDisposable.add(this) }
Original file line number Diff line number Diff line change
@@ -1,27 +1,16 @@
package rx.lang.kotlin
package io.reactivex.rxkotlin

import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.FlowableEmitter
import io.reactivex.Single
import io.reactivex.functions.BiFunction

fun <T : Any> flowable(
strategy: BackpressureStrategy = BackpressureStrategy.BUFFER,
body: (FlowableEmitter<in T>) -> Unit
): Flowable<T> = Flowable.create(body, strategy)

private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
override fun iterator(): Iterator<T> = this@toIterable
}

fun BooleanArray.toFlowable(): Flowable<Boolean> = Flowable.fromArray(*this.toTypedArray())
fun ByteArray.toFlowable(): Flowable<Byte> = Flowable.fromArray(*this.toTypedArray())
fun ShortArray.toFlowable(): Flowable<Short> = Flowable.fromArray(*this.toTypedArray())
fun IntArray.toFlowable(): Flowable<Int> = Flowable.fromArray(*this.toTypedArray())
fun LongArray.toFlowable(): Flowable<Long> = Flowable.fromArray(*this.toTypedArray())
fun FloatArray.toFlowable(): Flowable<Float> = Flowable.fromArray(*this.toTypedArray())
fun DoubleArray.toFlowable(): Flowable<Double> = Flowable.fromArray(*this.toTypedArray())
fun BooleanArray.toFlowable(): Flowable<Boolean> = this.asIterable().toFlowable()
fun ByteArray.toFlowable(): Flowable<Byte> = this.asIterable().toFlowable()
fun ShortArray.toFlowable(): Flowable<Short> = this.asIterable().toFlowable()
fun IntArray.toFlowable(): Flowable<Int> = this.asIterable().toFlowable()
fun LongArray.toFlowable(): Flowable<Long> = this.asIterable().toFlowable()
fun FloatArray.toFlowable(): Flowable<Float> = this.asIterable().toFlowable()
fun DoubleArray.toFlowable(): Flowable<Double> = this.asIterable().toFlowable()
fun <T : Any> Array<T>.toFlowable(): Flowable<T> = Flowable.fromArray(*this)

fun IntProgression.toFlowable(): Flowable<Int> =
Expand All @@ -30,14 +19,11 @@ fun IntProgression.toFlowable(): Flowable<Int> =

fun <T : Any> Iterator<T>.toFlowable(): Flowable<T> = toIterable().toFlowable()
fun <T : Any> Iterable<T>.toFlowable(): Flowable<T> = Flowable.fromIterable(this)
fun <T : Any> Sequence<T>.toFlowable(): Flowable<T> = Flowable.fromIterable(iterator().toIterable())
fun <T : Any> Sequence<T>.toFlowable(): Flowable<T> = asIterable().toFlowable()

fun <T : Any> Iterable<Flowable<out T>>.merge(): Flowable<T> = Flowable.merge(this.toFlowable())
fun <T : Any> Iterable<Flowable<out T>>.mergeDelayError(): Flowable<T> = Flowable.mergeDelayError(this.toFlowable())

inline fun <T : Any, R : Any> Flowable<T>.fold(initial: R, crossinline body: (R, T) -> R): Single<R>
= reduce(initial) { a, e -> body(a, e) }

/**
* Returns Flowable that wrap all values into [IndexedValue] and populates corresponding index value.
* Works similar to [kotlin.withIndex]
Expand All @@ -56,20 +42,19 @@ fun <T : Any> Flowable<T>.withIndex(): Flowable<IndexedValue<T>>
inline fun <T : Any, R : Any> Flowable<T>.flatMapSequence(crossinline body: (T) -> Sequence<R>): Flowable<R>
= flatMap { body(it).toFlowable() }

fun <T : Any> Flowable<Flowable<T>>.switchOnNext(): Flowable<T> = Flowable.switchOnNext(this)

/**
* Flowable.combineLatest(List<? extends Flowable<? extends T>> sources, FuncN<? extends R> combineFunction)
*/
@Suppress("UNCHECKED_CAST")
inline fun <T, R> List<Flowable<T>>.combineLatest(crossinline combineFunction: (args: List<T>) -> R): Flowable<R>
inline fun <T, R> Iterable<Flowable<T>>.combineLatest(crossinline combineFunction: (args: List<T>) -> R): Flowable<R>
= Flowable.combineLatest(this) { combineFunction(it.asList().map { it as T }) }

/**
* Flowable.zip(List<? extends Flowable<? extends T>> sources, FuncN<? extends R> combineFunction)
*/
@Suppress("UNCHECKED_CAST")
inline fun <T, R> List<Flowable<T>>.zip(crossinline zipFunction: (args: List<T>) -> R): Flowable<R>
inline fun <T, R> Iterable<Flowable<T>>.zip(crossinline zipFunction: (args: List<T>) -> R): Flowable<R>
= Flowable.zip(this) { zipFunction(it.asList().map { it as T }) }

/**
Expand All @@ -78,6 +63,10 @@ inline fun <T, R> List<Flowable<T>>.zip(crossinline zipFunction: (args: List<T>)
inline fun <reified R : Any> Flowable<*>.cast(): Flowable<R> = cast(R::class.java)

/**
* Filters the items emitted by an Observable, only emitting those of the specified type.
* Filters the items emitted by an Flowable, only emitting those of the specified type.
*/
inline fun <reified R : Any> Flowable<*>.ofType(): Flowable<R> = ofType(R::class.java)
inline fun <reified R : Any> Flowable<*>.ofType(): Flowable<R> = ofType(R::class.java)

private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
override fun iterator(): Iterator<T> = this@toIterable
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rx.lang.kotlin
package io.reactivex.rxkotlin

import io.reactivex.Maybe
import java.util.concurrent.Callable
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@
package rx.lang.kotlin
package io.reactivex.rxkotlin

import io.reactivex.Observable
import io.reactivex.ObservableEmitter
import io.reactivex.Single
import io.reactivex.functions.BiFunction

fun <T : Any> observable(body: (ObservableEmitter<in T>) -> Unit): Observable<T> = Observable.create(body)

private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
override fun iterator(): Iterator<T> = this@toIterable
}

fun BooleanArray.toObservable(): Observable<Boolean> = Observable.fromArray(*this.toTypedArray())
fun ByteArray.toObservable(): Observable<Byte> = Observable.fromArray(*this.toTypedArray())
fun ShortArray.toObservable(): Observable<Short> = Observable.fromArray(*this.toTypedArray())
fun IntArray.toObservable(): Observable<Int> = Observable.fromArray(*this.toTypedArray())
fun LongArray.toObservable(): Observable<Long> = Observable.fromArray(*this.toTypedArray())
fun FloatArray.toObservable(): Observable<Float> = Observable.fromArray(*this.toTypedArray())
fun DoubleArray.toObservable(): Observable<Double> = Observable.fromArray(*this.toTypedArray())
fun BooleanArray.toObservable(): Observable<Boolean> = this.asIterable().toObservable()
fun ByteArray.toObservable(): Observable<Byte> = this.asIterable().toObservable()
fun ShortArray.toObservable(): Observable<Short> = this.asIterable().toObservable()
fun IntArray.toObservable(): Observable<Int> = this.asIterable().toObservable()
fun LongArray.toObservable(): Observable<Long> = this.asIterable().toObservable()
fun FloatArray.toObservable(): Observable<Float> = this.asIterable().toObservable()
fun DoubleArray.toObservable(): Observable<Double> = this.asIterable().toObservable()
fun <T : Any> Array<T>.toObservable(): Observable<T> = Observable.fromArray(*this)

fun IntProgression.toObservable(): Observable<Int> =
Expand All @@ -26,14 +19,11 @@ fun IntProgression.toObservable(): Observable<Int> =

fun <T : Any> Iterator<T>.toObservable(): Observable<T> = toIterable().toObservable()
fun <T : Any> Iterable<T>.toObservable(): Observable<T> = Observable.fromIterable(this)
fun <T : Any> Sequence<T>.toObservable(): Observable<T> = Observable.fromIterable(iterator().toIterable())
fun <T : Any> Sequence<T>.toObservable(): Observable<T> = asIterable().toObservable()

fun <T : Any> Iterable<Observable<out T>>.merge(): Observable<T> = Observable.merge(this.toObservable())
fun <T : Any> Iterable<Observable<out T>>.mergeDelayError(): Observable<T> = Observable.mergeDelayError(this.toObservable())

inline fun <T : Any, R : Any> Observable<T>.fold(initial: R, crossinline body: (R, T) -> R): Single<R>
= reduce(initial) { a, e -> body(a, e) }

/**
* Returns Observable that wrap all values into [IndexedValue] and populates corresponding index value.
* Works similar to [kotlin.withIndex]
Expand All @@ -52,20 +42,19 @@ fun <T : Any> Observable<T>.withIndex(): Observable<IndexedValue<T>>
inline fun <T : Any, R : Any> Observable<T>.flatMapSequence(crossinline body: (T) -> Sequence<R>): Observable<R>
= flatMap { body(it).toObservable() }

fun <T : Any> Observable<Observable<T>>.switchOnNext(): Observable<T> = Observable.switchOnNext(this)

/**
* Observable.combineLatest(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction)
*/
@Suppress("UNCHECKED_CAST")
inline fun <T, R> List<Observable<T>>.combineLatest(crossinline combineFunction: (args: List<T>) -> R): Observable<R>
inline fun <T, R> Iterable<Observable<T>>.combineLatest(crossinline combineFunction: (args: List<T>) -> R): Observable<R>
= Observable.combineLatest(this) { combineFunction(it.asList().map { it as T }) }

/**
* Observable.zip(List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction)
*/
@Suppress("UNCHECKED_CAST")
inline fun <T, R> List<Observable<T>>.zip(crossinline zipFunction: (args: List<T>) -> R): Observable<R>
inline fun <T, R> Iterable<Observable<T>>.zip(crossinline zipFunction: (args: List<T>) -> R): Observable<R>
= Observable.zip(this) { zipFunction(it.asList().map { it as T }) }

/**
Expand All @@ -77,3 +66,7 @@ inline fun <reified R : Any> Observable<*>.cast(): Observable<R> = cast(R::class
* Filters the items emitted by an Observable, only emitting those of the specified type.
*/
inline fun <reified R : Any> Observable<*>.ofType(): Observable<R> = ofType(R::class.java)

private fun <T : Any> Iterator<T>.toIterable() = object : Iterable<T> {
override fun iterator(): Iterator<T> = this@toIterable
}
79 changes: 79 additions & 0 deletions src/main/kotlin/io/reactivex/rxkotlin/operators.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.reactivex.rxkotlin

import io.reactivex.Flowable
import io.reactivex.Observable
import io.reactivex.Single

/**
* Merges the emissions of an Observable<Observable<T>>. Same as calling `flatMap { it }`.
*/
fun <T : Any> Observable<Observable<T>>.mergeAll() = flatMap { it }

/**
* Merges the emissions of a Flowable<Flowable<T>>. Same as calling `flatMap { it }`.
*/
fun <T : Any> Flowable<Flowable<T>>.mergeAll() = flatMap { it }


/**
* Concatenates the emissions of an Observable<Observable<T>>. Same as calling `concatMap { it }`.
*/
fun <T : Any> Observable<Observable<T>>.concatAll() = concatMap { it }

/**
* Concatenates the emissions of an Flowable<Flowable<T>>. Same as calling `concatMap { it }`.
*/
fun <T : Any> Flowable<Flowable<T>>.concatAll() = concatMap { it }


fun <T : Any> Observable<Observable<T>>.switchOnNext(): Observable<T> = Observable.switchOnNext(this)


fun <T : Any> Flowable<Flowable<T>>.switchOnNext(): Flowable<T> = Flowable.switchOnNext(this)


/**
* Emits the latest `Observable<T>` emitted through an `Observable<Observable<T>>`. Same as calling `switchMap { it }`.
*/
fun <T : Any> Observable<Observable<T>>.switchLatest() = switchMap { it }


/**
* Emits the latest `Flowable<T>` emitted through an `Flowable<Flowable<T>>`. Same as calling `switchMap { it }`.
*/
fun <T : Any> Flowable<Flowable<T>>.switchLatest() = switchMap { it }


/**
* Joins the emissions of a finite `Observable` into a `String`.
*
* @param separator is the dividing character(s) between each element in the concatenated `String`
*
* @param prefix is the preceding `String` before the concatenated elements (optional)
*
* @param postfix is the succeeding `String` after the concatenated elements (optional)
*/
fun <T : Any> Observable<T>.joinToString(separator: String? = null,
prefix: String? = null,
postfix: String? = null
): Single<String> = collect({ StringBuilder(prefix ?: "") }) { builder: StringBuilder, next: T ->
builder.append(if (builder.length == prefix?.length ?: 0) "" else separator ?: "").append(next)
}.map { it.append(postfix ?: "").toString() }



/**
* Joins the emissions of a finite `Flowable` into a `String`.
*
* @param separator is the dividing character(s) between each element in the concatenated `String`
*
* @param prefix is the preceding `String` before the concatenated elements (optional)
*
* @param postfix is the succeeding `String` after the concatenated elements (optional)
*/
fun <T : Any> Flowable<T>.joinToString(separator: String? = null,
prefix: String? = null,
postfix: String? = null
): Single<String> = collect({ StringBuilder(prefix ?: "") }) { builder: StringBuilder, next: T ->
builder.append(if (builder.length == prefix?.length ?: 0) "" else separator ?: "").append(next)
}.map { it.append(postfix ?: "").toString() }
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package rx.lang.kotlin
package io.reactivex.rxkotlin

import io.reactivex.Single
import io.reactivex.SingleEmitter
import java.util.concurrent.Callable
import java.util.concurrent.Future

inline fun <T : Any> single(crossinline body: (s: SingleEmitter<in T>) -> Unit): Single<T> = Single.create { body(it) }
fun <T : Any> T.toSingle(): Single<T> = Single.just(this)
fun <T : Any> Future<T>.toSingle(): Single<T> = Single.fromFuture(this)
fun <T : Any> Callable<T>.toSingle(): Single<T> = Single.fromCallable(this)
Expand Down
Loading

0 comments on commit 3e005e6

Please sign in to comment.