Notifications enable a user to know that some event has occurred in the system. Data change notifications are events that are delivered by the Junos Space platform whenever a resource is added, modified, or deleted.
The following steps enable notification generation in your application code:
Configure the hornetq-jms.xml file.
This is the main configuration file for enabling notification support. It contains the configuration for the JMS topic on which notifications are sent.
The following configuration entry is added to hornetq-jms.xml to declare the JMS topic.
<topic name="HelloWorldDatabaseChange">
    <entry name="topic/HelloWorldDatabaseChange"/>
</topic>
Write a message-driven bean (MDB) that extends GuiNotificationMDBean.
This message-driven bean is the main consumer of the notifications. It listens for the JMS messages received on the JMS topic configured in Step 1.
@MessageDriven(
    name = "HelloWorldGuiNotificationMDB",
    activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
        @ActivationConfigProperty(propertyName = "destination", propertyValue = "topic/HelloWorldDatabaseChange"),
        @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
        @ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "NonDurable")
    }
)
public class HelloWorldGuiNotificationMDB extends GuiNotificationMDBean {
    public void internalOnMessage(Message message) {
        if (message instanceof ObjectMessage) {
            /* Custom Code */
        }
    }
}
Add the "JmpNotification" annotation to the respective managed object.
This annotation contains the JMS topic name configured in Step 1.
@JmpNotification(destination = "topic/HelloWorldDatabaseChange")
@XmlAccessorType(XmlAccessType.NONE)
public class Country extends AbstractManagedObject implements Serializable {
    @XmlElement private int id = 0;
    @XmlElement private String name;
    @XmlElement private int population;
    @XmlElement private Collection<State> states;
    /* Custom Code */
}
The method for which data notifications are enabled should include "ApiContextInterface" as one of its parameters. The following steps are needed to generate notifications:
The EJB method implementation should use EntityManagerWrapper for database operations. Under the hood, this wrapper generates notifications for each ADD, UPDATE, and DELETE operation.
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public Country addCountry(ApiContextInterface ctx, Country country) throws Exception {
    CountryEntity countryEntity = fromCountry2CountryEntity(country);
    EntityManagerWrapper qm = new EntityManagerWrapper(manager);
    // Persisting through the wrapper is what triggers the notification.
    qm.persist(countryEntity, ctx);
    Collection<State> states = country.getStates();
    for (State s : states) {
        addStateToCountry(ctx, s, countryEntity.getId());
    }
    /* Custom Code */
    return getCountry(countryEntity.getCountry());
}
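To make the idea of a notifying wrapper more concrete, the following is a minimal, self-contained sketch of the general pattern: a store that performs an operation and then publishes a change event. It is not the EntityManagerWrapper implementation. The class NotifyingStore, its Change type, and the in-memory list are hypothetical names used only for illustration, and the operation names are borrowed from the jpaOperation values used in the notification payload.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of a wrapper that publishes an event per data change.
final class NotifyingStore {

    enum Operation { CREATE, MODIFY, DELETE }

    /** Event handed to the publisher after each successful operation. */
    static final class Change {
        final Operation operation;
        final Object entity;

        Change(Operation operation, Object entity) {
            this.operation = operation;
            this.entity = entity;
        }
    }

    private final List<Object> rows = new ArrayList<>(); // stands in for the database
    private final Consumer<Change> publisher;            // stands in for the JMS topic

    NotifyingStore(Consumer<Change> publisher) {
        this.publisher = publisher;
    }

    void persist(Object entity) {
        rows.add(entity);
        publisher.accept(new Change(Operation.CREATE, entity));
    }

    void merge(Object entity) {
        // Only entities that are already stored can be modified.
        if (rows.contains(entity)) {
            publisher.accept(new Change(Operation.MODIFY, entity));
        }
    }

    void remove(Object entity) {
        // Publish only if something was actually removed.
        if (rows.remove(entity)) {
            publisher.accept(new Change(Operation.DELETE, entity));
        }
    }

    public static void main(String[] args) {
        NotifyingStore store = new NotifyingStore(
                c -> System.out.println(c.operation + " " + c.entity));
        store.persist("India");
        store.merge("India");
        store.remove("India");
    }
}
```

The point of the pattern is that callers never publish notifications themselves; using the wrapper for every database operation is enough.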
The following code snippet describes how to invoke the EJB method from the REST resource to generate the notifications. The RESTified method should pass InternalPagingContext as a parameter. If null is passed, the EJB layer interceptors automatically create an instance of it.
public Country addCountry(Country param0) {
    /**
     * Put addCountry logic using
     * net.juniper.jmp.helloworld.ejb.HelloWorld bean instance
     */
    Country country = getBean().addCountry(new InternalPagingContext(), param0);
    /* Custom code */
    return country;
}
The following are code snippets in Java and JavaScript for consuming the messages that result from the notifications:
This code is primarily used by a client to subscribe to notifications on a queue. After subscribing, the client can poll the queue to read messages from it. It is not mandatory to create the queue every time; notifications can also be received on a queue that already exists.
The following sample Java code subscribes a queue to a topic so that notifications are received on that queue.
import java.net.URL;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import org.apache.commons.httpclient.Credentials;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.UsernamePasswordCredentials;
import org.apache.commons.httpclient.auth.AuthScope;
import org.jboss.resteasy.client.ClientExecutor;
import org.jboss.resteasy.client.ClientRequest;
import org.jboss.resteasy.client.ClientResponse;
import org.jboss.resteasy.client.core.executors.ApacheHttpClientExecutor;

public class TopicPushSubscription {
    public static void main(String[] args) throws Exception {
        String curl = pushSubscription(
                "http://space:8080/api/hornet-q/topics/jms.topic.database-changes",
                "http://space:8080/api/hornet-q/queues/jms.queue.test",
                "application/xml");
        System.out.println(curl);
    }
    // @param topicUrlString - JMS Topic URL for subscription.
    // @param queueUrlString - Queue URL used for subscription. Notifications will
    // be pushed on this queue.
    @SuppressWarnings( { "unused", "rawtypes", "unchecked" })
    public static String pushSubscription(String topicUrlString,
            String queueUrlString, String linkType) throws Exception {
        URL topicUrl = new URL(topicUrlString); // Checking malformed URL
        URL queueUrl = new URL(queueUrlString); // Checking malformed URL
        if (linkType == null || linkType.length() == 0) {
            throw new Exception("Invalid input");
        }
        else {
            Credentials credentials = new UsernamePasswordCredentials("testUser", "test@123"); // set up credentials
            // Set up HTTP Client
            HttpClient httpClient = new HttpClient();
            httpClient.getState().setCredentials(AuthScope.ANY, credentials);
            httpClient.getParams().setAuthenticationPreemptive(true);
            ClientExecutor clientExecutor = new ApacheHttpClientExecutor(httpClient);
            // Create Client Request on Topic URL
            ClientRequest request = new ClientRequest(topicUrlString, clientExecutor);
            ClientResponse response = request.head();
            int status = response.getStatus();
            if (status == Response.Status.OK.getStatusCode()) // OK!
            {
                // Typed map so that getFirst() returns a String.
                MultivaluedMap<String, String> headerMap = response.getHeaders();
                // Get the Push Subscription header resulted from the HEAD request.
                String pushSubscriptionUrl = headerMap.getFirst("msg-push-subscriptions");
                httpClient = new HttpClient();
                httpClient.getState().setCredentials(AuthScope.ANY, credentials);
                httpClient.getParams().setAuthenticationPreemptive(true);
                clientExecutor = new ApacheHttpClientExecutor(httpClient);
                // Initiate push subscription.
                request = new ClientRequest(pushSubscriptionUrl, clientExecutor);
                request.header("Content-Type", "application/xml");
                // Get Push subscription body having the queue url
                request.body("application/xml", getPushSubscriptionBody(queueUrlString, linkType));
                response = request.post();
                status = response.getStatus();
                if (status == Response.Status.CREATED.getStatusCode()) // OK!
                {
                    // Return created Subscription URL. Need to be used for deletion of subscription.
                    return response.getLocation().getHref();
                } else {
                    throw new Exception("Failed to push subscription");
                }
            } else {
                throw new Exception("Topic is not present");
            }
        }
    }
    // Method to get the subscription body.
    private static String getPushSubscriptionBody(String queueUrl, String linkType) {
        return "<push-topic-registration>" + "<link type=\"" + linkType
                + "\" rel=\"destination\" href=\"" + queueUrl
                + "\"/></push-topic-registration>";
    }
}
To use database notifications, the client must poll continuously at specified intervals. The client sends AJAX requests to fetch notifications (if any) from the corresponding queue.
Follow these steps to implement database notifications on the client side:
Using an AJAX request with the Ext JS API, create a HornetQ queue on which notifications should be posted. The queue name is generated by concatenating the qPrefix parameter with the current time in milliseconds. To create the queue, send an AJAX POST request to the REST service (/api/hornet-q/queues).
Example from the HelloWorld application:
/*
 * Function used to create a queue for data notifications.
 * qPrefix: Name prefix for the queue to be created
 * topicName: Topic to which the queue will be subscribed
 */
function createSortQueueAndPoll(qPrefix, topicName)
{
    var sortQueueURL;
    var d = new Date();
    if (PollUtilModel.getInstance().getSortQueue() == null || PollUtilModel.getInstance().getSortQueue() == "")
    {
        PollUtilModel.getInstance().setSortQueue(qPrefix + d.getTime());
        Ext.Ajax.request(
        {
            url: "/api/hornet-q/queues",
            method: "POST",
            headers: { "Content-Type": "application/hornetq.jms.queue+xml" },
            success: function (response, opts) {
                sortQueueURL = response.getResponseHeader("Location");
                registerForSubscriptions(sortQueueURL, topicName);
            },
            // Non-durable queue that uses the generated queue name
            xmlData: "<queue name=\"" + PollUtilModel.getInstance().getSortQueue() + "\"><durable>false</durable></queue>"
        });
    }
}
After the queue is created successfully, send a request to register it with the database change topic. The topic name is passed as a path parameter to the REST service, and the queue URL is passed in the XML payload, as shown below:
/*
 * Function used to register the topic with the queue passed.
 * sortQueueLocation: Queue URL
 * topicName: Topic to which the queue will be subscribed
 */
function registerForSubscriptions(sortQueueLocation, topicName) {
    Ext.Ajax.request({
        url: "/api/hornet-q/topics/" + topicName + "/push-subscriptions",
        method: "POST",
        headers: { "Content-Type": "application/xml" },
        success: function (response, opts) { pollStatus(sortQueueLocation, "DBN"); },
        xmlData: "<push-topic-registration><durable>false</durable><link type=\"application/xml\" rel=\"destination\" href=\"" + sortQueueLocation + "\"/></push-topic-registration>"
    });
}
Now the created queue is ready for listening to database notifications.
After registering for notifications, poll the queue by calling the pollStatus() function, which takes the queue URL as its first parameter and "DBN" as its second.
/*
 * Function used to poll the queue for data notifications and async tasks.
 * lRRURL: Queue URL
 * rtype : Notification type
 */
function pollStatus(lRRURL, rtype) {
    var ackNext;
    var ackLRR;
    var response;
    Ext.Ajax.request({
        url: lRRURL,
        method: "GET",
        success: function (responseQ) {
            response = responseQ;
            var pullSubscriptions = responseQ.getResponseHeader("msg-pull-consumers");
            Ext.Ajax.request({
                url: pullSubscriptions,
                method: "POST",
                callback: function (param, isSucceeded, responsePullConsumers) {
                    response = responsePullConsumers;
                    if (rtype == "DBN") {
                        ackNext = responsePullConsumers.getResponseHeader("msg-acknowledge-next");
                        getAsyncStatus(ackNext, lRRURL);
                    }
                    else if (rtype == "LRR") {
                        ackLRR = responsePullConsumers.getResponseHeader("msg-acknowledge-next");
                        getAsyncStatusLRR(ackLRR);
                    }
                    else {
                        alert("Notifications not supported");
                    }
                },
                params: { autoAck: "false" }
            });
        }
    });
}
/*
 * Function used to poll the queue for data notifications.
 * ackNext: A variable to hold the msg-acknowledge-next header
 * lRRURL: Queue URL
 */
function getAsyncStatus(ackNext, lRRURL)
{
    var keepPoll = true;
    var pollInterval = 10000;
    var response;
    Ext.Ajax.request({
        url: ackNext,
        method: "POST",
        callback: function (param, isSucceeded, responseNext) {
            response = responseNext;
            if (responseNext.status == 200) {
                var subscription = responseNext.responseXML;
                var ack = responseNext.getResponseHeader("msg-acknowledgement");
                // Acknowledge the message and remember where to poll next.
                Ext.Ajax.request({
                    url: ack,
                    method: "POST",
                    callback: function (param, isSucceeded, responseNextInner) {
                        ackNext = responseNextInner.getResponseHeader("msg-acknowledge-next");
                    },
                    params: { acknowledge: "true" }
                });
                var stateValue = Ext.DomQuery.selectValue("/progress-update/state", subscription, "ERROR");
                var statusValue = Ext.DomQuery.selectValue("/progress-update/status", subscription, "ERROR");
                var jpaOpValue = Ext.DomQuery.selectValue("/ManagedObjectInfo/jpaOperation", subscription, "ERROR");
                var jpaOpStatusValue = Ext.DomQuery.selectValue("/ManagedObjectInfo/status", subscription, "ERROR");
                if ((stateValue == "DONE" && statusValue == "SUCCESS") || (jpaOpStatusValue == "1" && (jpaOpValue == "CREATE" || jpaOpValue == "DELETE" || jpaOpValue == "MODIFY"))) {
                    PollUtilModel.getInstance().getSortGrid().reloadData();
                }
            }
            else if (responseNext.status == 503) {
                ackNext = responseNext.getResponseHeader("msg-acknowledge-next");
            }
        },
        headers: { 'Accept-Wait': '10', 'Accept': 'application/xml' }
    });
    if (keepPoll) {
        // Pass a function instead of a string so that the latest ackNext value
        // and the queue URL are both used when the timer fires.
        setTimeout(function () { getAsyncStatus(ackNext, lRRURL); }, pollInterval);
    }
}
The two functions mentioned above execute the following tasks in sequence:
This section describes a workflow for receiving data change notifications using a browser-based REST client. Notifications are built on HornetQ and use the HornetQ messaging libraries and their REST/HTTP interface. For information about the HornetQ REST APIs, refer to http://docs.jboss.org/resteasy/hornetq-rest/1.0-beta-3/userguide/html/ch07.html.
Data change and asynchronous notifications require a HornetQ queue URL for delivery. You can use a single HornetQ queue URL for multiple data change and asynchronous notifications, which makes better use of resources.
Note: If you do not have a previously created queue, you can create one using the process below.
Create a HornetQ queue over REST using the /api/hornet-q service. In the following example, a REST client creates a HornetQ queue named testq. If you are coding along with this tutorial, make sure this queue does not already exist.
The request and response are shown for this and subsequent examples in this tutorial.
HTTP/1.1 POST : https://space.company.com/api/hornet-q/queues
Authorization : Basic c3VwZXI6cmFrZXNocmFqa2U=
Content-Type : application/hornetq.jms.queue+xml
<queue name="testq">
    <durable>false</durable>
</queue>
Note: To create a durable queue, provide the durable value as "true".
Status Code : 201 Created
Server : Apache-Coyote/1.1
X-Powered-By : Servlet 2.4;JBoss-4.2.3.GA (build:SVNTag=JBoss_4_2_3_GA date=200807181439)/JBossWeb-2.0
Location : http://space-0050569e0043:8080/api/hornet-q/queues/jms.queue.testq
Date : Fri,24 Sep 2010 10:08:53 GMT
Cache-Control : proxy-revalidate
Content-Length : 0
Proxy-Connection : Keep-Alive
Connection : Keep-Alive
Note: The Location header contains the URL of the created HornetQ queue. Although the URL returned in the Location header can be passed directly to another REST service, it cannot be used externally because HTTP access is blocked. To use the Location header externally, replace the FQDN portion of the location with the one that was initially used to call the /api/hornet-q/queues API.
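The replacement described in the note can be sketched with java.net.URI. This is a minimal illustration rather than part of the Junos Space API: the class name LocationRewriter and the method toExternalUrl are hypothetical, and the sketch assumes that the scheme and port should also be taken from the URL the client originally called.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper: swaps the internal host in a Location header for the external one.
final class LocationRewriter {

    /**
     * Keeps the path (and query, if any) of the returned Location, but takes the
     * scheme, host, and port from the URL that the client originally called.
     */
    static String toExternalUrl(String location, String originalRequestUrl) {
        URI internal = URI.create(location);
        URI external = URI.create(originalRequestUrl);
        try {
            return new URI(external.getScheme(), null, external.getHost(),
                    external.getPort(), internal.getPath(), internal.getQuery(), null)
                    .toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rewrite " + location, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(toExternalUrl(
                "http://space-0050569e0043:8080/api/hornet-q/queues/jms.queue.testq",
                "https://space.company.com/api/hornet-q/queues"));
    }
}
```

With the values from the example above, the internal host space-0050569e0043:8080 is replaced by space.company.com and the queue path is left unchanged.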
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<xs:schema version="1.0" xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="ManagedObjectInfo" type="managedObjectInfo" />
    <xs:complexType name="managedObjectInfo">
        <xs:sequence>
            <xs:element name="objName" type="xs:string" />
            <xs:element name="objType" type="xs:string" />
            <xs:element name="rowId" type="xs:int" />
            <xs:element name="entityCategory" type="xs:string" />
            <xs:element name="jpaOperation" type="jpaOperationTypeEnum" />
            <xs:element name="status" type="xs:int" />
            <xs:element name="uri" type="xs:string" minOccurs="0" />
        </xs:sequence>
    </xs:complexType>
    <xs:simpleType name="jpaOperationTypeEnum">
        <xs:restriction base="xs:string">
            <xs:enumeration value="CREATE" />
            <xs:enumeration value="MODIFY" />
            <xs:enumeration value="DELETE" />
        </xs:restriction>
    </xs:simpleType>
</xs:schema>
The Junos Space platform hosts a JMS topic named "database-changes". To receive notifications over a queue, the client must subscribe the queue created in Step 1 to the "database-changes" topic. The following steps explain the subscription workflow:
HEAD : https://space.company.com/api/hornet-q/topics/jms.topic.database-changes
Authorization : xxxxxxxxxxxxxxxxx
The response to this request contains several custom headers. The client needs the value of the "msg-push-subscriptions" header.
Then send a POST request to the "msg-push-subscriptions" URL:
POST : https://space.company.com/api/hornet-q/topics/jms.topic.database-changes/push-subscriptions
Authorization : xxxxxxxxxxxxxxxxx
Content-type : application/xml
<push-topic-registration>
    <durable>true</durable>
    <selector><![CDATA[ target like '/api/space/user-management/users%' or target like '/api/space/device-management%']]></selector>
    <link type="application/xml" rel="destination" href="http://<ip>:<port>/api/hornet-q/queues/jms.queue.testq"/>
</push-topic-registration>
Status Code: 201 Created
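The response contains the "Location" header, which can be used to delete the subscription. The request XML contains the following elements: durable specifies whether the subscription is durable; selector is an optional filter expression that restricts which notifications are delivered (here, by matching the target URI); and link identifies the destination queue to which notifications are pushed.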
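A registration body of the shape shown in the request above can also be assembled programmatically. The following is a minimal sketch, not an official client: the class PushRegistration and its methods are hypothetical names, the selector is limited to the "target like '<prefix>%'" form used in the example, and the sketch assumes that the selector text never contains the sequence "]]>".

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical builder for the <push-topic-registration> request body.
final class PushRegistration {

    /** Builds a selector such as: target like '/a%' or target like '/b%' */
    static String selectorFor(List<String> uriPrefixes) {
        return uriPrefixes.stream()
                // Double any single quote so that it stays inside the string literal.
                .map(prefix -> "target like '" + prefix.replace("'", "''") + "%'")
                .collect(Collectors.joining(" or "));
    }

    /** Builds the XML body; pass a null or empty selector to receive every notification. */
    static String body(boolean durable, String selector, String queueUrl) {
        StringBuilder xml = new StringBuilder("<push-topic-registration>");
        xml.append("<durable>").append(durable).append("</durable>");
        if (selector != null && !selector.isEmpty()) {
            // CDATA lets the selector carry quotes and comparison operators as-is.
            xml.append("<selector><![CDATA[").append(selector).append("]]></selector>");
        }
        xml.append("<link type=\"application/xml\" rel=\"destination\" href=\"")
           .append(escapeAttribute(queueUrl))
           .append("\"/>");
        return xml.append("</push-topic-registration>").toString();
    }

    private static String escapeAttribute(String value) {
        return value.replace("&", "&amp;").replace("\"", "&quot;").replace("<", "&lt;");
    }

    public static void main(String[] args) {
        String selector = selectorFor(List.of(
                "/api/space/user-management/users",
                "/api/space/device-management"));
        System.out.println(body(true, selector,
                "http://space:8080/api/hornet-q/queues/jms.queue.testq"));
    }
}
```

Building the selector from a list of URI prefixes keeps the quoting in one place, which helps when a client subscribes to several resource trees at once.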
Create a pull consumer for the specified queue. This is a two-step process: first, send an HTTP HEAD request to the queue URL to get the pull consumer's URL; then send a POST request to that URL to create a new consumer for the queue.
Note: If you have already created a pull consumer for the HornetQ queue that is used for the subscription, you can skip this step.
HTTP HEAD https://space.company.com/api/hornet-q/queues/jms.queue.testq
Status Code : 200 OK
Server : Apache-Coyote/1.1
X-Powered-By : Servlet 2.4;JBoss-4.2.3.GA (build:SVNTag=JBoss_4_2_3_GA date=200807181439)/JBossWeb-2.0
msg-pull-consumers : https://space.company.com/api/hornet-q/queues/jms.queue.testq/pull-consumers
msg-create-with-id : https://space.company.com/api/hornet-q/queues/jms.queue.testq/create/{id}
msg-create : https://space.company.com/api/hornet-q/queues/jms.queue.testq/create
msg-push-consumers : http://space.company.com/api/hornet-q/queues/jms.queue.testq/push-consumers
Date : Fri,24 Sep 2010 10:08:53 GMT
Cache-Control : proxy-revalidate
Content-Length : 0
Proxy-Connection : Keep-Alive
Connection : Keep-Alive
HTTP POST https://space.company.com/api/hornet-q/queues/jms.queue.testq/pull-consumers
Status Code : 201 Created
Server : Apache-Coyote/1.1
X-Powered-By : Servlet 2.4;JBoss-4.2.3.GA (build:SVNTag=JBoss_4_2_3_GA date=200807181439)/JBossWeb-2.0
msg-consume-next : https://space.company.com/api/hornet-q/queues/jms.queue.testq/pull-consumers/auto-ack/1-queue-jms.queue.testq-1285333083076/consume-next-1
msg-consume-next-type : application/x-www-form-urlencoded
Date : Fri,24 Sep 2010 10:08:53 GMT
Cache-Control : proxy-revalidate
Content-Length : 0
Proxy-Connection : Keep-Alive
Connection : Keep-Alive
Here, the msg-consume-next header provides the URL to fetch data from HornetQ.
Fetching posted data from a queue using msg-consume-next headers
Each HTTP POST on a msg-consume-next URL returns data posted on the queue (in this example, about a created user), and the msg-consume-next header in the response provides the URL for getting the next item posted on the queue. A client can repeatedly POST to successive msg-consume-next URLs to obtain further data notifications. For more information, see the sample Java code for polling the queue.
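The chain of msg-consume-next URLs can be followed with a simple loop. The sketch below separates that loop from the HTTP layer so that it can be run without a server: Transport and Reply are hypothetical types, and a real client would implement Transport with an authenticated HTTP POST. Treating a 503 status as "no notification yet" mirrors the 503 branch of getAsyncStatus and is an assumption about the server's behavior.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical polling loop that follows msg-consume-next from reply to reply.
final class ConsumeNextPoller {

    /** Minimal view of an HTTP reply: status code, headers, and body. */
    static final class Reply {
        final int status;
        final Map<String, String> headers;
        final String body;

        Reply(int status, Map<String, String> headers, String body) {
            this.status = status;
            this.headers = headers;
            this.body = body;
        }
    }

    /** Hypothetical transport; a real client would send an HTTP POST to the URL. */
    interface Transport {
        Reply post(String url);
    }

    /** Polls at most maxPolls times and returns the notification bodies received. */
    static List<String> poll(Transport transport, String firstUrl, int maxPolls) {
        List<String> notifications = new ArrayList<>();
        String next = firstUrl;
        for (int i = 0; i < maxPolls && next != null; i++) {
            Reply reply = transport.post(next);
            if (reply.status == 200) {
                notifications.add(reply.body);
            } else if (reply.status != 503) {
                break; // unexpected status: stop instead of looping on an error
            }
            // Each reply names the URL for the next poll; stop if it is missing.
            // (Real HTTP header names are case-insensitive; this map is not.)
            next = reply.headers.get("msg-consume-next");
        }
        return notifications;
    }

    public static void main(String[] args) {
        Map<String, Reply> canned = new HashMap<>();
        canned.put("consume-next-1", new Reply(200,
                Map.of("msg-consume-next", "consume-next-2"), "<ManagedObjectInfo/>"));
        canned.put("consume-next-2", new Reply(503,
                Map.of("msg-consume-next", "consume-next-3"), ""));
        System.out.println(poll(canned::get, "consume-next-1", 2));
    }
}
```

Keeping the loop independent of the HTTP client also makes it straightforward to add a delay between polls or a different stop condition.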
For example, assume only one user is currently present in Space:
Now add a new user named testUser to Space.
Now that testUser has been created, polling HornetQ should return that a new user has been added to Space.
The following provides an example.
HTTP POST https://space.company.com/api/hornet-q/queues/jms.queue.testq/pull-consumers/auto-ack/1-queue-jms.queue.testq-1285333083076/consume-next-1
Status Code : 200 OK
Server : Apache-Coyote/1.1
X-Powered-By : Servlet 2.4;JBoss-4.2.3.GA (build:SVNTag=JBoss_4_2_3_GA date=200807181439)/JBossWeb-2.0
msg-consume-next : https://space.company.com/api/hornet-q/queues/jms.queue.testq/pull-consumers/auto-ack/1-queue-jms.queue.testq-1285333083076/consume-next602
msg-consumer-type : application/xml
msg-consumer : https://space.company.com/api/hornet-q/queues/jms.queue.testq/pull-consumers/auto-ack/1-queue-jms.queue.testqueue-1285333083076
msg-consumer-next-type : application/x-www-form-urlencoded
Date : Fri,24 Sep 2010 10:08:53 GMT
Cache-Control : proxy-revalidate
Content-Length : 0
Proxy-Connection : Keep-Alive
Connection : Keep-Alive
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ManagedObjectInfo>
    <objName>testUser</objName>
    <ObjType>net.juniper.jmp.cmp.systemService.security.UserTO</ObjType>
    <rowId>524314</rowId>
    <entityCategory>net.juniper.jmp.cmp.systemService.jpa.security.UserEntity</entityCategory>
    <jpaOperation>CREATE</jpaOperation>
    <status>1</status>
    <uri>/api/space/user-management/users/524314</uri>
</ManagedObjectInfo>
This notification is delivered on the HornetQ queue when a new user is created.
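A Java client can read such a notification with the JDK's built-in DOM parser. The following is a minimal sketch: the class ManagedObjectInfoReader and its methods are hypothetical names, and the isSuccessfulChange check simply mirrors the condition used in getAsyncStatus (a status of 1 together with a CREATE, MODIFY, or DELETE operation).

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical reader for ManagedObjectInfo notification payloads.
final class ManagedObjectInfoReader {

    static Document parse(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            // Notifications carry no DOCTYPE, so reject one outright.
            factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
            return factory.newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Not a readable notification", e);
        }
    }

    /** Returns the text of the first element with the given name, or null if absent. */
    static String field(Document doc, String name) {
        NodeList nodes = doc.getElementsByTagName(name);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent().trim();
    }

    /** True when the payload reports a successful create, modify, or delete. */
    static boolean isSuccessfulChange(Document doc) {
        String operation = field(doc, "jpaOperation");
        boolean known = "CREATE".equals(operation)
                || "MODIFY".equals(operation)
                || "DELETE".equals(operation);
        return known && "1".equals(field(doc, "status"));
    }

    public static void main(String[] args) {
        String xml = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>"
                + "<ManagedObjectInfo><objName>testUser</objName>"
                + "<jpaOperation>CREATE</jpaOperation><status>1</status>"
                + "<uri>/api/space/user-management/users/524314</uri></ManagedObjectInfo>";
        Document doc = parse(xml);
        System.out.println(field(doc, "jpaOperation") + " " + field(doc, "uri"));
    }
}
```

The uri element is optional in the schema, which is why field returns null instead of failing when an element is missing.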
HTTP POST http://127.0.0.1:8080/api/hornet-q/queues/jms.queue.testq/pull-consumers/auto-ack/1-queue-jms.queue.testq-1285333083076/consume-next602
Status Code : 200 OK
Server : Apache-Coyote/1.1
X-Powered-By : Servlet 2.4;JBoss-4.2.3.GA (build:SVNTag=JBoss_4_2_3_GA date=200807181439)/JBossWeb-2.0
msg-consume-next : https://space.company.com/api/hornet-q/queues/jms.queue.testq/pull-consumers/auto-ack/1-queue-jms.queue.testq-1285333083076/consume-next852
msg-consumer-type : application/xml
msg-consumer : https://space.company.com/api/hornet-q/queues/jms.queue.testq/pull-consumers/auto-ack/1-queue-jms.queue.testqueue-1285333083076
msg-consumer-next-type : application/x-www-form-urlencoded
Date : Fri,24 Sep 2010 10:08:53 GMT
Cache-Control : proxy-revalidate
Content-Length : 0
Proxy-Connection : Keep-Alive
Connection : Keep-Alive
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ManagedObjectInfo>
    <objName>testUser</objName>
    <ObjType>net.juniper.jmp.cmp.systemService.security.UserTO</ObjType>
    <rowId>524314</rowId>
    <entityCategory>net.juniper.jmp.cmp.systemService.jpa.security.UserEntity</entityCategory>
    <jpaOperation>MODIFY</jpaOperation>
    <status>1</status>
    <uri>/api/space/user-management/users/524314</uri>
</ManagedObjectInfo>
Note: The above notification is delivered on the HornetQ queue when a user is modified.
Now, delete testUser from Space.
The next HornetQ poll should return that testUser was deleted from Space:
HTTP POST https://space.company.com/api/hornet-q/queues/jms.queue.testq/pull-consumers/auto-ack/1-queue-jms.queue.testq-1285333083076/consume-next852
Status Code : 200 OK
Server : Apache-Coyote/1.1
X-Powered-By : Servlet 2.4;JBoss-4.2.3.GA (build:SVNTag=JBoss_4_2_3_GA date=200807181439)/JBossWeb-2.0
msg-consume-next : https://space.company.com/api/hornet-q/queues/jms.queue.testq/pull-consumers/auto-ack/1-queue-jms.queue.testq-1285333083076/consume-next1017
msg-consumer-type : application/xml
msg-consumer : http://space.company.com/api/hornet-q/queues/jms.queue.testq/pull-consumers/auto-ack/1-queue-jms.queue.testqueue-1285333083076
msg-consumer-next-type : application/x-www-form-urlencoded
Date : Fri,24 Sep 2010 10:08:53 GMT
Cache-Control : proxy-revalidate
Content-Length : 0
Proxy-Connection : Keep-Alive
Connection : Keep-Alive
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ManagedObjectInfo>
    <objName>testUser</objName>
    <ObjType>net.juniper.jmp.cmp.systemService.security.UserTO</ObjType>
    <rowId>524314</rowId>
    <entityCategory>net.juniper.jmp.cmp.systemService.jpa.security.UserEntity</entityCategory>
    <jpaOperation>DELETE</jpaOperation>
    <status>1</status>
    <uri>/api/space/user-management/users/524314</uri>
</ManagedObjectInfo>
The following steps enable a client to receive notifications.
The subscription process explained in Step 2 generates a "Location" in the Response header. This location URI is used to delete the subscription, as shown below.
DELETE : /api/hornet-q/topics/jms.topic.database-changes/push-subscriptions/<dynamic-part-of-location-URI>
HOST : https://space.company.com
Authorization : xxxxxxxxxxxxxxxxx
Status 200 OK
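The DELETE request above can be prepared in Java with the java.net.http API (Java 11 or later). This is a minimal sketch under stated assumptions: the class SubscriptionCleanup is a hypothetical name, HTTP Basic authentication is assumed because the queue-creation example uses a Basic Authorization header, and the subscription identifier "example-id" is a placeholder for the dynamic part of the Location URI. The sketch only builds the request; sending it requires an HttpClient.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper that prepares the DELETE request for a push subscription.
final class SubscriptionCleanup {

    /** Builds (but does not send) a DELETE request for the subscription URL. */
    static HttpRequest deleteRequest(String subscriptionLocation, String user, String password) {
        // Assumption: the server accepts HTTP Basic credentials.
        String token = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(URI.create(subscriptionLocation))
                .header("Authorization", "Basic " + token)
                .DELETE()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = deleteRequest(
                "https://space.company.com/api/hornet-q/topics/jms.topic.database-changes"
                        + "/push-subscriptions/example-id", // placeholder identifier
                "testUser", "test@123");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

The subscription URL passed in should be the Location value returned when the subscription was created, with its host replaced by the externally reachable one if necessary.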