Egy látszólag egyszerű problémába szaladtam bele a mindennapi munkám során, a megoldásához bele kellett magam ásnom a JMS és az OpenMQ világába. Na de lássuk, mi is volt a feladat és a probléma:
Feladat: Tipikus SMS küldési feladat. A rendszerben, amit fejlesztünk, keletkeznek SMS-ek, amiket ki kellene küldeni. Az SMS-ek között vannak olyanok melyeknek „ráér” a küldése és vannak olyanok, melyeket minél hamarabb kézbesíteni kell. Az SMS-ek vegyesen keletkezhetnek. Van úgy, hogy kevésbé fontosból 500-600 keletkezik egy rövid időszak alatt. Úgy kell kiküldeni, hogy a fontos SMS-ek hamarabb kerüljenek kézbesítésre, mint a kevésbé fontosak, magyarán prioritásuk szerint.
Fontos peremfeltétel, hogy az SMS küldő szolgáltatónak (szerződésünk miatt) egy szálon adhatjuk oda az elküldendő SMS-eket. Maga az SMS küldése kb 0,5 másodpercet vesz igénybe, ami 600 SMS esetén kb. 5 perc.
Megvalósítás: JMS Queue-t hívtuk segítségül, melyet úgy paramétereztük hogy egy Receiver legyen, így biztosítottuk, hogy a szolgáltató felé egy szálon adjuk át a küldendő SMS-eket. A JMS biztosította azt, hogy bármilyen folyamat „dobhat be” a queue-ba SMS-t, ami küldésre kerül. A JMS-nek van beépített prioritás kezelése, így a prioritás kezelése is megoldottnak tűnt. Na ez az amiben tévedtem.
Probléma: Azt tapasztaltam, ha 500-600 alacsony prioritású SMS-t kapott a JMS és ezután fontos SMS -ek kerültek a queue-ba, akkor nem ugrottak a feldolgozási sor elejére a fontos SMS-ek, ezért csak az alacsony prioritású üzenetek után kerültek kiküldésre, ami nem megengedhető. Szeretném az alábbi néhány sorban bemutatni egy lecsupaszított példán keresztül, hogy mi is a hiba jelenség és milyen módon lehet megoldani.A JMS bemutatása nem cél, arra nagyon sok jó leírás található. Emlékeztetőül egy sematikus ábra:
http://docs.oracle.com/javaee/6/tutorial/doc/bnceh.html és http://docs.oracle.com/javaee/6/tutorial/doc/bncfa.html
A mellékelt PrioritasDemo egy tipikus JEE alkalmazás: PrioritasDemo.zip
A PrioritasDemo-war-ban semmi különös nincs, egy nagyon egyszerű JSF oldalt készítettem, ahol is különböző prioritású csomagokból megadott darabot fog elhelyezni a queue-ban Session Bean hívás segítségével. Ennél sokkal érdekesebb a PrioritasDemo-ejb, melyből a fontosabb, érdekesebb részeket kiemelem:
Az SMSQueue feldolgozásáért felelős MDB az SMSQueueMessageBean.java, mely most az egyszerűség kedvéért csak imitálja a szolgáltatónak való küldést, és helyette kiírja hogy milyen üzenetet kapott, és annak mi volt a prioritása. A kód lényegi része:
@Override public void onMessage(Message message) { try { TextMessage tm = (TextMessage) message; log.info(tm.getText() + ", \tJMS Prioritás: " + message.getJMSPriority() ); // Tényleges küldés a szolgáltatóhoz, ami kb 0,5 másodperc / SMS // ... Thread.sleep(500); } catch (Exception e) { log.fatal(e); } } |
Azt, hogy az SMS küldés egy szálon történjen, úgy oldottam meg, hogy a feldolgozó szálak számát maximalizáltam egyre, azaz nem kell mást tennem, csak glassfish-ejb-jar.xml-be a következő néhány sort beszúrni:
<ejb> <ejb-name>SMSQueueMessageBean</ejb-name> <bean-pool> <steady-pool-size>1</steady-pool-size> <max-pool-size>1</max-pool-size> </bean-pool> </ejb> |
Érdemes megfigyelni a @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) annotációt, ez azért került bele, hogy minden egyes üzenet külön tranzakcióba kerüljön a queue-ba. Ugyanis ha nem így tennénk, akkor a hívó Session Bean (jelen esetben: SMSGeneralasSessionBean.smsGeneralas) tranzakciójában futna, tehát a küldés nem kezdődne el, csak ha a hívó befejezte volna az időigényes munkáját:
@Override @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) public void sendJMSMessageToSMSQueue(String uzenet, int prioritas) throws JMSException { Connection connection = null; Session session = null; try { connection = sMSQueueFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer messageProducer = session.createProducer(sMSQueue); messageProducer.setPriority(prioritas); TextMessage tm = session.createTextMessage(); tm.setText(uzenet); messageProducer.send(tm); messageProducer.close(); } finally { if (session != null) { try { session.close(); } catch (JMSException e) { log.fatal(e); } } if (connection != null) { connection.close(); } } } |
A kód többi része megegyezik bármely JMS példakóddal.
Próbáljuk ki!
Az egyszerű JSF oldal segítségével generálunk üzeneteket és figyeljük a log-ban megjelenő tényleges feldolgozási sorrendet. Valami hasonlót fogunk látni: (Log4J-t használtam, a log megtalálható a: ${com.sun.aas.instanceRoot}/logs/PrioritasDemo.log)
SMSQueueMessageBean:29 - 1. Alacsony prioritású, JMS Prioritás: 1 GeneralasManagedBean:40 - Nem fontos üzenetek generálása véget ért: 2013.09.24 19:14:15 SMSQueueMessageBean:29 - 2. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 3. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 4. Alacsony prioritású, JMS Prioritás: 1 GeneralasManagedBean:48 - Fontos üzenetek generálása véget ért: 2013.09.24 19:14:17 SMSQueueMessageBean:29 - 5. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 6. Alacsony prioritású, JMS Prioritás: 1 ... ... ... SMSQueueMessageBean:29 - 48. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 49. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 50. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 1. Magas prioritású, JMS Prioritás: 9 |
A tesztet sokszor futtatva azt tapasztaltam, hogy nem konzekvensen működik a kód, ha szabad ilyet mondani. Az esetek többségében a magas prioritású üzenet nem kerül előre a feldolgozási sorban, hanem legvégül kerül feldolgozásra.
Miért van ez? Rosszul használom a JMS prioritás kezelését? JMS hiba, nem kezeli a prioritást? OpenMQ hiba, implementációs hiba? Sok fejvakarás és utána olvasás után rájöttem a megfejtésre, ami a következőkben rejlik. Minden JMS implementáció kicsit másképp kezeli a fenti problémát, van ahol erős megkötés van és minden queue-ba érkező üzenet után a JMS broker újra rendezi prioritás szerint az üzeneteket és ennek megfelelően adja át feldolgozásra az MDB-nek. Ez, lássuk be, nagy számú üzenet esetén károsan hat a teljesítményre. Van, ahol az „erős” prioritás kezelés bekapcsolható (pl.: ActiveMQ http://activemq.apache.org/how-can-i-support-priority-queues.html)
Az OpenMQ esetén a varázsszavak: Consumer flow control és imqConsumerFlowLimit paraméter állítása, ami nagy vonalakban annyit tesz, hogy mekkora a puffer. Ha puffer kiürül, akkor a feldolgozásra várakozó üzenetekből tölti fel a puffert, mégpedig prioritásuk szerint. Na ez az, ami nekünk kell!
Visszatérve próba projektünkhöz nézzük meg, hogyan van most beállítva a SMSQueue - Consumer Flow Limit értéke. Ezt a következő utasítással tudjuk elérni:
./imqcmd query dst -t q -n SMSPhysicalQueue |
Az imqcmd parancsot a glassfish alkalmazás szerver mq/bin alkönyvtárában találjuk (felhasználó név és jelszó alapértelmezésben: admin/admin). Valami hasonlót fogunk látni, a lényegi rész:
… ------------------------------------ Destination Name Destination Type ------------------------------------ SMSPhysicalQueue Queue On the broker specified by: ------------------------- Host Primary Port ------------------------- localhost 7676 Destination Name SMSPhysicalQueue Destination Type Queue … Limit Behavior REJECT_NEWEST Consumer Flow Limit 1000 Is Local Destination false … |
Hát igen, mivel soha nem volt egyszerre 1000 tétel a queue-ban, a puffer feltöltésekor nem rendezte át a sorrendet, ezért tapasztaltuk azt, hogy nem működik a JMS prioritás kezelése. Állítsuk át az értéket mondjuk 10-re. Glassfish esetén a beállítására vonatkozóan itt találunk iránymutatást: http://openmessaging.blogspot.hu/2009/08/how-to-set-arbitrary-broker-properties.html
Röviden: az admin consolon a Configurations/server-config/Java Message Service form-on az OpenMQ indítási paraméterei (Start Arguments) közzé felvesszük az alábbi paramétert:
-Dimq.autocreate.queue.consumerFlowLimit=10 |
Újraindítás és ellenőrzés után újból futtatjuk a teszt alkalmazásunkat, amitől most azt várjuk hogy legkésőbb a magas prioritású üzenet elhelyezése után 10 üzenet feldolgozása után megjelenik a magas prioritású üzenet a log-ban.
SMSQueueMessageBean:29 - 1. Alacsony prioritású, JMS Prioritás: 1 GeneralasManagedBean:40 - Nem fontos üzenetek generálása véget ért: 2013.09.24 20:06:28 SMSQueueMessageBean:29 - 2. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 3. Alacsony prioritású, JMS Prioritás: 1 GeneralasManagedBean:48 - Fontos üzenetek generálása véget ért: 2013.09.24 20:06:29 SMSQueueMessageBean:29 - 4. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 5. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 6. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 7. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 8. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 9. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 10. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 11. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 12. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 1. Magas prioritású, JMS Prioritás: 9 SMSQueueMessageBean:29 - 13. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 14. Alacsony prioritású, JMS Prioritás: 1 ... ... SMSQueueMessageBean:29 - 49. Alacsony prioritású, JMS Prioritás: 1 SMSQueueMessageBean:29 - 50. Alacsony prioritású, JMS Prioritás: 1 |
Mint láthatjuk a log-ból, ez így is történt. Boldogok vagyunk ;)