基于flink+redis的请求泛化去重

1.概述

上个文章已经说了泛化去重部分是想用flink去实现,然后实现的时候发现,如果使用flink的数据聚合和stream sql去实现很蠢,因为如果使用时间窗口进行批处理,在当前我的需求的情况下,完全不符合,用大炮打蚊子的感觉。

后来想到的通过提取请求包中的关键字进行hash,然后以hash,原数据的方式存在redis,通过redis进行去重处理。(后来看ysrc的被动扫描,发现这是人家三年前就想到的的思路……)

那么flink部分就只需要用flatmap对数据进行泛化处理,然后插入redis即可。(其实这里拿python 整个kafka的消费者就行,没必要用flink,但是我想明白的时候已经快写完了。)

还剩最后大头的扫描器了,想抄别人的看着都费劲,我要裂开了。

2.main

本质上就是一个kafka的消费者,拿到数据流进flatmap处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xx.xx.xx.xx:xx");
properties.setProperty("zookeeper.connect", "xx.xx.xx.xx:xx");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>("proxy", new SimpleStringSchema(), properties);
myConsumer.setStartFromEarliest();
DataStream<String> stream = env.addSource(myConsumer);
stream.flatMap(new generalization());
env.execute("proxy");
}

3.flatmap

这里的思路就是分类处理,由于我kafka里是统一格式的json串,所以这里处理的时候,value就是kafka里的每一条数据,直接写个java bean就行。

Url.Generalization功能是获取所有目录的url,进行敏感文件之类的扫描

Get和POST就很明显了,是对有无body的请求,进行分开处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
try {
RedisUtil jedis = new RedisUtil("xx.xx.xx.xx", 6379);

Gson gson = new Gson();
Packet packet = gson.fromJson(value, Packet.class);


Url.Generalization(jedis, value, packet);

if (packet.getContent_length() == null) {
Get.Generalization(jedis, value, packet);
} else {
Post.Generalization(jedis, value, packet);
}
jedis.close();
} catch (Exception e) {
e.printStackTrace();
}
}

4.RedisUtil

RedisUtil就是通过调用jedis实现的一个工具类,这里需要说一下的是,一个redis实例有0-15,16个库,泛化过程中会将完成的各类请求包,存在不同序号的库中,这样扫描器扫描取数据的时候,就非常的方便。

目前这里用到的库,0为url,1为get请求,2为json post,3为xml post,4为标准post,5为上传请求,6为其他请求。

5.URL

这里很简单,获取目录,然后遍历出来每一级的目录,最后特征计算hash,存入redis。

upload successful

6.GET

GET这里首先判断了有无参数,然后将参数的key全部提出来,做一个简单的排序以后,跟其他特征拼起来计算hash,存入redis。

upload successful

7.POST

7.1 PostUtil

这个工具类就是对body进行类型判断的

upload successful

7.2 Json Post

content-type和isJson判断

upload successful

7.3 XML Post

这里跟群友激烈讨论后,觉得xml的请求,同一接口,多个不同参数的情况较少,处理xml的key又非常麻烦,所以就只简单判断了接口

upload successful

7.4 上传请求

通过content-type判断

upload successful

7.5 标准请求

等同于GET的处理

7.6 其他请求

各种各样的其他请求,直接用content做hash关键字,只单纯去重

upload successful