上篇文章大致讲述了同步消费者和异步消费者接收消息的异同(详见《ActiveMQ中消费者是如何接收消息的(一)》http://manzhizhen.iteye.com/blog/2094130 ),但我们还未讲到消息是在什么时候放入消费者ActiveMQMessageConsumer类的“消息容器”unconsumedMessages中的,这很关键,因为为了解耦,消费者类不需要知道你ActiveMQ是怎么获得和分发消息的,我只知道一旦我发现unconsumedMessages中还有未消费的消息,我就会去尽早消费它。

        直接使用过ActiveMQ API编码的人一定知道,一个MOM地址(即连接器地址)由ActiveMQConnectionFactory绑定,ActiveMQConnectionFactory下可以创建多个ActiveMQConnection(所谓的工厂模式),而一个ActiveMQConnection(连接)下可以创建多个ActiveMQSession(会话),而ActiveMQSession下又可以创建多个ActiveMQMessageConsumer(消费者)和多个ActiveMQMessageProducer(生产者)。所以,你大致也猜想得到,消息从MOM服务器发出后,最先到达的是消费者所属的ActiveMQConnection,ActiveMQConnection根据消息的来源(Destination)来分给其下的ActiveMQSession,ActiveMQSession接收到消息后,又会分给其下的消费者们(当然,ActiveMQSession会把消息分发的任务交给它的“手下”ActiveMQSessionExecutor类),这样,消费者的unconsumedMessages中就有了消息(神说,要有光,就有了光 ;我说,要有房,于是,我就买不起房)。

       下面,我们来从源码的角度,来细细剖析这一过程,碎碎念,碎碎念。。。让我们来发动起自己的大脑,由于JMS系统要求支持多种通信协议(什么TCP、UDP之类的)和异构系统(比如Java系统和C++系统通信),如果没有这些特点,则这个JMS实现是没有竞争力的。所以,如果让你来设计一个JMS实现,你首先要做的就是把通信层给解耦,所以,就有了ActiveMQ中Transport接口和TransportSupport抽象类,让我们看看他们的签名:

public interface Transport extends Service  【此接口是为了让客户端有消息被异步发送、同步发送和被消费的能力】

public abstract class TransportSupport extends ServiceSupport implements Transport  【此抽象类是Transport 的一个有用的基础实现类】

       让咱们看看Transport接口中的几个主要方法签名:

void oneway(Object command) throws IOException; 【异步发送消息】

FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException;【带回调功能的异步请求的应答,如果responseCallback不为空,则在应答完成后将被调用】

Object request(Object command) throws IOException; 【异步请求应答】

Object request(Object command, int timeout) throws IOException; 【带超时时间的异步请求应答】

TransportListener getTransportListener(); 【获得当前的传送监听器】

void setTransportListener(TransportListener commandListener); 【设置传送监听器】 

void reconnect(URI uri) throws IOException; 【重定向到另一个地址】

void updateURIs(boolean rebalance,URI[] uris) throws IOException; 【提供可替代的一系列地址】

 

       而奇怪的是,作为Transport接口首要的实现类TransportSupport对其上面中的五个方法的直接实现却是——不支持。。源码如下:

    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
        throw new AssertionError("Unsupported Method");
    }

    public Object request(Object command) throws IOException {
        throw new AssertionError("Unsupported Method");
    }

    public Object request(Object command, int timeout) throws IOException {
        throw new AssertionError("Unsupported Method");
    }

    public void reconnect(URI uri) throws IOException {
        throw new IOException("Not supported");
    }

    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
        throw new IOException("Not supported");
    }

       也许是最常用的传送器不需要去支持这些功能,如果需要,实现类也可以去直接覆盖该方法。TransportSupport类中唯一的成员变量是TransportListener transportListener;,这也意味着一个传送器支持类绑定一个传送监听器类,传送监听器接口TransportListener 最重要的方法就是 void onCommand(Object command);,它用来处理命令 。TransportSupport类中最重要的方法是doConsume,它的作用就是用来“消费消息”,源码如下:

    public void doConsume(Object command) {
        if (command != null) {
            if (transportListener != null) {
                transportListener.onCommand(command);
            } else {
                LOG.error("No transportListener available to process inbound command: " + command);
            }
        }
    }

       其实说白了,TransportSupport只负责维护一些状态(很多状态都没有给实现,直接是不支持或返回false)和抛异常,最重要的事情都交给了它所绑定的传送监听器类TransportListener 了。如代码所示,它直接把Command(消息的内容、执行包装类)对象作为参数去调用传送监听器类的onCommand方法。 你如果看过TransportListener接口的实现类,你就会恍然大悟,大名鼎鼎的ActiveMQConnection就是其实现类之一,于是,消息就这样从传送层到达了我们的连接器上,我们先不急着继续往下追溯,因为传送层这部分还没说完。TransportSupport有很多实现类,具体的说,ActiveMQ支持的每种通信协议,都有对应的TransportSupport实现类。为了方便,因为我们最常用的是TCP协议,所以我们以其实现类TcpTransport来做说明,开门见山,直接给出其构造方法:

    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
                        URI localLocation) throws UnknownHostException, IOException {
        this.wireFormat = wireFormat;
        this.socketFactory = socketFactory;
        try {
            this.socket = socketFactory.createSocket();
        } catch (SocketException e) {
            this.socket = null;
        }
        this.remoteLocation = remoteLocation;
        this.localLocation = localLocation;
        setDaemon(false);
    }

       由这引出了TcpTransport几个重要的成员变量:

protected final URI remoteLocation; // JMS消息服务器地址,也就是创建连接器工程是配置的地址

protected final WireFormat wireFormat; // 用来处理数据包命令、流、通道和数据报的进和出

protected Socket socket; // 由构造函数中传入的SocketFactory来创建,采用TCP协议当然离不开套接字

protected DataOutputStream dataOut;  // 从socket.getOutputStream()包装过的输出流
protected DataInputStream dataIn; // 从socket.getInputStream()包装过的输入流

       现在产生了一个问题,谁来调用这个构造函数,也就是谁来创建TcpTransport,答案就是ActiveMQConnectionFactory,在该工厂创建ActiveMQConnection之前,会先创建好TcpTransport,当然是调用 TransportFactory来创建,我们注意到,Transport是ActiveMQConnection的构造函数的形参之一,该构造函数会去让该Transport绑定自己(this.transport.setTransportListener(this);)。创建好TcpTransport后,就开始需要启动它了,run方法源码如下:

    public void run() {
        LOG.trace("TCP consumer thread for " + this + " starting");
        this.runnerThread=Thread.currentThread();
        try {
            while (!isStopped()) {
                doRun();
            }
        } catch (IOException e) {
            stoppedLatch.get().countDown();
            onException(e);
        } catch (Throwable e){
            stoppedLatch.get().countDown();
            IOException ioe=new IOException("Unexpected error occured: " + e);
            ioe.initCause(e);
            onException(ioe);
        }finally {
            stoppedLatch.get().countDown();
        }
    }

 run()用来从socket中来读取数据包,只要TcpTransport没有停止,它就会不停的调用doRun(),在看看doRun()是怎么实现的:

    protected void doRun() throws IOException {
        try {
            Object command = readCommand();
            doConsume(command);
        } catch (SocketTimeoutException e) {
        } catch (InterruptedIOException e) {
        }
    }

 我们在这里发现了TcpTransport读取数据的方法readCommand,doRun方法就作用就是“读一条,消费一条,读一条,消费一条。。。”,doConsume方法前面已经讲过,这里我们来看消息是怎么接收的,readCommand源码如下:

    protected Object readCommand() throws IOException {
        return wireFormat.unmarshal(dataIn);
    }

 就在此时,传说中的wireFormat出现了,dataIn前面已经说过,现在我们来看看WireFormat接口,WireFormat接口主要是为了将流数据解析和组装。解析的话,说白了,就是提取数据转换成我们ActiveMQ需要的对象,你可以简单的把它想象成对象反序列化的过程。WireFormat接口有多种实现类,默认的是OpenWireFormat,它采用的是OpenWire协议,这是ActiveMQ自己的跨语言Wire协议,它允许客户端从多个不同的语言和平台的本机来和ActiveMQ服务器通讯。在Java环境下,OpenWire是的ActiveMQ4.x或更高版本的默认传输方式,我们这里不必过分去追求OpenWireFormat实现类中的细节(有兴趣的读者可以去官方文档中了解:http://activemq.apache.org/openwire-version-2-specification.html),至少它不是采用JDK里面序列化机制(序列化只适合Java平台并且是低效的),到这里,我们都差不多明白了传送层主要的工作是获得数据并且将数据转换成对象,把对象再传给连接ActiveMQConnection。

       前面所述,我们知道了ActiveMQConnection中的onCommand方法是由Transport来调用的,ActiveMQConnection是一个拥有将近3000行代码的实现类,其重要性不言而喻,我们先来看看待会会用到的几个重要的成员变量:

private final Transport transport; // 绑定的传送器对象

private final ConnectionInfo info; // 负责管理连接的各种状态变量

private final ThreadPoolExecutor executor; // 线程池执行器,用来调度需要多线程执行的任务

private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>(); // 消费者ID到分发对象映射的Map,这里的分发者对象就是ActiveMQSession而不是ActiveMQConsumer本身。
private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>(); // 生产者ID到生产者对象映射的Map
private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>(); // 如果某些框架(如Spring)给ActiveMQConnectionFactory上绑定了TransportListener(通过ActiveMQConnectionFactory#setTransportListener(TransportListener transportListener)方法绑定),则该TransportListener会被放入由该工厂创建的所有ActiveMQConnection的transportListeners中

       下面给出ActiveMQConnection构造函数源码:

    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {

        this.transport = transport;  // 让自己绑定传送器
        this.clientIdGenerator = clientIdGenerator;
        this.factoryStats = factoryStats;

        // 配置一个单线程的执行器,如果被闲置它的核心线程可以被超时
        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
                // 不要使用守护线程 - 详见 https://issues.apache.org/jira/browse/AMQ-796
                // thread.setDaemon(true);
                return thread;
            }
        });
        // asyncConnectionThread.allowCoreThreadTimeOut(true);
        String uniqueId = connectionIdGenerator.generateId();
        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
        this.info.setManageable(true);
        this.info.setFaultTolerant(transport.isFaultTolerant());
        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);

        this.transport.setTransportListener(this); // 让传送器绑定自己

        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
        this.factoryStats.addConnection(this);
        this.timeCreated = System.currentTimeMillis();  // 记录创建时间
        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
    }

       从构造函数可以看出,创建ActiveMQConnection对象时,除了和Transport相互绑定,还对线程池执行器executor进行了初始化。下面我们看看该类的核心方法——onCommand:

    @Override
    public void onCommand(final Object o) {
        final Command command = (Command)o;
        if (!closed.get() && command != null) {
            try {
                command.visit(new CommandVisitorAdapter() {
					/**
					 * 处理消息分发
					 * 如果传入的command是MessageDispatch,
					 * 则该command的visit方法就会调用processMessageDispatch方法
					 */ 
                    @Override
                    public Response processMessageDispatch(MessageDispatch md) 
						throws Exception {
						// 等待Transport中断处理完成
                        waitForTransportInterruptionProcessingToComplete();
						
						// 这里通过消费者ID来获取消费者对象
						//(ActiveMQMessageConsumer实现了ActiveMQDispatcher接口),
						// 这也意味着MessageDispatch对象已经包含了消息和
						// 该消息应该被分配给哪个消费者的所有信息
                        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
						
                        if (dispatcher != null) {
                            // 以防是嵌入式代理(vm://),这里将消息进行copy
                            // 如果md.getMessage() == null 意味着你已经浏览到消息队列的末端。
                            Message msg = md.getMessage();
                            if (msg != null) {
                                msg = msg.copy();
                                msg.setReadOnlyBody(true);
                                msg.setReadOnlyProperties(true);
                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
                                msg.setConnection(ActiveMQConnection.this);
                                msg.setMemoryUsage(null);
                                md.setMessage(msg);
                            }
							
							// 调用会话ActiveMQSession自己的dispatch方法来处理这条消息
                            dispatcher.dispatch(md); 
							
                        } else {
                            LOG.debug("{} no dispatcher for {} in {}", this, 
								md, dispatchers);
                        }
                        return null;
                    }

					/**
					 * 处理生产者应答
					 * 如果传入的command是ProducerAck,
					 * 则该command的visit方法就会调用processProducerAck方法
					 */ 
                    @Override
                    public Response processProducerAck(ProducerAck pa) throws Exception {
                        if (pa != null && pa.getProducerId() != null) {
                            ActiveMQMessageProducer producer = producers.
								get(pa.getProducerId());
                            if (producer != null) {
                                producer.onProducerAck(pa);
                            }
                        }
                        return null;
                    }
                });
							
				// 注意:为了简化处理,这里省略了CommandVisitorAdapter类的其他方法,
				// 有兴趣的读者可以直接阅读源码
				
				
            } catch (Exception e) {
                onClientInternalException(e);
            }
        }

		// 调用由ActiveMQConnectionFactory统一绑定给其下ActiveMQConnection的传送监听器的
		// 处理命令方法.
		// 一般情况下transportListeners为空,但如果你使用Spring等框架,就另当别论了。
        for (Iterator<TransportListener> iter = transportListeners.iterator(); 
		iter.hasNext();) {
            TransportListener listener = iter.next();
            listener.onCommand(command);
        }
    }

      由于我们关注的是消费者,所以我们只关心processMessageDispatch方法,我们已经看到传给processMessageDispatch方法的参数就是MessageDispatch对象,该对象包含了消息对象本身和消费者ID,所以,processMessageDispatch方法需要做的只是简单的去调用ActiveMQConnection下的ActiveMQSession的dispatch方法来处理这条消息。

       我们先不着急看费者的dispatch方法做了哪些处理,我们更关心processMessageDispatch方法何时被调用?又是由谁来调用?需要注意的是CommandVisitorAdapter类中有诸多方法,它是个适配器类,如果command.visit(new CommandVisitorAdapter() ..);中的command是MessageDispatch(类MessageDispatch 派生于BaseCommand)类对象,则它的visit只会去调用适配器的processMessageDispatch方法,而不会去关心其他方法,所以,你可以根据CommandVisitorAdapter在这里实现的几个方法,就可以猜出传进来的Command对象有几种类型。这里我们可以看看MessageDispatch中visit方法的实现:

    @Override
    public Response visit(CommandVisitor visitor) throws Exception {
        return visitor.processMessageDispatch(this);
    }

 所以,其实你就知道,原来传给processMessageDispatch方法的MessageDispatch对象md就是传给ActiveMQConnection#onCommand方法的final Object o,也就是说MessageDispatch对象在调用onCommand方法之前就已经被创建了,如果我们想知道消息被分配给哪个消费者,就得去追溯这个MessageDispatch对象到底在哪创建的,还记得前面段落提过的TcpTransport类中的wireFormat吗?Command对象的形成就是来自WireFormat接口中Object unmarshal(ByteSequence packet) throws IOException;方法的实现,所以,Command对象的创建也包含了消息分发的过程,这里我们先不纠结消息是怎么决定分配给哪个消费者的,在《ActiveMQ中消费者是如何接收消息的(三)》中会给出答案,我们这里接着看会话ActiveMQSession类的消息处理dispatch方法,下面给出源码:

 

    @Override
    public void dispatch(MessageDispatch messageDispatch) {
        try {
	    交给ActiveMQSessionExecutor对象来处理消息分发
            executor.execute(messageDispatch);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            connection.onClientInternalException(e);
        }
    }

       代码中的executor对象就是ActiveMQSession类中重要的一员(protected final ActiveMQSessionExecutor executor;),专门负责处理消息分发,我们接着看ActiveMQSessionExecutor 类中主要的成员变量:

private final ActiveMQSession session;  // 对其“上级”ActiveMQSession的引用
private final MessageDispatchChannel messageQueue; // 等待分发的消息容器,消费者ActiveMQConsumer中也有哦
private boolean dispatchedBySessionPool; // 是否通过会话池分发
private volatile TaskRunner taskRunner; // 采用异步分发时的任务执行器对象
       大家可能会觉得这个dispatchedBySessionPool标志是用来做什么的,这需要我们追溯到它set方法的调用者——ActiveMQSession类,其setMessageListener方法源码如下:

    @Override
    public void setMessageListener(MessageListener listener) throws JMSException {

		// 如果用户想清理监听器,则listener可以设置为null
		// 如果不为空,我们回去检查此Session是否已经关闭,如果关闭,将抛异常
        if (listener != null) {
            checkClosed();
        }
		
		// 给ActiveMQSession绑定此监听器
        this.messageListener = listener;

        if (listener != null) {
			// 给ActiveMQSession的会话执行器ActiveMQSessionExecutor
			// 的dispatchedBySessionPool标记设置为true
            executor.setDispatchedBySessionPool(true);
        }
    }

 我们可能只知道消费者上可以设置消息监听器,没想到ActiveMQSession上也可以,一旦我们给ActiveMQSession设置了消息监听器,则会话上其他形式的消息接收将变成不可用,也就是说,此时,其下的消费者都将接收不到消息,但所有的消息发送仍然是正常的。那ActiveMQ在会话上提供这个功能是出于什么考虑呢?可以肯定,该方法不是给普通的JMS客户端使用的,但如果你想完全自己来处理消息分配(Spring框架中有可能使用了此方法),使用此方法是个不错的选择。言归正传,我们继续说会话执行器,ActiveMQSessionExecutor#execute方法的实现如下:

    void execute(MessageDispatch message) throws InterruptedException {

        if (!startedOrWarnedThatNotStarted) {

            ActiveMQConnection connection = session.connection;
            long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout();
            if (connection.isStarted() || aboutUnstartedConnectionTimeout < 0L) {
                startedOrWarnedThatNotStarted = true;
            } else {
                long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated();

                // lets only warn when a significant amount of time has passed
                // just in case its normal operation
                if (elapsedTime > aboutUnstartedConnectionTimeout) {
                    LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection
                             + " Received: " + message);
                    startedOrWarnedThatNotStarted = true;
                }
            }
        }

		/ 如果会话设置的不是异步分发且没有采用Session池分发,则调用dispatch方法发送消息
        if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) {
            dispatch(message);
        } else {
		    // 将消息放入队列中
            messageQueue.enqueue(message);
            wakeup();
        }
    }

 上面关于startedOrWarnedThatNotStarted部分的代码我们大可不必关心,重要的是下面的。如果是通过ActiveMQConnection的createSession创建出来的ActiveMQSession并且我们没有通过ActiveMQSession#setAsyncDispatch方法设置过的话,默认是采用异步分发消息的。所以,在这里直接把该消息放入了messageQueue中,并调用了wakeup()方法,wakeup源码如下:

    public void wakeup() {
		/// 如果不是由会话池分发,则进行如下处理
        if (!dispatchedBySessionPool) {
			// 如果是异步分发
            if (session.isSessionAsyncDispatch()) {
                try {
                    TaskRunner taskRunner = this.taskRunner;
                    if (taskRunner == null) {
                        synchronized (this) {
                            if (this.taskRunner == null) {
                                if (!isRunning()) {
                                    // stop has been called
                                    return;
                                }
								// 注意这里,createTaskRunner方法把this作为Task传进去了!
                                this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
                                        "ActiveMQ Session: " + session.getSessionId());
                            }
                            taskRunner = this.taskRunner;
                        }
                    }
					// 说白了就是将
                    taskRunner.wakeup();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
			// 如果是会话池分发
            } else {
                while (iterate()) {
                }
            }
        }
    }

         我们这里先看异步分发是怎么一回事,第16行代码我们发现了关键,session.connection.getSessionTaskRunner()从ActiveMQConnection获取到的是一个TaskRunnerFactory任务运行工厂(好高大上的样子,有木有?有木有?),代码中马上又调用了工厂的createTaskRunner方法来创建一个任务运行器,尽然this是第一个参数,也就是把ActiveMQSessionExecutor自己给传进去了(难道疯了),定睛一看,原来ActiveMQSessionExecutor类实现的唯一接口就是Task,我们来看看Task接口:

public interface Task {
    boolean iterate();
}

 我靠,这么简单,让我想起了Runnable接口。。。Task接口在ActiveMQ中表示可能需要迭代完成的任务,聪明的你一下就知道了,那不就是说 ActiveMQSessionExecutor的iterate()方法将被线程池中的一个线程给调用?那不也就是29行采用同步分发while中所调用的iterate()方法?没错!就是这样!一谈到线程池,那些多线程编程的爱好者们似乎都要血液沸腾了,那么ActiveMQ中的这里采用的是什么线程池呢?是它自己实现的吗?No,No,No,下面给出TaskRunnerFactory中的源码:

	protected ExecutorService createDefaultExecutor() {
        ThreadPoolExecutor rc = new ThreadPoolExecutor(0, getMaxThreadPoolSize(), 
			getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), 
			new ThreadFactory() {
			
            @Override
            public Thread newThread(Runnable runnable) {
                String threadName = name + "-" + id.incrementAndGet();
                Thread thread = new Thread(runnable, threadName);
                thread.setDaemon(daemon);
                thread.setPriority(priority);

                LOG.trace("Created thread[{}]: {}", threadName, thread);
                return thread;
            }
        });
		
        if (rejectedTaskHandler != null) {
            rc.setRejectedExecutionHandler(rejectedTaskHandler);
        }
        return rc;
    }

      果然,它用的就是ThreadPoolExecutor,构造函数中的getMaxThreadPoolSize()默认返回的是Integer.MAX_VALUE,而getDefaultKeepAliveTime()返回的是Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.keepAliveTime", 30),也就是说默认是30秒。有没有人好奇第11行设置的线程优先级是多少?是ThreadPriorities.INBOUND_CLIENT_SESSION!!看:

public interface ThreadPriorities {
    int INBOUND_BROKER_CONNECTION = 6;
    int OUT_BOUND_BROKER_DISPATCH = 6;
    int INBOUND_CLIENT_CONNECTION = 7;
    int INBOUND_CLIENT_SESSION = 7;
    int BROKER_MANAGEMENT = 9;
}

 这是题外话,回归正题,这里,我们先不着急看iterate()方法中的内容,我们回过头来看看ActiveMQSessionExecutor#execute方法中同步分发时的代码(dispatch(message);):

    void dispatch(MessageDispatch message) {
        // TODO - we should use a Map for this indexed by consumerId
        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
            ConsumerId consumerId = message.getConsumerId();
            if (consumerId.equals(consumer.getConsumerId())) {
                consumer.dispatch(message);
                break;
            }
        }
    }

    我们再给出ActiveMQSessionExecutor#execute方法中异步分发wakeup()中iterate() 的代码:

	public boolean iterate() {
		// 将消费者监听的所有消息投递到消费者队列中
                // 异步消费者(setMessageListener)才会运行for循环代码
        for (ActiveMQMessageConsumer consumer : this.session.consumers) {
            if (consumer.iterate()) {
                return true;
            }
        }

		// 处理messageQueue中遗留的消息(非阻塞取)
		// 同步消费者(receive())才会运行下面的代码
          MessageDispatch message = messageQueue.dequeueNoWait();
        if (message == null) {
            return false;
        } else {
			// 如果消息不为空,则分发出去
            dispatch(message);
            return !messageQueue.isEmpty();
        }
    }

    从上面的代码中我们发现了,同步分发时,ActiveMQSessionExecutor会去调用消费者ActiveMQMessageConsumer的dispatch方法,而异步分发时会去调用消费者ActiveMQMessageConsumer的iterate方法:

    public boolean iterate() {
        MessageListener listener = this.messageListener.get();
        if (listener != null) {
            MessageDispatch md = unconsumedMessages.dequeueNoWait();
            if (md != null) {
                dispatch(md);
                return true;
            }
        }
        return false;
    }

      这里咱们发现了,不管同步分发还是异步分发,最终调用的都是消费者的dispatch方法,异步分发和同步分发的区别就是这么简单,调用的处理过程都是一样的,只不过异步分发是将分发任务交给线程池去调度而已,默认采用的是异步分发。那什么时候采用异步分发,什么时候采用同步分发呢?我不说你也知道了,对于“快消费者”,我们建议采用同步分发,这样省去了线程池资源调度的开销,对于“慢消费者”,我们建议采用默认的异步分发,这样让消息分发更快。

      在这里,我们还发现同步消费者和异步消费者的一个显著区别:不管Session采用异步分发还是同步分发,异步消费者都不是从自己的unconsumedMessages中取消息来处理,而是直接处理ActiveMQSessionExecutor透传过来的消息,而同步消费者的receive方法只能从unconsumedMessages取消息来处理。不信的话,你结合前面所说的加上我下面展示的ActiveMQMessageConsumer# dispatch代码你就知道一切了:

    @Override
    public void dispatch(MessageDispatch md) {
        MessageListener listener = this.messageListener.get();
        try {
            clearMessagesInProgress();
            clearDeliveredList();
            synchronized (unconsumedMessages.getMutex()) {
                if (!unconsumedMessages.isClosed()) {
                    if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
                        if (listener != null && unconsumedMessages.isRunning()) {
                            if (redeliveryExceeded(md)) {
                                posionAck(md, "dispatch to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
                                return;
                            }
                            ActiveMQMessage message = createActiveMQMessage(md);
                            beforeMessageIsConsumed(md);
                            try {
                                boolean expired = message.isExpired();
                                if (!expired) {
                                    listener.onMessage(message);
                                }
                                afterMessageIsConsumed(md, expired);
                            } catch (RuntimeException e) {
                                LOG.error(getConsumerId() + " Exception while processing message: " + md.getMessage().getMessageId(), e);
                                if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) {
                                    // schedual redelivery and possible dlq processing
                                    md.setRollbackCause(e);
                                    rollback();
                                } else {
                                    // Transacted or Client ack: Deliver the
                                    // next message.
                                    afterMessageIsConsumed(md, false);
                                }
                            }
                        } else {
                            if (!unconsumedMessages.isRunning()) {
                                // delayed redelivery, ensure it can be re delivered
                                session.connection.rollbackDuplicate(this, md.getMessage());
                            }
                            unconsumedMessages.enqueue(md);
                            if (availableListener != null) {
                                availableListener.onMessageAvailable(this);
                            }
                        }
                    } else {
                        if (!session.isTransacted()) {
                            LOG.warn("Duplicate non transacted dispatch to consumer: "  + getConsumerId() + ", poison acking: " + md);
                            posionAck(md, "Duplicate non transacted delivery to " + getConsumerId());
                        } else {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
                            }
                            boolean needsPoisonAck = false;
                            synchronized (deliveredMessages) {
                                if (previouslyDeliveredMessages != null) {
                                    previouslyDeliveredMessages.put(md.getMessage().getMessageId(), true);
                                } else {
                                    // delivery while pending redelivery to another consumer on the same connection
                                    // not waiting for redelivery will help here
                                    needsPoisonAck = true;
                                }
                            }
                            if (needsPoisonAck) {
                                LOG.warn("acking duplicate delivery as poison, redelivery must be pending to another"
                                        + " consumer on this connection, failoverRedeliveryWaitPeriod="
                                        + failoverRedeliveryWaitPeriod + ". Message: " + md);
                                posionAck(md, "Duplicate dispatch with transacted redeliver pending on another consumer, connection: "
                                        + session.getConnection().getConnectionInfo().getConnectionId());
                            } else {
                                if (transactedIndividualAck) {
                                    immediateIndividualTransactedAck(md);
                                } else {
                                    session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
                                }
                            }
                        }
                    }
                }
            }
            if (++dispatchedCount % 1000 == 0) {
                dispatchedCount = 0;
                Thread.yield();
            }
        } catch (Exception e) {
            session.connection.onClientInternalException(e);
        }
    }

 完了,各位,希望能对你们理解ActiveMQ内部机制有所帮助!

 

 这里,我引用一下当当网架构师张亮在程序员杂志2014.12刊中发表的关于消息中间件的几段话:

       消费消息有两种模式:推送和拉取。推送模式的适用场景是,消息的消费者能力强于生产者,一旦有消息就快速进行消费,不会堆积,也没有延迟,。拉取是消息消费者主动向消息中介(broker)发起请求,获取消息。拉取模式的优点是消费者可以自己控制消息的拉取速度,消息中介不需要维护消费者的状态。如果总是从消息中介推送消息,消费者能力不如生产者时,消费者会被压垮或者必须在消息中介使用定时消息推送,增加消息中介的复杂度。缺点是消息及时性差,取决于拉取的间隔。而且有可能是空拉取,造成资源浪费。

       拉取模式的使用场景是,消息的生产者能力强于消费者,服务器在高峰时间允许堆积消息,然后在波谷时间完成消费。

       因为ActiveMQ采用消息推送方式,所以最适合的场景是默认消息都可在短时间内被消费。数据量越大,查找和消费消息就越慢,消息积压程度与消息速度成反比。

       ActiveMQ的缺点:

       1.吞吐量低。由于ActiveMQ需要建立索引,导致吞吐量下降。这是无法克服的缺点,只要使用完全符合JMS规范的消息中间件,就要接受这个级别的TPS。

       2.无分片功能。这是一个功能缺失,JMS并没有规定消息中间件的集群、分片机制。而由于ActiveMQ是伟企业级开发设计的消息中间件,初衷并不是为了处理海量消息和高并发请求。如果一台服务器不能承受更多消息,则需要横向拆分。ActiveMQ官方不提供分片机制,需要自己实现。

       ActiveMQ的适用场景:

       1.业务系统没有实现幂等性。消费不成功,消息连同业务数据一起回滚,适用于不易实现幂等性的复杂业务场景或敏感性业务。

       2.强事务一致性。消息和业务数据必须处于同一事务状态,假如业务数据回滚,消息必须也回滚成未消费状态。

       3.内部系统。对于TPS要求低的系统,ActiveMQ由于使用简单,完全支持JMS,非常适合快速开发。并且ActiveMQ有完善的监控机制和操作界面。

       ActiveMQ不适用的场景:

       1.性能要求高,且不要求事务。性能是ActiveMQ的短板,如果业务要求消息中间件的性能很高,且不要求强一致性的事务,则不应使用ActiveMQ。

        2.消息量巨大的场景。ActiveMQ不支持消息自动分片机制,如果消息量巨大,导致一台服务器不能处理全部消息,就需要自己开发消息分片功能。

下表是文章中列举的常用消息中间件的对比:

 

ActiveMQ

RabbitMQ

Kafka

RocketMQ

HornetQ

版本号

5.10.0

3.3.4

0.8.1

3.1.9-SNAPSHOT

2.4.0

关注度

成熟度

成熟

成熟

比较成熟

不成熟

成熟

社区活跃度

文档

开发语言

Java

Erlang

Scala

Java

Java

JMS支持

需付费

第三方提供

协议支持

AMQP
MQTT
STOMP
REST

AMQP
MQTT
STOMP
REST

自定义

自定义

AMQP
STOMP
REST

客户端支持

Java、C、
C++、Python、
PHP、Perl、
.Net等

Java、C、
C++、Python、
PHP、Perl、
.Net等

Java、C、
C++、Python、
PHP、Perl、
.Net等

Java、C++

Java

持久化

内存
文件
数据库

内存
文件

文件

文件

内存
文件

事务

支持

支持

不支持

不完全支持

支持

集群

一般

较好

较好

管理界面

第三方提供

第三方提供

亮点

JMS标准
成功案例多

吞吐量略高于ActiveMQ

吞吐量极高
批量处理

初步支持分布式事务
吞吐量不低于Kafka

JMS标准
完美整合Jboss

缺点

吞吐量低
无消息分片功能

吞吐量低
不支持JMS

不支持JMS
不支持事务

不成熟
分布式事务未开发完全
监控界面不完善
文档少

同ActiveMQ

 

 

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐