当前位置:首页 > 编程笔记 > 正文
已解决

Flink SQL自定义标量函数(Scalar Function)

来自网友在路上 179879提问 提问时间:2023-11-11 10:09:43阅读次数: 79

最佳答案 问答题库798位专家为你答疑解惑

使用场景: 标量函数即 UDF,⽤于进⼀条数据出⼀条数据的场景。

开发流程:

  • 实现 org.apache.flink.table.functions.ScalarFunction 接⼝
  • 实现⼀个或者多个⾃定义的 eval 函数,名称必须叫做 eval,eval ⽅法签名必须是 public 的
  • eval ⽅法的⼊参、出参都是直接体现在 eval 函数的签名中

开发案例:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;/*** 输入数据: * nc -lk 88888* a,1** 输出结果:* res1=>:3> +I[97]* res2=>:3> +I[97]* res3=>:3> +I[97]*/
public class ScalarFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<Tuple2<String, String>> tpStream = source.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(String input) throws Exception {return new Tuple2<>(input.split(",")[0], input.split(",")[1]);}});Table table = tEnv.fromDataStream(tpStream, "id,name");tEnv.createTemporaryView("SourceTable",table);// 在 Table API ⾥不经注册直接调⽤函数Table res1 = tEnv.from("SourceTable").select(call(HashFunction.class, $("id")));// 注册函数tEnv.createTemporarySystemFunction("HashFunction", HashFunction.class);// 在 Table API ⾥调⽤注册好的函数Table res2 = tEnv.from("SourceTable").select(call("HashFunction", $("id")));// 在 SQL ⾥调⽤注册好的函数Table res3 = tEnv.sqlQuery("SELECT HashFunction(id) FROM SourceTable");tEnv.toDataStream(res1).print("res1=>");tEnv.toDataStream(res2).print("res2=>");tEnv.toDataStream(res3).print("res3=>");env.execute();}public static class HashFunction extends ScalarFunction {// 接受任意类型输⼊,返回 INT 型输出public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {return o.hashCode();}}
}

测试结果:

在这里插入图片描述

查看全文

99%的人还看了

猜你感兴趣

版权申明

本文"Flink SQL自定义标量函数(Scalar Function)":http://eshow365.cn/6-37505-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!