Skip to content

数仓开发之DWD层

DWD层设计要点:
DWD层的设计依据是维度建模理论,该层存储维度模型的事实表
DWD层表名的命名规范为dwd_数据域_表名

1. 埋点日志结构分析

前端埋点获取的JSON字符串(日志)可能存在common、start、page、displays、actions、err、ts七种字段。其中

  • common: 对应的是公共信息,是所有日志都有的字段。
  • err: 对应的是错误信息,所有日志都可能有的字段。
  • start: 对应的是启动信息,启动日志才有的字段。
  • page: 对应的是页面信息,页面日志才有的字段。
  • displays: 对应的是曝光信息,曝光日志才有的字段,曝光日志可以归为页面日志,因此必然有page字段
  • actions: 对应的是动作信息,动作日志才有的字段,同样属于页面日志,必然有page字段。动作信息和曝光信息可以同时存在。
  • ts: 对应的是时间戳,单位:毫秒,所有日志都有的字段
    综上,我们可以将前端埋点获取的日志分为两大类:启动日志和页面日志。二者都有common字段和ts字段,都可能有err字段。页面日志一定有page字段,一定没有start字段,可能有displays和actions字段;启动日志一定有start字段,一定没有page、displays和actions字段。

2. 模块设计

DIM层只有一个应用程序,放在了realtime-dim的src目录下。但DWD层有很多应用程序,为了解耦,我们为每个应用创建一个单独的模块,这些模块都是realtime-dwd的子模块。

3. 流量域未经加工的事务事实表(日志分流)

Alt text

3.1 创建realtime-dwd-base-log模块

Alt text

3.2 主要功能分析

  1. 数据清洗(ETL)
    数据传输过程中可能会出现部分数据丢失的情况,导致JSON数据结构不再完整,因此需要对脏数据进行过滤。
  2. 新老访客状态标记修复
    日志数据common字段下的is_new字段是用来标记新老访客状态的,1表示新访客,0表示老访客。前端埋点采集到的数据可靠性无法保证,可能会出现老访客被标记为新访客的问题,因此需要对该标记进行修复。
  3. 分流
    本节将通过分流对日志数据进行拆分,生成五张事务事实表写入Kafka:
    (1)流量域页面浏览事务事实表
    (2)流量域启动事务事实表
    (3)流量域动作事务事实表
    (4)流量域曝光事务事实表
    (5)流量域错误事务事实表

前端埋点新老访客状态标记设置规则

以神策提供的第三方埋点服务中新老访客状态标记设置规则为例:

  1. Web端:用户第一次访问埋入神策SDK页面的当天(即第一天),JS SDK会在网页的cookie中设置一个首日访问的标记,并设置第一天24点之前,该标记为true,即第一天触发的网页端所有事件中,is_new = 1。第一天之后,该标记则为false,即第一天之后触发的网页端所有事件中,is_new=0;
  2. 小程序端:用户第一天访问埋入神策SDK的页面时,小程序SDK会在storage缓存中创建一个首日为true的标记,并且设置第一天24点之前,该标记均为true。即第一天触发的小程序端所有事件中,is_new = 1。第一天之后,该标记则为false,即第一天之后触发的小程序端所有事件中,is_new = 0;
  3. APP端:用户安装App后,第一次打开埋入神策SDK的App的当天,Android/iOS SDK会在手机本地缓存内,创建一个首日为 true 的标记,并且设置第一天24点之前,该标记均为true。即第一天触发的APP端所有事件中,is_new = 1。第一天之后,该标记则为false,即第一天之后触发的APP端所有事件中,is_new = 0。

3.3 编写工具类

  1. 编写FlinkSinkUtil工具类
java
public class FlinkSinkUtil {

    public static Sink<String> getKafkaSink(String topic) {

        return KafkaSink.<String>builder()
                .setBootstrapServers(Constant.KAFKA_BROKERS)
                .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("rocket-" + topic + new Random().nextLong())
                .setProperty("transaction.timeout.ms", 15 * 60 * 1000 + "")
                .build();
    }
}
  1. 编写DateFormatUtil工具类
java
public class DateFormatUtil {
    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final DateTimeFormatter dtfForPartition = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter dtfFull = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * 2023-07-05 01:01:01 转成 ms 值
     * @param dateTime
     * @return
     */
    public static Long dateTimeToTs(String dateTime) {
        LocalDateTime localDateTime = LocalDateTime.parse(dateTime, dtfFull);
        return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();
    }

    /**
     * 把毫秒值转成 年月日:  2023-07-05
     * @param ts
     * @return
     */
    public static String tsToDate(Long ts) {
        Date dt = new Date(ts);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
        return dtf.format(localDateTime);
    }

    /**
     * 把毫秒值转成 年月日时分秒:  2023-07-05 01:01:01
     * @param ts
     * @return
     */
    public static String tsToDateTime(Long ts) {
        Date dt = new Date(ts);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
        return dtfFull.format(localDateTime);
    }

    public static String tsToDateForPartition(long ts) {
        Date dt = new Date(ts);
        LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());
        return dtfForPartition.format(localDateTime);
    }

    /**
     * 把 年月日转成 ts
     * @param date
     * @return
     */
    public static long dateToTs(String date) {
        return dateTimeToTs(date + " 00:00:00");
    }
}

3.4 主程序实现

在realtime-dwd-base-log模块的com.rocket.flink.dwd.base包下创建应用程序DwdBaseLog,继承BaseApp,写入以下内容:

java
public class DwdBaseLog extends BaseApp {

    public static void main(String[] args) throws Exception {
        new DwdBaseLog().start(10010, "dwdCkps", Constant.TOPIC_LOG);
    }

    @Override
    public void handle(StreamExecutionEnvironment env, DataStreamSource<String> kafkaStreamSource) {
        // 过滤不完整数据
        SingleOutputStreamOperator<JSONObject> jsonObjectStream = etl(kafkaStreamSource);
        // jsonObjectStream.print();
        // 注册水位线
        KeyedStream<JSONObject, String> keyedStream = keyByWithWatermark(jsonObjectStream);
        // 处理新旧访客
        SingleOutputStreamOperator<JSONObject> isNewFixStream = isNewFix(keyedStream);

        OutputTag<String> startTag = new OutputTag<String>("start") {
        };
        OutputTag<String> errTag = new OutputTag<String>("err") {
        };
        OutputTag<String> displayTag = new OutputTag<String>("display") {
        };
        OutputTag<String> actionTag = new OutputTag<String>("action") {
        };
        // 拆分不同类型的日志
        SingleOutputStreamOperator<String> pageStream = splitLog(isNewFixStream, errTag, startTag, displayTag, actionTag);

        // 写出不同的主题中
        SideOutputDataStream<String> actionStream = pageStream.getSideOutput(actionTag);
        SideOutputDataStream<String> displayStream = pageStream.getSideOutput(displayTag);
        SideOutputDataStream<String> errorStream = pageStream.getSideOutput(errTag);
        SideOutputDataStream<String> startStream = pageStream.getSideOutput(startTag);
//        actionStream.print("action");
//        displayStream.print("display");
//        errorStream.print("error");
//        startStream.print("start");
        actionStream.sinkTo(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_ACTION));
        displayStream.sinkTo(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_DISPLAY));
        errorStream.sinkTo(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_ERR));
        startStream.sinkTo(FlinkSinkUtil.getKafkaSink(Constant.TOPIC_DWD_TRAFFIC_START));
    }

    private static SingleOutputStreamOperator<String> splitLog(SingleOutputStreamOperator<JSONObject> isNewFixStream, OutputTag<String> errTag, OutputTag<String> startTag, OutputTag<String> displayTag, OutputTag<String> actionTag) {
        SingleOutputStreamOperator<String> pageStream = isNewFixStream.process(new ProcessFunction<JSONObject, String>() {
            @Override
            public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
                // 1. err
                JSONObject err = value.getJSONObject("err");
                if (err != null) {
                    ctx.output(errTag, err.toJSONString());
                    value.remove("err");
                }
                JSONObject common = value.getJSONObject("common");
                JSONObject start = value.getJSONObject("start");
                JSONObject page = value.getJSONObject("page");
                Long ts = value.getLong("ts");
                // 2. 启动日志
                if (start != null) {
                    ctx.output(startTag, start.toJSONString());
                } else if (page != null) {
                    // 2. 曝光日志
                    JSONArray displays = value.getJSONArray("displays");
                    if (displays != null) {
                        for (int i = 0; i < displays.size(); i++) {
                            JSONObject display = displays.getJSONObject(i);
                            display.put("common", common);
                            display.put("page", page);
                            display.put("ts", ts);
                            ctx.output(displayTag, display.toJSONString());
                        }
                        // 删除displays
                        value.remove("displays");
                    }
                }
                // 3. action
                JSONArray actions = value.getJSONArray("actions");
                if (actions != null) {
                    for (int i = 0; i < actions.size(); i++) {
                        JSONObject action = actions.getJSONObject(i);
                        action.put("common", common);
                        action.put("page", page);
                        action.put("ts", ts);
                        ctx.output(actionTag, action.toJSONString());
                    }
                    // 删除actions
                    value.remove("actions");
                }
                // 5. 只剩下page页面信息进行处理, 写出到主流
                out.collect(page.toJSONString());
            }
        });
        return pageStream;
    }

    private static SingleOutputStreamOperator<JSONObject> isNewFix(KeyedStream<JSONObject, String> keyedStream) {
        // 进行新旧访客的修复
        return keyedStream.process(new KeyedProcessFunction<String, JSONObject, JSONObject>() {
            ValueState<String> firstLoginState = null;

            @Override
            public void open(OpenContext openContext) throws Exception {
                // 创建键控状态
                firstLoginState = getRuntimeContext().getState(new ValueStateDescriptor<String>("first_login_dt", String.class));
            }

            @Override
            public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception {
                // 获取当前数据的is_new字段
                JSONObject commonJson = value.getJSONObject("common");
                String isNew = commonJson.getString("is_new");
                String firstLoginDt = firstLoginState.value();
                Long ts = value.getLong("ts");
                String currentDt = DateFormatUtil.tsToDateTime(ts);
                if ("1".equals(isNew)) {
                    // 判断当前状态情况
                    if (firstLoginDt != null && !firstLoginDt.equals(currentDt)) {
                        // 如果首次登录日期不是当天,说明是伪装的新用户
                        commonJson.put("is_new", 0);
                    } else if (firstLoginDt == null) {
                        // 状态为空
                        firstLoginState.update(currentDt);
                    } else {
                        // 无需处理,新用户当天登录好几次
                    }
                } else if ("0".equals(isNew)) {
                    if (firstLoginDt == null) {
                        // 老用户,flink里面没有存这个用户,需要补充访客的信息
                        firstLoginState.update(DateFormatUtil.tsToDate(ts - 24 * 60 * 60 * 1000L));
                    } else {
                        // 正常情况,无需处理
                    }
                } else {
                    // 既不是0也不是1,是错误数据
                }
            }
        });
    }

    private static KeyedStream<JSONObject, String> keyByWithWatermark(SingleOutputStreamOperator<JSONObject> jsonObjectStream) {
        return jsonObjectStream.assignTimestampsAndWatermarks(WatermarkStrategy.<JSONObject>forMonotonousTimestamps()
                        .withTimestampAssigner((event, timestamp) -> event.getLong("ts")))
                .keyBy(jsonObject -> jsonObject.getJSONObject("common").getString("mid"));
    }

    private static SingleOutputStreamOperator<JSONObject> etl(DataStreamSource<String> kafkaStreamSource) {
        return kafkaStreamSource.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String s, Collector<JSONObject> collector) throws Exception {
                try {
                    JSONObject jsonObject = JSONObject.parseObject(s);
                    // 日志只有start和page两种类型
                    if (jsonObject.containsKey("start") || jsonObject.containsKey("page")) {
                        JSONObject common = jsonObject.getJSONObject("common");
                        // 保障后续数据能够进行操作
                        if (common != null) {
                            if (common.containsKey("mid")) {
                                collector.collect(jsonObject);
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

4. 互动域评论事务事实表

4.1 创建realtime-dwd-interaction-comment-info模块

4.2 主要任务

提取评论操作生成评论表,并将字典表中的相关维度退化到评论表中,写出到Kafka对应主题。