Flink Table & SQL (1): Basic Table Concepts, Environment Setup, Source, Sink
Published: 2019-05-24


I. Concepts

1. Tables

Tables can be virtual (VIEWS) or regular (TABLES). VIEWS are created from an existing Table object, usually the result of a Table API or SQL query. TABLES describe external data, such as files, database tables, or message queues.

A table is identified by a three-part identifier: catalog, database, and table name. The catalog and database parts are optional; the defaults they fall back to can be set on the TableEnvironment:

tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");
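With these defaults set, an unqualified table name resolves against the current catalog and database. A minimal sketch (the view name exampleView and the second catalog are hypothetical):

// Resolves to custom_catalog.custom_database.exampleView
Table t1 = tEnv.from("exampleView");
// A fully qualified path ignores the current defaults
Table t2 = tEnv.from("other_catalog.other_database.exampleView");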
1.1 Temporary and Permanent Tables

A table can be temporary, tied to the lifecycle of a single Flink session, or permanent, visible across multiple Flink sessions and clusters.

Permanent tables require a catalog (such as the Hive Metastore) to maintain the table's metadata. Once created, a permanent table is visible to any Flink session connected to the catalog, and it continues to exist until it is explicitly dropped.

Temporary tables, on the other hand, are always stored in memory and exist only for the duration of the Flink session in which they are created. They are not visible to other sessions. They are not bound to any catalog or database, but they can be created in the namespace of one. Dropping the corresponding database does not drop a temporary table.
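A temporary table may even be registered under the same identifier as an existing permanent table; it then shadows the permanent one until it is dropped. A minimal sketch, assuming a Table object projTable is already at hand:

// The temporary view hides any permanent table with the same identifier
tEnv.createTemporaryView("custom_catalog.custom_database.exampleTable", projTable);
// Queries against this identifier now hit the temporary view
Table t = tEnv.from("custom_catalog.custom_database.exampleTable");
// Dropping it makes the permanent table visible again
tEnv.dropTemporaryView("custom_catalog.custom_database.exampleTable");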

1.2 Creating Tables

(1) Virtual tables (from an existing Table object):

tableEnv.createTemporaryView("projectedTable", projTable);

(2) Via a connector (external data source):

tableEnvironment
    .connect(...)
    .withFormat(...)
    .withSchema(...)
    .inAppendMode()
    .createTemporaryTable("tableName")

II. Table Project Setup

Maven dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>RELEASE</version>
    <scope>compile</scope>
</dependency>

1. Create a table from a file and print it to the console

1.1 Streaming
package com.flink.sql.environment.readFile;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class ReadFileCreateTableStream {

    public static void main(String[] args) throws Exception {
        // 1. Streaming environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Connect to the external file system: format, schema fields, temporary table
        tEnv.connect(new FileSystem().path("D:\\test\\a.txt"))
                .withFormat(new OldCsv())
                .withSchema(new Schema().field("name", DataTypes.STRING()).field("age", DataTypes.INT()))
                .inAppendMode()
                .createTemporaryTable("Orders");

        // 3. Read the table
        Table orders = tEnv.from("Orders");

        // 4. Select fields
        Table counts = orders.select($("name"), $("age"));

        // 5. Convert to a DataStream and print to the console
        DataStream<Row> rowDataStream = tEnv.toAppendStream(counts, Row.class);
        rowDataStream.print();
        env.execute("readFileCreateTableStream");
    }
}
1.2 Batch
package com.flink.sql.environment.readFile;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class ReadFileCreateTableBatch {

    public static void main(String[] args) throws Exception {
        // 1. Batch environments
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // 2. Connect to the external file system: format, schema fields, temporary table
        tEnv.connect(new FileSystem().path("D:\\test\\a.txt"))
                .withFormat(new OldCsv())
                .withSchema(new Schema().field("name", DataTypes.STRING()).field("age", DataTypes.INT()))
                .inAppendMode()
                .createTemporaryTable("Orders");

        // 3. Read the table
        Table orders = tEnv.from("Orders");

        // 4. Select fields
        Table counts = orders.select($("name"), $("age"));

        // 5. Convert to a DataSet and print to the console
        DataSet<Row> result = tEnv.toDataSet(counts, Row.class);
        result.print();
    }
}
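Note that the batch program has no explicit execute() call: DataSet.print() triggers execution eagerly on its own.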

2. group by

2.1 stream
package com.flink.sql.environment.groupBy;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class ReadFileCreateTableStream {

    public static void main(String[] args) throws Exception {
        // 1. Streaming environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Connect to the external file system: format, schema fields, temporary table
        tEnv.connect(new FileSystem().path("D:\\test\\a.txt"))
                .withFormat(new OldCsv())
                .withSchema(new Schema().field("name", DataTypes.STRING()).field("age", DataTypes.INT()))
                .inAppendMode()
                .createTemporaryTable("Orders");

        // 3. Read the table
        Table orders = tEnv.from("Orders");

        // 4. Group by name and count
        Table select = orders.groupBy($("name")).select($("name"), $("age").count().as("count"));

        // 5. Convert to a retract stream and print to the console
        DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tEnv.toRetractStream(select, Row.class);
        tuple2DataStream.print();
        env.execute("readFileCreateTableStream");
    }
}
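Note: a groupBy aggregation produces an updating result, which cannot be converted with toAppendStream. toRetractStream emits Tuple2<Boolean, Row> records, where the Boolean flag marks each row as an insertion (true) or a retraction (false) of an earlier result.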
2.2 batch
package com.flink.sql.environment.groupBy;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class ReadFileCreateTableBatch {

    public static void main(String[] args) throws Exception {
        // 1. Batch environments
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // 2. Connect to the external file system: format, schema fields, temporary table
        tEnv.connect(new FileSystem().path("D:\\test\\a.txt"))
                .withFormat(new OldCsv())
                .withSchema(new Schema().field("name", DataTypes.STRING()).field("age", DataTypes.INT()))
                .inAppendMode()
                .createTemporaryTable("Orders");

        // 3. Read the table
        Table orders = tEnv.from("Orders");

        // 4. Group by name and count
        Table select = orders.groupBy($("name")).select($("name"), $("age").count().as("count"));

        // 5. Convert to a DataSet and print to the console
        DataSet<Row> result = tEnv.toDataSet(select, Row.class);
        result.print();
    }
}

3. Flink SQL query

package com.flink.sql.environment.sqlQuery;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class ReadFileCreateTableStream {

    public static void main(String[] args) throws Exception {
        // 1. Streaming environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Connect to the external file system: format, schema fields, temporary table
        tEnv.connect(new FileSystem().path("D:\\test\\a.txt"))
                .withFormat(new OldCsv())
                .withSchema(new Schema().field("name", DataTypes.STRING()).field("age", DataTypes.INT()))
                .inAppendMode()
                .createTemporaryTable("Orders");

        // 3. SQL query
        Table table = tEnv.sqlQuery("select name from Orders");

        // 4. Convert to a DataStream and print to the console
        DataStream<Row> rowDataStream = tEnv.toAppendStream(table, Row.class);
        rowDataStream.print();
        env.execute("readFileCreateTableStream");
    }
}

4. Consuming Kafka with a Flink table

package com.flink.sql.environment.kafka;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class ReadKafkaCreateTableStream {

    public static void main(String[] args) throws Exception {
        // 1. Streaming environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Connect to Kafka: format, schema fields, temporary table
        tEnv.connect(new Kafka().version("universal")
                        .topic("aaaa")
                        .startFromLatest()
                        .property("bootstrap.servers", "centos:9092"))
                .withFormat(new Csv())
                .withSchema(new Schema().field("name", DataTypes.STRING()).field("age", DataTypes.INT()))
                .inAppendMode()
                .createTemporaryTable("Orders");

        // 3. Read the table
        Table orders = tEnv.from("Orders");

        // 4. Select fields
        Table counts = orders.select($("name"), $("age"));

        // 5. Convert to a DataStream and print to the console
        DataStream<Row> rowDataStream = tEnv.toAppendStream(counts, Row.class);
        rowDataStream.print();
        env.execute("readKafkaCreateTableStream");
    }
}
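version("universal") selects the universal Kafka connector from the flink-connector-kafka dependency, which works with broker versions 0.10 and later; with the flink-connector-kafka-0.11 dependency you would pass "0.11" instead.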

5. Integration with the DataStream and DataSet APIs

This interaction is achieved by converting a DataStream or DataSet into a Table, and vice versa.

5.1 Converting a DataStream or DataSet into a Table
(1) Based on tuples
package com.flink.sql.environment.streamToTable;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class StreamToTable {

    public static void main(String[] args) throws Exception {
        // 1. Streaming environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Text file source
        DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");

        // 3. Map each line to a Tuple2
        SingleOutputStreamOperator<Tuple2<String, String>> streamOperator =
                streamSource.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new Tuple2<>(split[0], split[1]);
                    }
                });

        // 4. Convert the DataStream to a Table with named fields
        Table table = tEnv.fromDataStream(streamOperator, $("name"), $("age"));

        // 5. Query the table
        Table name = table.select($("name"));

        // 6. Convert the Table back to a stream and print to the console
        DataStream<Row> rowDataStream = tEnv.toAppendStream(name, Row.class);
        rowDataStream.print();
        env.execute("StreamToTable");
    }
}
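For tuple input, fromDataStream maps the expressions positionally: $("name") renames field f0 and $("age") renames f1.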
(2) Based on a POJO class

Flink recognizes a data type as a POJO type (and allows "by name" field references) if the following conditions hold:

(1) The class is public and standalone (no non-static inner class).

(2) The class has a public no-argument constructor.

(3) All non-static, non-transient fields in the class (and all superclasses) are either public (and non-final) or have public getters and setters that follow the Java Beans naming conventions.

Note: if a user-defined data type cannot be recognized as a POJO type, it is treated as a GenericType and serialized with Kryo.

The Entity class below satisfies these rules:

package com.flink.sql.environment.streamToTable;

public class Entity {

    private String name;
    private String country;

    public Entity() {
    }

    public Entity(String name, String country) {
        this.name = name;
        this.country = country;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getCountry() {
        return country;
    }

    public void setCountry(String country) {
        this.country = country;
    }
}
package com.flink.sql.environment.streamToTable;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class StreamToTableEntity {

    public static void main(String[] args) throws Exception {
        // 1. Streaming environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Text file source
        DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");

        // 3. Map each line to a POJO
        SingleOutputStreamOperator<Entity> streamOperator =
                streamSource.map(new MapFunction<String, Entity>() {
                    @Override
                    public Entity map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new Entity(split[0], split[1]);
                    }
                });

        // 4. Convert the DataStream to a Table with named fields
        Table table = tEnv.fromDataStream(streamOperator, $("name"), $("country"));

        // 5. Query the table
        Table name = table.select($("name"));

        // 6. Convert the Table back to a stream and print to the console
        DataStream<Row> rowDataStream = tEnv.toAppendStream(name, Row.class);
        rowDataStream.print();
        env.execute("StreamToTablefile");
    }
}

Use as to give a field an alias:

package com.flink.sql.environment.streamToTable;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class StreamToTableEntityAlias {

    public static void main(String[] args) throws Exception {
        // 1. Streaming environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Text file source
        DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");

        // 3. Map each line to a POJO
        SingleOutputStreamOperator<Entity> streamOperator =
                streamSource.map(new MapFunction<String, Entity>() {
                    @Override
                    public Entity map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new Entity(split[0], split[1]);
                    }
                });

        // 4. Convert the DataStream to a Table, aliasing a field with as
        Table table = tEnv.fromDataStream(streamOperator, $("name").as("myDefined_name"), $("country"));

        // 5. Query the table
        Table name = table.select($("myDefined_name"), $("country"));

        // 6. Convert the Table back to a stream and print to the console
        DataStream<Row> rowDataStream = tEnv.toAppendStream(name, Row.class);
        rowDataStream.print();
        env.execute("StreamToTablefile");
    }
}
5.2 Converting a Table into a DataStream or DataSet
(1) To a tuple type
package com.flink.sql.environment.streamToTable;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;

public class TableToStream {

    public static void main(String[] args) throws Exception {
        // 1. Streaming environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Text file source registered as a temporary table
        tEnv.connect(new FileSystem().path("D:\\test\\a.txt"))
                .withFormat(new OldCsv())
                .withSchema(new Schema().field("name", DataTypes.STRING()).field("age", DataTypes.INT()))
                .inAppendMode()
                .createTemporaryTable("Orders");

        // 3. Read the table
        Table orders = tEnv.from("Orders");

        // 4. Convert to a stream of tuples and print to the console
        TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(Types.STRING, Types.INT);
        DataStream<Tuple2<String, Integer>> tuple = tEnv.toAppendStream(orders, tupleType);
        tuple.print();
        env.execute("StreamToTable");
    }
}
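toAppendStream works here because a plain table scan is append-only; a table whose result is continuously updated, such as a grouped aggregation, must be converted with toRetractStream, as the next example shows.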
(2) To a POJO type
package com.flink.sql.environment.streamToTable;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;

public class TableToStreamPojo {

    public static void main(String[] args) throws Exception {
        // 1. Streaming environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Text file source registered as a temporary table
        tEnv.connect(new FileSystem().path("D:\\test\\a.txt"))
                .withFormat(new OldCsv())
                .withSchema(new Schema().field("name", DataTypes.STRING()).field("age", DataTypes.INT()))
                .inAppendMode()
                .createTemporaryTable("Orders");

        // 3. Read the table
        Table orders = tEnv.from("Orders");

        // 4. Convert to a retract stream of POJOs and print to the console;
        // POJO is assumed to be a user-defined class with name/age fields
        // matching the schema (the class itself is not shown in the original post)
        DataStream<Tuple2<Boolean, POJO>> dataStream = tEnv.toRetractStream(orders, POJO.class);
        dataStream.print("pojo");
        env.execute("StreamToTablefile");
    }
}

6. Sinking to an external file system

6.1 Streaming output (to an external file system)
package com.flink.sql.environment.outPut;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class StreamToTablefile {

    public static void main(String[] args) throws Exception {
        // 1. Streaming environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Text file source
        DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");

        // 3. Map each line to a Tuple2
        SingleOutputStreamOperator<Tuple2<String, String>> streamOperator =
                streamSource.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new Tuple2<>(split[0], split[1]);
                    }
                });

        // 4. Convert the DataStream to a Table with named fields
        Table table = tEnv.fromDataStream(streamOperator, $("name"), $("age"));

        // 5. Query the table
        Table name = table.select($("name"), $("age"));

        // 6. Convert back to a stream and write to an external file
        DataStream<Row> rowDataStream = tEnv.toAppendStream(name, Row.class);
        rowDataStream.writeAsText("D:\\test\\b.txt");
        env.execute("StreamToTablefile");
    }
}
6.2 Table output (to an external file system)
package com.flink.sql.environment.outPut;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

import static org.apache.flink.table.api.Expressions.$;

public class StreamToTablefileSink {

    public static void main(String[] args) throws Exception {
        // 1. Streaming environments (parallelism 1 keeps the output in a single file)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Text file source
        DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");

        // 3. Map each line to a Tuple2
        SingleOutputStreamOperator<Tuple2<String, String>> streamOperator =
                streamSource.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new Tuple2<>(split[0], split[1]);
                    }
                });

        // 4. Convert the DataStream to a Table with named fields
        Table table = tEnv.fromDataStream(streamOperator, $("name"), $("age"));

        // 5. Query the table
        Table name = table.select($("name"), $("age"));

        // 6. Register the external sink table
        tEnv.connect(new FileSystem().path("D:\\test\\b.txt"))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("name", DataTypes.STRING()).field("age", DataTypes.STRING()))
                .createTemporaryTable("outPutTable");

        // 7. Execute the insert into the external sink
        name.executeInsert("outPutTable");
        env.execute("StreamToTablefile");
    }
}

7. Sinking to Kafka

The program throws the following exception, but the expected functionality is not affected:

Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.

This happens because executeInsert() translates and submits the Table job on its own; the trailing env.execute() then finds no DataStream operators left to run. Dropping the env.execute() call avoids the exception.

package com.flink.sql.environment.outPut;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

import static org.apache.flink.table.api.Expressions.$;

public class SourceKafkaSinkKafka {

    public static void main(String[] args) throws Exception {
        // 1. Streaming environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Connect to the source Kafka topic: format, schema fields, temporary table
        tEnv.connect(new Kafka().version("universal")
                        .topic("aaaa")
                        .startFromLatest()
                        .property("bootstrap.servers", "centos:9092"))
                .withFormat(new Csv())
                .withSchema(new Schema().field("name", DataTypes.STRING()).field("age", DataTypes.INT()))
                .createTemporaryTable("Orders");

        // 3. Read the table
        Table orders = tEnv.from("Orders");

        // 4. Transform fields
        Table select = orders.select($("name").substring(1, 3), $("age"));

        // 5. Register the sink Kafka topic
        tEnv.connect(new Kafka().version("universal")
                        .topic("bbbb")
                        .property("bootstrap.servers", "centos:9092"))
                .withFormat(new Csv())
                .withSchema(new Schema().field("name", DataTypes.STRING()).field("age", DataTypes.BIGINT()))
                .createTemporaryTable("outPut_table");

        // 6. Submit the insert job; executeInsert() already triggers execution,
        // so the env.execute() below raises the exception noted above
        select.executeInsert("outPut_table");
        env.execute("readFileCreateTableStream");
    }
}

