秒杀项目实践(五)——交易性能瓶颈优化

PunkLu 2019年12月22日 40次浏览

交易性能瓶颈优化

压测及瓶颈分析

使用/order/createorder?token=8237e7df-21a3-4c2e-af9c-df6b47272871接口进行交易下单性能压测,方法为post、并指定好itemId、amountId和promoId的值。进行压测可以发现,交易性能非常差。查看服务器性能指标及代码分析可以发现,交易验证完全依赖数据库,并且减库存的SQL会产生行锁,所有减库存的操作都是串行进行的,会影响交易链路的性能。

交易验证优化

优化校验商品及其秒杀活动是否有效的代码:

在ItemService接口中新增判断活动是否有效的接口:

// 验证Item及PromoModel是否有效
ItemModel getItemByIdCache(Integer id);

在ItemServiceImpl中增加实现:

@Override
    public ItemModel getItemByIdCache(Integer id) {
        ItemModel itemModel = (ItemModel) redisTemplate.opsForValue().get("item_validate_" + id);
        // 如果缓存中不存在,去数据库中查找
        if (itemModel == null){
            itemModel = this.getItemById(id);
            redisTemplate.opsForValue().set("item_validate_" + id,itemModel);
        }
        return itemModel;
    }

替换OrderServiceImpl中下单方法createOrder中的:

ItemModel itemModel = itemService.getItemById(itemId);

为:

ItemModel itemModel = itemService.getItemByIdCache(itemId);

同理,可以优化用户是否登录的判断逻辑:

在UserService接口中新增方法:

// 通过缓存获取用户对象
UserModel getUserByIdInCache(Integer id);

在UserServiceImpl中增加实现:

@Override
    public UserModel getUserByIdInCache(Integer id) {
        UserModel userModel = (UserModel)redisTemplate.opsForValue().get("user_validate_"+id);
        // 如果Redis中不存在,则从数据库中获取
        if (userModel == null) {
            userModel = this.getUserById(id);
            redisTemplate.opsForValue().set("user_validate_" + id,userModel);
            redisTemplate.expire("user_validate_"+id,10, TimeUnit.MINUTES);
        }
        return userModel;
    }

在OrderServiceImpl中的下单方法createOrder中替换:

UserModel userModel = userService.getUserById(userId);

为:

UserModel userModel = userService.getUserByIdInCache(userId);

启动项目,走完一遍登录到下单的流程,可以发现没有问题。

再次压测

将项目打包上传至服务器重新部署,再次压测,可以发现,性能得到了很大的增强。查看linux服务器性能,可以发现性能全部堆积到了Redis上而不是之前的mysql上。

库存行锁优化

交易最后都要执行下面这条SQL:

<update id="decreaseStock">
    update item_stock
    set stock = stock - #{amount}
    where item_id = #{itemId} and stock >= #{amount}
</update>

因为item_id字段在创建的时候被加上了索引,数据库会在item_id = #加上行锁而不是默认的表锁。

优化策略:

  1. 将扣减库存缓存进Redis中

  2. 异步同步数据库

    将Redis中的扣减库存数据异步化的同步进数据库中

  3. 库存数据库最终一致性保证

首先实现扣减库存缓存化:

在PromoService中添加一个接口:

/**
     * 发布秒杀活动
     * @param promoId
     */
void publishPromo(Integer promoId);

在PromoServiceImpl中增加实现,将秒杀商品的库存放到Redis中:

@Override
    public void publishPromo(Integer promoId) {
        // 通过活动id获取活动
        PromoDO promoDO = promoDOMapper.selectByPrimaryKey(promoId);
        if (promoDO.getItemId() == null || promoDO.getItemId().intValue() == 0){
            // 说明活动不存在
            return;
        }
        ItemModel itemModel = itemService.getItemById(promoDO.getItemId());
        // 将库存同步到redis内
        redisTemplate.opsForValue().set("promo_item_stock_" + itemModel.getId(),itemModel.getStock());

}

在ItemController中添加发布接口:

/**
     * 将秒杀商品的库存保存到Redis中
     * @param id
     * @return
     */
    @RequestMapping(value = "/publishpromo",method = RequestMethod.GET)
    @ResponseBody
    public CommonReturnType publishpromo(@RequestParam(name = "id")Integer id){
        promoServive.publishPromo(id);
        return CommonReturnType.create(null);
    }

启动项目,访问http://localhost:8090/item/publishpromo?id=1测试发现可以将商品的库存放入到Redis中。

接下来实现下单时从Redis减库存,修改原有的ItemServiceImpl中的decreaseStock方法:

@Override
    public boolean decreaseStock(Integer itemId, Integer amount) {
        int affectRow = itemStockDOMapper.decreaseStock(itemId,amount);
        if (affectRow > 0 ){
            // 受影响行数大于0,说明扣减库存成功
            return true;
        }else {
            // 受影响行数小于0,说明扣减库存失败
            return false;
        }
    }

为:

@Override
    public boolean decreaseStock(Integer itemId, Integer amount) {
        // int affectRow = itemStockDOMapper.decreaseStock(itemId,amount);
        // result即完成操作后剩下的库存数
        Long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()*-1);
        if (result >= 0 ){
            // 受影响行数大于0,说明扣减库存成功
            return true;
        }else {
            // 受影响行数小于0,说明扣减库存失败
            return false;
        }
    }

重启项目,可以发现下单成功后,Redis中的库存数减一。

这样就算完成了下单库存缓存化,但是这样一来库存全部在Redis中,Redis如果发生宕机等事故会造成库存信息丢失。可以再通过异步消息扣减数据库内存来保障数据高可用性。

异步同步数据库库存

在上面商品库存缓存化的基础上,使用异步消息扣减数据库中的内存。保证最终一致性。这里使用RocketMQ实现。

RocketMQ的安装可以参照

代码层面实现异步操作

生产者实现:

引入RocketMQ客户端依赖:

<!-- RocketMQ客户端 -->
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.3.0</version>
    </dependency>

application.properties添加RocketMQ配置:

# MQ NameServer的地址
mq.nameserver.addr=192.168.1.112:9876
mq.topicname=stock

新建mq包,并新建MqProducer生产者和MqConsumer消费者类:

MqProducer:

package tech.punklu.seckill.mq;

import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

/**
 * Mq消息生产者
 */
@Component
public class MqProducer {

    private DefaultMQProducer producer;

    @Value("${mq.nameserver.addr}")
    private String nameAddr;

    @Value("${mq.topicname}")
    private String topicName;

    /**
     * Bean初始化完成后调用
     */
    @PostConstruct
    public void init() throws MQClientException {
        // 做MqProducer的初始化,producer_group为生产者组,没有实际意义
        producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr(nameAddr);
        producer.start();
    }

    /**
     * 同步库存扣减消息
     * @param itemId
     * @param amount
     * @return
     */
    public boolean asyncReduceStock(Integer itemId,Integer amount) {
        Map<String,Object> bodyMap = new HashMap<>();
        bodyMap.put("itemId",itemId);
        bodyMap.put("amount",amount);
        Message message = new Message(topicName,"increase", JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8")));
        try {
            producer.send(message);
        } catch (MQClientException e) {
            e.printStackTrace();
            return false;
        } catch (RemotingException e) {
            e.printStackTrace();
            return false;
        } catch (MQBrokerException e) {
            e.printStackTrace();
            return false;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }
}

MqConsumer:

package tech.punklu.seckill.mq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;

/**
 * Mq消息消费者
 */
@Component
public class MqConsumer {

    private DefaultMQPushConsumer consumer;

    @Value("${mq.nameserver.addr}")
    private String nameAddr;

    @Value("${mq.topicname}")
    private String topicName;

    @PostConstruct
    public void init() throws MQClientException {
        consumer = new DefaultMQPushConsumer("stock_consumer_group");
        consumer.setNamesrvAddr(nameAddr);
        // 订阅topicName这个topic上的所有消息
        consumer.subscribe(topicName,"*");

        // 接收到消息后的处理方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // 接收到消息后扣减数据库的库存
                Message message = list.get(0);
                String jsonStr = new String(message.getBody());
                Map<String,Object> map = JSON.parseObject(jsonStr, Map.class);
                Integer itemId = (Integer) map.get("itemId");
                Integer amount = (Integer) map.get("amount");
                itemStockDOMapper.decreaseStock(itemId,amount);
                // 设置返回消费成功,RocketMQ即不会再次推送
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

然后将ItemServiceImpl中的decreaseStock扣减库存方法从:

@Override
    public boolean decreaseStock(Integer itemId, Integer amount) {
        // int affectRow = itemStockDOMapper.decreaseStock(itemId,amount);
        // result即完成操作后剩下的库存数
        Long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()*-1);
        if (result > 0 ){
            // 受影响行数大于0,说明扣减库存成功
            return true;
        }else {
            // 受影响行数小于0,说明扣减库存失败
            return false;
        }
    }

修改为:

@Override
    public boolean decreaseStock(Integer itemId, Integer amount) {
        // int affectRow = itemStockDOMapper.decreaseStock(itemId,amount);
        // result即完成操作后剩下的库存数
        Long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()*-1);
        if (result >= 0 ){
            // 受影响行数大于0,说明扣减库存成功
            boolean mqResult = mqProducer.asyncReduceStock(itemId,amount);
            if (!mqResult){
                redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue());
                return false;
            }
            return true;
        }else {
            // 受影响行数小于0,说明扣减库存失败
            redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue());
            return false;
        }
    }

即增加扣减库存成功后向RocketMQ发送扣减数据库的消息,消费者接收到之后再异步扣减消息。

重启项目后完成一次下单操作可以发现,redis中的库存会减一,RocketMQ的Consumer接收到消息后也会将数据库中的库存减去一。

事务型消息

经过以上的代码完成了异步扣减数据库内的库存的功能。但是在OrderServiceImpl中的下订单方法createOrder中可以看到减完库存后还有创建订单的代码需要执行,如果扣完库存后创建订单出现问题事务回滚后,Redis中和RocketMQ中的扣减库存消息没有回滚就会出现少卖的现象。

修复

在ItemService接口中新增增加Redis中缓存的库存方法:

// 库存增加
boolean increaseStock(Integer itemId,Integer amount);

在ItemServiceImpl中增加实现:

@Override
public boolean increaseStock(Integer itemId, Integer amount) {
        redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue());
        return true;
}

将ItemServiceImpl中的decreaseStock扣减库存方法从:

@Override
public boolean decreaseStock(Integer itemId, Integer amount) {
        // int affectRow = itemStockDOMapper.decreaseStock(itemId,amount);
        // result即完成操作后剩下的库存数
        Long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()*-1);
        if (result >= 0 ){
            // 受影响行数大于0,说明扣减库存成功
            boolean mqResult = mqProducer.asyncReduceStock(itemId,amount);
            if (!mqResult){
                redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue());
                return false;
            }
            return true;
        }else {
            // 受影响行数小于0,说明扣减库存失败
            redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue());
            return false;
        }
    }

修改为:

 @Override
    public boolean decreaseStock(Integer itemId, Integer amount) {
        // int affectRow = itemStockDOMapper.decreaseStock(itemId,amount);
        // result即完成操作后剩下的库存数
        Long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()*-1);
        if (result >= 0 ){
            return true;
        }else {
            // 受影响行数小于0,说明扣减库存失败
            this.increaseStock(itemId,amount);
            return false;
        }
    }

将原来的发送MQ消息异步扣减库存的代码单独拿出来,在ItemService接口中新增:

// 异步更新库存
    boolean asyncDecreaseStock(Integer itemId,Integer amount);

在ItemServiceImpl中增加实现:

@Override
    public boolean asyncDecreaseStock(Integer itemId, Integer amount) {
        boolean mqResult = mqProducer.asyncReduceStock(itemId,amount);
        return mqResult;
}

在OrderServiceImpl类中的createOrder方法返回之前增加代码:

// 在最近的一个Transactional注解标注的事务被成功Commit之后被执行
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                // 异步更新库存
                boolean mqResult = itemService.asyncDecreaseStock(itemId,amount);
            }
        });

即在createOrder的事务完成之后才向MQ发送异步扣减库存的消息。但是这样还是不能保证极端情况下的异步扣减肯定能成功。这种情况下可以使用RocketMQ提供的事务型消息。

使用RocketMQ事务型消息保证事务
新建库存流水表
DROP TABLE IF EXISTS `stock_log`;
CREATE TABLE `stock_log` (
  `stock_log_id` varchar(64) NOT NULL,
  `item_id` int(11) NOT NULL DEFAULT '0',
  `amount` int(11) NOT NULL DEFAULT '0',
  `status` int(11) NOT NULL DEFAULT '0' COMMENT '//1表示初始状态,2表示下单扣减库存成功,3表示下单回滚',
  PRIMARY KEY (`stock_log_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

新建与库存流水表对应的DAO层

库存流水表DO对象:

package tech.punklu.seckill.dataobject;

public class StockLogDO {

    // 库存流水编号
    private String stockLogId;

    // 商品编号
    private Integer itemId;

    // 购买商品的数量
    private Integer amount;

    // 流水状态 1表示初始状态,2表示下单扣减库存成功,3表示下单回滚'
    private Integer status;


    public String getStockLogId() {
        return stockLogId;
    }


    public void setStockLogId(String stockLogId) {
        this.stockLogId = stockLogId == null ? null : stockLogId.trim();
    }


    public Integer getItemId() {
        return itemId;
    }


    public void setItemId(Integer itemId) {
        this.itemId = itemId;
    }


    public Integer getAmount() {
        return amount;
    }


    public void setAmount(Integer amount) {
        this.amount = amount;
    }


    public Integer getStatus() {
        return status;
    }


    public void setStatus(Integer status) {
        this.status = status;
    }
}

DAO层接口:

package tech.punklu.seckill.dao;

import tech.punklu.seckill.dataobject.StockLogDO;

public interface StockLogDOMapper {

    int deleteByPrimaryKey(String stockLogId);


    int insert(StockLogDO record);


    int insertSelective(StockLogDO record);


    StockLogDO selectByPrimaryKey(String stockLogId);


    int updateByPrimaryKeySelective(StockLogDO record);


    int updateByPrimaryKey(StockLogDO record);
}

DAO层XML文件:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="tech.punklu.seckill.dao.StockLogDOMapper">
    <resultMap id="BaseResultMap" type="tech.punklu.seckill.dataobject.StockLogDO">
        <id column="stock_log_id" jdbcType="VARCHAR" property="stockLogId" />
        <result column="item_id" jdbcType="INTEGER" property="itemId" />
        <result column="amount" jdbcType="INTEGER" property="amount" />
        <result column="status" jdbcType="INTEGER" property="status" />
    </resultMap>
    <sql id="Base_Column_List">
        stock_log_id, item_id, amount, status
    </sql>
    <select id="selectByPrimaryKey" parameterType="java.lang.String" resultMap="BaseResultMap">
        select
        <include refid="Base_Column_List" />
        from stock_log
        where stock_log_id = #{stockLogId,jdbcType=VARCHAR}
    </select>
    <delete id="deleteByPrimaryKey" parameterType="java.lang.String">
        delete from stock_log
        where stock_log_id = #{stockLogId,jdbcType=VARCHAR}
    </delete>
    <insert id="insert" parameterType="tech.punklu.seckill.dataobject.StockLogDO">
        insert into stock_log (stock_log_id, item_id, amount,
        status)
        values (#{stockLogId,jdbcType=VARCHAR}, #{itemId,jdbcType=INTEGER}, #{amount,jdbcType=INTEGER},
        #{status,jdbcType=INTEGER})
    </insert>
    <insert id="insertSelective" parameterType="tech.punklu.seckill.dataobject.StockLogDO">
        insert into stock_log
        <trim prefix="(" suffix=")" suffixOverrides=",">
            <if test="stockLogId != null">
                stock_log_id,
            </if>
            <if test="itemId != null">
                item_id,
            </if>
            <if test="amount != null">
                amount,
            </if>
            <if test="status != null">
                status,
            </if>
        </trim>
        <trim prefix="values (" suffix=")" suffixOverrides=",">
            <if test="stockLogId != null">
                #{stockLogId,jdbcType=VARCHAR},
            </if>
            <if test="itemId != null">
                #{itemId,jdbcType=INTEGER},
            </if>
            <if test="amount != null">
                #{amount,jdbcType=INTEGER},
            </if>
            <if test="status != null">
                #{status,jdbcType=INTEGER},
            </if>
        </trim>
    </insert>
    <update id="updateByPrimaryKeySelective" parameterType="tech.punklu.seckill.dataobject.StockLogDO">
        update stock_log
        <set>
            <if test="itemId != null">
                item_id = #{itemId,jdbcType=INTEGER},
            </if>
            <if test="amount != null">
                amount = #{amount,jdbcType=INTEGER},
            </if>
            <if test="status != null">
                status = #{status,jdbcType=INTEGER},
            </if>
        </set>
        where stock_log_id = #{stockLogId,jdbcType=VARCHAR}
    </update>
    <update id="updateByPrimaryKey" parameterType="tech.punklu.seckill.dataobject.StockLogDO">
        update stock_log
        set item_id = #{itemId,jdbcType=INTEGER},
        amount = #{amount,jdbcType=INTEGER},
        status = #{status,jdbcType=INTEGER}
        where stock_log_id = #{stockLogId,jdbcType=VARCHAR}
    </update>
</mapper>
事务型消息实现

在MqProducer中新增事务型消息生产者类变量:

private TransactionMQProducer transactionMQProducer;

并在MqProducer之前写好的init()初始化方法中新增:

transactionMQProducer = new TransactionMQProducer("transaction_producer_group");
        transactionMQProducer.setNamesrvAddr(nameAddr);
        // 发送完事务型消息后要做的操作,这里是发送完异步扣减数据库库存的消息后,创建订单并将之前发送的事务型消息变为可被消费的状态
        transactionMQProducer.setTransactionListener(new TransactionListener() {
            // 此方法默认的返回值是LocalTransactionState.UNKNOW
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                // 真正要做的事
                Integer itemId = (Integer) ((Map)o).get("itemId");
                Integer promoId = (Integer) ((Map)o).get("promoId");
                Integer userId = (Integer) ((Map)o).get("userId");
                Integer amount = (Integer) ((Map)o).get("amount");
                String stockLogId = (String)((Map)o).get("stockLogId");
                try {
                    // 创建订单
                    orderService.createOrder(userId,itemId,promoId,amount,stockLogId);
                } catch (BusinessException e) {
                    e.printStackTrace();
                    // 出现异常,之前发送的protect消息被回滚
                    StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
                    stockLogDO.setStatus(3);
                    stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO);
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                // 创建订单完成后,将之前发送的Protect的消息变为可被消费的
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            // 为了防止executeLocalTransaction方法没有明确的返回值告诉消息中间件是提交还是回滚,
            // 消息中间件会定期的回调此方法确认应该进行提交还是回滚操作
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                // 根据是否扣减库存成功,来判断要返回commit还是rollback还是继续保持UNKOWN的状态
                String jsonString = new String(messageExt.getBody());
                Map<String,Object> map = JSON.parseObject(jsonString,Map.class);
                Integer itemId = (Integer)map.get("itemId");
                Integer amount = (Integer)map.get("amount");
                String stockLogId = (String)map.get("stockLogId");
                StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
                if (stockLogDO == null){
                    return LocalTransactionState.UNKNOW;
                }
                // 如果此库存流水状态是可提交给消费者消费的,则告诉RocketMQ将这条消息设置可被消费者消费的状态
                if (stockLogDO.getStatus().intValue() == 2){
                    return LocalTransactionState.COMMIT_MESSAGE;
                }else if (stockLogDO.getStatus().intValue() == 1){
                    return LocalTransactionState.UNKNOW;
                }
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });
        transactionMQProducer.start();

并在MqProducer中增加基于RocketMQ事务型消息的异步减库存代码:

// 事务型同步库存扣减消息
    public boolean transactionAsyncReduceStock(Integer userId,Integer itemId,Integer promoId,Integer amount,String stockLogId){
        // bodyMap是给Message消息本体用的,可用于checkLocalTransaction方法校验消息状态是否可被改为可消费状态
        Map<String,Object> bodyMap = new HashMap<>();
        bodyMap.put("itemId",itemId);
        bodyMap.put("amount",amount);
        bodyMap.put("stockLogId",stockLogId);

        // argsMap是给本地事务完成后调用的executeLocalTransaction方法用的,用于创建订单
        Map<String,Object> argsMap = new HashMap<>();
        argsMap.put("itemId",itemId);
        argsMap.put("amount",amount);
        argsMap.put("userId",userId);
        argsMap.put("promoId",promoId);
        argsMap.put("stockLogId",stockLogId);
        Message message = new Message(topicName,"increase", JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8")));
        // 事务型消息发送结果
        TransactionSendResult transactionSendResult = null;
        try {
            // 事务型消息发送出去之后到达broker后不是可被消费的状态,而是protect状态
            // 在protect状态下的消息不会被消费
            // argsMap中的值会被executeLocalTransaction中的参数接收到
            transactionSendResult = transactionMQProducer.sendMessageInTransaction(message,argsMap);
        } catch (MQClientException e) {
            e.printStackTrace();
            return false;
        }
        // 如果发送结果是true表明下单成功,false表示下单失败
        if (transactionSendResult.getLocalTransactionState() ==  LocalTransactionState.ROLLBACK_MESSAGE){
            return false;
        }else if (transactionSendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE){
            return true;
        }else {
            return false;
        }
    }

将ItemServiceImpl类中的decreaseStock方法改为:

 @Override
    public boolean decreaseStock(Integer itemId, Integer amount) {
        // int affectRow = itemStockDOMapper.decreaseStock(itemId,amount);
        // result即完成操作后剩下的库存数
        Long result = redisTemplate.opsForValue().increment("promo_item_stock_"+itemId,amount.intValue()*-1);
        if (result > 0 ){
            // 受影响行数大于0,说明扣减库存成功
            return true;
        }else if(result == 0){
            // 受影响行数大于0,也说明扣减库存成功,但已售罄,打上已售罄的标识
            redisTemplate.opsForValue().set("promo_item_stock_invalid_"+itemId,"true");
            // 返回更新库存成功
            return true;
        }else {
            // 受影响行数小于0,说明扣减库存失败
            this.increaseStock(itemId,amount);
            return false;
        }
    }

将OrderServiceImpl类中的createOrder方法改为:

@Override
    @Transactional
    public OrderModel createOrder(Integer userId, Integer itemId,Integer promoId, Integer amount,String stockLogId) throws BusinessException {
        // 校验下单状态,下单的商品是否存在,用户是否合法,购买数量是否正确
        // ItemModel itemModel = itemService.getItemById(itemId);
        // 优化为从Redis中缓存中获取活动是否有效
        ItemModel itemModel = itemService.getItemByIdCache(itemId);
        if (itemModel == null){
            throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"商品信息不存在");
        }

        // UserModel userModel = userService.getUserById(userId);
        // 优化为从Redis缓存中获取相应的用户信息
        UserModel userModel = userService.getUserByIdInCache(userId);
        if (userModel == null){
            throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"用户信息不存在");
        }
        if (amount <= 0 || amount > 99){
            throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"数量信息不正确");
        }

        // 校验秒杀活动信息
        if (promoId != null){
            // 校验对应活动是否存在这个适用商品
            if (promoId.intValue() != itemModel.getPromoModel().getId()){
                throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"秒杀活动信息不正确");
            }else if (itemModel.getPromoModel().getStatus().intValue() !=2){
                // 校验秒杀活动是否正在进行中
                throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"秒杀活动还未开始");
            }

        }
        // 落单减库存
        boolean result = itemService.decreaseStock(itemId,amount);
        if (!result){
            throw new BusinessException(EmBusinessError.STOCK_NOT_ENOUGH);
        }
        // 订单入库
        OrderModel orderModel = new OrderModel();
        orderModel.setUserId(userId);
        orderModel.setItemId(itemId);
        orderModel.setAmount(amount);
        // 若promoId不为null,说明是秒杀活动相关下单,以秒杀活动价格计算,否则,以正常价格计算
        if (promoId != null){
            orderModel.setItemPrice(itemModel.getPromoModel().getPromoItemPrice());
        }else {
            orderModel.setItemPrice(itemModel.getPrice());
        }
        orderModel.setPromoId(promoId);
        orderModel.setOrderPrice(orderModel.getItemPrice().multiply(new BigDecimal(amount)));

        // 生成交易流水号,订单号
        orderModel.setId(generateOrderNo());
        OrderDO orderDO = convertFromOrderModel(orderModel);
        orderDOMapper.insertSelective(orderDO);
        // 增加商品销量
        itemService.increaseSales(itemId,amount);

        // 设置库存流水状态为成功
        StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
        if (stockLogDO == null){
            throw new BusinessException(EmBusinessError.UNKNOWN_ERROR);
        }
        // 将库存流水状态设置为成功,告诉RocketMQ这条消息可提交给消费者消费
        stockLogDO.setStatus(2);
        stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO);


/*        // 在最近的一个Transactional注解标注的事务被成功Commit之后被执行
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                // 异步更新库存
                boolean mqResult = itemService.asyncDecreaseStock(itemId,amount);
*//*                // 若创建MQ异步更新库存消息失败,增加Redis内的库存数量并回滚此方法的事务
                if (!mqResult){
                    itemService.increaseStock(itemId,amount);
                    throw new BusinessException(EmBusinessError.MQ_SEND_FAIL);
                }*//*
            }
        });*/

        // 返回前端
        return orderModel;
    }

然后将OrderController的createOrder下单方法改为:

/**
     * 封装下单请求
     * @param itemId
     * @param amount
     * @return
     */
    @RequestMapping(value = "/createorder",method = RequestMethod.POST,consumes = CONTENT_TYPE_FORMED)
    @ResponseBody
    public CommonReturnType createOrder(@RequestParam(name = "itemId")Integer itemId,
                                        @RequestParam(name = "amount")Integer amount,
                                        @RequestParam(name = "promoId")Integer promoId) throws BusinessException {
       /*Boolean isLogin =  (Boolean) httpServletRequest.getSession().getAttribute("IS_LOGIN");
       if (isLogin == null || !isLogin.booleanValue()){
            throw new BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户未登录");
       }
       // 获取用户的登录信息
        UserModel userModel = (UserModel)httpServletRequest.getSession().getAttribute("LOGIN_USER");*/
       String token = httpServletRequest.getParameterMap().get("token")[0];
       if (StringUtils.isEmpty(token)){
           throw new BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户未登录");
       }
       UserModel userModel = (UserModel) redisTemplate.opsForValue().get(token);
       if (userModel == null){
           throw new BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户未登录");
       }
       // 先从Redis检查是否已售罄
        boolean result = redisTemplate.hasKey("promo_item_stock_invalid_"+itemId);
        if (result){
            throw new BusinessException(EmBusinessError.STOCK_NOT_ENOUGH);
        }
        // OrderModel orderModel = orderService.createOrder(userModel.getId(),itemId,promoId,amount);
        // 变成使用MQ事务型消息
        // 先加入库存流水init状态
        String stockLogId = itemService.initStockLog(itemId,amount);
        // 再去完成对应的下单事务型消息机制
        boolean orderResult = mqProducer.transactionAsyncReduceStock(userModel.getId(),itemId,promoId,amount,stockLogId);
        if (!orderResult){
            throw new BusinessException(EmBusinessError.UNKNOWN_ERROR,"下单失败");
        }
        return CommonReturnType.create(null);
    }