Processing Kafka Stream Data in Flink with the Table API and SQL

Apache Flink has two relational APIs, the Table API and SQL, for unified stream and batch processing. The Table API is a language-integrated query API for Scala and Java that lets you express data processing in a very intuitive way. The core concept of this API is the Table, which serves as both the input and the output of a query.
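For a taste of the two styles before the full program below: the same grouped average can be written either as a fluent Table API expression or as an embedded SQL string, and both return a Table that can be queried further or converted into a DataStream. A minimal sketch, assuming a StreamTableEnvironment tEnv with the "JsonTable" source already registered as in section 2:

// Table API style: a language-integrated expression, checked at compile time
Table apiResult = tEnv.scan("JsonTable")
        .groupBy("machine")
        .select("machine, number.avg");

// SQL style: the equivalent query as a string (Flink 1.3 uses tEnv.sql)
Table sqlResult = tEnv.sql("SELECT machine, AVG(number) FROM JsonTable GROUP BY machine");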

On to the code:
1. Dependencies
<!-- Flink dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.10</artifactId>
    <version>1.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.10</artifactId>
    <version>1.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
    <version>1.3.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.10</artifactId>
    <version>1.3.2</version>
</dependency>
<!-- Note: this artifact carries the Scala 2.11 suffix while the others use 2.10;
     in a real project keep the Scala version consistent across all Flink artifacts -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>
2. Flink stream processing
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.Kafka09TableSource;
import org.apache.flink.streaming.util.serialization.*;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;

import java.util.Properties;

public class TestTableAPI {
    public static void main(String[] args) throws Exception {
        // Build the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing!!
        env.enableCheckpointing(5000);
        // Set the time characteristic (processing time is used here; event time and ingestion time are the other options)
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Create a TableEnvironment
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Kafka configuration
        Properties props = new Properties();
        // Replace with your own broker address, e.g. masterLinux:9092 ==> localhost:9092
        props.setProperty("bootstrap.servers", "masterLinux:9092");
        props.setProperty("group.id", "flink-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Prepare to receive the Kafka data: build the TypeInformation for the JSON rows
        String[] names = new String[] {"machine", "time", "number"};
        TypeInformation[] types = new TypeInformation[] {Types.INT(), Types.STRING(), Types.INT()};
        TypeInformation typeInfo = Types.ROW(names, types);

        // A TableSource provides access to external data
        // args[0] = "test"; // the Kafka topic to read from
        TableSource jsonSource = new Kafka09TableSource("test1", props,
                new JsonRowDeserializationSchema(typeInfo), typeInfo);
        // Register the TableSource
        tEnv.registerTableSource("JsonTable", jsonSource);

        /* Execute the query logic */
        // Using the Table API:
        // Table sqlResult = tEnv.scan("JsonTable").select("*");
        // Using SQL:
        Table sqlResult = tEnv.sql("select machine, avg(number) from JsonTable group by machine");

        // Print the result
        tEnv.toRetractStream(sqlResult, Row.class).print();
        // FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();
        // dataStream.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisExampleMapper()));

        env.execute("TestTableAPI Test");
    }
}
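The commented-out lines above hint at sinking the results to Redis via flink-connector-redis. That connector's RedisSink needs a RedisMapper that tells it which Redis command to run and how to extract the key and value from each record. A minimal sketch of what such a mapper could look like (the class name RedisExampleMapper comes from the commented code; the hash name "flink-avg" is an assumption for illustration):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

// Maps (machine, average) pairs into a Redis hash via HSET
public class RedisExampleMapper implements RedisMapper<Tuple2<String, Integer>> {

    @Override
    public RedisCommandDescription getCommandDescription() {
        // HSET <additionalKey> <key> <value>; "flink-avg" is an assumed hash name
        return new RedisCommandDescription(RedisCommand.HSET, "flink-avg");
    }

    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {
        return data.f0; // the machine id becomes the hash field
    }

    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        return String.valueOf(data.f1); // the running average becomes the value
    }
}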
3. A test class that sends messages to Kafka
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.*;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

/**
 * KafkaProducerTest: sends test messages to Kafka
 */
public class KafkaProducerTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Replace with your own broker address, e.g. masterLinux:9092 ==> localhost:9092
        props.put("bootstrap.servers", "masterLinux:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        int totalMessageCount = 10000;
        SimpleDateFormat myFmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        for (int i = 0; i < totalMessageCount; i++) {
            // Build a JSON message matching the schema registered in TestTableAPI
            JSONObject object = new JSONObject();
            object.put("time", myFmt.format(new Date()));
            object.put("machine", (int) (Math.random() * 10));
            object.put("number", (int) (Math.random() * 1000));
            String value = object.toJSONString();
            System.out.println("Sending message: " + value);
            producer.send(new ProducerRecord<>("test1", value), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.out.println("Failed to send message with exception " + exception);
                    }
                }
            });
            Thread.sleep(500L);
        }
        producer.close();
    }
}
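Once the topic test1 is being populated by this producer, running TestTableAPI prints a retract stream: each record is a (flag, row) pair, where true marks a newly emitted aggregate and false retracts a previously emitted value for the same group. As more messages arrive, the output should look roughly like this (the values are illustrative, not from an actual run):

(true,3,417)
(false,3,417)
(true,3,489)

Each retract/insert pair replaces the old running average for machine 3 with the updated one, which is why grouped aggregations on unbounded streams need toRetractStream rather than toAppendStream.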
