2

RACSignal常用方法深入分析(终)

 2 years ago
source link: https://chipengliu.github.io/2019/03/04/RACSignal-Operations-4/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本篇文章接着上篇继续分析常用的 RACSignal 方法的第四部分进行分析。

switchToLatest

测试代码:

- (void)testSwitchToLast {
RACSignal *signal0 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@2];
[subscriber sendCompleted];
});
return nil;
}];

RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@3];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@4];
[subscriber sendNext:@5];
[subscriber sendCompleted];
});
return nil;
}];

RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@6];
[subscriber sendCompleted];
return nil;
}];

RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:signal0];
[subscriber sendNext:signal1];
[subscriber sendNext:signal2];
[subscriber sendCompleted];
return nil;
}];

[[sourceSignal switchToLatest] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
value = 0
value = 1
value = 3
value = 6

底层实现:

- (RACSignal *)switchToLatest {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACMulticastConnection *connection = [self publish];

RACDisposable *subscriptionDisposable = [[connection.signal
flattenMap:^(RACSignal *x) {
NSCAssert(x == nil || [x isKindOfClass:RACSignal.class], @"-switchToLatest requires that the source signal (%@) send signals. Instead we got: %@", self, x);

// -concat:[RACSignal never] prevents completion of the receiver from
// prematurely terminating the inner signal.
return [x takeUntil:[connection.signal concat:[RACSignal never]]];
}]
subscribe:subscriber];

RACDisposable *connectionDisposable = [connection connect];
return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];
[connectionDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -switchToLatest", self.name];
}
  1. -switchToLatest 内部原信号调用 -publish 转化为热信号,返回的 connection 会持有热信号,也就是其 signal 属性
  2. 对热信号执行 -flattenMap: 方法,通过断言判断热信号发送的信号值是否也是 RACSignal,也就是说 -switchToLatest 方法是处理高阶信号的
  3. flattenMap 内部会对热信号执行 concat: 方法,这里的传入参数为 [RACSignal never],RACSignal x 执行 takeUntil:, 作用是为了防止热信号发送的 RACSignal 过早结束导致整个订阅都被结束

image-20190301161002720

switch: cases: default:

测试代码:

- (void)testswitchCasesDefault {
RACSignal *signal0 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"00"];
return nil;
}];

RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"11"];
return nil;
}];

RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"22"];
[subscriber sendCompleted];
return nil;
}];

RACSignal *defaultSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1024];
[subscriber sendCompleted];
return nil;
}];

RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"0"];
[subscriber sendNext:@"1"];
[subscriber sendNext:@"2"];
[subscriber sendCompleted];
return nil;
}];

NSDictionary *dict = @{@"0" : signal0,
@"1" : signal1,
@"2" : signal2
};

[[RACSignal switch:sourceSignal cases:dict default:defaultSignal] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
value = 00
value = 11
value = 22

底层实现:

+ (RACSignal *)switch:(RACSignal *)signal cases:(NSDictionary *)cases default:(RACSignal *)defaultSignal {
NSCParameterAssert(signal != nil);
NSCParameterAssert(cases != nil);

for (id key in cases) {
id value __attribute__((unused)) = cases[key];
NSCAssert([value isKindOfClass:RACSignal.class], @"Expected all cases to be RACSignals, %@ isn't", value);
}

NSDictionary *copy = [cases copy];

return [[[signal
map:^(id key) {
if (key == nil) key = RACTupleNil.tupleNil;

RACSignal *signal = copy[key] ?: defaultSignal;
if (signal == nil) {
NSString *description = [NSString stringWithFormat:NSLocalizedString(@"No matching signal found for value %@", @""), key];
return [RACSignal error:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorNoMatchingCase userInfo:@{ NSLocalizedDescriptionKey: description }]];
}

return signal;
}]
switchToLatest]
setNameWithFormat:@"+switch: %@ cases: %@ default: %@", signal, cases, defaultSignal];
}
  1. -switch:case:default: 首先对参数 signal、case 做非空判断,然后遍历字典 case 并判断每一个 value 是否为 RACSignal 对象
  2. 订阅原信号进行 map 操作,根据 key 来从 case 里面取出对应的 RACSignal 并返回,如果取出来的是 nil,则取 defaultSignal,若 defaultSignal 为nil,则返回 error signal,这样原信号就被转化高阶信号,然后再进行 switchToLatest 操作,把最终的信号值发给订阅者

if: then: else:

测试代码:

- (void)testIfThenElse {
RACSignal *signalTrue = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"true"];
return nil;
}];

RACSignal *signalFalse = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"false"];
return nil;
}];

RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@(NO)];
[subscriber sendNext:@(YES)];
[subscriber sendCompleted];
return nil;
}];

[[RACSignal if:sourceSignal then:signalTrue else:signalFalse] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];;
}
value = false
value = true

底层实现:

+ (RACSignal *)if:(RACSignal *)boolSignal then:(RACSignal *)trueSignal else:(RACSignal *)falseSignal {
NSCParameterAssert(boolSignal != nil);
NSCParameterAssert(trueSignal != nil);
NSCParameterAssert(falseSignal != nil);

return [[[boolSignal
map:^(NSNumber *value) {
NSCAssert([value isKindOfClass:NSNumber.class], @"Expected %@ to send BOOLs, not %@", boolSignal, value);

return (value.boolValue ? trueSignal : falseSignal);
}]
switchToLatest]
setNameWithFormat:@"+if: %@ then: %@ else: %@", boolSignal, trueSignal, falseSignal];
}

+if: then: else:+switch: cases: default: 原理类似,把原信号发送的布尔值通过 map 操作转化对应的 RACSignal 返回。YES->trueSignal, NO->falseSignal,原信号被转化成高阶信号,再执行 switchToLatest ,后面逻辑与 +switch: cases: default: 一样

catch:

测试代码:

- (void)testCatch {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSError *error = [NSError errorWithDomain:@"" code:-1 userInfo:nil];
[subscriber sendError:error];
return nil;
}];

RACSignal *catchSignal = [sourceSignal catch:^RACSignal * _Nonnull(NSError * _Nonnull error) {
NSLog(@"excute catch block, error = %@", error);
return [RACSignal return:@"error text"];
}];

[catchSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
excute catch block, error = Error Domain= Code=-1 "(null)"
value = error text

底层实现:

- (RACSignal *)catch:(RACSignal * (^)(NSError *error))catchBlock {
NSCParameterAssert(catchBlock != NULL);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *catchDisposable = [[RACSerialDisposable alloc] init];

RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
RACSignal *signal = catchBlock(error);
NSCAssert(signal != nil, @"Expected non-nil signal from catch block on %@", self);
catchDisposable.disposable = [signal subscribe:subscriber];
} completed:^{
[subscriber sendCompleted];
}];

return [RACDisposable disposableWithBlock:^{
[catchDisposable dispose];
[subscriptionDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -catch:", self.name];
}

当原信号发送 sendError 的时候把 error 传入 catchBlock 闭包并返回 RACSignal,通过断言判断返回的 signal 是否为空,然后再订阅 signal,把 signal 的信号发给最终的订阅者

测试代码:

- (void)testTry {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"source signal dispose");
}];
}];

RACSignal *trySignal = [sourceSignal try:^BOOL(id _Nullable value, NSError * _Nullable __autoreleasing * _Nullable errorPtr) {
NSInteger i = [value integerValue];
if (i > 2) {
*errorPtr = [NSError errorWithDomain:@"" code:-1 userInfo:nil];
return NO;
}

return YES;
}];

[trySignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];

[trySignal subscribeError:^(NSError * _Nullable error) {
NSLog(@"error = %@", error);
}];
}
value = 0
value = 1
value = 2
source signal dispose
error = Error Domain= Code=-1 "(null)"
source signal dispose

底层实现:

- (RACSignal *)try:(BOOL (^)(id value, NSError **errorPtr))tryBlock {
NSCParameterAssert(tryBlock != NULL);

return [[self flattenMap:^(id value) {
NSError *error = nil;
BOOL passed = tryBlock(value, &error);
return (passed ? [RACSignal return:value] : [RACSignal error:error]);
}] setNameWithFormat:@"[%@] -try:", self.name];
}
  1. tryBlock 根据原信号发送的信号值返回 passed 和 error(error 是以指针地址的形式传入,tryBlock 内部对其进行赋值)
  2. 如果 passed 为 YES,则将原信号发送的 value 包装成 RACSignal;若 passed 为 NO,则返回 RACErrorSignal ,通过这2包装,原信号会被包装成一个高阶信号

firstOrDefault: success: error:

测试代码:

- (void)testFirstOrDefaultSuccessError {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_global_queue(0, 0), ^{
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendCompleted];
});
return nil;
}];

BOOL success;
NSError *error;
id value = [sourceSignal firstOrDefault:@10 success:&success error:&error];

NSLog(@"value = %@|thread=%@", value, [NSThread currentThread]);
}
value = 0|thread=<NSThread: 0x608000075600>{number = 1, name = main}

底层实现:

- (id)firstOrDefault:(id)defaultValue success:(BOOL *)success error:(NSError **)error {
NSCondition *condition = [[NSCondition alloc] init];
condition.name = [NSString stringWithFormat:@"[%@] -firstOrDefault: %@ success:error:", self.name, defaultValue];

__block id value = defaultValue;
__block BOOL done = NO;

// Ensures that we don't pass values across thread boundaries by reference.
__block NSError *localError;
__block BOOL localSuccess;

[[self take:1] subscribeNext:^(id x) {
[condition lock];

value = x;
localSuccess = YES;

done = YES;
[condition broadcast];
[condition unlock];
} error:^(NSError *e) {
[condition lock];

if (!done) {
localSuccess = NO;
localError = e;

done = YES;
[condition broadcast];
}

[condition unlock];
} completed:^{
[condition lock];

localSuccess = YES;

done = YES;
[condition broadcast];
[condition unlock];
}];

[condition lock];
while (!done) {
[condition wait];
}

if (success != NULL) *success = localSuccess;
if (error != NULL) *error = localError;

[condition unlock];
return value;
}
  1. 定义好状态相关的变量
    • value: 记录原信号发送的值
    • done:判断原信号是否发送了 sendCompleted / sendError / sendNext
    • localError:记录原信号发送的 NSError
    • localSuccess:记录原信号是否发送过 sendCompleted / sendNext
    • condition :同步用的锁
  2. 订阅原信号并通过 take: 方法只获取第一个信号值
    • sendNext:condition 先加锁,记录 value,如果为nil,则保持为 defaultValue,condition 调用 broadcast 通知所有等待的线程
    • sendCompleted:和 sendNext 类似
    • sendError:如果之前没有发送过 sendNext,则会保存 error,然后 condition 调用 broadcast 通知所有等待的线程
  3. 订阅完原信号之后,会马上对 condition 进行加锁,一直等待原信号发送信号值并会阻塞当前线程

-firstOrDefault: success: error: 用法有点类似 python 中 await,有点像协程的概念

要注意的是如果执行该方法的线程和原信号发送 sendCompleted / sendError / sendNext 是统一线程,很容易会造成死锁,因为函数内部会先对 condition 加锁一次(步骤3),然后收到原信号发送的信号值的时候幽会对 condition 加锁一次,这时候如果在统一线程会造成死锁

waitUntilCompleted:

测试代码:

- (void)testWaitUntilCompleted {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_global_queue(0, 0), ^{
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendCompleted];
});
return nil;
}];

NSError *error;
BOOL success = [sourceSignal waitUntilCompleted:&error];
NSLog(@"success = %@", @(success));
}
success = 1

底层实现:

- (BOOL)waitUntilCompleted:(NSError **)error {
BOOL success = NO;

[[[self
ignoreValues]
setNameWithFormat:@"[%@] -waitUntilCompleted:", self.name]
firstOrDefault:nil success:&success error:error];

return success;
}
  1. 先对原信号执行 ignoreValues 方法,过滤掉所有 sendNext 事件
  2. 执行 firstOrDefault: success: error: ,若原信号发送 sendCompleted,则返回 YES,若发送 sendError 则返回 NO

defer:

测试代码:

- (void)testDefer {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendCompleted];
return nil;
}];

[[RACSignal defer:^RACSignal * _Nonnull{
NSLog(@"execute defer block");
return sourceSignal;
}] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
execute defer block
value = 0

底层实现:

+ (RACSignal *)defer:(RACSignal<id> * (^)(void))block {
NSCParameterAssert(block != NULL);

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [block() subscribe:subscriber];
}] setNameWithFormat:@"+defer:"];
}

+defer: 首先创建返回新的信号,然后执行 block,并订阅 block 返回的信号;+defer: 主要作用是延迟订阅,也就是在订阅前进行相关自定义的操作

initially:

测试代码:

- (void)testInitially {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendCompleted];
return nil;
}];

RACSignal *initialSignal = [sourceSignal initially:^{
NSLog(@"execute initial block");
}];

[initialSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
execute initial block
value = 0

底层实现:

- (RACSignal *)initially:(void (^)(void))block {
NSCParameterAssert(block != NULL);

return [[RACSignal defer:^{
block();
return self;
}] setNameWithFormat:@"[%@] -initially:", self.name];
}

-initially: 是通过封装 defer 实现的,在 defer 的 block 中先执行block,在返回自己,也就是订阅原信号之前先进行 block 中的相关操作

deliverOn:

测试代码:

- (void)testDeliverOn {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
[subscriber sendNext:@0];
[subscriber sendCompleted];
});
return nil;
}];

[[sourceSignal deliverOn:[RACScheduler mainThreadScheduler]] subscribeNext:^(id _Nullable x) {
NSLog(@"value = x|thread = %@", [NSThread currentThread]);
}];

[sourceSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = x|thread = %@", [NSThread currentThread]);
}];
}
value = 0|thread = <NSThread: 0x600000a77240>{number = 3, name = (null)}
value = 0|thread = <NSThread: 0x60c0000718c0>{number = 1, name = main}

底层实现:

- (RACSignal *)deliverOn:(RACScheduler *)scheduler {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [self subscribeNext:^(id x) {
[scheduler schedule:^{
[subscriber sendNext:x];
}];
} error:^(NSError *error) {
[scheduler schedule:^{
[subscriber sendError:error];
}];
} completed:^{
[scheduler schedule:^{
[subscriber sendCompleted];
}];
}];
}] setNameWithFormat:@"[%@] -deliverOn: %@", self.name, scheduler];
}

-deliverOn: 当原信号发送 sendNext/sendError/sendCompleted 的时候,会根据传入的调度器 scheduler ,在对应的线程中把信号发给订阅者;换句话来说,就是通过传入 scheduler 指定对应线程接受信号

subscribeOn:

- (RACSignal *)subscribeOn:(RACScheduler *)scheduler {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

RACDisposable *schedulingDisposable = [scheduler schedule:^{
RACDisposable *subscriptionDisposable = [self subscribe:subscriber];

[disposable addDisposable:subscriptionDisposable];
}];

[disposable addDisposable:schedulingDisposable];
return disposable;
}] setNameWithFormat:@"[%@] -subscribeOn: %@", self.name, scheduler];
}

-subscribeOn:-deliverOn: 类似,主要区别在于 -subscribeOn: 是指定订阅动作所在的线程,也就是执行 didSubscribe 闭包的线程被指定,但是 sendNext/sendError/sendCompleted 所在线程是不确定的

deliverOnMainThread

- (RACSignal *)deliverOnMainThread {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
__block volatile int32_t queueLength = 0;

void (^performOnMainThread)(dispatch_block_t) = ^(dispatch_block_t block) {
int32_t queued = OSAtomicIncrement32(&queueLength);
if (NSThread.isMainThread && queued == 1) {
block();
OSAtomicDecrement32(&queueLength);
} else {
dispatch_async(dispatch_get_main_queue(), ^{
block();
OSAtomicDecrement32(&queueLength);
});
}
};

return [self subscribeNext:^(id x) {
performOnMainThread(^{
[subscriber sendNext:x];
});
} error:^(NSError *error) {
performOnMainThread(^{
[subscriber sendError:error];
});
} completed:^{
performOnMainThread(^{
[subscriber sendCompleted];
});
}];
}] setNameWithFormat:@"[%@] -deliverOnMainThread", self.name];
}

-deliverOnMainThread-deliverOn 作用类似,前者是将 sendNext/sendError/sendCompleted 都放在主线程上执行

OSAtomicIncrement32 方法是对某个值进行自增计算并且是线程安全,如果当前线程是主线程并且 OSAtomicIncrement32 函数返回值是 1,说明当前主线程没有待执行的 sendNext/sendError/sendCompleted 事件,这时候会直接运行block,反之则把任务放在主线程异步执行


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK