Flink流计算处理-旁路输出

news/2024/6/16 2:17:06 标签: flink, java, 大数据

使用Flink做流数据处理时,除了主流数据输出,还自定义侧流输出即旁路输出,以实现灵活的数据拆分。

定义旁路输出标签

首先需要定义一个OutputTag,代码如下:

java">// 这需要是一个匿名的内部类,以便我们分析类型
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};

在ProcessFunction使用Context调用

可以通过以下Function中,将outputTag作为参数传递到Context中

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • KeyedCoProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

代码示例:

java">DataStream<Integer> input = ...;

final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<Integer> mainDataStream = input
  .process(new ProcessFunction<Integer, Integer>() {

      @Override
      public void processElement(
          Integer value,
          Context ctx,
          Collector<Integer> out) throws Exception {
        // 发送数据到主要的输出
        out.collect(value);

        // 发送数据到旁路输出
        ctx.output(outputTag, "sideout-" + String.valueOf(value));
      }
    });

在 DataStream 运算结果上使用 getSideOutput(OutputTag) 方法获取旁路输出流:

java">final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = ...;
// 获取到侧流输出DataStream,输出结果类型要与outputTag 定义的一致
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);

本文中只列出了Java代码的实现;
Flink官网还有Scala/python代码实现

参考链接:https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/datastream/side_output/


http://www.niftyadmin.cn/n/58506.html

相关文章

[LeetCode 1138]字母板上的路径

题目描述 题目链接&#xff1a;[LeetCode 1138]字母板上的路径 我们从一块字母板上的位置 (0, 0) 出发&#xff0c;该坐标对应的字符为 board[0][0]。 在本题里&#xff0c;字母板为board [“abcde”, “fghij”, “klmno”, “pqrst”, “uvwxy”, “z”]&#xff0c;如下所…

[]复习]cityengine2019/2022导入shp数据生成福田区建筑群

时间是一把杀猪刀 和人工智能比起来我太弱了.很无助.无法给自己升级系统. cityengine2019目前载入那种地区线上数据是行不通了,2022可以整一个邮箱试用一个月.https://www.esri.com/zh-cn/arcgis/products/arcgis-cityengine/trial/professionals 我整了一个万能无线邮箱,任意x…

Java中的异常处理

1.概述 在 Java 中&#xff0c;所有的异常都有一个共同的祖先java.lang包中的 Throwable类。 异常是程序中的一些错误&#xff0c;但并不是所有的错误都是异常&#xff0c;并且错误有时候是可以避免的。 比如说&#xff0c;你的代码少了一个分号&#xff0c;那么运行出来结果…

两个QMainWindow能用信号与槽吗?

是的&#xff0c;两个QMainWindow可以通过信号和槽进行通信。您可以在其中一个QMainWindow中定义信号&#xff0c;在另一个QMainWindow中连接该信号&#xff0c;并定义槽函数来处理信号发射的数据。 以下是一个简单的示例&#xff1a; from PyQt5.QtCore import pyqtSignal, …

LeetCode第332场周赛

2023.2.12LeetCode第332场周赛 6354. 找出数组的串联值 思路 双指针模拟&#xff0c;两个指针相遇的时候要特判 算法 class Solution { public:long long findTheArrayConcVal(vector<int>& nums) {long long ans 0;int i 0, j nums.size() - 1;while (i <…

P7面试送命题

面试总结&#xff0c;对标市场P7。什么叫送命题&#xff0c;一道题回答不上来面试直接挂的题目。JVM 运行时数据区域内存回收机制GC root有哪些volatile原理synchronize原理JDK 集合家族介绍HashMap原理ConcurrentHashMap原理Thread生命周期ThreadPoolExecutor生命周期、实例化…

ubuntu部署quark-5

下载源代码 解压zip sudo apt-get install unzip unzip xxxxx.zip安装所需要的包 sudo apt install python2 # 查看是否配置了默认的python sudo update-alternatives --list python若没有设置&#xff0c;会显示&#xff1a; # update-alternatives: error: no alterna…

30分钟吃掉wandb可视化自动调参

wandb.sweep: 低代码&#xff0c;可视化&#xff0c;分布式 自动调参工具。使用wandb 的 sweep 进行超参调优&#xff0c;具有以下优点。(1)低代码&#xff1a;只需配置一个sweep.yaml配置文件&#xff0c;或者定义一个配置dict&#xff0c;几乎不用编写调参相关代码。(2)可视化…