Create function
you can use any database IDE you create a function in the database as below.
CREATE OR REPLACE FUNCTION usr01.queue_event()
RETURNS triggerLANGUAGE plpgsql
AS $function$
DECLARE
data json;
notification json;
BEGIN
-- Convert the old or new row to JSON, based on the kind of action.
-- Action = DELETE? -> OLD row
-- Action = INSERT or UPDATE? -> NEW row
IF (TG_OP = 'DELETE') THEN
data = row_to_json(OLD);
ELSE
data = row_to_json(NEW);
END IF;
-- Contruct the notification as a JSON string.
notification = json_build_object(
'table',TG_TABLE_NAME,
'action', TG_OP,
'data', data);
-- Execute pg_notify(channel, notification)
PERFORM pg_notify('q_event',notification::text);
-- Result is ignored since this is an AFTER trigger
RETURN NULL;
END;
$function$
create trigger once add or delete data from the table
No to trigger the notification once data is added or deleted from the table, you need to create a trigger.
-- trigger once new record is added to the table
create trigger queue_notify_add afterinsert
on
usr01.usr01tbl for each row execute function queue_event();
--- trigger once record is deleted from the table
delete
on
usr01.usr01tbl for each row execute function queue_event();
Create Client to listen to the message from Postgres
To receive message send from the server, you can create java client as below. For the dependencies packages, you might need to add 2 of the packages below to your pom.xml
<dependencies>
<dependency><groupId>postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>9.1-901.jdbc4</version>
</dependency>
<dependency>
<groupId>com.impossibl.pgjdbc-ng</groupId>
<artifactId>pgjdbc-ng</artifactId>
<version>0.7.1</version>
</dependency>
</dependencies>
For the Java code, you could create connector as below.
package com.pg.data;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import com.impossibl.postgres.api.jdbc.PGConnection;
import com.impossibl.postgres.api.jdbc.PGNotificationListener;
import com.impossibl.postgres.jdbc.PGDataSource;
public class ListenNotify {
// Create the queue that will be shared by the producer and consumer
private BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
// Database connection
PGConnection connection;
public ListenNotify() {
// Get database info from environment variables
String DBHost = "pgdbsvr.itstikk.pro";
String DBName = "usr01";
String DBUserName = "usr01";
String DBPassword = "password123";
// Create the listener callback
PGNotificationListener listener = new PGNotificationListener() {
@Override
public void notification(int processId, String channelName, String payload) {
// Add event and payload to the queue
queue.add("/channels/" + channelName + " " + payload);
}
};
try {
// Create a data source for logging into the db
PGDataSource dataSource = new PGDataSource();
dataSource.setHost(DBHost);
dataSource.setPort(5432);
dataSource.setDatabase(DBName);
dataSource.setUser(DBUserName);
dataSource.setPassword(DBPassword);
// Log into the db
connection = (PGConnection) dataSource.getConnection();
// add the callback listener created earlier to the connection
connection.addNotificationListener(listener);
// Tell Postgres to send NOTIFY q_event to our connection and listener
Statement statement = connection.createStatement();
statement.execute("LISTEN q_event");
statement.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
/**
* @return shared queue
*/
public BlockingQueue<String> getQueue() {
return queue;
}
}
package com.pg.data;
import java.util.concurrent.BlockingQueue;
public class Main {
public static void main(String[] args) {
ListenNotify ln = new ListenNotify();
// Get the shared queue
BlockingQueue<String> queue = ln.getQueue();
// Loop forever pulling messages off the queue
while (true) {
try {
// queue blocks until something is placed on it
String msg = queue.take();
// Do something with the event
System.out.println(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
No comments:
Post a Comment