ActiveMQ的队列会跨多个make-or-breakkor节点吗,如果

一、什么是ActiveMQ?
百度百科对ActiveMQ的描述:
二、ActiveMQ有什么优点?哪些情况下适合使用ActiveMQ?
 2.1ActiveMQ 优点:
    (1) 跨平台(JAVA编写与平台无关有,ActiveMQ几乎可以运行在任何的JVM上)&&& && (2) 支持多种语言&&&  & (3) 降低系统间模块的耦合度,解耦(消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和)&&&  & (4) 对Spring的,软件扩展性好&&& && (5) 自动重连功能
 2.2 ActiveMQ的使用场景:
&&&  & 多个项目之间集成&&&  & 系统前后端隔离,屏蔽高安全区(处理高并发)
   再举两个例子:(来自知乎上的观点);
   例一:
   假设用户在你的软件中注册,服务端收到用户的注册请求后,它会做这些操作:      校验用户名等信息,如果没问题会在数据库中添加一个用户记录&&&     如果是用邮箱注册会给你发送一封注册成功的邮件,手机注册则会发送一条短信&&&     分析用户的个人信息,以便将来向他推荐一些志同道合的人,或向那些人推荐他&&&     发送给用户一个包含操作指南的系统通知 等等&&   但是对于用户来说,注册功能实际只需要第一步,只要服务端将他的账户信息存到数据库中他便可以登录上去做他想做的事情了。至于其他的事情,非要在这一次请求中全    部完成么?值得用户浪费时间等你处理这些对他来说无关紧要的事情么?所以实际当第一步做完后,服务端就可以把其他的操作放入对应的消息队列中然后马上返回用户结   && 果,由消息队列异步的进行这些操作。   或者还有一种情况,同时有大量用户注册你的软件,再高并发情况下注册请求开始出现一些问题,例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出   现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况,导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了。面对这种情况一   般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请求,不会影响用户使用其他功能。   所以在软件的正常功能开发中,并不需要去刻意的寻找消息队列的使用场景,而是当出现性能瓶颈时,去查看业务逻辑是否存在可以异步处理的耗时操作,如果存在的话便可   以引入消息队列来解决。否则盲目的使用消息队列可能会增加维护和开发的成本却无法得到可观的性能提升,那就得不偿失了。
   例二:
   跨系统的异步通信,所有需要异步交互的地方都可以使用消息队列。就像我们除了打电话(同步)以外,还需要发短信,发电子邮件(异步)的通讯方式。   多个应用之间的耦合,由于消息是平台无关和语言无关的,而且语义上也不再是函数调用,因此更适合作为多个应用之间的松耦合的接口。基于消息队列的耦合,不需要发送   方和接收方同时在线。   在企业应用集成(EAI)中,文件传输,共享数据库,消息队列,远程过程调用都可以作为集成的方法。   应用内的同步变异步,比如订单处理,就可以由前端应用将订单信息放到队列,后端应用从队列里依次获得消息处理,高峰时的大量订单可以积压在队列里慢慢处理掉。由于   同步通常意味着阻塞,而大量线程的阻塞会降低计算机的性能。   消息驱动的架构(EDA),系统分解为消息队列,和消息制造者和消息消费者,一个处理流程可以根据需要拆成多个阶段(Stage),阶段之间用队列连接起来,前一个阶段   处理的结果放入队列,后一个阶段从队列中获取消息继续处理。   应用需要更灵活的耦合方式,如发布订阅,比如可以指定路由规则。
三、ActiveMQ官方(最新)下载地址 http://activemq.apache.org/
四、ActiveMQ官方(最新)下载地址
  4.1 与Tomcat相似解压即可, 解压后
    bin:其中包含MQ的启动脚本  conf:包含MQ的所有配置文件  data:日志文件及持久性消息数据  example:MQ的示例  lib:MQ运行所需的所有Lib  webapps:MQ的Web控制台及一些相关的DEMO  activemq-all-5.5.0.jar:所有MQ JAR包的集合,用于用户系统调用
  4.2 启动ActiveMQ, 进入bin后xp系统双击activemq.bat 如果win7系统是32或64位启动对应的activemq.bat批处理文件
  ActiveMQ四种协议的简单介绍:
    TCP协议:默认端口61616
    TCP(Transmission Control Protocol 传输控制协议)是一种面向连接的、可靠的、基于字节流的传输层通信协议,由IETF的RFC 793定义。在简化的计算机网络OSI    模型中,它完成第四层传输层所指定的功能,用户数据报协议(UDP)是同一层内[1]& 另一个重要的传输协议。在因特网协议族(Internet protocol suite)中,TCP层    是位于IP层之上,应用层之下的中间层。不同主机的应用层之间经常需要可靠的、像管道一样的连接,但是IP层不提供这样的流机制,而是提供不可靠的包交换。
    AMQP协议:默认端口5672    即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议    的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。    STOMP协议:默认端口61613
    STOMP,Streaming Text Orientated Message Protocol,是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单    文本协议
    MQTT协议:默认端口1883    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几    乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。
    OpenWire协议:(?控制台上没有看到)官方网站对其的介绍如下:    OpenWire
is our cross language Wire Protocol to allow native access to ActiveMQ
from a number of different languages and platforms. The Java       OpenWire
transport is the default transport in ActiveMQ 4.x or later. For other
languages see the following...
    ActiveMQ默认使用的TCP连接端口是61616, 通过查看该端口的信息可以测试ActiveMQ是否成功启动 netstat -an|find &61616&
    4.3 访问ActiveMQ的WEB管理员监控控制台&
    4.3 停止ActiveMQ服务,按Ctrl+Shift+C,之后输入y即可
&   & 注意:启动ActiveMQ时闪退问题解决方案    解压缩apache-activemq-5.14.0-bin.zip后双击/bin目录下的activemq.bat批处理文件发现启动窗口一闪而过无法启动,最后找到原因是因为在环境变量-系统变量中    JAVA_HOME、classpath、Path这些系统变量中没有添加JDK相关的安装路径。
    JAVA_HOME下添加"D:\soft\Java\jdk1.7.0_09"    classpath下添加".;%JAVA_HOME%\%JAVA_HOME%\lib\tools.jar"    Path下添加"%JAVA_HOME%\"    特别注意如果系统变量中没有classpath变量、JAVA_HOME变量则新增然后再添加上述相关路径。
    上述路径是我本地JDK版本安装的路径。如果JDK安装在其他版本下则相应的调整下就可以了。
五、使用TCP方式测试ActiveMQ:
Sender code
import javax.jms.C
import javax.jms.ConnectionF
import javax.jms.DeliveryM
import javax.jms.D
import javax.jms.MessageP
import javax.jms.S
import javax.jms.TextM
import org.apache.activemq.ActiveMQC
import org.apache.activemq.ActiveMQConnectionF
public class Sender {
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionF
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
// Destination :消息的目的地;消息发送给谁.
// MessageProducer:消息发送者
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("TestQueue");
// 得到消息生成者【发送者】
producer = session.createProducer(destination);
// 设置不持久化,此处学习,实际根据项目决定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 构造消息,此处写死,项目就是参数,或者方法获取
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i &= SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("ActiveMq " + "HelloWorld " + i);
// 发送消息到目的地方
System.out.println("发送消息:" + "ActiveMq " + "HelloWorld " + i);
producer.send(message);
Receiver code
import javax.jms.C
import javax.jms.ConnectionF
import javax.jms.D
import javax.jms.MessageC
import javax.jms.S
import javax.jms.TextM
import org.apache.activemq.ActiveMQC
import org.apache.activemq.ActiveMQConnectionF
public class Receiver {
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionF
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
// Session: 一个发送或接收消息的线程
// Destination :消息的目的地;消息发送给谁.
// 消费者,消息接收者
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.FALSE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("TestQueue");
consumer = session.createConsumer(destination);
while (true) {
//设置接收者接收消息的时间,为了便于测试,这里谁定为100s
TextMessage message = (TextMessage) consumer.receive(100000);
if (null != message) {
System.out.println("收到消息:" + message.getText());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
阅读(...) 评论()JMS两个主要概念:
消息目标:
2、主题两种传递模式:
点对点消息传递模型:每个消息都有一个发送者和一个消费者
发送者-〉队列-〉接受者
发布-订阅者消息传递模型:一条订阅信息,可以发送给多个订阅者
发布者-〉主题-〉订阅者们JMS优点:
确保投送Apache-ActiveMQ:开源消息中介先下载ActiveMQ,在网上找的版本是5.11.1,然后在bin中启动activeMQ.(备注:启动之前需要配置好环境变量,并且确定JDK的版本。我这边用的是64位的JDK)启动后的照片如下:访问的端口是:http://localhost:8161/
可以直接进入admin界面,用户名和密码都是admin,界面如下:经常会查看queues,topics,即为消息队列和订阅者发布者。如果想要停掉服务在控制台需要按ctrl+shift+c,ctrl+c ,Y 。我这边是这样停的,也有人说去掉中间的ctrl+c。编写代码如下:消息生产者:package com.liang.
import javax.jms.C
import javax.jms.ConnectionF
import javax.jms.D
import javax.jms.JMSE
import javax.jms.MessageP
import javax.jms.S
import javax.jms.TextM
import org.apache.activemq.ActiveMQC
import org.apache.activemq.ActiveMQConnectionF
* 消息的发布者(发送者)
public class JMSProducer {
private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; //默认连接用户名
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
private static final String URL=ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址
private static final int SIZE=11; //发送的消息数量
public static void main(String[] args) {
ConnectionF//连接工厂
S//会话 接受或者发送消息的线程
Des//消息的目的地
MessageProducer messageP//消息生产者
factory=new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);//实例化连接工厂
connection=factory.createConnection();//通过连接工厂获取连接
connection.start();//启动连接
session=connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//创建session
//destination = session.createQueue("testQueue1");//point to point//创建消息队列
destination=session.createTopic("testTopic1");//topic //创建消息队列
messageProducer=session.createProducer(destination);//创建消息生产者
sendMessage(session, messageProducer);//发送消息
session.commit();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
* 发送消息
public static void sendMessage(Session session,MessageProducer messageProducer){
for (int i = 0; i & SIZE; i++) {
TextMessage message=session.createTextMessage("消息发布者,发布消息"+i);//创建一条文本消息
messageProducer.send(message);//通过消息生产者发出消息
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
消息接收者:import javax.jms.C
import javax.jms.ConnectionF
import javax.jms.D
import javax.jms.JMSE
import javax.jms.MessageC
import javax.jms.S
import org.apache.activemq.ActiveMQC
import org.apache.activemq.ActiveMQConnectionF
public class JMSConsumer1 {
private static final String USERNAME=ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD;
private static final String URL=ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
ConnectionFactory connectionF
connectionFactory=new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);
connection=connectionFactory.createConnection();
connection.start();
session=connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// destination = session.createQueue("testQueue1"); //point to point
destination=session.createTopic("testTopic1"); //topic
consumer=session.createConsumer(destination);
while (true) {
TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
if(textMessage != null){
System.out.println("收到的消息:" + textMessage.getText());
consumer.setMessageListener(new Listener1());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}import javax.jms.JMSE
import javax.jms.M
import javax.jms.MessageL
import javax.jms.TextM
public class Listener1 implements MessageListener {
public void onMessage(Message message) {
System.out.println("订阅者1:"+((TextMessage)message).getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
备注:我测试的时候,消息队列的接收方是一直启动的。而发送方是发送完就停掉了。订阅者和发布者也是一样。要先启动订阅者,在启动发布者。
消息队列-ActiveMQ学习笔记(三)-发布-订阅消息模式实现
发布-订阅消息模式与点对点模式类似,只不过在session创建消息队列时,由session.createQuene()变为session.createTopic()。
消息发布者代码:
浅谈对ActiveMQ的理解
现如今项目开发过程中由于功能多,流程长,机制繁多且复杂,让一个线程去跑完一个业务的整个流程的话会显得非常繁琐且耦合性太强,代码量很大,这个时候我们就可以用到一款技术叫做消息队列,我们可以通过定义好发送...
JMS消息中间件原理及ActiveMQ使用方法
JMS:Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进...
ActiveMQ的使用与遇到的相关坑(点对点,发布与订阅,resreq)
ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台...
SpringMVC+JMS结合ActiveMQ的小demo,程序可运行。现分享出来,供初学者参考,如有错漏之处,请批评指正。...
推荐ActiveMQ视频教学 一头扎进JMS之ActiveMQ
本博客只是根据视频做的简单总结。
一、新建项目,引入jar包
新建java项目J...
JMS其实并没有想象的那么高大上,看完这篇博文之后,你就知道什么叫简单。。...
没有更多推荐了,监控 WebSphere MQ 队列深度的几种常见方法
MQ 适用场景以及监控 MQ 队列深度WebSphere MQ 作为一种应用范围广泛的异步消息中间件产品,提供了一种方便地将消息流与应用解耦的方式,使得应用设计更加灵活。如在城市 A 的应用程序 1 发送请求给在城市 B 的应用程序 2,由于中间跨越了互联网,使用 MQ 将请求作为异步消息传输是一种很好的方式,避免了过长时间的同步请求等待,并且可以完全依赖 MQ 稳定的 Assury-Once-Delivery 传输能力。但这种传输往往依赖于互联网以及远端应用程序的设计,而远端应用程序很有可能归第三方运维团队甚至公司维护,因此有时在本地无法直接获知其状态。这时就需要监控本地的传输队列深度,以期尽早发现异常进行处理。监控队列深度的手段
本文介绍了 3 种常用的监控 MQ 队列深度的方式,包括使用 MQ 触发机制、MQ 自带的性能事件、监控程序定期主动查询,以及应如何应急处理队列深度到达预警值的情况。使用触发机制,在队列到达特定深度时触发处理流程MQ 的触发机制实现了在三种情况下可以触发一个后续的操作,如启动一个特定的应用程序。这三种情况分别为:当应用队列中进入第一条消息时(TRIGTYPE=FIRST)、当应用队列每进入一条消息时(TRIGTYPE=EVERY)、以及当应用队列深度达到特定值时(TRIGTYPE=DEPTH)。我们可以为队列配置任何一种情况,以使用 MQ 的触发机制。在本文中我们期望监控队列的深度,因此可以使用 TRIGTYPE=DEPTH 的触发方式,即队列深度到达特定值时触发一个流程。首先定义一个 INIT 队列,应用队列在满足触发条件时会在这个 INIT 队列中生成一个具有特殊格式的触发消息:DEFINE QLOCAL(INIT.QUEUE) LIKE(SYSTEM.DEFAULT.LOCAL.QUEUE)然后修改我们需要监控的本地队列 MYTESTQ,指定其到达特定深度时(为了便于展示,我们设置这个深度为 10,即 10 条消息)产生触发消息(产生在之前定义的 INIT 队列中),并期待触发启动名为 MONITOR.PROCESS 的后续操作:ALTER QLOCAL(MYTESTQ) INITQ('INIT.QUEUE') PROCESS('MONITOR.PROCESS') TRIGGER TRIGTYPE(DEPTH) TRIGDPTH(10)之后我们定义 MONITOR.PROCESS 这个后续操作的具体内容,即启动 D:\TEMP\RUN.BAT 这个 windows 平台的可执行文件。DEFINE PROCESS(MONITOR.PROCESS) APPLICID('D:\TEMP\RUN.BAT') APPLTYPE (WINDOWS)最后,我们还需要启动触发监控器,MQ 自带了一个示例的触发监控器(源代码可以参考 &MQ 安装目录 &/tools/c/Samples/amqstrg0.c),我们直接使用命令开启它即可。在这里它监控队列管理器 QML 上的 INIT.QUEUE 队列,即我们之前定义的 INIT 队列。runmqtrm -m QML -q INIT.QUEUE完成了这些配置,我们就可以测试向 MYTESTQ 应用队列放入消息。当放入第 10 条消息时,我们观察到 RUN.BAT 被启动了,即表明触发成功。读者可以自行编写 RUN.BAT 的逻辑,以满足后续处理逻辑的需求,如向管理员发送报警通知等。MQ 的触发机制还允许配置一些参数,使其可以触发其它类型的程序,如 DOS 程序,主机上的 CICS 交易等。另外,还可以通过配置实现向被触发的程序(如 RUN.BAT)传递参数。具体配置可以参考 MQ 产品官方信息中心。需要注意的是,TRIGTYPE=DEPTH 这种触发机制在第一次成功触发之后,会将队列的触发开关关闭,因此后续无法重复触发。这是为了避免触发过多且重复的处理程序而设计的。如需后续继续触发,可在被触发的程序中加入 MQSET 的 MQ API 逻辑,将需要监控的队列的 TRIGGER 属性再次开启即可。可以参考下文了解如何使用 MQSET 修改队列的属性。使用 MQ 自带的性能事件,在队列到达特定深度时生成事件消息MQ 事件消息是指在特定的条件下 MQ 自身机制可以生成一些特殊类型的消息,并将其放到系统事件队列中。事件消息一般分为两大类,第一类为审计事件,其中包括安全验证失败、下达 Command 记录、以及配置变化记录等事件;第二种为监控事件,包括关键资源启停、通道启停、通道错误、应用错误、性能事件等。本文我们期望监控 MQ 队列深度,因此可以使用性能事件,其中可以监控的指标为:队列深度涨到预设值(以下简称 Q_DEPTH_HIGH)、队列深度降到预设值(以下简称 Q_DEPTH_LOW)、以及队列写满(以下简称 Q_FULL)。这三个值分别对应队列的 QDEPTHHI、QDEPTHLO、以及 MAXDEPTH 三个属性。因此我们可以适当配置这三个属性的值,并开启相应的事件开关,即可让队列管理器在适当的时候生成事件消息。性能事件产生的事件消息会生成在 SYSTEM.ADMIN.PERFM.EVENT 系统队列中。首先我们需要开启队列管理器的性能事件开关,使用 MQSC 命令:ALTER QMGR PERFMEV(ENABLED)然后我们需要设置合理的队列深度属性,在此为了方便演示我们设置很小的值:ALTERQLOCAL(TESTQ) MAXDEPTH(10) QDEPTHHI(80) QDEPTHLO(20) QDPHIEV(ENABLED) QDPMAXEV(ENABLED)在这里我们设置了队列 TESTQ 的最大深度为 10 条消息,当深度涨到 80%(即 8 条消息)时产生 Q_DEPTH_HIGH 事件;当队列深度由一个大于等于 Q_DEPTH_HIGH 的值降低到队列最大深度的 20%(即 2 条消息)时产生 Q_DEPTH_LOW 事件;当队列深度到达 10(即最大深度)时产生 Q_FULL 事件,并且队列拒绝应用程序放入新的消息(报返回码 2053 的错误)。需要注意的是,QDEPTHHI 和 QDEPTHLO 属性分别指所占队列深度的百分比,而非绝对值。另外,我们只需开启 QDPHIEV 开关即可,这样可以避免队列深度从 5(还未到达 80% 的预警值)降到 2 时产生不必要的 Q_DEPTH_LOW 事件。一旦队列深度涨到预警值 8,则队列管理器在生成 Q_DEPTH_HIGH 事件的同时,会开启 QDPLOEV 开关,并关闭 QDPHIEV 开关(因此只有队列从一个大于等于 8 的深度降回到 2 时才会生成 Q_DEPTH_LOW 事件)。之后若队列深度降回 2,则队列管理器会再次调整这两个开关。此外 QDPMAXEV 是控制是否生成 Q_FULL 事件的开关。现在我们尝试使用任何应用程序向这个队列中依次放入 10 条消息,并观察 SYSTEM.ADMIN.PERFM.EVENT 系统队列(以下简称 EVENT 队列)的深度。我们可以看到,当 TESTQ 队列深度到达 8 时 EVENT 队列新产生了一条消息;当深度到达 10 时 EVENT 队列又新产生了一条消息;之后我们使用应用程序逐条读走 TESTQ 的消息,当读到其只有 2 条消息时,EVENT 队列又新产生了一条消息。整个过程一共在 EVENT 队列中产生了 3 条消息。那么监控程序如何使用 EVENT 队列中的这些事件消息呢?首先我们来看如何解析这些消息,使监控程序明白这些事件产生的具体原因。MQ 生成的事件消息拥有统一的 PCF 格式,应用程序使用 MQGET API 拿到事件消息后,其返回的 MsgBuffer 参数(MQBYTE 数组类型)所指向的即为事件消息的内容,它是以 PCF 格式开头的。因此我们可以将这个 MsgBuffer(本文中该对象名称为 buffer)转为 MQCFH 类型(即 PCF 格式)的指针:清单 1. 解析 PCF 头pcfh=(MQCFH *)
switch(pcfh-&Type) /* Refer to cmqcfc.h for all the options of constants */
case MQCFT_EVENT:
printf("Type = MQCFT_EVENT\n");
printf("Type = Unknown Type\n");
switch(pcfh-&Command) /* Refer to cmqcfc.h for all the options of constants */
case MQCMD_PERFM_EVENT:
printf("Command = MQCMD_PERFM_EVENT\n");
printf("Command = Unknown Command\n");
switch(pcfh-&Reason) /* Refer to cmqc.h for all the options of constants */
case MQRC_Q_DEPTH_HIGH:
printf("Reason = MQRC_Q_DEPTH_HIGH\n");
case MQRC_Q_DEPTH_LOW:
printf("Reason = MQRC_Q_DEPTH_LOW\n");
case MQRC_Q_FULL:
printf("Reason = MQRC_Q_FULL\n");
printf("Reason = Unknown Reason\n");
}将 buffer 转型为 MQCFH 类型的指针后,就可以引用其中的 Type、Command、和 Reason 成员,来判断该消息是否为事件消息、是否为性能事件消息、以及产生的原因(Q_DEPTH_HIGH、Q_DEPTH_LOW、还是 Q_FULL)。本文的代码只做简单示例,MQ 事件消息可以有很多的可能性,详细的组合可以参考 cmqcfc.h 以及 cmqc.h 头文件。下一步就是获得详细的参数,比如具体是哪一个队列产生的该消息,以及相应的队列配置(如允许的最大深度是多少)。清单 2. 解析 Parameter /* Parameter Type */
/* Parameter Structure Length */
/* Point to the beginning of parameter buffer (skip pcf header) */
MQBYTE * ptrBuf = buffer + pcfh-&StrucL
/* loop counter */
for (i = 1; i &= pcfh-&ParameterC i++)
printf("Parameter %d: ", i); /* Print current No. of parameter */
type = (MQLONG)*ptrB
switch (type)
case MQCFT_INTEGER:
printf("type=MQCFT_INTEGER, "); /* Print parameter type */
pcfin = (MQCFIN *)ptrB
length= pcfin-&StrucL
switch(pcfin-&Parameter) /* Refer to cmqc.h for all the options of constants */
case MQIA_TIME_SINCE_RESET:
printf("Parameter=MQIA_TIME_SINCE_RESET, Value=&%d&\n", (int)pcfin-&Value);
case MQIA_HIGH_Q_DEPTH:
printf("Parameter=MQIA_HIGH_Q_DEPTH, Value=&%d&\n", (int)pcfin-&Value);
case MQIA_MSG_ENQ_COUNT:
printf("Parameter=MQIA_MSG_ENQ_COUNT, Value=&%d&\n", (int)pcfin-&Value);
case MQIA_MSG_DEQ_COUNT:
printf("Parameter=MQIA_MSG_DEQ_COUNT, Value=&%d&\n", (int)pcfin-&Value);
printf("Parameter=Unknown Parameter");
case MQCFT_STRING:
printf("type=MQCFT_ STRING, "); /* Print parameter type */
pcfst = (MQCFST *)ptrB
length= pcfst-&StrucL
switch(pcfst-&Parameter) /* Refer to cmqc.h for all the options of constants */
case MQCA_Q_MGR_NAME:
printf("Parameter=MQCA_Q_MGR_NAME, String=&%s&\n", pcfst-&String);
case MQCA_BASE_OBJECT_NAME:
printf("Parameter=MQCA_BASE_OBJECT_NAME, String=&%s&\n", pcfst-&String);
printf("Parameter=Unknown Parameter");
printf("Unknown Parameter Type");
}在这里我们首先跳过了刚刚已经解析过的 buffer 开头的 PCF 头(MQCFH),指向了第一条 Parameter。之后逐一解析了所有的 Parameter,下面的输出示例展示了一个 Q_DEPTH_HIGH 事件对应的输出。清单 3. 输出示例Parameter 1: type=MQCFT_STRING, Parameter=MQCA_Q_MGR_NAME,
String=&QML _&
Parameter 2: type=MQCFT_STRING, Parameter=MQCA_BASE_OBJECT_NAME, String=&TESTEVQ _&
Parameter 3: type=MQCFT_INTEGER, Parameter=MQIA_TIME_SINCE_RESET, Value=&4022&
Parameter 4: type=MQCFT_INTEGER, Parameter=MQIA_HIGH_Q_DEPTH, Value=&8&
Parameter 5: type=MQCFT_INTEGER, Parameter=MQIA_MSG_ENQ_COUNT, Value=&8&
Parameter 6: type=MQCFT_INTEGER, Parameter=MQIA_MSG_DEQ_COUNT, Value=&2&其中包括队列管理器以及队列的名称,还有相应的队列深度值等信息。本文的代码只做简单示例,具体的 Parameter 可能有很多种,详细的组合可以参考 cmqc.h 头文件。最后,可以根据实际需要,配置 SYSTEM.ADMIN.PERFM.EVENT 队列的触发机制(可以考虑使用 TRIGTYPE 为 FIRST 或 EVERY),使其一旦有事件消息则立即触发我们的应用程序进行解析和处理。或者也可以考虑使用 MQ 提供的 MQGET API,配合 WAIT 参数捕获这个 EVENT 队列的消息。监控程序定期主动查询队列深度除了上述两种可以使监控程序被动获得队列深度到达预警值的方式以外,监控程序还可以主动地查询队列的深度,MQ 提供了 MQINQ 的 API,可以查询队列的全部属性,其中就包括代表队列深度的 CURDEPTH 属性。清单 4. MQINQMQLONG inqSelectors[3]; /* MQINQ selections */
MQLONG intAttrs[3]; /* Return values */
inqSelectors[0]=MQIA_CURRENT_Q_DEPTH;
inqSelectors[1]=MQIA_Q_DEPTH_HIGH_LIMIT;
inqSelectors[2]=MQIA_Q_DEPTH_LOW_LIMIT;
MQINQ(Hcon, Hobj, 3, inqSelectors, 3,
&intAttrs, 0,
&CompCode,
&CReason);
if (CompCode != MQCC_FAILED)
printf("MQIA_CURRENT_Q_DEPTH = %d\n", (int)intAttrs[0]);
printf("MQIA_Q_DEPTH_HIGH_LIMIT = %d\n", (int)intAttrs[1]);
printf("MQIA_Q_DEPTH_LOW_LIMIT = %d\n", (int)intAttrs[2]);
}在分别调用 MQCONN 连接到队列管理器,并且调用 MQOPEN 打开指定队列后 ( 须注意的是 MQOPEN 时需要添加 MQOO_INQUIRE 到 open option 里面 ),我们就可以调用 MQINQ 方法获得队列的一些属性,如上例中我们传入了三个 selector,分别获得了队列当前深度、Q_DEPTH_HIGH 和 Q_DEPTH_LOW 的预设值。其输出结果为:清单 5. MQINQ 输出MQIA_CURRENT_Q_DEPTH = 3
MQIA_Q_DEPTH_HIGH_LIMIT = 80
MQIA_Q_DEPTH_LOW_LIMIT = 20我们可以使用类似的逻辑,让监控程序定期查询队列的深度,并对于过高的深度作出响应。监控程序可以自由定义这个周期,同时也可以将每次的查询结果保存以备审计用途。应用场景:在有些用户场景中,对于一个应用队列,用户可能会有多个线程同时读取其中的消息进行消费。那么到底应该开启多少个线程做并行消费处理呢?我们现在用到的这种方法就比较适用这个场景了。消费程序可以在主线程中定期查询队列深度,如果超出预定值,则说明队列中有消息堆积,即消费者程序处理得慢了,则此时主线程可以开启更多的消费者线程加大并行处理能力。处理方式监控程序在捕获到队列深度过高的情况后,还需要进行应急处理,以下推荐几种常见的思路。调用第三方异常报警系统,如发送 Email、短信等通知监控程序可以调用第三方报警的接口,向系统管理员发送通知短信或
Email,进行人工干预。如因为连接对端系统的网络出现故障导致本地传输队列有消息堆积,则需尽快协调对端的运维人员进行修复。调用 MQ API 使队列不再接受新消息还有一种常见做法是监控程序修改一些系统参数,自动处理异常。如上文所述的开启更多的消费者线程加快消息处理的速度等。但这种方法的能力比较有限,如在某个时间段网络负载突然加大,导致消息传送到对端瞬间变慢,但同时 PUT 消息的程序是本地程序不受网络因素影响,即队列消息进入的速度远远大于出的速度,这时由于网络瓶颈的存在即使加多消费者线程也不能完全解决问题。在这种情况下可以考虑阻塞这个队列以防其被迅速撑满,只允许消息从这个队列出去,而禁止新的消息进入。这样可以给系统运维人员一个缓冲的时间修复网络异常。但代价是前端 PUT 消息的应用会感知这个错误,因此需要前端应用能够处理这种异常。这种处理方式往往需要从架构层面考虑异常问题,如在网络响应异常时 PUT 的应用是否可以切换到备用网络,或者是否有备用方式可以将新的请求转发,以及应用能否通知最前端系统告知其后台有网络异常等。这种方式需要监控程序在捕获队列深度异常的时候,能够通过 MQ API 修改队列管理器或者队列的参数,如通过 MQSET API 将对列设置为只允许消息出并拒绝新的消息进入:清单 6. MQSETMQLONG inqSelectors[1]; /* MQINQ selections */
MQLONG intAttrs[1]; /* Return values */
inqSelectors[0] = MQIA_INHIBIT_PUT;
intAttrs[0] = MQQA_PUT_INHIBITED;
MQSET(Hcon, Hobj, 1, inqSelectors, 1,
intAttrs, 0,
&CompCode,
&CReason);
if (CompCode != MQCC_FAILED)
printf("MQSET done successfully\n");
}在分别调用 MQCONN 连接到队列管理器,并且调用 MQOPEN 打开指定队列后 ( 须注意的是 MQOPEN 时需要添加 MQOO_SET 到 open option 里面 ),我们就可以调用 MQSET 方法将队列的 PUT 属性设为 DISABLED(MQQA_PUT_INHIBITED),即禁止 PUT 新消息到这个队列。这个改动可以立即生效,前端应用程序在向这个队列 PUT 新的消息时,则会收到代表队列禁止 PUT 的错误返回码或异常:2051 Queue put inhibited应用程序需要能够捕获这个错误并且进入备用处理逻辑(如通过其他方式发送这些新的请求,或通知前端界面当前有严重错误等)。批量 PUT 操作的应用捕获到 2051 错误后还需要调用 MQBACK 方法回滚事务。通过这种方式,系统作为一个整体对网络响应异常做出一个解决方案。这种方式无法直接解决问题,但可以最大程度地为运维人员争取时间。文中所述的这种网络响应异常本身就属于极端情况的严重问题了,因此需要运维人员一起参与从架构等更高层面整体考虑应急方案。本文只图尽量启发读者思考更为完善的应急措施。
相关主题参考 ,查看 MQ Infocenter 中关于触发(Trigger)的描述。参考 ,查看 MQ Infocenter 中关于 PCF 编程的描述。访问 developerWorks,了解关于信息管理的更多信息,获取技术文档、how-to 文章、培训、下载、产品信息以及其他资源。
添加或订阅评论,请先或。
有新评论时提醒我
static.content.url=http://www.ibm.com/developerworks/js/artrating/SITE_ID=10Zone=WebSphereArticleID=978098ArticleTitle=监控 WebSphere MQ 队列深度的几种常见方法publish-date=}

我要回帖

更多关于 置下拉列表文本 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信