]> git.8kb.co.uk Git - postgresql/table_log_pl/blob - sql/table_log_pl.sql
Test using prepare / execute for dynamic statements to see if there is a performance...
[postgresql/table_log_pl] / sql / table_log_pl.sql
1 -- 
2 -- Glyn Astill 28/08/2015
3 --
4 -- Attempt at pl/pgsql drop-in replacement for table_log C extenstion AKA
5 -- pg Table Audit / PostgreSQL Table Log / tablelog by Andreas Scherbaum
6 --     http://www.postgresql.org/ftp/projects/pgFoundry/tablelog/tablelog/
7 --     http://github.com/andreasscherbaum/table_log
8 --
9 -- A slightly more up to date version of the original C extension can 
10 -- also be found here:
11 --     https://github.com/glynastill/pg_table_audit
12 --
13 -- There are now many better ways to audit DML, using json types or 
14 -- advanced extensions like pgaudit (below), however if for some reason 
15 -- you're stuck with table_log this may help.
16 --
17 --     http://8kb.co.uk/blog/2015/01/19/copying-pavel-stehules-simple-history-table-but-with-the-jsonb-type/
18 --     https://github.com/2ndQuadrant/pgaudit
19
20 SET search_path TO public;
21
22 --
23 DROP FUNCTION IF EXISTS table_log_pl (); -- ignore any error (but do not CASCADE)
24 DROP FUNCTION IF EXISTS table_log_pl_restore_table(varchar, varchar, char, char, char, timestamptz, char, int, int, varchar, varchar);
25
26 --
27
28 CREATE OR REPLACE FUNCTION table_log_pl() RETURNS TRIGGER AS
29 $BODY$
30 DECLARE
31     v_tabname text;
32     v_loguser boolean := false;
33     v_nspname text;
34     v_num_col int;
35     v_num_col_log int;
36     v_col_trig text := '';
37     v_val_trig text := '';
38     v_cols text := '';
39     v_sql text;
40     v_col_cache text;
41     v_enable_cache boolean := true;
42     v_enable_prepare boolean := false;
43     v_tmp text;
44     v_i int;
45
46     v_cols_typ text;
47     v_cols_nam text[];
48     v_vals_old text = '';
49     v_vals_new text = '';
50 BEGIN
51     -- Notes:
52     --     - The trigger_id comes off sequence, this function is oblivious
53     --     - 3 columns means don't log trigger_user or trigger_id
54     --     - 4 columns means don't log trigger_user
55     --     - 5 columns means log both 
56     --     - To use the column data caching on server versions prior to 
57     --       9.6 add custom var "table_log.column_cache = ''" to postgresql.conf
58
59     IF (TG_NARGS > 2) THEN
60         v_nspname := TG_ARGV[2];
61     ELSE
62         v_nspname := TG_TABLE_SCHEMA;
63     END IF;
64         
65     IF (TG_NARGS > 1 AND TG_ARGV[1]::int = 1) THEN
66         v_loguser := true;
67     END IF;
68     
69     IF (TG_NARGS > 0) THEN
70         v_tabname := TG_ARGV[0];
71     ELSE
72         v_tabname := TG_TABLE_NAME || '_log';
73     END IF;
74
75     -- Retrieve custom variable used as a poor mans cache for multirow statements
76     IF (v_enable_cache) THEN
77         IF (current_setting('server_version_num')::int >= 90600) THEN
78             v_col_cache := current_setting('table_log.column_cache', true);
79         ELSE
80             v_col_cache := current_setting('table_log.column_cache');
81         END IF;
82     END IF;
83     
84     -- If column caching is enabled and previous call in this transaction 
85     -- was for the same relation we can retrieve column detail.
86     IF (v_enable_cache AND left(v_col_cache, length(TG_RELID::text)+1) = (TG_RELID::text || ':')) THEN
87         v_cols := right(v_col_cache, (length(TG_RELID::text)+1)*-1);
88         v_cols_nam := ('{' || right(v_col_cache, (length(TG_RELID::text)+1)*-1) || '}')::text[];
89     ELSE -- Otherwise fetch the column detail
90         IF (TG_WHEN != 'AFTER') THEN
91             RAISE EXCEPTION 'table_log: must be fired after event';
92         END IF;
93         IF (TG_LEVEL = 'STATEMENT') THEN
94             RAISE EXCEPTION 'table_log: can''t process STATEMENT events';
95         END IF;
96     
97         SELECT count(*), string_agg(quote_ident(attname),','), string_agg(format_type(atttypid, atttypmod),','), array_agg(quote_ident(attname))
98         INTO STRICT v_num_col, v_cols, v_cols_typ, v_cols_nam
99         FROM pg_catalog.pg_attribute
100         WHERE attrelid = TG_RELID
101         AND attnum > 0
102         AND NOT attisdropped;
103         
104         IF (v_num_col < 1) THEN
105             RAISE EXCEPTION 'table_log: number of columns in table is < 1, can this happen?';
106         END IF;
107             
108         SELECT count(*) INTO STRICT v_num_col_log
109         FROM pg_catalog.pg_attribute
110         WHERE attrelid = (v_nspname || '.' || v_tabname)::regclass
111         AND attnum > 0
112         AND NOT attisdropped;
113         
114         IF (v_num_col_log < 1) THEN
115             RAISE EXCEPTION 'could not get number columns in relation %.%', v_nspname, v_tabname;
116         END IF;
117
118         -- This is the way the original checks column count regardless of trigger_id is presence
119         IF (v_num_col_log != (v_num_col + 3 + v_loguser::int)) AND (v_num_col_log != (v_num_col + 4 + v_loguser::int)) THEN
120             RAISE EXCEPTION 'number colums in relation %.%(%) does not match columns in %.%(%)', TG_TABLE_SCHEMA, TG_TABLE_NAME, v_num_col, v_nspname, v_tabname, v_num_col_log;
121         END IF;
122         
123         -- Set custom variable for use as a poor mans cache for multirow statements
124         IF (v_enable_cache) THEN
125             v_col_cache := (TG_RELID::text || ':' || v_cols);
126             PERFORM set_config('table_log.column_cache', v_col_cache, true);
127         END IF;
128         
129         -- Create a prepared statement for the current table, deallocating
130         -- any old statements we may have prepared.
131         IF (v_enable_prepare) THEN
132             FOR v_tmp IN (SELECT name FROM pg_catalog.pg_prepared_statements WHERE name ~ '^table_log_pl_') LOOP
133                 EXECUTE format('DEALLOCATE %I', v_tmp);
134             END LOOP;
135             
136             SELECT '$' || string_agg(a::text, ', $') INTO v_col_trig FROM generate_series(1,v_num_col+3+v_loguser::int) a;
137             
138             IF (v_loguser) THEN
139                 v_sql := format('PREPARE table_log_pl_%s(%s, text, text, timestamptz, text) AS INSERT INTO %I.%I (%s, "trigger_user", "trigger_mode", "trigger_changed", "trigger_tuple") VALUES (%s)', TG_RELID, v_cols_typ, v_nspname, v_tabname, v_cols, v_col_trig);
140             ELSE
141                 v_sql := format('PREPARE table_log_pl_%s(%s, text, timestamptz, text) AS INSERT INTO %I.%I (%s, "trigger_mode", "trigger_changed", "trigger_tuple") VALUES (%s)', TG_RELID, v_cols_typ, v_nspname, v_tabname, v_cols, v_col_trig);
142             END IF;
143             EXECUTE v_sql;
144         END IF;        
145     END IF;
146     
147     -- If prepared statement method is enabled, construct strings for
148     -- variable parameters and execute.
149     IF (v_enable_prepare) THEN 
150         FOR v_i IN 1..array_upper(v_cols_nam, 1) LOOP
151             IF (TG_OP != 'INSERT') THEN
152                 EXECUTE 'SELECT ($1).' || v_cols_nam[v_i] || '::text' INTO v_tmp USING OLD;
153                 v_vals_old :=  v_vals_old || quote_nullable(v_tmp) || ',';
154             END IF;
155             IF (TG_OP != 'DELETE') THEN
156                 EXECUTE 'SELECT ($1).' || v_cols_nam[v_i] || '::text' INTO v_tmp USING NEW;
157                 v_vals_new :=  v_vals_new || quote_nullable(v_tmp) || ',';
158             END IF;
159         END LOOP;
160         
161         IF (v_loguser) THEN
162             v_vals_new :=  v_vals_new || quote_literal(session_user) || ',';
163             v_vals_old :=  v_vals_old || quote_literal(session_user) || ',';
164         END IF;
165
166         IF (TG_OP != 'INSERT') THEN
167             v_sql := format('EXECUTE table_log_pl_%s(%s%L, %L, %L)', TG_RELID, v_vals_old, TG_OP, current_timestamp, 'old');
168             EXECUTE v_sql;
169         END IF;
170         IF (TG_OP != 'DELETE') THEN
171             v_sql := format('EXECUTE table_log_pl_%s(%s%L, %L, %L)', TG_RELID, v_vals_new, TG_OP, current_timestamp, 'new');
172             EXECUTE v_sql;
173             RETURN NEW;
174         ELSE
175             RETURN OLD;
176         END IF;
177     ELSE -- Otherwise we can do the inserts dynamically.
178         IF (v_loguser) THEN
179             v_col_trig := v_col_trig || ', "trigger_user"';
180             v_val_trig := format('%L, ', session_user);
181         END IF;
182         v_col_trig := v_col_trig || ', "trigger_mode", "trigger_changed", "trigger_tuple"';
183         v_val_trig := format('%s%L, %L', v_val_trig, TG_OP, current_timestamp);
184     
185         IF (TG_OP != 'INSERT') THEN
186             v_sql := format('INSERT INTO %I.%I (%s%s) SELECT %s, %s, ''old'' FROM (SELECT ($1::text::%I).*) t', v_nspname, v_tabname, v_cols, v_col_trig, v_cols, v_val_trig, TG_RELID::regclass);
187             EXECUTE v_sql USING OLD;
188         END IF;
189         IF (TG_OP != 'DELETE') THEN
190             v_sql := format('INSERT INTO %I.%I (%s%s) SELECT %s, %s, ''new'' FROM (SELECT ($1::text::%I).*) t', v_nspname, v_tabname, v_cols, v_col_trig, v_cols, v_val_trig, TG_RELID::regclass);
191             EXECUTE v_sql USING NEW;
192             RETURN NEW;
193         ELSE 
194             RETURN OLD;
195         END IF;
196     END IF;
197
198 END;
199 $BODY$
200 LANGUAGE plpgsql VOLATILE;
201
202 --
203
204 CREATE OR REPLACE FUNCTION table_log_pl_restore_table (origtab varchar, origtab_pk varchar, logtab char, logtab_pk char, restoretab char, to_timestamp timestamptz, search_pk char DEFAULT NULL, method int DEFAULT 0, not_temporarly int DEFAULT 0, origtab_schema varchar DEFAULT NULL, logtab_schema varchar DEFAULT NULL) RETURNS varchar AS
205 $BODY$
206 DECLARE
207     v_origtab_cols int;
208     v_logtab_cols int;
209     v_restoretab_cols int;
210     v_origtab_fqn text;
211     v_logtab_fqn text;
212     v_sql text;
213     v_cols text;
214     v_pk_count int;
215     v_rec record;
216     v_old_pk_str text;
217 BEGIN
218
219     -- Notes:
220     --
221     -- The original implimentation doesn't allow fully qualified table 
222     -- references in table_log_restore_table;  You can get some milage 
223     -- out of search_path if required there. For this reason the plpgsql
224     -- version adds the following two optional parameters to those below:
225     --
226     --   - original table schema
227     --   - logging table schema
228     --
229     -- Comments from C implimentation:
230     --
231     -- restore a complete table based on the logging table
232     --
233     -- parameter:   
234     --   - original table name
235     --   - name of primary key in original table
236     --   - logging table
237     --   - name of primary key in logging table
238     --   - restore table name
239     --   - timestamp for restoring data
240     --   - primary key to restore (only this key will be restored) (optional)
241     --   - restore mode
242     --     0: restore from blank table (default)
243     --        needs a complete logging table
244     --     1: restore from actual table backwards
245     --   - dont create table temporarly
246     --     0: create restore table temporarly (default)
247     --     1: create restore table not temporarly
248     --   return:
249     --     not yet defined
250
251     IF origtab IS NULL THEN
252         RAISE NOTICE 'table_log_restore_table: missing original table name';
253     END IF;
254     IF origtab_pk IS NULL THEN
255         RAISE NOTICE 'table_log_restore_table: missing primary key name for original table';
256     END IF;
257     IF logtab IS NULL THEN
258         RAISE NOTICE 'table_log_restore_table: missing log table name';
259     END IF;
260     IF logtab_pk IS NULL THEN
261         RAISE NOTICE 'table_log_restore_table: missing primary key name for log table';
262     END IF;
263     IF restoretab IS NULL THEN
264         RAISE NOTICE 'table_log_restore_table: missing copy table name';
265     END IF;
266     IF to_timestamp IS NULL THEN
267         RAISE NOTICE 'table_log_restore_table: missing timestamp';
268     END IF;
269     IF (search_pk IS NOT NULL) THEN
270         RAISE NOTICE 'table_log_restore_table: will restore a single key';
271     END IF;
272     
273     IF origtab_pk = logtab_pk THEN 
274         RAISE EXCEPTION 'pkey of logging table cannot be the pkey of the original table: % <-> %', origtab_pk, logtab_pk;
275     END IF;
276     
277     v_origtab_fqn := coalesce(quote_ident(origtab_schema) || '.','') || quote_ident(origtab);
278     v_logtab_fqn := coalesce(quote_ident(logtab_schema) || '.','') || quote_ident(logtab);
279     
280     -- Check original table and get column list
281     SELECT string_agg(quote_ident(attname), ','), count(*), count(*) filter (where attname=origtab_pk)
282     INTO v_cols, v_origtab_cols, v_pk_count
283     FROM pg_catalog.pg_class c 
284     JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
285     JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
286     WHERE c.relname = origtab AND c.relkind='r' AND a.attnum > 0 
287     AND (origtab_schema IS NULL OR n.nspname = origtab_schema)
288     AND NOT attisdropped;
289                 
290     IF v_origtab_cols = 0 OR v_cols IS NULL THEN
291         RAISE EXCEPTION 'could not check relation: % (columns = %)', v_origtab_fqn, v_origtab_cols;
292     ELSIF v_pk_count != 1 THEN
293         RAISE EXCEPTION 'could not check relation: (missing pkey) % in table %', origtab_pk, v_origtab_fqn;
294     ELSE
295         RAISE NOTICE 'original table: OK (% columns)', v_origtab_cols;
296     END IF;
297         
298     -- Check log table    
299     SELECT count(*), count(*) filter (where attname=logtab_pk) 
300     INTO v_logtab_cols, v_pk_count
301     FROM pg_catalog.pg_class c 
302     JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
303     JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
304     WHERE c.relname = logtab AND c.relkind='r' AND a.attnum > 0
305     AND (logtab_schema IS NULL OR n.nspname = logtab_schema)
306     AND NOT attisdropped;   
307     
308     IF v_logtab_cols = 0 THEN
309         RAISE EXCEPTION 'could not check relation: % (columns = %)', v_logtab_fqn, v_logtab_cols;
310     ELSIF v_pk_count != 1 THEN
311         RAISE EXCEPTION 'could not check relation: (missing pkey) % in table %', logtab_pk, v_logtab_fqn;
312     ELSE
313         RAISE NOTICE 'log table: OK (% columns)', v_logtab_cols;
314     END IF;
315        
316     -- Check restore table
317     IF EXISTS(SELECT 1 FROM pg_catalog.pg_class
318               WHERE relname=restoretab AND relkind='r') THEN
319         RAISE EXCEPTION 'restore table already exists: %', restoretab;
320     ELSE
321         RAISE NOTICE 'restore table: OK (doesnt exists)';
322     END IF;
323     
324     -- create restore table 
325     v_sql := 'CREATE';
326     IF not_temporarly = 0 THEN
327         v_sql := v_sql || ' TEMPORARY';
328     END IF;    
329     v_sql := v_sql || format(' TABLE %I AS SELECT * FROM %s', restoretab, v_origtab_fqn);    
330     IF search_pk IS NOT NULL THEN
331         v_sql := v_sql || format(' WHERE %I = %L', origtab_pk, search_pk);
332     END IF;     
333     IF method = 0 THEN
334         RAISE NOTICE 'need logs from start to timestamp: %', to_timestamp;
335         v_sql := v_sql || ' LIMIT 0'; -- Create blank table to roll forward into (need all logs)
336     ELSE
337         RAISE NOTICE 'need logs from end to timestamp: %', to_timestamp;
338     END IF;
339     
340     -- RAISE NOTICE 'DDL: %', v_sql;
341     EXECUTE v_sql;
342   
343     -- now build query for getting logs
344     v_sql := format('SELECT * FROM %s WHERE ', v_logtab_fqn);
345     IF method = 0 THEN
346         v_sql := v_sql || format('trigger_changed <= %L', to_timestamp); -- ROLL FORWARD
347     ELSE
348         v_sql := v_sql || format('trigger_changed >= %L', to_timestamp); -- ROLL BACK
349     END IF;
350     
351     IF search_pk IS NOT NULL THEN
352         v_sql := v_sql || format(' AND %I = %L', origtab_pk, search_pk);
353     END IF;
354     
355     IF method = 0 THEN 
356         v_sql := v_sql || format(' ORDER BY %I ASC', logtab_pk);
357     ELSE
358         v_sql := v_sql || format(' ORDER BY %I DESC', logtab_pk);
359     END IF;
360     
361     -- RAISE NOTICE 'SQL: %', v_sql;
362     
363     FOR v_rec IN EXECUTE v_sql 
364     LOOP        
365         IF v_rec.trigger_mode = 'UPDATE' AND ((method = 0 AND v_rec.trigger_tuple = 'old') OR (method = 1 AND v_rec.trigger_tuple = 'new')) THEN
366             -- For previous update row versions we needn't apply anything; 
367             -- we just note the pk value for the quals when applying the 
368             -- next row change, i.e when rolling forward the old pk value, 
369             -- when rolling back the new pk value
370             EXECUTE format('SELECT ($1::text::%s).%I', v_logtab_fqn, origtab_pk) INTO v_old_pk_str USING v_rec;
371         ELSE
372             -- Apply the row changes from the log table, the following is
373             -- a mass of substitutions, but essentially we're selecting 
374             -- data out of the log table record and casting it into the 
375             -- restore table.
376
377             IF v_rec.trigger_mode = 'UPDATE' THEN 
378                 v_sql := format('UPDATE %I SET (%s) = (SELECT %s FROM (SELECT ($1::text::%s).*) t) WHERE %I = %L',
379                                 restoretab, v_cols, v_cols, v_logtab_fqn, origtab_pk, v_old_pk_str);                
380             ELSIF (v_rec.trigger_mode = 'INSERT' AND method = 0) OR (v_rec.trigger_mode = 'DELETE' AND method != 0) THEN            
381                 v_sql := format('INSERT INTO %I (%s) SELECT %s FROM (SELECT ($1::text::%s).*) t', 
382                                 restoretab, v_cols, v_cols, v_logtab_fqn);
383             ELSIF (v_rec.trigger_mode = 'INSERT' AND method != 0) OR (v_rec.trigger_mode = 'DELETE' AND method = 0) THEN
384                 v_sql := format('DELETE FROM %I WHERE %I = ($1::text::%s).%I', 
385                                 restoretab, origtab_pk, v_logtab_fqn, origtab_pk);
386             ELSE 
387                 RAISE EXCEPTION 'unknown trigger_mode: %', trigger_mode;
388             END IF;            
389             
390             -- RAISE NOTICE 'DML: %', v_sql;
391             EXECUTE v_sql USING v_rec;            
392         END IF;
393
394     END LOOP;
395
396     RETURN quote_ident(restoretab);
397 END;
398 $BODY$
399 LANGUAGE plpgsql VOLATILE;
400
401 -- tests
402
403 -- drop old trigger
404 DROP TRIGGER test_log_chg ON test; -- ignore any error
405
406 -- create demo table
407 DROP TABLE test; -- ignore any error
408 CREATE TABLE test (
409   id                    INT                 NOT NULL
410                                             PRIMARY KEY,
411   name                  VARCHAR(20)         NOT NULL
412 );
413
414 -- create the table without data from demo table
415 DROP TABLE test_log; -- ignore any error
416 SELECT * INTO test_log FROM test LIMIT 0;
417 ALTER TABLE test_log ADD COLUMN trigger_mode VARCHAR(10);
418 ALTER TABLE test_log ADD COLUMN trigger_tuple VARCHAR(5);
419 ALTER TABLE test_log ADD COLUMN trigger_changed TIMESTAMPTZ;
420 ALTER TABLE test_log ADD COLUMN trigger_id BIGINT;
421 CREATE SEQUENCE test_log_id;
422 SELECT SETVAL('test_log_id', 1, FALSE);
423 ALTER TABLE test_log ALTER COLUMN trigger_id SET DEFAULT NEXTVAL('test_log_id');
424
425 -- create trigger
426 CREATE TRIGGER test_log_chg AFTER UPDATE OR INSERT OR DELETE ON test FOR EACH ROW
427                EXECUTE PROCEDURE table_log_pl();
428
429 -- test trigger
430 INSERT INTO test VALUES (1, 'name');
431 SELECT * FROM test;
432 SELECT * FROM test_log;
433 UPDATE test SET name='other name' WHERE id=1;
434 SELECT * FROM test;
435 SELECT * FROM test_log;
436
437 -- create restore table
438 SELECT table_log_pl_restore_table('test', 'id', 'test_log', 'trigger_id', 'test_recover', NOW());
439 SELECT * FROM test_recover;