I recently had to implement a logging (audit) system for our database. Googling turned up a few interesting approaches; the best I have found so far is the one by Lorenzo Alberton. I recommend you glance through it, and also through an extension/improvement of it by James Gardner.
Lorenzo’s implementation has a few drawbacks, though. Most notably, it requires Tcl, which not all of us want to install. So I have “rewritten” it in plpgsql. This became possible once the EXECUTE command was extended with the USING clause.
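To illustrate why the USING clause matters here, the following is a minimal sketch of reading a field from a row value when the column name is only known at run time. The table demo_person and its columns are hypothetical and exist only for this example.

```sql
-- Hypothetical demo table; any table with a row type would do.
CREATE TEMP TABLE demo_person (id integer PRIMARY KEY, full_name text);

DO $$
DECLARE
    _row   demo_person%ROWTYPE;
    _field text := 'full_name';  -- column name known only at run time
    _value text;
BEGIN
    _row.id        := 1;
    _row.full_name := 'Ada';
    -- The row is passed as parameter $1; only the (quoted) column name
    -- is concatenated into the query string.
    EXECUTE 'SELECT $1.' || quote_ident(_field) INTO STRICT _value USING _row;
    RAISE NOTICE 'value of % is %', _field, _value;
END
$$;
```

The trigger function below applies the same pattern to NEW and OLD.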
I also added a few extra features:
- Support for multi-field PKs.
- Logging of PKs.
- Changes are detected with each type's default operators instead of text-based operators.
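As a small illustration of the last point (my own example, not taken from Lorenzo's code): two numeric values can be equal according to the type's own operators while their text representations differ, so a text-based comparison would log a change that is not really one.

```sql
SELECT 1.0::numeric IS DISTINCT FROM 1.00::numeric             AS numeric_differs,  -- false
       1.0::numeric::text IS DISTINCT FROM 1.00::numeric::text AS text_differs;     -- true
```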
Note: I am using a slightly different naming convention, so you should adjust the names before using it with the extra functions from James.
Note 2: This is still a (working) draft. Suggestions for improvement are welcome.
Update: fnc_audit(…) is called by the trigger function tfc_audit().
CREATE OR REPLACE FUNCTION private.tfc_audit()
RETURNS trigger AS
$BODY$
/*!
Dynamic query execution through the plpgsql EXECUTE command makes this function possible.
Some complications which can arise when using this command are described here:
- http://archives.postgresql.org/pgsql-general/2008-05/msg00314.php
*/
DECLARE
_PK_NAMES CONSTANT varchar[] := ARRAY(SELECT a.attname AS pk_name FROM pg_class c, pg_attribute a, pg_index i
WHERE c.oid = i.indrelid
AND a.attrelid = i.indexrelid
AND a.attisdropped = false /* such column names are like "........pg.dropped.1........" and are invalid */
AND a.attnum > 0 /* i.e is not a system column like OID: they have (arbitrary) negative numbers */
AND i.indisprimary = TRUE /* is pk */
AND c.oid = TG_RELID /* regclass takes current schema into consideration! */);
_FIELD_NAMES CONSTANT varchar[] := ARRAY(SELECT a.attname AS field_name FROM pg_class c, pg_attribute a
WHERE c.oid = a.attrelid
AND a.attisdropped = false /* such column names are like "........pg.dropped.1........" and are invalid */
AND a.attnum > 0 /* i.e is not a system column like OID: they have (arbitrary) negative numbers */
AND c.oid = TG_RELID ); /* regclass takes current schema into consideration! */
_PK_VALUES varchar[];
BEGIN
-- this function works only when marked as AFTER trigger!
PERFORM assert(TG_WHEN = 'AFTER', 'Wrong execution of tfc_audit(): should only be executed AFTER an event!');
-- this function works only when marked as FOR EACH ROW trigger!
PERFORM assert(TG_LEVEL = 'ROW', 'Wrong execution of tfc_audit(): should only be executed FOR EACH ROW!');
-- table sanity check
PERFORM assert(TG_TABLE_NAME IS NOT NULL, 'TG_TABLE_NAME IS NULL');
PERFORM assert(_PK_NAMES IS NOT NULL AND array_length(_PK_NAMES, 1) IS NOT NULL, 'Table ' || TG_TABLE_NAME || ' can not be logged: it has no PK!'); -- FOR-loop will otherwise throw "upper bound of FOR loop cannot be null"
-- skip changes on audit_table: otherwise endless loop
IF TG_TABLE_NAME = 'tbl_audits' THEN
return NULL; -- result is ignored since this is an AFTER trigger
END IF;
-- find PK value
FOR i IN 1 .. array_length(_PK_NAMES, 1)
LOOP
DECLARE
tmp text;
tmp_record record;
BEGIN
CASE TG_OP
WHEN 'UPDATE', 'INSERT' THEN tmp_record = NEW;
WHEN 'DELETE' THEN tmp_record = OLD;
END CASE;
EXECUTE 'SELECT $1.' || quote_ident(_PK_NAMES[i]) INTO STRICT tmp USING tmp_record;
_PK_VALUES[i] := tmp;
END;
END LOOP;
-- PK values sanity check
PERFORM assert(_PK_VALUES IS NOT NULL AND array_length(_PK_VALUES, 1) IS NOT NULL, '_PK_VALUES IS NULL'); -- FOR-loop will otherwise throw "upper bound of FOR loop cannot be null"
PERFORM assert(array_length(_PK_VALUES, 1) = array_length(_PK_NAMES, 1), '_PK_VALUES count not equal to _PK_NAMES count in table ' || TG_TABLE_NAME);
-- find fields to be logged
FOR i IN 1 .. array_length(_FIELD_NAMES, 1)
LOOP
-- field_name sanity check
PERFORM assert(_FIELD_NAMES[i] IS NOT NULL, '_FIELD_NAMES[' || i || '] IS NULL');
DECLARE
_field_name CONSTANT varchar := quote_ident(_FIELD_NAMES[i]);
_distinct boolean;
_old text;
_new text;
BEGIN
CASE TG_OP --fall through is unfortunately not supported
WHEN 'UPDATE' THEN
-- compare the two fields with the type's default operators.
-- note that <> yields NULL when either input is NULL, whereas IS DISTINCT FROM treats NULL as a comparable value:
-- it is false when both inputs are NULL and true when exactly one of them is.
EXECUTE 'SELECT $1.' || _field_name || ' IS DISTINCT FROM $2.' || _field_name || ';' INTO STRICT _distinct USING OLD, NEW;
CONTINUE WHEN NOT _distinct;
EXECUTE 'SELECT $1.' || _field_name || ';' INTO STRICT _new USING NEW;
EXECUTE 'SELECT $1.' || _field_name || ';' INTO STRICT _old USING OLD;
WHEN 'INSERT' THEN
EXECUTE 'SELECT $1.' || _field_name || ';' INTO STRICT _new USING NEW;
WHEN 'DELETE' THEN
EXECUTE 'SELECT $1.' || _field_name || ';' INTO STRICT _old USING OLD;
ELSE
RAISE EXCEPTION 'Unhandled TG_OP in tfc_audit(): %', TG_OP;
END CASE;
CONTINUE WHEN _new IS NULL AND _old IS NULL;
PERFORM private.fnc_audit(statement_timestamp()::timestamp without time zone, SESSION_USER::varchar, TG_TABLE_NAME::varchar, _field_name, _PK_NAMES, _PK_VALUES, TG_OP::private.table_audit_mod_type, _old, _new);
END;
END LOOP;
RETURN NULL; -- result is ignored since this is an AFTER trigger
END;
$BODY$
LANGUAGE 'plpgsql' VOLATILE SECURITY DEFINER;
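The UPDATE branch relies on IS DISTINCT FROM rather than <>. The difference only shows up when NULLs are involved, which the following stand-alone query (with made-up values) makes visible:

```sql
SELECT a, b,
       a <> b               AS not_equal,    -- NULL as soon as one side is NULL
       a IS DISTINCT FROM b AS is_distinct   -- always true or false
FROM (VALUES (1, 1), (1, 2), (1, NULL), (NULL, NULL)) AS t(a, b);
```

For the four rows, not_equal is false, true, NULL, NULL, while is_distinct is false, true, true, false. With <>, a change from a value to NULL (or back) would therefore never be logged.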
A specific function for inserting into tbl_audits is used:
CREATE OR REPLACE FUNCTION private.fnc_audit(_audi_timestamp timestamp without time zone, _audi_user name, _audi_table name, _audi_field name, _audi_pk_name character varying[], _audi_pk_value character varying[], _audi_mod_type private.table_audit_mod_type, _audi_value_old text, _audi_value_new text)
RETURNS void AS
$BODY$
DECLARE
BEGIN
INSERT INTO private.tbl_audits(audi_timestamp, audi_user, audi_table, audi_field, audi_pk_name, audi_pk_value, audi_mod_type, audi_value_old, audi_value_new)
VALUES (_audi_timestamp, _audi_user, _audi_table, _audi_field, _audi_pk_name, _audi_pk_value, _audi_mod_type, _audi_value_old, _audi_value_new);
END;
$BODY$
LANGUAGE 'plpgsql' VOLATILE;
TG_OP is stored as an ENUM (more compact than repeating the operation name as text in every row), so we need a new type:
CREATE TYPE private.table_audit_mod_type AS ENUM ('INSERT', 'UPDATE', 'DELETE', 'TRUNCATE');
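A quick way to get a feel for the ENUM is the sketch below. It uses a hypothetical stand-in type, demo_mod_type, created in the session-local pg_temp schema so that it does not touch the real private.table_audit_mod_type. It shows the explicit cast from text that the trigger function performs on TG_OP, and the size of a single value (the PostgreSQL documentation states that an enum value occupies four bytes on disk).

```sql
-- Hypothetical stand-in for private.table_audit_mod_type.
CREATE TYPE pg_temp.demo_mod_type AS ENUM ('INSERT', 'UPDATE', 'DELETE', 'TRUNCATE');

SELECT 'UPDATE'::text::pg_temp.demo_mod_type             AS cast_from_text,
       pg_column_size('UPDATE'::pg_temp.demo_mod_type)   AS enum_bytes;
```

A value outside the list, such as 'MERGE', would be rejected by the cast with an error.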
Here is a very important helper function:
CREATE OR REPLACE FUNCTION assert(boolean, character varying)
RETURNS void AS
$BODY$
BEGIN
IF NOT $1 OR $1 IS NULL THEN
IF $2 IS NOT NULL THEN
RAISE EXCEPTION 'Assert failure: %', $2;
END IF;
RAISE NOTICE 'Assert. Message is null';
END IF;
END;
$BODY$
LANGUAGE 'plpgsql' VOLATILE;
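As an aside, PostgreSQL 9.5 and later ship a built-in ASSERT statement in plpgsql, which could replace this helper on those versions. The sketch below is not part of the audit code; it only mimics the "table has no PK" check with a made-up empty array. Note that the built-in check can be switched off through the plpgsql.check_asserts setting.

```sql
DO $$
DECLARE
    _pk_names varchar[] := ARRAY[]::varchar[];  -- pretend the table has no PK
BEGIN
    -- array_length() of an empty array is NULL, so the condition is false.
    ASSERT array_length(_pk_names, 1) IS NOT NULL, 'table has no PK';
EXCEPTION
    WHEN assert_failure THEN
        RAISE NOTICE 'caught: %', SQLERRM;
END
$$;
```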
And finally the audit table:
CREATE TABLE private.tbl_audits
(
audi_timestamp timestamp without time zone NOT NULL,
audi_user character varying(30) NOT NULL,
audi_table character varying(45) NOT NULL,
audi_field character varying(30) NOT NULL,
audi_pk_name character varying(30)[] NOT NULL,
audi_pk_value character varying(60)[] NOT NULL,
audi_mod_type private.table_audit_mod_type NOT NULL,
audi_value_old text,
audi_value_new text,
CONSTRAINT tbl_audits_check_pk CHECK (array_length(audi_pk_name, 1) = array_length(audi_pk_value, 1))
);
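With all the pieces in place, hooking a table up might look like the sketch below. It assumes that every object from this post has already been created and that assert() is reachable through the search_path. The table tbl_customers, its columns and the trigger name trg_audit are hypothetical.

```sql
-- Hypothetical table to be audited; it must have a primary key.
CREATE TABLE private.tbl_customers (
    cust_id   integer PRIMARY KEY,
    cust_name text
);

-- The function insists on being run AFTER the event and FOR EACH ROW.
CREATE TRIGGER trg_audit
    AFTER INSERT OR UPDATE OR DELETE ON private.tbl_customers
    FOR EACH ROW EXECUTE PROCEDURE private.tfc_audit();

INSERT INTO private.tbl_customers VALUES (1, 'Ada');
UPDATE private.tbl_customers SET cust_name = 'Ada L.' WHERE cust_id = 1;

-- History of one row: one audit record per changed field.
SELECT audi_timestamp, audi_mod_type, audi_field, audi_value_old, audi_value_new
FROM private.tbl_audits
WHERE audi_table = 'tbl_customers'
  AND audi_pk_value = ARRAY['1']::varchar[]
ORDER BY audi_timestamp;
```

Since PK values are stored as an array of strings, the lookup compares against an array even for a single-column key.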
A more storage-efficient way to store the logged data would be an audi_changeset column holding an array of the changed field names, old values and new values for each PK within one statement execution. It would be equivalent to the following view:
CREATE OR REPLACE VIEW vew_audit_changesets AS
SELECT tbl_audits.audi_timestamp, tbl_audits.audi_user, tbl_audits.audi_table, tbl_audits.audi_pk_name, tbl_audits.audi_pk_value, tbl_audits.audi_mod_type, ARRAY[array_agg(tbl_audits.audi_field), array_agg(tbl_audits.audi_value_old)::character varying[], array_agg(tbl_audits.audi_value_new)::character varying[]] AS audi_changeset
FROM tbl_audits
GROUP BY tbl_audits.audi_timestamp, tbl_audits.audi_user, tbl_audits.audi_table, tbl_audits.audi_pk_name, tbl_audits.audi_pk_value, tbl_audits.audi_mod_type;
fnc_audit() is easily adjusted to store logs in such a table, but I wonder whether it is worth the fuss. Most queries on vew_audit_changesets would first have to unnest the audi_changeset array back into a set of rows (i.e. a structure that resembles tbl_audits), because SQL is set-oriented, not array-oriented. In other words, converting from the tbl_audits structure to the vew_audit_changesets structure is easy, but how does one go the other way, and is that conversion efficient? Is it worth saving logs in a vew_audit_changesets-like table?
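One possible way to go back from the changeset structure to one row per field is generate_subscripts() over the second array dimension. The sketch below runs on a made-up changeset literal instead of the real view, and relies on a function in FROM being allowed to reference an earlier FROM item (PostgreSQL 9.3 or later). It says nothing about how efficient this is on a large log.

```sql
WITH changesets(audi_pk_value, audi_changeset) AS (
    VALUES (ARRAY['42']::varchar[],
            ARRAY[ARRAY['name', 'email'],                  -- changed fields
                  ARRAY['Ann',  'a@old.example'],          -- old values
                  ARRAY['Anna', 'a@new.example']]::varchar[])  -- new values
)
SELECT c.audi_pk_value,
       c.audi_changeset[1][i] AS audi_field,
       c.audi_changeset[2][i] AS audi_value_old,
       c.audi_changeset[3][i] AS audi_value_new
FROM changesets c,
     generate_subscripts(c.audi_changeset, 2) AS i;
```

A plain unnest() would not do here, since it flattens a two-dimensional array into a single column of elements.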