Flink CDC for MySQL to ClickHouse

Complete Maven dependencies

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <!-- replaced by flink-connector-jdbc below
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.12</artifactId>
        <version>1.10.3</version>
    </dependency>
    -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.13.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.13.0</version>
        <type>test-jar</type>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>flink-connector-clickhouse</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.6</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.6</version>
    </dependency>
</dependencies>
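
Before running either job, the MySQL side needs a row-format binlog and a user the connector can replicate with. Below is a minimal setup sketch, assuming the test.test_cdc table and flinkcdc user referenced in the code that follows; the VARCHAR lengths are arbitrary, and the grants are the usual MySQL CDC (Debezium) requirements, so adapt them to your environment:

-- my.cnf (server side): the binlog must be enabled and row-based
--   server-id        = 1
--   log_bin          = mysql-bin
--   binlog_format    = ROW
--   binlog_row_image = FULL

-- Source database and table read by the jobs below
CREATE DATABASE IF NOT EXISTS test;
CREATE TABLE test.test_cdc (
    id          INT NOT NULL PRIMARY KEY,
    name        VARCHAR(255),
    description VARCHAR(1024)
);

-- Replication user the connector logs in with
CREATE USER 'flinkcdc'@'%' IDENTIFIED BY 'dafei1288';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
    ON *.* TO 'flinkcdc'@'%';
FLUSH PRIVILEGES;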

Flink CDC

The DataStream API version: a MySQLSource reads the binlog, a custom DebeziumDeserializationSchema turns each change event into a JSON string, and a RichSinkFunction writes the insert events into ClickHouse over JDBC.

package name.lijiaqi.cdc;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.google.gson.Gson;
import com.google.gson.internal.LinkedTreeMap;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.HashMap;

public class MySqlBinlogSourceExample {

    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("test")
                .username("flinkcdc")
                .password("dafei1288")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Add the MySQL CDC source
        env.addSource(sourceFunction)
                // Add the ClickHouse sink
                .addSink(new ClickhouseSink());

        env.execute("mysql2clickhouse");
    }

    // Deserializes each CDC record into a JSON string
    public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            Gson gson = new Gson();
            HashMap<String, Object> hs = new HashMap<>();

            // The topic has the form <server-name>.<database>.<table>
            String topic = sourceRecord.topic();
            String[] split = topic.split("[.]");
            String database = split[1];
            String table = split[2];
            hs.put("database", database);
            hs.put("table", table);

            // Extract the operation type
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);

            // Extract the row data itself (the "after" image of the change)
            Struct struct = (Struct) sourceRecord.value();
            Struct after = struct.getStruct("after");
            if (after != null) {
                Schema schema = after.schema();
                HashMap<String, Object> afhs = new HashMap<>();
                for (Field field : schema.fields()) {
                    afhs.put(field.name(), after.get(field.name()));
                }
                hs.put("data", afhs);
            }

            String type = operation.toString().toLowerCase();
            if ("create".equals(type)) {
                type = "insert";
            }
            hs.put("type", type);

            collector.collect(gson.toJson(hs));
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }

    public static class ClickhouseSink extends RichSinkFunction<String> {
        Connection connection;
        PreparedStatement pstmt;

        private Connection getConnection() {
            Connection conn = null;
            try {
                Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
                String url = "jdbc:clickhouse://localhost:8123/default";
                conn = DriverManager.getConnection(url, "default", "dafei1288");
            } catch (Exception e) {
                e.printStackTrace();
            }
            return conn;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            connection = getConnection();
            String sql = "insert into sink_ch_test(id,name,description) values (?,?,?)";
            pstmt = connection.prepareStatement(sql);
        }

        // Called once for every incoming record
        @Override
        public void invoke(String value, Context context) throws Exception {
            // e.g. {"database":"test","data":{"name":"jacky","description":"fffff","id":8},"type":"insert","table":"test_cdc"}
            Gson gson = new Gson();
            HashMap<String, Object> hs = gson.fromJson(value, HashMap.class);
            String database = (String) hs.get("database");
            String table = (String) hs.get("table");
            String type = (String) hs.get("type");

            if ("test".equals(database) && "test_cdc".equals(table)) {
                if ("insert".equals(type)) {
                    System.out.println("insert => " + value);
                    LinkedTreeMap<String, Object> data = (LinkedTreeMap<String, Object>) hs.get("data");
                    String name = (String) data.get("name");
                    String description = (String) data.get("description");
                    // Gson parses JSON numbers as Double by default
                    Double id = (Double) data.get("id");
                    // Bind the placeholders prepared in open()
                    pstmt.setInt(1, id.intValue());
                    pstmt.setString(2, name);
                    pstmt.setString(3, description);
                    pstmt.executeUpdate();
                }
            }
        }

        @Override
        public void close() throws Exception {
            super.close();
            if (pstmt != null) {
                pstmt.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}
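
The ClickhouseSink above inserts into sink_ch_test, which must already exist in ClickHouse. Here is a minimal sketch of a matching table; the MergeTree engine and ORDER BY key are assumptions rather than anything the job requires:

-- Target table for ClickhouseSink; columns match the INSERT prepared in open()
CREATE TABLE default.sink_ch_test (
    id          Int32,
    name        String,
    description String
) ENGINE = MergeTree()
ORDER BY id;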

Flink SQL CDC

The same idea expressed in pure Flink SQL. Note that this example ships changes from MySQL into another MySQL table through the JDBC connector; to target ClickHouse instead, the sink DDL is what would need to change.

package name.lijiaqi.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlToMysqlMain {

    public static void main(String[] args) throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Source table
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " description STRING\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'flinkcdc',\n" +
                " 'password' = 'dafei1288',\n" +
                " 'database-name' = 'test',\n" +
                " 'table-name' = 'test_cdc'\n" +
                ")";

        String url = "jdbc:mysql://127.0.0.1:3306/test";
        String userName = "root";
        String password = "dafei1288";
        String mysqlSinkTable = "test_cdc_sink";

        // Sink table
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " description STRING,\n" +
                " PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'driver' = 'com.mysql.jdbc.Driver',\n" +
                " 'url' = '" + url + "',\n" +
                " 'username' = '" + userName + "',\n" +
                " 'password' = '" + password + "',\n" +
                " 'table-name' = '" + mysqlSinkTable + "'\n" +
                ")";

        // Simple pass-through from source to sink
        String transformSQL =
                "insert into test_cdc_sink select * from mysql_binlog";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        TableResult result = tableEnv.executeSql(transformSQL);

        // executeSql has already submitted the INSERT job; print() blocks the
        // client while flink-cdc completes its snapshot. A trailing
        // env.execute() would fail here, since no DataStream operators
        // were ever added to the environment.
        result.print();
    }
}
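
The JDBC sink needs test_cdc_sink to exist on the target MySQL instance. Because the Flink DDL declares PRIMARY KEY (id) NOT ENFORCED, the JDBC connector writes in upsert mode, so updates and deletes on the source are replayed rather than appended. A sketch of the target table, mirroring the assumed source schema:

-- Target table for the JDBC sink; the primary key enables upsert writes
CREATE TABLE test.test_cdc_sink (
    id          INT NOT NULL PRIMARY KEY,
    name        VARCHAR(255),
    description VARCHAR(1024)
);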