Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Obtain bloodline anomalies #3799

Open
2 of 3 tasks
18216499322 opened this issue Sep 11, 2024 · 0 comments · Fixed by #3816
Open
2 of 3 tasks

[Bug] Obtain bloodline anomalies #3799

18216499322 opened this issue Sep 11, 2024 · 0 comments · Fixed by #3816
Labels
Bug Something isn't working

Comments

@18216499322
Copy link
Contributor

18216499322 commented Sep 11, 2024

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

image
获取血缘异常1
企业微信截图_17260215991103
获取血缘异常2
image
获取血缘异常3

What you expected to happen

获取血缘正常

How to reproduce

作业配置1:
image

作业SQL1:

--Job name   : JOB_DWD_SD_SO_ORDR_DETL_KAFKA
--Created    : 2024.04.11
--Developer  : 李娉
--Description: DWD real-time sales-order detail model

-- ---- State backend & checkpointing ----
set state.backend = rocksdb;
set state.checkpoint-storage = filesystem;
set state.checkpoints.num-retained = 10;
set execution.checkpointing.mode = EXACTLY_ONCE;
set execution.checkpointing.interval = 300000;
set execution.checkpointing.min-pause = 300000;
set execution.checkpointing.timeout = 300000;
set execution.checkpointing.tolerable-failed-checkpoints = 999;
set execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
-- ---- Deployment & resources ----
set yarn.application.queue = flink-task;
set taskmanager.numberOfTaskSlots = 1;
-- State TTL of 13h: join state older than that is dropped; the query below
-- only joins same-day rows, so 13h covers a full business day plus slack.
set table.exec.state.ttl = 13h;
set table.exec.mini-batch.enabled = true;  -- mini-batch aggregation
set table.exec.source.cdc-events-duplicate = true;  -- deduplicate CDC events
set parallelism.default = 3;
set jobmanager.memory.process.size = 2g;
set taskmanager.memory.process.size = 12g;
set taskmanager.memory.managed.size=8g;
set taskmanager.memory.network.max=256m;
set taskmanager.memory.network.min=256m;
set table.exec.sink.upsert-materialize=none;
set taskmanager.memory.jvm-metaspace.size=256m;
-- ---- RocksDB tuning ----
set state.backend.rocksdb.block.cache-size=64m;
set state.backend.rocksdb.block.blocksize=32kb;
set state.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED_HIGH_MEM;
set state.backend.rocksdb.memory.managed=false;


-- Kafka source: goods-snapshot rows from the order-center Debezium topic.
-- NOTE(review): this table and the three other ods_order_* sources below read
-- the same topic with the same 'properties.group.id'; Flink tracks offsets in
-- its own state, but a shared group id can confuse external lag monitoring —
-- confirm this is intentional.
create table ods_order_sale_goods_snapshot_r 
	(  
	store_code string, 
	--import_code string, 
	good_code string, 
	create_time string, 
	category_code string, 
	purchase_kind_code string, 
	modify_time string, 
	is_unfree_goods char, 
	is_unmember_goods char, 
	member_price decimal(22,4), 
	retail_price decimal(22,4), 
	purchase_kind_des string, 
	sale_order_code string, 
	id string, 
	category_des string, 
	proc_time as proctime(),  -- processing-time attribute (computed, not read from Kafka)
	sale_time string  ,
	-- Debezium source-table name; the topic multiplexes several tables and the
	-- consuming query filters on origin_table='sale_goods_snapshot'.
	origin_table STRING METADATA FROM 'value.source.table' VIRTUAL  ,
	PRIMARY KEY (id) NOT ENFORCED
	) with ( 
	'connector'='kafka', 
	'topic'='tp_ods_yf_order_center', 
	'properties.bootstrap.servers'='${FLINK_CDC_KAFKA_URL}', 
	'scan.startup.mode'='timestamp','scan.startup.timestamp-millis' = '1722528000000',
	'format'='debezium-json',
	'properties.group.id' = 'tp_ods_yf_order_center_dink_dwd_sale_order', 
	'properties.security.protocol' = 'SASL_PLAINTEXT', 
	'properties.sasl.mechanism' = 'SCRAM-SHA-256', 
	'properties.sasl.jaas.config' = '${FLINK_CDC_KAFKA_CONSUME}' 
	);
					
-- Kafka source: sales-order line items (one row per order line) from the
-- order-center Debezium topic.
create table ods_order_sale_order_details_r 
	( 
	--import_code string, 
	member_coupon_type string, 
	lot_num string, 
	item_type string, 
	modify_time string, 
	perform_price_type string, 
	row_num int, 
	account_price decimal(22,4), 
	perform_price decimal(22,4), 
	sale_order_code string, 
	refer_bill_code string, 
	id string, 
	cost_price decimal(22,4), 
	store_code string, 
	account_money decimal(22,4), 
	good_code string, 
	quantity decimal(22,4), 
	apportion_price decimal(22,4), 
	create_time string, 
	--is_preorder string, 
	perform_profit_margin decimal(22,4), 
	--promotionbill_plan_code string, 
	service_charge decimal(22,4), 
	promotionbill_plan_type string, 
	account_price_gross_money decimal(22,4), 
	refer_bill_type string, 
	refer_bill_row_num int, 
	output_tax string, 
	--useful int, 
	proc_time as proctime(),  -- processing-time attribute (computed, not read from Kafka)
	sale_time string ,
	-- Composite key: an order line is identified by order + row + lot + goods.
	PRIMARY KEY (sale_order_code,row_num,lot_num,good_code) NOT ENFORCED
	) with ( 
	'connector'='kafka', 
	'topic'='tp_ods_yf_order_center', 
	'properties.bootstrap.servers'='${FLINK_CDC_KAFKA_URL}', 
	'scan.startup.mode'='timestamp','scan.startup.timestamp-millis' = '1722528000000',
	'format'='debezium-json',
	'properties.group.id' = 'tp_ods_yf_order_center_dink_dwd_sale_order', 
	'properties.security.protocol' = 'SASL_PLAINTEXT', 
	'properties.sasl.mechanism' = 'SCRAM-SHA-256', 
	'properties.sasl.jaas.config' = '${FLINK_CDC_KAFKA_CONSUME}' 
	);
					
-- Kafka source: per-order-line salesman assignments from the order-center
-- Debezium topic.
CREATE TABLE ods_order_sales_ordersalesman_r (
    store_code STRING,
    create_time STRING,
    edit_time STRING,
    sale_order_code STRING,
    sales_job_number STRING,
    id STRING,
    row_num INT,
    sale_details_id STRING,
    sale_time STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'tp_ods_yf_order_center',
    'properties.bootstrap.servers' = '${FLINK_CDC_KAFKA_URL}',
    'properties.group.id' = 'tp_ods_yf_order_center_dink_dwd_sale_order',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'SCRAM-SHA-256',
    'properties.sasl.jaas.config' = '${FLINK_CDC_KAFKA_CONSUME}',
    'format' = 'debezium-json',
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '1722528000000'
);
					
-- Kafka source: sales-order header rows (one row per order) from the
-- order-center Debezium topic.
-- NOTE(review): the unqualified DECIMAL columns below default to
-- DECIMAL(10, 0) in Flink SQL, which truncates any fractional part of these
-- money/quantity fields — confirm whether DECIMAL(22,4) (used by the sibling
-- tables) was intended.
create table ods_order_sale_order_r 
	( 
	--import_code string, 
	modify_time string, 
	--around_money decimal(22,4), 
	--odd_change_money decimal, 
	--customer_pay_money decimal, 
	sale_order_code string, 
	other_free_money decimal, 
	gift_free_money decimal, 
	bill_type string, 
	refer_bill_code string, 
	pair_count decimal, 
	store_code string, 
	member_id string, 
	cashdesk_id string, 
	point_number decimal, 
	coupon_code string, 
	create_time string, 
	casher_code string, 
	bill_kind string, 
	order_code string, 
	order_from string, 
	refer_bill_type string, 
	coupon_plan_code string, 
	cash_free_money decimal, 
	coupon_type string, 
	proc_time as proctime(),  -- processing-time attribute (computed, not read from Kafka)
	sale_time string ,
	PRIMARY KEY (sale_order_code) NOT ENFORCED
	) with ( 
	'connector'='kafka', 
	'topic'='tp_ods_yf_order_center', 
	'properties.bootstrap.servers'='${FLINK_CDC_KAFKA_URL}', 
	'scan.startup.mode'='timestamp','scan.startup.timestamp-millis' = '1722528000000',
	'format'='debezium-json',
	'properties.group.id' = 'tp_ods_yf_order_center_dink_dwd_sale_order', 
	'properties.security.protocol' = 'SASL_PLAINTEXT', 
	'properties.sasl.mechanism' = 'SCRAM-SHA-256', 
	'properties.sasl.jaas.config' = '${FLINK_CDC_KAFKA_CONSUME}' 
	);
	

-- Kafka source: ECP (e-commerce platform) order headers from the Debezium
-- topic tp_ods_yf_ecp; joined below on pos_code to detect cross-store
-- fulfilment.
CREATE TABLE IF NOT EXISTS ods_zt_ord_order_r ( 
	  id                        string, 
	  parent_order_code         string, 
	  order_code                string, 
	  customer_code             string, 
	  top_channel_code          string, 
	  goods_channel_code        string, 
	  pos_code                  string, 
	  channel_code              string, 
	  warehouse_code            string, 
	  state                     string, 
	  need_receive_amount       decimal(22,4), 
	  customer_need_pay_amount   decimal(22,4), 
	  goods_total_amount         decimal(22,4), 
	  expense_total_amount       decimal(22,4), 
	  preferential_total_amount  decimal(22,4), 
	  subsidy_total_amount       decimal(22,4), 
	  -- NOTE(review): "decucted" looks like a typo for "deducted", but the name
	  -- must match the upstream source field — verify before renaming.
	  decucted_total_amount      decimal(22,4), 
	  exercise_total_amount      decimal(22,4), 
	  ordonnance_id             string, 
	  splited                   string, 
	  delivery_provider         string, 
	  payee                     string, 
	  referrer_id               string, 
	  ticket_num                string, 
	  referrer_name             string, 
	  out_trade_code            string, 
	  remark                    string, 
	  creater_id                string, 
	  create_time               string, 
	  modify_time               string, 
	  del_flag                  string, 
	  from_warehouse_code       string, 
	  order_type                int, 
	PRIMARY KEY (`id`) NOT ENFORCED 
	) WITH ( 
	'connector'='kafka', 
	'topic'='tp_ods_yf_ecp', 
	'properties.bootstrap.servers'='${FLINK_CDC_KAFKA_URL}', 
	'scan.startup.mode'='timestamp','scan.startup.timestamp-millis' = '1722528000000',
	'format'='debezium-json',
	'properties.group.id' = 'tp_ods_yf_ecp_dink_dwd_ordr_detl', 
	'properties.security.protocol' = 'SASL_PLAINTEXT', 
	'properties.sasl.mechanism' = 'SCRAM-SHA-256', 
	'properties.sasl.jaas.config' = '${FLINK_CDC_KAFKA_CONSUME}' 
	);

-- Kafka sink: DWD sales-order detail stream, written as debezium-json changelog
-- to topic dwd_sd_so_ordr_detl_dk_r. Populated by the insert statement below.
CREATE TABLE IF NOT EXISTS dwd_sd_so_ordr_detl_r ( 
	stsc_date string comment '统计日期-天',  
	sale_ordr_doc string comment '销售订单编号', 
	--store_code string comment '门店编码', 
	ordr_sour_code string comment '订单来源编码', 
	sale_order_proc_time timestamp comment '', 
	ordr_sale_time string comment '', 
	prmn_prog_type_code string comment '促销方案类型编码', 
	ordr_type_code string comment '订单类型编码(销售单/退货单)', 
	ordr_cate_code string comment '订单类别编码(标准/团购单/赠品记账(积分兑奖)/财务记账(订金))', 
	prmn_prog_code string comment '促销方案编码', 
	coup_type string comment '', 
	coup_code string comment '', 
	line_item INT comment '行项目', 
	refn_ordr_type_code string comment '参考单据类型编码(销售单/团购单/处方单/订金退货申请单)', 
	refn_ordr_doc string comment '参考单据编号', 
	refn_ordr_item INT comment '参考单据行项目号', 
	memb_disc_mode string comment '会员优惠方式(会员折扣/会员价/VIP会员价)', 
	--proj_cate string comment '', 
	goods_code string comment '商品编码', 
	sale_order_details_proc_time timestamp comment '', 
	cate_clas_code string comment '', 
	cate_clas_name string comment '', 
	purc_clas_code_new string comment '', 
	purc_clas_name_new string comment '', 
	lotn string comment '批号', 
	sale_tax string comment '', 
	memb_id string comment '会员ID', 
	memb_point decimal comment '', 
	chk_out_id string comment '', 
	casr_id string comment '', 
	is_memb_goods string comment '非会员商品标识', 
	retail_pric decimal(22,4) comment '零售单价', 
	memb_pric decimal(22,4) comment '会员单价', 
	exec_pric_type string comment '执行价类型编码', 
	--befo_divid_pric decimal(22,4) comment '', 
	divid_pric decimal(22,4) comment '医保单价', 
	acnt_pric decimal(22,4) comment '记账单价', 
	exec_pric decimal(22,4) comment '执行单价', 
	dct_amt decimal comment '', 
	gift_dct_amt decimal comment '', 
	other_free_amt decimal comment '', 
	amt1 decimal(22,4) comment '', 
	amt2 decimal(22,4) comment '', 
	sale_amt decimal(22,4) comment '销售金额', 
	sale_qty decimal(22,4) comment '销售数量', 
	china_med_qty decimal comment '', 
	etl_time timestamp comment '', 
	is_n_surd_prof string comment '不让利商品标识', 
	cost_pric decimal(22,4) comment '成本单价', 
	cost_amt decimal(22,4) comment '', 
	--sale_cost_amt decimal(22,4) comment '销售成本额', 
	is_effe_ordr string comment '是否有效订单', 
	order_code string comment '', 
	is_ecp_self_dstn_ordr string comment '是否ECP自配送订单', 
	stat_date string comment '', 
	proj_cate_code string comment '项目类别编码', 
	-- NOTE(review): COMMENT below duplicates proj_cate_code's ("项目类别编码");
	-- looks copy-pasted — this column actually holds the purchase-kind code.
	purchase_kind_code string comment '项目类别编码', 
	sale_goods_snapshot_proc_time timestamp comment '', 
	casr_code string comment '收银员编码', 
	service_charge decimal(22,4)  comment '', 
	sale_pers_id string comment '营销员ID', 
	phmc_code string comment '门店编码', 
	out_phmc_code  string comment '出货门店', 
	is_ydch_flag   string comment '是否异店出货',
	coup_prog_code string comment '券方案编码',
	PRIMARY KEY (stsc_date,sale_ordr_doc,goods_code,line_item,lotn) NOT ENFORCED
	) WITH ( 
	  'connector' = 'kafka', 
	  'properties.bootstrap.servers' = '${FLINK_DW_KAFKA_URL}', 
	  'properties.sasl.jaas.config' = '${FLINK_DW_KAFKA_PRODUCER}', 
	  'properties.security.protocol' = 'SASL_PLAINTEXT', 
	  'properties.sasl.mechanism' = 'SCRAM-SHA-256', 
	  'topic' = 'dwd_sd_so_ordr_detl_dk_r', 
	  'format' = 'debezium-json' 
	);


-- Main DWD query: enrich each same-day order line (t1) with its order header
-- (t), goods snapshot (t2), salesman (t3) and ECP order (t4), then emit to the
-- DWD sink.
-- NOTE(review): every same-day filter compares against
-- date_format(localtimestamp, 'yyyyMMdd'), which is evaluated at processing
-- time — rows arriving just after midnight for the previous day will be
-- dropped / re-bucketed; confirm that is the intended day-boundary behavior.
insert into dwd_sd_so_ordr_detl_r
select ifnull(date_format(cast(t1.sale_time as timestamp), 'yyyyMMdd'), 'NA')  as stsc_date
     , t1.sale_order_code                                                                  as sale_ordr_doc
     --, t1.store_code                                                                       as store_code
     -- Default the order-source code to '9999' when missing/blank.
     , (case when coalesce(t.order_from, '') = '' then '9999' else t.order_from end)       as ordr_sour_code
     , t.proc_time                                               as sale_order_proc_time
     , date_format(cast(t1.sale_time as timestamp),'yyyy-MM-dd HH:mm:ss') as ordr_sale_time
     , t1.promotionbill_plan_type                                as prmn_prog_type_code
     , t.bill_type                                               as ordr_type_code
     , t.bill_kind                                               as ordr_cate_code
     -- NOTE(review): prmn_prog_code is fed from promotionbill_plan_type, the
     -- same source as prmn_prog_type_code above — confirm a plan *code* field
     -- was not intended here.
     , t1.promotionbill_plan_type                                as prmn_prog_code
     , t.coupon_type                                             as coup_type
     , t.coupon_code                                             as coup_code
     , t1.row_num                                                as line_item
     , t1.refer_bill_type                                        as refn_ordr_type_code
     , t1.refer_bill_code                                        as refn_ordr_doc
     , t1.refer_bill_row_num                                     as refn_ordr_item
     , t1.member_coupon_type                                     as memb_disc_mode
     --, t1.item_type                                              as proj_cate
     , t1.good_code                                              as goods_code
     , t1.proc_time                                              as sale_order_details_proc_time
     , t2.category_code                                          as cate_clas_code
     , t2.category_des                                           as cate_clas_name
     , t2.purchase_kind_code                                     as purc_clas_code_new
     , t2.purchase_kind_des                                      as purc_clas_name_new
     , t1.lot_num                                                as lotn
     , t1.output_tax                                             as sale_tax
     , t.member_id                                               as memb_id
     , t.point_number                                            as memb_point
     , t.cashdesk_id                                             as chk_out_id
	 -- Cashier id: zero-pad to 8 chars, 'NA' when missing/blank.
	 ,case when coalesce(t.casher_code , '') = '' then 'NA' else LPAD(CAST(t.casher_code  as string), 8, '0') end as casr_id
     , t2.is_unmember_goods                                      as is_memb_goods
     , t2.retail_price                                           as retail_pric
     , t2.member_price                                           as memb_pric
     , t1.perform_price_type                                     as exec_pric_type
     --, t1.perform_price                                          as befo_divid_pric
     , t1.apportion_price                                        as divid_pric
     , t1.account_price                                          as acnt_pric
     , t1.perform_price                                          as exec_pric
     , t.cash_free_money                                         as dct_amt
     , t.gift_free_money                                         as gift_dct_amt
     , t.other_free_money                                        as other_free_amt
     , t1.perform_profit_margin                                  as amt1
     , t1.account_price_gross_money                              as amt2
     , t1.account_money                                          as sale_amt
     , t1.quantity                                               as sale_qty
     , t.pair_count                                              as china_med_qty
     , current_timestamp                                         as etl_time
     , t2.is_unfree_goods                                        as is_n_surd_prof
     , t1.cost_price                                             as cost_pric
     , t1.cost_price * t1.quantity                               as cost_amt
     --, t1.cost_price * t1.quantity                               as sale_cost_amt
     -- bill_kind 3/4/15 are treated as non-effective orders.
     , case when t.bill_kind in ('3', '4', '15') then 'N' else 'Y' end as is_effe_ordr
     , coalesce(t.order_code, t.sale_order_code)                 as order_code
     , 'N'                                                       as is_ecp_self_dstn_ordr
     , date_format(cast(t1.sale_time as timestamp), 'yyyyMMdd')        as stat_date
     , t1.item_type                                              as proj_cate_code
     , t2.purchase_kind_code                                     as purchase_kind_code
     , t2.proc_time                                              as sale_goods_snapshot_proc_time
     , t.casher_code                                             as casr_code
     , t1.service_charge                                         as service_charge
     -- Salesman id: zero-pad to 8 chars, 'NA' when missing/blank.
     ,case when coalesce(t3.sales_job_number , '') = '' then 'NA' else LPAD(CAST(t3.sales_job_number  as string), 8, '0') end as sale_pers_id
	 -- Selling store: ECP from_warehouse when matched, else the line's store.
	 ,case when t4.from_warehouse_code is not null then t4.from_warehouse_code
	       when coalesce(trim(t1.store_code),'')='' then 'NA'
	  else trim(t1.store_code) end                               as phmc_code
	 -- Shipping store: ECP warehouse when matched, else the line's store.
	 ,case when t4.warehouse_code is not null then t4.warehouse_code 
	       when coalesce(trim(t1.store_code),'')='' then 'NA'
	  else trim(t1.store_code) end                               as out_phmc_code
     -- Cross-store shipment flag: 'Y' when an ECP order matched on pos_code.
     ,if(t4.pos_code is not null, 'Y', 'N')                      as is_ydch_flag
	 ,t.coupon_plan_code                                         as coup_prog_code
  from ods_order_sale_order_r t
 -- Order lines, restricted to today's sale_time.
 inner join ods_order_sale_order_details_r t1
    on t.sale_order_code = t1.sale_order_code
   and date_format(cast(t1.sale_time as timestamp), 'yyyyMMdd')=date_format(localtimestamp,'yyyyMMdd') 
  -- Goods snapshot: the topic multiplexes tables, so filter on origin_table.
  left join ods_order_sale_goods_snapshot_r t2
    on t1.sale_order_code = t2.sale_order_code
   and t1.good_code = t2.good_code
   and t2.origin_table='sale_goods_snapshot'
   and date_format(cast(t2.sale_time as timestamp), 'yyyyMMdd')=date_format(localtimestamp,'yyyyMMdd') 
  -- Per-line salesman assignment.
  left join ods_order_sales_ordersalesman_r t3
	on t1.sale_order_code = t3.sale_order_code
   and t1.row_num = t3.row_num
   and date_format(cast(t3.sale_time as timestamp), 'yyyyMMdd')=date_format(localtimestamp,'yyyyMMdd') 
  -- ECP order header matched via pos_code; order_type=1 only.
  left join ods_zt_ord_order_r t4 
    on t.sale_order_code = t4.pos_code 
   and t4.order_type =1
 where date_format(cast(t.sale_time as timestamp), 'yyyyMMdd')=date_format(localtimestamp,'yyyyMMdd') 

作业配置2:
image

环境SQL2:

-- ---- State backend & checkpointing ----
set state.backend = rocksdb;
set state.backend.incremental = true;
set state.checkpoint-storage = filesystem;
set state.checkpoints.num-retained = 10;
set execution.checkpointing.mode = EXACTLY_ONCE;
set execution.checkpointing.interval = 300000;
set execution.checkpointing.min-pause = 300000;
set execution.checkpointing.timeout = 900000;
set execution.checkpointing.max-concurrent-checkpoints=1;
set execution.checkpointing.tolerable-failed-checkpoints = 999;
set execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
-- ---- Deployment & resources ----
set yarn.application.queue = flink-task;
set taskmanager.memory.process.size = 4g;
set jobmanager.memory.process.size = 2g;
set parallelism.default = 1;
set taskmanager.numberOfTaskSlots = 1;
-- ---- Table/SQL execution ----
set table.local-time-zone = Asia/Shanghai;
set table.exec.mini-batch.enabled = true;
set table.exec.mini-batch.allow-latency = 5s;
set table.exec.mini-batch.size = 5000;
--  set table.exec.sink.upsert-materialize=none;
set table.exec.state.ttl = 25h;
set table.optimizer.agg-phase-strategy = TWO_PHASE;
--  set table.optimizer.distinct-agg.split.enabled = true;
set table.exec.source.cdc-events-duplicate = true;
-- ---- RPC / heartbeat timeouts (generous to survive long GC pauses) ----
set akka.tcp.timeout=5 min;
set akka.lookup.timeout=5 min;
set akka.ask.timeout=5 min;
set heartbeat.timeout=300000;
set heartbeat.interval=60000;
set heartbeat.rpc-failure-threshold=5;
set pipeline.object-reuse=true;
--  set containerized.taskmanager.env.MALLOC_ARENA_MAX=1;
--  set pipeline.max-parallelism=16;
-- ---- RocksDB & network memory ----
set state.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED_HIGH_MEM;
set state.backend.rocksdb.memory.managed=false;
set taskmanager.memory.network.max=256m;
set taskmanager.memory.network.min=256m;
set taskmanager.memory.jvm-metaspace.size=256m;

-- Paimon catalog backed by the Hive Metastore; tables created under it are
-- referenced below as paimon.<db>.<table>.
CREATE CATALOG paimon WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    'uri'='${HIVE_METASTORE_URI}',
    --  'hadoop-conf-dir'='/etc/hadoop_config',
    'warehouse' = '${PAIMON_WAREHOUSE_DIR}'
);

作业SQL2:

-- Job-level override of the environment's 4g taskmanager size.
set taskmanager.memory.process.size = 8g;

-- Kafka source: channel-goods rows, drs-json format filtered to the
-- zt_gds_channel_goods table on the multiplexed tp_ods_yf_ecp topic.
-- NOTE(review): create/modify/activity times are raw BIGINT epoch values;
-- units (seconds vs milliseconds) are not stated here — confirm against the
-- upstream producer (the insert below treats them as seconds).
CREATE TABLE IF NOT EXISTS zt_gds_channel_goods (
    `id` STRING
    ,`channel_code` STRING COMMENT '渠道编码'
    ,`goods_code` STRING
    ,`channel_goods_code` STRING COMMENT '渠道商品编码'
    ,`channel_goods_name` STRING COMMENT '渠道商品的名称'
    ,`guide_price` DECIMAL(13,3) COMMENT '指导价格'
    ,`channel_price` DECIMAL(13,3) COMMENT '渠道价格'
    ,`cost_price` DECIMAL(13,2) COMMENT '成本价'
    ,`retail_price` DECIMAL(13,2) COMMENT '零售价'
    ,`activity_price` DECIMAL(13,2) COMMENT '活动价'
    ,`activity_begin_time` BIGINT COMMENT '活动价开始时间'
    ,`activity_end_time` BIGINT COMMENT '活动价结束时间'
    ,`price_source` STRING COMMENT '渠道商品的渠道价格来源于叶子渠道'
    ,`price_source_type` STRING COMMENT '0 零售价  1 会员价 2 自主定价'
    ,`status` STRING COMMENT '状态 下架 上架'
    ,`recommend_index` INT COMMENT '推荐指数'
    ,`goods_mark` STRING COMMENT '商品角标'
    ,`remark` STRING
    ,`create_time` BIGINT COMMENT '创建时间'
    ,`modify_time` BIGINT COMMENT '修改时间'
    ,PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'tp_ods_yf_ecp',
  'properties.bootstrap.servers' = '${FLINK_CDC_KAFKA_URL}',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  'properties.sasl.jaas.config' = '${FLINK_CDC_KAFKA_CONSUME}',
  'properties.group.id' = 'tp_ods_yf_ecp_delta_lsj_001',
  'format' = 'drs-json','drs-json.table.include' = 'zt_gds_channel_goods',
  'scan.startup.mode'='timestamp', 
  'scan.startup.timestamp-millis' = '1700755200000'
);

-- Paimon sink: daily delta partitions of channel goods. Partitions expire
-- after 48h (checked every 12h), keyed on the yyyyMMdd stat_date partition.
CREATE TABLE IF NOT EXISTS paimon.prd_delta_r.delta_zt_gds_channel_goods (
    `id` STRING
    ,`channel_code` STRING COMMENT '渠道编码'
    ,`goods_code` STRING
    ,`channel_goods_code` STRING COMMENT '渠道商品编码'
    ,`channel_goods_name` STRING COMMENT '渠道商品的名称'
    ,`guide_price` DECIMAL(13,3) COMMENT '指导价格'
    ,`channel_price` DECIMAL(13,3) COMMENT '渠道价格'
    ,`cost_price` DECIMAL(13,2) COMMENT '成本价'
    ,`retail_price` DECIMAL(13,2) COMMENT '零售价'
    ,`activity_price` DECIMAL(13,2) COMMENT '活动价'
    ,`activity_begin_time` TIMESTAMP(0) COMMENT '活动价开始时间'
    ,`activity_end_time` TIMESTAMP(0) COMMENT '活动价结束时间'
    ,`price_source` STRING COMMENT '渠道商品的渠道价格来源于叶子渠道'
    ,`price_source_type` STRING COMMENT '0 零售价  1 会员价 2 自主定价'
    ,`status` STRING COMMENT '状态 下架 上架'
    ,`recommend_index` INT COMMENT '推荐指数'
    ,`goods_mark` STRING COMMENT '商品角标'
    ,`remark` STRING
    ,`create_time` TIMESTAMP(0) COMMENT '创建时间'
    ,`modify_time` TIMESTAMP(0) COMMENT '修改时间'
	,`etl_time` TIMESTAMP(0) COMMENT 'etl时间'
	,`stat_date` STRING COMMENT '分区时间'
    ,PRIMARY KEY ( stat_date,`id`) NOT ENFORCED
) PARTITIONED BY (stat_date)
with(
'metastore.partitioned-table'='true',
'bucket' = '1',
'bucket-key'='id',
'changelog-producer'='input',
'full-compaction.delta-commits'='1',
'partition.expiration-time' = '48 h',
'partition.expiration-check-interval' = '12 h',
'partition.timestamp-formatter' = 'yyyyMMdd',
'partition.timestamp-pattern' = '$stat_date'
);

-- Copy channel goods from Kafka into today's Paimon delta partition,
-- converting epoch BIGINT time fields to TIMESTAMP and stamping etl_time /
-- stat_date. Columns map positionally onto the target table.
-- NOTE(review): FROM_UNIXTIME interprets its argument as epoch *seconds*; if
-- the upstream BIGINT fields are milliseconds, these timestamps will be far in
-- the future — confirm units with the producer.
insert into paimon.prd_delta_r.delta_zt_gds_channel_goods
SELECT
    `id` 
    ,`channel_code`  --  渠道编码 
    ,`goods_code` 
    ,`channel_goods_code`  --  渠道商品编码 
    ,`channel_goods_name`  --  渠道商品的名称 
    ,`guide_price`  --  指导价格 
    ,`channel_price`  --  渠道价格 
    ,`cost_price`  --  成本价 
    ,`retail_price`  --  零售价 
    ,`activity_price`  --  活动价 
    ,TO_TIMESTAMP(FROM_UNIXTIME(`activity_begin_time`),'yyyy-MM-dd HH:mm:ss') as activity_begin_time  --  活动价开始时间 
    ,TO_TIMESTAMP(FROM_UNIXTIME(`activity_end_time`),'yyyy-MM-dd HH:mm:ss') as activity_end_time --  活动价结束时间 
    ,`price_source`  --  渠道商品的渠道价格来源于叶子渠道 
    ,`price_source_type`  --  0 零售价  1 会员价 2 自主定价 
    ,`status`  --  状态 下架 上架 
    ,`recommend_index`  --  推荐指数 
    ,`goods_mark`  --  商品角标 
    ,`remark` 
    ,TO_TIMESTAMP(FROM_UNIXTIME(`create_time`),'yyyy-MM-dd HH:mm:ss') as create_time  --  创建时间 
    ,TO_TIMESTAMP(FROM_UNIXTIME(`modify_time`),'yyyy-MM-dd HH:mm:ss') as modify_time  --  修改时间 
	,NOW() as etl_time
	-- Partition key: processing-time day in yyyyMMdd.
	,date_format(NOW(),'yyyyMMdd') as stat_date
 FROM `zt_gds_channel_goods`;

作业配置3:
image

作业SQL3:

-- Lightweight dimension-sync job: small memory footprint, 30s checkpoints.
set taskmanager.memory.process.size = 1g;
set jobmanager.memory.process.size = 1g;
set execution.checkpointing.interval = 30000;
set execution.checkpointing.min-pause = 30000;

-- Kafka source: coupon-account rows, drs-json format filtered to the
-- zt_cu_account table on the multiplexed tp_ods_zt_coupon topic.
CREATE TABLE zt_cu_account (
    account_id VARCHAR(40) COMMENT '账户id',
    customer VARCHAR(40) COMMENT '用户id',
    update_time VARCHAR(20) COMMENT '更新时间',
    PRIMARY KEY (account_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'tp_ods_zt_coupon',
    'properties.bootstrap.servers' = '${FLINK_CDC_KAFKA_URL}',
    'properties.group.id' = 'tp_ods_zt_coupon_sr_dim_id',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'SCRAM-SHA-256',
    'properties.sasl.jaas.config' = '${FLINK_CDC_KAFKA_CONSUME}',
    'format' = 'drs-json',
    'drs-json.ignore-parse-errors' = 'true',
    'drs-json.table.include' = 'zt_cu_account',
    'scan.startup.mode' = 'timestamp',
    'scan.startup.timestamp-millis' = '1676736000000'
);

-- JDBC sink: coupon-account dimension table (yf_dzb_zt_cu_account), written
-- with a single sink task.
CREATE TABLE `dim_zt_cu_account` (
  `account_id` varchar(40)   COMMENT '账户id',
  `customer` varchar(40)  COMMENT '用户id',
  `update_time` timestamp  COMMENT '更新时间',
  -- NOTE(review): "elt_time" is likely a typo for "etl_time", but the name
  -- must match the physical column in yf_dzb_zt_cu_account — verify the
  -- database schema before renaming.
  `elt_time` timestamp,
  PRIMARY KEY (`account_id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url'='${FLINK_TRANS_241_URL}',
  'username' = '${FLINK_TRANS_241_USER}',
  'password' = '${FLINK_TRANS_241_PASSWORD}',
  'table-name' = 'yf_dzb_zt_cu_account',
  'sink.parallelism' = '1'
);

-- Sync coupon accounts from Kafka into the JDBC dimension table.
-- Columns map positionally onto dim_zt_cu_account
-- (account_id, customer, update_time, elt_time).
insert into dim_zt_cu_account
SELECT
	account_id,
	customer,
	-- update_time arrives as a string epoch value.
	-- NOTE(review): FROM_UNIXTIME expects epoch *seconds* — confirm upstream
	-- is not emitting milliseconds.
	TO_TIMESTAMP(FROM_UNIXTIME(CAST(update_time as bigint)),'yyyy-MM-dd HH:mm:ss') update_time,
	-- Fix: alias renamed from "etl_time" to match the sink column `elt_time`
	-- (the mapping is positional, so the old mismatched alias worked but was
	-- misleading).
	now() elt_time
FROM
	zt_cu_account;

注:把drs-json格式改成debezium-json格式的 (Note: the drs-json format here should be changed to debezium-json.)

Anything else

No response

Version

dev

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@18216499322 18216499322 added Bug Something isn't working Waiting for reply Waiting for reply labels Sep 11, 2024
@aiwenmo aiwenmo removed the Waiting for reply Waiting for reply label Sep 18, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants