本文共 27441 字,大约阅读时间需要 91 分钟。
表可以是虚拟的(VIEWS)或常规的(TABLES)。VIEWS 可以从现有的 Table 对象创建,通常是 Table API 或 SQL 查询的结果。TABLES 描述外部数据,例如文件、数据库表或消息队列。
表由三部分标识符组成:目录(catalog)、数据库、表名。其中,目录和数据库是可选的;省略时使用当前目录和当前数据库,可按如下方式设置:
// Select "custom_catalog" and then "custom_database" as the current catalog and database of tEnv.
tEnv.useCatalog("custom_catalog");tEnv.useDatabase("custom_database");
表可以是临时的,并与单个Flink会话的生命周期相关,也可以是永久的,并且在多个Flink会话和群集中可见。
永久表需要一个目录(catalog,例如Hive Metastore)来维护有关表的元数据。创建永久表后,连接到该目录的任何Flink会话都可以看到该表,并且该表将一直存在,直到被明确删除为止。
另一方面,临时表始终存储在内存中,并且仅在它们在其中创建的Flink会话期间存在。这些表对其他会话不可见。它们没有绑定到任何目录或数据库,但可以在一个目录或数据库的名称空间中创建。如果删除了它们的相应数据库,则不会删除临时表。
(1)虚拟表
// Register the Table object projTable as a temporary view named "projectedTable".
tableEnv.createTemporaryView("projectedTable", projTable);
(2)通过连接器(数据源)
// Connector template (the "..." arguments are placeholders): describe the external system,
// its format and its schema, then register the result as the temporary table "tableName".
tableEnvironment .connect(...) .withFormat(...) .withSchema(...) .inAppendMode() .createTemporaryTable("tableName")
maven依赖
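<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>RELEASE</version>
    <scope>compile</scope>
</dependency>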
package com.flink.sql.environment.readFile;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.FileSystem;import org.apache.flink.table.descriptors.OldCsv;import org.apache.flink.table.descriptors.Schema;import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class ReadFileCreateTableStream { public static void main(String[] args) throws Exception { //1、流式环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2、连接外部文件系统,格式,注册字段,临时表 tEnv.connect(new FileSystem().path("D:\\test\\a.txt")) .withFormat(new OldCsv()) .withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())) .inAppendMode() .createTemporaryTable("Orders"); //3、读取表 Table orders = tEnv.from("Orders"); //4、读取表字段 Table counts = orders.select($("name"),$("age")); //5、转化成DataStream打印在控制台 DataStreamrowDataStream = tEnv.toAppendStream(counts, Row.class); rowDataStream.print(); env.execute("readFileCreateTableStream"); }}
package com.flink.sql.environment.readFile;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;import org.apache.flink.table.descriptors.FileSystem;import org.apache.flink.table.descriptors.OldCsv;import org.apache.flink.table.descriptors.Schema;import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class ReadFileCreateTableBatch { public static void main(String[] args) throws Exception { //1、批式环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); //2、连接外部文件系统,格式,注册字段,临时表 tEnv.connect(new FileSystem().path("D:\\test\\a.txt")) .withFormat(new OldCsv()) .withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())) .inAppendMode() .createTemporaryTable("Orders"); //3、读取表 Table orders = tEnv.from("Orders"); //4、读取表字段 Table counts = orders.select($("name"),$("age")); //5、转化成DataStream打印在控制台 DataSetresult = tEnv.toDataSet(counts, Row.class); result.print(); }}
package com.flink.sql.environment.groupBy;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.FileSystem;import org.apache.flink.table.descriptors.OldCsv;import org.apache.flink.table.descriptors.Schema;import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class ReadFileCreateTableStream { public static void main(String[] args) throws Exception { //1、流式环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2、连接外部文件系统,格式,注册字段,临时表 tEnv.connect(new FileSystem().path("D:\\test\\a.txt")) .withFormat(new OldCsv()) .withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())) .inAppendMode() .createTemporaryTable("Orders"); //3、读取表 Table orders = tEnv.from("Orders"); //4、读取表字段 Table select = orders.groupBy($("name")).select($("name"), $("age").count().as("count")); //5、转化成DataStream打印在控制台 DataStream> tuple2DataStream = tEnv.toRetractStream(select, Row.class); tuple2DataStream.print(); env.execute("readFileCreateTableStream"); }}
package com.flink.sql.environment.groupBy;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;import org.apache.flink.table.descriptors.FileSystem;import org.apache.flink.table.descriptors.OldCsv;import org.apache.flink.table.descriptors.Schema;import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class ReadFileCreateTableBatch { public static void main(String[] args) throws Exception { //1、批式环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv = BatchTableEnvironment.create(env); //2、连接外部文件系统,格式化方法,注册字段,临时表 tEnv.connect(new FileSystem().path("D:\\test\\a.txt")) .withFormat(new OldCsv()) .withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())) .inAppendMode() .createTemporaryTable("Orders"); //3、读取表 Table orders = tEnv.from("Orders"); Table select = orders.groupBy($("name")).select($("name"), $("age").count().as("count")); //5、转化成DataStream打印在控制台 DataSetresult = tEnv.toDataSet(select, Row.class); result.print(); }}
package com.flink.sql.environment.sqlQuery;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.FileSystem;import org.apache.flink.table.descriptors.OldCsv;import org.apache.flink.table.descriptors.Schema;import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class ReadFileCreateTableStream { public static void main(String[] args) throws Exception { //1、流式环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2、连接外部文件系统,格式,注册字段,临时表 tEnv.connect(new FileSystem().path("D:\\test\\a.txt")) .withFormat(new OldCsv()) .withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())) .inAppendMode() .createTemporaryTable("Orders"); //3、sql查询 Table table = tEnv.sqlQuery("select name from Orders"); //4、转化成DataStream打印在控制台 DataStreamrowDataStream = tEnv.toAppendStream(table, Row.class); rowDataStream.print(); env.execute("readFileCreateTableStream"); }}
package com.flink.sql.environment.kafka;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.*;import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class ReadKafkaCreateTableStream { public static void main(String[] args) throws Exception { //1、流式环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2、连接外部文件系统,格式,注册字段,临时表 tEnv.connect(new Kafka().version("universal") .topic("aaaa") .startFromLatest() .property("bootstrap.servers", "centos:9092")) .withFormat(new Csv()) .withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())) .inAppendMode() .createTemporaryTable("Orders"); //3、读取表 Table orders = tEnv.from("Orders"); //4、读取表字段 Table counts = orders.select($("name"),$("age")); //5、转化成DataStream打印在控制台 DataStreamrowDataStream = tEnv.toAppendStream(counts, Row.class); rowDataStream.print(); env.execute("readFileCreateTableStream"); }}
可以通过将 DataStream 或 DataSet 转换为 Table(反之亦然)来实现此交互。
package com.flink.sql.environment.streamToTable;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class StreamToTable { public static void main(String[] args) throws Exception { //1、流式环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2、文本数据源 DataStreamSourcestreamSource = env.readTextFile("D:\\test\\a.txt"); //3、数据源映射成tuple2 SingleOutputStreamOperator > streamOperator = streamSource.map(new MapFunction >() { @Override public Tuple2 map(String s) throws Exception { String[] split = s.split(","); return new Tuple2<>(split[0],split[1]); } }); //4、将DataStream转换为table并带有fields Table table = tEnv.fromDataStream(streamOperator,$("name"),$("age")); //5、table 查询 Table name = table.select("name"); //6、table转换成流打印在控制台 DataStream rowDataStream = tEnv.toAppendStream(name, Row.class); rowDataStream.print(); env.execute("StreamToTable"); }}
POJO类型的规则,如果满足以下条件,则Flink会将数据类型识别为POJO类型(并允许“按名称”字段引用):
(1)该类是公共的和独立的(没有非静态内部类)
(2)该类具有公共的无参数构造函数
(3)类(和所有超类)中的所有非静态、非瞬态字段要么是公共的(并且是非最终的),要么具有公共的getter和setter方法,且这些方法遵循Java bean对getter和setter的命名约定。
请注意:如果无法将用户定义的数据类型识别为POJO类型,则Flink会将其作为GenericType处理,并使用Kryo进行序列化。
package com.flink.sql.environment.streamToTable;public class Entity { private String name; private String country; public Entity() { } public Entity(String name, String country) { this.name = name; this.country = country; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getCountry() { return country; } public void setCountry(String country) { this.country = country; }}
package com.flink.sql.environment.streamToTable;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class StreamToTableEntity { public static void main(String[] args) throws Exception { //1、流式环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2、文本数据源 DataStreamSourcestreamSource = env.readTextFile("D:\\test\\a.txt"); //3、数据源映射成POJO SingleOutputStreamOperator streamOperator = streamSource.map(new MapFunction () { @Override public Entity map(String s) throws Exception { String[] split = s.split(","); return new Entity(split[0],split[1]); } }); //4、将DataStream转换为table并带有fields Table table = tEnv.fromDataStream(streamOperator,$("name"),$("country")); //5、table 查询 Table name = table.select("name"); //6、table转换成流打印在控制台 DataStream rowDataStream = tEnv.toAppendStream(name, Row.class); rowDataStream.print(); env.execute("StreamToTablefile"); }}
利用as为字段起别名
package com.flink.sql.environment.streamToTable;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class StreamToTableEntity { public static void main(String[] args) throws Exception { //1、流式环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2、文本数据源 DataStreamSourcestreamSource = env.readTextFile("D:\\test\\a.txt"); //3、数据源映射成tuple2 SingleOutputStreamOperator streamOperator = streamSource.map(new MapFunction () { @Override public Entity map(String s) throws Exception { String[] split = s.split(","); return new Entity(split[0],split[1]); } }); //4、将DataStream转换为table并带有fields,利用as起别名 Table table = tEnv.fromDataStream(streamOperator,$("name").as("myDefined_name"),$("country")); //5、table 查询 Table name = table.select($("myDefined_name"),$("country")); //6、table转换成流打印在控制台 DataStream rowDataStream = tEnv.toAppendStream(name, Row.class); rowDataStream.print(); env.execute("StreamToTablefile"); }}
package com.flink.sql.environment.streamToTable;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.typeutils.TupleTypeInfo;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.FileSystem;import org.apache.flink.table.descriptors.OldCsv;import org.apache.flink.table.descriptors.Schema;import org.apache.flink.table.expressions.In;public class TableToStream { public static void main(String[] args) throws Exception { //1、流式环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2、文本数据源 tEnv.connect(new FileSystem().path("D:\\test\\a.txt")) .withFormat(new OldCsv()) .withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())) .inAppendMode() .createTemporaryTable("Orders"); //3、读取表 Table orders = tEnv.from("Orders"); TupleTypeInfo> tupleType = new TupleTypeInfo<>( Types.STRING, Types.INT); DataStream > tuple = tEnv.toAppendStream(orders, tupleType); tuple.print(); env.execute("StreamToTable"); }}
package com.flink.sql.environment.streamToTable;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.typeutils.TupleTypeInfo;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.FileSystem;import org.apache.flink.table.descriptors.OldCsv;import org.apache.flink.table.descriptors.Schema;import org.apache.flink.table.expressions.In;public class TableToStream { public static void main(String[] args) throws Exception { //1、流式环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2、文本数据源 tEnv.connect(new FileSystem().path("D:\\test\\a.txt")) .withFormat(new OldCsv()) .withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())) .inAppendMode() .createTemporaryTable("Orders"); //3、读取表 Table orders = tEnv.from("Orders"); DataStream> dataStream = tEnv.toRetractStream(orders, POJO.class); dataStream.print("pojo"); env.execute("StreamToTablefile"); }}
package com.flink.sql.environment.outPut;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.SinkFunction;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.Csv;import org.apache.flink.table.descriptors.FileSystem;import org.apache.flink.table.descriptors.Schema;import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class StreamToTablefile { public static void main(String[] args) throws Exception { //1、流式环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2、文本数据源 DataStreamSourcestreamSource = env.readTextFile("D:\\test\\a.txt"); //3、数据源映射成tuple2 SingleOutputStreamOperator > streamOperator = streamSource.map(new MapFunction >() { @Override public Tuple2 map(String s) throws Exception { String[] split = s.split(","); return new Tuple2<>(split[0],split[1]); } }); //4、将DataStream转换为table并带有fields Table table = tEnv.fromDataStream(streamOperator,$("name"),$("age")); //5、table 查询 Table name = table.select($("name"),$("age")); DataStream rowDataStream = tEnv.toAppendStream(name, Row.class); rowDataStream.writeAsText("D:\\test\\b.txt"); env.execute("StreamToTablefile"); }}
package com.flink.sql.environment.outPut;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.Csv;import org.apache.flink.table.descriptors.FileSystem;import org.apache.flink.table.descriptors.Schema;import org.apache.flink.types.Row;import static org.apache.flink.table.api.Expressions.$;public class StreamToTablefile { public static void main(String[] args) throws Exception { //1、流式环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2、文本数据源 DataStreamSourcestreamSource = env.readTextFile("D:\\test\\a.txt"); //3、数据源映射成tuple2 SingleOutputStreamOperator > streamOperator = streamSource.map(new MapFunction >() { @Override public Tuple2 map(String s) throws Exception { String[] split = s.split(","); return new Tuple2<>(split[0],split[1]); } }); //4、将DataStream转换为table并带有fields Table table = tEnv.fromDataStream(streamOperator,$("name"),$("age")); //5、table 查询 Table name = table.select($("name"),$("age")); //6、指定外部系统 tEnv.connect(new FileSystem().path("D:\\test\\b.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("name", DataTypes.STRING()).field("age",DataTypes.STRING())) .createTemporaryTable("outPutTable"); //7、执行并输出外部系统 name.executeInsert("outPutTable"); env.execute("StreamToTablefile"); }}
程序会报以下错误,但是不影响预期功能。原因是 executeInsert() 已经提交了作业,随后的 env.execute() 找不到任何 DataStream 算子;去掉 env.execute() 调用即可消除该异常:
Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
package com.flink.sql.environment.outPut;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.Csv;import org.apache.flink.table.descriptors.Kafka;import org.apache.flink.table.descriptors.Schema;import static org.apache.flink.table.api.Expressions.$;public class SourceKafakSinkKafak { public static void main(String[] args) throws Exception { //1、流式环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //2、连接外部文件系统,格式,注册字段,临时表 tEnv.connect(new Kafka().version("universal") .topic("aaaa") .startFromLatest() .property("bootstrap.servers", "centos:9092")) .withFormat(new Csv()) .withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.INT())) .createTemporaryTable("Orders"); //3、读取表 Table orders = tEnv.from("Orders"); //4、读取表字段 Table select = orders.select($("name").substring(1,3), $("age")); tEnv.connect(new Kafka().version("universal") .topic("bbbb") .property("bootstrap.servers", "centos:9092")) .withFormat(new Csv()) .withSchema(new Schema().field("name", DataTypes.STRING()).field("age",DataTypes.BIGINT())) .createTemporaryTable("outPut_table"); select.executeInsert("outPut_table"); env.execute("readFileCreateTableStream"); }}
转载地址:http://pimzi.baihongyu.com/