Hystrix:1.5.18,rxjava:1.2.0
没错Hystrix中的观察者模式实现使用的正是优秀的rxjava开源类库,我们跟着官网的HystrixCommandAsyncDemo案例代码来一步步走进这个优秀的开源库

开始演示

  1. 初始化Hystrix请求上下文HystrixRequestContext.initializeContext
  2. 观察模拟订单确认和信用卡支付的用户请求observeSimulatedUserRequestForOrderConfirmationAndCreditCardPayment,该方法返回一个可观察对象,也就是被观察者对象Observable
  3. 订阅可观察对象Observable.subscribe
  4. 实现订阅对象也就是观察者对象Subscriber

观察模拟订单确认和信用卡支付的用户请求

  1. 创建获取用户账户信息的命令GetUserAccountCommand
  2. 执行观察方法GetUserAccountCommand.observe
  3. 创建ReplaySubject(默认容量16),订阅封装了当前命令的可观察对象并返回
public Observable<R> observe() {
    // us a ReplaySubject to buffer the eagerly subscribed-to Observable
    ReplaySubject<R> subject = ReplaySubject.create();
    // eagerly kick off subscription
    final Subscription sourceSubscription = toObservable().subscribe(subject);
    // return the subject that can be subscribed to later while the execution has already started
    return subject.doOnUnsubscribe(new Action0() {
        @Override
        public void call() {
            sourceSubscription.unsubscribe();
        }
    });
}

创建可观察对象toObservable

  1. 封装当前命令并创建一个可观察对象toObservable
  2. 创建terminateCommandCleanup终止命令清理动作,更改当前命令状态为终止并做对应的处理
//doOnCompleted handler already did all of the SUCCESS work
//doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
final Action0 terminateCommandCleanup = new Action0() {
     @Override
     public void call() {
         if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
             handleCommandEnd(false); //用户代码还没有执行情况下的处理动作
         } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
             handleCommandEnd(true); //用户代码已经执行情况下的处理动作
         }
     }
 };
  1. 创建unsubscribeCommandCleanup取消订阅清理动作,同样类似于终止命令清理动作,分为用户代码执行与未执行两种情况的处理。标记当前命令状态为取消订阅并存储命令的延迟时间
  2. 创建applyHystrixSemantics动作,标记我们正在执行ExecutionHook钩子方法,如果钩子抛出一个异常,那么会快速失败并且没有fallback兜底动作。没有状态是不一致的。如果隔离级别使用了信号量,那么这里会有一个执行信号量executionSemaphore控制是否执行,信号量默认与类名称一一对应如果命令没有指定HystrixCommandKey的情况下
  3. 包装所有的子钩子动作wrapWithAllOnNextHooks,在调用当前执行钩子完成后调用通知所有的子钩子方法onComplete(已废弃,也就是最终会被Emit彻底替代)以及onEmit
  4. 将上面所有的Action动作以及Func动作包装为OnSubscribeDefer类型的Action动作后再封装为Observable可观察对象返回

订阅可观察对象subscribe

public final Subscription subscribe(final Observer<? super T> observer) {
    if (observer instanceof Subscriber) {
        return subscribe((Subscriber<? super T>)observer);
    }
    if (observer == null) {
        throw new NullPointerException("observer is null");
    }
    return subscribe(new ObserverSubscriber<T>(observer));
}
  1. 如果观察者observer(ReplaySubject)是Subscriber类型则直接订阅
  2. 如果观察者Observer不是Subscriber类型则封装为ObserverSubscriber类型后进行对可观察者订阅
  3. 启动订阅者subscriber(ObserverSubscriber).start,默认仅做了一层包装
  4. 如果订阅者不是SafeSubscriber类型则封装为SafeSubscriber类型
  5. 通过_onObservableStart_钩子执行调用,默认执行钩子为RxJavaObservableExecutionHookDefault.call,什么都不做直接返回入参的onSubscribe对象。
  6. 调用onSubscribe对象(toObservable方法中的Func0的匿名实现为入参创建的Observable对象)的call方法入参为订阅者subscriber(SafeSubscriber包装的ReplaySubscriber对象)
  7. 调用OnSubscribeDefer对象(toObservable方法中的Func0的匿名实现的Observable对象)call方法,调用observableFactory工厂方法获取可观察对象,observableFactory工厂实现即AbstractCommand中toObservable方法中的Func0的匿名实现
public static <T> Observable<T> defer(Func0<Observable<T>> observableFactory) {
    //入参设置为Observable对象的onSubscribe属性
    return create(new OnSubscribeDefer<T>(observableFactory));
}
// observableFactory工厂实现
return Observable.defer(new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
        ...
        Observable<R> hystrixObservable =
                Observable.defer(applyHystrixSemantics)
                        .map(wrapWithAllOnNextHooks);
        Observable<R> afterCache;
        // put in cache
        if (requestCacheEnabled && cacheKey != null) {
            // wrap it for caching
            ...
        } else {
            afterCache = hystrixObservable;
        }
        return afterCache
                .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                .doOnCompleted(fireOnCompletedHook);
    }
});
  1. 创建hystrixObservable对象,可观察者包装OnSubscribeDefer对象并且可观察者工厂为applyHystrixSemantics动作,外层再包装一层OnSubscribeMap对象,source源头对应上层的applyHystrixSemantics,transformer为wrapWithAllOnNextHooks
  2. 如果启用了请求缓存功能则优先从缓存中获取hystrixObservable对象,如果不存在则将上面创建的对象缓存并返回
  3. 如果没有启用请求缓存功能,则使用hystrixObservable对象
  4. Observable对象再次包装OnSubscribeDoOnEach对象,source为hystrixObservable,transformer为ActionObserver包装的terminateCommandCleanup,ActionObserver的onError与onCompleted均为terminateCommandCleanup动作
  5. Observable对象再次包装OnSubscribeLift对象,parent为onSubscribe对象(即上层包装OnSubscribeDoOnEach得到的Observable对象),operator为unsubscribeCommandCleanup动作
  6. Observable对象再次包装OnSubscribeDoOnEach对象,source为上层包装的Observable对象,transformer为ActionObserver包装的fireOnCompletedHook,ActionObserver的onError为空动作(链路上层已经有了onError处理,此处不需要重复声明),onCompleted为fireOnCompletedHook动作
//OnSubscribeDefer
@Override
public void call(final Subscriber<? super T> s) {
    Observable<? extends T> o;
    try {
        o = observableFactory.call();
    } catch (Throwable t) {
        Exceptions.throwOrReport(t, s);
        return;
    }
    o.unsafeSubscribe(Subscribers.wrap(s));
}
  1. 工厂返回可观察对象Observable,调用对象unsafeSubscribe方法,入参为Subscribers包装的Subject对象,包装的Subscriber对象什么都没有做直接调用原对象对应的方法

非安全订阅可观察对象unsafeSubscribe

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
    try {
        // new Subscriber so onStart it
        subscriber.onStart();
        // allow the hook to intercept and/or decorate
        RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
        ...
        return Subscriptions.unsubscribed();
    }
}
  1. toObservable的observableFactory返回的可观察者对象绑定的observableFactory工厂为applyHystrixSemantics
  2. 启动订阅者(SafeSubscriber)onStart,默认什么都不做
  3. 通过_onObservableStart_钩子执行调用,默认执行钩子为RxJavaObservableExecutionHookDefault.call,什么都不做直接返回入参的onSubscribe对象(toObservable创建返回的可观察者对象)。
  4. 调用OnSubscribeDefer对象(toObservable方法中的Func0的匿名实现的Observable对象)call方法,调用observableFactory工厂方法获取可观察对象,observableFactory工厂即applyHystrixSemantics
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // 标记我们正执行在ExecutionHook钩子上
    // 如果这个钩子抛出一个异常,那么将发生一个快速失败并且没有fallback兜底。没有状态是不一致的
    executionHook.onStart(_cmd);
    /* 决定是否允许被执行 */
    if (circuitBreaker.attemptExecution()) {
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };
        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };
        if (executionSemaphore.tryAcquire()) {
            try {
                /* 用于跟踪userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        //熔断后的兜底降级逻辑
        return handleShortCircuitViaFallback();
    }
}
  1. 执行executionHook钩子方法onStart方法,默认HystrixCommandExecutionHookDefault实现什么都不处理
  2. 熔断器判断是否允许被执行,如果没启用则是NoOpCircuitBreaker实现的熔断器类型,允许所有的请求通过。如果启用默认HystrixCircuitBreaker类型的实现HystrixCircuitBreakerImpl
public boolean attemptExecution() {
    //如果强制打开则熔断所有的请求,则请求全部走降级逻辑handleShortCircuitViaFallback
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }
    //如果强制关闭则允许所有的请求通过
    if (properties.circuitBreakerForceClosed().get()) {
        return true;
    }
    //没有触发熔断则通过
    if (circuitOpened.get() == -1) {
        return true;
    } else {
        //是否当前时间大于(熔断器打开时间+熔断器后多少时间允许重试配置的阈值)
        if (isAfterSleepWindow()) {
            //仅第一个请求在熔断器休眠窗口之后允许执行
            //if the executing command succeeds, the status will transition to CLOSED
            //if the executing command fails, the status will transition to OPEN
            //if the executing command gets unsubscribed, the status will transition to OPEN
            if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                return true;
            } else {
                return false;
            }
        } else {
            return false;
        }
    }
}
  1. 获取信号量getExecutionSemaphore,如果隔离模式采用信号量模式则返回默认实现TryableSemaphoreActual(允许通过请求最大值配置为executionIsolationSemaphoreMaxConcurrentRequests,降级对应的信号量配置为fallbackIsolationSemaphoreMaxConcurrentRequests);如果是线程池+队列模式则返回一个NoOp的信号量TryableSemaphoreNoOp。线程池+队列的模式相信大家比较熟悉,我们后面将在信号量的模式下继续梳理
  2. 创建单次开关的信号量释放动作singleSemaphoreRelease,semaphoreHasBeenReleased默认为false
  3. 创建抛出异常的标记动作markExceptionThrown
  4. 信号量申请TryableSemaphoreActual.tryAcquire。可以看到该方法是没有任何锁的,因为方法中所有的操作均是原子的,而且线程共享的变量也是被声明为final的Integer类型,保证了方法是线程安全的。是一个很典型的无锁并发。
static class TryableSemaphoreActual implements TryableSemaphore {
    protected final HystrixProperty<Integer> numberOfPermits;
    private final AtomicInteger count = new AtomicInteger(0);
...
    public boolean tryAcquire() {
        int currentCount = count.incrementAndGet();
        // numberOfPermits允许通过的最大请求数,声明为final方式
        if (currentCount > numberOfPermits.get()) {
            count.decrementAndGet();
            return false;
        } else {
            return true;
        }
    }
...
  1. 如果信号量申请失败则走信号量申请拒绝的兜底逻辑handleSemaphoreRejectionViaFallback
  2. 如果信号量申请成功,则继续执行命令,设置当前时间(用于统计命令各个环节的时间花费)
  3. 执行命令并返回可观察对象executeCommandAndObserve。
  4. 创建markEmits、markOnCompleted、handleFallback、setRequestContext(不是属性也是一个Action1动作)动作
  5. 使用特殊隔离模式执行命令executeCommandWithSpecifiedIsolation,创建一个可观察对象并返回
//省略线程池+队列隔离模式的代码
return Observable.defer(new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
        executionResult = executionResult.setExecutionOccurred();
        if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
            return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
        }

        metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
        // semaphore isolated
        // store the command that is being run
        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
        try {
            executionHook.onRunStart(_cmd);
            executionHook.onExecutionStart(_cmd);
            return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
        } catch (Throwable ex) {
            //If the above hooks throw, then use that as the result of the run method
            return Observable.error(ex);
        }
    }
});
  1. 存储即将执行的命令key,Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
  2. 执行前回调执行钩子executionHook方法回传cmd命令
  3. 返回执行可观察对象getUserExecutionObservable调用子类实现的getExecutionObservable方法
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}
  1. 返回用户的execution可观察对象,将userObservable包装一层OnSubscribeLift(ExecutionHookApplication)动作的可观察者,再次包装一层DeprecatedOnRunHookApplication已废弃。

  2. execution可观察对象执行doOnNext创建OnSubscribeDoOnEach对象onNext动作为markEmits,执行doOnCompleted创建OnSubscribeDoOnEach对象onComplete动作为markOnCompleted,执行onErrorResumeNext创建OperatorOnErrorResumeNextViaFunction对象resumeFunction动作为handleFallback,执行doOnEach创建OnSubscribeDoOnEach对象onNotification动作为setRequestContext。即将动作封装为可观察对象,并创建对应的Observer观察者等待可观察者对象的通知

  3. 返回的execution可观察对象同上一步类似逻辑,绑定doOnError动作markExceptionThrown,doOnTerminate动作singleSemaphoreRelease,doOnUnsubscribe动作singleSemaphoreRelease

  4. applyHystrixSemantics可观察工厂最终返回getExecutionObservable获得execution可观察对象,并绑定了一系列动作的可观察对象与观察者;最终封装图如下:image.png

  5. 继续执行OnSubscribeDefer对象的call方法,使用SafeSubscriber对象订阅execution可观察者对象unsafeSubscribe

  6. 执行execution可观察者对象工厂方法的call方法,即执行Observable.just(run())

  7. 执行用户命令逻辑run(),并将返回值封装为可观察者对象ScalarSynchronousObservable(同时创建JustOnSubscribe订阅者回调对象)返回。注意此处的命令执行是没有使用线程池的。

  8. 同样,使用SafeSubscriber对象订阅ScalarSynchronousObservable可观察者对象unsafeSubscribe,调用OnSubscribe对象call方法

  9. 调用JustOnSubscribe对象call方法

public void call(Subscriber<? super T> s) {
    s.setProducer(createProducer(s, value));
}
  1. 为订阅者SafeSubscriber设置生产者,根据订阅者、用户命令执行返回值创建生产者,STRONG_MODE(对应系统属性_rx.just.strong-mode配置,默认为false)。如果为true则创建_SingleProducer生产者,否则创建WeakSingleProducer(弱版本的_SingleProducer实现,通过使用简单的实例属性避免调用重入,所以该生产者的request方法是并发不安全的_)
  2. SafeSubscriber订阅者设置生产者调用父类Subscriber实现
public void setProducer(Producer p) {
    long toRequest;
    boolean passToSubscriber = false;
    synchronized (this) {
        toRequest = requested;
        producer = p;
        if (subscriber != null) {
            // middle operator ... we pass through unless a request has been made
            if (toRequest == NOT_SET) {
                // we pass through to the next producer as nothing has been requested
                passToSubscriber = true;
            }
        }
    }
    // do after releasing lock
    if (passToSubscriber) {
        subscriber.setProducer(producer);
    } else {
        // we execute the request with whatever has been requested (or Long.MAX_VALUE)
        if (toRequest == NOT_SET) {
            producer.request(Long.MAX_VALUE);
        } else {
            producer.request(toRequest);
        }
    }
}
  1. 如果subscriber不为空并且toRequest为默认值则调用实际订阅者的设置方法,即ObserverSubscriber;同样,调用设置生产者方法,此时subscriber为空,并且toRequest未设置NOT_SET,使用生产者发送请求入参为Long最大值

生产者WeakSingleProducer

  1. 弱单例生产者发送请求
public void request(long n) {
    if (once) {
        return;
    }
    if (n < 0L) {
        throw new IllegalStateException("n >= required but it was " + n);
    }
    if (n == 0L) {
        return;
    }
    once = true;
    Subscriber<? super T> a = actual;
    if (a.isUnsubscribed()) {
        return;
    }
    T v = value;
    try {
        a.onNext(v);
    } catch (Throwable e) {
        Exceptions.throwOrReport(e, a, v);
        return;
    }

    if (a.isUnsubscribed()) {
        return;
    }
    a.onCompleted();
}
  1. 回调实际订阅者SafeSubscriber.onNext方法传入返回值,SafeSubscriber直接回调下一个节点
  2. 回调ObserverSubscriber.onNext方法
  3. 回调观察者observer.onNext方法,即ReplaySubject
  4. 回调ReplayState.onNext方法
public void onNext(T t) {
    ReplayBuffer<T> b = buffer;

    b.next(t);
    for (ReplayProducer<T> rp : get()) {
        b.drain(rp);
    }
}
  1. 回调ReplayBuffer.next方法,即ReplayUnboundedBuffer,缓存至tailIndex
  2. 此时replay生产者为空,链路执行结束返回,返回用户执行结果的可观察对象Observable

生产者ReplayProducer

ReplayProducer是一个生产者与订阅者的实现。用于跟踪一个特殊指定的订阅者当前replay消息的位置

  1. 默认情况下requestCache是启用的,也就是applyHystrixSemantics返回的可观察对象会再次包装为Cache类型(HystrixCommandResponseFromCache)并缓存下来
  2. HystrixCommandResponseFromCache缓存对象默认构造器会自动订阅ReplaySubject对象(该对象既可以做观察者,也可以做被观察者)
  3. 可以看到缓存的生命周期是通过是否存在订阅来控制的,当订阅者为0时则取消对原被观察者对象的订阅对象ReplaySubject对象的订阅
protected HystrixCachedObservable(final Observable<R> originalObservable) {
    ReplaySubject<R> replaySubject = ReplaySubject.create();
    this.originalSubscription = originalObservable
            .subscribe(replaySubject);
    this.cachedObservable = replaySubject
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    outstandingSubscriptions--;
                    if (outstandingSubscriptions == 0) {
                        originalSubscription.unsubscribe();
                    }
                }
            })
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    outstandingSubscriptions++;
                }
            });
}

总结

观察者模式简单的讲就是被观察者提供一个列表,观察者在观察被观察者时将自身注册至被观察者列表中。在被观察者执行某些动作时,通知所有的观察者。
Hystrix的技术架构使用了命令行模式。优秀的设计模式使得服务的扩展性以及复用性很高。通过组合多个命令来实现一个复杂的命令或者说复杂的业务功能实现。面向组合开发。高内聚低耦合。而Hystrix底层的命令行模式我们也看到使用了RxJava的观察者模式实现。而RxJava观察者模式的优点则是基于事件触发提供异步服务。拥有优秀的并发性以及扩展性。当然我们看到其实现的代码复杂度也是较高的。RxJava的设计中使用了装饰器设计模式。一层层的包装来支持多个观察者与多个被观察者的传递通知
RxJava的观察者模式设计比常规自己写的观察者实现更为强大。增加了观察者与被观察者的很多特性支持。例如:flatMap支持依赖关系的被观察者合并。zip组合多个被观察者至一个。例如官网给出的图片示例image.png

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐