当前位置:首页 > 编程笔记 > 正文
已解决

Flink RowData 与 Row 相互转化工具类

来自网友在路上 157857提问 提问时间:2023-09-20 05:11:23阅读次数: 57

最佳答案 问答题库578位专家为你答疑解惑

RowData与Row区别

(0)都代表了一条记录。都可以设置RowKind,和列数量Aritry。
(1)RowData 属于Table API,而Row属于Stream API
(2)RowData 属于Table内部接口,对用户不友好。而Row使用简单。
(3)RowData 要拿到field值必须提供列索引和LogicalType类型。而Row只需要提供列名或列索引即可。

请自己阅读注释内容。

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;import java.util.*;
import java.util.stream.Collectors;/*** @author: lisai* @create: 2023-03-15 16:51* @Description:*/
public class RowUtils {public interface TypedMapFunc<IN, OUT> extends MapFunction<IN, OUT>, ResultTypeQueryable<OUT> {DataType getProducedDataType();}public static List<RowData.FieldGetter> getRowDataFieldGetters(DataType rowDataType) {Preconditions.checkArgument(rowDataType.getLogicalType().getTypeRoot() == LogicalTypeRoot.ROW);return getRowDataFieldGetters(rowDataType.getChildren().stream().map(DataType::getLogicalType).collect(Collectors.toList()));}public static List<RowData.FieldGetter> getRowDataFieldGetters(RowType rowType) {return getRowDataFieldGetters(rowType.getFields().stream().map(RowType.RowField::getType).collect(Collectors.toList()));}public static List<RowData.FieldGetter> getRowDataFieldGetters(List<LogicalType> logicalTypes) {List<RowData.FieldGetter> fieldGetterList = new ArrayList<>();for (int i = 0; i < logicalTypes.size(); i++) {final RowData.FieldGetter fieldGetter = RowData.createFieldGetter(logicalTypes.get(i), i);fieldGetterList.add(fieldGetter);}return fieldGetterList;}public static void copyRowData(RowData input, GenericRowData output, List<RowData.FieldGetter> fieldGetters) {for (int i = 0; i < input.getArity() && i < output.getArity(); i++) {if (input instanceof GenericRowData) {output.setField(i, ((GenericRowData) input).getField(i));} else {Preconditions.checkArgument(fieldGetters != null);Object value = fieldGetters.get(i).getFieldOrNull(input);output.setField(i, value);}}}public static TypedMapFunc<RowData, Row> getRowDataToRowMapFunc(DataType rowDataType) {LogicalType logicalType = rowDataType.getLogicalType();Preconditions.checkArgument(logicalType.getTypeRoot() == LogicalTypeRoot.ROW);return new TypedMapFunc<RowData, Row>() {private RowData.FieldGetter[] fieldGetters = getRowDataFieldGetters(rowDataType).toArray(new RowData.FieldGetter[0]);@Overridepublic TypeInformation<Row> getProducedType() {RowType rowType = (RowType) logicalType;List<RowType.RowField> rowFields = rowType.getFields();List<DataType> rowDataTypes = rowDataType.getChildren();TypeInformation<?>[] fieldTypeInfos = rowDataTypes.stream().map(t -> InternalTypeInfo.of(t.getLogicalType())).toArray(TypeInformation[]::new);String[] fieldNames = rowFields.stream().map(RowType.RowField::getName).toArray(String[]::new);return new RowTypeInfo(fieldTypeInfos, fieldNames);}@Overridepublic DataType getProducedDataType() {return rowDataType.bridgedTo(Row.class);}@Overridepublic Row map(RowData rowData) throws Exception {Row row = new Row(rowData.getRowKind(), rowData.getArity());for (int i = 0; i < rowData.getArity(); i++) {RowData.FieldGetter fieldGetter = fieldGetters[i];row.setField(i, fieldGetter.getFieldOrNull(rowData));}return row;}};}public static TypedMapFunc<Row, RowData> getRowToRowRowMapFunc(DataType rowDataType) {Preconditions.checkArgument(rowDataType.getLogicalType().getTypeRoot() == LogicalTypeRoot.ROW);return new TypedMapFunc<Row, RowData>() {/*** @Description: 注意input Row中所有的数据类型必须是Flink Table API规定的内部类型。具体参考 {@DataTypeUtils.toInternalDataType()}* @param* @return TypeInformation<org.apache.flink.table.data.RowData>*/@Overridepublic TypeInformation<RowData> getProducedType() {return InternalTypeInfo.of((RowType)rowDataType.getLogicalType());}@Overridepublic DataType getProducedDataType() {return rowDataType.bridgedTo(RowData.class);}@Overridepublic RowData map(Row row) throws Exception {GenericRowData rowData = new GenericRowData(row.getKind(), row.getArity());for (int i = 0; i < rowData.getArity(); i++) {rowData.setField(i, row.getField(i));}return rowData;}};}}
查看全文

99%的人还看了

猜你感兴趣

版权申明

本文"Flink RowData 与 Row 相互转化工具类":http://eshow365.cn/6-9789-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!