开发者社区> 问答> 正文

订阅者如何来控制发布者?

我有一个发布者,其发布速度可能比订阅者处理数据的速度快。为了解决这个问题,我开始使用背压。因为我不想丢弃任何数据,所以我使用了反应性拉背压。我理解这是因为订阅服务器可以告诉发布服务器何时发布更多数据,如本节及以下各段所述。

发布者是一个Flowable,它并行地异步执行其工作,然后合并到一个顺序的Flowable中。数据最多应缓冲10个元素,并且当此缓冲区已满时,Flowable不应再发布任何数据并等待下一个请求。

订户是一个DisposableSubscriber,它在开始时请求10个项目。每个消耗的物品都需要进行一些计算,然后再请求一个新物品。

我的MWE看起来像这样:

List<Integer> src = new ArrayList<>();
for (int i = 0; i < 200; i++) {
    src.add(i);
}
Flowable.fromIterable(src)
        .parallel(10, 1)
        .runOn(Schedulers.from(Executors.newFixedThreadPool(10)))
        .flatMap(i -> Single.fromCallable(() -> {
            System.out.println("publisher: " + i);
            Thread.sleep(200);
            return i;
        }).toFlowable())
        .sequential(1)
        .onBackpressureBuffer(10)
        .observeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.newThread())
        .doOnError(Throwable::printStackTrace)
        .subscribeWith(new DisposableSubscriber<Integer>() {
            @Override
            protected void onStart() {
                request(10);
            }
            @Override
            public void onNext(Integer integer) {
                System.out.println("subscriber: " + integer);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                request(1);
            }
            @Override
            public void onError(Throwable t) {
            }
            @Override
            public void onComplete() {
            }
        });
try {
    Thread.sleep(1000000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

我期望此代码执行以下操作:订户请求前10个项目。发布者发布前10个项目。然后,订户进行计算onNext并请求更多项目,这些项目将由发布者发布。

实际发生的情况:首先,发布者似乎无限制地发布项目。在某个时候,例如在发布了14个项目之后,订户将处理其第一个项目。在这种情况下,发布者继续发布项目。大约发布30个项目后,将io.reactivex.exceptions.MissingBackpressureException: Buffer is full引发a,并且流结束。

我的问题:我在做什么错?如何让订阅者控制发布者是否以及何时发布数据?显然,我做错了什么。否则,期望与现实不会有太大不同。

上述MWE的示例输出:

publisher: 5
publisher: 7
publisher: 8
publisher: 0
publisher: 2
publisher: 6
publisher: 9
publisher: 3
publisher: 4
publisher: 1
publisher: 18
publisher: 17
publisher: 15
subscriber: 0
publisher: 11
publisher: 10
publisher: 19
publisher: 13
publisher: 14
publisher: 12
publisher: 16
publisher: 27
publisher: 28
publisher: 23
publisher: 21
publisher: 29
publisher: 20
publisher: 25
publisher: 22
publisher: 26
io.reactivex.exceptions.MissingBackpressureException: Buffer is full

问题来源:Stack Overflow

展开
收起
montos 2020-03-22 15:16:44 856 0
1 条回答
写回答
取消 提交回答
  • 不是Rx方面的专家,但是让我at一息.. observeOn(...)它有自己的默认缓冲区大小128。因此,从一开始,它就会向上游请求更多的缓冲区。

    observeOn(...)接受可选的缓冲区大小覆盖,但是即使您提供了它,ParallelFlowable也会flatMap(...)比您想要的更频繁地调用您的方法。我不是100%知道为什么,也许它有自己的内部缓冲,在将滑轨合并回顺序时会执行。

    我认为您可以通过使用flatMap(...)而不是parralel(...)提供maxConcurrency参数来更接近所需的行为。

    要记住的另一件事是您不想调用subscribeOn(...)-这是要完全影响上游Flowable。因此,如果您已经在调用parallel(...).runOn(...),它无效,否则效果将是意外的。

    有了以上这些,我认为这可以使您更接近所需的内容:

    List<Integer> src = new ArrayList<>();
        for (int i = 0; i < 200; i++) {
            src.add(i);
        }
        Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10));
        Flowable.fromIterable(src)
                .flatMap(
                        i -> Flowable.just( i )
                                .subscribeOn(scheduler) // here subscribeOn(...) affects just this nested Flowable
                                .map( __ -> {
                                    System.out.println("publisher: " + i);
                                    Thread.sleep(200);
                                    return i;
                                } ),
                        10) // max concurrency
                .observeOn(Schedulers.newThread(), false, 10) // override buffer size
                .doOnError(Throwable::printStackTrace)
                .subscribeWith(new DisposableSubscriber<Integer>() {
                    @Override
                    protected void onStart() {
                        request(10);
                    }
                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("subscriber: " + integer);
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        request(1);
                    }
                    @Override
                    public void onError(Throwable t) {
                    }
                    @Override
                    public void onComplete() {
                    }
                });
        try {
            Thread.sleep(1000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    

    回答来源:Stack Overflow

    2020-03-22 15:17:19
    赞同 展开评论 打赏
问答地址:
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载