使用Spring的XA事务(五)
用例3 – (事务型 MDP’s) 在统一事务中消费信息和更新数据库图 4: 用例3在统一事务中消费信息和更新数据库
这个用例不同于先前,它所做的是定义一个处理从消息提供者接收到的消息的POJO,同时在上图4所示的
统一事务中更新数据库.
MessageHandler 类的相关代码如下所示.
public void handleOrder(String msg) {
log.debug(“Receieved message->: ” + msg);
MessageSequenceDAO dao = (MessageSequenceDAO) MainApp.springCtx.getBean(“sequenceDAO”);
String app = “spring”;
String appKey = “allocation”;
int upCnt = dao.updateSequence(value++, app, appKey);
log.debug(“Update SUCCESS!! Val: ” + value + ” updateCnt->”+ upCnt);
if (fail)
throw new RuntimeException(“Rollback TESTING!!”);
}
正如所看见的那样,代码仅仅更新数据库mydb2.MessageHandler仅仅是一个有handleOrder方法的POJO.使用Spring
我们将要把这个转化成一个消息驱动的POJO(这和在JEE server服务器中的MDB相似),为了完成这个我们使用
MessageListenerAdapter类,通过反射的方式把消息处理委托给目标侦听方法.这是非常方便的特征,这使简单的
POJO转换成消息驱动的POJO.我们的MDP 现在支持分布式事务.
到我们更清晰的看一下配置的时候了.
<bean id="msgHandler"/>
<bean id="msgListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<property name="delegate" ref="msgHandler"/>
<property name="defaultListenerMethod" value="handleOrder"/>
</bean>
上面配置展示了msgListener bean 把调用委托给msgHandler.我们也指定了handleOrder(),这个会在消息从消息
提供者到达后被调用. 这在defaultListenerMethod 属性中设置.让我们看一下消息侦听器,侦听消息提供者上的
目标.
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"
destroy-method="close">
<property name="concurrentConsumers" value="1"/>
<property name="connectionFactory" ref="queueConnectionFactoryBean" />
<property name="destination" ref="appJmsDestination"/>
<property name="messageListener" ref="msgListener"/>
<property name="transactionManager" ref="transactionManager"/>
<property name="sessionTransacted" value="false"/>
<property name="receiveTimeout" value="5000"/>
<property name="recoveryInterval" value="6000"/>
<property name="autoStartup" value="false"/>
</bean>
本例中的消息侦听器使用由Spring提供的DefaultMessageListenerContainer.concurrentConsumers 属性设置成1.
表明队列中的消费者仅有一个.这个属性主要用于用大量侦听器并发处理队列.特别是在有一个快速的生产者和一
个慢速的消费者和消息的顺序不重要的情况下非常有用.Spring 2.0.3 以及以前版本支持使用
maxConcurrentConsumers属性根据负载动态调整侦听数量.recoveryInterval属性用于恢复目的,在消息提供者关闭
并且我们需要在不停止应用运行重新连接的情况下是有用的.这个特征,运行在一个无限的循环中,随着应用的运行不
断重试,这可能出人所料.处理DMLC需要非常仔细,因为有后台线程,甚至在JVM停止之后,仍旧会从消息提供者哪里接
受消息.Spring 2.0.4中,这个问题已经被解决.像之前所提到的sessionTransacted属性只在Atomikos时被置为”true”
为listenerContainer定义的同样的bean 同时应用于Atomikos 和 Bitronix
请注意transactionManger属性所指的bean定义为上面所定义的.我们在重用同一个bean定义.
就是这么简单,我们仅仅实现了MDP,在统一事务中接受信息并更新数据库.
为了运行本例,请看一下在ant build文件中的AtomikosConsumer任务和BitronixConsumer 这是可下载工程的一部分.
为了在Spring framework 和JTA代码间拦截调用.一个拦截器被使用并被编进JTA实现的jar文件的运行时库.它使用
AspectJ,代码段如下所示.
pointcut xaCalls() : call(* XAResource.*(..))
|| call(* TransactionManager.*(..))
|| call(* UserTransaction.*(..))
;
Object around() : xaCalls() {
log.debug("XA CALL -> This: " + thisJoinPoint.getThis());
log.debug(" -> Target: " + thisJoinPoint.getTarget());
log.debug(" -> Signature: " + thisJoinPoint.getSignature());
Object[] args = thisJoinPoint.getArgs();
StringBuffer str = new StringBuffer(" ");
for(int i=0; i< args.length; i++) {
str.append(" [" + i + "] = " + args[i]);
}
log.debug(str);
Object obj = proceed();
log.debug("XA CALL RETURNS-> " + obj);
return obj;
}
以上代码定义了一个在所有jta相关代码调用时的切入点,同时定义了一个around 消息,
这记录了传递的参数和方法返回值.这在我们尝试跟踪和调试JTA实现问题的时候,将会派上用场.工程中的build.xml
文件包含有编集依赖JTA实现的切面的任务,另一个选择是使用 eclipse的MaintainJ 插件.这具有IDE的舒适以及甚至
可以产生处理流程的序列图.
Some Gotchas
分布式事务是非常复杂的话题,所寻找的实现其事务恢复足够健壮并且具有用户和应用所盼望的所有的ACID标准.
在本文章中所测试的是有关pre-2PC 的异常.对于有关在第二阶段提交过程中的失败,应用程序应该对JTA 的实现
进行完全的测试。它们是非常关键和麻烦的.
所有的我们看到的JTA 实现提供了恢复测试用例.这依赖实现本身以及参与的XA resources 可以很容易的运行.
请注意在事务容量巨大时,使用XA可能引发很高的性能问题.应该注意一下类似”last resource commit”,的关于
2pc优化的支持,这可能适合一些应用需要,在这些需要中只有一个参与资源不能或不需要支持 2PC应该关注一下
由数据库或消息提供者的供应商提供的XA 特征支持以及施加的限制(如果有的话).例如MySQL不支持suspend()
和 resume() 操作以及好像有一些限制在使用 XA 以及在一些保持数据一致状态的情况下.为了更多了解XA,
Principles of Transaction Processing 是一本非常好的书.这涵盖了2PC 失败状况以及在巨大细节上的优化.
同时Mike Spille的blog是另一个好的资源这个关注没有JTA背景的XA,具有很高的信息价值.特别是2pc期间的
失败,有助于理解更多的关于XA事务.
使用Spring framework 发送和接受JMS消息时,应注意在非J(2)EE 环境中使用JmsTemplate和
DefualtMessageListenerContainer.JmsTemplate的情况.对于每一个被发送的消息,都将有一
个新的 JMS 连接被创建.使用DefaultMessageListenerContainer接受消息时同样如此.创建
重量级的 JMS连接是非常耗费资源的并且应用在高负载下不能伸缩.一种选择是寻找某种由
JMS提供者或第三方库支持的连接/会话池.另一种选择是使用来自
spring的SingleConnnectionFactory,这很容易配置一个单一的可重用的连接每一条发送的
消息都要创建会话,这可能不是一个真正的开销,因为JMS的会话是轻量级的.接受消息同样如此,
无论他们是否是事务型的.
结论
在本文中我们看到Spring framework 如何同JTA实现整合以提供分布式的事务.以及如何满足需
要分布式应用但无需一个全面的服务器的应用的需要.本文也展示了Spring framework 如何提
供基于POJO 的解决方案以及声明式事务管理(很小的侵入同时促进最好的设计实践),我们看到
的用例也显示Spring 提供了一个丰富的事务管理抽象,使我们可以无缝的在不同的JTA提供者
之间进行切换.
Resources
Download the source code (8M Zip file).
X/Open XA
JTA API
Spring Framework
ActiveMQ
Mike Spille’s Blog
MySQL XA issues
JTA Implementations:
1.JBossTS
2.Atomikos
3. Bitronix
Tibco Software
Principles of Transaction Processing

post1, Purchase klonopin online. buy klonopin without prescription, Xjk8DnS2