[원문 출처] http://activemq.apache.org/slow-consumer-handling.html


해당 문서의 경우 Apache 재단의 ActiveMQ Document 문서 일부를 번역 한 것입니다. 
번역본인 해당 문서의 경우 역자에게 있음을 알리며 상업적 이용을 불허합니다. 

www.sogomsoft.co.kr (주) 소곰소프트



속도가 느린 컨슈머는 일단 채워진 RAM에 오래된 메시지를 유지하기 위해 프로듀서에 속도를 늦추도록 브로커에 강제할 수 있고 빠른 컨슈머가 속도가 느려지게 되기 때문에 비영속성 토픽에서 문제가 발생 할 수 있다. 이후에 우리가 구현 할수 있는 하나의 옵션은 디스크에 스플링(디스크에 쓰기와 동시에 처리) 하는 것이다.  그러나 디스크에 스플링 하는 것 역시 속도가 빠른 컨슈머를 속도를 감소 시킨다.

현재 브로커의 프리패치 버퍼에 추가된 컨슈머를 위해 브로커가 유지하게 되는 일치하는 메시지의 최대 수를 환경 구성하는 전략을 가지고 있다. 최대 값에 도달 할 때, 새 메시지가 들어 오기 때문에 오래된 메시지들은 폐기 된다. 이것은 현재 메시지를 위해 RAM을  유지하고 속도가 느린 컨슈머에 메시지를 보내는 것을 유지하는 것을 허용한다. 그러나 오래된 메시지는 폐기한다.

지연 메시지 제한 전략(Pending Message Limit Strategy)

목적지 Map에 PendingMessageLimitStrategy 구현체 클래스를 환경 구성할 수 있다. 그래서 토픽 네임스페이스의 다른 영역이 속도가 늦은 컨슈머를 다루기 위한 다른 지연 전략을 가질 수 있다. 예를 들면,  매우 부피가 큰가격을 위해 지연 전략을 사용하기를 원할지 모른다. 그러나 매우 작은 부피를 가지는 주문과 거래를 위해 오래된 메시지는 폐기하는 것을 원하지 않을지 모른다.

이 전략은 컨슈머를 위해 RAM에 유지 되어야 하는 지연되는 메시지의 최대 량을 계산한다. (위 프리패치 사이즈). 0 값은 프리패치 량보다 더 이상의 메시지를 유지 하지 않는 다는 것을 의미한다. 0보다 큰 값은 메시지의 량에 들어 온 새 메시지로 오래된 메시지를 폐기하면서 프리패치 량보더 더 메시지를 유지 하게 될 것이다.  -1 값은 메시의 폐기를 비활성화 한다. 

현재 이 전략은 다른 두 구현체가 있다. :

  • 정해진 상수 지연 메시지 제한 전략 (ConstantPendingMessageLimitStrategy)
  • 프리패치 비율 지연 메시지 제한 전략 (PrefetchRatePendingMessageLimitStrategy)

정해진 상수 지연 메시지 제한 전략 (ConstantPendingMessageLimitStrategy)

이 전략은 모든 컨슈머를 위해 정해진 상수 제한값을 사용한다.  (위 프리패치 사이즈).

예제:

xml<constantPendingMessageLimitStrategy limit="50"/>

프리패치 비율 지연 메시지 제한 전략 (PrefetchRatePendingMessageLimitStrategy)

이 전략은 컨슈머들 프리패치 사이즈의 곱셈을 사용하여 지연 메시지의 최대 량을 계산한다. 그래서 예를 들면 각각의 컨슈머를 위한 프리패치 수를 2.5배 유지 할 수 있다. 

예제:

xml<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>

제한을 환경 구성하기 위해 프리패치 정책 사용하기 

JMS 클라이언트는 지속성 또는 비지속성 큐와 토픽을 위한 다양한 프리패치 정책을 환경 구성하기 위해 사용될 수 있는 프리패치 정책 을 가진다. 이 프리패치 정책은 또한 커넥션 단위 또는 컨슈머 단위로 maximumPendingMessageLimit 을 지정하는 것을 허용한다. 이 값을 환경 구성하는 하나의 작은 차이가 있다; OpenWire 같은 non-JMS 클라이언트로 동작을 단순화 하기 위해,  0값이 무시된다.; 그래서 환경구성 될수 있는 가장 낮은 값은 1이다.

퇴출 정책 환경 구성하기

속도가 느린 컨슈머에 메시지가 퇴출되는 것을 결정할때 사용하는 MessageEvictionStrategy  전략이 있다. 

하나는 가장 오래된 메시지를 퇴출 하는 것이 기본 값이고 설정은 다음과 같다:

xml<oldestMessageEvictionStrategy/>

그러나, 몇몇 어플리케이션이 퇴출을 위한 메시지를 선택하는 특별한 방법을 사용하기 위해 사용할 수 있다. 예를들면, 만약 가격을 업데이트 할 시장 데이타를 보낸다면, 가장 오래된 메시지가 아닌 오래된 가격 값을 찾는 것을 원할지 모른다.

예제:

xml<uniquePropertyMessageEvictionStrategy propertyName="STOCK"/>

propertyName 은 가격을 지정한 JMS 메시지 프로퍼티이다. 

역자주) propertyName 에 지정된 이름에 가장 오래된 메시지를 퇴출 한다.


다른 하나의 옵션은 우선 순위가 가장 낮은 메시지로 가장 오래된 메시지를 사용할 수있다. 그러므로 만약 몇몇 높은 우선순위 메시지가 있다면 심지어 새 메시지라고 하더라도 우선순위가 낮은 메시지가 퇴출된다. 


xml<oldestMessageWithLowestPriorityEvictionStrategy/>

예제

다음 예지는 ActiveMQ 브로커의 환경 구성 파일을 보여준다. Notice that for topics in the PRICES.> wildcard range the pendingMessageLimitStrategyproperty is set to only keep around 10 messages for each consumer above their prefetch buffer size.

xml

<beans xmlns="http://www.springframework.org/schema/beans"
            xmlns:amq="http://activemq.apache.org/schema/core"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://activemq.apache.org/schema/core  http://activemq.apache.org/schema/core/activemq-core.xsd">

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
    <broker xmlns="http://activemq.apache.org/schema/core" persistent="false" brokerName="${brokername}">
      <!-- lets define the dispatch policy -->
      <destinationPolicy>
        <policyMap>
          <policyEntries>

            <policyEntry topic="FOO.>">
              <dispatchPolicy>
                <roundRobinDispatchPolicy/>
              </dispatchPolicy>
              <subscriptionRecoveryPolicy>
                <lastImageSubscriptionRecoveryPolicy/>
              </subscriptionRecoveryPolicy>
            </policyEntry>

            <policyEntry topic="ORDERS.>">
              <dispatchPolicy>
                <strictOrderDispatchPolicy/>
              </dispatchPolicy>
              <!-- 1 minutes worth -->
              <subscriptionRecoveryPolicy>
                <timedSubscriptionRecoveryPolicy recoverDuration="60000"/>
             </subscriptionRecoveryPolicy>
          </policyEntry>
  
          <policyEntry topic="PRICES.>">
            <!-- lets force old messages to be discarded for slow consumers -->
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="10"/>
            </pendingMessageLimitStrategy>
            <!-- 10 seconds worth -->
            <subscriptionRecoveryPolicy>
              <timedSubscriptionRecoveryPolicy recoverDuration="10000"/>
            </subscriptionRecoveryPolicy>
          </policyEntry>
  
          <policyEntry tempTopic="true" advisoryForConsumed="true"/>
          <policyEntry tempQueue="true" advisoryForConsumed="true"/>

        </policyEntries>
      </policyMap>
    </destinationPolicy>
  </broker>
</beans>

사용 팁

만약 특정한 컨슈머가 속도가 느리다면 빠른 컨슈머보다 프리패치 사이즈를 더 작게 설정하는 것이 좋다!

예를 들면, 만약 특정 서버가 매우 느리고 높은 메시지 비율을 가지고 있고, 몇몇 속도가 빠른 컨슈머가 있다면, 빠른 서버 보다 더 작게 속도가 느린 서버에 프리패치를 설정하고 이 기능을 활성화 하는 것을 원할지 모른다.

속도가 느린 컨슈머 상태 모니터링 하기

활성화된 구독의 상태를 보기 위해 JMX 콘솔을 사용 할 수 있다. 이 방법은 TopicSubscriptionViewMBean에 다음 통계를 보는것을 허용한다.  

Statistic

Definition

discarded

속도가 느린 컨슈머에 구독 구독라이프 사이클 동안 폐기된 많은 메시지가 수.

matched

프리패치 버퍼에 가용량이 사용가능하자 마자 구독에 전파되게 되고 일치된 메시지의 현재 수. 그래서 0이 아닌 값은 구독을 위해 프리패치 버커가 가득 찬 것을 암시한다. 


[원문 출처] http://activemq.apache.org/vm-transport-reference.html


해당 문서의 경우 Apache 재단의 ActiveMQ Document 문서 일부를 번역 한 것입니다. 
번역본인 해당 문서의 경우 역자에게 있음을 알리며 상업적 이용을 불허합니다. 

www.sogomsoft.co.kr (주) 소곰소프트





VM 전송

VM 전송은 클라이언크가 네트워크 커넥터의 오버헤드 없이 VM 내부에 다른 곳에 접속 하는 것을 허용한다. 이 커넥션은 소켓 커넥션이 사용되지 않는다. 그러나 고성능의 내장된 메시징 시스템을 사용 가능하게 직접 메서드 호출을 사용한다. .

VM 커넥션을 사용하기 위한 첫번째 클라이언트는 내장된 브로커를 구동하게 된다.  그 뒤의 클라이언트 커넥션은 같은 브로커에 접속한다. 일단 브로커에 접속하기 위한 모든 VM 커넥션이 모두 닫히게 되면, 내장된 브로커는 자동적으로 셧다운 된다.


심플 브로커 환경 구성 구문

VM 커넥션을 위한 일반적인 구문인 simple 이고, 단지 제한된 내장된 브로커의 환경 구성을 제공한다. 


vm://brokerName?transportOptions


만약 이미 인스턴스화된 브로커에 접속을 원하면, 내장된 브로커(예를 들면. Apache ServiceMix 같은 경우에 ) brokerName에  이미 실행중인 브로커의 brokerName 과 일치하는 것이 사용 된 vm://brokerName URI를 사용하는 것을  보장한다. 

전송 옵션

옵션 명

기본값

상세 설명

marshal

false

만약 true 이면, 각각의 명령어가 WireFormat을 사용하여 강제로 marshalled 와 unmarshalled 된 VM 전송 통해 보낸다. 

wireFormat

default

사용할  WireFormat 의 이름

wireFormat.*

 

wireFormat을 환경 구성하기 위해 사용되는 이 접두사로 된 모든 프로퍼티.

create

true

만약 브로커가 존재 하지 않는다면 요청에 의해 브로커를 생성하게 될 것 인지 여부

waitForStart

-1

만약 0보다 크면, 브로커를 시작하기 위해 기다리는 밀리세컨드 타임아웃을 나타낸다. -1 과 0 은 기다리지 않는다는 것을 의미한다. 단지 ActiveMQ 5.2+ 에서 지원된다.

broker.*

 

브로커를 환경 구성하기 위해 사용되는 이 접두어를 사용하는 모든프로퍼티 . 더 많은 정보는 Wire Formats 환경구성하기 를 보라.

예제 URI
vm://broker1?marshal=false&broker.persistent=false

내장된 브로커 주의

만약 VM 전송을 사용하고 명시적으로 내장된 브로커 를 환경구성하는 것을 원하면, 브로커를 시작하기 전에 JMS 커넥션을 생성할 기회가 있다. 만약 VM 전송을 사용하고 이미 환경 구성된 브로커가 없다면, 현재 ActiveMQ는 브로커를 자동생성하게 될 것이다. (ActiveMQ 5.2 이상에서,  커넥션 URI를 사용을 위해 waitForStart 와 create=false 옵션을 사용하는 것이 가능하다.)

만약 스프링을 사용한다면 이 문제를 해결하기 위해,  내장된 브로커에서 이 문제가 발생하는 것을 피하기 위해 JMS ConnectionFactory가 depends-on 속성을 사용해야 할지 모른다. 

예를 들면.

<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
    <property name="config" value="classpath:org/apache/activemq/xbean/activemq.xml" />
    <property name="start" value="true" />
  </bean>
 
  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
    <property name="brokerURL" value="vm://localhost"/>
  </bean>

어드벤스드 브로커 환경 구성 구문

VM 커넥션을 위한 어드벤스드 구문이다. Broker 브로커 환경  구성 URI를 사용하여 좀더 광범위하게 브로커를 환경 구성하는 것을 허용한다. 

vm:(broker:(tcp://localhost)?brokerOptions)?transportOptions

또는 
vm:broker:(tcp://localhost)?brokerOptions

전송 옵션 

옵션명

기본값

상세 설명

marshal

 false

만약 true 이면, 각각의 명령어가 WireFormat을 사용하여 강제로 marshalled 와 unmarshalled 된 VM 전송 통해 보낸다. 

wireFormat

default

사용할  WireFormat 의 이름

wireFormat.*

 

WireFormat을 환경 구성하기 위해 사용되는 이 접두사를 사용하는 모든 프로퍼티

VM 전송을 사용을 최적화 하는 더 많은 옵션이 있다.

예제 URI
vm:(broker:(tcp://localhost:6000)?persistent=false)?marshal=false

외부 환경 구성 파일을 사용하여 내장된 브로커 환경 구성하기 

 VM 전송과 외부 환경 구성 파일을 사용하여 (i.e. activemq.xml) 내장된 브로커를 시작하기 위해, 다음 URI를 사용하라:


vm://localhost?brokerConfig=xbean:activemq.xml


+ Recent posts