rxjava用法

现在越来越多Android开发者使用到RxJava,在Android使用RxJava主要有如下好处: 1,轻松切换线程。以前我们切换线程主要使用Handler等手段来做。 2,轻松解决回调的嵌套问题。现在的app业务逻辑越来越复杂,多的时候3,4层回调嵌套,使得代码可维护性变得很差。RxJava链式调用使得这些调用变得扁平化。

随着RxJava的流行,越来越多的开源项目开始支持RxJava,像Retrofit、GreenDao等。这些开源项目支持RxJava使得我们解决复杂业务变得非常方便。

但是这些还不够,有的时候我们自己的封装的业务也需要支持RxJava,举个例子:查询数据、处理本地文件等操作,总而言之就是一些耗时任务。而且还要处理这些操作的成功、失败、线程切换等操作。

RX操作符之Observable的创建方式

创建方式一:just、from、repeat、repeatWhen
1.just方法

创建发送指定值的Observerble,just只是简单的原样发射,将数组或Iterable当做单个数据。如果传递的值为null,则发送的Observable的值为null。参数最多为9个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Observable<String> myObservable = Observable.just("just1","just1","just1","just1","just1","just1","just1","just1","just1","just1");

Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println(s);
}

@Override
public void onCompleted() {
System.out.println("onCompleted.................");
}

@Override
public void onError(Throwable e) {
System.out.println("onError....................");
}
};

myObservable.subscribe(mySubscriber);

结果:

1
2
3
4
5
6
7
8
9
10
11
just1
just1
just1
just1
just1
just1
just1
just1
just1
just1
onCompleted.............
2.from方法

将数据转换成为Observables,而不是需要混合使用Observables和其它类型的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
String[]items = {"just1","just1","just1","just1","just1","just1"};

Observable<String> myObservable = Observable.from(items);

Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println(s);
}

@Override
public void onCompleted() {
System.out.println("onCompleted.................");
}

@Override
public void onError(Throwable e) {
System.out.println("onError....................");
}
};

myObservable.subscribe(mySubscriber);

结果:

1
2
3
4
5
6
7
just1
just1
just1
just1
just1
just1
onCompleted.............
3.repeat方法

repeat()重复地执行某个操作,如果不传递参数,结果将会被无限地重复执行,默认在trampoline调度器上执行,该方法为非静态方法,不可以直接通过Observable来调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
String[]items = {"just1","just2","just3","just4","just5","just6"};

Observable<String> myObservable = Observable.from(items).repeat();


Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println(s);
}

@Override
public void onCompleted() {
System.out.println("onCompleted.................");
}

@Override
public void onError(Throwable e) {
System.out.println("onError....................");
}
};

myObservable.subscribe(mySubscriber);

repeat()如果传入数字类型的参数,则重复地执行指定次数的某个操作,默认在trampoline调度器上执行,不可以直接通过Observable来调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
String[]items = {"just1","just2","just3","just4","just5","just6"};

Observable<String> myObservable = Observable.from(items).repeat(2);


Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println(s);
}

@Override
public void onCompleted() {
System.out.println("onCompleted.................");
}

@Override
public void onError(Throwable e) {
System.out.println("onError....................");
}
};

myObservable.subscribe(mySubscriber);

repeatWhen()不是缓存和重放原始Observable的数据序列,而是有条件的重新订阅和发射原来的Observable,当Observable中的call()方法中调用了重复执行的代码时,onNext()将会被重复执行。如果该方法执行后返回void,则结束执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
final String[]items = {"just1","just2","just3","just4","just5","just6"};

Observable<String> myObservable = Observable.from(items).repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
return observable.delay(5, TimeUnit.SECONDS);
}
});


Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println("onNext.................."+s);
}

@Override
public void onCompleted() {
System.out.println("onCompleted.................");
}

@Override
public void onError(Throwable e) {
System.out.println("onError....................");
}
};

myObservable.subscribe(mySubscriber);

doWhile 属于可选包rxjava-computation-expressions,不是RxJava标准操作符的一部分。doWhile在原始序列的每次重复后检查某个条件,如果满足条件才重复发射。 whileDo 属于可选包rxjava-computation-expressions,不是RxJava标准操作符的一部分。whileDo在原始序列的每次重复前检查某个条件,如果满足条件才重复发射。

创建方式二:defer、range、interval、timer、Empty、Never、Throw
1.defer

Defer操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个Observable,事实上每个订阅者获取的是它们自己的单独的数据序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
String[]items = {"just1","just2","just3","just4","just5","just6"};

Observable<String> myObservable = Observable.from(items);

Subscriber<String> mySubscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println("onNext................."+s);
}

@Override
public void onCompleted() {
System.out.println("onCompleted.................");
}

@Override
public void onError(Throwable e) {
System.out.println("onError....................");
}
};

Subscriber<String> mySubscriber1 = new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println("onNext................."+s);
}

@Override
public void onCompleted() {
System.out.println("onCompleted.................");
}

@Override
public void onError(Throwable e) {
System.out.println("onError....................");
}
};

myObservable.subscribe(mySubscriber);
myObservable.subscribe(mySubscriber1);

image 尽管打印的结果一样,但是它们不是取自同一个Observable的数据

2.Range

创建一个发射特定整数序列的Observable,接收两个参数,第一个参数是范围的起始值,第二个参数是范围的数据数目,即需要多产生多少个这样的值。如果你将第二个参数设为0,将导致Observable不发射任何数据(如果设置为负数,会抛异常)。range默认不在任何特定的调度器上执行。有一个变体可以通过可选参数指定Scheduler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
String[]items = {"just1","just2","just3","just4","just5","just6"};

Observable<Integer> myObservable = Observable.range(5,10);


Subscriber<Integer> mySubscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer s) {
System.out.println("onNext................."+s);
}

@Override
public void onCompleted() {
System.out.println("onCompleted.................");
}

@Override
public void onError(Throwable e) {
System.out.println("onError....................");
}
};

myObservable.subscribe(mySubscriber);

image

3.Interval

Interval操作符返回一个Observable,它按固定的时间间隔发射一个无限递增的整数序列。RxJava将这个操作符实现为interval方法。它接受一个表示时间间隔的参数和一个表示时间单位的参数。结果递增且不断增加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable<Long> myObservable = Observable.interval(2,TimeUnit.SECONDS);
Subscriber<Long> mySubscriber = new Subscriber<Long>() {
@Override
public void onNext(Long s) {
System.out.println("onNext................."+s);
}

@Override
public void onCompleted() {
System.out.println("onCompleted.................");
}

@Override
public void onError(Throwable e) {
System.out.println("onError....................");
}
};

myObservable.subscribe(mySubscriber);

image

4.Timer

创建一个Observable,它在一个给定的延迟后发射一个特殊的值,延迟2s后发送数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable<Long> myObservable = Observable
.timer(2,TimeUnit.SECONDS);
Subscriber<Long> mySubscriber = new Subscriber<Long>() {
@Override
public void onNext(Long s) {
System.out.println("onNext................."+s);
}

@Override
public void onCompleted() {
System.out.println("onCompleted.................");
}

@Override
public void onError(Throwable e) {
System.out.println("onError....................");
}
};
myObservable.subscribe(mySubscriber);

结果:

1
2
onNext.................0
onCompleted.................
5.Empty

创建一个不发射任何数据但是正常终止的Observable

6.Never

创建一个不发射数据也不终止的Observable

7.Throw

创建一个不发射数据以一个错误终止的Observable

坚持原创技术分享,您的支持将鼓励我继续创作!