RAC篇(中) - 信號的各種轉換和操作


bind函數會返回一個新的信號N。整體思路是對原信號O進行訂閱,每當信號O產生一個值就將其轉變成一個中間信號M,並馬上訂閱M, 之后將信號M的輸出作為新信號N的輸出。

- (RACSignal *)bind:(RACSignalBindBlock (^)(void))block {
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACSignalBindBlock bindingBlock = block();
        [self subscribeNext:^(id x) {
            BOOL stop = NO;
            id signal = bindingBlock(x, &stop);
            [signal subscribeNext:^(id x) {
                [subscriber sendNext:x];
            } error:^(NSError *error) {
                [subscriber sendError:error];
            } completed:^{
                [subscriber sendCompleted];
            }];
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            [subscriber sendCompleted];
        }];
        return nil;
    }];
}

flattenMap是對bind的包裝,為bind提供bindBlock。因此flattenMap與bind操作實質上是一樣的,都是將原信號傳出的值轉換成中間信號,同時馬上去訂閱這個中間信號,之后將中間信號的輸出作為新信號的輸出。如果原信號是一個信號中的信號(sendNext:一個信號),那么這個原信號的輸出值就是一個信號

- (instancetype)flattenMap:(RACStream* (^)(id value))block 
{
    Class class =self.class;
    return[self bind:^{
        return^(id value,BOOL*stop) {
            id stream = block(value) ?: [class empty];
            return stream;
        };
    }];
}

map操作可將原信號輸出的數據通過自定義的方法轉換成所需的數據, 同時將變化后的數據作為新信號的輸出。它實際調用了flattenMap, 只不過中間信號是直接將mapBlock處理的值返回

常用的filter內部也是使用了flattenMap。與map相同,它也是將filter后的結果使用中間信號進行包裝並對其進行訂閱,之后將中間信號的輸出作為新信號的輸出,以此來達到輸出filter結果的目的。

- (instancetype)map:(id(^)(id value))block
{
    Class class = self.class;

    return[self flattenMap:^(id value) {
        return[class return:block(value)]; // (1)
    };
}

flatten: 該操作主要作用於信號的信號。原信號作為信號的信號,在被訂閱時輸出的數據必然也是個信號(signalValue),這往往不是我們想要的。當我們執行[O flatten]操作時,因為flatten內部調用了flattenMap,flattenMap里對應的中間信號就是原信號輸出signalValue。因此在flatten操作中新信號被訂閱時輸出的值就是原信號O的各個子信號輸出值的集合。 主要用來打平信號的信號。

- (instancetype)flatten
{
    return [self flattenMap:^(RACSignal *signalValue) { // (1)
        return [signalValue]; // (2)
    };
}

 switchToLatest:與flatten相同,其主要目的也是用於”壓平”信號的信號。但與flatten不同的是,flatten是在多管線匯聚后,將原信號O的各子信號輸出作為新信號N的輸出,但switchToLatest僅僅只是將O輸出的最新信號L的輸出作為N的輸出。

- (RACSignal*)switchToLatest 
{
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACMulticastConnection *connection = [self publish];
      // connection.signal 在這里就是一個RACSubject
        [[connection.signal flattenMap:^(RACSignal *signalValue) {// 如果connection的sourceSignal產生了新的值,那么flattenMap中的bind函數的subsribe()就會回調,這里的signalValue就是sourceSignal產生的信號值
       return [signalValue takeUntil:[connection.signal concat:[RACSignal never]]]; 
     }] subscribe:subscriber]; // 將takeUtil的信號產生的值綁定到subscriber上
     RACDisposable *connectionDisposable = [connection connect]; 
     return [RACDisposable disposableWithBlock:^{ }];
  }];
}
# RACSignal class
- (RACMulticastConnection *)publish {
    RACSubject *subject = [RACSubject subject]];
    RACMulticastConnection *connection = [self multicast:subject];
    return connection;
}

- (RACMulticastConnection *)multicast:(RACSubject *)subject {
    RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
    return connection;
}
# RACMulticastConnection class
- (instancetype)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
    self = [super init];
    _sourceSignal = source;
    _signal = subject;
    return self;
}
- (RACDisposable *)connect {
    self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];// 原信號添加一個信號訂閱者_signal
  return self.serialDisposable; 
}

 takeUntil: 如果有新的信號進來,那么原來的信號就會dispose

- (RACSignal *)takeUntil:(RACSignal *)signalTrigger {// 如果有新的信號進來,那么原來的信號就會dispose
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        [signalTrigger subscribeNext:^(id _) {
            [subscriber sendCompleted];
        } completed:^{
            [subscriber sendCompleted];
        }];

        [self subscribeNext:^(id x) {
            [subscriber sendNext:x];
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            [subscriber sendCompleted];
        }];
        return nil;
    }];
}

 scanWithStart : 該操作可將上次reduceBlock處理后輸出的結果作為參數,傳入當次reduceBlock操作,往往用於信號輸出值的聚合處理。scanWithStart內部仍然用到了核心操作bind。它會在bindBlock中對value進行操作,同時將上次操作得到的結果running作為參數帶入,一旦本次reduceBlock執行完,就將結果保存在running中,以便下次處理時使用,最后再將本次得出的結果用信號包裝后,傳遞出去。

- (instancetype)scanWithStart:(id)startingValue reduceWithIndex:(id(^)(id,id,NSUInteger))reduceBlock {
    Class class =self.class;
    return [self bind:^{
        __block id running = startingValue;
        __block NSUIntegerindex = 0;
        return^(id value, BOOL*stop) {
            running = reduceBlock(running, value, index++); // (1)
            return [class return:running]; // (2)
        };
    }];
}

throttle:這個操作接收一個時間間隔interval作為參數,如果Signal發出的next事件之后interval時間內不再發出next事件,那么它返回的Signal會將這個next事件發出。

 

take: 只接收原有信號前幾個信號值,信號發送數量大於指定序號的會被忽略

- (__kindof RACStream *)take:(NSUInteger)count {
    Class class = self.class;
    if (count == 0) return class.empty;
    return [[self bind:^{
        __block NSUInteger taken = 0;
        return ^ id (id value, BOOL *stop) {
            if (taken < count) {
                ++taken;
                if (taken == count) *stop = YES;
                return [class return:value];
            } else {
                return nil;
            }
        };
    }]];
}

skip: 過濾原有信號前幾個信號值,從指定序號位置開始接收序號

- (__kindof RACStream *)skip:(NSUInteger)skipCount {
    Class class = self.class;
    return [[self bind:^{
        __block NSUInteger skipped = 0;
        return ^(id value, BOOL *stop) {
            if (skipped >= skipCount) return [class return:value];
            skipped++;
            return class.empty;
        };
    }]];
}

startWith: 除了接收原有信號的值,還會在最前面接收一個指定的信號值。

- (__kindof RACStream *)startWith:(id)value {
    return [[self.class return:value] concat:self];
}

repeat: 對原有信號發送了complete前的所有信號值反復接收

- (RACSignal *)repeat {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        return subscribeForever(self,
            ^(id x) {
                [subscriber sendNext:x];
            },
            ^(NSError *error, RACDisposable *disposable) {
                [disposable dispose];
                [subscriber sendError:error];
            },
            ^(RACDisposable *disposable) {
                                // 不會處理complete信號
            });
    }] setNameWithFormat:@"[%@] -repeat", self.name];
}

 

static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {
    RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {
        [signal subscribeNext:next error:^(NSError *e) {
            @autoreleasepool {
                error(e, nil);
            }
            recurse();//[[RACScheduler scheduler] scheduleRecursiveBlock:recursiveBlock];
        } completed:^{
            @autoreleasepool {
                completed(nil);
            }
            recurse();//[[RACScheduler scheduler] scheduleRecursiveBlock:recursiveBlock];
    }]; 
  };
  recursiveBlock(
^{
    [[RACScheduler scheduler] scheduleRecursiveBlock:recursiveBlock];
  });
  return nil;
}

 

- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable {
    [self schedule:^{
        void (^reallyReschedule)(void) = ^{
            [self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable];
        };
        __block NSLock *lock = [[NSLock alloc] init];
        lock.name = [NSString stringWithFormat:@"%@ %s", self, sel_getName(_cmd)];
        __block NSUInteger rescheduleCount = 0;
        __block BOOL rescheduleImmediately = NO;
        @autoreleasepool {
            recursiveBlock(^{// 這里會觸發信號的重復訂閱,並觸發[signal sendNext:next error:error complete:complete]
                [lock lock];
                BOOL immediate = rescheduleImmediately;
                if (!immediate) ++rescheduleCount;
                [lock unlock];
                if (immediate) reallyReschedule();
            });
        }
        [lock lock];
        NSUInteger synchronousCount = rescheduleCount;
        rescheduleImmediately = YES;
        [lock unlock];
        for (NSUInteger i = 0; i < synchronousCount; i++) {
            reallyReschedule();
        }
    }];
}

 

retry: 如果原有信號發送了一個error,那么這里會嘗試重新訂閱原有信號,在重復指定次數后發送失敗的信號.常用於網絡請求的失敗重試機制

- (RACSignal *)retry:(NSInteger)retryCount {
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        __block NSInteger currentRetryCount = 0;
        return subscribeForever(self,
            ^(id x) {
                [subscriber sendNext:x];
            },
            ^(NSError *error, RACDisposable *disposable) {
                if (retryCount == 0 || currentRetryCount < retryCount) {
                    currentRetryCount++;
                    return;
                }

                [disposable dispose];
                [subscriber sendError:error];
            },
            ^(RACDisposable *disposable) {// 會觸發完成信號,不會無限訂閱
                [disposable dispose];
                [subscriber sendCompleted];
            });
    }];
}

 

aggregateWithStart: reduceBlock: 設一個初始值,對原信號的值進行累加並輸出最終值

- (RACSignal *)aggregateWithStart:(id)start reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
    return [[[self
        scanWithStart:start reduceWithIndex:reduceBlock]
        startWith:start]
        takeLast:1];
}
- (__kindof RACStream *)scanWithStart:(id)startingValue reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
    Class class = self.class;

    return [self bind:^{
        __block id running = startingValue;
        __block NSUInteger index = 0;

        return ^(id value, BOOL *stop) {
            running = reduceBlock(running, value, index++);
            return [class return:running];
        };
    }];
}
- (RACSignal *)takeLast:(NSUInteger)count {
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
        return [self subscribeNext:^(id x) {
            [valuesTaken addObject:x ? : RACTupleNil.tupleNil];

            while (valuesTaken.count > count) {
                [valuesTaken removeObjectAtIndex:0];// 移除之前的所有信號值
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            for (id value in valuesTaken) {
                [subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value];
            }
            [subscriber sendCompleted];
        }];
    }];
}

 delay: 對原信號值延遲輸出

throttle: 指定原有信息的輸出間隔時間,時間內的信號值會被過濾

concat: 拼接兩個信號的輸出值為一個整體等價於(flatten:1)

- (RACSignal *)concat:(RACSignal *)signal {
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        [self subscribeNext:^(id x) {
            [subscriber sendNext:x];
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            [signal subscribe:subscriber];
        }];
        return nil;
    }];
}

zip: 拉鏈式輸出兩個信號的值為一個元祖.例如a信號輸出[1,2,3,4,5],b信號輸出[6,7,8,9],那么zip后輸出為[[1,6],[2,7],[3,8],[4,9]]

- (RACSignal *)zipWith:(RACSignal *)signal {
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        __block BOOL selfCompleted = NO;
        NSMutableArray *selfValues = [NSMutableArray array];
        __block BOOL otherCompleted = NO;
        NSMutableArray *otherValues = [NSMutableArray array];
        void (^sendCompletedIfNecessary)(void) = ^{
            @synchronized (selfValues) {
                BOOL selfEmpty = (selfCompleted && selfValues.count == 0);
                BOOL otherEmpty = (otherCompleted && otherValues.count == 0);
                if (selfEmpty || otherEmpty) [subscriber sendCompleted];
            }
        };
        void (^sendNext)(void) = ^{
            @synchronized (selfValues) {
                if (selfValues.count == 0) return;
                if (otherValues.count == 0) return;
                RACTuple *tuple = RACTuplePack(selfValues[0], otherValues[0]);
                [selfValues removeObjectAtIndex:0];
                [otherValues removeObjectAtIndex:0];
                [subscriber sendNext:tuple];
                sendCompletedIfNecessary();
            }
        };
        RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
            @synchronized (selfValues) {
                [selfValues addObject:x ?: RACTupleNil.tupleNil];
                sendNext();
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            @synchronized (selfValues) {
                selfCompleted = YES;
                sendCompletedIfNecessary();
            }
        }];
        RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
            @synchronized (selfValues) {
                [otherValues addObject:x ?: RACTupleNil.tupleNil];
                sendNext();
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            @synchronized (selfValues) {
                otherCompleted = YES;
                sendCompletedIfNecessary();
            }
        }];
        return [RACDisposable disposableWithBlock:^{
            [selfDisposable dispose];
            [otherDisposable dispose];
        }];
    }];
}

 

 combineLatest: 打包兩個信號的輸出值,任何一個信號輸出變化,都會取出最近兩個信號的輸出值。例如a信號輸出[1,2,3,4,5],b信號輸出[6,7,8,9],那么zip后輸出為[[1,6],[2,6],[2,7],[3,7],[3,8],[4,8][4,9],[5,9]].(這里可能會有更多的輸出)

- (RACSignal *)combineLatestWith:(RACSignal *)signal {
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        __block id lastSelfValue = nil;
        __block BOOL selfCompleted = NO;

        __block id lastOtherValue = nil;
        __block BOOL otherCompleted = NO;
        void (^sendNext)(void) = ^{
            if (lastSelfValue == nil || lastOtherValue == nil) return;
            [subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)];
        };
        RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
            lastSelfValue = x ?: RACTupleNil.tupleNil;
            sendNext();
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            selfCompleted = YES;
            if (otherCompleted) [subscriber sendCompleted];
        }];
        [signal subscribeNext:^(id x) {
            lastOtherValue = x ?: RACTupleNil.tupleNil;
            sendNext();
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            otherCompleted = YES;
            if (selfCompleted) [subscriber sendCompleted];
        }];
        return nil;
    }];
}

 

if:then:else : 如果if信號返回真,則輸出then的信號值,如果if信號返回非真,則輸出else的信號值。這里的then和else的信號值隨着if的信號值切換會重復輸出

+ (RACSignal *)if:(RACSignal *)boolSignal then:(RACSignal *)trueSignal else:(RACSignal *)falseSignal {
    return [[boolSignal
        map:^(NSNumber *value) {
            return (value.boolValue ? trueSignal : falseSignal);// 這里返回后變成了高階信號
        }]
        switchToLatest];
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM