已解决
Android RxJava3 原理浅析
来自网友在路上 179879提问 提问时间:2023-11-10 14:35:08阅读次数: 79
最佳答案 问答题库798位专家为你答疑解惑
使用
// Build a Retrofit instance whose calls are exposed as RxJava 3 types.
val retrofit = Retrofit.Builder()
    .baseUrl("https://api.github.com/")
    .addConverterFactory(GsonConverterFactory.create())
    .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
    .build()

val api = retrofit.create(API::class.java)

// Run the request on the IO scheduler and deliver the result on the main thread.
api.getRepo("rengwuxian")
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : SingleObserver<MutableList<Repo>> {
        override fun onSubscribe(d: Disposable?) {
            LogUtil.logD("onSubscribe${Thread.currentThread().name}")
            clv1.text = "onSubscribe"
        }

        override fun onSuccess(t: MutableList<Repo>) {
            LogUtil.logD(t[0].name)
            clv1.text = t[0].name
        }

        override fun onError(e: Throwable) {
            // Throwable.message may be null; fall back to toString() instead of
            // crashing inside the error handler with `!!`.
            val description = e.message ?: e.toString()
            LogUtil.logD(description)
            clv1.text = description
        }
    })
RxJava需要我们主动切线程
.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread())
也可以
.addCallAdapterFactory(RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io()))
这样在创建工厂时就统一指定了后台线程,所有请求默认都在该线程上执行
那么每次调用时的 .subscribeOn(Schedulers.io()) 就不需要了
源码:
/**
 * Stores the configuration handed in by the static factory methods.
 *
 * @param scheduler the scheduler kept on this factory; may be {@code null}
 * @param isAsync the async flag kept on this factory
 */
private RxJava3CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
    this.isAsync = isAsync;
    this.scheduler = scheduler;
}
内部赋值
Single.just 发送瞬时事件
// Subscribe to a Single that emits one ready-made value.
Single.just("1").subscribe(object : SingleObserver<String?> {
    override fun onSubscribe(d: Disposable?) {
        TODO("Not yet implemented")
    }

    override fun onSuccess(t: String?) {
        TODO("Not yet implemented")
    }

    override fun onError(e: Throwable?) {
        TODO("Not yet implemented")
    }
})
源码:
public static <@NonNull T> Single<T> just(T item) {Objects.requireNonNull(item, "item is null"); //先判断空return RxJavaPlugins.onAssembly(new SingleJust<>(item)); }//钩子方法public static <@NonNull T> Single<T> onAssembly(@NonNull Single<T> source) {Function<? super Single, ? extends Single> f = onSingleAssembly;if (f != null) {return apply(f, source);}return source;}//Single
/** Copyright (c) 2016-present, RxJava Contributors.** Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in* compliance with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software distributed under the License is* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See* the License for the specific language governing permissions and limitations under the License.*/package io.reactivex.rxjava3.internal.operators.single;import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;

/**
 * A {@code Single} that hands a constant value to every observer.
 *
 * @param <T> the value type
 */
public final class SingleJust<T> extends Single<T> {

    /** The value delivered to each observer. */
    final T value;

    public SingleJust(T value) {
        this.value = value;
    }

    /**
     * Signals {@code onSubscribe} followed immediately by {@code onSuccess}.
     *
     * @param observer the observer to notify
     */
    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposable.disposed());
        observer.onSuccess(value);
    }
}
关键方法
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
    // Create the subscription handle and discard it right away: the Disposable passed
    // down is already disposed (per the author, it is an enum instance).
    observer.onSubscribe(Disposable.disposed());
    // Success. There is no error callback because this cannot fail: the value already exists.
    observer.onSuccess(value);
}
操作符: map,转换对象类型
// Source Single emitting an Int.
val single: Single<Int> = Single.just(1)

// map converts the emitted Int into its String form.
val singleString = single.map(Function<Int, String> { number -> number.toString() })
多个single map
按时间发送数据: 从0开始发送,间隔1秒
// interval(initialDelay, period, unit) runs on Schedulers.computation() by default, so
// switch to the main thread before touching the view in onNext.
Observable.interval(0, 1, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : Observer<Long> {
        override fun onSubscribe(d: Disposable?) {
            // Intentionally empty: a TODO() here would throw NotImplementedError as soon
            // as the subscription starts. Keep `d` if the timer must be cancelled later.
        }

        override fun onNext(t: Long?) {
            clv1.text = t!!.toString()
        }

        override fun onError(e: Throwable?) {
            LogUtil.logD(e?.message ?: e.toString())
        }

        override fun onComplete() {
            // Intentionally empty.
        }
    })
/**
 * Convenience overload that delegates to the four-argument {@code interval},
 * supplying {@code Schedulers.computation()} as the scheduler.
 *
 * @param initialDelay the delay before the first value
 * @param period the time between subsequent values
 * @param unit the unit of {@code initialDelay} and {@code period}
 * @return the result of the four-argument overload
 */
public static Observable<Long> interval(long initialDelay, long period, @NonNull TimeUnit unit) {
    final Scheduler defaultScheduler = Schedulers.computation();
    return interval(initialDelay, period, unit, defaultScheduler);
}
interval
/** Copyright (c) 2016-present, RxJava Contributors.** Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in* compliance with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software distributed under the License is* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See* the License for the specific language governing permissions and limitations under the License.*/package io.reactivex.rxjava3.internal.operators.observable;import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.schedulers.TrampolineScheduler;public final class ObservableInterval extends Observable<Long> {final Scheduler scheduler;final long initialDelay;final long period;final TimeUnit unit;public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {this.initialDelay = initialDelay;this.period = period;this.unit = unit;this.scheduler = scheduler;}@Overridepublic void subscribeActual(Observer<? super Long> observer) {IntervalObserver is = new IntervalObserver(observer);observer.onSubscribe(is); //传入Scheduler sch = scheduler;if (sch instanceof TrampolineScheduler) {Worker worker = sch.createWorker();is.setResource(worker);worker.schedulePeriodically(is, initialDelay, period, unit);} else {Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);is.setResource(d);}}static final class IntervalObserverextends AtomicReference<Disposable>implements Disposable, Runnable {private static final long serialVersionUID = 346773832286157679L;final Observer<? super Long> downstream;long count;IntervalObserver(Observer<? super Long> downstream) {this.downstream = downstream;}@Overridepublic void dispose() {DisposableHelper.dispose(this);}@Overridepublic boolean isDisposed() {return get() == DisposableHelper.DISPOSED;}@Overridepublic void run() {if (get() != DisposableHelper.DISPOSED) {downstream.onNext(count++);}}public void setResource(Disposable d) {DisposableHelper.setOnce(this, d);}}
}===
/** Copyright (c) 2016-present, RxJava Contributors.** Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in* compliance with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software distributed under the License is* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See* the License for the specific language governing permissions and limitations under the License.*/package io.reactivex.rxjava3.internal.operators.observable;import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.schedulers.TrampolineScheduler;public final class ObservableInterval extends Observable<Long> {final Scheduler scheduler;final long initialDelay;final long period;final TimeUnit unit;public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {this.initialDelay = initialDelay;this.period = period;this.unit = unit;this.scheduler = scheduler;}@Overridepublic void subscribeActual(Observer<? super Long> observer) {IntervalObserver is = new IntervalObserver(observer);observer.onSubscribe(is);Scheduler sch = scheduler;if (sch instanceof TrampolineScheduler) {Worker worker = sch.createWorker();is.setResource(worker);worker.schedulePeriodically(is, initialDelay, period, unit);} else {Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);is.setResource(d);}}//实现了Disposable 然后extends AtomicReference引用,线程安全的Disposablestatic final class IntervalObserverextends AtomicReference<Disposable>implements Disposable, Runnable {private static final long serialVersionUID = 346773832286157679L;final Observer<? super Long> downstream;long count;IntervalObserver(Observer<? super Long> downstream) {this.downstream = downstream;}@Overridepublic void dispose() {DisposableHelper.dispose(this); //调用内部的dis}@Overridepublic boolean isDisposed() {return get() == DisposableHelper.DISPOSED;}@Overridepublic void run() {if (get() != DisposableHelper.DISPOSED) {downstream.onNext(count++);}}public void setResource(Disposable d) {DisposableHelper.setOnce(this, d);}}
}// disposepublic static boolean dispose(AtomicReference<Disposable> field) {Disposable current = field.get(); //拿到内部的Disposable d = DISPOSED;if (current != d) { //如果已经被取消current = field.getAndSet(d);if (current != d) {if (current != null) {current.dispose();}return true;}}return false;}//setResource 把内部值设为传入的值 ,只设置一次public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {Objects.requireNonNull(d, "d is null");if (!field.compareAndSet(null, d)) {d.dispose();if (field.get() != DISPOSED) {reportDisposableSet();}return false;}return true;}
实际设置定时任务的代码: ObservableInterval->subscribeActual
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
/** Emits the current counter value downstream and increments it, unless disposed. */
@Override
public void run() {
    if (get() == DisposableHelper.DISPOSED) {
        return;
    }
    downstream.onNext(count++);
}
ObservableInterval 内部维护了一个 IntervalObserver,由它负责定时任务的创建和取消
Single.delay
/**
 * Convenience overload that delegates to the four-argument {@code delay}, supplying
 * {@code Schedulers.computation()} as the scheduler and {@code false} as the last argument.
 *
 * @param time the amount of time to delay
 * @param unit the unit of {@code time}
 * @return the result of the four-argument overload
 */
public final Single<T> delay(long time, @NonNull TimeUnit unit) {
    final Scheduler defaultScheduler = Schedulers.computation();
    return delay(time, unit, defaultScheduler, false);
}
/** Copyright (c) 2016-present, RxJava Contributors.** Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in* compliance with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software distributed under the License is* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See* the License for the specific language governing permissions and limitations under the License.*/package io.reactivex.rxjava3.internal.operators.single;import java.util.concurrent.TimeUnit;import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;public final class SingleDelay<T> extends Single<T> {final SingleSource<? extends T> source;final long time;final TimeUnit unit;final Scheduler scheduler;final boolean delayError;public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler, boolean delayError) {this.source = source;this.time = time;this.unit = unit;this.scheduler = scheduler;this.delayError = delayError;}@Overrideprotected void subscribeActual(final SingleObserver<? super T> observer) {final SequentialDisposable sd = new SequentialDisposable(); //内部维护observer.onSubscribe(sd); //下游的sdsource.subscribe(new Delay(sd, observer)); //传递到上游}final class Delay implements SingleObserver<T> {private final SequentialDisposable sd;final SingleObserver<? super T> downstream;Delay(SequentialDisposable sd, SingleObserver<? super T> observer) {this.sd = sd;this.downstream = observer;}@Overridepublic void onSubscribe(Disposable d) {sd.replace(d);}@Overridepublic void onSuccess(final T value) {sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit));
//线程调度,进行置换,置换为本地,不再是上游}@Overridepublic void one rror(final Throwable e) {sd.replace(scheduler.scheduleDirect(new one rror(e), delayError ? time : 0, unit));}final class OnSuccess implements Runnable {private final T value;OnSuccess(T value) {this.value = value;}@Overridepublic void run() {downstream.onSuccess(value);}}final class one rror implements Runnable {private final Throwable e;OnError(Throwable e) {this.e = e;}@Overridepublic void run() {downstream.onError(e);}}}
}/** Copyright (c) 2016-present, RxJava Contributors.** Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in* compliance with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software distributed under the License is* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See* the License for the specific language governing permissions and limitations under the License.*/package io.reactivex.rxjava3.internal.disposables;import java.util.concurrent.atomic.AtomicReference;import io.reactivex.rxjava3.disposables.Disposable;/*** A Disposable container that allows updating/replacing a Disposable* atomically and with respect of disposing the container itself.* <p>* The class extends AtomicReference directly so watch out for the API leak!* @since 2.0*/
public final class SequentialDisposable
extends AtomicReference<Disposable>
implements Disposable {private static final long serialVersionUID = -754898800686245608L;/*** Constructs an empty SequentialDisposable.*/public SequentialDisposable() {// nothing to do}/*** Construct a SequentialDisposable with the initial Disposable provided.* @param initial the initial disposable, null allowed*/public SequentialDisposable(Disposable initial) {lazySet(initial);}/*** Atomically: set the next disposable on this container and dispose the previous* one (if any) or dispose next if the container has been disposed.* @param next the Disposable to set, may be null* @return true if the operation succeeded, false if the container has been disposed* @see #replace(Disposable)*/public boolean update(Disposable next) {return DisposableHelper.set(this, next);}/*** Atomically: set the next disposable on this container but don't dispose the previous* one (if any) or dispose next if the container has been disposed.* @param next the Disposable to set, may be null* @return true if the operation succeeded, false if the container has been disposed* @see #update(Disposable)*/public boolean replace(Disposable next) {return DisposableHelper.replace(this, next);}@Overridepublic void dispose() {DisposableHelper.dispose(this);}@Overridepublic boolean isDisposed() {return DisposableHelper.isDisposed(get());}
}
ObservableMap
/** Copyright (c) 2016-present, RxJava Contributors.** Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in* compliance with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software distributed under the License is* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See* the License for the specific language governing permissions and limitations under the License.*/package io.reactivex.rxjava3.internal.operators.observable;import io.reactivex.rxjava3.annotations.Nullable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.observers.BasicFuseableObserver;import java.util.Objects;public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {final Function<? super T, ? extends U> function;public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {super(source);this.function = function;}@Overridepublic void subscribeActual(Observer<? super U> t) {source.subscribe(new MapObserver<T, U>(t, function));
//通过上游调用}static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {final Function<? super T, ? extends U> mapper;MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {super(actual);this.mapper = mapper;}@Overridepublic void onNext(T t) {if (done) {return;}if (sourceMode != NONE) {downstream.onNext(null);return;}U v;try {v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {fail(ex);return;}downstream.onNext(v);}@Overridepublic int requestFusion(int mode) {return transitiveBoundaryFusion(mode);}@Nullable@Overridepublic U poll() throws Throwable {T t = qd.poll();return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;}}
}//上游public final void onSubscribe(Disposable d) {if (DisposableHelper.validate(this.upstream, d)) {this.upstream = d;
//有上下游 且可中断的if (d instanceof QueueDisposable) {this.qd = (QueueDisposable<T>)d;}if (beforeDownstream()) {downstream.onSubscribe(this);afterDownstream();}}}//dispose 上游dispose@Overridepublic void dispose() {upstream.dispose();}ObservableDelay/** Copyright (c) 2016-present, RxJava Contributors.** Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in* compliance with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software distributed under the License is* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See* the License for the specific language governing permissions and limitations under the License.*/package io.reactivex.rxjava3.internal.operators.observable;import java.util.concurrent.TimeUnit;import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.observers.SerializedObserver;public final class ObservableDelay<T> extends AbstractObservableWithUpstream<T, T> {final long delay;final TimeUnit unit;final Scheduler scheduler;final boolean delayError;public ObservableDelay(ObservableSource<T> source, long delay, TimeUnit unit, Scheduler scheduler, boolean delayError) {super(source);this.delay = delay;this.unit = unit;this.scheduler = scheduler;this.delayError = delayError;}@Override@SuppressWarnings("unchecked")public void subscribeActual(Observer<? super T> t) {Observer<T> observer;if (delayError) {observer = (Observer<T>)t; //} else {observer = new SerializedObserver<>(t);}Scheduler.Worker w = scheduler.createWorker();//线程调度器source.subscribe(new DelayObserver<>(observer, delay, unit, w, delayError));
//上游的DelayObserver}static final class DelayObserver<T> implements Observer<T>, Disposable {final Observer<? super T> downstream;final long delay;final TimeUnit unit;final Scheduler.Worker w;final boolean delayError;Disposable upstream;DelayObserver(Observer<? super T> actual, long delay, TimeUnit unit, Worker w, boolean delayError) {super();this.downstream = actual;this.delay = delay;this.unit = unit;this.w = w;this.delayError = delayError;}@Overridepublic void onSubscribe(Disposable d) {if (DisposableHelper.validate(this.upstream, d)) {this.upstream = d;downstream.onSubscribe(this);}}@Overridepublic void onNext(final T t) {w.schedule(new OnNext(t), delay, unit);}@Overridepublic void one rror(final Throwable t) {w.schedule(new one rror(t), delayError ? delay : 0, unit);//local延时}@Overridepublic void onComplete() {w.schedule(new OnComplete(), delay, unit);}@Overridepublic void dispose() {upstream.dispose();w.dispose();}@Overridepublic boolean isDisposed() {return w.isDisposed();}final class OnNext implements Runnable {private final T t;OnNext(T t) {this.t = t;}@Overridepublic void run() {downstream.onNext(t);}}final class one rror implements Runnable {private final Throwable throwable;OnError(Throwable throwable) {this.throwable = throwable;}@Overridepublic void run() {try {downstream.onError(throwable);} finally {w.dispose();}}}final class OnComplete implements Runnable {@Overridepublic void run() {try {downstream.onComplete();} finally {w.dispose();}}}}
}关联上下游 先验证 然后赋值public void onSubscribe(Disposable d) {if (DisposableHelper.validate(this.upstream, d)) {this.upstream = d;downstream.onSubscribe(this); //启动下游subsc}}
关键是看这个生产者(操作符)的事件来源:1. 来自上游;2. 由自己生产。
dispose(取消)也遵循同样的原则:有上游就取消上游,自己生产的任务就取消自己的任务。
线程切换:
subscribeOn
/** Copyright (c) 2016-present, RxJava Contributors.** Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in* compliance with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software distributed under the License is* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See* the License for the specific language governing permissions and limitations under the License.*/package io.reactivex.rxjava3.internal.operators.single;import java.util.concurrent.atomic.AtomicReference;import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.*;public final class SingleSubscribeOn<T> extends Single<T> {final SingleSource<? extends T> source;final Scheduler scheduler;public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {this.source = source;this.scheduler = scheduler;}@Overrideprotected void subscribeActual(final SingleObserver<? super T> observer) {final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);observer.onSubscribe(parent);Disposable f = scheduler.scheduleDirect(parent); //runnable,切线程parent.task.replace(f);//切换线程}static final class SubscribeOnObserver<T>extends AtomicReference<Disposable>implements SingleObserver<T>, Disposable, Runnable {private static final long serialVersionUID = 7000911171163930287L;final SingleObserver<? super T> downstream;final SequentialDisposable task;final SingleSource<? extends T> source;SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {this.downstream = actual;this.source = source;this.task = new SequentialDisposable();}@Overridepublic void onSubscribe(Disposable d) {
//d =上游传递的Disposable DisposableHelper.setOnce(this, d);}@Overridepublic void onSuccess(T value) {downstream.onSuccess(value);}@Overridepublic void one rror(Throwable e) {downstream.onError(e);}@Overridepublic void dispose() {DisposableHelper.dispose(this);//上游取消切换task.dispose(); //内部取消}@Overridepublic boolean isDisposed() {return DisposableHelper.isDisposed(get());}@Overridepublic void run() {source.subscribe(this);//对上游进行订阅}}}
SingleObserveOn:
/** Copyright (c) 2016-present, RxJava Contributors.** Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in* compliance with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software distributed under the License is* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See* the License for the specific language governing permissions and limitations under the License.*/package io.reactivex.rxjava3.internal.operators.single;import java.util.concurrent.atomic.AtomicReference;import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;public final class SingleObserveOn<T> extends Single<T> {final SingleSource<T> source;final Scheduler scheduler;public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {this.source = source;this.scheduler = scheduler;}@Overrideprotected void subscribeActual(final SingleObserver<? super T> observer) {source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
//调用subscribe,第一时间不切换线程}static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>implements SingleObserver<T>, Disposable, Runnable {private static final long serialVersionUID = 3528003840217436037L;final SingleObserver<? super T> downstream;final Scheduler scheduler;T value;Throwable error;ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {this.downstream = actual; this.scheduler = scheduler;}@Overridepublic void onSubscribe(Disposable d) {if (DisposableHelper.setOnce(this, d)) //赋值下游
{downstream.onSubscribe(this); //调用下游的Disposable
//取消的时候 下游也取消}}@Overridepublic void onSuccess(T value) {this.value = value;Disposable d = scheduler.scheduleDirect(this);DisposableHelper.replace(this, d);//置换切线程的任务,发消息前取消上游,然后切线程任务 }@Overridepublic void one rror(Throwable e) {this.error = e;Disposable d = scheduler.scheduleDirect(this);DisposableHelper.replace(this, d);}@Overridepublic void run() {Throwable ex = error;if (ex != null) {downstream.onError(ex);} else {downstream.onSuccess(value);}}@Overridepublic void dispose() {DisposableHelper.dispose(this);//取消的时候 下游也取消}@Overridepublic boolean isDisposed() {return DisposableHelper.isDisposed(get());}}
}/** Copyright (c) 2016-present, RxJava Contributors.** Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in* compliance with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software distributed under the License is* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See* the License for the specific language governing permissions and limitations under the License.*/package io.reactivex.rxjava3.internal.operators.single;import java.util.concurrent.atomic.AtomicReference;import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;public final class SingleObserveOn<T> extends Single<T> {final SingleSource<T> source;final Scheduler scheduler;public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {this.source = source;this.scheduler = scheduler;}@Overrideprotected void subscribeActual(final SingleObserver<? super T> observer) {source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));}static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>implements SingleObserver<T>, Disposable, Runnable {private static final long serialVersionUID = 3528003840217436037L;final SingleObserver<? super T> downstream;final Scheduler scheduler;T value;Throwable error;ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {this.downstream = actual;this.scheduler = scheduler;}@Overridepublic void onSubscribe(Disposable d) {if (DisposableHelper.setOnce(this, d)) {downstream.onSubscribe(this);}}@Overridepublic void onSuccess(T value) {this.value = value;Disposable d = scheduler.scheduleDirect(this); //切线程DisposableHelper.replace(this, d);}@Overridepublic void one rror(Throwable e) {this.error = e;Disposable d = scheduler.scheduleDirect(this); //切线程DisposableHelper.replace(this, d);}@Overridepublic void run() {Throwable ex = error;if (ex != null) {downstream.onError(ex);} else {downstream.onSuccess(value);}}@Overridepublic void dispose() {DisposableHelper.dispose(this);}@Overridepublic boolean isDisposed() {return DisposableHelper.isDisposed(get());}}
}
切线程的具体实现(Worker)
@NonNullpublic Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {final Worker w = createWorker(); //createWorker 1final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);//钩子方法DisposeTask task = new DisposeTask(decoratedRun, w);//w.schedule(task, delay, unit);return task;}1@NonNull@Overridepublic Worker createWorker() {return new NewThreadWorker(threadFactory); //threadFactory 2}2
public interface ScheduledExecutorService extends ExecutorService {}//本质通过线程池创建管理 Executorspublic static ScheduledExecutorService create(ThreadFactory factory) {final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, factory);exec.setRemoveOnCancelPolicy(PURGE_ENABLED);return exec;}
切换到主线程:mainThread 通过 Looper.getMainLooper() 实现
static final Scheduler DEFAULT = internalFrom(Looper.getMainLooper(), true);
@Override@SuppressLint("NewApi") // Async will only be true when the API is available to call.public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {if (run == null) throw new NullPointerException("run == null");if (unit == null) throw new NullPointerException("unit == null");run = RxJavaPlugins.onSchedule(run);ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);Message message = Message.obtain(handler, scheduled);if (async) {message.setAsynchronous(true);}
//发消息到主线程 handler = new Handler(looper); loop = mainLoophandler.sendMessageDelayed(message, unit.toMillis(delay));return scheduled;}
查看全文
99%的人还看了
相似问题
猜你感兴趣
版权申明
本文"Android RxJava3 原理浅析":http://eshow365.cn/6-37291-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!
- 上一篇: Go入门简介
- 下一篇: ISP算法——UVNR