Ver Fonte

添加定时任务批量过期活动代码

wangzhijun há 16 horas atrás
pai
commit
0558aa460a

+ 156 - 48
nightFragrance-massage/src/main/java/com/ylx/massage/task/massageTask.java

@@ -1,9 +1,12 @@
 package com.ylx.massage.task;
 
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.collection.CollectionUtil;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.ylx.common.constant.MassageConstants;
 import com.ylx.common.core.domain.entity.SysDept;
@@ -11,21 +14,26 @@ import com.ylx.common.utils.DateUtils;
 import com.ylx.common.utils.StringUtils;
 import com.ylx.massage.domain.*;
 import com.ylx.massage.enums.*;
+import com.ylx.point.domain.PointActivity;
 import com.ylx.point.domain.PointUserLog;
 import com.ylx.massage.mapper.TConsumptionLogMapper;
 import com.ylx.massage.service.*;
 import com.ylx.massage.utils.DateTimeUtils;
 import com.ylx.massage.utils.LocationUtil;
 import com.ylx.massage.utils.WeChatUtil;
+import com.ylx.point.service.IPointActivityService;
+import com.ylx.point.service.IPointUserLogService;
 import com.ylx.system.service.ISysDeptService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.Resource;
 import java.math.BigDecimal;
 import java.time.LocalDateTime;
 import java.util.*;
+import java.util.stream.Collectors;
 
 
 /**
@@ -72,7 +80,9 @@ public class massageTask {
     private WeChatUtil weChatUtil;
 
     @Resource
-    private com.ylx.point.service.IPointUserLogService pointUserLogService;
+    private IPointUserLogService pointUserLogService;
+    @Resource
+    private IPointActivityService pointActivityService;
 
     /**
      * 取消超时未支付的订单
@@ -110,6 +120,7 @@ public class massageTask {
 
     /**
      * 取消超时未支付订单
+     *
      * @param nowDate
      * @return Long
      */
@@ -140,6 +151,7 @@ public class massageTask {
 
     /**
      * 取消超时未支付的商品订单
+     *
      * @param nowDate
      * @return Long
      */
@@ -470,6 +482,7 @@ public class massageTask {
     /**
      * 定时任务入口:批量更新过期的积分记录
      */
+    @Transactional(rollbackFor = Exception.class)
     public void expirePoints() {
         Date nowDate = new Date();
         log.info("开始执行积分过期任务,当前时间:{}", nowDate);
@@ -500,79 +513,110 @@ public class massageTask {
     }
 
     /**
-     * 单批次处理逻辑
+     * 定时任务入口:批量更新过期的活动记录
      */
-    private Long processBatch(Date nowDate, Long lastId, int batchSize) {
-        // 1. 查询:使用 ID > lastId 代替分页,并限制数量
+    @Transactional(rollbackFor = Exception.class)
+    public void expireActivity() {
+        Date nowDate = new Date();
+        log.info("开始执行活动过期任务,当前时间:{}", nowDate);
+
+        // 建议添加分布式锁,防止多实例并发执行
+        // String lockKey = "lock:points:expire:" + DateUtils.getDate();
+        // if (redisLock.tryLock(lockKey)) { ... }
+
+        try {
+            // 使用游标模式,避免深分页问题
+            Long lastId = 0L;
+            int batchSize = 500; // 适当调大批次
+
+            while (true) {
+                // 获取最后一批处理的ID,作为下一批的起点
+                Long currentLastId = processActivityBatch(nowDate, lastId, batchSize);
+
+                // 如果返回的ID没有变化,或者小于等于上一轮ID,说明没有更多数据了
+                if (currentLastId <= lastId) {
+                    break;
+                }
+                lastId = currentLastId;
+            }
+        } finally {
+            // redisLock.unlock(lockKey);
+            log.info("结束执行活动过期任务,当前时间:{}", DateUtils.getNowDate());
+        }
+    }
+
+    public Long processBatch(Date nowDate, Long lastId, int batchSize) {
+
+        // 1. 查询:使用 list 代替 page,减少 count 查询开销
         LambdaQueryWrapper<PointUserLog> queryWrapper = new LambdaQueryWrapper<>();
-        queryWrapper.eq(PointUserLog::getOpType, 1)
-                .eq(PointUserLog::getIsExpired, 0)
-                .lt(PointUserLog::getExpireTime, nowDate)
-                .isNotNull(PointUserLog::getExpireTime)
-                .gt(PointUserLog::getId, lastId) // 关键优化:游标推进
-                .orderByAsc(PointUserLog::getId); // 必须按ID排序
-
-        Page<PointUserLog> page = new Page<>(1, batchSize);
-        Page<PointUserLog> resPage = pointUserLogService.page(page, queryWrapper);
-        List<PointUserLog> records = resPage.getRecords();
-
-        if (CollectionUtil.isEmpty(records)) {
+        queryWrapper.eq(PointUserLog::getOpType, 1)          // 获取积分
+                .eq(PointUserLog::getIsExpired, 0)           // 未过期
+                .lt(PointUserLog::getExpireTime, nowDate)    // 已到期
+                .isNotNull(PointUserLog::getExpireTime)      // 有有效期
+                .gt(PointUserLog::getId, lastId)             // 游标推进
+                .orderByAsc(PointUserLog::getId)             // 必须排序
+                .last("LIMIT " + batchSize);                 // 物理限制数量
+
+        List<PointUserLog> records = pointUserLogService.list(queryWrapper);
+
+        if (CollUtil.isEmpty(records)) {
             return lastId;
         }
 
-        // 准备批量数据
-        List<PointUserLog> insertList = new ArrayList<>();
-        List<Long> updateIds = new ArrayList<>();
-        long currentMaxId = lastId;
+        // 2. 准备批量数据
+        List<PointUserLog> insertList = new ArrayList<>(records.size());
+        List<Long> updateIds = new ArrayList<>(records.size());
 
         for (PointUserLog pointLog : records) {
-            // 2. 构建过期流水(插入)
-            PointUserLog expireLog = new PointUserLog();
-            // ... (属性拷贝,建议使用 BeanUtil.copyProperties)
-            expireLog.setOpenId(pointLog.getOpenId());
-            expireLog.setActivityId(pointLog.getActivityId());
-            expireLog.setActivityName(pointLog.getActivityName());
-            expireLog.setTaskId(pointLog.getTaskId());
-            expireLog.setTaskType(pointLog.getTaskType());
-            expireLog.setOpType(3); // 过期
-            expireLog.setSourceLogId(pointLog.getId());
-            expireLog.setPoints(-pointLog.getPoints()); // 注意:过期通常是扣减,这里应该是负数,具体看业务定义
-            expireLog.setIsExpired(1); // 这里的1通常指这条流水本身是“过期”类型的记录,状态是生效的
+            PointUserLog expireLog = BeanUtil.copyProperties(pointLog, PointUserLog.class);
+
+            // 重置主键,让数据库自增
+            expireLog.setId(null);
+            // 覆盖关键字段
+            expireLog.setOpType(3);
+            expireLog.setSourceLogId(pointLog.getId()); // 关联原流水ID
+            expireLog.setIsExpired(1);
 
             insertList.add(expireLog);
             updateIds.add(pointLog.getId());
-
-            if (pointLog.getId() > currentMaxId) {
-                currentMaxId = pointLog.getId();
-            }
         }
 
-        // 3. 批量执行:利用数据库事务保证原子性
-        boolean success = false;
+        // 3. 批量执行
         try {
-            // 开启事务 (如果是Spring环境,建议在Service层加@Transactional)
-            // 批量插入过期流水
-            pointUserLogService.saveBatch(insertList);
+            // A. 批量插入过期流水
+            // 注意:saveBatch 默认不开启 rewriteBatchedStatements,数据量大时建议在配置中开启
+            boolean insertSuccess = pointUserLogService.saveBatch(insertList);
+
+            if (!insertSuccess) {
+                throw new RuntimeException("批量插入过期流水失败");
+            }
 
-            // 批量更新原记录状态 (使用 LambdaUpdateWrapper in 查询)
+            // B. 批量更新原记录状态
             LambdaUpdateWrapper<PointUserLog> updateWrapper = new LambdaUpdateWrapper<>();
             updateWrapper.in(PointUserLog::getId, updateIds)
                     .set(PointUserLog::getIsExpired, 1);
-            pointUserLogService.update(updateWrapper);
 
-            success = true;
+            boolean updateSuccess = pointUserLogService.update(updateWrapper);
+
+            if (!updateSuccess) {
+                throw new RuntimeException("批量更新原流水状态失败");
+            }
+
             log.info("批次处理成功,处理条数:{}", records.size());
+
         } catch (Exception e) {
-            log.error("批次处理失败", e);
-            // 根据业务需求决定是否抛出异常中断,或者记录错误日志继续
-            throw new RuntimeException("积分过期处理失败", e);
+            log.error("批次处理失败,lastId: {}, error: {}", lastId, e.getMessage(), e);
+            // 抛出异常触发 @Transactional 回滚,保证数据一致性
+            throw e;
         }
 
-        return success ? currentMaxId : lastId;
+        // 4. 返回游标位置
+        return records.get(records.size() - 1).getId();
     }
 
     /**
      * 获取用户当前可用积分余额
+     *
      * @param openId 用户openId
      * @return 当前余额
      */
@@ -590,4 +634,68 @@ public class massageTask {
         return pointUserLog.getPoints();
     }
 
+    /**
+     * 处理单批次活动过期逻辑
+     *
+     * @param nowDate   当前时间
+     * @param lastId    上一批次最后处理的ID(游标起点)
+     * @param batchSize 批次大小
+     * @return 本批次最后处理的ID
+     */
+    public Long processActivityBatch(Date nowDate, Long lastId, int batchSize) {
+
+        // 1. 构建分页查询对象
+        // 注意:虽然传入了页码1,但实际数据定位由 queryWrapper 中的 gt(id, lastId) 控制
+        Page<PointActivity> page = new Page<>(1, batchSize);
+
+        // 2. 构建查询条件
+        LambdaQueryWrapper<PointActivity> queryWrapper = new LambdaQueryWrapper<>();
+        queryWrapper.gt(PointActivity::getId, lastId)          // 核心游标:ID 大于上一批次的最大值
+                .ne(PointActivity::getStatus, 2)           // 排除已经是“已结束”的
+                .lt(PointActivity::getEndTime, nowDate)    // 核心条件:结束时间 < 当前时间
+                .orderByAsc(PointActivity::getId);         // 必须按 ID 排序,保证游标连续性
+
+        // 3. 执行分页查询
+        // 使用 service 层自带的 page 方法
+        Page<PointActivity> resultPage = pointActivityService.page(page, queryWrapper);
+
+        List<PointActivity> records = resultPage.getRecords();
+
+        // 4. 判断是否还有数据
+        if (CollUtil.isEmpty(records)) {
+            // 如果没有查到数据,返回当前的 lastId,外层循环检测到相等就会停止
+            return lastId;
+        }
+
+        // 5. 提取 ID 列表进行批量更新
+        Long currentLastId = records.get(records.size() - 1).getId();
+        List<Long> ids = records.stream().map(PointActivity::getId).collect(Collectors.toList());
+
+        UpdateWrapper<PointActivity> updateWrapper = new UpdateWrapper<>();
+        updateWrapper.in("id", ids)
+                .eq("status", 1); // 乐观检查:确保更新时状态仍为“进行中”,防止并发冲突
+
+        PointActivity updateEntity = new PointActivity();
+        updateEntity.setStatus(2);           // 状态改为 2 (已结束)
+        updateEntity.setUpdateTime(nowDate); // 更新时间
+        updateEntity.setUpdateBy("admin");   // 更新人
+
+        // 6. 执行更新
+        try {
+            boolean success = pointActivityService.update(updateEntity, updateWrapper);
+
+            if (success) {
+                log.info("批次处理成功,更新ID范围: {} - {}, 数量: {}", ids.get(0), currentLastId, ids.size());
+            } else {
+                log.warn("批次处理部分失败或无更新,ID范围: {} - {}", ids.get(0), currentLastId);
+            }
+        } catch (Exception e) {
+            log.error("批次处理失败,lastId: {}, error: {}", lastId, e.getMessage(), e);
+            throw e;
+        }
+
+        // 7. 返回本批次最大的 ID,作为下一轮的游标起点
+        return currentLastId;
+    }
+
 }