]> git.8kb.co.uk Git - postgresql/table_log_pl/blob - sql/table_log_pl--0.2.sql
Test using prepare / execute for dynamic statements to see if there is a performance...
[postgresql/table_log_pl] / sql / table_log_pl--0.2.sql
1 -- 
2 -- Glyn Astill 28/08/2015
3 --
4 -- Attempt at pl/pgsql drop-in replacement for table_log C extenstion AKA
5 -- pg Table Audit / PostgreSQL Table Log / tablelog by Andreas Scherbaum
6 --     http://www.postgresql.org/ftp/projects/pgFoundry/tablelog/tablelog/
7 --     http://github.com/andreasscherbaum/table_log
8 --
9 -- A slightly more up to date version of the original C extension can 
10 -- also be found here:
11 --     https://github.com/glynastill/pg_table_audit
12 --
13 -- There are now many better ways to audit DML, using json types or 
14 -- advanced extensions like pgaudit (below), however if for some reason 
15 -- you're stuck with table_log this may help.
16 --
17 --     http://8kb.co.uk/blog/2015/01/19/copying-pavel-stehules-simple-history-table-but-with-the-jsonb-type/
18 --     https://github.com/2ndQuadrant/pgaudit
19
20 -- complain if script is sourced in psql, rather than via CREATE EXTENSION
21 \echo Use "CREATE EXTENSION table_log_pl" to load this file. \quit
22
23 --
24
25 CREATE OR REPLACE FUNCTION table_log_pl() RETURNS TRIGGER AS
26 $BODY$
27 DECLARE
28     v_tabname text;
29     v_loguser boolean := false;
30     v_nspname text;
31     v_num_col int;
32     v_num_col_log int;
33     v_col_trig text := '';
34     v_val_trig text := '';
35     v_cols text := '';
36     v_sql text;
37     v_col_cache text;
38     v_enable_cache boolean := true;
39     v_enable_prepare boolean := false;
40     v_tmp text;
41     v_i int;
42
43     v_cols_typ text;
44     v_cols_nam text[];
45     v_vals_old text = '';
46     v_vals_new text = '';
47 BEGIN
48     -- Notes:
49     --     - The trigger_id comes off sequence, this function is oblivious
50     --     - 3 columns means don't log trigger_user or trigger_id
51     --     - 4 columns means don't log trigger_user
52     --     - 5 columns means log both 
53     --     - To use the column data caching on server versions prior to 
54     --       9.6 add custom var "table_log.column_cache = ''" to postgresql.conf
55
56     IF (TG_NARGS > 2) THEN
57         v_nspname := TG_ARGV[2];
58     ELSE
59         v_nspname := TG_TABLE_SCHEMA;
60     END IF;
61         
62     IF (TG_NARGS > 1 AND TG_ARGV[1]::int = 1) THEN
63         v_loguser := true;
64     END IF;
65     
66     IF (TG_NARGS > 0) THEN
67         v_tabname := TG_ARGV[0];
68     ELSE
69         v_tabname := TG_TABLE_NAME || '_log';
70     END IF;
71
72     -- Retrieve custom variable used as a poor mans cache for multirow statements
73     IF (v_enable_cache) THEN
74         IF (current_setting('server_version_num')::int >= 90600) THEN
75             v_col_cache := current_setting('table_log.column_cache', true);
76         ELSE
77             v_col_cache := current_setting('table_log.column_cache');
78         END IF;
79     END IF;
80     
81     -- If column caching is enabled and previous call in this transaction 
82     -- was for the same relation we can retrieve column detail.
83     IF (v_enable_cache AND left(v_col_cache, length(TG_RELID::text)+1) = (TG_RELID::text || ':')) THEN
84         v_cols := right(v_col_cache, (length(TG_RELID::text)+1)*-1);
85         v_cols_nam := ('{' || right(v_col_cache, (length(TG_RELID::text)+1)*-1) || '}')::text[];
86     ELSE -- Otherwise fetch the column detail
87         IF (TG_WHEN != 'AFTER') THEN
88             RAISE EXCEPTION 'table_log: must be fired after event';
89         END IF;
90         IF (TG_LEVEL = 'STATEMENT') THEN
91             RAISE EXCEPTION 'table_log: can''t process STATEMENT events';
92         END IF;
93     
94         SELECT count(*), string_agg(quote_ident(attname),','), string_agg(format_type(atttypid, atttypmod),','), array_agg(quote_ident(attname))
95         INTO STRICT v_num_col, v_cols, v_cols_typ, v_cols_nam
96         FROM pg_catalog.pg_attribute
97         WHERE attrelid = TG_RELID
98         AND attnum > 0
99         AND NOT attisdropped;
100         
101         IF (v_num_col < 1) THEN
102             RAISE EXCEPTION 'table_log: number of columns in table is < 1, can this happen?';
103         END IF;
104             
105         SELECT count(*) INTO STRICT v_num_col_log
106         FROM pg_catalog.pg_attribute
107         WHERE attrelid = (v_nspname || '.' || v_tabname)::regclass
108         AND attnum > 0
109         AND NOT attisdropped;
110         
111         IF (v_num_col_log < 1) THEN
112             RAISE EXCEPTION 'could not get number columns in relation %.%', v_nspname, v_tabname;
113         END IF;
114
115         -- This is the way the original checks column count regardless of trigger_id is presence
116         IF (v_num_col_log != (v_num_col + 3 + v_loguser::int)) AND (v_num_col_log != (v_num_col + 4 + v_loguser::int)) THEN
117             RAISE EXCEPTION 'number colums in relation %.%(%) does not match columns in %.%(%)', TG_TABLE_SCHEMA, TG_TABLE_NAME, v_num_col, v_nspname, v_tabname, v_num_col_log;
118         END IF;
119         
120         -- Set custom variable for use as a poor mans cache for multirow statements
121         IF (v_enable_cache) THEN
122             v_col_cache := (TG_RELID::text || ':' || v_cols);
123             PERFORM set_config('table_log.column_cache', v_col_cache, true);
124         END IF;
125         
126         -- Create a prepared statement for the current table, deallocating
127         -- any old statements we may have prepared.
128         IF (v_enable_prepare) THEN
129             FOR v_tmp IN (SELECT name FROM pg_catalog.pg_prepared_statements WHERE name ~ '^table_log_pl_') LOOP
130                 EXECUTE format('DEALLOCATE %I', v_tmp);
131             END LOOP;
132             
133             SELECT '$' || string_agg(a::text, ', $') INTO v_col_trig FROM generate_series(1,v_num_col+3+v_loguser::int) a;
134             
135             IF (v_loguser) THEN
136                 v_sql := format('PREPARE table_log_pl_%s(%s, text, text, timestamptz, text) AS INSERT INTO %I.%I (%s, "trigger_user", "trigger_mode", "trigger_changed", "trigger_tuple") VALUES (%s)', TG_RELID, v_cols_typ, v_nspname, v_tabname, v_cols, v_col_trig);
137             ELSE
138                 v_sql := format('PREPARE table_log_pl_%s(%s, text, timestamptz, text) AS INSERT INTO %I.%I (%s, "trigger_mode", "trigger_changed", "trigger_tuple") VALUES (%s)', TG_RELID, v_cols_typ, v_nspname, v_tabname, v_cols, v_col_trig);
139             END IF;
140             EXECUTE v_sql;
141         END IF;        
142     END IF;
143     
144     -- If prepared statement method is enabled, construct strings for
145     -- variable parameters and execute.
146     IF (v_enable_prepare) THEN 
147         FOR v_i IN 1..array_upper(v_cols_nam, 1) LOOP
148             IF (TG_OP != 'INSERT') THEN
149                 EXECUTE 'SELECT ($1).' || v_cols_nam[v_i] || '::text' INTO v_tmp USING OLD;
150                 v_vals_old :=  v_vals_old || quote_nullable(v_tmp) || ',';
151             END IF;
152             IF (TG_OP != 'DELETE') THEN
153                 EXECUTE 'SELECT ($1).' || v_cols_nam[v_i] || '::text' INTO v_tmp USING NEW;
154                 v_vals_new :=  v_vals_new || quote_nullable(v_tmp) || ',';
155             END IF;
156         END LOOP;
157         
158         IF (v_loguser) THEN
159             v_vals_new :=  v_vals_new || quote_literal(session_user) || ',';
160             v_vals_old :=  v_vals_old || quote_literal(session_user) || ',';
161         END IF;
162
163         IF (TG_OP != 'INSERT') THEN
164             v_sql := format('EXECUTE table_log_pl_%s(%s%L, %L, %L)', TG_RELID, v_vals_old, TG_OP, current_timestamp, 'old');
165             EXECUTE v_sql;
166         END IF;
167         IF (TG_OP != 'DELETE') THEN
168             v_sql := format('EXECUTE table_log_pl_%s(%s%L, %L, %L)', TG_RELID, v_vals_new, TG_OP, current_timestamp, 'new');
169             EXECUTE v_sql;
170             RETURN NEW;
171         ELSE
172             RETURN OLD;
173         END IF;
174     ELSE -- Otherwise we can do the inserts dynamically.
175         IF (v_loguser) THEN
176             v_col_trig := v_col_trig || ', "trigger_user"';
177             v_val_trig := format('%L, ', session_user);
178         END IF;
179         v_col_trig := v_col_trig || ', "trigger_mode", "trigger_changed", "trigger_tuple"';
180         v_val_trig := format('%s%L, %L', v_val_trig, TG_OP, current_timestamp);
181     
182         IF (TG_OP != 'INSERT') THEN
183             v_sql := format('INSERT INTO %I.%I (%s%s) SELECT %s, %s, ''old'' FROM (SELECT ($1::text::%I).*) t', v_nspname, v_tabname, v_cols, v_col_trig, v_cols, v_val_trig, TG_RELID::regclass);
184             EXECUTE v_sql USING OLD;
185         END IF;
186         IF (TG_OP != 'DELETE') THEN
187             v_sql := format('INSERT INTO %I.%I (%s%s) SELECT %s, %s, ''new'' FROM (SELECT ($1::text::%I).*) t', v_nspname, v_tabname, v_cols, v_col_trig, v_cols, v_val_trig, TG_RELID::regclass);
188             EXECUTE v_sql USING NEW;
189             RETURN NEW;
190         ELSE 
191             RETURN OLD;
192         END IF;
193     END IF;
194
195 END;
196 $BODY$
197 LANGUAGE plpgsql VOLATILE;
198
199 --
200
201 CREATE OR REPLACE FUNCTION table_log_pl_restore_table (origtab varchar, origtab_pk varchar, logtab char, logtab_pk char, restoretab char, to_timestamp timestamptz, search_pk char DEFAULT NULL, method int DEFAULT 0, not_temporarly int DEFAULT 0, origtab_schema varchar DEFAULT NULL, logtab_schema varchar DEFAULT NULL) RETURNS varchar AS
202 $BODY$
203 DECLARE
204     v_origtab_cols int;
205     v_logtab_cols int;
206     v_restoretab_cols int;
207     v_origtab_fqn text;
208     v_logtab_fqn text;
209     v_sql text;
210     v_cols text;
211     v_pk_count int;
212     v_rec record;
213     v_old_pk_str text;
214 BEGIN
215
216     -- Notes:
217     --
218     -- The original implimentation doesn't allow fully qualified table 
219     -- references in table_log_restore_table;  You can get some milage 
220     -- out of search_path if required there. For this reason the plpgsql
221     -- version adds the following two optional parameters to those below:
222     --
223     --   - original table schema
224     --   - logging table schema
225     --
226     -- Comments from C implimentation:
227     --
228     -- restore a complete table based on the logging table
229     --
230     -- parameter:   
231     --   - original table name
232     --   - name of primary key in original table
233     --   - logging table
234     --   - name of primary key in logging table
235     --   - restore table name
236     --   - timestamp for restoring data
237     --   - primary key to restore (only this key will be restored) (optional)
238     --   - restore mode
239     --     0: restore from blank table (default)
240     --        needs a complete logging table
241     --     1: restore from actual table backwards
242     --   - dont create table temporarly
243     --     0: create restore table temporarly (default)
244     --     1: create restore table not temporarly
245     --   return:
246     --     not yet defined
247
248     IF origtab IS NULL THEN
249         RAISE NOTICE 'table_log_restore_table: missing original table name';
250     END IF;
251     IF origtab_pk IS NULL THEN
252         RAISE NOTICE 'table_log_restore_table: missing primary key name for original table';
253     END IF;
254     IF logtab IS NULL THEN
255         RAISE NOTICE 'table_log_restore_table: missing log table name';
256     END IF;
257     IF logtab_pk IS NULL THEN
258         RAISE NOTICE 'table_log_restore_table: missing primary key name for log table';
259     END IF;
260     IF restoretab IS NULL THEN
261         RAISE NOTICE 'table_log_restore_table: missing copy table name';
262     END IF;
263     IF to_timestamp IS NULL THEN
264         RAISE NOTICE 'table_log_restore_table: missing timestamp';
265     END IF;
266     IF (search_pk IS NOT NULL) THEN
267         RAISE NOTICE 'table_log_restore_table: will restore a single key';
268     END IF;
269     
270     IF origtab_pk = logtab_pk THEN 
271         RAISE EXCEPTION 'pkey of logging table cannot be the pkey of the original table: % <-> %', origtab_pk, logtab_pk;
272     END IF;
273     
274     v_origtab_fqn := coalesce(quote_ident(origtab_schema) || '.','') || quote_ident(origtab);
275     v_logtab_fqn := coalesce(quote_ident(logtab_schema) || '.','') || quote_ident(logtab);
276     
277     -- Check original table and get column list
278     SELECT string_agg(quote_ident(attname), ','), count(*), count(*) filter (where attname=origtab_pk)
279     INTO v_cols, v_origtab_cols, v_pk_count
280     FROM pg_catalog.pg_class c 
281     JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
282     JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
283     WHERE c.relname = origtab AND c.relkind='r' AND a.attnum > 0 
284     AND (origtab_schema IS NULL OR n.nspname = origtab_schema)
285     AND NOT attisdropped;
286                 
287     IF v_origtab_cols = 0 OR v_cols IS NULL THEN
288         RAISE EXCEPTION 'could not check relation: % (columns = %)', v_origtab_fqn, v_origtab_cols;
289     ELSIF v_pk_count != 1 THEN
290         RAISE EXCEPTION 'could not check relation: (missing pkey) % in table %', origtab_pk, v_origtab_fqn;
291     ELSE
292         RAISE NOTICE 'original table: OK (% columns)', v_origtab_cols;
293     END IF;
294         
295     -- Check log table    
296     SELECT count(*), count(*) filter (where attname=logtab_pk) 
297     INTO v_logtab_cols, v_pk_count
298     FROM pg_catalog.pg_class c 
299     JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
300     JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
301     WHERE c.relname = logtab AND c.relkind='r' AND a.attnum > 0
302     AND (logtab_schema IS NULL OR n.nspname = logtab_schema)
303     AND NOT attisdropped;   
304     
305     IF v_logtab_cols = 0 THEN
306         RAISE EXCEPTION 'could not check relation: % (columns = %)', v_logtab_fqn, v_logtab_cols;
307     ELSIF v_pk_count != 1 THEN
308         RAISE EXCEPTION 'could not check relation: (missing pkey) % in table %', logtab_pk, v_logtab_fqn;
309     ELSE
310         RAISE NOTICE 'log table: OK (% columns)', v_logtab_cols;
311     END IF;
312        
313     -- Check restore table
314     IF EXISTS(SELECT 1 FROM pg_catalog.pg_class
315               WHERE relname=restoretab AND relkind='r') THEN
316         RAISE EXCEPTION 'restore table already exists: %', restoretab;
317     ELSE
318         RAISE NOTICE 'restore table: OK (doesnt exists)';
319     END IF;
320     
321     -- create restore table 
322     v_sql := 'CREATE';
323     IF not_temporarly = 0 THEN
324         v_sql := v_sql || ' TEMPORARY';
325     END IF;    
326     v_sql := v_sql || format(' TABLE %I AS SELECT * FROM %s', restoretab, v_origtab_fqn);    
327     IF search_pk IS NOT NULL THEN
328         v_sql := v_sql || format(' WHERE %I = %L', origtab_pk, search_pk);
329     END IF;     
330     IF method = 0 THEN
331         RAISE NOTICE 'need logs from start to timestamp: %', to_timestamp;
332         v_sql := v_sql || ' LIMIT 0'; -- Create blank table to roll forward into (need all logs)
333     ELSE
334         RAISE NOTICE 'need logs from end to timestamp: %', to_timestamp;
335     END IF;
336     
337     -- RAISE NOTICE 'DDL: %', v_sql;
338     EXECUTE v_sql;
339   
340     -- now build query for getting logs
341     v_sql := format('SELECT * FROM %s WHERE ', v_logtab_fqn);
342     IF method = 0 THEN
343         v_sql := v_sql || format('trigger_changed <= %L', to_timestamp); -- ROLL FORWARD
344     ELSE
345         v_sql := v_sql || format('trigger_changed >= %L', to_timestamp); -- ROLL BACK
346     END IF;
347     
348     IF search_pk IS NOT NULL THEN
349         v_sql := v_sql || format(' AND %I = %L', origtab_pk, search_pk);
350     END IF;
351     
352     IF method = 0 THEN 
353         v_sql := v_sql || format(' ORDER BY %I ASC', logtab_pk);
354     ELSE
355         v_sql := v_sql || format(' ORDER BY %I DESC', logtab_pk);
356     END IF;
357     
358     -- RAISE NOTICE 'SQL: %', v_sql;
359     
360     FOR v_rec IN EXECUTE v_sql 
361     LOOP        
362         IF v_rec.trigger_mode = 'UPDATE' AND ((method = 0 AND v_rec.trigger_tuple = 'old') OR (method = 1 AND v_rec.trigger_tuple = 'new')) THEN
363             -- For previous update row versions we needn't apply anything; 
364             -- we just note the pk value for the quals when applying the 
365             -- next row change, i.e when rolling forward the old pk value, 
366             -- when rolling back the new pk value
367             EXECUTE format('SELECT ($1::text::%s).%I', v_logtab_fqn, origtab_pk) INTO v_old_pk_str USING v_rec;
368         ELSE
369             -- Apply the row changes from the log table, the following is
370             -- a mass of substitutions, but essentially we're selecting 
371             -- data out of the log table record and casting it into the 
372             -- restore table.
373
374             IF v_rec.trigger_mode = 'UPDATE' THEN 
375                 v_sql := format('UPDATE %I SET (%s) = (SELECT %s FROM (SELECT ($1::text::%s).*) t) WHERE %I = %L',
376                                 restoretab, v_cols, v_cols, v_logtab_fqn, origtab_pk, v_old_pk_str);                
377             ELSIF (v_rec.trigger_mode = 'INSERT' AND method = 0) OR (v_rec.trigger_mode = 'DELETE' AND method != 0) THEN            
378                 v_sql := format('INSERT INTO %I (%s) SELECT %s FROM (SELECT ($1::text::%s).*) t', 
379                                 restoretab, v_cols, v_cols, v_logtab_fqn);
380             ELSIF (v_rec.trigger_mode = 'INSERT' AND method != 0) OR (v_rec.trigger_mode = 'DELETE' AND method = 0) THEN
381                 v_sql := format('DELETE FROM %I WHERE %I = ($1::text::%s).%I', 
382                                 restoretab, origtab_pk, v_logtab_fqn, origtab_pk);
383             ELSE 
384                 RAISE EXCEPTION 'unknown trigger_mode: %', trigger_mode;
385             END IF;            
386             
387             -- RAISE NOTICE 'DML: %', v_sql;
388             EXECUTE v_sql USING v_rec;            
389         END IF;
390
391     END LOOP;
392
393     RETURN quote_ident(restoretab);
394 END;
395 $BODY$
396 LANGUAGE plpgsql VOLATILE;
397
398 --
399
400 CREATE OR REPLACE FUNCTION table_log_pl_init(level int, orig_schema text, orig_name text, log_schema text, log_name text) 
401 RETURNS void AS 
402 $BODY$
403 DECLARE
404     do_log_user  int = 0;
405     level_create text = E'''';
406     orig_qq      text;
407     log_qq       text;
408 BEGIN
409     -- Quoted qualified names
410     orig_qq := quote_ident(orig_schema)||'.'||quote_ident(orig_name);
411     log_qq := quote_ident(log_schema)||'.'||quote_ident(log_name);
412
413     IF level <> 3 THEN
414         level_create := level_create
415             ||', trigger_id BIGSERIAL NOT NULL PRIMARY KEY';
416         IF level <> 4 THEN
417             level_create := level_create
418                 ||', trigger_user VARCHAR(32) NOT NULL';
419             do_log_user := 1;
420             IF level <> 5 THEN
421                 RAISE EXCEPTION 
422                     'table_log_pl_init: First arg has to be 3, 4 or 5.';
423             END IF;
424         END IF;
425     END IF;
426     
427     EXECUTE 'CREATE TABLE '||log_qq
428           ||'(LIKE '||orig_qq
429           ||', trigger_mode VARCHAR(10) NOT NULL'
430           ||', trigger_tuple VARCHAR(5) NOT NULL'
431           ||', trigger_changed TIMESTAMPTZ NOT NULL'
432           ||level_create
433           ||')';
434             
435     EXECUTE 'CREATE TRIGGER "table_log_trigger_pl" AFTER UPDATE OR INSERT OR DELETE ON '
436           ||orig_qq||' FOR EACH ROW EXECUTE PROCEDURE table_log_pl('
437           ||quote_literal(log_name)||','
438           ||do_log_user||','
439           ||quote_literal(log_schema)||')';
440
441     RETURN;
442 END;
443 $BODY$
444 LANGUAGE plpgsql;
445
446 CREATE OR REPLACE FUNCTION table_log_pl_init(level int, orig_name text) 
447 RETURNS void AS 
448
449 $BODY$
450 BEGIN
451     PERFORM table_log_pl_init(level, orig_name, current_schema());
452     RETURN;
453 END;
454 $BODY$
455 LANGUAGE plpgsql;
456
457
458 CREATE OR REPLACE FUNCTION table_log_pl_init(level int, orig_name text, log_schema text) 
459 RETURNS void AS 
460 $BODY$
461 BEGIN
462     PERFORM table_log_pl_init(level, current_schema(), orig_name, log_schema);
463     RETURN;
464 END;
465 $BODY$
466 LANGUAGE plpgsql;
467
468
469 CREATE OR REPLACE FUNCTION table_log_pl_init(level int, orig_schema text, orig_name text, log_schema text) 
470 RETURNS void AS 
471 $BODY$
472 BEGIN
473     PERFORM table_log_pl_init(level, orig_schema, orig_name, log_schema,
474         CASE WHEN orig_schema=log_schema 
475             THEN orig_name||'_log' ELSE orig_name END);
476     RETURN;
477 END;
478 $BODY$
479 LANGUAGE plpgsql;