RabbitMqConfig.java
@Configuration
@PropertySource("classpath:/conf/property/rabbitmq.properties")
public class RabbitMqConfig {
@Autowired private Environment env;
@Bean
public ConnectionFactory connectionFactory() {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setPort(Integer.parseInt(env.getProperty("rabbitmq.port")));
connectionFactory.setUsername(env.getProperty("rabbitmq.user"));
connectionFactory.setPassword(env.getProperty("rabbitmq.password"));
connectionFactory.setHost(env.getProperty("rabbitmq.ip"));
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setExchange(env.getProperty("rabbitmq.exchange"));
rabbitTemplate.setRoutingKey(env.getProperty("rabbitmq.routingKey"));
rabbitTemplate.setQueue(env.getProperty("rabbitmq.queueName"));
rabbitTemplate.setConnectionFactory(new CachingConnectionFactory(connectionFactory()));
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(new CachingConnectionFactory(connectionFactory()));
container.setQueueNames(env.getProperty("rabbitmq.queueName"));
container.setMessageListener(exampleListener());
return container;
}
@Bean
public MessageListener exampleListener() {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + new String(message.getBody()));
}
};
}
}
Producer 테스트
@RunWith( SpringJUnit4ClassRunner.class )
@ContextConfiguration(classes = {RabbitMqConfig.class})
public class ProducerTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void productTest(){
rabbitTemplate.convertAndSend("Hello World");
}
}
consummer테스트
- ExecutorService 이용한 Multi Thread처리
@RunWith( SpringJUnit4ClassRunner.class )
@ContextConfiguration(classes = {RabbitMqConfig.class})
public class ConsumerTest{
protected final Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
ConnectionFactory connectionFactory;
@Test
public void consumTest(){
int maxthread =20;
ExecutorService es = Executors.newFixedThreadPool(maxthread);
try {
Connection conn = connectionFactory.newConnection(es);
// Thread 당 다른 Channel 을 사용하기 위해서 Thread수 만큼 별도의 채널을 생성하낟.
for(int i=0;i<maxthread;i++){
Channel channel = conn.createChannel();
channel.basicQos(1);
channel.basicConsume("test",false,new MyQueueConsumer(channel));
}
System.out.println("Invoke "+maxthread+" thread and wait for listening");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//System.out.println(rabbitTemplate.receiveAndConvert());
//System.exit(1);
}
public class MyQueueConsumer extends DefaultConsumer {
Channel channel;
public MyQueueConsumer(Channel channel) {
super(channel);
// TODO Auto-generated constructor stub
this.channel = channel;
}
@Override
public void handleDelivery(String consumeTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// message handling logic here
String msg = new String(body);
UUID uuid = UUID.randomUUID();
System.out.println(uuid+" S Channel :"+channel+" Thread:"+Thread.currentThread()+" msg:"+msg);
// multiple - false if we are acknowledging multiple messages with the same delivery tag
this.channel.basicAck(deliveryTag, false);
}
}
}
댓글 없음:
댓글 쓰기