wiki:jazz/ActiveMQ/08-10-27

2008-10-27

關於 Portfolio 範例

  • 下載 ActiveMQ 5.1.0 Release, 解壓縮,並執行 ActiveMQ Message Broker
    ~$ wget http://ftp.twaren.net/Unix/Web/apache/activemq/apache-activemq/5.1.0/apache-activemq-5.1.0-bin.tar.gz
    ~$ tar zxvf apache-activemq-5.1.0-bin.tar.gz
    ~$ cd apache-activemq-5.1.0/
    ~/apache-activemq-5.1.0$ ./bin/activemq
    
  • 連線到 http://127.0.0.1:8161/demo/

  • 用不同的視窗執行 Portfolio 範例,下圖右上方是"Market data publisher"網頁(Message Provider),左上方是"Portfolio"網頁(Message Receiver),下方是 ActiveMQ Message Broker。我們可以看到 Message Sender/Provider? 網頁每兩秒送出一筆新的訊息。而 Message Receiver/Subscriber? 則會用顏色標示出目前更新的情形。

  • 這個範例證實了 ActiveMQ 可以用 JavaScript 實做出加入 Message Broker 後即時更新的可能性,因此值得追蹤其背後的原理。

追蹤原始碼(1) - Message Sender / Provider

  • "Market data publisher"網頁(Message Provider)的網址比較特殊,是 "portfolioPublish?count=1&refresh=2&stocks=IBMW&stocks=BEAS&stocks=MSFT&stocks=SUNW",直接觀看 HTML 原始碼並沒有 JavaScript 的蹤影,因此它應該是 JSP 網頁。而在 apache-activemq-5.1.0\webapps\demo\WEB-INF\web.xml 中,我們就可以找到答案了~
        <!-- servlets for the portfolio demo -->
        <servlet>
            <servlet-name>PortfolioPublishServlet</servlet-name>
            <servlet-class>org.apache.activemq.web.PortfolioPublishServlet</servlet-class>
            <load-on-startup>1</load-on-startup>
        </servlet>
    
    ... 略 ...
    
        <servlet-mapping>
            <servlet-name>PortfolioPublishServlet</servlet-name>
            <url-pattern>/portfolioPublish</url-pattern>
        </servlet-mapping>
    
  • 因此,"Market data publisher"網頁(Message Provider)的原始碼必須看 activemq.jar 裡包的 PortfolioPublishServlet.class 對應的原始碼。
  • 請下載 ActiveMQ 5.1.0 Source Release
    ~$ wget http://ftp.twaren.net/Unix/Web/apache/activemq/apache-activemq/5.1.0/apache-activemq-5.1.0-src.tar.gz
    ~$ tar zxvf apache-activemq-5.1.0-src.tar.gz
    ~$ cd apache-activemq-5.1.0/src/activemq-web/src/main/java/org/apache/activemq/web/
    ~$ vi PortfolioPublishServlet.java
    
    • 路徑: apache-activemq-5.1.0/src/activemq-web/src/main/java/org/apache/activemq/web/PortfolioPublishServlet.java

追蹤原始碼(2) - Message Subsciber / Receiver

  • 觀察"Portfolio"網頁(Message Receiver)的原始碼
    <script type="text/javascript" src="../amq/amq.js"></script>
    <script type="text/javascript">amq.uri='../amq';</script>
    <script type="text/javascript" src="portfolio.js"></script>
    
  • 下載所有相關的 JavaScript 原始碼
    ~$ wget http://127.0.0.1:8161/demo/portfolio/portfolio.js
    ~$ wget http://127.0.0.1:8161/demo/amq/amq.js
    
  • 從 amq.js 又可以找出其他 JavaScript 原始碼
    var _AMQ_INCLUDE = {
      Version: 'AMQ JS',
      script: function(libraryName) {
        document.write('<script type="text/javascript" src="'+libraryName+'"></script>');
      },
      load: function() {
        var scriptTags = document.getElementsByTagName("script");
        for(var i=0;i<scriptTags.length;i++) {
          if(scriptTags[i].src && scriptTags[i].src.match(/amq\.js$/)) {
            var path = scriptTags[i].src.replace(/amq\.js$/,'');
            this.script(path + 'prototype.js');
            this.script(path + 'behaviour.js');
            this.script(path + '_amq.js');
            // this.script(path + 'scriptaculous.js');
            break;
          }
        }
      }
    }
    
    _AMQ_INCLUDE.load();
    
  • 下載其他 JavaScript 原始碼
    ~$ wget http://127.0.0.1:8161/demo/amq/prototype.js
    ~$ wget http://127.0.0.1:8161/demo/amq/behaviour.js
    ~$ wget http://127.0.0.1:8161/demo/amq/_amq.js
    
  • 其中 prototype.js 跟 behaviour.js 都是跟 AJAX 與 CSS 相關的 JavaScript Framework
  • 剩下 portfolio.js 跟 _amq.js 是比較重要的兩個 script. 首先來看 portfolio.js
    // 註冊 PollHandler
    // 參考: _amq.js - addPollHandler : function(func)
    //              * func: 函數指標(Function Pointer)
    amq.addPollHandler(portfolioPoll);
    
    // 這裡的 first 定義在 _amq.js 裡,預設值 _first: true
    function portfolioPoll(first)
    {
       if (first)
       {
         // 參考: _amq.js - addListener : function(id,destination,handler)
         //              * id: Handler id
         //              * destination: Listen to a Message Channel or Topic
         //              * handler: 函數指標, 發生事件時的 callback 函數, 必須能處理 message 函數
         amq.addListener('stocks','topic://STOCKS.*',priceHandler._price);
       }
    }
    
    // priceHandler 類別的 _price 函數宣告
    var priceHandler = 
    {
      _price: function(message) 
      // 剩下的部份就是在處理 message 中的資料,忽略不看
    }
    
  • _amq.js
    // 一開始載入時會透過 Behaviour 啟動 amq._startPolling
    Behaviour.addLoadEvent(amq._startPolling);
    
    var amq =
    {
      uri: '/amq',   // 這在 portfolio.html 中已被取代為 amq.uri='../amq'
      poll: true,    // 預設 poll 為 true
    
      _startPolling : function()
      {
       if (amq.poll) // 預設 amq.poll 為 true
          // 產生 AJAX Request
          // 參考: prototype.js - Ajax.Request(url, options) = 建構子(Constructor)
          // 
          // 626 Ajax.Request.prototype = Object.extend(new Ajax.Base(), {
          // 627    initialize: function(url, options) {
          //
          // 這裡的意思是當 AJAX XmlHttpRequest 完成 get amq.uri 時,執行 amq._pollHandler
          new Ajax.Request(amq.uri, { method: 'get', parameters: 'timeout=0', onSuccess: amq._pollHandler });
      }
    
      _pollHandler: function(request)
      {
        amq.startBatch();
        try
        {
          amq._messageHandler(request);
          amq._pollEvent(amq._first);     // amq._pollEvent 等同於呼叫 portfolio.js 的 portfolioPoll(first)
          amq._first=false;
        }
        catch(e)
        {
            alert(e);
        }
        amq.endBatch();
    
        if (amq._pollDelay>0)
          setTimeout('amq._sendPoll()',amq._pollDelay);
        else
          amq._sendPoll();
      },
    
      startBatch: function()
      {
        amq._queueMessages++;
      },
    
      _messageHandler: function(request)
      {
        try
        {
          if (request.status == 200)
          {
            var response = request.responseXML.getElementsByTagName("ajax-response");
            if (response != null && response.length == 1)
            {
              for ( var i = 0 ; i < response[0].childNodes.length ; i++ )
              {
                var responseElement = response[0].childNodes[i];
    
                // only process nodes of type element.....
                if ( responseElement.nodeType != 1 )
                  continue;
    
                var id   = responseElement.getAttribute('id');
    
    
                var handler = amq._handlers[id];
                if (handler!=null)
                {
                  for (var j = 0; j < responseElement.childNodes.length; j++)
                  {
                    handler(responseElement.childNodes[j]);
                }
                }
              }
            }
          }
        }
        catch(e)
        {
          alert(e);
        }
      },
    
      endBatch: function()
      {
        amq._queueMessages--;
        if (amq._queueMessages==0 && amq._messages>0)
        {
          var body = amq._messageQueue;
          amq._messageQueue='';
          amq._messages=0;
          amq._queueMessages++;
          new Ajax.Request(amq.uri, { method: 'post', postBody: body, onSuccess: amq.endBatch});
        }
      },
    
Last modified 15 years ago Last modified on Feb 16, 2009, 1:44:23 PM

Attachments (9)

Download all attachments as: .zip