2 -- Glyn Astill 28/08/2015
4 -- Attempt at a PL/pgSQL drop-in replacement for the table_log C extension AKA
5 -- pg Table Audit / PostgreSQL Table Log / tablelog by Andreas Scherbaum
6 -- http://www.postgresql.org/ftp/projects/pgFoundry/tablelog/tablelog/
7 -- http://github.com/andreasscherbaum/table_log
9 -- A slightly more up to date version of the original C extension can
10 -- also be found here:
11 -- https://github.com/glynastill/pg_table_audit
13 -- There are now many better ways to audit DML, using json types or
14 -- advanced extensions like pgaudit (below), however if for some reason
15 -- you're stuck with table_log this may help.
17 -- http://8kb.co.uk/blog/2015/01/19/copying-pavel-stehules-simple-history-table-but-with-the-jsonb-type/
18 -- https://github.com/2ndQuadrant/pgaudit
20 -- complain if script is sourced in psql, rather than via CREATE EXTENSION
21 \echo Use "CREATE EXTENSION table_log_pl" to load this file. \quit
-- table_log_pl(): trigger function that writes a copy of each affected row
-- into a log table, tagging it with the operation (TG_OP), a timestamp and
-- whether the copy is the 'old' or the 'new' tuple.
--
-- Trigger arguments (all optional):
--   TG_ARGV[0]  log table name (fallback: TG_TABLE_NAME || '_log')
--   TG_ARGV[1]  tested for the integer value 1
--               NOTE(review): presumably switches on v_loguser, i.e. logging
--               of trigger_user; the assignment is not visible here - confirm
--   TG_ARGV[2]  schema of the log table (fallback: TG_TABLE_SCHEMA)
--
-- The visible checks raise an exception when the trigger is not fired AFTER
-- the event, when it is fired at STATEMENT level, and when the column counts
-- of the source table and the log table are inconsistent.
25 CREATE OR REPLACE FUNCTION table_log_pl() RETURNS TRIGGER AS
29 v_loguser boolean := false;
-- Accumulators for the extra "trigger_*" column names and their values that
-- are appended to the generated INSERT statements.
33 v_col_trig text := '';
34 v_val_trig text := '';
-- Guards every read and write of the table_log.column_cache setting below.
39 v_enable_cache boolean := true;
42 -- - The trigger_id comes off sequence, this function is oblivious
43 -- - 3 columns means don't log trigger_user or trigger_id
44 -- - 4 columns means don't log trigger_user
45 -- - 5 columns means log both
46 -- - To use the column data caching on server versions prior to
47 -- 9.6 add custom var "table_log.column_cache = ''" to postgresql.conf
-- Resolve the log table's schema: third trigger argument if supplied,
-- with the firing table's schema as the fallback assignment.
49 IF (TG_NARGS > 2) THEN
50 v_nspname := TG_ARGV[2];
52 v_nspname := TG_TABLE_SCHEMA;
55 IF (TG_NARGS > 1 AND TG_ARGV[1]::int = 1) THEN
-- Resolve the log table's name: first trigger argument if supplied, with
-- "<table>_log" as the fallback assignment.
59 IF (TG_NARGS > 0) THEN
60 v_tabname := TG_ARGV[0];
62 v_tabname := TG_TABLE_NAME || '_log';
65 -- Retrieve custom variable used as a poor man's cache for multirow statements
66 IF (v_enable_cache) THEN
-- From server_version_num 90600 on, the two-argument form of current_setting()
-- is used; on older servers the one-argument form is called instead.
67 IF (current_setting('server_version_num')::int >= 90600) THEN
68 v_col_cache := current_setting('table_log.column_cache', true);
70 v_col_cache := current_setting('table_log.column_cache');
-- A cache entry has the form "TG_RELID:column-list". If the prefix matches
-- this relation, the column list is recovered by dropping that prefix
-- (right() with a negative count returns all but the leading characters).
74 IF (v_enable_cache AND left(v_col_cache, length(TG_RELID::text)+1) = (TG_RELID::text || ':')) THEN
75 v_cols := right(v_col_cache, (length(TG_RELID::text)+1)*-1);
77 IF (TG_WHEN != 'AFTER') THEN
78 RAISE EXCEPTION 'table_log: must be fired after event';
80 IF (TG_LEVEL = 'STATEMENT') THEN
81 RAISE EXCEPTION 'table_log: can''t process STATEMENT events';
-- Column count and quoted, comma-separated column list of the firing table,
-- read from the system catalog.
84 SELECT count(*), string_agg(quote_ident(attname),',') INTO STRICT v_num_col, v_cols
85 FROM pg_catalog.pg_attribute
86 WHERE attrelid = TG_RELID
90 IF (v_num_col < 1) THEN
91 RAISE EXCEPTION 'table_log: number of columns in table is < 1, can this happen?';
-- Column count of the log table; the regclass cast resolves the name built
-- from v_nspname and v_tabname.
94 SELECT count(*) INTO STRICT v_num_col_log
95 FROM pg_catalog.pg_attribute
96 WHERE attrelid = (v_nspname || '.' || v_tabname)::regclass
100 IF (v_num_col_log < 1) THEN
101 RAISE EXCEPTION 'could not get number columns in relation %.%', v_nspname, v_tabname;
104 -- This is the way the original checks column count regardless of whether trigger_id is present
-- The log table must have 3 or 4 more columns than the source table, plus
-- one more when v_loguser is true.
105 IF (v_num_col_log != (v_num_col + 3 + v_loguser::int)) AND (v_num_col_log != (v_num_col + 4 + v_loguser::int)) THEN
106 RAISE EXCEPTION 'number colums in relation %.%(%) does not match columns in %.%(%)', TG_TABLE_SCHEMA, TG_TABLE_NAME, v_num_col, v_nspname, v_tabname, v_num_col_log;
109 -- Set custom variable for use as a poor man's cache for multirow statements
110 IF (v_enable_cache) THEN
111 v_col_cache := (TG_RELID::text || ':' || v_cols);
-- Third argument true: the setting is applied locally (see set_config docs).
112 PERFORM set_config('table_log.column_cache', v_col_cache, true);
-- trigger_user is filled with session_user.
117 v_col_trig := v_col_trig || ', "trigger_user"';
118 v_val_trig := format('%L, ', session_user);
-- trigger_mode receives TG_OP and trigger_changed receives current_timestamp;
-- the trigger_tuple value ('old' / 'new') is supplied by the statements below.
120 v_col_trig := v_col_trig || ', "trigger_mode", "trigger_changed", "trigger_tuple"';
121 v_val_trig := format('%s%L, %L', v_val_trig, TG_OP, current_timestamp);
-- Every operation except INSERT logs the OLD row, tagged 'old'.
-- NOTE(review): TG_RELID::regclass is formatted with %I here and below. The
-- text form of regclass may already be schema-qualified or quoted, in which
-- case %I would quote it again as one identifier - confirm behaviour for
-- tables outside the search_path.
123 IF (TG_OP != 'INSERT') THEN
124 v_sql := format('INSERT INTO %I.%I (%s%s) SELECT %s, %s, ''old'' FROM (SELECT ($1::text::%I).*) t', v_nspname, v_tabname, v_cols, v_col_trig, v_cols, v_val_trig, TG_RELID::regclass);
125 EXECUTE v_sql USING OLD;
-- Every operation except DELETE logs the NEW row, tagged 'new'.
127 IF (TG_OP != 'DELETE') THEN
128 v_sql := format('INSERT INTO %I.%I (%s%s) SELECT %s, %s, ''new'' FROM (SELECT ($1::text::%I).*) t', v_nspname, v_tabname, v_cols, v_col_trig, v_cols, v_val_trig, TG_RELID::regclass);
129 EXECUTE v_sql USING NEW;
137 LANGUAGE plpgsql VOLATILE;
-- table_log_pl_restore_table(): creates a restore table as a copy of the
-- original table and replays rows from the log table into it, up to (or back
-- to) the given timestamp.
--
-- Parameters:
--   origtab, origtab_pk    original table name and the name of its key column
--   logtab, logtab_pk      log table name and the name of its key column
--   restoretab             name of the table to create; must not exist yet
--   to_timestamp           point in time to restore to
--   search_pk              optional: restrict the restore to this key value
--   method                 0 (default) restore from a blank table,
--                          1 restore backwards from the current table
--   not_temporarly         0 (default) create the restore table TEMPORARY
--   origtab_schema,
--   logtab_schema          optional schemas for the original / log table
--
-- Returns quote_ident(restoretab).
141 CREATE OR REPLACE FUNCTION table_log_pl_restore_table (origtab varchar, origtab_pk varchar, logtab char, logtab_pk char, restoretab char, to_timestamp timestamptz, search_pk char DEFAULT NULL, method int DEFAULT 0, not_temporarly int DEFAULT 0, origtab_schema varchar DEFAULT NULL, logtab_schema varchar DEFAULT NULL) RETURNS varchar AS
146 v_restoretab_cols int;
158 -- The original implementation doesn't allow fully qualified table
159 -- references in table_log_restore_table; You can get some mileage
160 -- out of search_path if required there. For this reason the plpgsql
161 -- version adds the following two optional parameters to those below:
163 -- - original table schema
164 -- - logging table schema
166 -- Comments from C implementation:
168 -- restore a complete table based on the logging table
171 -- - original table name
172 -- - name of primary key in original table
174 -- - name of primary key in logging table
175 -- - restore table name
176 -- - timestamp for restoring data
177 -- - primary key to restore (only this key will be restored) (optional)
179 -- 0: restore from blank table (default)
180 -- needs a complete logging table
181 -- 1: restore from actual table backwards
182 -- - don't create table temporarily
183 -- 0: create restore table temporarily (default)
184 -- 1: create restore table not temporarily
-- Argument checks.
-- NOTE(review): these use RAISE NOTICE rather than RAISE EXCEPTION, so as far
-- as is visible here execution carries on with the NULL argument - confirm
-- that this is intended.
188 IF origtab IS NULL THEN
189 RAISE NOTICE 'table_log_restore_table: missing original table name';
191 IF origtab_pk IS NULL THEN
192 RAISE NOTICE 'table_log_restore_table: missing primary key name for original table';
194 IF logtab IS NULL THEN
195 RAISE NOTICE 'table_log_restore_table: missing log table name';
197 IF logtab_pk IS NULL THEN
198 RAISE NOTICE 'table_log_restore_table: missing primary key name for log table';
200 IF restoretab IS NULL THEN
201 RAISE NOTICE 'table_log_restore_table: missing copy table name';
203 IF to_timestamp IS NULL THEN
204 RAISE NOTICE 'table_log_restore_table: missing timestamp';
206 IF (search_pk IS NOT NULL) THEN
207 RAISE NOTICE 'table_log_restore_table: will restore a single key';
-- The two key column names must differ.
210 IF origtab_pk = logtab_pk THEN
211 RAISE EXCEPTION 'pkey of logging table cannot be the pkey of the original table: % <-> %', origtab_pk, logtab_pk;
-- Quoted table names; the "schema." prefix collapses to '' when the schema
-- argument is NULL (the concatenation is then NULL and coalesce takes over).
214 v_origtab_fqn := coalesce(quote_ident(origtab_schema) || '.','') || quote_ident(origtab);
215 v_logtab_fqn := coalesce(quote_ident(logtab_schema) || '.','') || quote_ident(logtab);
217 -- Check original table and get column list
-- Yields the quoted column list, the column count and the number of columns
-- named origtab_pk.
-- NOTE(review): when origtab_schema is NULL the match is on relname only, so
-- same-named tables in several schemas would all be counted - confirm.
218 SELECT string_agg(quote_ident(attname), ','), count(*), count(*) filter (where attname=origtab_pk)
219 INTO v_cols, v_origtab_cols, v_pk_count
220 FROM pg_catalog.pg_class c
221 JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
222 JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
223 WHERE c.relname = origtab AND c.relkind='r' AND a.attnum > 0
224 AND (origtab_schema IS NULL OR n.nspname = origtab_schema)
225 AND NOT attisdropped;
227 IF v_origtab_cols = 0 OR v_cols IS NULL THEN
228 RAISE EXCEPTION 'could not check relation: % (columns = %)', v_origtab_fqn, v_origtab_cols;
229 ELSIF v_pk_count != 1 THEN
230 RAISE EXCEPTION 'could not check relation: (missing pkey) % in table %', origtab_pk, v_origtab_fqn;
232 RAISE NOTICE 'original table: OK (% columns)', v_origtab_cols;
-- Same check for the log table: column count and presence of exactly one
-- column named logtab_pk.
236 SELECT count(*), count(*) filter (where attname=logtab_pk)
237 INTO v_logtab_cols, v_pk_count
238 FROM pg_catalog.pg_class c
239 JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
240 JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
241 WHERE c.relname = logtab AND c.relkind='r' AND a.attnum > 0
242 AND (logtab_schema IS NULL OR n.nspname = logtab_schema)
243 AND NOT attisdropped;
245 IF v_logtab_cols = 0 THEN
246 RAISE EXCEPTION 'could not check relation: % (columns = %)', v_logtab_fqn, v_logtab_cols;
247 ELSIF v_pk_count != 1 THEN
248 RAISE EXCEPTION 'could not check relation: (missing pkey) % in table %', logtab_pk, v_logtab_fqn;
250 RAISE NOTICE 'log table: OK (% columns)', v_logtab_cols;
253 -- Check restore table
-- The lookup is by relname only (any schema) and only ordinary tables
-- (relkind 'r') are considered.
254 IF EXISTS(SELECT 1 FROM pg_catalog.pg_class
255 WHERE relname=restoretab AND relkind='r') THEN
256 RAISE EXCEPTION 'restore table already exists: %', restoretab;
258 RAISE NOTICE 'restore table: OK (doesnt exists)';
261 -- create restore table
-- The DDL is assembled piecewise in v_sql: optional TEMPORARY, then
-- "TABLE ... AS SELECT * FROM" the original table, optionally filtered to
-- search_pk.
263 IF not_temporarly = 0 THEN
264 v_sql := v_sql || ' TEMPORARY';
266 v_sql := v_sql || format(' TABLE %I AS SELECT * FROM %s', restoretab, v_origtab_fqn);
267 IF search_pk IS NOT NULL THEN
268 v_sql := v_sql || format(' WHERE %I = %L', origtab_pk, search_pk);
271 RAISE NOTICE 'need logs from start to timestamp: %', to_timestamp;
272 v_sql := v_sql || ' LIMIT 0'; -- Create blank table to roll forward into (need all logs)
274 RAISE NOTICE 'need logs from end to timestamp: %', to_timestamp;
277 -- RAISE NOTICE 'DDL: %', v_sql;
280 -- now build query for getting logs
281 v_sql := format('SELECT * FROM %s WHERE ', v_logtab_fqn);
283 v_sql := v_sql || format('trigger_changed <= %L', to_timestamp); -- ROLL FORWARD
285 v_sql := v_sql || format('trigger_changed >= %L', to_timestamp); -- ROLL BACK
288 IF search_pk IS NOT NULL THEN
289 v_sql := v_sql || format(' AND %I = %L', origtab_pk, search_pk);
-- Log rows are ordered by the log table's key: ascending in one branch,
-- descending in the other (the branch condition is not visible here).
293 v_sql := v_sql || format(' ORDER BY %I ASC', logtab_pk);
295 v_sql := v_sql || format(' ORDER BY %I DESC', logtab_pk);
298 -- RAISE NOTICE 'SQL: %', v_sql;
-- Replay each selected log row against the restore table.
300 FOR v_rec IN EXECUTE v_sql
302 IF v_rec.trigger_mode = 'UPDATE' AND ((method = 0 AND v_rec.trigger_tuple = 'old') OR (method = 1 AND v_rec.trigger_tuple = 'new')) THEN
303 -- For previous update row versions we needn't apply anything;
304 -- we just note the pk value for the quals when applying the
305 -- next row change, i.e when rolling forward the old pk value,
306 -- when rolling back the new pk value
307 EXECUTE format('SELECT ($1::text::%s).%I', v_logtab_fqn, origtab_pk) INTO v_old_pk_str USING v_rec;
309 -- Apply the row changes from the log table, the following is
310 -- a mass of substitutions, but essentially we're selecting
311 -- data out of the log table record and casting it into the
-- UPDATE: overwrite the row whose key equals the remembered v_old_pk_str.
314 IF v_rec.trigger_mode = 'UPDATE' THEN
315 v_sql := format('UPDATE %I SET (%s) = (SELECT %s FROM (SELECT ($1::text::%s).*) t) WHERE %I = %L',
316 restoretab, v_cols, v_cols, v_logtab_fqn, origtab_pk, v_old_pk_str);
-- A logged INSERT is re-inserted when method = 0; a logged DELETE is
-- re-inserted when method != 0.
317 ELSIF (v_rec.trigger_mode = 'INSERT' AND method = 0) OR (v_rec.trigger_mode = 'DELETE' AND method != 0) THEN
318 v_sql := format('INSERT INTO %I (%s) SELECT %s FROM (SELECT ($1::text::%s).*) t',
319 restoretab, v_cols, v_cols, v_logtab_fqn);
-- The opposite pairing removes the row from the restore table.
320 ELSIF (v_rec.trigger_mode = 'INSERT' AND method != 0) OR (v_rec.trigger_mode = 'DELETE' AND method = 0) THEN
321 v_sql := format('DELETE FROM %I WHERE %I = ($1::text::%s).%I',
322 restoretab, origtab_pk, v_logtab_fqn, origtab_pk);
-- NOTE(review): everywhere else the mode is read as v_rec.trigger_mode; the
-- bare trigger_mode below is not a variable visible in this view - confirm
-- it should not be v_rec.trigger_mode.
324 RAISE EXCEPTION 'unknown trigger_mode: %', trigger_mode;
327 -- RAISE NOTICE 'DML: %', v_sql;
328 EXECUTE v_sql USING v_rec;
333 RETURN quote_ident(restoretab);
336 LANGUAGE plpgsql VOLATILE;
-- table_log_pl_init(level, orig_schema, orig_name, log_schema, log_name):
-- creates the log table log_schema.log_name and installs the row-level
-- trigger "table_log_trigger_pl" (AFTER UPDATE OR INSERT OR DELETE) on
-- orig_schema.orig_name, calling table_log_pl().
-- Per the error message below, level has to be 3, 4 or 5.
340 CREATE OR REPLACE FUNCTION table_log_pl_init(level int, orig_schema text, orig_name text, log_schema text, log_name text)
-- Collects the optional extra column definitions for the log table.
345 level_create text = E'';
349 -- Quoted qualified names
350 orig_qq := quote_ident(orig_schema)||'.'||quote_ident(orig_name);
351 log_qq := quote_ident(log_schema)||'.'||quote_ident(log_name);
-- Optional trigger_id and trigger_user column definitions.
-- NOTE(review): the conditions selecting them are not visible in this view;
-- presumably they depend on level - confirm.
354 level_create := level_create
355 ||', trigger_id BIGSERIAL NOT NULL PRIMARY KEY';
357 level_create := level_create
358 ||', trigger_user VARCHAR(32) NOT NULL';
362 'table_log_pl_init: First arg has to be 3, 4 or 5.';
-- Create the log table, including the trigger_mode, trigger_tuple and
-- trigger_changed bookkeeping columns.
367 EXECUTE 'CREATE TABLE '||log_qq
369 ||', trigger_mode VARCHAR(10) NOT NULL'
370 ||', trigger_tuple VARCHAR(5) NOT NULL'
371 ||', trigger_changed TIMESTAMPTZ NOT NULL'
-- Install the trigger; log_name is passed as the first trigger argument and
-- log_schema as the last one.
375 EXECUTE 'CREATE TRIGGER "table_log_trigger_pl" AFTER UPDATE OR INSERT OR DELETE ON '
376 ||orig_qq||' FOR EACH ROW EXECUTE PROCEDURE table_log_pl('
377 ||quote_literal(log_name)||','
379 ||quote_literal(log_schema)||')';
-- table_log_pl_init(level, orig_name): convenience overload that forwards to
-- the (level, orig_name, log_schema) form, passing current_schema() as the
-- log schema.
386 CREATE OR REPLACE FUNCTION table_log_pl_init(level int, orig_name text)
391 PERFORM table_log_pl_init(level, orig_name, current_schema());
-- table_log_pl_init(level, orig_name, log_schema): convenience overload that
-- forwards to the four-argument form, passing current_schema() as the schema
-- of the original table.
398 CREATE OR REPLACE FUNCTION table_log_pl_init(level int, orig_name text, log_schema text)
402 PERFORM table_log_pl_init(level, current_schema(), orig_name, log_schema);
-- table_log_pl_init(level, orig_schema, orig_name, log_schema): convenience
-- overload that forwards to the five-argument form and derives the log table
-- name itself.
409 CREATE OR REPLACE FUNCTION table_log_pl_init(level int, orig_schema text, orig_name text, log_schema text)
-- Log table name: orig_name || '_log' when both schemas are equal, otherwise
-- plain orig_name.
413 PERFORM table_log_pl_init(level, orig_schema, orig_name, log_schema,
414 CASE WHEN orig_schema=log_schema
415 THEN orig_name||'_log' ELSE orig_name END);