触发事件
(1)评论后,发布通知
(2)点赞后,发布通知
(3)关注后,发布通知
  处理事件
(1)封装事件对象
(2)开发事件的生产者
(3)开发事件的消费者
  在entity包下新建Event类,

private String topic;
    private int userId;
    private int entityType;
    private int entityId;
    private int entityUserId;
    private Map<String, Object> data = new HashMap<>(); //让它具有扩展性

    public String getTopic() {
        return topic;
    }

    public Event setTopic(String topic) { //把void改为Event
        this.topic = topic;
        return this;  //添加这个返回值,为以后调用的灵活方便
    }

    public int getUserId() {
        return userId;
    }

    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    public int getEntityUserId() {
        return entityUserId;
    }

    public Event setEntityUserId(int entityUserId) {
        this.entityUserId = entityUserId;
        return this;
    }

    public Map<String, Object> getData() {
        return data;
    }

//    public void setData(Map<String, Object> data) {
//        this.data = data;
//    }
    public Event setData(String key, Object value) {
        this.data.put(key,value);  //方便调用
        return this;
    }

  新建event包,在下面新建EventProducer类

@Component
public class EventProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    //处理事件(本质是发布消息)
    public void fireEvent(Event event){
        //将事件发布到指定的主题
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
    }

}

  新建EventConsumer类

@Component
public class EventConsumer implements CommunityConstant {

    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    @Autowired
    private MessageService messageService;
    
	@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})
    public void handleCommentMessage(ConsumerRecord record){
        if(record == null || record.value() == null){
            logger.error("消息的内容为空!");
            return;
        }

        Event event = JSONObject.parseObject(record.value().toString(),Event.class);  //json转为对象
        if(event==null){
            logger.error("消息格式错误!");
            return;
        }
        //发送站内通知
        Message message = new Message();
        message.setFromId(SYSTEM_USER_ID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());

        //以下是系统通知的内容
        Map<String, Object> content = new HashMap<>();
        content.put("userId",event.getUserId());
        content.put("entityType",event.getEntityType());
        content.put("entityId",event.getEntityId());

        if(!event.getData().isEmpty()){ //如果它不为空
            for(Map.Entry<String,Object> entry:event.getData().entrySet()){
                content.put(entry.getKey(),entry.getValue());
            }
        }

        message.setContent(JSONObject.toJSONString(content));
        messageService.addMessage(message);
    }

  在CommunityConstant类里添加

/**
     * 主题:评论
     */
    String TOPIC_COMMENT = "comment";
    /**
     * 主题:点赞
     */
    String TOPIC_LIKE = "like";

    /**
     * 主题:关注
     */
    String TOPIC_FOLLOW = "follow";
    
    /**
     * 系统用户ID
     */
    int SYSTEM_USER_ID = 1;

  在CommentController类addComment方法里的commentService.addComment(comment);之后添加

// 触发评论事件
        Event event = new Event()
                .setTopic(TOPIC_COMMENT)
                .setUserId(hostHolder.getUser().getId())
                .setEntityType(comment.getEntityType())
                .setEntityId(comment.getEntityId())
                .setData("postId",discussPostId); //Me:这个帖子Id(discussPostId)和EntityId有什么不同?
        if(comment.getEntityType()==ENTITY_TYPE_POST){
            DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        }else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){
            Comment target = commentService.findCommentById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        }
        eventProducer.fireEvent(event); //这是并发的、异步的(效率较高)

  在CommentMapper接口类里补充

Comment selectCommentById(int id);

  在comment-mapper.xml里补充

<select id="selectCommentById" resultType="Comment">
        select <include refid="selectFields"></include>
        from comment
        where id = #{id}
    </select>

  在CommentService类里补充

public Comment findCommentById(int id){
        return commentMapper.selectCommentById(id);
    }

  在LikeController类like方法里添加一个参数int postId(相应地改动discuss-detail.html和discuss.js来获取),在map.put(“likeStatus”,likeStatus);之后添加

// 触发点赞事件
        if(likeStatus==1){  //取消点赞就不必通知了
            Event event = new Event()
                    .setTopic(TOPIC_LIKE)
                    .setUserId(hostHolder.getUser().getId())
                    .setEntityType(entityType)
                    .setEntityId(entityId)
                    .setEntityUserId(entityUserId)
                    .setData("postId",postId);
            eventProducer.fireEvent(event);
        }

  在FollowController类follow方法里followService.follow(user.getId(),entityType,entityId);之后添加

//触发关注事件
        Event event = new Event()
                .setTopic(TOPIC_FOLLOW)
                .setUserId(hostHolder.getUser().getId())
                .setEntityType(entityType)
                .setEntityId(entityId)
                .setEntityUserId(entityId);
        //关注事件(当点击到关注你的某人时,需要的是跳转到某人的个人主页上)和评论、点赞不一样,而不需要链接到帖子详情页面所以不需要postId
        eventProducer.fireEvent(event);

  附:windows下kafka不稳定,当出现锁死问题启动不成功的时候,可以把server.properties里设置的日志目录,如d:/javing/workspace/data/kafka-logs目录删掉,重新启动一般可解决问题。
  程序启动报错,在ServiceLogAspect类before方法里HttpServletRequest request = attributes.getRequest(); 之前添加

if(attributes==null){
            return;  //像调用kafka消息队列的情况,不记日志了
        }
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐