已解决
0基础学习PyFlink——不可以用UDTAF装饰器装饰function的原因分析
来自网友在路上 173873提问 提问时间:2023-10-27 03:06:05阅读次数: 73
最佳答案 问答题库738位专家为你答疑解惑
大纲
- udtaf要求func_type必须是general
- delegate function要求非func_type必须是pandas
在研究Flink的“用户自定义方法”(UserDefinedFunction)时,我们看到存在如下几种类型的装饰器:
- UDF:User Defined Scalar Function
- UDTF:User Defined Table Function
- UDAF:User Defined Aggregate Function
- UDTAF:User Defined Table Aggregate Function
在很多案例中,我们看到udf、udtf和udaf几个装饰器修饰function
@udf(result_type=DataTypes.BIGINT())
def add(i, j):return i + j@udtf(result_types=[DataTypes.BIGINT(), DataTypes.BIGINT()])
def range_emit(s, e):for i in range(e):yield s, i@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def mean_udaf(v):return v.mean()
但是没有见到udtaf修饰function的案例,比如
# 错误的
@udtaf(result_type=DataTypes.ROW([DataTypes.FIELD("word", DataTypes.STRING()) , DataTypes.FIELD("count", DataTypes.BIGINT())]), accumulator_type=DataTypes.ROW([DataTypes.FIELD("word", DataTypes.STRING())]), func_type="general")
def lower(line):yield Row('a', 1)
这是因为这儿存在一个悖论
udtaf要求func_type必须是general
def udtaf(f: Union[Callable, TableAggregateFunction, Type] = None,input_types: Union[List[DataType], DataType, str, List[str]] = None,result_type: Union[DataType, str] = None,accumulator_type: Union[DataType, str] = None,deterministic: bool = None, name: str = None,func_type: str = 'general') -> Union[UserDefinedAggregateFunctionWrapper, Callable]:"""Helper method for creating a user-defined table aggregate function.:param f: user-defined table aggregate function.:param input_types: optional, the input data types.:param result_type: the result data type.:param accumulator_type: optional, the accumulator data type.:param deterministic: the determinism of the function's results. True if and only if a call tothis function is guaranteed to always return the same result given thesame parameters. (default True):param name: the function name.:param func_type: the type of the python function, available value: general(default: general):return: UserDefinedAggregateFunctionWrapper or function... versionadded:: 1.13.0"""if func_type != 'general':raise ValueError("The func_type must be 'general', got %s."% func_type)if f is None:return functools.partial(_create_udtaf, input_types=input_types, result_type=result_type,accumulator_type=accumulator_type, func_type=func_type,deterministic=deterministic, name=name)else:return _create_udtaf(f, input_types, result_type, accumulator_type, func_type,deterministic, name)
如果func_type不是’general’,则会抛出错误,所以func_type="pandas"是不可以的。
udtaf修饰方法后的返回类型是UserDefinedAggregateFunctionWrapper。
def _create_udtaf(f, input_types, result_type, accumulator_type, func_type, deterministic, name):return UserDefinedAggregateFunctionWrapper(f, input_types, result_type, accumulator_type, func_type, deterministic, name, True)
delegate function要求非func_type必须是pandas
Table API下只有这些方法接受udtaf修饰function返回的UserDefinedAggregateFunctionWrapper。
- def aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) -> ‘AggregatedTable’
- def flat_aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) -> ‘FlatAggregateTable’
这些方法的在底层会调用被修饰的UserDefinedFunctionWrapper。
def aggregate(self, func: Union[Expression, UserDefinedAggregateFunctionWrapper]) \-> 'AggregatedTable':"""Performs a global aggregate operation with an aggregate function. You have to close theaggregate with a select statement... versionadded:: 1.13.0"""if isinstance(func, Expression):return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)else:func._set_takes_row_as_input()if hasattr(func, "_alias_names"):alias_names = getattr(func, "_alias_names")func = func(with_columns(col("*"))).alias(*alias_names)else:func = func(with_columns(col("*")))return AggregatedTable(self._j_table.aggregate(func._j_expr), self._t_env)
进而会调用到_java_user_defined_function。由于udtaf修饰的方法不是UserDefinedFunction对象,而是一个function,所以它会通过_create_delegate_function创建新的func 。
class UserDefinedFunctionWrapper(object):
……def _java_user_defined_function(self):……if not isinstance(self._func, UserDefinedFunction):func = self._create_delegate_function()……
而_create_delegate_function则要求udtaf中的function的func_type必须是pandas
def _create_delegate_function(self) -> UserDefinedFunction:assert self._func_type == 'pandas'return DelegatingPandasAggregateFunction(self._func)
这就和之前udtaf中要求func_type必须是general相背。
所以我们没看到udtaf修饰function的案例。
查看全文
99%的人还看了
相似问题
- Kotlin学习——kt里的集合,Map的各种方法之String篇
- Office文件在线预览大全-Word文档在线预览的实现方法-OFD文档在线预览-WPS文件在线预览
- composer切换全局镜像源的方法
- Python通过selenium调用IE11浏览器报错解决方法
- 测试用例的设计方法(全):正交实验设计方法|功能图分析方法|场景设计方发
- Java8新特性 ----- Lambda表达式和方法引用/构造器引用详解
- C#中抽象类、抽象方法和接口暨内联临时变量的精彩表达
- ChatGLM2 大模型微调过程中遇到的一些坑及解决方法(更新中)
- 类方法,静态方法和实例方法的区别及应用场景
- 【链表的说明、方法---顺序表与链表的区别】
猜你感兴趣
版权申明
本文"0基础学习PyFlink——不可以用UDTAF装饰器装饰function的原因分析":http://eshow365.cn/6-25639-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!
- 上一篇: PULP Ubuntu18.04
- 下一篇: Go语言入门心法(十四): Go操作Redis实战