Páginas

viernes, 26 de abril de 2013

Ejemplo JMS Topic con Spring 2.5 y ActiveMQ

Introducción

En el siguiente ejemplo montamos una demostración de cómo configurar un publicador (publisher) y un suscriptor (suscriber) sobre un topic JMS usando Spring 2.5 para configurar todo el sistema.

Como servidor JMS usaremos Apache ActiveMQ.

En una entrada anterior ya hemos comentado cómo instalar ActiveMQ, así pues, vamos directamente con el ejemplo.

Proyecto publicador (publisher)

Este jar implementa un sencillo publicador de mensajes que entrega sus mensajes a sus N suscriptores mediante una cola de tipo "topic" en un servidor JMS.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>es.ine.sgtic</groupId>
    <artifactId>inerel</artifactId>
    <version>1.0</version>
  </parent>
  <artifactId>inerel-jms-topic-productor</artifactId>
  <dependencies>
      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring</artifactId>
          <version>2.5</version>
      </dependency>
      <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-all</artifactId>
          <version>5.8.0</version>
      </dependency>
  </dependencies>
</project>



applicationContext.xml

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">

    <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
    <!-- URL del servidor JMS                             -->
    <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
    <bean id="activemq.connectionfactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
    </bean>

    <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
    <!-- CONFIGURACION PRODUCTOR VOLCANDO SOBRE UNA TOPIC -->
    <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
        <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
        <!-- Ajustamos el nombre de la TOPIC en ActiveMQ      -->
        <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
        <bean id="activemq.cola-topic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg index="0" value="cola-topic"/>
        </bean>
        <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
        <!-- Ajustamos el JmsTemplate:                        -->
        <!--  pubSubDomain==true => Cola tipo TOPIC           -->
        <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
        <bean id="activemq.jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="activemq.connectionfactory"/>
            <property name="pubSubDomain" value="true"/>
        </bean>
        <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
        <!-- Ajustamos el generador de peticiones             -->
        <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
        <bean id="productorPeticiones" class="es.ine.sgtic.jms.topic.productor.impl.ProductorPeticionesImpl">
            <property name="jmsTemplate" ref="activemq.jmsTemplate"/>
            <property name="destino" ref="activemq.cola-topic"/>
        </bean>
</beans>


AppTestProductorTopic

package es.ine.app;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.xml.XmlBeanFactory;
import org.springframework.core.io.FileSystemResource;

import es.ine.sgtic.jms.topic.productor.IProductorPeticiones;

public class AppTestProductorTopic {

    public static void main(String args[]){
   
        //Instanciamos la factoría de beans
        final BeanFactory fabricaDeBeans = new XmlBeanFactory(new FileSystemResource("src/main/resources/applicationContext.xml"));
       
        //Obtenemos el bean productor de peticiones
        final IProductorPeticiones productor = (IProductorPeticiones) fabricaDeBeans.getBean("productorPeticiones");
       
        //Generamos la petición y ala enviamos
        productor.generarPeticion();
    }
}


IProductorPeticiones.java

package es.ine.sgtic.jms.topic.productor;

public interface IProductorPeticiones {
  
    public void generarPeticion();
}

ProductorPeticionesImpl .java

package es.ine.sgtic.jms.topic.productor.impl;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import es.ine.sgtic.jms.topic.productor.IProductorPeticiones;

public class ProductorPeticionesImpl implements IProductorPeticiones {

    private static final String CAMPO_CONTENIDO = "campo-contenido";
    private static final String MENSAJE = "En un lugar de la mancha";
    private JmsTemplate jmsTemplate = null;
    private Topic destino = null;

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void setDestino(Topic destino) {
        this.destino = destino;
    }

    public void generarPeticion() {
        //Generamos y enviamos una petición
        jmsTemplate.send(this.destino,
                         new MessageCreator() {
                            public Message createMessage(Session session) throws JMSException {
                                MapMessage mensaje = session.createMapMessage();
                                mensaje.setString(CAMPO_CONTENIDO, MENSAJE);
                                return mensaje;
                            }
                        });
    }
}


Proyecto suscriptor (suscriber)

Este jar implementa un sencillo suscriptor que apunta a cierta cola JMS de tipo "topic", por lo que recibe un mensaje cada vez que un publicador deja algo en la cola.

Nosotros sólo implementamos el consumidor de los mensajes (id="consumidorPeticiones"), es Spring quien proporciona un listener (id="listenerPeticiones") sobre la cola "topic".

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>es.ine.sgtic</groupId>
    <artifactId>inerel</artifactId>
    <version>1.0</version>
  </parent>
  <artifactId>inerel-jms-topic-consumidor</artifactId>
  <dependencies>
      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring</artifactId>
          <version>2.5</version>
      </dependency>
      <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-all</artifactId>
          <version>5.8.0</version>
      </dependency>
  </dependencies>
</project>



applicationContext.xml

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
    <!-- URL del servidor JMS                             -->
    <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
    <bean id="activemq.connectionfactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616"/>
    </bean>

    <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
    <!-- CONFIGURACION CONSUMIDOR LEYENDO DE UNA TOPIC    -->
    <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
        <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
        <!-- Ajustamos el nombre de la TOPIC en ActiveMQ      -->
        <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
        <bean id="activemq.cola-topic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg index="0" value="cola-topic"/>
        </bean>
        <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
        <!-- Consumidor de las peticiones                     -->      
        <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
        <bean id="consumidorPeticiones" class="es.ine.sgtic.jms.topic.consumidor.impl.ConsumidorAvisosTopicImpl"/>
        <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
        <!-- Listener de las peticiones                       -->      
        <!-- ++++++++++++++++++++++++++++++++++++++++++++++++ -->
        <jms:listener-container connection-factory="activemq.connectionfactory" destination-type="topic">
            <jms:listener id="listenerPeticiones" destination="cola-topic" ref="consumidorPeticiones"/>
        </jms:listener-container>
</beans> 


AppTestConsumidorTopic.java

package es.ine.sgtic.app;

import javax.jms.JMSException;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.xml.XmlBeanFactory;
import org.springframework.core.io.FileSystemResource;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

public class AppTestConsumidorTopic {

    public static void main(String[] args) throws JMSException {

        //Instanciamos la factoría de beans
        final BeanFactory factoriaBeans = new XmlBeanFactory(new FileSystemResource("src/main/resources/applicationContext.xml"));
       
        //Instanciamos el bean escuchador de peticiones, con esto queda activo y a la escucha
        final DefaultMessageListenerContainer listenerPeticiones = (DefaultMessageListenerContainer) factoriaBeans.getBean("listenerPeticiones");
       
        //TODO: Si necesitásemos instanciar otro consumidor podríamos invocar:
        //listenerPeticiones.initialize();
    }
}


ConsumidorAvisosTopicImpl.java

package es.ine.sgtic.jms.topic.consumidor.impl;

import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;

public class ConsumidorAvisosTopicImpl implements MessageListener {

    private static final String CAMPO_CONTENIDO = "campo-contenido";
  
    @Override
    public void onMessage(Message mensaje) {
        try {
            if (mensaje instanceof MapMessage) {
                final MapMessage mapMessage = (MapMessage) mensaje;
                System.out.println(mapMessage.getString(CAMPO_CONTENIDO));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

No hay comentarios:

Publicar un comentario