Páginas

lunes, 19 de marzo de 2018

Pruebas con ActiveMQ

Instalación de ActiveMQ

1) Descargamos el software 'apache-activemq-5.15.3-bin.tar.gz' desde http://activemq.apache.org/download.html
2) Descomprimimos el fichero tar zxvf apache-activemq-5.15.3-bin.tar.gz en $HOME
3) Levantamos el servidor. El script a lanzar será uno de los siguientes dependiendo de nuestro sistema: /bin/linux-x86-64/activemq o /bin/linux-x86-32/activemq):

   $ cd $HOME
   $ ./apache-activemq-5.15.3/bin/linux-x86-64/activemq start
   Starting ActiveMQ Broker...

4) Paramos el servidor:

   $ cd $HOME
   $ ./apache-activemq-5.15.3/bin/linux-x86-64/activemq stop
   Stopping ActiveMQ Broker...
   Stopped ActiveMQ Broker.

5) Consolta de gestión http://127.0.0.1:8161/admin/ (admin/admin)

Introducción a JMS

Un canal de intercambio de mensajes es un lugar común donde ciertas aplicaciones publican mensajes que son consumidos por otras aplicaciones.

En un sistema JMS participan 4 componentes:
  • publicador. Aplicación que genera mensajes y los publica en el canal.
  • consumidor. Aplicación que consume los mensajes de un canal y los procesa
  • mensaje. Información que se pasan entre publicador y consumidor, ambos conocen el formato del mensaje.
  • canal. Lugar por donde se intercambian los mensajes
JMS funciona de forma asíncrona, el publicador y el consumidor no necesitan estar conectados a la vez, ya que el servidor que alberga los canales actúa de intermediario, guardando y distribuyendo los mensajes a medida que el consumidor pueda atenderlos.

En JMS hay dos tipos de canales: Queue (colas) y Topic
  • Queue (cola). Uno o varios publicadores generan mensajes para uno o varios consumidores y cada mensaje le llega a un único consumidor. Los publicadores van dejando sus mensajes en la cola. Los mensajes son recuperados en orden FIFO por el consumidor que solicita un mensaje. Si no hay ningún consumidor solicitando mensajes, la cola los guarda. Si un mensaje es procesado por un consumidor no le llega a los demás consumidores.
  • Topic. Uno o varios publicadores generan mensajes para uno o varios consumidores y cada mensaje le llega a todos los consumidores 'online'. Los publicadores van dejando sus mensajes en el 'topic', los mensajes que llegan al 'topic' se reenvían a todos los suscriptores online (cada suscriptor levanta un listener). Todos los mensajes publicados en el topic mientras un suscriptor permanezca offline se perderán para ese suscriptor.
Ejemplo Cola JMS

package activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Productor implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(Productor.class);

private ConnectionFactory connectionFactory;
private Connection connection;
private Session session;
private Queue queue;
private MessageProducer messageProducer;
private String clientId;

public void run() {
try {
sendMensaje();
closeConnection();
} catch (JMSException e) {
e.printStackTrace();
}
}

public Productor(String clientId, String queueName) throws JMSException {

// Creamos la factoria de conexiones
this.connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);

// Creamos una conexión
this.clientId = clientId;
this.connection = connectionFactory.createConnection();
this.connection.setClientID(this.clientId);

// Creamos una sesion
this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Creamos la cola a la que enviaremos los mensajes (si la cola ya existe no se crearía)
this.queue = session.createQueue(queueName);

// Creamos el productor de mensajes
messageProducer = session.createProducer(this.queue);
}

private void closeConnection() throws JMSException {

// Cerramos la conexion
connection.close();
}

private void sendMensaje() throws JMSException {

// Creamos un mensaje JMS de tipo TextMessage
final String text = "Productor ["+clientId+"] genera el mensaje ["+ System.currentTimeMillis()+"]";
final TextMessage textMessage = session.createTextMessage(text);

// Enviamos el mensaje a la cola definida
messageProducer.send(textMessage);

LOGGER.info(text);
}
}

package activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumidor implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(Consumidor.class);

private ConnectionFactory connectionFactory;
private Connection connection;
private Session session;
private Queue queue;
private MessageConsumer messageConsumer;
private String clientId;

public void run() {
try {
getTexto(250);
closeConnection();
} catch (JMSException e) {
e.printStackTrace();
}
}

public Consumidor(String clientId, String queueName) throws JMSException {

// Creamos la factoria de conexiones
this.connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);

//  Creamos una conexión
this.clientId = clientId;
connection = connectionFactory.createConnection();
connection.setClientID(clientId);

// Creamos una sesion
this.session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Creamos la cola de la que recibiremos los mensajes (si la cola ya existe no se crearía)
this.queue = session.createQueue(queueName);

// Creamos el consumidor de mensajes
messageConsumer = session.createConsumer(queue);

// Iniciamos la conexión para poder recibir mensajes
connection.start();
}

private void closeConnection() throws JMSException {

// Cerramos la conexion
connection.close();
}

private void getTexto(int timeout) throws JMSException {

// Solicitamos un mensaje de la cola y esperamos timeout milisegundos la llegada de uno si no hubiese nada en la cola
Message message = messageConsumer.receive(timeout);

String texto = null;

// Verificamos si se ha recibido un mensaje
if (message != null) {

// Casteamos el mensaje al tipo previamente acordado (en este caso un TextMessage)
TextMessage textMessage = (TextMessage) message;

// Recuperamos el texto del TextMessage
texto = textMessage.getText();

// Confirmamos al broker (a nuestro ActiveMQ) que se ha procesado correctamente el mensaje
message.acknowledge();

}
LOGGER.info("Consumidor ["+clientId+"] procesa el mensaje ["+ texto+"]");
}

}

package activemq;

import javax.jms.JMSException;

public class TestActiveMQ {

public static void main(String[] args) throws JMSException, InterruptedException {
testColas();
}
public static void testColas() throws JMSException, InterruptedException {

//Levantamos 5 productores de mensajes en threads independientes cada uno envía su mensaje a la cola
Thread tp;
for (int i=0;i<5;i++) {
tp = new Thread(new Productor("p"+i,"miCola"), "tp"+i);
tp.start();
}

//Levantamos 8 consumidores de mensajes en threads independientes, cada uno procesa un mensaje de la cola
//observaremos en el log que los tres últimos consumidores no obtienen nada de la cola.
Thread tc;
for (int i=0;i<8;i++) {
tc = new Thread(new Consumidor("c"+i,"miCola"), "tc"+i);
tc.start();
}
}
}

Ejemplo Topic JMS

package activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Publicador implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(Publicador.class);

private ConnectionFactory connectionFactory;
private Connection connection;
private Session session;
private Topic topic;
private MessageProducer messageProducer;
private String clientId;

public void run() {
try {
sendMensaje();
closeConnection();
} catch (JMSException e) {
e.printStackTrace();
}
}

public Publicador(String clientId, String topicName) throws JMSException {

// Creamos la factoria de conexiones
this.connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);

// Creamos una conexión
this.clientId = clientId;
this.connection = connectionFactory.createConnection();
this.connection.setClientID(this.clientId);

// Creamos una sesion
this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// Creamos el topic a la que enviaremos los mensajes (si el topic ya existe no se crearía)
this.topic = session.createTopic(topicName);

// Creamos el productor de mensajes
messageProducer = session.createProducer(this.topic);
}

private void closeConnection() throws JMSException {

// Cerramos la conexion
connection.close();
}

private void sendMensaje() throws JMSException {

// Creamos un mensaje JMS de tipo TextMessage
final String text = "Productor ["+clientId+"] genera el mensaje ["+ System.currentTimeMillis()+"]";
final TextMessage textMessage = session.createTextMessage(text);

// Enviamos el mensaje al topic definido
messageProducer.send(textMessage);

LOGGER.info(text);
}
}

package activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Suscriptor implements Runnable, MessageListener {

private static final Logger LOGGER = LoggerFactory.getLogger(Suscriptor.class);

private ConnectionFactory connectionFactory;
private Connection connection;
private Session session;
private Topic topic;
private MessageConsumer messageConsumer;
private String suscriptorId;
private String topicName;

public void run() {
LOGGER.info("Suscriptor ["+this.suscriptorId+"] iniciando la escucha del topic ["+this.topicName+"]...");
}

public Suscriptor(String suscriptorId, String topicName) throws JMSException {

// Creamos la factoria de conexiones
this.connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);

//  Creamos una conexión
this.suscriptorId = suscriptorId;
connection = connectionFactory.createConnection();
connection.setClientID(this.suscriptorId);

// Creamos una sesion
this.session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// Creamos el topic del que recibiremos los mensajes (si el topic ya existe no se crearía)
this.topicName = topicName;
this.topic = session.createTopic(this.topicName);

// Creamos el consumidor de mensajes
this.messageConsumer = session.createConsumer(this.topic);
this.messageConsumer.setMessageListener(this);
this.connection.start();
}

// Método que será invocado cuando llegue un mensaje
public void onMessage(Message message) {

String texto = null;

// Casteamos el mensaje al tipo previamente acordado (en este caso un TextMessage)
TextMessage textMessage = (TextMessage) message;

// Recuperamos el texto del TextMessage
try {
texto = textMessage.getText();
LOGGER.info("Suscriptor ["+this.suscriptorId+"] procesa el mensaje ["+ texto+"]");
} catch (JMSException e) {
e.printStackTrace();
}
}
}

package activemq;

import javax.jms.JMSException;

public class TestActiveMQ {

public static void main(String[] args) throws JMSException, InterruptedException {
testTopic();
}

public static void testTopic() throws JMSException, InterruptedException {

//Levantamos 3 consumidores de mensajes en threads independientes, cada uno procesa todo
//lo que llegue al topic. Conectamos antes los suscriptores dado que en caso de levantar
//primero los publicadores no llegaría nada a los suscriptores y además la cola se vería
//vacía dado que no había nadie escuchando
Thread tc;
for (int i=0;i<3;i++) {
tc = new Thread(new Suscriptor("sus"+i,"miTopic"), "tsus"+i);
tc.start();
}

//Levantamos 2 productores de mensajes en threads independientes cada uno envía su mensaje al topic
Thread tp;
for (int i=0;i<2;i++) {
tp = new Thread(new Publicador("pub"+i,"miTopic"), "tpub"+i);
tp.start();
}
}
}

Referencias:
https://www.codenotfound.com/jms-point-to-point-messaging-example-activemq-maven.html
https://www.codenotfound.com/jms-publish-subscribe-messaging-example-activemq-maven.html