2015년 3월 31일 화요일

rabbitMQ Configuration설정 및 consumer&producer

java config설정 listener추가

RabbitMqConfig.java

@Configuration
@PropertySource("classpath:/conf/property/rabbitmq.properties")
public class RabbitMqConfig {
    @Autowired private Environment env;
 
    @Bean
    public ConnectionFactory connectionFactory() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setPort(Integer.parseInt(env.getProperty("rabbitmq.port")));
        connectionFactory.setUsername(env.getProperty("rabbitmq.user"));
        connectionFactory.setPassword(env.getProperty("rabbitmq.password"));
        connectionFactory.setHost(env.getProperty("rabbitmq.ip"));

        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
         RabbitTemplate rabbitTemplate = new RabbitTemplate();
         rabbitTemplate.setExchange(env.getProperty("rabbitmq.exchange"));
         rabbitTemplate.setRoutingKey(env.getProperty("rabbitmq.routingKey"));
         rabbitTemplate.setQueue(env.getProperty("rabbitmq.queueName"));
         rabbitTemplate.setConnectionFactory(new CachingConnectionFactory(connectionFactory()));
         return rabbitTemplate;
    }
 
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(new CachingConnectionFactory(connectionFactory()));
        container.setQueueNames(env.getProperty("rabbitmq.queueName"));
        container.setMessageListener(exampleListener());
        return container;
    }


    @Bean
    public MessageListener exampleListener() {
        return new MessageListener() {
            public void onMessage(Message message) {
                System.out.println("received: " + new String(message.getBody()));
            }
        };
    }
 
}

Producer 테스트
@RunWith( SpringJUnit4ClassRunner.class )
@ContextConfiguration(classes = {RabbitMqConfig.class})
public class ProducerTest {
    @Autowired
    RabbitTemplate rabbitTemplate;
 
    @Test
    public void productTest(){
        rabbitTemplate.convertAndSend("Hello World");
    }
}


consummer테스트 
 - ExecutorService 이용한 Multi Thread처리 

@RunWith( SpringJUnit4ClassRunner.class )
@ContextConfiguration(classes = {RabbitMqConfig.class})
public class ConsumerTest{
    
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    
    @Autowired 
    RabbitTemplate rabbitTemplate;
    @Autowired
    ConnectionFactory connectionFactory;
    
    @Test
    public void consumTest(){
        int maxthread =20;
        ExecutorService es = Executors.newFixedThreadPool(maxthread);
        try {
            Connection conn = connectionFactory.newConnection(es);

            // Thread 당 다른 Channel 을 사용하기 위해서 Thread수 만큼 별도의 채널을 생성하낟.
           for(int i=0;i<maxthread;i++){
                   Channel channel = conn.createChannel();     
                   channel.basicQos(1);
                   channel.basicConsume("test",false,new MyQueueConsumer(channel));
           }
           System.out.println("Invoke "+maxthread+" thread and wait for listening");   
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        //System.out.println(rabbitTemplate.receiveAndConvert());
        //System.exit(1);
    }
    
    public class MyQueueConsumer extends DefaultConsumer {
        
        Channel channel;
        public MyQueueConsumer(Channel channel) {
               super(channel);
               // TODO Auto-generated constructor stub
               this.channel = channel;
        }
        @Override
        public void handleDelivery(String consumeTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                                        byte[] body)
               throws IOException
        {
               String routingKey = envelope.getRoutingKey();
               String contentType = properties.getContentType();
               long deliveryTag = envelope.getDeliveryTag();
              
               // message handling logic here
               String msg = new String(body);
               UUID uuid = UUID.randomUUID();
               System.out.println(uuid+" S Channel :"+channel+" Thread:"+Thread.currentThread()+" msg:"+msg);
              
               // multiple - false if we are acknowledging multiple messages with the same delivery tag
               this.channel.basicAck(deliveryTag, false);
        }
    }
}

댓글 없음:

댓글 쓰기