Davor Josipovic Just another WordPress blog – rather tryout

29/06/2010

PostgreSQL table audit in plpgsql

Filed under: Postgres, Programming. Posted by Davor @ 15:27

I recently had to implement a logging system for our database. Googling turned up a few interesting ways to do this. The best I have found so far is from Lorenzo Alberton. I recommend you glance through that, and also through an extension/improvement of it by James Gardner.

Now, Lorenzo’s implementation has a few problems. Most notably, not all of us want to install Tcl. So I have “rewritten” it in plpgsql. This became possible once the EXECUTE command was extended with the USING clause (PostgreSQL 8.4), which lets a dynamic query receive OLD and NEW as parameters.
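To make the EXECUTE ... USING idea concrete, here is a minimal sketch of reading one column of NEW by name at run time. The function name demo_show_field() and the use of a trigger argument for the column name are my own assumptions for illustration and are not part of the audit code.

```sql
-- Hypothetical demo, not part of the audit code: prints one column of NEW.
-- The column name is passed as the first trigger argument (TG_ARGV[0]).
-- Only meaningful for INSERT and UPDATE triggers, since it reads NEW.
CREATE OR REPLACE FUNCTION demo_show_field()
  RETURNS TRIGGER AS
$BODY$
DECLARE
  _value text;
BEGIN
  -- The row is passed as parameter $1; the column name is spliced in
  -- with quote_ident() so that odd identifiers are quoted safely.
  EXECUTE 'SELECT ($1).' || quote_ident(TG_ARGV[0]) INTO _value USING NEW;
  RAISE NOTICE '% = %', TG_ARGV[0], _value;
  RETURN NULL; -- result is ignored in an AFTER trigger
END;
$BODY$
  LANGUAGE plpgsql;
```

Attached with something like CREATE TRIGGER ... AFTER INSERT OR UPDATE ... FOR EACH ROW EXECUTE PROCEDURE demo_show_field('some_column'), it should emit a notice with that column's value for every affected row.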

I also added a few extra features:

  • Support for multi-field PKs.
  • Logging of PKs.
  • Changes are detected with the default operators of each column type instead of text-based operators.
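The difference between type-based and text-based change detection can be seen with a small standalone query. This is only an illustration with literal values of my own choosing. It does not touch the audit code.

```sql
-- Two numeric values that are equal as numbers but differ as text.
SELECT 1.0::numeric IS DISTINCT FROM 1.00::numeric             AS typed_changed, -- false
       1.0::numeric::text IS DISTINCT FROM 1.00::numeric::text AS text_changed;  -- true

-- NULL handling: <> yields NULL, IS DISTINCT FROM yields a proper boolean.
SELECT NULL::integer <> 1               AS with_not_equal,     -- NULL
       NULL::integer IS DISTINCT FROM 1 AS with_is_distinct;   -- true
```

With a text-based comparison an UPDATE from 1.0 to 1.00 would be logged as a change, while the type's own operator treats the two values as equal.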

Note: I am using a slightly different naming convention, so you should adjust the names before using it with extra functions from James.

Note2: This is still a (working) tryout. Comments on improvement are welcome.

Update: fnc_audit(…) is called by the trigger function tfc_audit()

CREATE OR REPLACE FUNCTION private.tfc_audit()
  RETURNS TRIGGER AS
$BODY$ 
 
/*!
 Dynamic query execution through the plpgsql EXECUTE command makes this function possible.
 Some complications which can arise when using this command are described here:
  - http://archives.postgresql.org/pgsql-general/2008-05/msg00314.php
*/ 
 
DECLARE 
 
_PK_NAMES CONSTANT VARCHAR[] := ARRAY(SELECT a.attname AS pk_name FROM pg_class c, pg_attribute a, pg_index i
     WHERE c.oid = i.indrelid
     AND a.attrelid = i.indexrelid /* attributes of the PK index itself, so attnum is the position within the key */
     AND a.attisdropped = FALSE /* such column names are like "........pg.dropped.1........" and are invalid */
     AND a.attnum > 0 /* i.e. is not a system column like OID: they have (arbitrary) negative numbers */ 
     AND i.indisprimary = TRUE /* is pk */
     AND c.oid = TG_RELID /* OID of the table that fired the trigger, so no schema lookup is needed */
     ORDER BY a.attnum /* stable order: array order is otherwise not guaranteed */); 
 
_FIELD_NAMES CONSTANT VARCHAR[] := ARRAY(SELECT a.attname AS field_name FROM pg_class c, pg_attribute a
     WHERE c.oid = a.attrelid
     AND a.attisdropped = FALSE /* such column names are like "........pg.dropped.1........" and are invalid */
     AND a.attnum > 0 /* i.e. is not a system column like OID: they have (arbitrary) negative numbers */ 
     AND c.oid = TG_RELID /* OID of the table that fired the trigger, so no schema lookup is needed */
     ORDER BY a.attnum /* column order of the table */); 
 
_PK_VALUES VARCHAR[]; 
 
BEGIN 
 
-- this function works only when marked as AFTER trigger!
PERFORM assert(TG_WHEN = 'AFTER', 'Wrong execution of tfc_audit(): should only be executed AFTER an event!');
-- this function works only when marked as FOR EACH ROW trigger!
PERFORM assert(TG_LEVEL = 'ROW', 'Wrong execution of tfc_audit(): should only be executed FOR EACH ROW!'); 
 
-- table sanity check
PERFORM assert(TG_TABLE_NAME IS NOT NULL, 'TG_TABLE_NAME IS NULL');
PERFORM assert(_PK_NAMES IS NOT NULL AND array_length(_PK_NAMES, 1) IS NOT NULL, 'Table ' || TG_TABLE_NAME || ' can not be logged: it has no PK!');  -- FOR-loop will otherwise throw "upper bound of FOR loop cannot be null" 
 
-- skip changes on audit_table: otherwise endless loop
IF TG_TABLE_NAME = 'tbl_audits' THEN
 RETURN NULL; -- result is ignored since this is an AFTER trigger
END IF; 
 
-- find PK value
FOR i IN 1 .. array_length(_PK_NAMES, 1)
LOOP
 DECLARE
  tmp text;
  tmp_record record;
 BEGIN
  CASE TG_OP
  WHEN 'UPDATE', 'INSERT' THEN tmp_record = NEW;
  WHEN 'DELETE' THEN tmp_record = OLD;
  END CASE; 
 
  EXECUTE 'SELECT $1.' || quote_ident(_PK_NAMES[i]) INTO STRICT tmp USING tmp_record;
  _PK_VALUES[i] := tmp;
 END;
END LOOP; 
 
-- PK values sanity check
PERFORM assert(_PK_VALUES IS NOT NULL AND array_length(_PK_VALUES, 1) IS NOT NULL, '_PK_VALUES IS NULL'); -- FOR-loop will otherwise throw "upper bound of FOR loop cannot be null"
PERFORM assert(array_length(_PK_VALUES, 1) = array_length(_PK_NAMES, 1), '_PK_VALUES count not equal to _PK_NAMES count in table ' || TG_TABLE_NAME); 
 
-- find fields to be logged
FOR i IN 1 .. array_length(_FIELD_NAMES, 1)
LOOP
 -- field_name sanity check
 PERFORM assert(_FIELD_NAMES[i] IS NOT NULL, '_FIELD_NAMES[' || i || '] IS NULL'); 
 
 DECLARE
  _field_name CONSTANT VARCHAR := quote_ident(_FIELD_NAMES[i]);
  _distinct BOOLEAN;
  _old text;
  _new text;
 BEGIN 
 
 CASE TG_OP --fall through is unfortunately not supported
  WHEN 'UPDATE' THEN
   -- compare the two fields with the comparison operator of their own type.
   -- note that <> yields NULL when either input is NULL, whereas IS DISTINCT FROM treats NULL as an ordinary value:
   -- it is false when both inputs are NULL and true when only one of them is.
   EXECUTE 'SELECT $1.' || _field_name || ' IS DISTINCT FROM $2.' || _field_name || ';' INTO STRICT _distinct USING OLD, NEW;
   CONTINUE WHEN NOT _distinct; 
 
   EXECUTE 'SELECT $1.' || _field_name || ';' INTO STRICT _new USING NEW;
   EXECUTE 'SELECT $1.' || _field_name || ';' INTO STRICT _old USING OLD;
  WHEN 'INSERT' THEN
   EXECUTE 'SELECT $1.' || _field_name || ';' INTO STRICT _new USING NEW;
  WHEN 'DELETE' THEN
   EXECUTE 'SELECT $1.' || _field_name || ';' INTO STRICT _old USING OLD;
  ELSE
   RAISE EXCEPTION 'Unhandled TG_OP in tfc_audit(): %', TG_OP;
 END CASE; 
 
 CONTINUE WHEN _new IS NULL AND _old IS NULL; 
 
 PERFORM private.fnc_audit(statement_timestamp()::TIMESTAMP WITHOUT TIME zone, SESSION_USER::VARCHAR, TG_TABLE_NAME::VARCHAR, _field_name, _PK_NAMES, _PK_VALUES, TG_OP::private.table_audit_mod_type, _old, _new);
 
 END;
END LOOP; 
 
RETURN NULL; -- result is ignored since this is an AFTER trigger
END; 
 
$BODY$
  LANGUAGE 'plpgsql' VOLATILE SECURITY DEFINER;

A specific function for inserting into tbl_audits is used:

CREATE OR REPLACE FUNCTION private.fnc_audit(
    _audi_timestamp TIMESTAMP WITHOUT TIME ZONE,
    _audi_user name,
    _audi_table name,
    _audi_field name,
    _audi_pk_name CHARACTER VARYING[],
    _audi_pk_value CHARACTER VARYING[],
    _audi_mod_type private.table_audit_mod_type,
    _audi_value_old text,
    _audi_value_new text)
  RETURNS void AS
$BODY$
DECLARE
BEGIN 
 
INSERT INTO private.tbl_audits(audi_timestamp, audi_user, audi_table, audi_field, audi_pk_name, audi_pk_value, audi_mod_type, audi_value_old, audi_value_new)
 VALUES (_audi_timestamp, _audi_user, _audi_table, _audi_field, _audi_pk_name, _audi_pk_value, _audi_mod_type, _audi_value_old, _audi_value_new); 
 
END;
$BODY$
  LANGUAGE 'plpgsql' VOLATILE;

TG_OP is stored as an ENUM (= more efficient), so we need a new type. It has to be created before fnc_audit(), whose signature refers to it:

CREATE TYPE private.table_audit_mod_type AS ENUM ('INSERT', 'UPDATE', 'DELETE', 'TRUNCATE');

The trigger function relies on this small but very important helper function:

CREATE OR REPLACE FUNCTION assert(BOOLEAN, CHARACTER VARYING)
  RETURNS void AS
$BODY$
BEGIN
 IF $1 IS NOT TRUE THEN
   -- a NULL message must not let a failed assertion pass silently
   RAISE EXCEPTION 'Assert failure: %', COALESCE($2, '(no message given)');
 END IF; 
END;$BODY$
  LANGUAGE 'plpgsql' VOLATILE;

And finally the audit table:

CREATE TABLE private.tbl_audits
(
  audi_timestamp TIMESTAMP WITHOUT TIME zone NOT NULL,
  audi_user CHARACTER VARYING(30) NOT NULL,
  audi_table CHARACTER VARYING(45) NOT NULL,
  audi_field CHARACTER VARYING(30) NOT NULL,
  audi_pk_name CHARACTER VARYING(30)[] NOT NULL,
  audi_pk_value CHARACTER VARYING(60)[] NOT NULL,
  audi_mod_type private.table_audit_mod_type NOT NULL,
  audi_value_old text,
  audi_value_new text,
  CONSTRAINT tbl_audits_check_pk CHECK (array_length(audi_pk_name, 1) = array_length(audi_pk_value, 1))
);
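With all the pieces in place, attaching the audit to a table could look roughly like the sketch below. The table tbl_order_lines, its columns and the trigger name trg_audit are hypothetical names I made up for the example. Only tfc_audit() and tbl_audits come from the code above.

```sql
-- Hypothetical table with a two-column primary key (not part of the original code).
CREATE TABLE private.tbl_order_lines
(
  orli_order_id integer NOT NULL,
  orli_line_no integer NOT NULL,
  orli_quantity integer,
  CONSTRAINT tbl_order_lines_pk PRIMARY KEY (orli_order_id, orli_line_no)
);

-- tfc_audit() asserts TG_WHEN = 'AFTER' and TG_LEVEL = 'ROW',
-- so the trigger has to be declared AFTER ... FOR EACH ROW.
CREATE TRIGGER trg_audit
  AFTER INSERT OR UPDATE OR DELETE ON private.tbl_order_lines
  FOR EACH ROW EXECUTE PROCEDURE private.tfc_audit();

-- Some activity to be logged.
INSERT INTO private.tbl_order_lines VALUES (1, 1, 5);
UPDATE private.tbl_order_lines SET orli_quantity = 7
 WHERE orli_order_id = 1 AND orli_line_no = 1;

-- History of that one row, oldest change first.
SELECT audi_timestamp, audi_mod_type, audi_field, audi_value_old, audi_value_new
  FROM private.tbl_audits
 WHERE audi_table = 'tbl_order_lines'
   AND audi_pk_value = ARRAY['1', '1']::CHARACTER VARYING[]
 ORDER BY audi_timestamp;
```

The INSERT should produce one audit row per non-NULL field, and the UPDATE one row for orli_quantity only, since unchanged fields are skipped.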

A more storage-efficient way to store logged data would be to make an audi_changeset column which consists of an array of changed fieldnames, old values and new values for each PK during a statement execution. It would be equivalent to the following view:

CREATE OR REPLACE VIEW vew_audit_changesets AS
 SELECT a.audi_timestamp, a.audi_user, a.audi_table, a.audi_pk_name, a.audi_pk_value, a.audi_mod_type,
        ARRAY[array_agg(a.audi_field),
              array_agg(a.audi_value_old)::CHARACTER VARYING[],
              array_agg(a.audi_value_new)::CHARACTER VARYING[]] AS audi_changeset
   FROM private.tbl_audits a
  GROUP BY a.audi_timestamp, a.audi_user, a.audi_table, a.audi_pk_name, a.audi_pk_value, a.audi_mod_type;

fnc_audit() is easily adjusted to store logs in such a table. But I wonder whether it is worth the fuss. Most queries run on vew_audit_changesets would first need to unnest the audi_changeset array back into sets (i.e. a structure which resembles tbl_audits), because SQL is set-oriented, not array-oriented. In other words, it seems easy to convert from the tbl_audits structure to the vew_audit_changesets structure, but how does one go from the vew_audit_changesets structure back to the tbl_audits structure, and is this conversion efficient? Is it worth saving logs in a vew_audit_changesets-like table?
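One possible way to go back from the changeset structure to one row per field is generate_subscripts() over the second array dimension. This is only a sketch of the conversion and says nothing about how efficient it is on a large log.

```sql
-- audi_changeset is a 2-D array: [1][i] = field name, [2][i] = old value, [3][i] = new value.
-- generate_subscripts(..., 2) yields every index i of the second dimension.
SELECT c.audi_timestamp, c.audi_user, c.audi_table,
       c.audi_pk_name, c.audi_pk_value, c.audi_mod_type,
       c.audi_changeset[1][c.i] AS audi_field,
       c.audi_changeset[2][c.i] AS audi_value_old,
       c.audi_changeset[3][c.i] AS audi_value_new
  FROM (SELECT v.*, generate_subscripts(v.audi_changeset, 2) AS i
          FROM vew_audit_changesets v) AS c;
```

Note that a plain unnest() would not help here, because it flattens a multi-dimensional array into a single list of elements and loses the pairing of field name, old value and new value.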
