1. 创建Flux及Mono

1.1 使用just从现有的已知内容和大小的数据创建Flux或Mono

Flux.just(new Integer[]{1, 2, 3, 4})
	//观察者监听被观察者(消费者)
	.subscribe(System.out::println);

//使用可变参数创建Flux
Flux.just(1, 2, 3, 4)
	.subscribe(System.out::println);

//使用just创建Mono
Mono.just("1s")
	.subscribe(System.out::println);
 Mono.just(new Integer[]{1, 2, 3, 4})
	.subscribe(System.out::println);

1.2 使用fromIterable从可迭代对象中创建Flux

 //从可迭代的对象中创建Flux
Flux.fromIterable(Arrays.asList(1,2,3,4))
	.subscribe(System.out::println);

ArrayList<Integer> list = Lists.newArrayList(1, 2, 3, 4);
Flux<Integer> flux = Flux.fromIterable(list);
	//在创建Flux后追加元素
	list.add(5);
flux.subscribe(System.out::println);

1.3 使用fromStream从集合流中创建Flux

 Flux.fromStream(Stream.of(1,2,3,4))
                .subscribe(System.out::println);

1.4 使用range中创建一个范围内迭代的Flux

Flux.range(0,10)
	.subscribe(System.out::println);

1.5 创建定时发送Flux

1.5.1 使用interval创建间隔某一时间异步执行的Flux

//interval() 方法可以用来生成从 0 开始递增的 Long 对象的数据序列
Flux.interval(Duration.ofMillis(100))
	//  map可以对数据进行处理
	.map(i->"执行内容:"+i)
	 //限制执行10次
	.take(5)
	.subscribe(System.out::println);
//避免主线程提前结束
Thread.sleep(1100);

1.5.2 使用delayElements延时发送

 Flux.fromIterable(Lists.newArrayList(1,2,3,4))
	//延时发送
	.delayElements(Duration.ofMillis(100L))
	.subscribe(System.out::println);
//避免主线程提前结束
Thread.sleep(1100);

1.6 Flux与Mono之间的相互转换

1.6.1 Flux与Mono之间的相互转换

        Flux<Integer> just = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        just.subscribe(System.out::println);
        Mono<List<Integer>> mono = just.collectList();
        mono.subscribe(System.out::println);
        //自定义收集器
        Mono<List<Integer>> monoList = just.collect(toList());
        monoList.subscribe(System.out::println);
        //Mono转仅有一个元素的Flux
        Flux<List<Integer>> flux = mono.flux();
        flux.subscribe(System.out::println);
        //将一个元素的Flux转Mono
        Mono<Integer> single = Flux.just(1).single();
        single.subscribe(System.out::println);

1.6.2 使用concatWith从多个Mono组合成Flux

Flux<String> flux = Mono.just("1").concatWith(Mono.just("2"));
flux.subscribe(System.out::println);

1.6.3 使用concatWithValues追加Flux

//连接多个Flux
Flux.just("连接")
	//连接两个Flux
	.concatWith(Flux.just("两个"))
	//将元素追加到Flux
	.concatWithValues("或追加")
	.subscribe(System.out::print);

1.7 动态方法创建 Flux

1.7.1 generate动态创建Flux

generate() 方法生成 Flux 序列依赖于 Reactor 所提供的 SynchronousSink 组件,定义如下。

public static <T> Flux<T> generate(Consumer<SynchronousSink<T>> generator)

SynchronousSink 组件包括 next()、complete() 和 error() 这三个核心方法。从 SynchronousSink 组件的命名上就能知道它是一个同步的 Sink 组件,也就是说元素的生成过程是同步执行的。这里要注意的是 next() 方法只能最多被调用一次。使用 generate() 方法创建 Flux 的示例代码如下。

// 同步动态创建, next() 方法只能最多被调用一次
Flux.generate(sink -> {
	sink.next("1");
	//第二次会报错:
	//java.lang.IllegalStateException: More than one call to onNext
	//sink.next("2");
	//如果不调用 complete() 方法,那么就会生成一个所有元素均为“1”的无界数据流
	sink.complete();
}).subscribe(System.out::println);

如果想要在序列生成过程中引入状态,那么可以使用如下所示的 generate() 方法重载。
generate() 重载方法:

public static <T, S> Flux<T> generate(
	Callable<S> stateSupplier, 
	BiFunction<S, SynchronousSink<T>, S> generator) {
		return onAssembly(new FluxGenerate<>(stateSupplier, generator));
}

示例:

 Flux.generate(() -> 1, (i, sink) -> {
            sink.next(i);
            if (i == 5) {
                sink.complete();
            }
            return ++i;
        }).subscribe(System.out::println);

这里我们引入了一个代表中间状态的变量 i,然后根据 i 的值来判断是否终止序列。()->1 设置初始态
显然,以上代码的执行效果会在控制台中输入 1 到 5 这 5 个数字。

1.7.2 create动态创建Flux

create() 方法与 generate() 方法比较类似,但它使用的是一个 FluxSink 组件,定义如下。

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)

FluxSink 除了 next()、complete() 和 error() 这三个核心方法外,还定义了背压策略,并且可以在一次调用中产生多个元素。使用 create() 方法创建 Flux 的示例代码如下。

Flux.create(sink -> {
            for (int i = 0; i < 5; i++) {
                sink.next("Tang" + i);
            }
            sink.complete();
}).subscribe(System.out::println);

运行该程序,我们会在系统控制台上得到从“Tang0”到“Tang4”的 5 个数据。

1.8 Mono 对象创建响应式流

对于 Mono 而言,可以认为它是 Flux 的一种特例,所以很多创建 Flux 的方法同样适用。针对静态创建 Mono 的场景,前面给出的 just()、empty()、error() 和 never() 等方法同样适用。除了这些方法之外,比较常用的还有 justOrEmpty() 等方法。

justOrEmpty() 方法会先判断所传入的对象中是否包含值,只有在传入对象不为空时,Mono 序列才生成对应的元素,该方法示例代码如下。

Mono.justOrEmpty(Optional.of("Tang")).subscribe(System.out::println);

另一方面,如果要想动态创建 Mono,我们同样也可以通过 create() 方法并使用 MonoSink 组件,示例代码如下。

Mono.create(sink -> sink.success("Tang")).subscribe(System.out::println);

使用fromCallable动态创建Mono

Mono.fromCallable(() -> {
	Thread.sleep(1000);
	return "1";
}).subscribe(System.out::println);

2.异常处理

2.1 创建包含异常的Flux和Mono

//直接创建一个包含异常的Flux
Flux.error(new Exception());
//直接创建一个包含异常的Mono
Mono.error(new Exception());

2.2 异常处理

        Mono.just("1")
                //连接一个包含异常的Mono
                .concatWith(Mono.error(new Exception("Exception")))
                //异常监听
                .doOnError(error -> System.out.println("错误: " + error))
                //在发生异常时将其入参传递给订阅者
                .onErrorReturn("ErrorReturn")
                .subscribe(System.out::println);

2.2.1 调用subscribe可以指定需要处理的消息类型

consumer:正常消费,errorConsumer:异常处理,completeConsumer:消费完成

public final Disposable subscribe(@Nullable Consumer<? super T> consumer, @Nullable Consumer<? super Throwable> errorConsumer, @Nullable Runnable completeConsumer) 
Flux.just(1,2,3,4)
	.concatWith(Mono.error(new Exception("Exception")))
	.subscribe(System.out::println,System.err::println,()-> System.out.println("完成"));

2.2.2 通过onErrorResume当发生异常时重新产生新的流

Flux.just(1,2,3,4)
	.concatWith(Mono.error(new Exception("Exception")))
	.onErrorResume(e -> {
		System.out.println(e);
		return Flux.just(11,12,13);
	})
	.subscribe(System.out::println);

2.2.3 retry重试

Flux.just(1,2,3,4)
	.concatWith(Mono.error(new Exception("Exception")))
	.retry(1)
	 .subscribe(System.out::println);

3. 常用方法

3.1 merge 、mergeSequential、mergeComparing

3.1.1 merge

merge按照所有流中元素的实际产生序列来合并

Flux.merge(Flux.interval(Duration.ofMillis(10)).take(5),
	Flux.interval(Duration.ofMillis(10)).take(3))
	.log()
	.subscribe();
Thread.sleep(1000);

3.1.2 mergeSequential

mergeSequential按照所有流被订阅的顺序,以流为单位进行合并。
例如: FluxA 和FluxB 只有在A消费完后才会去消费B

Flux.mergeSequential(Flux.interval(Duration.ofMillis(10)).take(5),
	Flux.interval(Duration.ofMillis(10)).take(3))
	.log()
	.subscribe();
Thread.sleep(1000);

3.1.3 mergeComparing

消费两个流中较小的那个

prefetch:比较个数
comparator:比较器
sources:数据流

 public static <T> Flux<T> mergeComparing(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources)
Flux.mergeComparing(Flux.just(1,2,9,4,76,6),
	Flux.just(2,75,4,3,5,6))
	.log()
	.subscribe();
Thread.sleep(1000);

3.2 buffer、bufferTimeout、bufferWhile、bufferUntil

把当前流中的元素收集到集合中,并把集合对象作为流中的新元素

3.2.1 buffer: 收集为集合

当maxSize > skip 时 重叠 如3,2 [1,2,3],[3,4,5],[5,6,7],[7,8,9],[9,10]
当maxSize < skip 时 重叠 如3,4 [1,2,3],[5,6,7],[9,10]
当maxSize = skip 时 准确分割 等价于只传maxSize 如3,3 [1,2,3],[4,5,6],[7,8,9],[10]

 Flux.range(1, 10)
//	.buffer(3,2)
//	.buffer(3,4)
//	.buffer(3,3)
	.buffer(3)
	.subscribe(System.out::println);

3.2.2 bufferTimeout: 可以根据maxSize 或 skip 满足其一就可以切割集合

Flux.interval(Duration.ofMillis(100L))
	.bufferTimeout(9,Duration.ofMillis(1000L))
	.subscribe(System.out::println);
Thread.sleep(10000);

3,2.3 bufferWhile: 则只有当Predicate返回true时才会收集。一旦为false,会立即开始下一次收集。

Flux.range(1, 10)
	.bufferWhile(i -> i % 2 == 0)
	.subscribe(System.out::println);

3.2.4 bufferUntil: 会一直收集直到Predicate返回true。

Flux.range(1, 10)
	.bufferUntil(i -> i % 2 == 0)
	.subscribe(System.out::println);

cutBefore true : Predicate返回true放到下一个集合中

Flux.range(1, 10)
	.bufferUntil(i -> i % 2 == 0,true)
	.subscribe(System.out::println);

3.3 Filter 对流中包含的元素进行过滤

对流中包含的元素进行过滤,只留下满足Predicate指定条件的元素。

Flux.range(1, 10).filter(i -> i%2 == 0).subscribe(System.out::println);

3.4 zipWith 把当前流中的元素与另一个流中的元素按照一对一的方式进行合并

把当前流中的元素与另一个流中的元素按照一对一的方式进行合并。多的元素被舍弃,可以通过BiFunction函数对合并的元素进行处理

Flux.just(1, 2)
	.zipWith(Flux.just(3, 4))
	.subscribe(System.out::println);
//通过BiFunction函数对合并的元素进行处理
 Flux.just(1, 2,3)
	.zipWith(Flux.just(4, 5), (s1, s2) -> s1 + "-" + s2)
	subscribe(System.out::println);

3.5 take 用来从当前流中提取元素

//提取指定数量的元素
Flux.range(1, 1000)
	.take(10)
	.subscribe(System.out::println);

 //按时间间隔提取元素
Flux.interval(Duration.ofMillis(10))
	.take(Duration.ofMillis(100))
	.subscribe(System.out::println);
Thread.sleep(1000);

//提取最后N个元素
Flux.range(1, 1000)
	.takeLast(10)
	.subscribe(System.out::println);

//当Predicate返回true时才进行提取
Flux.range(1, 1000)
	.takeWhile(i -> i < 10)
	.subscribe(System.out::println);

//提取元素直到Predicate返回true
Flux.range(1, 1000)
	.takeUntil(i -> i == 10)
	.subscribe(System.out::println);

3.6 reduce和reduceWith 归约操作,可以进行累加累乘等操作

Flux.range(1, 100)
	.reduce((x, y) -> x + y)
	.subscribe(System.out::println);
        
//设定默认值
 Flux.range(1, 100)
	.reduce(100,(x, y) -> x + y)
	.subscribe(System.out::println);

//可以设置Supplier初始值
Flux.range(1, 100)
	.reduceWith(() -> 100, (x ,y) -> x + y)
	.subscribe(System.out::println)

3.7 flatMap和flatMapSequential 把流中的每个元素转换成一个流,再把所有流中的元素进行合并。

//flatMap按实际生产顺序进行合并
Flux.just(5, 10)
	.flatMap(x -> Flux.interval(
		Duration.ofMillis(x * 10),
		Duration.ofMillis(100)).take(x)
	)
	.subscribe(System.out::println);
Thread.sleep(1000);
        
        
//flatMapSequential按订阅顺序进行合并
Flux.just(5, 10)
	.flatMapSequential(x -> Flux.interval(
		Duration.ofMillis(x * 10),
		Duration.ofMillis(100)).take(x)
	)
	.subscribe(System.out::println);
Thread.sleep(1000);

3.8 concatMap 把流中的每个元素转换成一个流,再把所有流进行合并

concatMap会根据原始流中的元素顺序依次把转换之后的流进行合并,并且concatMap堆转换之后的流的订阅是动态进行的,而flatMapSequential在合并之前就已经订阅了所有的流。

Flux.just(5, 10)
	.concatMap(x -> Flux.interval(
		Duration.ofMillis(x * 10),
		Duration.ofMillis(100)).take(x)
	)
	.subscribe(System.out::println);
Thread.sleep(1000);

3.9 combineLatest 把所有流中的最新产生的元素合并成一个新的元素

把所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素。只要其中任何一个流中产生了新的元素,合并操作就会被执行一次,结果流中就会产生新的元素。

Flux.combineLatest(Arrays::toString,
		Flux.interval(Duration.ofMillis(50), Duration.ofMillis(100)).take(5),
		Flux.interval(Duration.ofMillis(50), Duration.ofMillis(100)).take(5))
	.subscribe(System.out::println);
Thread.sleep(1000);

3.10 使用skip跳过元素

//跳过指定条数
Flux.just(1,2,3,4,5,6,7)
	.skip(2)
	.subscribe(System.out::println);
	
//跳过指定时间间隔
Flux.interval(Duration.ofMillis(100))
	.skip(Duration.ofMillis(300))
	.log()
	.subscribe();
Thread.sleep(1000);

3.11 使用distinct去重

Flux.just(1,1,2,2,5,6,7)
	.distinct()
	.subscribe(System.out::println);

3.12 从Flux获取首元素和尾元素

Flux<Integer> just = Flux.just(1, 2, 3, 4, 5, 6, 7);
Mono<Integer> last = just.last();
Mono<Integer> first = just.next();

3.13 从Flux阻塞式取一个元素

Flux<String> flux = Flux.create(skin -> {
	for (int i = 0; i < 2; ++i) {
		try {
			Thread.sleep(100);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		skin.next("这是第" + i + "个元素");
	}
	skin.complete();
});
//flux订阅者所有操作都是无副作用的,即不会改变原flux对象数据
//阻塞式订阅,只要有一个元素进入Flux
String first = flux.blockFirst();
//输出:  这是第0个元素
System.out.println(first);
//还是输出: 这是第0个元素
System.out.println(flux.blockFirst());
//输出: 这是第1个元素
System.out.println(flux.blockLast());
//还是输出: 这是第1个元素
System.out.println(flux.blockLast());

3.14 监听:doOnError、doOnComplete、doFinally、doOnSubscribe

        Flux.just(1,2,3,4,5,6,7,8,9)
                .concatWith(Flux.error(new Exception()))
                //错误时执行
                .doOnError(e -> System.out.println("报错:" + e))
                //完成时执行
                .doOnComplete(()-> System.out.println("数据接收完成"))
                //最后执行
                .doFinally(t-> System.out.println("最后执行信息:" + t))
                .subscribe(System.out::println);
 //消费者参与前执行的最后一件事,入参为消费者对象(一般用于修改、添加、删除源数据流)
Flux.just(1,2,3,4,5,6,7,8,9)
	.log()
	.doOnSubscribe(i ->{
		System.out.println("先请求2个");
		i.request(2);
		System.out.println("再请求3个");
		i.request(3);
		i.cancel();
		System.out.println("取消监听");
	})
	.subscribe(System.out::println);

4. 背压------主动控制订阅量

4.1 原始的Subscriber::onNext

Flux.interval(Duration.ofMillis(10L))
	.subscribe(new Subscriber<Long>() {
		Subscription subscription;
		AtomicInteger count = new AtomicInteger();
		@Override
		public void onSubscribe(Subscription subscription) {
			this.subscription = subscription;
			//首先请求5个
			subscription.request(5);
			count.set(5);
		}

		@Override
		public void onNext(Long aLong) {
			System.out.print(" value:" + aLong);
			try {
				Thread.sleep(100L);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			if (count.decrementAndGet() <= 0){
				System.out.println("    消费完成,重新请求5个");
				subscription.request(5);
				count.set(5);
			}
		}

		@Override
		public void onError(Throwable throwable) {
		 
		 }

		@Override
		public void onComplete() {
			System.out.println("全部消费完成");
		}
});
Thread.sleep(5000L);

4.2 BaseSubscriber

Flux.range(1,50)
	.log()
	.subscribe(new BaseSubscriber<Integer>() {
		private int count = 0;
		private final int limit = 5;
		@Override
		protected void hookOnSubscribe(Subscription subscription) {
			request(limit);
		}

		@Override
        protected void hookOnNext(Integer value) {
        	try {
            	Thread.sleep(100);
            } catch (InterruptedException e) {
                 e.printStackTrace();
            }
            if (++count == limit){
            	request(count);
				count = 0;
			}
		}
	});

4.3 limitRate

        Flux.interval(Duration.ofMillis(10L))
                .take(10)
                .log()
                .limitRate(4)
                .subscribe();
        Thread.sleep(1000L);

当已经处理75%的数据量时会重新请求下一批数据

在这里插入图片描述
在这里插入图片描述

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐