Utilize PostgreSQL messaging feature

Messaging is a feature available in PostgreSQL 11. Its main benefit is that it can notify interested parties of a change as soon as someone modifies the database. PostgreSQL provides pg_notify to send a message to every subscriber that is listening on the channel (queue).

Create function

You can use any database IDE to create the function in the database, as shown below.
CREATE OR REPLACE FUNCTION usr01.queue_event()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
DECLARE
    row_data json;
BEGIN
    -- Serialize the row image that matches the operation:
    --   DELETE           -> OLD row
    --   INSERT or UPDATE -> NEW row
    IF (TG_OP = 'DELETE') THEN
        row_data := row_to_json(OLD);
    ELSE
        row_data := row_to_json(NEW);
    END IF;

    -- Build the {table, action, data} JSON document and publish it as text
    -- on the q_event channel.
    PERFORM pg_notify(
        'q_event',
        json_build_object(
            'table', TG_TABLE_NAME,
            'action', TG_OP,
            'data', row_data
        )::text
    );

    -- The return value is ignored because this function is used by AFTER triggers.
    RETURN NULL;
END;
$function$

Create triggers that fire when data is added to or deleted from the table

Now, to send the notification whenever data is added to or deleted from the table, you need to create a trigger.
-- Fires after every row inserted into usr01.usr01tbl and publishes it via queue_event().
-- The function name is schema-qualified because it was created as usr01.queue_event();
-- an unqualified call only resolves when usr01 is on the session's search_path.
create trigger queue_notify_add after
insert
    on
    usr01.usr01tbl for each row execute function usr01.queue_event();
Then create a second trigger that is called whenever a record is deleted from the table.
-- Fires after every row deleted from usr01.usr01tbl and publishes it via queue_event().
-- The function name is schema-qualified because it was created as usr01.queue_event();
-- an unqualified call only resolves when usr01 is on the session's search_path.
create trigger queue_notify_delete after
delete
    on
    usr01.usr01tbl for each row execute function usr01.queue_event();

Create Client to listen to the message from Postgres

To receive the messages sent from the server, you can create a Java client as shown below. For the dependencies, add the following two packages to your pom.xml:
<dependencies>
    <!-- PostgreSQL JDBC driver artifact -->
    <dependency>
        <groupId>postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>9.1-901.jdbc4</version>
    </dependency>

    <!-- pgjdbc-ng: supplies the com.impossibl.postgres classes imported by the Java client -->
    <dependency>
        <groupId>com.impossibl.pgjdbc-ng</groupId>
        <artifactId>pgjdbc-ng</artifactId>
        <version>0.7.1</version>
    </dependency>
</dependencies>

For the Java code, you can create the connector class as shown below.

package com.pg.data;

import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import com.impossibl.postgres.api.jdbc.PGConnection;
import com.impossibl.postgres.api.jdbc.PGNotificationListener;
import com.impossibl.postgres.jdbc.PGDataSource;

/**
 * Opens a PostgreSQL connection, issues {@code LISTEN q_event} on it and copies
 * every notification received by the registered listener into a bounded queue
 * that a consumer can drain via {@link #getQueue()}.
 *
 * <p>Connection settings are read from the environment variables
 * {@code DB_HOST}, {@code DB_NAME}, {@code DB_USER} and {@code DB_PASSWORD};
 * when a variable is unset or empty the original tutorial value is used.
 */
public class ListenNotify {
    // Number of pending events the queue can hold before new ones are dropped
    private static final int QUEUE_CAPACITY = 10;

    // Create the queue that will be shared by the producer and consumer
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(QUEUE_CAPACITY);

    // Database connection on which LISTEN is issued; this class never closes it
    PGConnection connection;

    /**
     * Connects to the database, registers the notification listener and
     * subscribes to the {@code q_event} channel.
     *
     * <p>A {@link SQLException} raised while connecting or subscribing is printed
     * and the constructor still returns; in that case no events will arrive.
     */
    public ListenNotify() {
        // Get database info from environment variables; the fallbacks are the
        // tutorial's placeholder values and should be overridden in real use.
        String dbHost = envOrDefault("DB_HOST", "pgdbsvr.itstikk.pro");
        String dbName = envOrDefault("DB_NAME", "usr01");
        String dbUserName = envOrDefault("DB_USER", "usr01");
        String dbPassword = envOrDefault("DB_PASSWORD", "password123");

        // Create the listener callback
        PGNotificationListener listener = new PGNotificationListener() {
            @Override
            public void notification(int processId, String channelName, String payload) {
                String event = "/channels/" + channelName + " " + payload;
                // offer() rather than add(): a full queue must not throw out of the
                // callback; the event is dropped and reported instead.
                if (!queue.offer(event)) {
                    System.err.println("Notification queue is full, dropping event: " + event);
                }
            }
        };

        try {
            // Create a data source for logging into the db
            PGDataSource dataSource = new PGDataSource();
            dataSource.setHost(dbHost);
            dataSource.setPort(5432);
            dataSource.setDatabase(dbName);
            dataSource.setUser(dbUserName);
            dataSource.setPassword(dbPassword);

            // Log into the db
            connection = (PGConnection) dataSource.getConnection();

            // Add the callback listener created earlier to the connection
            connection.addNotificationListener(listener);

            // Tell Postgres to send NOTIFY q_event to our connection and listener.
            // try-with-resources closes the statement even when execute() throws.
            try (Statement statement = connection.createStatement()) {
                statement.execute("LISTEN q_event");
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * @return shared queue
     */
    public BlockingQueue<String> getQueue() {
        return queue;
    }

    /**
     * Returns the value of the environment variable {@code name}, or
     * {@code fallback} when the variable is unset or empty.
     */
    private static String envOrDefault(String name, String fallback) {
        String value = System.getenv(name);
        return (value == null || value.isEmpty()) ? fallback : value;
    }

}
Next, create the Main class:
package com.pg.data;

import java.util.concurrent.BlockingQueue;

/**
 * Console consumer: creates a {@code ListenNotify} and prints every event
 * taken from its shared queue until the thread is interrupted.
 */
public class Main {

    /**
     * Starts the listener and prints events as they arrive.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ListenNotify ln = new ListenNotify();

        // Get the shared queue
        BlockingQueue<String> queue = ln.getQueue();

        // Pull messages off the queue until this thread is interrupted
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // take() blocks until something is placed on the queue
                String msg = queue.take();

                // Do something with the event
                System.out.println(msg);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the loop condition ends the loop;
                // swallowing it would make the consumer impossible to stop.
                Thread.currentThread().interrupt();
            }
        }

    }

}




No comments:

Post a Comment

Feature Recently

Running Wildfly Application Server in Domain Mode

  Wildfly application server provides two modes for running applications on a Wildfly application server. It is very simple if you run your ...

Most Views