已解决
Flink-CEP基于Web日志检测暴力破解和异地登录行为代码示例
来自网友在路上 170870提问 提问时间:2023-10-24 15:19:57阅读次数: 70
最佳答案 问答题库708位专家为你答疑解惑
Flink-CEP基于Web日志检测暴力破解和异地登录行为Demo
代码示例
(1)主程序代码
import Beans.EventPOJO;
import Beans.WaringMsgPOJO;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import scala.util.parsing.json.JSONObject;import java.net.URL;
import java.util.List;
import java.util.Map;public class cepTestPro {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 2. 从文件中读取数据,转换为POJO数据对象,配置水位线URL resource = cepTestPro.class.getResource("/loginlog.csv");DataStream<EventPOJO> eventPOJOStream = env.readTextFile(resource.getPath()).map(line -> {String[] fields = line.split(",");return new EventPOJO(fields[0].trim(), fields[1].trim(), fields[2].trim(), fields[3].trim(), Long.valueOf(fields[4].trim()));}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<EventPOJO>(Time.seconds(2)) {@Overridepublic long extractTimestamp(EventPOJO eventPOJO) {return eventPOJO.getTimestamp() * 1000L;}});// 3. 按照用户名进行分组KeyedStream<EventPOJO, String> eventKeyedStream = eventPOJOStream.keyBy(event -> event.getName());// eventPOJOStream.print();// 4.1 定义一个cep匹配规则,5秒之内连续3次登陆失败Pattern<EventPOJO, EventPOJO> loginFailPattern = Pattern.<EventPOJO>begin("loginFailEvents").where(new SimpleCondition<EventPOJO>() {@Overridepublic boolean filter(EventPOJO value) throws Exception {return "fail".equals(value.getLoginStatus());}}).times(3).consecutive().within(Time.seconds(5));// 4.2 定义一个cep匹配规则:5秒之内在不同ip地址登陆成功Pattern<EventPOJO, EventPOJO> LoginMorePlacePattern = Pattern.<EventPOJO>begin("firstLogin").where(new SimpleCondition<EventPOJO>() {@Overridepublic boolean filter(EventPOJO eventPOJO) throws Exception {return "success".equals(eventPOJO.getLoginStatus());}}).next("secondLogin").where(new IterativeCondition<EventPOJO>() {@Overridepublic boolean filter(EventPOJO eventPOJO, Context<EventPOJO> context) throws Exception {EventPOJO firstLogin = context.getEventsForPattern("firstLogin").iterator().next();return !eventPOJO.getIp().equals(firstLogin.getIp());}}).within(Time.seconds(5));// 4.3 将匹配模式应用到数据流上,得到patternStreamPatternStream<EventPOJO> 
loginFailPatternStream = CEP.pattern(eventKeyedStream, loginFailPattern);PatternStream<EventPOJO> LoginMorePlacePatternStream = CEP.pattern(eventKeyedStream, LoginMorePlacePattern);// 4.4 检测出符合匹配规则的复杂事件,进行转换处理,得到告警信息SingleOutputStreamOperator<WaringMsgPOJO> loginMorePlaceWarning = LoginMorePlacePatternStream.select(new LoginMorePlaceWarning());SingleOutputStreamOperator<WaringMsgPOJO> loginFailWarning = loginFailPatternStream.select(new LoginFailWarning());// 5. 打印告警输出loginMorePlaceWarning.print();loginFailWarning.print();// 执行flink任务env.execute();}// 对多地登陆事件的告警输出public static class LoginMorePlaceWarning implements PatternSelectFunction<EventPOJO, WaringMsgPOJO> {@Overridepublic WaringMsgPOJO select(Map<String, List<EventPOJO>> pattern) throws Exception {EventPOJO firstLoginEvent = pattern.get("firstLogin").iterator().next();;EventPOJO secondLoginEvent = pattern.get("secondLogin").get(0);return new WaringMsgPOJO(firstLoginEvent.getName(), firstLoginEvent.getIp()+", "+secondLoginEvent.getIp(), "More place login", firstLoginEvent.getTimestamp(), secondLoginEvent.getTimestamp());}}// 对连续登陆失败事件的告警输出public static class LoginFailWarning implements PatternSelectFunction<EventPOJO, WaringMsgPOJO> {@Overridepublic WaringMsgPOJO select(Map<String, List<EventPOJO>> pattern) throws Exception {EventPOJO firstFailEvent = pattern.get("loginFailEvents").get(0);EventPOJO lastFailEvent = pattern.get("loginFailEvents").get(pattern.get("loginFailEvents").size() - 1);return new WaringMsgPOJO(firstFailEvent.getName(), firstFailEvent.getIp(), "Login fail " + pattern.get("loginFailEvents").size() + " times", firstFailEvent.getTimestamp(), lastFailEvent.getTimestamp());}}}
(2)用到的Bean对象
package Beans;public class EventPOJO {public String name;public String url;public String ip;public String loginStatus;public Long timestamp;public EventPOJO() {}public EventPOJO(String name, String url, String ip, String loginStatus, Long timestamp) {this.name = name;this.url = url;this.ip = ip;this.loginStatus = loginStatus;this.timestamp = timestamp;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getUrl() {return url;}public void setUrl(String url) {this.url = url;}public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}public String getLoginStatus() {return loginStatus;}public void setLoginStatus(String loginStatus) {this.loginStatus = loginStatus;}public Long getTimestamp() {return timestamp;}public void setTimestamp(Long timestamp) {this.timestamp = timestamp;}@Overridepublic String toString() {return "EventPOJO{" +"name='" + name + '\'' +", url='" + url + '\'' +", ip='" + ip + '\'' +", loginStatus='" + loginStatus + '\'' +", timestamp=" + timestamp +'}';}
}
package Beans;public class WaringMsgPOJO {public String username;public String ip;public String warningMsg;public Long firstTime;public Long lastTime;public WaringMsgPOJO() {}public WaringMsgPOJO(String username, String ip, String warningMsg, Long firstTime, Long lastTime) {this.username = username;this.ip = ip;this.warningMsg = warningMsg;this.firstTime = firstTime;this.lastTime = lastTime;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getIp() {return ip;}public void setIp(String ip) {this.ip = ip;}public String getWarningMsg() {return warningMsg;}public void setWarningMsg(String warningMsg) {this.warningMsg = warningMsg;}public Long getFirstTime() {return firstTime;}public void setFirstTime(Long firstTime) {this.firstTime = firstTime;}public Long getLastTime() {return lastTime;}public void setLastTime(Long lastTime) {this.lastTime = lastTime;}@Overridepublic String toString() {return "WaringMsgPOJO{" +"username='" + username + '\'' +", ip='" + ip + '\'' +", warningMsg='" + warningMsg + '\'' +", firstTime='" + firstTime + '\'' +", lastTime='" + lastTime + '\'' +'}';}
}
(3)测试数据
Bob,./index.html,10.255.82.110,fail,1597184223
Bob,./index.html,10.255.82.110,fail,1597184224
Bob,./index.html,10.255.82.110,fail,1597184226
Bob,./index.html,10.255.82.110,success,1597184225
Bob,./index.html,10.255.82.110,fail,1597184226
Frank,./index.html,10.255.82.110,fail,1597184227
Bob,./index.html,10.255.82.110,fail,1597184227
Frank,./index.html,10.255.82.110,fail,1597184228
Bob,./index.html,10.255.82.110,success,1597184235
Alice,./index.html,10.255.82.110,fail,1597184236
Alice,./index.html,10.255.82.110,fail,1597184237
Frank,./index.html,10.255.82.110,success,1597184238
Frank,./index.html,10.255.82.111,success,1597184239
Alice,./index.html,10.255.82.110,success,1597184245
查看全文
99%的人还看了
相似问题
- nginx使用详解:转发规则、负载均衡、server_name
- SOME/IP 协议介绍(六)接口设计的兼容性规则
- 【IDEA 使用easyAPI、easyYapi、Apifox helper等插件时,导出接口文档缺少代码字段注释的相关内容、校验规则的解决方法】
- AI监管规则:各国为科技监管开辟了不同的道路
- Sentinel 熔断规则 (DegradeRule)
- ClickHouse 语法优化规则
- QT基础入门【QSS】样式规则、选择器类型、子控件介绍
- 华为云,阿里云,腾讯云 安全组配置规则
- 3、Sentinel 动态限流规则
- CentOS/RHEL7环境下更改网卡名称为CentOS6的传统命名规则
猜你感兴趣
版权申明
本文"Flink-CEP基于web日志检测暴力破解和异地登陆行为代码示例":http://eshow365.cn/6-23442-0.html 内容来自互联网,请自行判断内容的正确性。如有侵权请联系我们,立即删除!