数仓开发之DWD层
DWD层设计要点:
DWD层的设计依据是维度建模理论,该层存储维度模型的事实表
DWD层表名的命名规范为dwd_数据域_表名
1. 埋点日志结构分析
前端埋点获取的JSON字符串(日志)可能存在common、start、page、displays、actions、err、ts七种字段。其中
- common: 对应的是公共信息,是所有日志都有的字段。
- err: 对应的是错误信息,所有日志都可能有的字段。
- start: 对应的是启动信息,启动日志才有的字段。
- page: 对应的是页面信息,页面日志才有的字段。
- displays: 对应的是曝光信息,曝光日志才有的字段,曝光日志可以归为页面日志,因此必然有page字段
- actions: 对应的是动作信息,动作日志才有的字段,同样属于页面日志,必然有page字段。动作信息和曝光信息可以同时存在。
- ts: 对应的是时间戳,单位:毫秒,所有日志都有的字段
综上,我们可以将前端埋点获取的日志分为两大类:启动日志和页面日志。二者都有common字段和ts字段,都可能有err字段。页面日志一定有page字段,一定没有start字段,可能有displays和actions字段;启动日志一定有start字段,一定没有page、displays和actions字段。
2. 模块设计
DIM层只有一个应用程序,放在了realtime-dim的src目录下。但DWD层有很多应用程序,为了解耦,我们为每个应用创建一个单独的模块,这些模块都是realtime-dwd的子模块。
3. 流量域未经加工的事务事实表(日志分流)
3.1 创建realtime-dwd-base-log模块
3.2 主要功能分析
- 数据清洗(ETL)
数据传输过程中可能会出现部分数据丢失的情况,导致JSON数据结构不再完整,因此需要对脏数据进行过滤。 - 新老访客状态标记修复
日志数据common字段下的is_new字段是用来标记新老访客状态的,1表示新访客,0表示老访客。前端埋点采集到的数据可靠性无法保证,可能会出现老访客被标记为新访客的问题,因此需要对该标记进行修复。 - 分流
本节将通过分流对日志数据进行拆分,生成五张事务事实表写入Kafka:
(1)流量域页面浏览事务事实表
(2)流量域启动事务事实表
(3)流量域动作事务事实表
(4)流量域曝光事务事实表
(5)流量域错误事务事实表
前端埋点新老访客状态标记设置规则
以神策提供的第三方埋点服务中新老访客状态标记设置规则为例:
- Web端:用户第一次访问埋入神策SDK页面的当天(即第一天),JS SDK会在网页的cookie中设置一个首日访问的标记,并设置第一天24点之前,该标记为true,即第一天触发的网页端所有事件中,is_new = 1。第一天之后,该标记则为false,即第一天之后触发的网页端所有事件中,is_new=0;
- 小程序端:用户第一天访问埋入神策SDK的页面时,小程序SDK会在storage缓存中创建一个首日为true的标记,并且设置第一天24点之前,该标记均为true。即第一天触发的小程序端所有事件中,is_new = 1。第一天之后,该标记则为false,即第一天之后触发的小程序端所有事件中,is_new = 0;
- APP端:用户安装App后,第一次打开埋入神策SDK的App的当天,Android/iOS SDK会在手机本地缓存内,创建一个首日为 true 的标记,并且设置第一天24点之前,该标记均为true。即第一天触发的APP端所有事件中,is_new = 1。第一天之后,该标记则为false,即第一天之后触发的APP端所有事件中,is_new = 0。
3.3 编写工具类
- 编写FlinkSinkUtil工具类
java
public class FlinkSinkUtil {
public static Sink<String> getKafkaSink(String topic) {
return KafkaSink.<String>builder()
.setBootstrapServers(Constant.KAFKA_BROKERS)
.setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
.setTopic(topic)
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("rocket-" + topic + new Random().nextLong())
.setProperty("transaction.timeout.ms", 15 * 60 * 1000 + "")
.build();
}
}
- 编写DateFormatUtil工具类
java
public class DateFormatUtil {
private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
private static final DateTimeFormatter dtfForPartition = DateTimeFormatter.ofPattern("yyyyMMdd");
private static final DateTimeFormatter dtfFull = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
/**
* 2023-07-05 01:01:01 转成 ms 值
* @param dateTime
* @return
*/
public static Long dateTimeToTs(String dateTime) {
LocalDateTime localDateTime = LocalDateTime.parse(dateTime, dtfFull);
return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
}
/**
* 把毫秒值转成 年月日: 2023-07-05
* @param ts
* @return
*/
public static String tsToDate(Long ts) {
Date dt = new Date(ts);
LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
return dtf.format(localDateTime);
}
/**
* 把毫秒值转成 年月日时分秒: 2023-07-05 01:01:01
* @param ts
* @return
*/
public static String tsToDateTime(Long ts) {
Date dt = new Date(ts);
LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
return dtfFull.format(localDateTime);
}
public static String tsToDateForPartition(long ts) {
Date dt = new Date(ts);
LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
return dtfForPartition.format(localDateTime);
}
/**
* 把 年月日转成 ts
* @param date
* @return
*/
public static long dateToTs(String date) {
return dateTimeToTs(date + " 00:00:00");
}
}
3.4 主程序实现
在realtime-dwd-base-log模块的com.rocket.flink.dwd.base包下创建应用程序DwdBaseLog,继承BaseApp,写入以下内容:
java
public class DwdBaseLog extends BaseApp {
public static void main(String[] args) throws Exception {
new DwdBaseLog().start(10010, "dwdCkps", Constant.TOPIC_LOG);
}
@Override
public void handle(StreamExecutionEnvironment env, DataStreamSource<String> kafkaStreamSource) {
// 过滤不完整数据
SingleOutputStreamOperator<JSONObject> jsonObjectStream = etl(kafkaStreamSource);
// jsonObjectStream.print();
// 注册水位线
KeyedStream<JSONObject, String> keyedStream = keyByWithWatermark(jsonObjectStream);
// 处理新旧访客
SingleOutputStreamOperator<JSONObject> isNewFixStream = isNewFix(keyedStream);
OutputTag<String> startTag = new OutputTag<String>("start") {
};
OutputTag<String> errTag = new OutputTag<String>("err") {
};
OutputTag<String> displayTag = new OutputTag<String>("display") {
};
OutputTag<String> actionTag = new OutputTag<String>("action") {
};
// 拆分不同类型的日志
SingleOutputStreamOperator<String> pageStream = splitLog(isNewFixStream, errTag, startTag, displayTag, actionTag);
// 写出不同的主题中
SideOutputDataStream<String> actionStream = pageStream.getSideOutput(actionTag);
SideOutputDataStream<String> displayStream = pageStream.getSideOutput(displayTag);
SideOutputDataStream<String> errorStream = pageStream.getSideOutput(errTag);
SideOutputDataStream<String> startStream = pageStream.getSideOutput(startTag);
// actionStream.print("action");
// displayStream.print("display");
// errorStream.print("error");
// startStream.print("start");
actionStream.sinkTo(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_ACTION));
displayStream.sinkTo(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_DISPLAY));
errorStream.sinkTo(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_ERR));
startStream.sinkTo(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_START));
}
private static SingleOutputStreamOperator<String> splitLog(SingleOutputStreamOperator<JSONObject> isNewFixStream, OutputTag<String> errTag, OutputTag<String> startTag, OutputTag<String> displayTag, OutputTag<String> actionTag) {
SingleOutputStreamOperator<String> pageStream = isNewFixStream.process(new ProcessFunction<JSONObject, String>() {
@Override
public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
// 1. err
JSONObject err = value.getJSONObject("err");
if (err != null) {
ctx.output(errTag, err.toJSONString());
value.remove("err");
}
JSONObject common = value.getJSONObject("common");
JSONObject start = value.getJSONObject("start");
JSONObject page = value.getJSONObject("page");
Long ts = value.getLong("ts");
// 2. 启动日志
if (start != null) {
ctx.output(startTag, start.toJSONString());
} else if (page != null) {
// 2. 曝光日志
JSONArray displays = value.getJSONArray("displays");
if (displays != null) {
for (int i = 0; i < displays.size(); i++) {
JSONObject display = displays.getJSONObject(i);
display.put("common", common);
display.put("page", page);
display.put("ts", ts);
ctx.output(displayTag, display.toJSONString());
}
// 删除displays
value.remove("displays");
}
}
// 3. action
JSONArray actions = value.getJSONArray("actions");
if (actions != null) {
for (int i = 0; i < actions.size(); i++) {
JSONObject action = actions.getJSONObject(i);
action.put("common", common);
action.put("page", page);
action.put("ts", ts);
ctx.output(actionTag, action.toJSONString());
}
// 删除actions
value.remove("actions");
}
// 5. 只剩下page页面信息进行处理, 写出到主流
out.collect(page.toJSONString());
}
});
return pageStream;
}
private static SingleOutputStreamOperator<JSONObject> isNewFix(KeyedStream<JSONObject, String> keyedStream) {
// 进行新旧访客的修复
return keyedStream.process(new KeyedProcessFunction<String, JSONObject, JSONObject>() {
ValueState<String> firstLoginState = null;
@Override
public void open(OpenContext openContext) throws Exception {
// 创建键控状态
firstLoginState = getRuntimeContext().getState(new ValueStateDescriptor<String>("first_login_dt", String.class));
}
@Override
public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception {
// 获取当前数据的is_new字段
JSONObject commonJson = value.getJSONObject("common");
String isNew = commonJson.getString("is_new");
String firstLoginDt = firstLoginState.value();
Long ts = value.getLong("ts");
String currentDt = DateFormatUtil.tsToDateTime(ts);
if ("1".equals(isNew)) {
// 判断当前状态情况
if (firstLoginDt != null && !firstLoginDt.equals(currentDt)) {
// 如果首次登录日期不是当天,说明是伪装的新用户
commonJson.put("is_new", 0);
} else if (firstLoginDt == null) {
// 状态为空
firstLoginState.update(currentDt);
} else {
// 无需处理,新用户当天登录好几次
}
} else if ("0".equals(isNew)) {
if (firstLoginDt == null) {
// 老用户,flink里面没有存这个用户,需要补充访客的信息
firstLoginState.update(DateFormatUtil.tsToDate(ts - 24 * 60 * 60 * 1000L));
} else {
// 正常情况,无需处理
}
} else {
// 既不是0也不是1,是错误数据
}
}
});
}
private static KeyedStream<JSONObject, String> keyByWithWatermark(SingleOutputStreamOperator<JSONObject> jsonObjectStream) {
return jsonObjectStream.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getLong("ts")))
.keyBy(jsonObject -> jsonObject.getJSONObject("common").getString("mid"));
}
private static SingleOutputStreamOperator<JSONObject> etl(DataStreamSource<String> kafkaStreamSource) {
return kafkaStreamSource.flatMap(new FlatMapFunction<String, JSONObject>() {
@Override
public void flatMap(String s, Collector<JSONObject> collector) throws Exception {
try {
JSONObject jsonObject = JSONObject.parseObject(s);
// 日志只有start和page两种类型
if (jsonObject.containsKey("start") || jsonObject.containsKey("page")) {
JSONObject common = jsonObject.getJSONObject("common");
// 保障后续数据能够进行操作
if (common != null) {
if (common.containsKey("mid")) {
collector.collect(jsonObject);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
4. 互动域评论事务事实表
4.1 创建realtime-dwd-interaction-comment-info模块
4.2 主要任务
提取评论操作生成评论表,并将字典表中的相关维度退化到评论表中,写出到Kafka对应主题。