
In this post, I will discuss a problem I have faced repeatedly throughout my career: system integration. Imagine you have to monitor the data in a database table and, whenever it changes, show the new data on a page. The most common solution is polling, but polling is not a practical way to handle this situation, since the client keeps querying the database even when nothing has changed.
Luckily, most database products now provide asynchronous notification functionality for this situation. So, in this post, we will demonstrate how to leverage the notification functionality of PostgreSQL 12, JMS, and WebSocket to create a real-time application that shows inserts, updates, and deletes in the database.
Table of Contents
Below is how this post is organized.
- Set Up Notification Service in PostgreSQL 12
- Create Dynamic Web Project
- Reference
1. Set Up Notification Service in PostgreSQL 12
First of all, let's work with PostgreSQL. In order to broadcast any message or notification, we need to create a trigger that sends a notification whenever a change occurs in the table. The steps to enable notification are below.
- Create a function that returns a trigger and broadcasts the notification
- Create triggers that fire when rows are inserted into or deleted from the table
Suppose we have created the table booktbl as below.
CREATE TABLE usr01.booktbl (
sid varchar NOT NULL,
title varchar NOT NULL,
author varchar NOT NULL,
CONSTRAINT booktbl_pkey PRIMARY KEY (sid)
);
The function used to broadcast the notification is created as below.
CREATE OR REPLACE FUNCTION usr01.queue_event()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
DECLARE
data json;
notification json;
BEGIN
-- Convert the old or new row to JSON, based on the kind of action.
-- Action = DELETE? -> OLD row
-- Action = INSERT or UPDATE? -> NEW row
IF (TG_OP = 'DELETE') THEN
data = row_to_json(OLD);
ELSE
data = row_to_json(NEW);
END IF;
-- Construct the notification as a JSON string.
notification = json_build_object(
'table',TG_TABLE_NAME,
'action', TG_OP,
'data', data);
-- Execute pg_notify(channel, notification)
PERFORM pg_notify('q_event',notification::text);
-- Result is ignored since this is an AFTER trigger
RETURN NULL;
END;
$function$;
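Create the triggers that are invoked when data is inserted into or deleted from the table (booktbl). An AFTER UPDATE trigger can be added in the same way if updates should be reported as well.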
-- Fire after a new row is inserted into the table
CREATE TRIGGER queue_notify_add
AFTER INSERT ON usr01.booktbl
FOR EACH ROW EXECUTE FUNCTION usr01.queue_event();
-- Fire after a row is deleted from the table
CREATE TRIGGER queue_notify_delete
AFTER DELETE ON usr01.booktbl
FOR EACH ROW EXECUTE FUNCTION usr01.queue_event();
With that, the PostgreSQL side of the notification service is set up.
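One limit to keep in mind with this trigger: the PostgreSQL documentation states that, in the default configuration, a NOTIFY payload must be shorter than 8000 bytes, and `row_to_json` on a wide row could exceed that. The following is a minimal sketch of a size check on the Java side, for example to validate sample rows before relying on the trigger. The class name `NotifyPayloadCheck` is hypothetical, and the check assumes the payload is encoded as UTF-8.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original project.
class NotifyPayloadCheck {

    // Default limit documented for NOTIFY: the payload must be shorter than this.
    static final int MAX_PAYLOAD_BYTES = 8000;

    // Returns true if the payload is short enough for pg_notify with default settings.
    // Assumption: the payload is measured in UTF-8 bytes.
    static boolean fitsInNotify(String payload) {
        return payload.getBytes(StandardCharsets.UTF_8).length < MAX_PAYLOAD_BYTES;
    }

    public static void main(String[] args) {
        String sample = "{\"table\":\"booktbl\",\"action\":\"INSERT\"}";
        System.out.println("Sample payload fits: " + fitsInNotify(sample));
    }
}
```

If rows can grow beyond the limit, a common alternative is to send only the key (here `sid`) in the notification and let the receiver fetch the full row.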
2. Create Dynamic Web Project
After setting up PostgreSQL, we need a program that listens for notifications from PostgreSQL. In this post, we create a web application that receives the notification messages and shows them on a page. After creating the dynamic web project, import the dependencies as in the pom.xml below.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>pro.itstikk</groupId>
<artifactId>java-ee-10-web</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>
<name>java-ee-10-web</name>
<url>https://blog.tikkwiki.pro</url>
<description>Jakarta EE project for PostgreSQL and WebSocket</description>
<dependencies>
<dependency>
<groupId>postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>9.1-901.jdbc4</version>
</dependency>
<dependency>
<groupId>com.impossibl.pgjdbc-ng</groupId>
<artifactId>pgjdbc-ng</artifactId>
<version>0.7.1</version>
</dependency>
</dependencies>
<build>
<finalName>java-ee-10-web</finalName>
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-war-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<warSourceDirectory>webapps</warSourceDirectory>
</configuration>
</plugin>
</plugins>
</build>
</project>
Next, we create the notification listener for PostgreSQL: open a database connection, attach a listener to it, and have the listener store each notification from the database in a blocking queue.
package pro.itstikk.listener;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.impossibl.postgres.api.jdbc.PGConnection;
import com.impossibl.postgres.api.jdbc.PGNotificationListener;
import com.impossibl.postgres.jdbc.PGDataSource;
public class NotificationListener {
private static Logger logger = Logger.getLogger(NotificationListener.class.getName());
// Create the queue that will be shared by the producer and consumer
private static BlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
// Database connection info (hard-coded here for the demo)
private static final String DBHost = "192.168.230.8";
private static final String DBName = "sammy";
private static final String DBUserName = "sammy";
private static final String DBPassword = "qf48d8uv";
private static final int DBPort = 5432;
// Database connection
// private static PGDataSource dataSource;
// private static PGConnection connection;
static {
try {
// Create a data source for logging into the db
PGDataSource dataSource = new PGDataSource();
dataSource.setHost(DBHost);
dataSource.setPort(DBPort);
dataSource.setDatabase(DBName);
dataSource.setUser(DBUserName);
dataSource.setPassword(DBPassword);
// Log into the db
PGConnection connection = (PGConnection) dataSource.getConnection();
logger.log(Level.INFO, "Connection to PostgreSQL Successful !");
System.out.println("Connection to PostgreSQL Successful !");
// add the callback listener created earlier to the connection
connection.addNotificationListener(new PGNotificationListener() {
@Override
public void notification(int processId, String channelName, String payload) {
// Add event and payload to the queue
queue.add("/channels/" + channelName + " " + payload);
}
});
logger.log(Level.INFO, "Attached Listener to the Connection Successful !");
System.out.println("Attached Listener to the Connection Successful !");
// Tell PostgreSQL to send NOTIFY q_event to our connection and listener
Statement statement = connection.createStatement();
statement.execute("LISTEN q_event");
statement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
public NotificationListener() {
super();
}
/**
* @return shared queue
*/
public static BlockingQueue<String> getQueue() {
return queue;
}
}
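One detail worth noting in the listener above is that `queue.add(...)` throws `IllegalStateException` once the `ArrayBlockingQueue` is full, which could happen if notifications arrive faster than they are consumed. The following is a minimal sketch contrasting `add` with `offer` on a small bounded queue. The class name `BoundedQueueDemo` and the capacity of 2 are assumptions chosen for illustration and are not part of the original project.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original project.
class BoundedQueueDemo {

    // Tries to enqueue without blocking; returns false if the queue is full.
    static boolean enqueue(BlockingQueue<String> queue, String message) {
        return queue.offer(message);
    }

    // Returns true if add() rejects the message because the queue is full.
    static boolean addThrowsWhenFull(BlockingQueue<String> queue, String message) {
        try {
            queue.add(message);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        System.out.println("offer first:  " + enqueue(queue, "first"));
        System.out.println("offer second: " + enqueue(queue, "second"));
        // The queue is full from here on.
        System.out.println("offer third:  " + enqueue(queue, "third"));
        System.out.println("add throws:   " + addThrowsWhenFull(queue, "fourth"));
    }
}
```

With `offer`, the listener callback could log and drop a notification when the queue is full instead of failing with an exception. Whether dropping is acceptable depends on the application.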
Next, we create a worker thread that takes messages from the queue above and sends them to the client, as follows.
package pro.itstikk.thread.runner;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.websocket.Session;
import pro.itstikk.listener.NotificationListener;
public class NotificationWorker implements Runnable {
private static Logger logger = Logger.getLogger(NotificationWorker.class.getName());
// Get the shared queue
private static BlockingQueue<String> queue = NotificationListener.getQueue();
private Session session;
public NotificationWorker(final Session session) {
this.session = session;
}
@Override
public void run() {
// start notification listener
logger.log(Level.INFO,"Notification listener is running ......!");
// Loop forever pulling messages off the queue
while (true) {
try {
// queue blocks until something is placed on it
String msg = queue.take();
// Do something with the event
logger.log(Level.INFO,"PostgreSQL Notification :"+msg);
// Send message through Websocket to client
session.getAsyncRemote().sendText(msg);
} catch (InterruptedException e) {
logger.log(Level.SEVERE, "Interrupted while waiting for a message: " + e.getMessage());
// Restore the interrupt flag and stop the worker
Thread.currentThread().interrupt();
return;
}
}
}
}
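Because every `NotificationWorker` takes from the same shared queue, each notification is delivered to only one worker when several clients are connected. If every client should see every notification, one option is a single dispatcher that fans each message out to all registered receivers. The sketch below illustrates that idea with plain `Consumer<String>` callbacks standing in for WebSocket sessions. `NotificationBroadcaster` and its methods are hypothetical names, not part of the original project.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

// Hypothetical fan-out helper; each receiver stands in for one WebSocket session.
class NotificationBroadcaster {

    // Thread-safe set, so receivers can be added or removed while broadcasting.
    private final Set<Consumer<String>> receivers = new CopyOnWriteArraySet<>();

    void register(Consumer<String> receiver) {
        receivers.add(receiver);
    }

    void unregister(Consumer<String> receiver) {
        receivers.remove(receiver);
    }

    // Delivers one message to every registered receiver and returns how many got it.
    int broadcast(String message) {
        int delivered = 0;
        for (Consumer<String> receiver : receivers) {
            receiver.accept(message);
            delivered++;
        }
        return delivered;
    }
}
```

In such a design, a single thread would call `queue.take()` and pass each message to `broadcast`, while the endpoint would register a callback for a session when it opens and unregister it when it closes.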
Then we create the WebSocket endpoint for clients to connect to.
package pro.itstikk.websocket.endpoint;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import pro.itstikk.thread.runner.NotificationWorker;
@ServerEndpoint(value="/endpoint")
public class WebsocketEndPoint {
private static Logger logger = Logger.getLogger(WebsocketEndPoint.class.getName());
@OnOpen
public void onOpen(Session session) {
logger.log(Level.INFO,"onOpen::" + session.getId());
String info = "Hello Client " + session.getId() + "!";
logger.log(Level.INFO,info);
// start notification listener
NotificationWorker runnable = new NotificationWorker(session);
Thread thread = new Thread(runnable, "Notification Thread");
thread.start();
logger.log(Level.INFO, "Listener for the notification started.");
}
@OnClose
public void onClose(Session session) {
String str = "onClose::" + session.getId();
logger.log(Level.INFO, str);
}
@OnMessage
public void onMessage(String message, Session session) throws IOException {
String str = " Server Site Message= " + message;
logger.log(Level.INFO,str);
session.getBasicRemote().sendText("Hello Client ... !");
}
@OnError
public void onError(Throwable t) {
logger.log(Level.SEVERE,"onError:: occurred "+t.getMessage());
}
}
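The endpoint above starts a thread per session in `onOpen` but does not stop it in `onClose`, so a worker would keep running after its client disconnects. A rough sketch of one way to track and interrupt workers follows. It uses session IDs as plain strings so that it runs without a WebSocket container, and `WorkerRegistry` is a hypothetical helper rather than part of the original code. It assumes the worker's `run` method returns when its thread is interrupted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper that remembers one worker thread per session ID.
class WorkerRegistry {

    private final Map<String, Thread> workers = new ConcurrentHashMap<>();

    // Starts a worker thread for the session and remembers it.
    void start(String sessionId, Runnable worker) {
        Thread thread = new Thread(worker, "Notification Thread " + sessionId);
        thread.setDaemon(true); // do not keep the JVM alive for a forgotten worker
        workers.put(sessionId, thread);
        thread.start();
    }

    // Interrupts and forgets the worker; returns false if none was registered.
    boolean stop(String sessionId) {
        Thread thread = workers.remove(sessionId);
        if (thread == null) {
            return false;
        }
        thread.interrupt();
        return true;
    }

    int activeCount() {
        return workers.size();
    }
}
```

In the endpoint, `start(session.getId(), runnable)` could replace the direct `new Thread(...)` call in `onOpen`, and `stop(session.getId())` could be called from `onClose`.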
Finally, for the client, we create a simple JSP page with a JavaScript program as below.
<%@ page language="java" contentType="text/html; charset=UTF-8"
pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Clients</title>
</head>
<body>
<h2>Message from PostgreSQL is below.</h2>
<div id="msg">
</div>
<script language="javascript" type="text/javascript">
var wsUri = "ws://localhost:8080/java-ee-10-web/endpoint";
var ws = new WebSocket(wsUri);
ws.onopen = function() {
console.log("Web Socket is connected!!");
setInterval(function(){ws.send("keep alive .... !");},10000);
};
ws.onmessage = function(evt) {
var msg = evt.data;
console.log("Message received:" + msg);
var tag = document.createElement("p");
var text = document.createTextNode(msg);
tag.appendChild(text);
var element = document.getElementById("msg");
element.appendChild(tag);
};
ws.onclose = function() {
console.log("Connection is closed...");
};
</script>
</body>
</html>
Below is the result once we run the program.
Once we insert data into the booktbl table, the result below is shown on the page.
do $$
<<FIRSTBLOCK>>
declare
PUSRNAME VARCHAR ;
C INTEGER;
begin
/* */
for C in 1..50 LOOP
pusrname := '000'|| cast (C as VARCHAR);
insert into usr01.booktbl (sid,title,author ) values (pusrname,'bookname'||pusrname,'author'||pusrname);
end loop;
commit;
end FIRSTBLOCK $$;
And once we delete the data from the booktbl table:
DELETE FROM booktbl;
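Each message that reaches the page is the string the listener enqueued: the prefix `/channels/`, the channel name, a space, and then the JSON payload built by the trigger function. If a consumer needs the channel and the payload separately, the string can be split at the first space. The sketch below shows one way to do that; `ChannelMessage` is a hypothetical class, and the sample payload in `main` is illustrative rather than captured output.

```java
// Hypothetical value class for one message in the "/channels/<name> <payload>" format.
class ChannelMessage {

    private static final String PREFIX = "/channels/";

    final String channel;
    final String payload;

    private ChannelMessage(String channel, String payload) {
        this.channel = channel;
        this.payload = payload;
    }

    // Splits the raw message at the first space into channel name and payload.
    static ChannelMessage parse(String raw) {
        int space = raw.indexOf(' ');
        if (!raw.startsWith(PREFIX) || space < 0) {
            throw new IllegalArgumentException("Unexpected message format: " + raw);
        }
        String channel = raw.substring(PREFIX.length(), space);
        String payload = raw.substring(space + 1);
        return new ChannelMessage(channel, payload);
    }

    public static void main(String[] args) {
        // Illustrative payload only; the exact JSON spacing produced by PostgreSQL may differ.
        ChannelMessage message = parse("/channels/q_event {\"table\":\"booktbl\",\"action\":\"DELETE\"}");
        System.out.println(message.channel);
        System.out.println(message.payload);
    }
}
```

The payload itself is JSON with the keys `table`, `action`, and `data`, so it can be handed to any JSON parser once it has been separated from the channel prefix.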