Skip to content

Java messageconsumer

Roy edited this page Nov 6, 2017 · 10 revisions

本指南以1.4.5版本的java客户端为起点编写。

在消息生产者发送消息成功后,消息就存储在服务端的数据目录里(服务端dataPath配置的目录),按照topic-partition的目录格式存储。接下来,消息的消费者就需要消费这些消息,这一节就是讲述如何消费某个topic下的消息。

配置消费者

每个Java的消费者都需要一个ConsumerConfig的配置实例。

消费者分组

在MetaQ里,消费者被认为是一个集群,也就是说认为是有一组的机器在共同分担消费一个topic。因此消费者配置ConsumerConfig中最重要的配置是group,每个消费者都必须告诉MetaQ它属于哪个group,然后MetaQ会找出这个group下所有注册上来的消费者,在他们之间做负载均衡,共同消费一个或多个topic。注意,不同group之间可以认为是不同的消费者,他们消费同一个topic下的消息的进度是不同。

举例来说,假设你有一个topic为business-logs,是所有业务系统的日志。然后现在你对这些日志要做两个事情:一个是存储到HDFS这样的分布式文件系统,以便后续做分析处理;以个是Twitter Storm这样的实时分析系统,做实时的数据分析、告警和展现。显然,这里你就需要两个group,比如我们有一个group叫hdfs-writer,它有三台机器同时消费business-logs,将日志存储到HDFS集群。同时,你也有另一个group叫storm-spouts,有5台机器用来给storm集群喂数据。这两个group是隔离,虽然是消费同一个topic,但是两者是消费进度(消费了多少个消息,等待消费多少个消息等信息)是不同的。但是同一个group内,例如hdfs-writer的三台机器,这三台机器是共同消费business-logs下的消息,同一条消息只会被这hdfs-writer三台机器中的一台处理,但是这条消息还会被twitter-spouts等其他分组内的某一台机器消费。

创建ConsumerConfig

创建ConsumerConfig并传入分组名称:

final String group = "hdfs-writer";
ConsumerConfig consumerConfig = new ConsumerConfig(group);

ConsumerConfig的其他重要选项还包括:

  • fetchRunnerCount, 因为MetaQ的消费者是以pull模型来从服务端拉取数据并消费,这个参数设置并行拉取的线程数,默认是CPUs个。关于消费的并发模型请看下面的并发处理小节。
  • fetchTimeoutInMills,同步抓取的请求超时,默认10秒,通常不需要修改此参数。
  • maxDelayFetchTimeInMills,当上一次没有抓取到的消息,抓取线程sleep的最大时间,默认5秒,单位毫秒。当某一次没有抓取到消息的时候,抓取线程会开始休眠maxDelayFetchTimeInMills的10分之1时间,如果下次还是没有抓到,则休眠maxDelayFetchTimeInMills的10分之2时间,以此类推直到最多休眠maxDelayFetchTimeInMills时间。中途如果任何一次抓取开始获取数据,则计数清零从10分之1重新开始计算。当你对消息的实时性特别敏感的时候应该调小此参数,并同时调小服务端的unflushInterval参数。
  • consumerId, 单个消费者的id,必须全局唯一,通常用于标识分组内的单个消费者,可不设置,系统会根据IP和时间戳自动生成。
  • offset, 第一次消费开始位置的offset,默认都是从服务端的最早数据开始消费。
  • commitOffsetPeriodInMills, 保存消费者已经消费的数据的offset的间隔时间,默认5秒,单位毫秒。更大的间隔,在故障和重启时间可能重复消费的消息更多,更小的间隔,可能给存储造成压力。
  • maxFetchRetries,同一条消息在处理失败情况下最大重试消费次数,默认5次,超过就跳过这条消息并调用RejectConsumptionHandler处理。关于RejectConsumptionHandler请看下面的拒绝处理小节。

这些参数都有相应的getter/setter方法来设置。

创建消费者

通过MessageSessionFactorycreateConsumer方法即可创建消费者:

 final MessageConsumer consumer = sessionFactory.createConsumer(consumerConfig);

Offset的存储

MetaQ的消费模型是一种拉取的模型,消费者根据上次消费数据的绝对偏移量(offset)从服务端的数据文件中拉取后面的数据继续消费,因此这个offset信息就非常关键,需要可靠地保存。默认情况下,MetaQ是将offset信息保存在你使用的zookeeper集群上,也就是ZkOffsetStorage所做的事情,它实现了OffsetStorage接口。通常这样的保存是可靠并且安全的,但是有时候可能你也需要其他选项,目前还提供两个不同的OffsetStorage实现:

  • LocalOffsetStorage,使用consumer的本地文件作为offset存储,默认存储在${HOME}/.meta_offsets的文件里。适合消费者分组只有一个消费者的情况,无需共享offset信息。例如广播类型的消费者就特别合适。
  • MysqlOffsetStorage,使用Mysql作为offset存储,使用前需要创建表结构:
CREATE TABLE `meta_topic_partition_group_offset` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `topic` varchar(255) NOT NULL,
  `partition` varchar(255) NOT NULL,
  `group_id` varchar(255) NOT NULL,
  `offset` int(11) NOT NULL,
  `msg_id` int(11) NOT NULL,
  PRIMARY KEY (`id`),
  KEY `TOPIC_PART_GRP_IDX` (`topic`,`partition`,`group_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

你也可以实现自己的OffsetStorage存储。如果你想使用除了zookeeper之外的offset存储,可以在创建消费者的时候传入:

final MessageConsumer consumer = sessionFactory.createConsumer(consumerConfig,new MysqlOffsetStorage(dataSource));

mysql存储需要传入JDBC数据源。

第一次消费的offset初始值。

前面提到ConsumerConfig有个offset参数可以设置第一次消费的时候开始的绝对偏移量,默认这个参数是0,也就是从服务端现有消息的最小偏移量开始,从头开始消费所有消息。

但是,通常情况下,新的消费分组都是希望从最新的消息开始消费,ComsumerConfig提供了一个setConsumeFromMaxOffset(boolean always)方法来设置从最新位置开始消费。其中always参数表示是否每次消费者启动都从最新位置开始消费,这样就忽略了在消费者停止期间的消息。通常仅在测试的时候将always参数设置为true,以便每次测试最新的消息。除非你真的不需要消费者停止期间(比如重启间隔)的消息,否则不要将always设置为真。

订阅Topic

创建了消费者之后,接下来我们需要订阅某些topic来消费处理。

MessageListener

每个想消费的topic都必须提供一个消息处理器,用来处理该topic下的消息,这跟JMS里的MessgaeListener概念是一样的。MessageListener接口如下:

/**
 * 异步消息监听器
 * 
 * @author boyan
 * @Date 2011-4-23
 * 
 */
public interface MessageListener {
    /**
     * 接收到消息,只有messages不为空并且不为null的情况下会触发此方法
     * 
     * @param messages
     *            TODO 拼写错误,应该是单数,暂时将错就错吧
     */
    public void recieveMessages(Message message) throws InterruptedException;


    /**
     * 处理消息的线程池
     * 
     * @return
     */
    public Executor getExecutor();
}
  • recieveMessages接收到服务端的消息并做业务处理。
  • getExecutor用于提供给consumer处理这些消费使用的线程池,可以为null。

处理消息

recieveMessages处理每条你收到的消息,比如我可以简单地打印:

       new MessageListener() {

            @Override
            public void recieveMessages(final Message message) {
                System.out.println("Receive message " + new String(message.getData()));
            }
            @Override
            public Executor getExecutor() {
                return null;
            }
        }

并发处理

前面在提到ConsumerConfigfetchRunnerCount用来设定从服务端拉取数据的并发线程数目,默认是CPUs个。如果你的MessageListenergetExecutor方法返回null,也就是不提供一个线程池来处理消费,那么这些消息就会在抓取线程上处理,因此在多核机器上,这些消息其实也是并发被处理的,在单核机器上就是串行处理了(因为默认只有一个抓取线程)。

但是当getExecutor返回一个线程池的时候,抓取线程将只负责抓取并解析出消息,消息的处理将交给你提供的线程池来处理。比如你可以设定并发30个线程来处理消息:

final ExecutorService executor = Executors.newFixedThreadPool(30);
......

public Executor getExecutor(){
    return this.executor;
}

注意,getExecutor方法不要每次重复创建线程池,应当一直返回同一个线程池。

总结:

  • getExecutor返回null,处理消息是在抓取线程上执行,默认抓取线程数目等于你的CPU个数。如果你是多核机器,处理消息将是并发执行。
  • getExecutor返回一个线程池,消息处理将采用该线程池。

线程池繁忙

当线程池繁忙并且线程池的队列满的情况下,你可以采用不同的策略,比如默认的AbortPolicy,无论你采用什么策略,MetaQ的consumer都能保证只在消息被成功处理的情况下递增offset。

订阅和完成订阅

订阅消息通过subscribe方法即可:

final String topic ="meta-test";
int maxSize = 1024*1024;
MessageListener messageListener=......
consumer.subscribe(topic, maxSize, messageListener);

topic和messageListener不用解释大家都清楚,maxSize参数需要解释下。同样,因为MetaQ采用pull模型拉取消息消费,那么就涉及到每次拉取多少消息的问题。这个maxSize就是用来设置每一次拉取请求的buffer大小,单位是字节数。例如上面的例子是每次尽量拉取最多1M的消息。因为可能没有足够的消息来填满1M,所以返回的消息总字节数是小于等于这个设定值。

因此,这个maxSize至少要比你的最大消息还要大上20个字节(消息的额外头部大小),例如你最大的消息大小可能是1024,那么这个maxSize不能小于1044。通常我会建议将这个参数设置成最大消息大小的2-3倍。这样可以做批量拉取,并且不会导致重复消费的消息太多。

subscribe方法可订阅多个topic,但是对同一个topic只能调用一次:

consumer.subscribe("topic1",1024,messageListener1);
consumer.subscribe("topic2",1024,messageListener1);
consumer.subscribe("topic3",1024,messageListener2);

上面的代码同时订阅了topic1,topic2,topic3,其中1和2采用同一个消息处理器,而3采用自己的消费处理器。

在调用了subsribe订阅消息后,订阅过程并没有真正开始,还需要调用completeSubscribe来使订阅生效。subscribe支持链式调用:

consumer.subscribe("topic1",1024,messageListener1).subscribe("topic2",1024,messageListener1).completeSubscribe();

completeSubscribe正常完成的情况下,消费者将从zookeeper上查找到提供这些topic的服务器并建立TCP链接,然后启动抓取线程,按照maxSize的大小抓取消息,抓取后的消息解析出来后提交给消息处理器的receiveMessages方法处理。

回滚消息

MetaQ的Consumer会在两种情况下回滚一条消息并重试消费:

  • recieveMessages抛出任何运行时异常,都将导致消费回滚并重试。
  • 在消息处理过程中调用Messgae.setRollbackOnly()方法主动回滚消息。

消息被回滚后将会尝试重新调用recieveMessages做消费,如果连续消费失败超过maxFetchRetries设定的阈值,将会调用拒绝处理器RejectConsumptionHandler来处理。

拒绝消费处理器

RejectConsumptionHandler是1.4.4才引入的新接口,它类似JDK5线程池中的RejectedExecutionHandler,用在当某条消息重复多次无法消费成功情况下的特殊处理。默认它有两个实现:

  • DropPolicy,丢弃策略,简单地将该条消息丢掉
  • LocalRecoverPolicy,本地恢复策略,尝试存储这条消息到本地文件并在后台继续重试,处理消息继续下一条,不阻碍主线程。

默认采用的是LocalRecoverPolicy策略。LocalRecoverPolicy策略会将这些多次消费失败的消息存储在${HOME}/.meta_recover目录下,并在后台尝试重新消费这些消息。

你可以修改这个策略,通过MessageConsumer.setRejectConsumptionHandler(handler)方法,比如设置为丢弃策略:

consumer.setRejectConsumptionHandler(new SimpleMessageConsumer.DropPolicy());

错误处理

通常来说,你应该将任何处理消息过程中产生的业务异常包装(使用try/cactch)为RuntimeException并重新抛出给MetaQ处理来回滚消息,除非你确认这些异常不应该导致消息再次重复消费。中断异常InterruptException可以简单地再次抛出响应。

中断处理

消息处理器的recieveMessages方法应该可以响应中断,也就是说在处理线程被中断的情况下应该抛出InterruptException来响应。中断可能发生在consumer之间做重新负载均衡或者关闭的时候,正确的响应中断可以优雅地暂停或者停止处理线程的当前任务。关于线程中断的处理参阅《Java并发编程实践》一书。

消息过滤

从1.4.6版本开始,我们引入了新的ConsumerMessageFilter接口,用于消费者过滤消息,在某些场景下你可能只想消费一个topic下满足一定要求的消息:

public interface ConsumerMessageFilter {
    /**
     * Test if the filter can accept a metaq message.Any exceptions threw by
     * this method means the message is not accepted.This method must be
     * thread-safe.
     * 
     * @param group
     * @param message
     * @return true if it accepts.
     */
    public boolean accept(String group, Message message);
}

这个接口的实现必须是线程安全的,抛出任何异常都将被认为消息不被接受,也就是忽略消费。如果可以消费,返回true就可。

比如下面这个实现,只接收消息属性为accept的消息:

package com.taobao.metamorphosis.example.filter;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.consumer.ConsumerMessageFilter;


public class AttributeMessageFilter implements ConsumerMessageFilter {

    @Override
    public boolean accept(String group, Message message) {
        if (message.getAttribute() == null) {
            return false;
        }
        return message.getAttribute().equals("accept");
    }

}

在订阅的subscribe方法传入该实现:

    messageConsumer.subscribe(topic, 1024*1024, messageListener, new AttributeMessageFilter());

那么messageListener将只接收符合条件的消息。

关闭

consumer同样使用shutdown方法来关闭:

consumer.shutdown();