diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..13923f1 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -18,6 +18,10 @@ import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; +import io.reactivex.ObservableSource; +import io.reactivex.ObservableTransformer; +import io.reactivex.annotations.NonNull; +import io.reactivex.subjects.PublishSubject; /** * @author Baoyi Chen @@ -30,6 +34,35 @@ public class Practice1 { * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return this.method3(observable); } + + private Observable> method1(Observable source) { + return source.compose(new ObservableTransformer>() { + int index = 1; + @Override + public ObservableSource> apply(@NonNull Observable upstream) { + return Observable.create(emitter -> { + upstream.subscribe(sourceItem -> { + emitter.onNext(new Tuple2(index++, sourceItem)); + }, e -> emitter.onError(e), () -> emitter.onComplete()); + }); + } + }); + } + + private Observable> method2(Observable source) { + return source.map(x -> new Tuple2(1, x)) + .scan((t, s) -> new Tuple2(t.getV1().intValue() + 1, s.getV2())); + } + + private Observable> method3(Observable source) { +// Observable.zip + return source.zipWith(Observable.range(1, Integer.MAX_VALUE), (o1, o2) -> { + return new Tuple2(o2, o1); + }); + } + + + } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..7e4ddc9 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -21,6 +21,7 @@ import io.reactivex.Observable; import io.reactivex.Single; +import java.util.HashMap; import java.util.Map; /** @@ -34,7 +35,12 @@ public class Practice2 { * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] */ public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); + return words.groupBy(s -> s).flatMap(s -> { +// s.reduce() + return s.count().toObservable().map(count -> { + return new Tuple2(s.getKey(), count.intValue()); + }); + }); } /* @@ -43,7 +49,11 @@ public Observable> wordCount1(Observable words) * 返回: Single[Map{a=2, b=1, c=2}] */ public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); + Map map = new HashMap<>(); + return this.wordCount1(words).>reduce(map,(countMap, item) -> { + countMap.put(item.getV1(), item.getV2()); + return countMap; + }); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..5b25af9 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -19,6 +19,11 @@ import io.reactivex.Maybe; import io.reactivex.Observable; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Stack; + + /** * @author Baoyi Chen */ @@ -28,7 +33,7 @@ public class Practice3 { * 根据iterate的结果求和 */ public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return this.iterate(observable).reduce(0, (s, i) -> s += i).toMaybe(); } /* @@ -42,7 +47,41 @@ public Maybe sum(Observable observable) { * return Observable[4, 3, 6, 7, 5] 顺序无关 */ public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); + // flatMap no map + return observable.flatMap(s -> { + return Observable.fromIterable(iterator(s)); + }); + } + + private Iterable iterator(Node node) { + class Iter implements Iterator { + private Stack stack = new Stack<>(); + Iter(Node node) { + this.walk(node); + } + + private Node walk(Node node) { + this.stack.push(node); + if (node.left != null) { + this.walk(node.left); + } + if (node.right != null) { + walk(node.right); + } + return null; + } + @Override + public boolean hasNext() { + return this.stack.size() > 0; + } + + @Override + public Integer next() { + Node curr = this.stack.pop(); + return curr.value; + } + } + return node == null ? null : () -> new Iter(node); } public static class Node { diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..0734253 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -18,6 +18,12 @@ import io.reactivex.Observable; +import io.reactivex.ObservableEmitter; +import io.reactivex.ObservableOnSubscribe; +import io.reactivex.ObservableSource; +import io.reactivex.annotations.NonNull; +import io.reactivex.functions.Function; +import io.reactivex.schedulers.Schedulers; /** @@ -44,7 +50,7 @@ public class Practice4 { * */ public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.concatMap(s -> Observable.just(s).observeOn(Schedulers.newThread())); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..1f871f7 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -16,12 +16,19 @@ package cn.nextop.rxjava.share.practices; -import io.reactivex.Maybe; -import io.reactivex.Observable; -import io.reactivex.Single; +import io.reactivex.*; +import io.reactivex.annotations.NonNull; +import io.reactivex.functions.Function; +import io.reactivex.observables.GroupedObservable; +import io.reactivex.subjects.PublishSubject; +import io.reactivex.subjects.Subject; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; /** @@ -35,7 +42,7 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.reduce(0L, (i, s) -> i+1); } /* @@ -44,7 +51,7 @@ public Single count(Observable source) { * return: Observable["a", "b", "c","b", "c", "d"] */ public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); + return source.flatMap(s -> Observable.fromIterable(s)); } /* @@ -53,7 +60,10 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); +// return source.groupBy(s -> s).flatMap(g -> { +// return Observable.just(g.getKey()); +// }); + return source.groupBy(s -> s).map(s -> s.getKey()); } /* @@ -62,7 +72,10 @@ public Observable distinct(Observable source) { * return: Observable[3, 4] */ public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); + return source.flatMap(s -> { + if (conditon.test(s)) return Observable.just(s); + return Observable.empty(); + }); } /* @@ -71,7 +84,13 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); +// source.skip(index).take(1).firstElement(); + return source.take(index + 1).lastElement(); +// AtomicInteger cursor = new AtomicInteger(0); +// return source.reduce((seed, s) -> { +// if (cursor.incrementAndGet() == index) return s; +// return seed; +// }); } /* @@ -80,7 +99,7 @@ public Maybe elementAt(Observable source, int index) { * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); + return Observable.range(0, count).flatMap(s -> source); } /* @@ -89,7 +108,7 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.fromIterable(source).concatMap(s -> s); } /* @@ -98,7 +117,7 @@ public Observable concat(List> source) { * return: Observable["a", "b"] */ public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.fromIterable(source).flatMap(s -> s); } /* @@ -107,7 +126,9 @@ public Observable merge(List> source) { * return: Observable["a", "b", "c"], 每个元素都延迟1秒 */ public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); + return source.concatMap(s -> { + return Observable.just(s).delay(delay, unit); + }); } }