RxJava 实例

简介: RxJava 实例

1.定义

RxJava 是一个 基于事件流、实现异步操作的库

2.作用

用于实现异步操作,类似于 Android中的 AsyncTaskHandler+new Thread的作用

3. 特点

由于 RxJava的使用方式是:基于事件流的链式调用,所以使得 RxJava

  • 逻辑简洁
  • 实现优雅
  • 使用简单

更重要的是,随着程序逻辑的复杂性提高,它依然能够保持简洁 & 优雅

  • Rxjava原理 基于 一种扩展的观察者模式
  • Rxjava的扩展观察者模式中有4个角色:
角色 作用 类比
被观察者(Observable) 产生事件 顾客
观察者(Observer) 接收事件,并给出响应动作 厨房
订阅(Subscribe) 连接 被观察者 & 观察者 服务员
事件(Event) 被观察者 & 观察者 沟通的载体 菜式

4.使用

导包

implementation 'io.reactivex.rxjava2:rxjava:2.1.3'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'com.jakewharton.rxbinding2:rxbinding:2.2.0'

1.定义被观察者

三种方式:Observable.create、Observable.just、Observable.fromArray

    public void buyFood(){
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    e.onNext("potato");
                    e.onNext("tomato");
                    e.onNext("noodles");
                    e.onComplete();
            }
        });
 
        Observable<String> observable1 = Observable.just("potato","tomato","noodles");
        String [] foods = new String[]{"potato","tomato","noodles"};
        Observable<String> observable2 = Observable.fromArray(foods);
    }

2.定义观察者

使用observer或者subscriber

    Observer<String> mObserver = new Observer<String>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
 
        }
 
        @Override
        public void onNext(@NonNull String s) {
 
        }
 
        @Override
        public void onError(@NonNull Throwable e) {
 
        }
 
        @Override
        public void onComplete() {
 
        }
    };
    Subscriber<String> mStringSubscriber = new Subscriber<String>() {
        @Override
        public void onSubscribe(Subscription s) {
            
        }
 
        @Override
        public void onNext(String s) {
 
        }
 
        @Override
        public void onError(Throwable t) {
 
        }
 
        @Override
        public void onComplete() {
 
        }
    };

特别注意:2种方法的区别,即Subscriber 抽象类与Observer 接口的区别

// 相同点:二者基本使用方式完全一致(实质上,在RxJava的 subscribe 过程中,Observer总是会先被转换成Subscriber再使用)

// 不同点:Subscriber抽象类对 Observer 接口进行了扩展,新增了两个方法:

   // 1\. onStart():在还未响应事件前调用,用于做一些初始化工作

   // 2\. unsubscribe():用于取消订阅。在该方法被调用后,观察者将不再接收 & 响应事件

   // 调用该方法前,先使用 isUnsubscribed() 判断状态,确定被观察者Observable是否还持有观察者Subscriber的引用,如果引用不能及时释放,就会出现内存泄露

3.订阅

observable.subscribe(observer);

5.优雅实现

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    e.onNext("potato");
                    e.onNext("tomato");
                    e.onNext("noodles");
                    e.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
 
            }
 
            @Override
            public void onNext(@NonNull String s) {
 
            }
 
            @Override
            public void onError(@NonNull Throwable e) {
 
            }
 
            @Override
            public void onComplete() {
 
            }
        });

其他方法

observeOn:观察者的执行线程

observeOn 作用于该操作符之后直到出现新的observeOn操作符

subscribeOn:被观察者的执行线程

subscribeOn 作用于该操作符之前的 Observable 的创建操符作以及 doOnSubscribe 操作符 ,换句话说就是 doOnSubscribe 以及 Observable 的创建操作符总是被其之后最近的 subscribeOn 控制

实例

    //将字符串转换成double类型
    String path = "12.3";
    public void test(){
        //输入事件
        Observable.just(path)
                //对事件进行处理
                .map(new Function<String, Double>() {
            @Override
            public Double apply(@NonNull String s) throws Exception {
                return Double.parseDouble(s);
            }
        })
                //指定被观察者执行线程
                .subscribeOn(Schedulers.io())
                //指定观察者执行线程
                .observeOn(AndroidSchedulers.mainThread())
                //订阅观察者
                .subscribe(
                new Observer<Double>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
 
                    }
 
                    @Override
                    public void onNext(@NonNull Double aDouble) {
 
                    }
 
                    @Override
                    public void onError(@NonNull Throwable e) {
 
                    }
 
                    @Override
                    public void onComplete() {
 
                    }
                });
    }

Rxjava中的Scheduler相当于线程控制器,Rxjava通过它来指定每一段代码应该运行在什么样的线程。Rxjava提供了5种调度器:

  • .io()
  • .computation()
  • .immediate()
  • .newThread()
  • .trampoline()


另外,Android还有一个专用的AndroidSchedulers.mainThread()指定操作将在Android主线程运行。Rxjava通过subscribeOn()observeOn()两个方法来对线程进行控制,subscribeOn()指定subscribe()时间发生的线程,也就是事件产生的线程,observeOn()指定Subscriber做运行的线程,也就是消费事件的线程。


Schedulers.io() 用于I/O操作,比如:读写文件,数据库,网络交互等等。行为模式和newThread()差不多,重点需要注意的是线程池是无限制的,大量的I/O调度操作将创建许多个线程并占用内存


Schedulers.computation()计算工作默认的调度器,这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。


Schedulers.immediate()  这个调度器允许你立即在当前线程执行你指定的工作。这是默认的Scheduler


Schedulers.newThread()    它为指定任务启动一个新的线程。


Schedulers.trampoline()

当我们想在当前线程执行一个任务时,并不是立即,我们可以用trampoline() 将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。

RXView 防抖

    private void test3() {
        RxView.clicks(控件).throttleFirst(1000, TimeUnit.MILLISECONDS).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
 
            }
 
            @Override
            public void onNext(@NonNull Object o) {
                Log.e(TAG, "onNext: 响应事件....." );
            }
 
            @Override
            public void onError(@NonNull Throwable e) {
 
            }
 
            @Override
            public void onComplete() {
 
            }
        });

RxJava篇(应用场景) - 简书 (jianshu.com)


目录
相关文章
|
4天前
|
安全 Android开发
你是否了解 RxJava 的 Disposable ?
你是否了解 RxJava 的 Disposable ?
89 0
RxJava2 中 doFinally 和 doAfterTerminate 的比较
RxJava2 中 doFinally 和 doAfterTerminate 的比较
251 0
|
IDE 开发工具 Android开发
使用rxjava创建一个rxbus事件处理框架
RxJava已经出现很多个年头了,但是依然被很多公司使用,如果现在还对RxJava了解的不够透彻, 可以看这个系列对它的分析:相信看完后你对它会有个更全面的认识。 这个系列主要从下面几个方面来讲解: **RxJava基本操作符使用** **RxJava响应式编程是如何实现的** **RxJava的背压机制及Flowable是如何实现背压的** **RxJava的线程切换原理
|
JSON Java 数据格式
rxjava2+retrofit2 简介
rxjava2+retrofit2 简介
75 0
|
数据处理
RxJava2实现RxBus
RxJava2实现RxBus
145 0
|
Java 程序员
Rxjava实战笔记 | Rxjava的基本使用解析(同步结合示例)
Rxjava实战笔记 | Rxjava的基本使用解析(同步结合示例)
|
Java Android开发
关于Rxjava的简单使用
本篇只是讲一下Rxjava的简单入门使用,想要详解的请移步其他博主文章,关于RxJava详解的文章网上一大堆,本片文章内容适合小白学习。
133 1
|
安全 Android开发
详解 RxJava 的 Disposable
RxJava2 的 Disposable,可以在适当时机取消订阅、截断数据流,避免 Android 中的内存泄露。
1006 0
|
Java 数据库 UED
RxJava的简介
RxJava的简介
246 0
RxJava的简介
|
Java Go Android开发
RxJava2
函数式编程是一种编程范式。我们常见的编程范式有命令式编程、函数式编程和逻辑式编程。我们常见的面向对象编程是一种命令式编程。命令式编程是面向计算机硬件的抽象,有变量、赋值语句、表达式和控制语句。而函数式编程是面向数学的抽象,将计算描述为一种表达式求值,函数可以在任何地方定义,并且可以对函数进行组合。响应式编程是一种面向数据流和变化传播的编程范式,数据更新是相关联的。把函数式编程里的一套思路和响应式编程合起来就是函数响应式编程。函数响应式编程可以极大地简化项目,特别是处理嵌套回调的异步事件、复杂的列表过滤和变换或者时间相关问题。在Android开发中使用函数响应式编程的主要有两大框架:
146 0
RxJava2
http://www.vxiaotou.com