<>一:背景
我们项目中用到Flink的Java客户端 用来做数据处理
数据源:kafka
发送源:kafka
原来只有一个业务需求,可以理解为对mq消息中的一个字段做累计和。
现在又多了两个业务需求,可以理解为对mq消息的其他字段做累加和。
此时面临的问题是:
flink 做完类似于 map filter keyby reduce 等算子操作时,是否只能为一个业务使用?
如果可以供多个业务使用数据源,则我们不需要考虑 如果只能供一个业务使用数据源,我们需要将同数据源的数据复制一份,或者复制一份客户端代码。 听起来很不优雅
如果没明白上面这个诉求,我们看下代码
// 创建数据流 DataStream<TaskNodeInstance> sourceStream = StreamCommon.
getKafkaSourceStream(parameters, env); // 对消息进行过滤 DataStream<TaskNodeInstance>
filterStream= sourceStream .filter(new NotNullFilter()) .filter(new BasicFilter(
)); // 业务1.失败响应码统计 filterStream.filter(new RespCodeFilter()) .flatMap(new
RespCodeCountMap()) .keyBy("taskId") .timeWindow(Time.seconds(10)) .reduce(new
RespCodeCountReduce(), new RespCodeCountWindowFunction()) .addSink(new
RespCodeStatSink(parameters)).name("RespCodeStatSink:" + profile); //
业务2.智能推荐位结果统计 filterStream.filter(new RecommendFilter()) .map(new
RecommendCountMap()) .keyBy("taskId") .timeWindow(Time.seconds(10)) .reduce(new
RecommendCountReduce(), new RecommendCountWindow()) .addSink(new RecommendSink(
parameters)).name("RecommendCountSink:" + profile) ;
宁看这样写有问题吗?
如果在业务1.失败响应码统计中 map、reduce等算子消费了 filterStream 对象的数据,那么我们业务2.智能推荐位结果统计这样写肯定不对了。
数据已经被消费了
当时我直观上是这样认为的,然后想到了两个解决办法
* 再为另一个业务写一份客户端代码
* 在本客户端代码中,再增加同一个kafka-topic数据源,group-id不同即可
上面两种方法都可以解决问题,但是不优雅。
于是我开始debug-flink-client的源码
背后的真相竟令人暖心
原来每一步算子操作都会 new 出来新的对象
不会影响之前的数据
也就是说 我上面那个代码是正确的
<>二:Debug-flink-client-DataStream源码
debug-dataStream类的源码
拿map算子进入发现
what???返回的是new 的新对象。那意思每次算子操作都会产生新的对象,对之前的DataStream不会影响了。
我们debug跑起来看下内存地址,由于链式编程没有返回对象值,这里我把每一步的链式编程都用一个临时对象接受下
看下对象指向的内存地址
好啊 好啊 果然每一步算子操作都是new 出来一个新的DataStream
太好了 那么我上面写的业务代码就没问题
<>三:验证
打开 flink-web界面,提交作业。
跑起来
看看 taskManager的总览
当时看到这个,基本上验证了我所说的。最原始的DataStream,从kafka数据源获取到的对象,是不会被污染的,可以供多个业务使用。
再看下 taskManager的日志:
太开心了,可以了。
棒