RXjava響應式編程
此文作者大暴雨原創,轉載請注明出處。
如果線程的知識不是很豐富,請先查看 rxjava源碼中的線程知識 一文
rxjava總結就是:異步
實現主要是通過擴展觀察者模式
首先我們看一下只有一條線程是怎么實現響應式的。這比較簡單。首先看一下Observable這個類(被觀察者),還有Subscriber (觀察者),Subscriber實現了Observer 類。
Observable從create開始
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
create方法就是new一個Observable對象,參數是hook.onCreate(f)。hook是RxJavaObservableExecutionHook類,RxJavaObservableExecutionHook這個類什么也沒有做就是傳進去什么參數,返回什么。所以這里的hook.onCreate(f),返回的就是f對象,就是OnSubscribe。
再看一下Observable的構造方法,參數是OnSubscribe獲取
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
Observable的構造方法就是為了獲取訂閱對象
看一下觀察者和被觀察者是怎么聯系的。執行訂閱的方法就會執行subscribe.call方法.
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
...
subscriber.onStart();
...
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
...
}
}
首先調用subscriber.onStart();這個onstart要自己重寫,因為這個onStart方法,默認是空的,什么也沒有做。最后調用hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber);hook.onSubscribeStart這返回的observable.onSubscribe這個參數,之前已經說過,hook是什么也沒有干的,你傳什么參數進去就返回什么。實際上就是observable.onSubscribe的call方法。onSubscribe這個接口繼承自Action1,action1的call方法然后再拆分為不同的接口,例如onNext(),onCompleted(),onError()。這樣就實現了從創建到觸發call回調了。接下來看一下多線程,響應式是怎么實現的。畢竟異步才是響應式的核心。
響應式多線程的實現
1.調用的方式
指定事件在哪個線程執行:subscribeOn(Scheduler scheduler)
指定事件的響應在哪個線程:observeOn(Scheduler scheduler)
首先看參數是Scheduler調度器。調度器的類型Schedulers這個類。
public final class Schedulers {
private final Scheduler computationScheduler;
private final Scheduler ioScheduler;
private final Scheduler newThreadScheduler;
private static final Schedulers INSTANCE = new Schedulers();
private Schedulers() {
Scheduler c = RxJavaPlugins.getInstance().getSchedulersHook().getComputationScheduler();
if (c != null) {
computationScheduler = c;
} else {
computationScheduler = new EventLoopsScheduler();
}
Scheduler io = RxJavaPlugins.getInstance().getSchedulersHook().getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = new CachedThreadScheduler();
}
Scheduler nt = RxJavaPlugins.getInstance().getSchedulersHook().getNewThreadScheduler();
if (nt != null) {
newThreadScheduler = nt;
} else {
newThreadScheduler = NewThreadScheduler.instance();
}
}
省略一些get類,和測試類
...
...
}
這里可以看到rxjava提供三種線程,android有多一些。Schedulers是一個單列的。RxJavaPlugins.getInstance().getSchedulersHook()這個類和hook是一樣的,傳什么就返回什么,不傳就返回null。所以這里一定是跑else的代碼。
接下來已io為例,看一下初始化Scheduler。CachedThreadScheduler繼承Scheduler。
final class CachedThreadScheduler extends Scheduler implements SchedulerLifecycle {
private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler-";
一些初始化的操作,略
...
//緩存線程的操作
private static final class CachedWorkerPool {
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
private final CompositeSubscription allWorkers;//保存正在使用的subscriptions
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;
CachedWorkerPool(long keepAliveTime, TimeUnit unit) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeSubscription();
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
NewThreadWorker.tryEnableCancelPolicy(evictor);
task = evictor.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
evictExpiredWorkers();
}
}, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS
);
}
evictorService = evictor;
evictorTask = task;
}
ThreadWorker get() {
if (allWorkers.isUnsubscribed()) {
return SHUTDOWN_THREADWORKER;
}
//從隊列去找
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
//沒有worker,就new一個
ThreadWorker w = new ThreadWorker(WORKER_THREAD_FACTORY);
//添加到緩存allWorkers中
allWorkers.add(w);
return w;
}
void release(ThreadWorker threadWorker) {
// Refresh expire time before putting worker back in pool
threadWorker.setExpirationTime(now() + keepAliveTime);
//插入隊列尾部
expiringWorkerQueue.offer(threadWorker);
}
//移除當前除了當前外的ThreadWorker。
void evictExpiredWorkers() {
if (!expiringWorkerQueue.isEmpty()) {
long currentTimestamp = now();
for (ThreadWorker threadWorker : expiringWorkerQueue) {
if (threadWorker.getExpirationTime() <= currentTimestamp) {
if (expiringWorkerQueue.remove(threadWorker)) {
allWorkers.remove(threadWorker);
}
} else {
// Queue is ordered with the worker that will expire first in the beginning, so when we
// find a non-expired worker we can stop evicting.
break;
}
}
}
}
...
}
final AtomicReference<CachedWorkerPool> pool;
static final CachedWorkerPool NONE;
static {
NONE = new CachedWorkerPool(0, null);
NONE.shutdown();
}
public CachedThreadScheduler() {
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
...
...
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
//線程操作的類
private static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeSubscription innerSubscription = new CompositeSubscription();//保存正在使用的subscriptions
private final CachedWorkerPool pool;//緩存線程
private final ThreadWorker threadWorker;
@SuppressWarnings("unused")
volatile int once;
static final AtomicIntegerFieldUpdater<EventLoopWorker> ONCE_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(EventLoopWorker.class, "once");
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.threadWorker = pool.get();//獲取工作線程
}
@Override
public void unsubscribe() {
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
// unsubscribe should be idempotent, so only do this once
pool.release(threadWorker);//插入線程
}
innerSubscription.unsubscribe();
}
@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
}
@Override
public Subscription schedule(Action0 action) {
return schedule(action, 0, null);
}
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}
//這個是真正執行任務的線程
ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);//執行線程,把action關聯起來
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
}
}
private static final class ThreadWorker extends NewThreadWorker {
private long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
public long getExpirationTime() {
return expirationTime;
}
public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}
}
這里有兩個內部類,CachedWorkerPool和EventLoopWorker這兩個類。CachedWorkerPool是緩存線程,把ThreadWorker放到隊列里面,提供一些操作的方法,例如get方法獲取ThreadWorker對象,還有一些插入,清除等操作。EventLoopWorker線程的操作類,首先從緩存線程中取出一個ThreadWorker,在schedule方法中實現了threadWorker.scheduleActual這個方法,這個方法就是真正執行任務的線程。接着簡要看一下threadWorker.scheduleActual這個方法。
threadWorker類的scheduleActual方法:
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
ScheduledAction的run方法:
public void run() {
try {
lazySet(Thread.currentThread());
action.call();
} catch (Throwable e) {
// nothing to do but print a System error as this is fatal and there is nowhere else to throw this
IllegalStateException ie = null;
if (e instanceof OnErrorNotImplementedException) {
ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
} else {
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
}
RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
} finally {
unsubscribe();
}
}
cheduledAction這個類是執行的線程,這個類就是把action和線程綁定,在線程中調用action.call()方法。這里啟動executor線程池,執行任務,run.add(f)線程執行結果添加到cheduledAction中。這樣子就能在一個線程中執行call方法。
這里是以io為例,所有都是繼承Scheduler類的,所以看一下Scheduler這個類。
public abstract class Scheduler {
static final long CLOCK_DRIFT_TOLERANCE_NANOS;
static {
CLOCK_DRIFT_TOLERANCE_NANOS = TimeUnit.MINUTES.toNanos(
Long.getLong("rx.scheduler.drift-tolerance", 15));
}
public abstract Worker createWorker();
public abstract static class Worker implements Subscription {
public abstract Subscription schedule(Action0 action);
public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
final long periodInNanos = unit.toNanos(period);
final long firstNowNanos = TimeUnit.MILLISECONDS.toNanos(now());
final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay);
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
final Action0 recursiveAction = new Action0() {
long count;
long lastNowNanos = firstNowNanos;
long startInNanos = firstStartInNanos;
@Override
public void call() {
if (!mas.isUnsubscribed()) {
action.call();
long nextTick;
long nowNanos = TimeUnit.MILLISECONDS.toNanos(now());
if (nowNanos + CLOCK_DRIFT_TOLERANCE_NANOS < lastNowNanos
|| nowNanos >= lastNowNanos + periodInNanos + CLOCK_DRIFT_TOLERANCE_NANOS) {
nextTick = nowNanos + periodInNanos;
startInNanos = nextTick - (periodInNanos * (++count));
} else {
nextTick = startInNanos + (++count * periodInNanos);
}
lastNowNanos = nowNanos;
long delay = nextTick - nowNanos;
mas.set(schedule(this, delay, TimeUnit.NANOSECONDS));
}
}
};
MultipleAssignmentSubscription s = new MultipleAssignmentSubscription();
// Should call `mas.set` before `schedule`, or the new Subscription may replace the old one.
mas.set(s);
s.set(schedule(recursiveAction, initialDelay, unit));
return mas;
}
public long now() {
return System.currentTimeMillis();
}
}
public long now() {
return System.currentTimeMillis();
}
在父類可以看到,createWorker,創建一個EventLoopWorker,然后就是調用Worker的schedulePeriodically方法也會調用schedule方法,去執行任務。上面已經說了,schedule方法就是通過從緩存隊列中獲取ThreadWorker,然后執行scheduleActual方法。去實現在線程中執行任務。我們已經理解了參數Scheduler,這個時候看看怎么用這個調度器類。
指定事件在哪個線程執行:subscribeOn(Scheduler scheduler)
public final Observable<T> observeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, false));
}
首先看一下ScalarSynchronousObservable.scalarScheduleOn方法
public Observable<T> scalarScheduleOn(final Scheduler scheduler) {
final Func1<Action0, Subscription> onSchedule;
if (scheduler instanceof EventLoopsScheduler) {
final EventLoopsScheduler els = (EventLoopsScheduler) scheduler;
onSchedule = new Func1<Action0, Subscription>() {
@Override
public Subscription call(Action0 a) {
return els.scheduleDirect(a);
}
};
} else {
onSchedule = new Func1<Action0, Subscription>() {
@Override
public Subscription call(final Action0 a) {
final Scheduler.Worker w = scheduler.createWorker();
w.schedule(new Action0() {
@Override
public void call() {
try {
a.call();
} finally {
w.unsubscribe();
}
}
});
return w;
}
};
}
return create(new ScalarAsyncOnSubscribe<T>(t, onSchedule));
}
可以看到,如果是EventLoopsScheduler,就直接創建Func1,如果不是,我們舉例io就不是EventLoopsScheduler,是CachedThreadScheduler,就是跑else代碼,scheduler.createWorker(),創建一個Worker,然后執行schedule,schedule方法已經說了,就是指定線程執行。在線程的call方法中執行onSchedule.call()回調。
源碼分析到此處。如果哪里有錯誤,請指出謝謝。
此文作者大暴雨原創,轉載請注明出處。