Real-Time Application with Websocket and Postgres

In this post, I will discuss a problem I have faced often throughout my career: system integration. Imagine that you have to monitor the data in a database table and, whenever it changes, show the change on a page. The most common solution is polling, that is, querying the table repeatedly on a timer. But polling is not a practical way to deal with this kind of situation, because it adds load to the database even when nothing has changed, and the page is only as fresh as the polling interval allows.

Luckily, most database products now provide asynchronous notification functionality to handle this situation. So, in this post, we will demonstrate how to leverage the notification functionality of PostgreSQL 12, JMS, and WebSocket to create a real-time application that shows inserts, updates, and deletes in the database.

Table of Contents

Below is how our post is organized.

  1. Setup Notification in PostgreSQL 12
  2. Create Dynamic Web Project
  3. Reference

1.Setup Notification Service in PostgreSQL 12

First of all, let's work with PostgreSQL. In order to broadcast any message or notification, we need to create a trigger that sends a notification whenever a change occurs in the table. The steps to enable notification are below.

  1. Create a function that returns a trigger and broadcasts the notification
  2. Create triggers that fire when a row is inserted into or deleted from the table

Suppose we have created the table booktbl as below.

booktbl.sql
CREATE TABLE usr01.booktbl (
	sid varchar NOT NULL,
	title varchar NOT NULL,
	author varchar NOT NULL,
	CONSTRAINT booktbl_pkey PRIMARY KEY (sid)
);

The function used to broadcast the notification is created as below.

queueEvent.sql
CREATE OR REPLACE FUNCTION usr01.queue_event()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
DECLARE
    data json;
    notification json;
BEGIN
    -- Convert the old or new row to JSON, based on the kind of action.
    -- Action = DELETE? -> OLD row
    -- Action = INSERT or UPDATE? -> NEW row
    IF (TG_OP = 'DELETE') THEN
        data = row_to_json(OLD);
    ELSE
        data = row_to_json(NEW);
    END IF;

    -- Construct the notification as a JSON object.
    notification = json_build_object(
        'table', TG_TABLE_NAME,
        'action', TG_OP,
        'data', data);

    -- Execute pg_notify(channel, notification)
    PERFORM pg_notify('q_event', notification::text);

    -- Result is ignored since this is an AFTER trigger
    RETURN NULL;
END;
$function$;

Then create the triggers that are invoked when data is inserted into or deleted from the table (booktbl).

trigger-queue.sql
-- Trigger fired after a new record is added to the table
create trigger queue_notify_add after
insert
    on
    usr01.booktbl for each row execute function usr01.queue_event();

-- Trigger fired after a record is deleted from the table
create trigger queue_notify_delete after
delete
    on
    usr01.booktbl for each row execute function usr01.queue_event();

With that, our work to enable the PostgreSQL notification service is done.
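
For reference, each notification produced by queue_event() is a JSON object with the keys table, action and data. The sketch below is an illustration in Java rather than part of the project: the class name PayloadFields and the sample payload in the comment are assumptions (the exact whitespace PostgreSQL emits may differ), and a real application would probably use a JSON library instead of a regular expression.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that reads the "action" key from a notification payload. */
final class PayloadFields {
    // Assumed payload shape, following json_build_object in queue_event():
    // {"table" : "booktbl", "action" : "INSERT", "data" : {"sid":"0001", ...}}
    private static final Pattern ACTION =
            Pattern.compile("\"action\"\\s*:\\s*\"([A-Z]+)\"");

    /** Returns the trigger operation (for example INSERT or DELETE), or null if absent. */
    static String action(String payload) {
        Matcher m = ACTION.matcher(payload);
        return m.find() ? m.group(1) : null;
    }
}
```

A client could use such a field to decide whether to add a row to the page or remove it.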

2.Create Dynamic Web Project

After setting up PostgreSQL, we need a program that listens for notifications from PostgreSQL. In this post we create a web application that receives the notification messages and shows them on a page. After creating the dynamic web project, import the dependencies as in the pom.xml below.

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>pro.itstikk</groupId>
  <artifactId>java-ee-10-web</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>war</packaging>
  <name>java-ee-10-web</name>
  <url>https://blog.tikkwiki.pro</url>
  <description>Jakarta EE project for PostgreSQL and WebSocket</description>
  
  <dependencies>
  	<dependency>
		<groupId>postgresql</groupId>
		<artifactId>postgresql</artifactId>
		<version>9.1-901.jdbc4</version>
	</dependency>

	<dependency>
		<groupId>com.impossibl.pgjdbc-ng</groupId>
		<artifactId>pgjdbc-ng</artifactId>
		<version>0.7.1</version>
	</dependency>

	<!-- WebSocket API used by the endpoint; provided by the application server at runtime -->
	<dependency>
		<groupId>javax.websocket</groupId>
		<artifactId>javax.websocket-api</artifactId>
		<version>1.1</version>
		<scope>provided</scope>
	</dependency>
	
  </dependencies>
  
  <build>
  	<finalName>java-ee-10-web</finalName>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.7.0</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-war-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <warSourceDirectory>webapps</warSourceDirectory>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

Next, we create the notification listener for Postgres: we open a database connection, attach a listener to it, and give the listener a blocking queue in which it stores the notifications received from the database.

NotificationListener.java
package pro.itstikk.listener;

import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.impossibl.postgres.api.jdbc.PGConnection;
import com.impossibl.postgres.api.jdbc.PGNotificationListener;
import com.impossibl.postgres.jdbc.PGDataSource;

public class NotificationListener {
	private static Logger logger = Logger.getLogger(NotificationListener.class.getName());
	
	// Create the queue that will be shared by the producer and consumer
	private static BlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
	
	// Database connection settings (hard-coded for this demo; read them from
	// environment variables or configuration in a real deployment)
	private static final String DBHost 		= "192.168.230.8";
	private static final String DBName 		= "sammy";
	private static final String DBUserName 	= "sammy";
	private static final String DBPassword 	= "qf48d8uv";
	private static final int 	DBPort 		= 5432;
	
	// Database connection
	// private static PGDataSource dataSource;
	// private static PGConnection connection;
	
	static {

		try {
			
			// Create a data source for logging into the db
			PGDataSource dataSource = new PGDataSource();
			dataSource.setHost(DBHost);
			dataSource.setPort(DBPort);
			dataSource.setDatabase(DBName);
			dataSource.setUser(DBUserName);
			dataSource.setPassword(DBPassword);

			// Log into the db
			PGConnection connection = (PGConnection) dataSource.getConnection();
			logger.log(Level.INFO, "Connection to PostgreSQL successful!");

			// Attach a callback that pushes every notification onto the shared queue
			connection.addNotificationListener(new PGNotificationListener() {
				@Override
				public void notification(int processId, String channelName, String payload) {
					// Add channel and payload to the queue
					queue.add("/channels/" + channelName + " " + payload);
				}
			});
			logger.log(Level.INFO, "Attached listener to the connection successfully!");

			// Tell PostgreSQL to deliver notifications sent on channel q_event
			// (the channel name used by pg_notify in queue_event()) to this connection
			try (Statement statement = connection.createStatement()) {
				statement.execute("LISTEN q_event");
			}

		} catch (SQLException e) {
			logger.log(Level.SEVERE, "Could not set up the PostgreSQL notification listener", e);
		}
	}

	public NotificationListener() {
		super();
	}
	
	/**
	 * @return shared queue
	 */
	public static BlockingQueue<String> getQueue() {
		return queue;
	}
}
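
The listener queues each notification as a single string of the form /channels/<channel> <payload>. If a consumer needs the channel and the JSON payload separately, the string can be split at the first space. The following is a minimal sketch, not part of the original project: the class name QueuedNotification is an assumption made for illustration, and it relies on the channel used in this post (q_event) containing no spaces.

```java
/** Hypothetical helper: splits the text queued by NotificationListener. */
final class QueuedNotification {
    final String channel;
    final String payload;

    private QueuedNotification(String channel, String payload) {
        this.channel = channel;
        this.payload = payload;
    }

    /** Parses "/channels/<channel> <payload>" as built in the listener callback. */
    static QueuedNotification parse(String raw) {
        final String prefix = "/channels/";
        int space = raw.indexOf(' ');
        if (!raw.startsWith(prefix) || space < 0) {
            throw new IllegalArgumentException("Unexpected message format: " + raw);
        }
        // The first space separates the channel name from the JSON payload
        String channel = raw.substring(prefix.length(), space);
        String payload = raw.substring(space + 1);
        return new QueuedNotification(channel, payload);
    }
}
```

For example, QueuedNotification.parse(msg).payload would give the client only the JSON part of a queued message.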

Next we create a worker thread that takes the notifications from that queue and sends them to the WebSocket client, as follows.

NotificationWorker.java
package pro.itstikk.thread.runner;

import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.websocket.Session;


import pro.itstikk.listener.NotificationListener;

public class NotificationWorker implements Runnable {
	private static Logger logger = Logger.getLogger(NotificationWorker.class.getName());
	// Get the shared queue
	private static BlockingQueue<String> queue = NotificationListener.getQueue();
	private Session session;
	
	public NotificationWorker(final Session session) {
		this.session = session;
	}
	
	@Override
	public void run() {
		logger.log(Level.INFO, "Notification worker is running for session " + session.getId());

		// Pull messages off the queue for as long as the client is connected
		while (session.isOpen()) {
			try {
				// take() blocks until a notification is placed on the queue
				String msg = queue.take();
				logger.log(Level.INFO, "PostgreSQL notification: " + msg);

				// Send the message to the client through the WebSocket
				session.getAsyncRemote().sendText(msg);

			} catch (InterruptedException e) {
				logger.log(Level.WARNING, "Notification worker interrupted: " + e.getMessage());
				// Restore the interrupt flag and stop the worker
				Thread.currentThread().interrupt();
				return;
			}
		}
	}

}
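
Because the worker blocks in queue.take(), it only notices that it should stop when another message arrives or the thread is interrupted. One common way to end such a consumer cleanly is a sentinel ("poison pill") value placed on the queue. The sketch below shows the idea with java.util.concurrent only; the class name QueueDrainSketch, the STOP sentinel, and the list that stands in for the WebSocket session are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class QueueDrainSketch {
    // Hypothetical sentinel value that tells the consumer to stop
    static final String STOP = "__STOP__";

    /** Takes messages until STOP arrives; returns what would have been sent to the client. */
    static List<String> drain(BlockingQueue<String> queue) {
        List<String> delivered = new ArrayList<>();
        try {
            while (true) {
                String msg = queue.take();   // blocks until a message is available
                if (STOP.equals(msg)) {
                    break;
                }
                delivered.add(msg);          // stand-in for sendText(msg)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
        }
        return delivered;
    }

    /** Runs a producer thread against drain() and returns what the consumer received. */
    static List<String> demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        Thread producer = new Thread(() -> {
            try {
                queue.put("/channels/q_event first");
                queue.put("/channels/q_event second");
                queue.put(STOP);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<String> delivered = drain(queue);
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return delivered;
    }
}
```

In the web application, the sentinel could be queued when the session closes, so that the worker thread does not outlive its client.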

Then we create the WebSocket endpoint that clients connect to.

WebsocketEndPoint.java
package pro.itstikk.websocket.endpoint;

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import pro.itstikk.thread.runner.NotificationWorker;

@ServerEndpoint(value="/endpoint")
public class WebsocketEndPoint {
	private static Logger logger = Logger.getLogger(WebsocketEndPoint.class.getName());
	
	@OnOpen
    public void onOpen(Session session) {
		logger.log(Level.INFO,"onOpen::" + session.getId());
		String info = "Hello Client " + session.getId() + "!";
		logger.log(Level.INFO,info);
		
		// start notification listener
		
		NotificationWorker runnable = new NotificationWorker(session);
		Thread thread = new Thread(runnable, "Notification Thread");
		thread.start();
		logger.log(Level.INFO, "Listener for the notification started.");
		
		
    }
	
    @OnClose
    public void onClose(Session session) {
    	String str = "onClose::" +  session.getId();
        logger.log(Level.INFO, str);
    }
    
    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
		String str = " Server Site Message= " + message;
		logger.log(Level.INFO,str);
		session.getBasicRemote().sendText("Hello Client ... !");
    }
    
    @OnError
    public void onError(Throwable t) {
        logger.log(Level.SEVERE, "onError:: occurred " + t.getMessage(), t);
    }
    
}
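
One caveat of this endpoint: every connection starts its own worker, and all workers take from the same BlockingQueue. Since take() hands each element to exactly one consumer, with several clients connected each notification reaches only one of them. A possible alternative, sketched here under assumptions, is a single dispatcher that forwards every message to all registered subscribers. NotificationBroadcaster and its methods are hypothetical names, and Consumer<String> stands in for a WebSocket session so that the sketch runs without a container.

```java
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

/** Hypothetical fan-out helper: one message in, one copy out per subscriber. */
final class NotificationBroadcaster {
    // Thread-safe set: subscribers may join or leave while a broadcast is running
    private final Set<Consumer<String>> subscribers = new CopyOnWriteArraySet<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void unsubscribe(Consumer<String> subscriber) {
        subscribers.remove(subscriber);
    }

    /** Delivers the message to every subscriber and returns how many received it. */
    int broadcast(String message) {
        int delivered = 0;
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
            delivered++;
        }
        return delivered;
    }
}
```

With such a helper, onOpen could subscribe a callback that calls session.getAsyncRemote().sendText(msg), onClose could unsubscribe it, and a single thread could take from the queue and call broadcast for each message.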

Finally, for the client we create a simple JavaScript program as below.

default.jsp
<%@ page language="java" contentType="text/html; charset=UTF-8"
    pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Clients</title>
</head>
<body>
<h2>Message from PostgreSQL is below.</h2>
<div id="msg">
</div>
<script language="javascript" type="text/javascript">

	var wsUri = "ws://localhost:8080/java-ee-10-web/endpoint";
	var ws = new WebSocket(wsUri);
	ws.onopen = function() {
		console.log("Web Socket is connected!!");
		setInterval(function(){ws.send("keep alive .... !");},10000);
		
	};
	ws.onmessage = function(evt) {
		var msg = evt.data;
		console.log("Message received:" + msg);
		var tag = document.createElement("p");
		var text = document.createTextNode(msg);

		tag.appendChild(text);
		
		var element = document.getElementById("msg");
		element.appendChild(tag);
	};
	ws.onclose = function() {
		console.log("Connection is closed...");
	};

	
</script>
</body>
</html>

Once the program is running, we insert data into the booktbl table with the script below, and each inserted row is shown on the page as a notification.

do $$
<<FIRSTBLOCK>>
declare
	PUSRNAME VARCHAR;
begin

	-- Insert 50 sample rows; each insert fires the queue_notify_add trigger
	for C in 1..50 loop
		PUSRNAME := '000' || cast(C as VARCHAR);
		insert into usr01.booktbl (sid, title, author) values (PUSRNAME, 'bookname' || PUSRNAME, 'author' || PUSRNAME);
	end loop;

	commit;

end FIRSTBLOCK $$;

And once we delete the data from the booktbl table as below, a DELETE notification for each removed row is shown on the page.

DELETE FROM booktbl;

3.Reference

  1. Utilize PostgreSQL messaging feature
  2. Integrate ActiveMQ with Wildfly 10 and Secure SSL Connection
  3. JMS (Java Messaging Service) with Wildfly 10
