Spring-statemachine有限状态机(FSM)使用教程详解
一、状态机有限状态机是一种用来进行对象行为建模的工具,其作用主要是描述对象在它的生命周期内所经历的状态序列,以及如何响应来自外界的各种事件。在电商场景(订单、物流、售后)、社交(IM消息投递)、分布式集群管理(分布式计算平台任务编排)等场景都有大规模的使用。状态机的要素:状态机可归纳为4个要素,现态、条件、动作、次态。“现态”和“条件”是因,“动作”和“次态”是果。1 现态:指当前所处的状态2 条
一、状态机
有限状态机是一种用来进行对象行为建模的工具,其作用主要是描述对象在它的生命周期内所经历的状态序列,以及如何响应来自外界的各种事件。在电商场景(订单、物流、售后)、社交(IM消息投递)、分布式集群管理(分布式计算平台任务编排)等场景都有大规模的使用。
状态机的要素:
状态机可归纳为4个要素,现态、条件、动作、次态。“现态”和“条件”是因,“动作”和“次态”是果。
1 现态:指当前所处的状态
2 条件:又称“事件”,当一个条件被满足,将会触发一个动作,或者执行一次状态的迁移
3 动作:条件满足后执行的动作。动作执行完毕后,可以迁移到新的状态,也可以仍旧保持原状态。动作不是必须的,当条件满足后,也可以不执行任何动作,直接迁移到新的状态。
4 次态:条件满足后要迁往的新状态。“次态”是相对于“现态”而言的,“次态”一旦被激活,就转换成“现态”。
有限状态机体现了两点:首先是离散的,然后是有限的。
- State:状态这个词有些难以定义,状态存储关于过去的信息,就是说它反映从系统开始到现在时刻的输入变化。
- Actions & Transitions:转换指示状态变更,并且用必须满足来确使转移发生的条件来描述它。动作是在给定时刻要进行的活动的描述。
- Guards:检测器出现的原因是为了检测是否满足从一个状态切换到另外一个状态的条件。
- Event:事件,又见事件,笼统说来,对系统重要的某件事情被称为事件。
状态机动作类型:
进入动作:在进入状态时进行
退出动作:在退出状态时进行
输入动作:依赖于当前状态和输入条件进行
转移动作:在进行特定转移时进行
如下图示例:有限的状态集是“opend”以及“closed”。如果“现态”是“opend”,当“条件”为“Close”时,执行的“动作”是“close door”,次态则为“closed”。状态机逻辑执行完毕后“closed”则变成了“现态”。
所以FSM的执行逻辑可以理解为下图,即FSM的下一个状态和输出是由输入和当前状态决定的:
二 常见框架
对于使用Java语言的应用来说,可以选择的集成框架也比较多。
如squirrel-foundation,spring-statemachine,stateless4j 。
squirrel-foundation,stateless4j相对spring-statemachine更加轻量级。
三 状态机示例
现实中的例子:验票闸门,来自wiki
旋转门
用于控制地铁和游乐园游乐设施的旋转门是一个门,在腰高处有三个旋转臂,一个横跨入口通道。最初,手臂被锁定,阻挡了入口,阻止了顾客通过。将硬币或代币存放在旋转门上的槽中可解锁手臂,允许单个客户穿过。在顾客通过之后,再次锁定臂直到插入另一枚硬币。
旋转门被视为状态机,有两种可能的状态:锁定和解锁。有两种可能影响其状态的输入:将硬币放入槽(硬币)并推动手臂(推动)。在锁定状态下,推动手臂无效; 无论输入推送次数多少,它都处于锁定状态。投入硬币 - 即给机器输入硬币 - 将状态从锁定转换为解锁。在解锁状态下,放入额外的硬币无效; 也就是说,给予额外的硬币输入不会改变状态。然而,顾客推动手臂,进行推动输入,将状态转回Locked。
旋转门状态机可由状态转换表表示,显示每个可能状态,它们之间的转换(基于给予机器的输入)和每个输入产生的输出:
Current State | Input | Next State | Output |
---|---|---|---|
Locked | coin | Unlocked | 解锁旋转门,以便游客能够通过 |
Locked | push | Locked | None |
Unlocked | coin | Unlocked | None |
Unlocked | push | Locked | 当游客通过,锁定旋转门 |
旋转栅状态机也可以由称为状态图的有向图表示 (上面)。每个状态由节点(圆圈)表示。边(箭头)显示从一个状态到另一个状态的转换。每个箭头都标有触发该转换的输入。不引起状态改变的输入(例如处于未锁定状态的硬币输入)由返回到原始状态的圆形箭头表示。从黑点进入Locked节点的箭头表示它是初始状态。
1 Spring Statemachine
1.1 定位及特色
Spring Statemachine is a framework for application developers to use state machine concepts with Spring applications. Spring Statemachine 是应用程序开发人员在Spring应用程序中使用状态机概念的框架。
Spring Statemachine 提供如下特色:
- Easy to use flat one level state machine for simple use cases.(易于使用的扁平单级状态机,用于简单的使用案例。)
- Hierarchical state machine structure to ease complex state configuration.(分层状态机结构,以简化复杂的状态配置。)
- State machine regions to provide even more complex state configurations.(状态机区域提供更复杂的状态配置。)
- Usage of triggers, transitions, guards and actions.(使用触发器、transitions、guards和actions。)
- Type safe configuration adapter.(应用安全的配置适配器。)
- Builder pattern for easy instantiation for use outside of Spring Application context(用于在Spring Application上下文之外使用的简单实例化的生成器模式)
- Recipes for usual use cases(通常用例的手册)
- Distributed state machine based on a Zookeeper State machine event listeners.(基于Zookeeper的分布式状态机状态机事件监听器。)
- UML Eclipse Papyrus modeling.(UML Eclipse Papyrus 建模)
- Store machine config in a persistent storage.(存储状态机配置到持久层)
- Spring IOC integration to associate beans with a state machine.(Spring IOC集成将bean与状态机关联起来)
1.2 发展及社区
相关资源:
- Spring Statemachine 主页:Home
- Spring Statemachine Github:Github,Star:500+ & Fork:200+ (201809)
- Spring Statemachine Gitter:Gitter, less than 100 people
发布版本:
最新版本:v2.0.2.RELEASE 及 v1.2.12.RELEASE 已经44次版本发布,更详尽和最新情况请查看Github主页。
2 功能示例
2.1 基本功能
继续旋转门的现实例子,添加Maven依赖,本例子中使用版本如下:
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-core</artifactId>
<version>2.0.2.RELEASE</version>
</dependency>
定义旋转门所处状态:锁定、解锁,使用枚举
public enum TurnstileStates {
Unlocked, Locked
}
定义旋转门操作事件:推门和投币,使用枚举
public enum TurnstileEvents {
COIN, PUSH
}
状态机配置,其中turnstileUnlock()和customerPassAndLock()即为当前状态变更后的扩展业务操作,可以根据实际业务场景进行修改
@Configuration
@EnableStateMachine
public class StatemachineConfigurer extends EnumStateMachineConfigurerAdapter<TurnstileStates, TurnstileEvents> {
@Override
public void configure(StateMachineStateConfigurer<TurnstileStates, TurnstileEvents> states)
throws Exception {
states
.withStates()
// 初识状态:Locked
.initial(TurnstileStates.Locked)
.states(EnumSet.allOf(TurnstileStates.class));
}
@Override
public void configure(StateMachineTransitionConfigurer<TurnstileStates, TurnstileEvents> transitions)
throws Exception {
transitions
.withExternal()
.source(TurnstileStates.Unlocked).target(TurnstileStates.Locked)
.event(TurnstileEvents.COIN).action(customerPassAndLock())
.and()
.withExternal()
.source(TurnstileStates.Locked).target(TurnstileStates.Unlocked)
.event(TurnstileEvents.PUSH).action(turnstileUnlock())
;
}
@Override
public void configure(StateMachineConfigurationConfigurer<TurnstileStates, TurnstileEvents> config)
throws Exception {
config.withConfiguration()
.machineId("turnstileStateMachine")
;
}
public Action<TurnstileStates, TurnstileEvents> turnstileUnlock() {
return context -> System.out.println("解锁旋转门,以便游客能够通过" );
}
public Action<TurnstileStates, TurnstileEvents> customerPassAndLock() {
return context -> System.out.println("当游客通过,锁定旋转门" );
}
}
启动类及测试用例
@SpringBootApplication
public class StatemachineApplication implements CommandLineRunner {
@Autowired
private StateMachine<TurnstileStates, TurnstileEvents> stateMachine;
public static void main(String[] args) {
SpringApplication.run(StatemachineApplication.class, args);
}
@Override
public void run(String... strings) throws Exception {
stateMachine.start();
System.out.println("--- coin ---");
stateMachine.sendEvent(TurnstileEvents.COIN);
System.out.println("--- coin ---");
stateMachine.sendEvent(TurnstileEvents.COIN);
System.out.println("--- push ---");
stateMachine.sendEvent(TurnstileEvents.PUSH);
System.out.println("--- push ---");
stateMachine.sendEvent(TurnstileEvents.PUSH);
stateMachine.stop();
}
}
结果输出,与上午所描述的状态机所描述的内容一致。
--- push ---
解锁旋转门,以便游客能够通过
--- push ---
--- coin ---
当游客通过,锁定旋转门
--- coin ---
2.2 实用功能
2.2.1 状态存储
状态机持久化,实际环境中,当前状态往往都是从持久化介质中实时获取的,Spring Statemachine通过实现StateMachinePersist接口,write和read当前状态机的状态
本例中,使用的是HashMap作为模拟存储介质,正式项目中需要使用真实的状态获取途径
@Component
public class BizStateMachinePersist implements StateMachinePersist<TurnstileStates, TurnstileEvents, Integer> {
static Map<Integer, TurnstileStates> cache = new HashMap<>(16);
@Override
public void write(StateMachineContext<TurnstileStates, TurnstileEvents> stateMachineContext, Integer integer) throws Exception {
cache.put(integer, stateMachineContext.getState());
}
@Override
public StateMachineContext<TurnstileStates, TurnstileEvents> read(Integer integer) throws Exception {
// 注意状态机的初识状态与配置中定义的一致
return cache.containsKey(integer) ?
new DefaultStateMachineContext<>(cache.get(integer), null, null, null, null, "turnstileStateMachine") :
new DefaultStateMachineContext<>(TurnstileStates.Locked, null, null, null, null, "turnstileStateMachine");
}
}
在StatemachineConfigurer中发布
@Autowired
private BizStateMachinePersist bizStateMachinePersist;
@Bean
public StateMachinePersister<TurnstileStates, TurnstileEvents, Integer> stateMachinePersist() {
return new DefaultStateMachinePersister<>(bizStateMachinePersist);
}
在StatemachineApplication中使用,自动注入StateMachinePersister对象,测试用例如下
stateMachine.start();
stateMachinePersist.restore(stateMachine, 1);
System.out.println("--- push ---");
stateMachine.sendEvent(TurnstileEvents.PUSH);
stateMachinePersist.persist(stateMachine, 1);
stateMachinePersist.restore(stateMachine, 1);
System.out.println("--- push ---");
stateMachine.sendEvent(TurnstileEvents.PUSH);
stateMachinePersist.persist(stateMachine, 1);
stateMachinePersist.restore(stateMachine, 1);
System.out.println("--- coin ---");
stateMachine.sendEvent(TurnstileEvents.COIN);
stateMachinePersist.persist(stateMachine, 1);
stateMachinePersist.restore(stateMachine, 1);
System.out.println("--- coin ---");
stateMachine.sendEvent(TurnstileEvents.COIN);
stateMachinePersist.persist(stateMachine, 1);
stateMachine.stop();
2.2.2 动作监听
定义动作监听类,StatemachineMonitor(名称随意),添加注解@WithStateMachine
。本例中使用id进行状态机绑定,根据文档定义,可以使用name和id两种属性绑定需要监听的状态机实例。如果不定义任何name或者id,默认监听名称为stateMachine
的状态机。
@WithStateMachine(id = "turnstileStateMachine")
public class StatemachineMonitor {
@OnTransition
public void anyTransition() {
System.out.println("--- OnTransition --- init");
}
@OnTransition(target = "Unlocked")
public void toState1() {
System.out.println("--- OnTransition --- toState1");
}
@OnStateChanged(source = "Unlocked")
public void fromState1() {
System.out.println("--- OnTransition --- fromState1");
}
}
其他Context的事件监听,后续文章进行描述,官网链接
2.2.2 状态机工程
实际业务环境中,往往是多线程处理不同的业务ID对应的状态,状态机中利用事件的context传递数据,会出现多线程问题,需要利用状态机工程,利用UUID创建不同状态机。
在StatemachineConfigurer类中,修改@EnableStateMachine
为@EnableStateMachineFactory
,同时添加状态机处理动作封装方法,读者可以根据业务场景定制,本例为一种可行方案
@Service
public class StatemachineService {
@Autowired
private StateMachinePersister<TurnstileStates, TurnstileEvents, Integer> stateMachinePersist;
@Autowired
private StateMachineFactory<TurnstileStates, TurnstileEvents> stateMachineFactory;
public void execute(Integer businessId, TurnstileEvents event, Map<String, Object> context) {
// 利用随记ID创建状态机,创建时没有与具体定义状态机绑定
StateMachine<TurnstileStates, TurnstileEvents> stateMachine = stateMachineFactory.getStateMachine(UUID.randomUUID());
stateMachine.start();
try {
// 在BizStateMachinePersist的restore过程中,绑定turnstileStateMachine状态机相关事件监听
stateMachinePersist.restore(stateMachine, businessId);
// 本处写法较为繁琐,实际为注入Map<String, Object> context内容到message中
MessageBuilder<TurnstileEvents> messageBuilder = MessageBuilder
.withPayload(event)
.setHeader("BusinessId", businessId);
if (context != null) {
context.entrySet().forEach(p -> messageBuilder.setHeader(p.getKey(), p.getValue()));
}
Message<TurnstileEvents> message = messageBuilder.build();
// 发送事件,返回是否执行成功
boolean success = stateMachine.sendEvent(message);
if (success) {
stateMachinePersist.persist(stateMachine, businessId);
} else {
System.out.println("状态机处理未执行成功,请处理,ID:" + businessId + ",当前context:" + context);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
stateMachine.stop();
}
}
}
示例二 spring statemachine
spring statemachine是使用 Spring框架下的状态机概念创建的一种应用程序开发框架。它使得状态机结构层次化,简化了配置状态机的过程。
官方文档:https://docs.spring.io/autorepo/docs/spring-statemachine/1.0.0.M3/reference/htmlsingle/#sm-statecontext
例子一:简单订单流程
使用过程:
1 引入依赖
<!--spring statemachine-->
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-core</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
2 创建订单状态枚举类和状态转换枚举类
/**
* 订单状态
*/
public enum OrderStatus {
// 待支付,待发货,待收货,订单结束
WAIT_PAYMENT, WAIT_DELIVER, WAIT_RECEIVE, FINISH;
}
/**
* 订单状态改变事件
*/
public enum OrderStatusChangeEvent {
// 支付,发货,确认收货
PAYED, DELIVERY, RECEIVED;
}
3 添加配置
/**
* 订单状态机配置
*/
@Configuration
@EnableStateMachine(name = "orderStateMachine")
public class OrderStateMachineConfig extends StateMachineConfigurerAdapter<OrderStatus, OrderStatusChangeEvent> {
/**
* 配置状态
* @param states
* @throws Exception
*/
@Override
public void configure(StateMachineStateConfigurer<OrderStatus, OrderStatusChangeEvent> states) throws Exception {
states
.withStates()
.initial(OrderStatus.WAIT_PAYMENT)
.states(EnumSet.allOf(OrderStatus.class));
}
/**
* 配置状态转换事件关系
* @param transitions
* @throws Exception
*/
@Override
public void configure(StateMachineTransitionConfigurer<OrderStatus, OrderStatusChangeEvent> transitions) throws Exception {
transitions
.withExternal().source(OrderStatus.WAIT_PAYMENT).target(OrderStatus.WAIT_DELIVER).event(OrderStatusChangeEvent.PAYED)
.and()
.withExternal().source(OrderStatus.WAIT_DELIVER).target(OrderStatus.WAIT_RECEIVE).event(OrderStatusChangeEvent.DELIVERY)
.and()
.withExternal().source(OrderStatus.WAIT_RECEIVE).target(OrderStatus.FINISH).event(OrderStatusChangeEvent.RECEIVED);
}
/**
* 持久化配置
* 实际使用中,可以配合redis等,进行持久化操作
* @return
*/
@Bean
public StateMachinePersister<OrderStatus, OrderStatusChangeEvent, Order> persister(){
return new DefaultStateMachinePersister<>(new StateMachinePersist<OrderStatus, OrderStatusChangeEvent, Order>() {
@Override
public void write(StateMachineContext<OrderStatus, OrderStatusChangeEvent> context, Order order) throws Exception {
//此处并没有进行持久化操作
}
@Override
public StateMachineContext<OrderStatus, OrderStatusChangeEvent> read(Order order) throws Exception {
//此处直接获取order中的状态,其实并没有进行持久化读取操作
return new DefaultStateMachineContext<>(order.getStatus(), null, null, null);
}
});
}
}
4 添加订单状态监听器
@Component("orderStateListener")
@WithStateMachine(name = "orderStateMachine")
public class OrderStateListenerImpl{
@OnTransition(source = "WAIT_PAYMENT", target = "WAIT_DELIVER")
public boolean payTransition(Message<OrderStatusChangeEvent> message) {
Order order = (Order) message.getHeaders().get("order");
order.setStatus(OrderStatus.WAIT_DELIVER);
System.out.println("支付 headers=" + message.getHeaders().toString());
return true;
}
@OnTransition(source = "WAIT_DELIVER", target = "WAIT_RECEIVE")
public boolean deliverTransition(Message<OrderStatusChangeEvent> message) {
Order order = (Order) message.getHeaders().get("order");
order.setStatus(OrderStatus.WAIT_RECEIVE);
System.out.println("发货 headers=" + message.getHeaders().toString());
return true;
}
@OnTransition(source = "WAIT_RECEIVE", target = "FINISH")
public boolean receiveTransition(Message<OrderStatusChangeEvent> message){
Order order = (Order) message.getHeaders().get("order");
order.setStatus(OrderStatus.FINISH);
System.out.println("收货 headers=" + message.getHeaders().toString());
return true;
}
}
5 service中使用
@Service("orderService")
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine;
@Autowired
private StateMachinePersister<OrderStatus, OrderStatusChangeEvent, Order> persister;
private int id = 1;
private Map<Integer, Order> orders = new HashMap<>();
@Override
public Order creat() {
Order order = new Order();
order.setStatus(OrderStatus.WAIT_PAYMENT);
order.setId(id++);
orders.put(order.getId(), order);
return order;
}
@Override
public Order pay(int id) {
Order order = orders.get(id);
System.out.println("threadName=" + Thread.currentThread().getName() + " 尝试支付 id=" + id);
Message message = MessageBuilder.withPayload(OrderStatusChangeEvent.PAYED).setHeader("order", order).build();
if (!sendEvent(message, order)) {
System.out.println("threadName=" + Thread.currentThread().getName() + " 支付失败, 状态异常 id=" + id);
}
return orders.get(id);
}
@Override
public Order deliver(int id) {
Order order = orders.get(id);
System.out.println("threadName=" + Thread.currentThread().getName() + " 尝试发货 id=" + id);
if (!sendEvent(MessageBuilder.withPayload(OrderStatusChangeEvent.DELIVERY).setHeader("order", order).build(), orders.get(id))) {
System.out.println("threadName=" + Thread.currentThread().getName() + " 发货失败,状态异常 id=" + id);
}
return orders.get(id);
}
@Override
public Order receive(int id) {
Order order = orders.get(id);
System.out.println("threadName=" + Thread.currentThread().getName() + " 尝试收货 id=" + id);
if (!sendEvent(MessageBuilder.withPayload(OrderStatusChangeEvent.RECEIVED).setHeader("order", order).build(), orders.get(id))) {
System.out.println("threadName=" + Thread.currentThread().getName() + " 收货失败,状态异常 id=" + id);
}
return orders.get(id);
}
@Override
public Map<Integer, Order> getOrders() {
return orders;
}
/**
* 发送订单状态转换事件
*
* @param message
* @param order
* @return
*/
private synchronized boolean sendEvent(Message<OrderStatusChangeEvent> message, Order order) {
boolean result = false;
try {
orderStateMachine.start();
//尝试恢复状态机状态
persister.restore(orderStateMachine, order);
//添加延迟用于线程安全测试
Thread.sleep(1000);
result = orderStateMachine.sendEvent(message);
//持久化状态机状态
persister.persist(orderStateMachine, order);
} catch (Exception e) {
e.printStackTrace();
} finally {
orderStateMachine.stop();
}
return result;
}
}
6 测试
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class OrderServiceImplTest {
@Autowired
private OrderService orderService;
@Test
public void testMultThread(){
orderService.creat();
orderService.creat();
orderService.pay(1);
new Thread(()->{
orderService.deliver(1);
orderService.receive(1);
}).start();
orderService.pay(2);
orderService.deliver(2);
orderService.receive(2);
System.out.println(orderService.getOrders());
}
}
例子二:状态机工厂
有些时候,一个状态机不够用,因为我们可能要处理多个订单。这个时候就要用到了状态机工厂。
1 配置修改
/**
* 订单状态机配置
*/
@Configuration
//@EnableStateMachine(name = "orderStateMachine")
@EnableStateMachineFactory(name = "orderStateMachineFactory")
public class OrderStateMachineConfig extends EnumStateMachineConfigurerAdapter<OrderStatus, OrderStatusChangeEvent> {
/**订单状态机ID*/
public static final String orderStateMachineId = "orderStateMachineId";
/**
* 配置状态
* @param states
* @throws Exception
*/
@Override
public void configure(StateMachineStateConfigurer<OrderStatus, OrderStatusChangeEvent> states) throws Exception {
states
.withStates()
.initial(OrderStatus.WAIT_PAYMENT)
.states(EnumSet.allOf(OrderStatus.class));
}
/**
* 配置状态转换事件关系
* @param transitions
* @throws Exception
*/
@Override
public void configure(StateMachineTransitionConfigurer<OrderStatus, OrderStatusChangeEvent> transitions) throws Exception {
transitions
.withExternal().source(OrderStatus.WAIT_PAYMENT).target(OrderStatus.WAIT_DELIVER).event(OrderStatusChangeEvent.PAYED)
.and()
.withExternal().source(OrderStatus.WAIT_DELIVER).target(OrderStatus.WAIT_RECEIVE).event(OrderStatusChangeEvent.DELIVERY)
.and()
.withExternal().source(OrderStatus.WAIT_RECEIVE).target(OrderStatus.FINISH).event(OrderStatusChangeEvent.RECEIVED);
}
/**
* 持久化配置
* 实际使用中,可以配合redis等,进行持久化操作
* @return
*/
@Bean
public StateMachinePersister<OrderStatus, OrderStatusChangeEvent, Order> persister(){
return new DefaultStateMachinePersister<>(new StateMachinePersist<OrderStatus, OrderStatusChangeEvent, Order>() {
@Override
public void write(StateMachineContext<OrderStatus, OrderStatusChangeEvent> context, Order order) throws Exception {
//此处并没有进行持久化操作
order.setStatus(context.getState());
}
@Override
public StateMachineContext<OrderStatus, OrderStatusChangeEvent> read(Order order) throws Exception {
//此处直接获取order中的状态,其实并没有进行持久化读取操作
StateMachineContext<OrderStatus, OrderStatusChangeEvent> result =new DefaultStateMachineContext<>(order.getStatus(), null, null, null, null, orderStateMachineId);
return result;
}
});
}
}
2 service中使用
@Service("orderService")
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
// @Autowired
// private StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine;
public static final String stateMachineId = "orderStateMachine";
@Autowired
private StateMachineFactory<OrderStatus, OrderStatusChangeEvent> orderStateMachineFactory;
@Autowired
private StateMachinePersister<OrderStatus, OrderStatusChangeEvent, Order> persister;
private int id = 1;
private Map<Integer, Order> orders = new HashMap<>();
@Override
public Order creat() {
Order order = new Order();
order.setStatus(OrderStatus.WAIT_PAYMENT);
order.setId(id++);
orders.put(order.getId(), order);
return order;
}
@Override
public Order pay(int id) {
Order order = orders.get(id);
System.out.println(" 等待支付 -> 等待发货 id=" + id + " threadName=" + Thread.currentThread().getName());
Message message = MessageBuilder.withPayload(OrderStatusChangeEvent.PAYED).setHeader("order", order).build();
if (!sendEvent(message, order)) {
System.out.println(" 等待支付 -> 等待发货 失败, 状态异常 id=" + id + " threadName=" + Thread.currentThread().getName());
} else {
System.out.println(" 等待支付 -> 等待发货 成功 id=" + id + " threadName=" + Thread.currentThread().getName());
}
return orders.get(id);
}
@Override
public Order deliver(int id) {
Order order = orders.get(id);
System.out.println(" 等待发货 -> 等待收货 id=" + id + " threadName=" + Thread.currentThread().getName());
if (!sendEvent(MessageBuilder.withPayload(OrderStatusChangeEvent.DELIVERY).setHeader("order", order).build(), orders.get(id))) {
System.out.println(" 等待发货 -> 等待收货 失败,状态异常 id=" + id + " threadName=" + Thread.currentThread().getName());
} else {
System.out.println(" 等待发货 -> 等待收货 成功 id=" + id + " threadName=" + Thread.currentThread().getName());
}
return orders.get(id);
}
@Override
public Order receive(int id) {
Order order = orders.get(id);
System.out.println(" 等待收货 -> 完成 收货 id=" + id + " threadName=" + Thread.currentThread().getName());
if (!sendEvent(MessageBuilder.withPayload(OrderStatusChangeEvent.RECEIVED).setHeader("order", order).build(), orders.get(id))) {
System.out.println(" 等待收货 -> 完成 失败,状态异常 id=" + id + " threadName=" + Thread.currentThread().getName());
} else {
System.out.println(" 等待收货 -> 完成 成功 id=" + id + " threadName=" + Thread.currentThread().getName());
}
return orders.get(id);
}
@Override
public Map<Integer, Order> getOrders() {
return orders;
}
/**
* 发送订单状态转换事件
*
* @param message
* @param order
* @return
*/
private boolean sendEvent(Message<OrderStatusChangeEvent> message, Order order) {
synchronized (String.valueOf(order.getId()).intern()) {
boolean result = false;
StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine = orderStateMachineFactory.getStateMachine(stateMachineId);
System.out.println("id=" + order.getId() + " 状态机 orderStateMachine" + orderStateMachine);
try {
orderStateMachine.start();
//尝试恢复状态机状态
persister.restore(orderStateMachine, order);
System.out.println("id=" + order.getId() + " 状态机 orderStateMachine id=" + orderStateMachine.getId());
//添加延迟用于线程安全测试
Thread.sleep(1000);
result = orderStateMachine.sendEvent(message);
//持久化状态机状态
persister.persist(orderStateMachine, order);
} catch (Exception e) {
e.printStackTrace();
} finally {
orderStateMachine.stop();
}
return result;
}
}
}
3 listener中配置id
@Component("orderStateListener")
@WithStateMachine(id = OrderStateMachineConfig.orderStateMachineId)
public class OrderStateListenerImpl {
@OnTransition(source = "WAIT_PAYMENT", target = "WAIT_DELIVER")
public boolean payTransition(Message<OrderStatusChangeEvent> message) {
System.out.println("----------------------------");
Order order = (Order) message.getHeaders().get("order");
order.setStatus(OrderStatus.WAIT_DELIVER);
System.out.println("支付 headers=" + message.getHeaders().toString() + " event=" + message.getPayload());
System.out.println("----------------------------");
return true;
}
@OnTransition(source = "WAIT_DELIVER", target = "WAIT_RECEIVE")
public boolean deliverTransition(Message<OrderStatusChangeEvent> message) {
System.out.println("----------------------------");
Order order = (Order) message.getHeaders().get("order");
order.setStatus(OrderStatus.WAIT_RECEIVE);
System.out.println("发货 headers=" + message.getHeaders().toString() + " event=" + message.getPayload());
System.out.println("----------------------------");
return true;
}
@OnTransition(source = "WAIT_RECEIVE", target = "FINISH")
public boolean receiveTransition(Message<OrderStatusChangeEvent> message) {
System.out.println("----------------------------");
Order order = (Order) message.getHeaders().get("order");
order.setStatus(OrderStatus.FINISH);
System.out.println("收货 headers=" + message.getHeaders().toString() + " event=" + message.getPayload());
System.out.println("----------------------------");
return true;
}
}
完整示例参见:链接
四 参考示例
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)