2

RACSignal常用方法深入分析(1)

 2 years ago
source link: https://chipengliu.github.io/2019/01/20/RACSignal-Operations/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

ReactiveCocoa 框架 RACSignal+Operations.h 定义了 RACSignal 常规操作方法,接下来对一些常用的方法进行分析并解析其作用。

doNext

测试代码:

- (void)testDoNext {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"source signal"];

[subscriber sendCompleted];
return nil;
}];

[[sourceSignal doNext:^(id _Nullable x) {
NSLog(@"sourceSignal doNext will execute before sendNext");
}] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
sourceSignal doNext will execute before sendNext
value = source signal

底层实现:

- (RACSignal *)doNext:(void (^)(id x))block {
NSCParameterAssert(block != NULL);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [self subscribeNext:^(id x) {
block(x);
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -doNext:", self.name];
}

doNext 传入block 闭包,该闭包的参数就是原信号发送的值,当给订阅者发送 sendNext 之前会执行 block 闭包

类似的 doError,doCompleted 也是在给订阅者发送事件之前就执行相关 block

throttle:valuesPassingTest:

测试代码:

- (void)testThrottleValuesPassingTest {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{

[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
});
return nil;
}];

RACSignal *throttleSignal = [sourceSignal throttle:1 valuesPassingTest:^BOOL(id _Nullable next) {
return [next integerValue] <= 2;
}];

[throttleSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
value = 1
value = 3

throttle:valuesPassingTest: 方法有2个参数:

  • 时间间隔 interval
  • 判断条件闭包 predicate

该方法大概作用是:从原信号发出第一个信号(@0)发出开始计时,在 interval 秒内如果第二个信号(@1) 符合 predicate 的判断条件,则该前一个信号会被忽略;如果2个信号间隔时间超过 interval 秒或者不满足 predicate 判断,则前一个信号会发给订阅者。

底层实现:

- (RACSignal *)throttle:(NSTimeInterval)interval valuesPassingTest:(BOOL (^)(id next))predicate {
NSCParameterAssert(interval >= 0);
NSCParameterAssert(predicate != nil);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

// We may never use this scheduler, but we need to set it up ahead of
// time so that our scheduled blocks are run serially if we do.
RACScheduler *scheduler = [RACScheduler scheduler];

// Information about any currently-buffered `next` event.
__block id nextValue = nil;
__block BOOL hasNextValue = NO;
RACSerialDisposable *nextDisposable = [[RACSerialDisposable alloc] init];

void (^flushNext)(BOOL send) = ^(BOOL send) {
@synchronized (compoundDisposable) {
[nextDisposable.disposable dispose];

if (!hasNextValue) return;
if (send) [subscriber sendNext:nextValue];

nextValue = nil;
hasNextValue = NO;
}
};

RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
BOOL shouldThrottle = predicate(x);

@synchronized (compoundDisposable) {
flushNext(NO);
if (!shouldThrottle) {
[subscriber sendNext:x];
return;
}

nextValue = x;
hasNextValue = YES;
nextDisposable.disposable = [delayScheduler afterDelay:interval schedule:^{
flushNext(YES);
}];
}
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
flushNext(YES);
[subscriber sendCompleted];
}];

[compoundDisposable addDisposable:subscriptionDisposable];
return compoundDisposable;
}] setNameWithFormat:@"[%@] -throttle: %f valuesPassingTest:", self.name, (double)interval];
}

throttle:valuesPassingTest: 内部主要通过判断 nextValuehasNextValue 2个变量的状态来判断是否给订阅者发送信号。内部先订阅原信号,然后触发订阅者的 didSubscribe,结合测试代码,具体流程:

  1. 收到原信号发出的信号 @0,传入到条件闭包 predicate,返回 YES
  2. RACCompoundDisposable 作为线程间互斥信号量,用 @synchronized 加锁保证 nextValuehasNextValue 操作是原子性
  3. 执行 flushNext(NO) ,把 nextDisposable 进行 dispose,delayScheduler 之前存放的延迟任务如果未被执行会被取消;hasNextValue == NO,直接 return,没有给订阅者发送0
  4. 判断 !shouldThrottle,跳过 if 内部代码,给 nextValue 和 hasNextValue 赋值,nextValue=@0,hasNextValue = YES
  5. 把 flushNext(YES) 加入到延迟队列中,1秒后执行
  6. 原信号发送 @1,此时时间间隔不到 1秒,从步骤3开始重复上述步骤;flushNext(NO) 中 延迟任务被取消, nextValuehasNextValue 会赋值为对应 零值。shouldThrottle 符合,nextValuehasNextValue 又被赋值到 @1 和 YES,保存新的 flushNext(YES) 到延迟任务队列中
  7. 1秒后还没有收到原信号发送的信号,执行步骤6保存的 flushNext(YES),把 @1 发给订阅者;2秒后收到原信号的信号 @2,从步骤3开始重复上述步骤,@2会被保存到 hasNextValue 中,等待下一次延迟任务可以触发的时候发给订阅者,反之被忽略
  8. 步骤7完之后马上收到新的信号@3,从步骤3开始重复上述步骤,步骤7保存的 flushNext(YES) 被取消,因为此时不符合 predicate 判断条件,@3发给订阅者

delay:

测试代码:

- (void)testDelay {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];

[[sourceSignal delay:1] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];

NSLog(@"%@", [NSDate date]);
}
2019-01-31 09:34:33.286371+0800 AppTest[38331:4774309] Thu Jan 31 09:34:33 2019
2019-01-31 09:34:34.380736+0800 AppTest[38331:4774309] value = 0
2019-01-31 09:34:34.380881+0800 AppTest[38331:4774309] value = 1
2019-01-31 09:34:34.380986+0800 AppTest[38331:4774309] value = 2
2019-01-31 09:34:34.381076+0800 AppTest[38331:4774309] value = 3

delay: 方法是将原信号的信号事件延迟发送给订阅者

底层实现:

- (RACSignal *)delay:(NSTimeInterval)interval {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

// We may never use this scheduler, but we need to set it up ahead of
// time so that our scheduled blocks are run serially if we do.
RACScheduler *scheduler = [RACScheduler scheduler];

void (^schedule)(dispatch_block_t) = ^(dispatch_block_t block) {
RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block];
[disposable addDisposable:schedulerDisposable];
};

RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
schedule(^{
[subscriber sendNext:x];
});
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
schedule(^{
[subscriber sendCompleted];
});
}];

[disposable addDisposable:subscriptionDisposable];
return disposable;
}] setNameWithFormat:@"[%@] -delay: %f", self.name, (double)interval];
}

delay: 内部首先先定义 schedule ,其参数是 dispatch_block_t,执行 schedule 的时候,会把参数 block 放到 RACScheduler 延迟 interval 秒触发。

- (RACDisposable *)afterDelay:(NSTimeInterval)delay schedule:(void (^)(void))block {
return [self after:[NSDate dateWithTimeIntervalSinceNow:delay] schedule:block];
}

- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(block != NULL);

RACDisposable *disposable = [[RACDisposable alloc] init];

dispatch_after([self.class wallTimeWithDate:date], self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});

return disposable;
}

根据测试代码,最终会调用 RACQueueScheduler 中的 -after:schedule:,在该方法里面,通过 dispatch_after 来触发入参 block,触发先前会先判断该任务是否被 disposed ,如果是则直接 return。

归纳来说,delay: 方法就是将原信号的 sendNext 和 sendCompleted 事件延迟 interval 秒发送给订阅者。

bufferWithTime:onScheduler:

测试代码:

- (void)testBufferWithTime {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];

RACSignal *bufferSignal = [sourceSignal bufferWithTime:1 onScheduler:[RACScheduler currentScheduler]];
[bufferSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
value = <RACTuple: 0x600000003200> (
0,
1,
2,
3
)

底层实现:

- (RACSignal *)bufferWithTime:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init];
NSMutableArray *values = [NSMutableArray array];

void (^flushValues)() = ^{
@synchronized (values) {
[timerDisposable.disposable dispose];

if (values.count == 0) return;

RACTuple *tuple = [RACTuple tupleWithObjectsFromArray:values];
[values removeAllObjects];
[subscriber sendNext:tuple];
}
};

RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
@synchronized (values) {
if (values.count == 0) {
timerDisposable.disposable = [scheduler afterDelay:interval schedule:flushValues];
}

[values addObject:x ?: RACTupleNil.tupleNil];
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
flushValues();
[subscriber sendCompleted];
}];

return [RACDisposable disposableWithBlock:^{
[selfDisposable dispose];
[timerDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -bufferWithTime: %f onScheduler: %@", self.name, (double)interval, scheduler];
}

同样 bufferWithTime:onScheduler: 内部还会创建新的信号,当新的信号被订阅的时候会创建可变数组 values 。然后定义闭包 flushValues,当收到原信号的 sendNext 和 sendCompleted 的时候会触发该闭包。

之后对原信号进行订阅,首先收到原信号的 sendNext 事件的时候,先判断 values 的元素个数,如果个数为 0,则把 flushValues 延迟 interval 秒触发。然后把信号值加入 values 数组中,如果信号值为空,则把 RACTupleNil.tupleNil 加入数组中。

因为测试代码中,@0、@1、@2、@3 四个信号是连续串行发出,所以之前被加入延迟队列中执行的 flushValues 还没有触发的时候,当原信号发送 sendCompleted 的时候,flushValues 会被触发,这时候 values 四个元素被一次性包装成 RACTuple 发送给订阅者。

如果把测试代码修改成:

- (void)testBufferWithTime2 {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@2];
[subscriber sendNext:@3];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendCompleted];
});
});
return nil;
}];

RACSignal *bufferSignal = [sourceSignal bufferWithTime:1 onScheduler:[RACScheduler currentScheduler]];
[bufferSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
value = <RACTuple: 0x604000204300> (
0,
1
)
value = <RACTuple: 0x600000019300> (
2,
3
)

订阅者此时会受到2个信号,都是 RACTuple 类型,回到之前的实现代码,

  1. 收到 @0,flushValues() 加入延迟队列1秒之后执行,@0 再加入到数组 values 中
  2. 步骤1之后马上收到 @1(1秒内),重复步骤1,这时候延迟队列中有2个任务
  3. 1秒后,第一个加入延迟队列的 flushValues() 执行,首先执行了 [timerDisposable.disposable dispose]; ,这样会将延迟队列中下一个 flushValues() 任务被取消(步骤2加入的)。然后 values 元素包装成 RACTuple 发送给订阅者。
  4. 基于步骤3,再过一秒,也就是从一开始算,2秒过后。收到@2,然后重复步骤,逻辑相似
  5. 最后再过2秒,原信号发送 sendCompleted,执行 flushValues() ,values 元素个数为0,直接返回。

总结:bufferWithTime:onScheduler: 作用是将规定时间内将原信号所发送的全部信号包装成 RACTupe 发送给订阅者

timeout:onScheduler:

测试代码:

- (void)testTimeout {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@3];
[subscriber sendCompleted];
});
return nil;
}];

RACSignal *timeoutSig = [sourceSignal timeout:1 onScheduler:[RACScheduler currentScheduler]];
[timeoutSig subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];

[timeoutSig subscribeError:^(NSError * _Nullable error) {
NSLog(@"error = %@", error);
}];
}
value = 0
value = 1
value = 2
error = Error Domain=RACSignalErrorDomain Code=1 "(null)"

底层实现:

- (RACSignal *)timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

RACDisposable *timeoutDisposable = [scheduler afterDelay:interval schedule:^{
[disposable dispose];
[subscriber sendError:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorTimedOut userInfo:nil]];
}];

[disposable addDisposable:timeoutDisposable];

RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[disposable dispose];
[subscriber sendError:error];
} completed:^{
[disposable dispose];
[subscriber sendCompleted];
}];

[disposable addDisposable:subscriptionDisposable];
return disposable;
}] setNameWithFormat:@"[%@] -timeout: %f onScheduler: %@", self.name, (double)interval, scheduler];
}

timeout:onScheduler: 判断从被订阅开始计时, interval 时间内如果原信号没有把所有信号发送完毕会给订阅者发送 error。

首先创建新的信号,新信号被订阅的时候在延迟队列里面加入任务, interval 后该任务会将 NSError 发送给订阅者。如果在 interval 内原型号发送了 sendCompleted/sendError,会执行 [disposable dispose];,延迟队列中的任务会被取消。同样如果延迟任务被触发,原信号的订阅也被终止。

测试代码:

/* 代码1 */

- (void)testMap {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];

[subscriber sendCompleted];
return nil;
}];

[[sourceSignal map:^id _Nullable(id _Nullable value) {
// BLOCK 1
return @([value integerValue] + 1);
}] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
value = 2
value = 3
value = 4

底层实现:

/* 代码2 */

- (__kindof RACStream *)map:(id (^)(id value))block {
NSCParameterAssert(block != nil);

Class class = self.class;

return [[self flattenMap:^(id value) {
// BLOCK 2
return [class return:block(value)];
}] setNameWithFormat:@"[%@] -map:", self.name];
}

map 的参数一个参数类型为id,返回值类型也为id的block,内部实现首先通过断言判断 block 是否为空,然后调用 flattenMap 方法

/* 代码3 */

- (__kindof RACStream *)flattenMap:(__kindof RACStream * (^)(id value))block {
Class class = self.class;

return [[self bind:^{
return ^(id value, BOOL *stop) {
// BLOCK 3

id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

return stream;
};
}] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}

flattenMap 是通过封装 bind 方法实现,bind的入参是 RACStreamBindBlock 类型闭包,flattenMap 的入参是一个入参类型为 id,返回值为 RACStream 的闭包

在 flattenMap 中,先判断 block(value) 返回的信号是否为nil,若是则返对应class 的empty 信号,也就是 RACEmptySignal 对象

+ (RACSignal *)empty {
#ifdef DEBUG
// Create multiple instances of this class in DEBUG so users can set custom
// names on each.
return [[[self alloc] init] setNameWithFormat:@"+empty"];
#else
static id singleton;
static dispatch_once_t pred;

dispatch_once(&pred, ^{
singleton = [[self alloc] init];
});

return singleton;
#endif
}

从实现代码上来看,RACEmptySignal 是以单例对象的形式返回

RACEmptySignal.m 文件里还重写了 -subscribe:

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);

return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendCompleted];
}];
}

RACEmptySignal 信号被订阅之后会马上给订阅者发送 sendCompleted 事件

回到 flattenMap 的实现逻辑中,如果 block(value) 返回不是nil,它是如何返回 RACStream 对象的呢?

执行 block(value) 实际上就是触发 代码2 中的 BLOCK 2

BLOCK 2 中,又会触发另个 block(value),这时候会触发代码1中的 BLOCK 1,这里 block(value) 会返回相关对象(测试代码中返回NSNumber),BLOCK 2 中将 NSNumber 包装成 RACReturnSignal 返回,此时在 BLOCK 3 stream 对象就是 RACReturnSignal。

最后通过 bind 函数的变换,订阅会收到变换过后的值。

image-20190126201245265

mapReplace

测试代码:

- (void)testMapReplace {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];

[subscriber sendCompleted];
return nil;
}];

[[sourceSignal mapReplace:@(10)] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
value = 10
value = 10
value = 10

底层实现:

- (__kindof RACStream *)mapReplace:(id)object {
return [[self map:^(id _) {
return object;
}] setNameWithFormat:@"[%@] -mapReplace: %@", self.name, RACDescription(object)];
}

mapReplace 是通过封装 map 方法实现,把原信号每一个事件都变换成参数 object 传递给订阅者

reduceEach

测试代码:

- (void)testReduceEach {
RACSignal *signal1 = [RACSignal createSignal:
^RACDisposable *(id<RACSubscriber> subscriber)
{
[subscriber sendNext:RACTuplePack(@1,@2)];
[subscriber sendNext:RACTuplePack(@3,@4)];
[subscriber sendNext:RACTuplePack(@5,@6)];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal1 dispose");
}];
}];

RACSignal *signal2 = [signal1 reduceEach:^id (NSNumber *num1 , NSNumber *num2){
return @([num1 intValue] + [num2 intValue]);
}];

[signal2 subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
value = 3
value = 7
value = 11
signal1 dispose

从输出结果看出来,reduceEach 将原信号 signal1 发送的 RACTuple 数据进行解包聚合发送给订阅者

底层实现:

/* 代码4 */

- (__kindof RACStream *)reduceEach:(RACReduceBlock)reduceBlock {
NSCParameterAssert(reduceBlock != nil);

__weak RACStream *stream __attribute__((unused)) = self;
return [[self map:^(RACTuple *t) {
// BLOCK 1
NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);
return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];
}] setNameWithFormat:@"[%@] -reduceEach:", self.name];
}

reduceEach 也是通过封装 map 方法实现。reduceEach 方法的入参是 RACReduceBlock 类型的闭包

typedef id _Nonnull (^RACReduceBlock)();

首先通过断言判断 reduceBlock 闭包是否为nil,然后调用 map 方法,map 方法的入参也就是 BLOCK 1,在 BLOCK 1 里会先判断原信号 signal 发送的数据 t 是否为 RACTuple 类型,然后返回 RACBlockTrampoline 类型对象

@interface RACBlockTrampoline ()
@property (nonatomic, readonly, copy) id block;
@end

RACBlockTrampoline 内部会保存一个 block 对象,然后根据传进来的参数,动态的构造一个 NSInvocation,通过执行 NSInvocation 返回需要的数据。

- (id)invokeWithArguments:(RACTuple *)arguments {
//
SEL selector = [self selectorForArgumentCount:arguments.count];
NSInvocation *invocation = [NSInvocation invocationWithMethodSignature:[self methodSignatureForSelector:selector]];
invocation.selector = selector;
invocation.target = self;

for (NSUInteger i = 0; i < arguments.count; i++) {
id arg = arguments[i];
NSInteger argIndex = (NSInteger)(i + 2);
[invocation setArgument:&arg atIndex:argIndex];
}

[invocation invoke];

__unsafe_unretained id returnVal;
[invocation getReturnValue:&returnVal];
return returnVal;
}

- (SEL)selectorForArgumentCount:(NSUInteger)count {
NSCParameterAssert(count > 0);

switch (count) {
case 0: return NULL;
case 1: return @selector(performWith:);
case 2: return @selector(performWith::);
case 3: return @selector(performWith:::);
case 4: return @selector(performWith::::);
case 5: return @selector(performWith:::::);
case 6: return @selector(performWith::::::);
case 7: return @selector(performWith:::::::);
case 8: return @selector(performWith::::::::);
case 9: return @selector(performWith:::::::::);
case 10: return @selector(performWith::::::::::);
case 11: return @selector(performWith:::::::::::);
case 12: return @selector(performWith::::::::::::);
case 13: return @selector(performWith:::::::::::::);
case 14: return @selector(performWith::::::::::::::);
case 15: return @selector(performWith:::::::::::::::);
}

NSCAssert(NO, @"The argument count is too damn high! Only blocks of up to 15 arguments are currently supported.");
return NULL;
}
- (id)performWith:(id)obj1 :(id)obj2 {
id (^block)(id, id) = self.block;
return block(obj1, obj2);
}

...

-invokeWithArguments 中首先通过判断 RACTuple 元素数量选择对应的 selector,最大能支持 15 个元素。

确定好 NSInvocation 的 target 和 seletor,还需要设置参数

for (NSUInteger i = 0; i < arguments.count; i++) {
id arg = arguments[i];
NSInteger argIndex = (NSInteger)(i + 2);
[invocation setArgument:&arg atIndex:argIndex];
}

这里看到设置 Type Encodings 的时候是从偏移量为 2开始,这是因为偏移量0和 偏移量1的参数分别对应着隐藏参数self 和 _cmd。

构造好 invocation 之后,执行 [invocation invoke] 调用动态方法,实际上就是执行代码4中的入参 reduceBlock 闭包,最后通过 [invocation getReturnValue:&returnVal] 拿到闭包的返回值,也就是 RACBlockTrampoline 的最终返回值,最终成为 map 闭包里面的返回值。剩下的就是 map 函数流程。

RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:RACTuplePack(@NO,@YES,@NO)];
[subscriber sendNext:RACTuplePack(@YES,@NO)];

[subscriber sendCompleted];
return nil;
}];

RACSignal *orSignal = [sourceSignal or];
[orSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
value = 1
value = 1
- (RACSignal *)or {
return [[self map:^(RACTuple *tuple) {
NSCAssert([tuple isKindOfClass:RACTuple.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
NSCAssert(tuple.count > 0, @"-or must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");

return @([tuple.rac_sequence any:^(NSNumber *number) {
/// anyBlock
NSCAssert([number isKindOfClass:NSNumber.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);

return number.boolValue;
}]);
}] setNameWithFormat:@"[%@] -or", self.name];
}

这里主要是将 RACTuple 转换成 RACTupleSequence,因为 RACTupleSequence 是继承 RACSequence,这里就会调用 RACSequence 的 - (BOOL)any:(BOOL (^)(id))block 方法

//  RACSequence.m

- (BOOL)any:(BOOL (^)(id))block {
NSCParameterAssert(block != NULL);

/// block = BOOL ^(NSNumber *number) {return number.boolValue;}
return [self objectPassingTest:block] != nil;
}
//  RACSequence.m

- (id)objectPassingTest:(BOOL (^)(id))block {
NSCParameterAssert(block != NULL);

/// block = BOOL ^(NSNumber *number) {return number.boolValue;}
return [self filter:block].head;
}

- (BOOL)any:(BOOL (^)(id))block 实现里面最终会执行 RACStream 中的 filter 方法

- (__kindof RACStream *)filter:(BOOL (^)(id value))block {
NSCParameterAssert(block != nil);

Class class = self.class;

return [[self flattenMap:^ id (id value) {
// flattenMapBlock

/// block = BOOL ^(NSNumber *number) {return number.boolValue;}
if (block(value)) {
return [class return:value];
} else {
return class.empty;
}
}] setNameWithFormat:@"[%@] -filter:", self.name];
}

在 flattenMapBlock 中会以 RACTupleSequence 发出的信号值传入到 block中,这些值信号值也就是一开始测试代码中 sourceSignal 发送给订阅者的 RACTuple 中的元素,如果value对应的BOOL值是YES,就转换成一个 RACTupleSequence 信号。如果对应的是NO,则转换成一个empty信号。

上面的 flattenMap 方法实际上最终调用到 RACSequence 的 -bind 方法

/*
block = ^{
return ^(id value, BOOL *stop) {
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

return stream;
};

bindBlock = ^(id value, BOOL *stop) {
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

return stream;
};
*/

- (RACSequence *)bind:(RACSequenceBindBlock (^)(void))block {
RACSequenceBindBlock bindBlock = block();
return [[self bind:bindBlock passingThroughValuesFromSequence:nil] setNameWithFormat:@"[%@] -bind:", self.name];
}

然后继续调用 - (RACSequence *)bind:(RACSequenceBindBlock)bindBlock passingThroughValuesFromSequence:(RACSequence *)passthroughSequence 这里参数 passthroughSequence 为 nil

- (RACSequence *)bind:(RACSequenceBindBlock)bindBlock passingThroughValuesFromSequence:(RACSequence *)passthroughSequence {

__block RACSequence *valuesSeq = self;
__block RACSequence *current = passthroughSequence;
__block BOOL stop = NO;

RACSequence *sequence = [RACDynamicSequence sequenceWithLazyDependency:^ id {
/// dependencyBlock

while (current.head == nil) {
if (stop) return nil;

// We've exhausted the current sequence, create a sequence from the
// next value.
id value = valuesSeq.head;

if (value == nil) {
// We've exhausted all the sequences.
stop = YES;
return nil;
}

current = (id)bindBlock(value, &stop);
if (current == nil) {
stop = YES;
return nil;
}

valuesSeq = valuesSeq.tail;
}

NSCAssert([current isKindOfClass:RACSequence.class], @"-bind: block returned an object that is not a sequence: %@", current);
return nil;
} headBlock:^(id _) {
return current.head;
} tailBlock:^ id (id _) {
if (stop) return nil;

return [valuesSeq bind:bindBlock passingThroughValuesFromSequence:current.tail];
}];

sequence.name = self.name;
return sequence;
}

上面代码的关键逻辑是调用 RACDynamicSequence 类的 sequenceWithLazyDependency 方法,dependencyBlock 会在之前提及到的函数 -objectPassingTest 中,执行 RACSequence 的 -head 方法中触发

在 方法,dependencyBlock 中,执行current = (id)bindBlock(value, &stop); 根据value的布尔值来产生新的信号,如果为 NO 则返回 RACEmptySequence 类型,此时 current.head 为 nil,进行下一次循环;如过 value 为YES,则返回 RACUnarySequence 类型,current.head 不为 nil,结束循环。

测试代码:

- (void)testAny {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];

[subscriber sendCompleted];
return nil;
}];

[[sourceSignal any:^BOOL(id _Nullable object) {
return [object integerValue] < 2;
}] subscribeNext:^(NSNumber * _Nullable x) {
NSLog(@"value = %@", x);
}];
}
value = 1
value = 0
value = 0

底层实现:

- (RACSignal *)any:(BOOL (^)(id object))predicateBlock {
NSCParameterAssert(predicateBlock != NULL);

return [[[self materialize] bind:^{
return ^(RACEvent *event, BOOL *stop) {
if (event.finished) {
*stop = YES;
return [RACSignal return:@NO];
}

if (predicateBlock(event.value)) {
*stop = YES;
return [RACSignal return:@YES];
}

return [RACSignal empty];
};
}] setNameWithFormat:@"[%@] -any:", self.name];
}

首先原信号会 通过 -materialize 方法转换成 RACEvent 对象

- (RACSignal *)materialize {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [self subscribeNext:^(id x) {
[subscriber sendNext:[RACEvent eventWithValue:x]];
} error:^(NSError *error) {
[subscriber sendNext:[RACEvent eventWithError:error]];
[subscriber sendCompleted];
} completed:^{
[subscriber sendNext:RACEvent.completedEvent];
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -materialize", self.name];
}
- (BOOL)isFinished {
return self.eventType == RACEventTypeCompleted || self.eventType == RACEventTypeError;
}

从上面代码发现,RACEvent 如果收到原信号的 sendCompleted / sendError,finished 属性会置为 YES

-any 方法中,先判断 RACEvent 的 finished 属性,如果为 YES,stop接下来的信号 则返回 [RACSignal return:@NO];反之,则会根据入参 predicateBlock 闭包,将 RACEvent 的 value 传入 predicateBlock,如果返回值为YES,stop接下来的信号,则返回 [RACSignal return:@YES];如果 predicateBlock 返回值为 NO,则返回 [RACSignal empty]。

简单地总结来说,any 方法根据 predicateBlock 来对原信号的每一个信号进行判断,若遇到返回 YES 的条件,就给订阅者发送 YES 信号,然后发送 sendCompleted;若 predicateBlock 没有返回 YES 的条件,则最后给 订阅者 发送 NO 信号。

- (RACSignal *)all:(BOOL (^)(id object))predicateBlock {
NSCParameterAssert(predicateBlock != NULL);

return [[[self materialize] bind:^{
return ^(RACEvent *event, BOOL *stop) {
if (event.eventType == RACEventTypeCompleted) {
*stop = YES;
return [RACSignal return:@YES];
}

if (event.eventType == RACEventTypeError || !predicateBlock(event.value)) {
*stop = YES;
return [RACSignal return:@NO];
}

return [RACSignal empty];
};
}] setNameWithFormat:@"[%@] -all:", self.name];
}

all 方法可以理解为 any 方法的对立面,原信号如果发送 sendError 或者 predicateBlock 返回为 NO,就会结束信号的传递,并会给订阅者发值为 NO 的信号。如果整个订阅过程中都没有出现错误以及都满足 predicateBlock 为真的条件,最后会在 RACEventTypeCompleted 的时候发送 YES。

repeat

测试代码:

RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];

[subscriber sendCompleted];
return nil;
}];

RACSignal *orSignal = [sourceSignal repeat];
[orSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
value = 1
value = 1
value = 1
...

执行之后会不断地打印 value = 1

- (RACSignal *)repeat {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return subscribeForever(self,
^(id x) {
[subscriber sendNext:x];
},
^(NSError *error, RACDisposable *disposable) {
[disposable dispose];
[subscriber sendError:error];
},
^(RACDisposable *disposable) {
// Resubscribe.
});
}] setNameWithFormat:@"[%@] -repeat", self.name];
}

执行 -repeat 方法之后,内部会创建新的 RASignal,当新的信号被订阅的时候会执行 subscribeForever 方法

static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {
next = [next copy];
error = [error copy];
completed = [completed copy];

RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {
RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
[compoundDisposable addDisposable:selfDisposable];

__weak RACDisposable *weakSelfDisposable = selfDisposable;

RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) {
@autoreleasepool {
error(e, compoundDisposable);
[compoundDisposable removeDisposable:weakSelfDisposable];
}

recurse();
} completed:^{
@autoreleasepool {
completed(compoundDisposable);
[compoundDisposable removeDisposable:weakSelfDisposable];
}

recurse();
}];

[selfDisposable addDisposable:subscriptionDisposable];
};

// Subscribe once immediately, and then use recursive scheduling for any
// further resubscriptions.
recursiveBlock(^{
RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];

RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock];
[compoundDisposable addDisposable:schedulingDisposable];
});

return compoundDisposable;
}

subscribeForever 方法有4个参数,分别是源信号 signal,next 闭包,error 闭包和 complete 闭包。函数内部先定义好递归的闭包:recursiveBlock,recursiveBlock 中 首先对 signal 进行订阅,如果源信号 signal 发送 sendError 或者 sendCompleted,就会执行对应的 error/complete 闭包,然后就执行 recursiveBlock 的参数闭包 recurse。

subscribeForever 最后是执行 recursiveBlock 并传入具体的 recurse。

recurse 中首先获取当前的递归调度器 recursiveScheduler,然后执行 -scheduleRecursiveBlock 方法

- (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

[self scheduleRecursiveBlock:[recursiveBlock copy] addingToDisposable:disposable];
return disposable;
}

- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable {
@autoreleasepool {
RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
[disposable addDisposable:selfDisposable];

__weak RACDisposable *weakSelfDisposable = selfDisposable;

// 最终调用 schedule 方法
RACDisposable *schedulingDisposable = [self schedule:^{
/// scheduleBlock

@autoreleasepool {
// At this point, we've been invoked, so our disposable is now useless.
[disposable removeDisposable:weakSelfDisposable];
}

if (disposable.disposed) return;

void (^reallyReschedule)(void) = ^{
if (disposable.disposed) return;
[self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable];
};

// Protects the variables below.
//
// This doesn't actually need to be __block qualified, but Clang
// complains otherwise. :C
__block NSLock *lock = [[NSLock alloc] init];
lock.name = [NSString stringWithFormat:@"%@ %s", self, sel_getName(_cmd)];

__block NSUInteger rescheduleCount = 0;

// Set to YES once synchronous execution has finished. Further
// rescheduling should occur immediately (rather than being
// flattened).
__block BOOL rescheduleImmediately = NO;

@autoreleasepool {
recursiveBlock(^{
[lock lock];
BOOL immediate = rescheduleImmediately;
if (!immediate) ++rescheduleCount;
[lock unlock];

if (immediate) reallyReschedule();
});
}

[lock lock];
NSUInteger synchronousCount = rescheduleCount;
rescheduleImmediately = YES;
[lock unlock];

for (NSUInteger i = 0; i < synchronousCount; i++) {
reallyReschedule();
}
}];

[selfDisposable addDisposable:schedulingDisposable];
}
}

/// RACTargetQueueScheduler.m
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);

RACDisposable *disposable = [[RACDisposable alloc] init];

dispatch_async(self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});

return disposable;
}

可以看到会调用到自己的 -schedule 方法,这里测试代码中是在主线程,获取到对应的是 RACTargetQueueScheduler 。这里的 -schedule 方法先判断原信号有没有disposed,若果没有,则把参数 block 放在对应的队列中触发。

可以看得到上面函数中 scheduleBlock 里不断递归执行 [self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable],recursiveBlock 不断被触发,对应的 subscribeForever 中的 recurse() 循环执行:

scheduleRecursiveBlock->recursiveBlock->recurse()->reallyReschedule()->scheduleRecursiveBlock

也就是说源信号会循环被订阅触发其给订阅者发送 sendNext 事件,直到源信号发送 error 才结束。

retry:

测试代码:

- (void)testRetry {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];

[subscriber sendError:[NSError errorWithDomain:@"domain" code:-1 userInfo:nil]];

[subscriber sendNext:@3];

[subscriber sendCompleted];
return nil;
}];

RACSignal *retrySignal = [sourceSignal retry:2];
[retrySignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
value = 1
value = 2
value = 1
value = 2
value = 1
value = 2
- (RACSignal *)retry:(NSInteger)retryCount {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
__block NSInteger currentRetryCount = 0;
return subscribeForever(self,
^(id x) {
[subscriber sendNext:x];
},
^(NSError *error, RACDisposable *disposable) {
// 当原始信号发送 sendError 时
if (retryCount == 0 || currentRetryCount < retryCount) {
// Resubscribe.
currentRetryCount++;
return;
}

[disposable dispose];
[subscriber sendError:error];
},
^(RACDisposable *disposable) {
// 当原信号发送 sendCompleted
[disposable dispose];
[subscriber sendCompleted];
});
}] setNameWithFormat:@"[%@] -retry: %lu", self.name, (unsigned long)retryCount];
}

-retry: 实现与 -repeat 实现类似,基于 subscribeForever-retry: 是内部维护一个 currentRetryCount 变量,当原始信号发送 sendError 时判断重试次数 currentRetryCount 是否小于 retryCount,若是则重试,如果重试依旧收到 sendError,超过 retryCount 之后就会停止重试。

如果原信号没有发生错误,那么原信号在发送结束,当原信号发送 sendCompleted,subscribeForever 也就接受了,所以 -retry: 操作对于没有任何error的信号 和 直接订阅原信号表现一样。

将测试代码改为:

- (void)testRetryNoError {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendCompleted];
return nil;
}];

RACSignal *retrySignal = [sourceSignal retry:2];
[retrySignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
value = 1
value = 2

原信号发送 sendCompleted ,整个订阅流程就结束了。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK