bind函數會返回一個新的信號N。整體思路是對原信號O進行訂閱,每當信號O產生一個值就將其轉變成一個中間信號M,並馬上訂閱M, 之后將信號M的輸出作為新信號N的輸出。
- (RACSignal *)bind:(RACSignalBindBlock (^)(void))block { return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { RACSignalBindBlock bindingBlock = block(); [self subscribeNext:^(id x) { BOOL stop = NO; id signal = bindingBlock(x, &stop); [signal subscribeNext:^(id x) { [subscriber sendNext:x]; } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ [subscriber sendCompleted]; }]; } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ [subscriber sendCompleted]; }]; return nil; }]; }
flattenMap是對bind的包裝,為bind提供bindBlock。因此flattenMap與bind操作實質上是一樣的,都是將原信號傳出的值轉換成中間信號,同時馬上去訂閱這個中間信號,之后將中間信號的輸出作為新信號的輸出。如果原信號是一個信號中的信號(sendNext:一個信號),那么這個原信號的輸出值就是一個信號
- (instancetype)flattenMap:(RACStream* (^)(id value))block { Class class =self.class; return[self bind:^{ return^(id value,BOOL*stop) { id stream = block(value) ?: [class empty]; return stream; }; }]; }
map操作可將原信號輸出的數據通過自定義的方法轉換成所需的數據, 同時將變化后的數據作為新信號的輸出。它實際調用了flattenMap, 只不過中間信號是直接將mapBlock處理的值返回
常用的filter內部也是使用了flattenMap。與map相同,它也是將filter后的結果使用中間信號進行包裝並對其進行訂閱,之后將中間信號的輸出作為新信號的輸出,以此來達到輸出filter結果的目的。
- (instancetype)map:(id(^)(id value))block { Class class = self.class; return[self flattenMap:^(id value) { return[class return:block(value)]; // (1) }; }
flatten: 該操作主要作用於信號的信號。原信號作為信號的信號,在被訂閱時輸出的數據必然也是個信號(signalValue),這往往不是我們想要的。當我們執行[O flatten]操作時,因為flatten內部調用了flattenMap,flattenMap里對應的中間信號就是原信號輸出signalValue。因此在flatten操作中新信號被訂閱時輸出的值就是原信號O的各個子信號輸出值的集合。 主要用來打平信號的信號。
- (instancetype)flatten { return [self flattenMap:^(RACSignal *signalValue) { // (1) return [signalValue]; // (2) }; }
switchToLatest:與flatten相同,其主要目的也是用於”壓平”信號的信號。但與flatten不同的是,flatten是在多管線匯聚后,將原信號O的各子信號輸出作為新信號N的輸出,但switchToLatest僅僅只是將O輸出的最新信號L的輸出作為N的輸出。
- (RACSignal*)switchToLatest { return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { RACMulticastConnection *connection = [self publish]; // connection.signal 在這里就是一個RACSubject
[[connection.signal flattenMap:^(RACSignal *signalValue) {// 如果connection的sourceSignal產生了新的值,那么flattenMap中的bind函數的subsribe()就會回調,這里的signalValue就是sourceSignal產生的信號值
return [signalValue takeUntil:[connection.signal concat:[RACSignal never]]];
}] subscribe:subscriber]; // 將takeUtil的信號產生的值綁定到subscriber上
RACDisposable *connectionDisposable = [connection connect];
return [RACDisposable disposableWithBlock:^{ }];
}];
}
# RACSignal class - (RACMulticastConnection *)publish { RACSubject *subject = [RACSubject subject]]; RACMulticastConnection *connection = [self multicast:subject]; return connection; } - (RACMulticastConnection *)multicast:(RACSubject *)subject { RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject]; return connection; } # RACMulticastConnection class - (instancetype)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject { self = [super init]; _sourceSignal = source; _signal = subject; return self; } - (RACDisposable *)connect { self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];// 原信號添加一個信號訂閱者_signal
return self.serialDisposable;
}
takeUntil: 如果有新的信號進來,那么原來的信號就會dispose
- (RACSignal *)takeUntil:(RACSignal *)signalTrigger {// 如果有新的信號進來,那么原來的信號就會dispose return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { [signalTrigger subscribeNext:^(id _) { [subscriber sendCompleted]; } completed:^{ [subscriber sendCompleted]; }]; [self subscribeNext:^(id x) { [subscriber sendNext:x]; } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ [subscriber sendCompleted]; }]; return nil; }]; }
scanWithStart : 該操作可將上次reduceBlock處理后輸出的結果作為參數,傳入當次reduceBlock操作,往往用於信號輸出值的聚合處理。scanWithStart內部仍然用到了核心操作bind。它會在bindBlock中對value進行操作,同時將上次操作得到的結果running作為參數帶入,一旦本次reduceBlock執行完,就將結果保存在running中,以便下次處理時使用,最后再將本次得出的結果用信號包裝后,傳遞出去。
- (instancetype)scanWithStart:(id)startingValue reduceWithIndex:(id(^)(id,id,NSUInteger))reduceBlock { Class class =self.class; return [self bind:^{ __block id running = startingValue; __block NSUIntegerindex = 0; return^(id value, BOOL*stop) { running = reduceBlock(running, value, index++); // (1) return [class return:running]; // (2) }; }]; }
throttle:這個操作接收一個時間間隔interval作為參數,如果Signal發出的next事件之后interval時間內不再發出next事件,那么它返回的Signal會將這個next事件發出。
take: 只接收原有信號前幾個信號值,信號發送數量大於指定序號的會被忽略
- (__kindof RACStream *)take:(NSUInteger)count { Class class = self.class; if (count == 0) return class.empty; return [[self bind:^{ __block NSUInteger taken = 0; return ^ id (id value, BOOL *stop) { if (taken < count) { ++taken; if (taken == count) *stop = YES; return [class return:value]; } else { return nil; } }; }]]; }
skip: 過濾原有信號前幾個信號值,從指定序號位置開始接收序號
- (__kindof RACStream *)skip:(NSUInteger)skipCount { Class class = self.class; return [[self bind:^{ __block NSUInteger skipped = 0; return ^(id value, BOOL *stop) { if (skipped >= skipCount) return [class return:value]; skipped++; return class.empty; }; }]]; }
startWith: 除了接收原有信號的值,還會在最前面接收一個指定的信號值。
- (__kindof RACStream *)startWith:(id)value { return [[self.class return:value] concat:self]; }
repeat: 對原有信號發送了complete前的所有信號值反復接收
- (RACSignal *)repeat { return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) { return subscribeForever(self, ^(id x) { [subscriber sendNext:x]; }, ^(NSError *error, RACDisposable *disposable) { [disposable dispose]; [subscriber sendError:error]; }, ^(RACDisposable *disposable) { // 不會處理complete信號 }); }] setNameWithFormat:@"[%@] -repeat", self.name]; }
static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) { RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) { [signal subscribeNext:next error:^(NSError *e) { @autoreleasepool { error(e, nil); } recurse();//[[RACScheduler scheduler] scheduleRecursiveBlock:recursiveBlock];
} completed:^{ @autoreleasepool { completed(nil);
}
recurse();//[[RACScheduler scheduler] scheduleRecursiveBlock:recursiveBlock];
}];
};
recursiveBlock(^{
[[RACScheduler scheduler] scheduleRecursiveBlock:recursiveBlock];
});
return nil;
}
- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable { [self schedule:^{ void (^reallyReschedule)(void) = ^{ [self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable]; }; __block NSLock *lock = [[NSLock alloc] init]; lock.name = [NSString stringWithFormat:@"%@ %s", self, sel_getName(_cmd)]; __block NSUInteger rescheduleCount = 0; __block BOOL rescheduleImmediately = NO; @autoreleasepool { recursiveBlock(^{// 這里會觸發信號的重復訂閱,並觸發[signal sendNext:next error:error complete:complete] [lock lock]; BOOL immediate = rescheduleImmediately; if (!immediate) ++rescheduleCount; [lock unlock]; if (immediate) reallyReschedule(); }); } [lock lock]; NSUInteger synchronousCount = rescheduleCount; rescheduleImmediately = YES; [lock unlock]; for (NSUInteger i = 0; i < synchronousCount; i++) { reallyReschedule(); } }]; }
retry: 如果原有信號發送了一個error,那么這里會嘗試重新訂閱原有信號,在重復指定次數后發送失敗的信號.常用於網絡請求的失敗重試機制
- (RACSignal *)retry:(NSInteger)retryCount { return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { __block NSInteger currentRetryCount = 0; return subscribeForever(self, ^(id x) { [subscriber sendNext:x]; }, ^(NSError *error, RACDisposable *disposable) { if (retryCount == 0 || currentRetryCount < retryCount) { currentRetryCount++; return; } [disposable dispose]; [subscriber sendError:error]; }, ^(RACDisposable *disposable) {// 會觸發完成信號,不會無限訂閱 [disposable dispose]; [subscriber sendCompleted]; }); }]; }
aggregateWithStart: reduceBlock: 設一個初始值,對原信號的值進行累加並輸出最終值
- (RACSignal *)aggregateWithStart:(id)start reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock { return [[[self scanWithStart:start reduceWithIndex:reduceBlock] startWith:start] takeLast:1]; }
- (__kindof RACStream *)scanWithStart:(id)startingValue reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock { Class class = self.class; return [self bind:^{ __block id running = startingValue; __block NSUInteger index = 0; return ^(id value, BOOL *stop) { running = reduceBlock(running, value, index++); return [class return:running]; }; }]; }
- (RACSignal *)takeLast:(NSUInteger)count { return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count]; return [self subscribeNext:^(id x) { [valuesTaken addObject:x ? : RACTupleNil.tupleNil]; while (valuesTaken.count > count) { [valuesTaken removeObjectAtIndex:0];// 移除之前的所有信號值 } } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ for (id value in valuesTaken) { [subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value]; } [subscriber sendCompleted]; }]; }]; }
delay: 對原信號值延遲輸出
throttle: 指定原有信息的輸出間隔時間,時間內的信號值會被過濾
concat: 拼接兩個信號的輸出值為一個整體等價於(flatten:1)
- (RACSignal *)concat:(RACSignal *)signal { return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { [self subscribeNext:^(id x) { [subscriber sendNext:x]; } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ [signal subscribe:subscriber]; }]; return nil; }]; }
zip: 拉鏈式輸出兩個信號的值為一個元祖.例如a信號輸出[1,2,3,4,5],b信號輸出[6,7,8,9],那么zip后輸出為[[1,6],[2,7],[3,8],[4,9]]
- (RACSignal *)zipWith:(RACSignal *)signal { return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { __block BOOL selfCompleted = NO; NSMutableArray *selfValues = [NSMutableArray array]; __block BOOL otherCompleted = NO; NSMutableArray *otherValues = [NSMutableArray array]; void (^sendCompletedIfNecessary)(void) = ^{ @synchronized (selfValues) { BOOL selfEmpty = (selfCompleted && selfValues.count == 0); BOOL otherEmpty = (otherCompleted && otherValues.count == 0); if (selfEmpty || otherEmpty) [subscriber sendCompleted]; } }; void (^sendNext)(void) = ^{ @synchronized (selfValues) { if (selfValues.count == 0) return; if (otherValues.count == 0) return; RACTuple *tuple = RACTuplePack(selfValues[0], otherValues[0]); [selfValues removeObjectAtIndex:0]; [otherValues removeObjectAtIndex:0]; [subscriber sendNext:tuple]; sendCompletedIfNecessary(); } }; RACDisposable *selfDisposable = [self subscribeNext:^(id x) { @synchronized (selfValues) { [selfValues addObject:x ?: RACTupleNil.tupleNil]; sendNext(); } } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ @synchronized (selfValues) { selfCompleted = YES; sendCompletedIfNecessary(); } }]; RACDisposable *otherDisposable = [signal subscribeNext:^(id x) { @synchronized (selfValues) { [otherValues addObject:x ?: RACTupleNil.tupleNil]; sendNext(); } } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ @synchronized (selfValues) { otherCompleted = YES; sendCompletedIfNecessary(); } }]; return [RACDisposable disposableWithBlock:^{ [selfDisposable dispose]; [otherDisposable dispose]; }]; }]; }
combineLatest: 打包兩個信號的輸出值,任何一個信號輸出變化,都會取出最近兩個信號的輸出值。例如a信號輸出[1,2,3,4,5],b信號輸出[6,7,8,9],那么zip后輸出為[[1,6],[2,6],[2,7],[3,7],[3,8],[4,8][4,9],[5,9]].(這里可能會有更多的輸出)
- (RACSignal *)combineLatestWith:(RACSignal *)signal { return [RACSignal createSignal:^(id<RACSubscriber> subscriber) { __block id lastSelfValue = nil; __block BOOL selfCompleted = NO; __block id lastOtherValue = nil; __block BOOL otherCompleted = NO; void (^sendNext)(void) = ^{ if (lastSelfValue == nil || lastOtherValue == nil) return; [subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)]; }; RACDisposable *selfDisposable = [self subscribeNext:^(id x) { lastSelfValue = x ?: RACTupleNil.tupleNil; sendNext(); } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ selfCompleted = YES; if (otherCompleted) [subscriber sendCompleted]; }]; [signal subscribeNext:^(id x) { lastOtherValue = x ?: RACTupleNil.tupleNil; sendNext(); } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ otherCompleted = YES; if (selfCompleted) [subscriber sendCompleted]; }]; return nil; }]; }
if:then:else : 如果if信號返回真,則輸出then的信號值,如果if信號返回非真,則輸出else的信號值。這里的then和else的信號值隨着if的信號值切換會重復輸出
+ (RACSignal *)if:(RACSignal *)boolSignal then:(RACSignal *)trueSignal else:(RACSignal *)falseSignal { return [[boolSignal map:^(NSNumber *value) { return (value.boolValue ? trueSignal : falseSignal);// 這里返回后變成了高階信號 }] switchToLatest]; }