Child pages
  • Glassfish, OpenMQ prioritás kezelés
Skip to end of metadata
Go to start of metadata

Glassfish, OpenMQ prioritás kezelés

Egy látszólag egyszerű problémába szaladtam bele a mindennapi munkám során, a megoldásához bele kellett magam ásnom a JMS és az OpenMQ világába. Na de lássuk, mi is volt a feladat és a probléma:

Feladat: Tipikus SMS küldési feladat. A rendszerben, amit fejlesztünk, keletkeznek SMS-ek, amiket ki kellene küldeni. Az SMS-ek között vannak olyanok melyeknek „ráér” a küldése és vannak olyanok, melyeket minél hamarabb kézbesíteni kell. Az SMS-ek vegyesen keletkezhetnek. Van úgy, hogy kevésbé fontosból 500-600 keletkezik egy rövid időszak alatt. Úgy kell kiküldeni, hogy a fontos SMS-ek hamarabb kerüljenek kézbesítésre, mint a kevésbé fontosak, magyarán prioritásuk szerint.

Fontos peremfeltétel, hogy az SMS küldő szolgáltatónak (szerződésünk miatt) egy szálon adhatjuk oda az elküldendő SMS-eket. Maga az SMS küldése kb 0,5 másodpercet vesz igénybe, ami 600 SMS esetén kb. 5 perc.

Megvalósítás: JMS Queue-t hívtuk segítségül, melyet úgy paramétereztük hogy egy Receiver legyen, így biztosítottuk, hogy a szolgáltató felé egy szálon adjuk át a küldendő SMS-eket. A JMS biztosította azt, hogy bármilyen folyamat „dobhat be” a queue-ba SMS-t, ami küldésre kerül. A JMS-nek van beépített prioritás kezelése, így a prioritás kezelése is megoldottnak tűnt. Na ez az amiben tévedtem.

Probléma: Azt tapasztaltam, ha 500-600 alacsony prioritású SMS-t kapott a JMS és ezután fontos SMS -ek kerültek a queue-ba, akkor nem ugrottak a feldolgozási sor elejére a fontos SMS-ek, ezért csak az alacsony prioritású üzenetek után kerültek kiküldésre, ami nem megengedhető. Szeretném az alábbi néhány sorban bemutatni egy lecsupaszított példán keresztül, hogy mi is a hiba jelenség és milyen módon lehet megoldani.A JMS bemutatása nem cél, arra nagyon sok jó leírás található. Emlékeztetőül egy sematikus ábra:

http://docs.oracle.com/javaee/6/tutorial/doc/bnceh.html és http://docs.oracle.com/javaee/6/tutorial/doc/bncfa.html

Demo alkalmazás

A mellékelt PrioritasDemo egy tipikus JEE alkalmazás: PrioritasDemo.zip

A PrioritasDemo-war-ban semmi különös nincs, egy nagyon egyszerű JSF oldalt készítettem, ahol is különböző prioritású csomagokból megadott darabot fog elhelyezni a queue-ban Session Bean hívás segítségével. Ennél sokkal érdekesebb a PrioritasDemo-ejb, melyből a fontosabb, érdekesebb részeket kiemelem:

  • a glassfish-resource.xml-ben találhatók egy jms/SMSQueue JMS erőforrás létrehozásához szükséges leírók, ez az a queue, amibe a küldendő SMS-eket várjuk.
  • Az SMSQueue feldolgozásáért felelős MDB az SMSQueueMessageBean.java, mely most az egyszerűség kedvéért csak imitálja a szolgáltatónak való küldést, és helyette kiírja hogy milyen üzenetet kapott, és annak mi volt a prioritása. A kód lényegi része:

    @Override
    public void onMessage(Message message) {
        try {
            TextMessage tm = (TextMessage) message;
            log.info(tm.getText() + ", \tJMS Prioritás: " + message.getJMSPriority() );
            
            // Tényleges küldés a szolgáltatóhoz, ami kb 0,5 másodperc / SMS
            // ... 
            Thread.sleep(500);
            
        } catch (Exception e) {
            log.fatal(e);
        }
    }
  • Azt, hogy az SMS küldés egy szálon történjen, úgy oldottam meg, hogy a feldolgozó szálak számát maximalizáltam egyre, azaz nem kell mást tennem, csak glassfish-ejb-jar.xml-be a következő néhány sort beszúrni:

    <ejb>
      <ejb-name>SMSQueueMessageBean</ejb-name> 
      <bean-pool>
        <steady-pool-size>1</steady-pool-size>
        <max-pool-size>1</max-pool-size> 
      </bean-pool>
    </ejb>
  • A JMS Queue-ba helyezést a SMSToQueueSessionBean.java-ban lévő sendJMSMessageToSMSQueue metódus végzi, mely kap egy szöveges üzenetet és az adott üzenet prioritását.
  • Érdemes megfigyelni a @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) annotációt, ez azért került bele, hogy minden egyes üzenet külön tranzakcióba kerüljön a queue-ba. Ugyanis ha nem így tennénk, akkor a hívó Session Bean (jelen esetben: SMSGeneralasSessionBean.smsGeneralas) tranzakciójában futna, tehát a küldés nem kezdődne el, csak ha a hívó befejezte volna az időigényes munkáját:

    @Override
    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)    
    public void sendJMSMessageToSMSQueue(String uzenet, int prioritas) throws JMSException {
        Connection connection = null;
        Session session = null;
        try {
            connection = sMSQueueFactory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer messageProducer = session.createProducer(sMSQueue);
            messageProducer.setPriority(prioritas);
            
            TextMessage tm = session.createTextMessage();          
            tm.setText(uzenet);  
            
            messageProducer.send(tm);
            
            messageProducer.close();            
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    log.fatal(e);
                }
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
  • Természetesen messageProducer.setPriority(prioritas) hívással állítjuk be a megfelelő prioritást, amivel az üzenet bekerül a JMS Queue-ba.

A kód többi része megegyezik bármely JMS példakóddal.

Próbáljuk ki!

Az egyszerű JSF oldal segítségével generálunk üzeneteket és figyeljük a log-ban megjelenő tényleges feldolgozási sorrendet. Valami hasonlót fogunk látni: (Log4J-t használtam, a log megtalálható a: ${com.sun.aas.instanceRoot}/logs/PrioritasDemo.log)

SMSQueueMessageBean:29 - 1. Alacsony prioritású, JMS Prioritás: 1
GeneralasManagedBean:40 - Nem fontos üzenetek generálása véget ért: 2013.09.24 19:14:15
SMSQueueMessageBean:29 - 2. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 3. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 4. Alacsony prioritású, JMS Prioritás: 1
GeneralasManagedBean:48 - Fontos üzenetek generálása véget ért: 2013.09.24 19:14:17
SMSQueueMessageBean:29 - 5. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 6. Alacsony prioritású, JMS Prioritás: 1
...
...
...
SMSQueueMessageBean:29 - 48. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 49. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 50. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 1. Magas prioritású, JMS Prioritás: 9

A tesztet sokszor futtatva azt tapasztaltam, hogy nem konzekvensen működik a kód, ha szabad ilyet mondani. Az esetek többségében a magas prioritású üzenet nem kerül előre a feldolgozási sorban, hanem legvégül kerül feldolgozásra.

Miért van ez? Rosszul használom a JMS prioritás kezelését? JMS hiba, nem kezeli a prioritást?  OpenMQ hiba, implementációs hiba? Sok fejvakarás és utána olvasás után rájöttem a megfejtésre, ami a következőkben rejlik. Minden JMS implementáció kicsit másképp kezeli a fenti problémát, van ahol erős megkötés van és minden queue-ba érkező üzenet után a JMS broker újra rendezi prioritás szerint az üzeneteket és ennek megfelelően adja át feldolgozásra az MDB-nek. Ez, lássuk be, nagy számú üzenet esetén károsan hat a teljesítményre. Van, ahol az „erős” prioritás kezelés bekapcsolható (pl.: ActiveMQ http://activemq.apache.org/how-can-i-support-priority-queues.html)

Az OpenMQ esetén a varázsszavak: Consumer flow control és imqConsumerFlowLimit paraméter állítása, ami nagy vonalakban annyit tesz, hogy mekkora a puffer. Ha puffer kiürül, akkor a feldolgozásra várakozó üzenetekből tölti fel a puffert, mégpedig prioritásuk szerint. Na ez az, ami nekünk kell!

Visszatérve próba projektünkhöz nézzük meg, hogyan van most beállítva a SMSQueue - Consumer Flow Limit értéke. Ezt a következő utasítással tudjuk elérni:

./imqcmd query dst -t q -n SMSPhysicalQueue

Az imqcmd parancsot a glassfish alkalmazás szerver mq/bin alkönyvtárában találjuk (felhasználó név és jelszó alapértelmezésben: admin/admin). Valami hasonlót fogunk látni, a lényegi rész:

…
------------------------------------
Destination Name Destination Type
------------------------------------
SMSPhysicalQueue Queue
On the broker specified by:
-------------------------
Host Primary Port
-------------------------
localhost 7676

Destination Name SMSPhysicalQueue
Destination Type Queue
…
Limit Behavior REJECT_NEWEST
Consumer Flow Limit 1000
Is Local Destination false
…

Hát igen, mivel soha nem volt egyszerre 1000 tétel a queue-ban, a puffer feltöltésekor nem rendezte át a sorrendet, ezért tapasztaltuk azt, hogy nem működik a JMS prioritás kezelése. Állítsuk át az értéket mondjuk 10-re. Glassfish esetén a beállítására vonatkozóan itt találunk iránymutatást:  http://openmessaging.blogspot.hu/2009/08/how-to-set-arbitrary-broker-properties.html

Röviden: az admin consolon a Configurations/server-config/Java Message Service form-on az OpenMQ indítási paraméterei (Start Arguments) közzé felvesszük az alábbi paramétert:

-Dimq.autocreate.queue.consumerFlowLimit=10

Újraindítás és ellenőrzés után újból futtatjuk a teszt alkalmazásunkat, amitől most azt várjuk hogy legkésőbb a magas prioritású üzenet elhelyezése után 10 üzenet feldolgozása után megjelenik a magas prioritású üzenet a log-ban.

 SMSQueueMessageBean:29 - 1. Alacsony prioritású, JMS Prioritás: 1
GeneralasManagedBean:40 - Nem fontos üzenetek generálása véget ért: 2013.09.24 20:06:28
SMSQueueMessageBean:29 - 2. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 3. Alacsony prioritású, JMS Prioritás: 1
GeneralasManagedBean:48 - Fontos üzenetek generálása véget ért: 2013.09.24 20:06:29
SMSQueueMessageBean:29 - 4. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 5. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 6. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 7. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 8. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 9. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 10. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 11. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 12. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 1. Magas prioritású, JMS Prioritás: 9
SMSQueueMessageBean:29 - 13. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 14. Alacsony prioritású, JMS Prioritás: 1
...
...
SMSQueueMessageBean:29 - 49. Alacsony prioritású, JMS Prioritás: 1
SMSQueueMessageBean:29 - 50. Alacsony prioritású, JMS Prioritás: 1

Mint láthatjuk a log-ból, ez így is történt. Boldogok vagyunk ;)

Futtató/Fejlesztő környezet:

  • Glassfish 3.1.2 (benne található OpenMQ),
  • JDK 1.6.x,
  • NetBeans 7.3
      
      
Page viewed times
#trackbackRdf ($trackbackUtils.getContentIdentifier($page) $page.title $trackbackUtils.getPingUrl($page))