]> git.8kb.co.uk Git - postgresql/table_log_pl/blob - sql/table_log_pl--0.1.sql
Ensure documentation file name is not ambiguous
[postgresql/table_log_pl] / sql / table_log_pl--0.1.sql
1 -- 
2 -- Glyn Astill 28/08/2015
3 --
4 -- Attempt at pl/pgsql drop-in replacement for table_log C extenstion AKA
5 -- pg Table Audit / PostgreSQL Table Log / tablelog by Andreas Scherbaum
6 --     http://www.postgresql.org/ftp/projects/pgFoundry/tablelog/tablelog/
7 --     http://github.com/andreasscherbaum/table_log
8 --
9 -- A slightly more up to date version of the original C extension can 
10 -- also be found here:
11 --     https://github.com/glynastill/pg_table_audit
12 --
13 -- There are now many better ways to audit DML, using json types or 
14 -- advanced extensions like pgaudit (below), however if for some reason 
15 -- you're stuck with table_log this may help.
16 --
17 --     http://8kb.co.uk/blog/2015/01/19/copying-pavel-stehules-simple-history-table-but-with-the-jsonb-type/
18 --     https://github.com/2ndQuadrant/pgaudit
19
20 -- complain if script is sourced in psql, rather than via CREATE EXTENSION
21 \echo Use "CREATE EXTENSION table_log_pl" to load this file. \quit
22
23 --
24
25 CREATE OR REPLACE FUNCTION table_log_pl() RETURNS TRIGGER AS
26 $BODY$
27 DECLARE
28     v_tabname text;
29     v_loguser boolean := false;
30     v_nspname text;    
31     v_num_col int;
32     v_num_col_log int;
33     v_col_trig text := '';
34     v_val_trig text := '';    
35     v_cols text := '';
36     v_sql text;
37     v_col_cache text;
38     v_max_cache int;
39     v_enable_cache boolean := true;
40 BEGIN
41     -- Notes:
42     --     - The trigger_id comes off sequence, this function is oblivious
43     --     - 3 columns means don't log trigger_user or trigger_id
44     --     - 4 columns means don't log trigger_user
45     --     - 5 columns means log both 
46     --     - To use the column data caching on server versions prior to 
47     --       9.6 add custom var "table_log.column_cache = ''" to postgresql.conf
48
49     IF (TG_NARGS > 2) THEN
50         v_nspname := TG_ARGV[2];
51     ELSE
52         v_nspname := TG_TABLE_SCHEMA;
53     END IF;
54         
55     IF (TG_NARGS > 1 AND TG_ARGV[1]::int = 1) THEN
56         v_loguser := true;
57     END IF;
58     
59     IF (TG_NARGS > 0) THEN
60         v_tabname := TG_ARGV[0];
61     ELSE
62         v_tabname := TG_TABLE_NAME || '_log';
63     END IF;
64
65     -- Retrieve custom variable used as a poor mans cache for multirow statements
66     IF (v_enable_cache) THEN
67         IF (current_setting('server_version_num')::int >= 90600) THEN
68             v_col_cache := current_setting('table_log.column_cache', true);
69         ELSE
70             v_col_cache := current_setting('table_log.column_cache');
71         END IF;
72     END IF;
73     
74     IF (v_enable_cache AND left(v_col_cache, length(TG_RELID::text)+1) = (TG_RELID::text || ':')) THEN      
75         v_cols := right(v_col_cache, (length(TG_RELID::text)+1)*-1);
76     ELSE
77         IF (TG_WHEN != 'AFTER') THEN
78             RAISE EXCEPTION 'table_log: must be fired after event';
79         END IF;
80         IF (TG_LEVEL = 'STATEMENT') THEN
81             RAISE EXCEPTION 'table_log: can''t process STATEMENT events';
82         END IF;    
83     
84         SELECT count(*), string_agg(quote_ident(attname),',') INTO STRICT v_num_col, v_cols
85         FROM pg_catalog.pg_attribute
86         WHERE attrelid = TG_RELID
87         AND attnum > 0
88         AND NOT attisdropped;       
89         
90         IF (v_num_col < 1) THEN
91             RAISE EXCEPTION 'table_log: number of columns in table is < 1, can this happen?';
92         END IF;    
93             
94         SELECT count(*) INTO STRICT v_num_col_log
95         FROM pg_catalog.pg_attribute
96         WHERE attrelid = (v_nspname || '.' || v_tabname)::regclass
97         AND attnum > 0
98         AND NOT attisdropped;    
99         
100         IF (v_num_col_log < 1) THEN
101             RAISE EXCEPTION 'could not get number columns in relation %.%', v_nspname, v_tabname;
102         END IF;
103
104         -- This is the way the original checks column count regardless of trigger_id is presence
105         IF (v_num_col_log != (v_num_col + 3 + v_loguser::int)) AND (v_num_col_log != (v_num_col + 4 + v_loguser::int)) THEN
106             RAISE EXCEPTION 'number colums in relation %.%(%) does not match columns in %.%(%)', TG_TABLE_SCHEMA, TG_TABLE_NAME, v_num_col, v_nspname, v_tabname, v_num_col_log;
107         END IF;
108         
109         -- Set custom variable for use as a poor mans cache for multirow statements
110         IF (v_enable_cache) THEN
111             v_col_cache := (TG_RELID::text || ':' || v_cols);
112             PERFORM set_config('table_log.column_cache', v_col_cache, true);
113         END IF;
114     END IF;
115
116     IF (v_loguser) THEN
117         v_col_trig := v_col_trig || ', "trigger_user"';
118         v_val_trig := format('%L, ', session_user);
119     END IF;
120     v_col_trig := v_col_trig || ', "trigger_mode", "trigger_changed", "trigger_tuple"';
121     v_val_trig := format('%s%L, %L', v_val_trig, TG_OP, current_timestamp);    
122
123     IF (TG_OP != 'INSERT') THEN
124         v_sql := format('INSERT INTO %I.%I (%s%s) SELECT %s, %s, ''old'' FROM (SELECT ($1::text::%I).*) t', v_nspname, v_tabname, v_cols, v_col_trig, v_cols, v_val_trig, TG_RELID::regclass);
125         EXECUTE v_sql  USING OLD;
126     END IF;
127     IF (TG_OP != 'DELETE') THEN
128         v_sql := format('INSERT INTO %I.%I (%s%s) SELECT %s, %s, ''new'' FROM (SELECT ($1::text::%I).*) t', v_nspname, v_tabname, v_cols, v_col_trig, v_cols, v_val_trig, TG_RELID::regclass);
129         EXECUTE v_sql  USING NEW;
130         RETURN NEW;
131     ELSE 
132         RETURN OLD;
133     END IF;
134
135 END;
136 $BODY$
137 LANGUAGE plpgsql VOLATILE;
138
139 --
140
141 CREATE OR REPLACE FUNCTION table_log_pl_restore_table (origtab varchar, origtab_pk varchar, logtab char, logtab_pk char, restoretab char, to_timestamp timestamptz, search_pk char DEFAULT NULL, method int DEFAULT 0, not_temporarly int DEFAULT 0, origtab_schema varchar DEFAULT NULL, logtab_schema varchar DEFAULT NULL) RETURNS varchar AS
142 $BODY$
143 DECLARE
144     v_origtab_cols int;
145     v_logtab_cols int;
146     v_restoretab_cols int;
147     v_origtab_fqn text;
148     v_logtab_fqn text;
149     v_sql text;
150     v_cols text;
151     v_pk_count int;
152     v_rec record;
153     v_old_pk_str text;
154 BEGIN
155
156     -- Notes:
157     --
158     -- The original implimentation doesn't allow fully qualified table 
159     -- references in table_log_restore_table;  You can get some milage 
160     -- out of search_path if required there. For this reason the plpgsql
161     -- version adds the following two optional parameters to those below:
162     --
163     --   - original table schema
164     --   - logging table schema
165     --
166     -- Comments from C implimentation:
167     --
168     -- restore a complete table based on the logging table
169     --
170     -- parameter:   
171     --   - original table name
172     --   - name of primary key in original table
173     --   - logging table
174     --   - name of primary key in logging table
175     --   - restore table name
176     --   - timestamp for restoring data
177     --   - primary key to restore (only this key will be restored) (optional)
178     --   - restore mode
179     --     0: restore from blank table (default)
180     --        needs a complete logging table
181     --     1: restore from actual table backwards
182     --   - dont create table temporarly
183     --     0: create restore table temporarly (default)
184     --     1: create restore table not temporarly
185     --   return:
186     --     not yet defined
187
188     IF origtab IS NULL THEN
189         RAISE NOTICE 'table_log_restore_table: missing original table name';
190     END IF;
191     IF origtab_pk IS NULL THEN
192         RAISE NOTICE 'table_log_restore_table: missing primary key name for original table';
193     END IF;
194     IF logtab IS NULL THEN
195         RAISE NOTICE 'table_log_restore_table: missing log table name';
196     END IF;
197     IF logtab_pk IS NULL THEN
198         RAISE NOTICE 'table_log_restore_table: missing primary key name for log table';
199     END IF;
200     IF restoretab IS NULL THEN
201         RAISE NOTICE 'table_log_restore_table: missing copy table name';
202     END IF;
203     IF to_timestamp IS NULL THEN
204         RAISE NOTICE 'table_log_restore_table: missing timestamp';
205     END IF;
206     IF (search_pk IS NOT NULL) THEN
207         RAISE NOTICE 'table_log_restore_table: will restore a single key';
208     END IF;
209     
210     IF origtab_pk = logtab_pk THEN 
211         RAISE EXCEPTION 'pkey of logging table cannot be the pkey of the original table: % <-> %', origtab_pk, logtab_pk;
212     END IF;
213     
214     v_origtab_fqn := coalesce(quote_ident(origtab_schema) || '.','') || quote_ident(origtab);
215     v_logtab_fqn := coalesce(quote_ident(logtab_schema) || '.','') || quote_ident(logtab);
216     
217     -- Check original table and get column list
218     SELECT string_agg(quote_ident(attname), ','), count(*), count(*) filter (where attname=origtab_pk)
219     INTO v_cols, v_origtab_cols, v_pk_count
220     FROM pg_catalog.pg_class c 
221     JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
222     JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
223     WHERE c.relname = origtab AND c.relkind='r' AND a.attnum > 0 
224     AND (origtab_schema IS NULL OR n.nspname = origtab_schema)
225     AND NOT attisdropped;
226                 
227     IF v_origtab_cols = 0 OR v_cols IS NULL THEN
228         RAISE EXCEPTION 'could not check relation: % (columns = %)', v_origtab_fqn, v_origtab_cols;
229     ELSIF v_pk_count != 1 THEN
230         RAISE EXCEPTION 'could not check relation: (missing pkey) % in table %', origtab_pk, v_origtab_fqn;
231     ELSE
232         RAISE NOTICE 'original table: OK (% columns)', v_origtab_cols;
233     END IF;
234         
235     -- Check log table    
236     SELECT count(*), count(*) filter (where attname=logtab_pk) 
237     INTO v_logtab_cols, v_pk_count
238     FROM pg_catalog.pg_class c 
239     JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
240     JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
241     WHERE c.relname = logtab AND c.relkind='r' AND a.attnum > 0
242     AND (logtab_schema IS NULL OR n.nspname = logtab_schema)
243     AND NOT attisdropped;   
244     
245     IF v_logtab_cols = 0 THEN
246         RAISE EXCEPTION 'could not check relation: % (columns = %)', v_logtab_fqn, v_logtab_cols;
247     ELSIF v_pk_count != 1 THEN
248         RAISE EXCEPTION 'could not check relation: (missing pkey) % in table %', logtab_pk, v_logtab_fqn;
249     ELSE
250         RAISE NOTICE 'log table: OK (% columns)', v_logtab_cols;
251     END IF;
252        
253     -- Check restore table
254     IF EXISTS(SELECT 1 FROM pg_catalog.pg_class
255               WHERE relname=restoretab AND relkind='r') THEN
256         RAISE EXCEPTION 'restore table already exists: %', restoretab;
257     ELSE
258         RAISE NOTICE 'restore table: OK (doesnt exists)';
259     END IF;
260     
261     -- create restore table 
262     v_sql := 'CREATE';
263     IF not_temporarly = 0 THEN
264         v_sql := v_sql || ' TEMPORARY';
265     END IF;    
266     v_sql := v_sql || format(' TABLE %I AS SELECT * FROM %s', restoretab, v_origtab_fqn);    
267     IF search_pk IS NOT NULL THEN
268         v_sql := v_sql || format(' WHERE %I = %L', origtab_pk, search_pk);
269     END IF;     
270     IF method = 0 THEN
271         RAISE NOTICE 'need logs from start to timestamp: %', to_timestamp;
272         v_sql := v_sql || ' LIMIT 0'; -- Create blank table to roll forward into (need all logs)
273     ELSE
274         RAISE NOTICE 'need logs from end to timestamp: %', to_timestamp;
275     END IF;
276     
277     -- RAISE NOTICE 'DDL: %', v_sql;
278     EXECUTE v_sql;
279   
280     -- now build query for getting logs
281     v_sql := format('SELECT * FROM %s WHERE ', v_logtab_fqn);
282     IF method = 0 THEN
283         v_sql := v_sql || format('trigger_changed <= %L', to_timestamp); -- ROLL FORWARD
284     ELSE
285         v_sql := v_sql || format('trigger_changed >= %L', to_timestamp); -- ROLL BACK
286     END IF;
287     
288     IF search_pk IS NOT NULL THEN
289         v_sql := v_sql || format(' AND %I = %L', origtab_pk, search_pk);
290     END IF;
291     
292     IF method = 0 THEN 
293         v_sql := v_sql || format(' ORDER BY %I ASC', logtab_pk);
294     ELSE
295         v_sql := v_sql || format(' ORDER BY %I DESC', logtab_pk);
296     END IF;
297     
298     -- RAISE NOTICE 'SQL: %', v_sql;
299     
300     FOR v_rec IN EXECUTE v_sql 
301     LOOP        
302         IF v_rec.trigger_mode = 'UPDATE' AND ((method = 0 AND v_rec.trigger_tuple = 'old') OR (method = 1 AND v_rec.trigger_tuple = 'new')) THEN
303             -- For previous update row versions we needn't apply anything; 
304             -- we just note the pk value for the quals when applying the 
305             -- next row change, i.e when rolling forward the old pk value, 
306             -- when rolling back the new pk value
307             EXECUTE format('SELECT ($1::text::%s).%I', v_logtab_fqn, origtab_pk) INTO v_old_pk_str USING v_rec;
308         ELSE
309             -- Apply the row changes from the log table, the following is
310             -- a mass of substitutions, but essentially we're selecting 
311             -- data out of the log table record and casting it into the 
312             -- restore table.
313
314             IF v_rec.trigger_mode = 'UPDATE' THEN 
315                 v_sql := format('UPDATE %I SET (%s) = (SELECT %s FROM (SELECT ($1::text::%s).*) t) WHERE %I = %L',
316                                 restoretab, v_cols, v_cols, v_logtab_fqn, origtab_pk, v_old_pk_str);                
317             ELSIF (v_rec.trigger_mode = 'INSERT' AND method = 0) OR (v_rec.trigger_mode = 'DELETE' AND method != 0) THEN            
318                 v_sql := format('INSERT INTO %I (%s) SELECT %s FROM (SELECT ($1::text::%s).*) t', 
319                                 restoretab, v_cols, v_cols, v_logtab_fqn);
320             ELSIF (v_rec.trigger_mode = 'INSERT' AND method != 0) OR (v_rec.trigger_mode = 'DELETE' AND method = 0) THEN
321                 v_sql := format('DELETE FROM %I WHERE %I = ($1::text::%s).%I', 
322                                 restoretab, origtab_pk, v_logtab_fqn, origtab_pk);
323             ELSE 
324                 RAISE EXCEPTION 'unknown trigger_mode: %', trigger_mode;
325             END IF;            
326             
327             -- RAISE NOTICE 'DML: %', v_sql;
328             EXECUTE v_sql USING v_rec;            
329         END IF;
330
331     END LOOP;
332
333     RETURN quote_ident(restoretab);
334 END;
335 $BODY$
336 LANGUAGE plpgsql VOLATILE;
337
338 --
339
340 CREATE OR REPLACE FUNCTION table_log_pl_init(level int, orig_schema text, orig_name text, log_schema text, log_name text) 
341 RETURNS void AS 
342 $BODY$
343 DECLARE
344     do_log_user  int = 0;
345     level_create text = E'';
346     orig_qq      text;
347     log_qq       text;
348 BEGIN
349     -- Quoted qualified names
350     orig_qq := quote_ident(orig_schema)||'.'||quote_ident(orig_name);
351     log_qq := quote_ident(log_schema)||'.'||quote_ident(log_name);
352
353     IF level <> 3 THEN
354         level_create := level_create
355             ||', trigger_id BIGSERIAL NOT NULL PRIMARY KEY';
356         IF level <> 4 THEN
357             level_create := level_create
358                 ||', trigger_user VARCHAR(32) NOT NULL';
359             do_log_user := 1;
360             IF level <> 5 THEN
361                 RAISE EXCEPTION 
362                     'table_log_pl_init: First arg has to be 3, 4 or 5.';
363             END IF;
364         END IF;
365     END IF;
366     
367     EXECUTE 'CREATE TABLE '||log_qq
368           ||'(LIKE '||orig_qq
369           ||', trigger_mode VARCHAR(10) NOT NULL'
370           ||', trigger_tuple VARCHAR(5) NOT NULL'
371           ||', trigger_changed TIMESTAMPTZ NOT NULL'
372           ||level_create
373           ||')';
374             
375     EXECUTE 'CREATE TRIGGER "table_log_trigger_pl" AFTER UPDATE OR INSERT OR DELETE ON '
376           ||orig_qq||' FOR EACH ROW EXECUTE PROCEDURE table_log_pl('
377           ||quote_literal(log_name)||','
378           ||do_log_user||','
379           ||quote_literal(log_schema)||')';
380
381     RETURN;
382 END;
383 $BODY$
384 LANGUAGE plpgsql;
385
386 CREATE OR REPLACE FUNCTION table_log_pl_init(level int, orig_name text) 
387 RETURNS void AS 
388
389 $BODY$
390 BEGIN
391     PERFORM table_log_pl_init(level, orig_name, current_schema());
392     RETURN;
393 END;
394 $BODY$
395 LANGUAGE plpgsql;
396
397
398 CREATE OR REPLACE FUNCTION table_log_pl_init(level int, orig_name text, log_schema text) 
399 RETURNS void AS 
400 $BODY$
401 BEGIN
402     PERFORM table_log_pl_init(level, current_schema(), orig_name, log_schema);
403     RETURN;
404 END;
405 $BODY$
406 LANGUAGE plpgsql;
407
408
409 CREATE OR REPLACE FUNCTION table_log_pl_init(level int, orig_schema text, orig_name text, log_schema text) 
410 RETURNS void AS 
411 $BODY$
412 BEGIN
413     PERFORM table_log_pl_init(level, orig_schema, orig_name, log_schema,
414         CASE WHEN orig_schema=log_schema 
415             THEN orig_name||'_log' ELSE orig_name END);
416     RETURN;
417 END;
418 $BODY$
419 LANGUAGE plpgsql;