]> git.8kb.co.uk Git - postgresql/table_log_pl/blob - sql/table_log_pl.sql
Ensure documentation file name is not ambiguous
[postgresql/table_log_pl] / sql / table_log_pl.sql
1 -- 
2 -- Glyn Astill 28/08/2015
3 --
4 -- Attempt at pl/pgsql drop-in replacement for table_log C extenstion AKA
5 -- pg Table Audit / PostgreSQL Table Log / tablelog by Andreas Scherbaum
6 --     http://www.postgresql.org/ftp/projects/pgFoundry/tablelog/tablelog/
7 --     http://github.com/andreasscherbaum/table_log
8 --
9 -- A slightly more up to date version of the original C extension can 
10 -- also be found here:
11 --     https://github.com/glynastill/pg_table_audit
12 --
13 -- There are now many better ways to audit DML, using json types or 
14 -- advanced extensions like pgaudit (below), however if for some reason 
15 -- you're stuck with table_log this may help.
16 --
17 --     http://8kb.co.uk/blog/2015/01/19/copying-pavel-stehules-simple-history-table-but-with-the-jsonb-type/
18 --     https://github.com/2ndQuadrant/pgaudit
19
20 SET search_path TO public;
21
22 --
23 DROP FUNCTION IF EXISTS table_log_pl (); -- ignore any error (but do not CASCADE)
24 DROP FUNCTION IF EXISTS table_log_pl_restore_table(varchar, varchar, char, char, char, timestamptz, char, int, int, varchar, varchar);
25
26 --
27
28 CREATE OR REPLACE FUNCTION table_log_pl() RETURNS TRIGGER AS
29 $BODY$
30 DECLARE
31     v_tabname text;
32     v_loguser boolean := false;
33     v_nspname text;    
34     v_num_col int;
35     v_num_col_log int;
36     v_col_trig text := '';
37     v_val_trig text := '';    
38     v_cols text := '';
39     v_sql text;
40     v_col_cache text;
41     v_max_cache int;
42     v_enable_cache boolean := true;
43 BEGIN
44     -- Notes:
45     --     - The trigger_id comes off sequence, this function is oblivious
46     --     - 3 columns means don't log trigger_user or trigger_id
47     --     - 4 columns means don't log trigger_user
48     --     - 5 columns means log both 
49     --     - To use the column data caching on server versions prior to 
50     --       9.6 add custom var "table_log.column_cache = ''" to postgresql.conf
51
52     IF (TG_NARGS > 2) THEN
53         v_nspname := TG_ARGV[2];
54     ELSE
55         v_nspname := TG_TABLE_SCHEMA;
56     END IF;
57         
58     IF (TG_NARGS > 1 AND TG_ARGV[1]::int = 1) THEN
59         v_loguser := true;
60     END IF;
61     
62     IF (TG_NARGS > 0) THEN
63         v_tabname := TG_ARGV[0];
64     ELSE
65         v_tabname := TG_TABLE_NAME || '_log';
66     END IF;
67
68     -- Retrieve custom variable used as a poor mans cache for multirow statements
69     IF (v_enable_cache) THEN
70         IF (current_setting('server_version_num')::int >= 90600) THEN
71             v_col_cache := current_setting('table_log.column_cache', true);
72         ELSE
73             v_col_cache := current_setting('table_log.column_cache');
74         END IF;
75     END IF;
76     
77     IF (v_enable_cache AND left(v_col_cache, length(TG_RELID::text)+1) = (TG_RELID::text || ':')) THEN      
78         v_cols := right(v_col_cache, (length(TG_RELID::text)+1)*-1);
79     ELSE
80         IF (TG_WHEN != 'AFTER') THEN
81             RAISE EXCEPTION 'table_log: must be fired after event';
82         END IF;
83         IF (TG_LEVEL = 'STATEMENT') THEN
84             RAISE EXCEPTION 'table_log: can''t process STATEMENT events';
85         END IF;    
86     
87         SELECT count(*), string_agg(quote_ident(attname),',') INTO STRICT v_num_col, v_cols
88         FROM pg_catalog.pg_attribute
89         WHERE attrelid = TG_RELID
90         AND attnum > 0
91         AND NOT attisdropped;       
92         
93         IF (v_num_col < 1) THEN
94             RAISE EXCEPTION 'table_log: number of columns in table is < 1, can this happen?';
95         END IF;    
96             
97         SELECT count(*) INTO STRICT v_num_col_log
98         FROM pg_catalog.pg_attribute
99         WHERE attrelid = (v_nspname || '.' || v_tabname)::regclass
100         AND attnum > 0
101         AND NOT attisdropped;    
102         
103         IF (v_num_col_log < 1) THEN
104             RAISE EXCEPTION 'could not get number columns in relation %.%', v_nspname, v_tabname;
105         END IF;
106
107         -- This is the way the original checks column count regardless of trigger_id is presence
108         IF (v_num_col_log != (v_num_col + 3 + v_loguser::int)) AND (v_num_col_log != (v_num_col + 4 + v_loguser::int)) THEN
109             RAISE EXCEPTION 'number colums in relation %.%(%) does not match columns in %.%(%)', TG_TABLE_SCHEMA, TG_TABLE_NAME, v_num_col, v_nspname, v_tabname, v_num_col_log;
110         END IF;
111         
112         -- Set custom variable for use as a poor mans cache for multirow statements
113         IF (v_enable_cache) THEN
114             v_col_cache := (TG_RELID::text || ':' || v_cols);
115             PERFORM set_config('table_log.column_cache', v_col_cache, true);
116         END IF;
117     END IF;
118
119     IF (v_loguser) THEN
120         v_col_trig := v_col_trig || ', "trigger_user"';
121         v_val_trig := format('%L, ', session_user);
122     END IF;
123     v_col_trig := v_col_trig || ', "trigger_mode", "trigger_changed", "trigger_tuple"';
124     v_val_trig := format('%s%L, %L', v_val_trig, TG_OP, current_timestamp);    
125
126     IF (TG_OP != 'INSERT') THEN
127         v_sql := format('INSERT INTO %I.%I (%s%s) SELECT %s, %s, ''old'' FROM (SELECT ($1::text::%I).*) t', v_nspname, v_tabname, v_cols, v_col_trig, v_cols, v_val_trig, TG_RELID::regclass);
128         EXECUTE v_sql  USING OLD;
129     END IF;
130     IF (TG_OP != 'DELETE') THEN
131         v_sql := format('INSERT INTO %I.%I (%s%s) SELECT %s, %s, ''new'' FROM (SELECT ($1::text::%I).*) t', v_nspname, v_tabname, v_cols, v_col_trig, v_cols, v_val_trig, TG_RELID::regclass);
132         EXECUTE v_sql  USING NEW;
133         RETURN NEW;
134     ELSE 
135         RETURN OLD;
136     END IF;
137
138 END;
139 $BODY$
140 LANGUAGE plpgsql VOLATILE;
141
142 --
143
144 CREATE OR REPLACE FUNCTION table_log_pl_restore_table (origtab varchar, origtab_pk varchar, logtab char, logtab_pk char, restoretab char, to_timestamp timestamptz, search_pk char DEFAULT NULL, method int DEFAULT 0, not_temporarly int DEFAULT 0, origtab_schema varchar DEFAULT NULL, logtab_schema varchar DEFAULT NULL) RETURNS varchar AS
145 $BODY$
146 DECLARE
147     v_origtab_cols int;
148     v_logtab_cols int;
149     v_restoretab_cols int;
150     v_origtab_fqn text;
151     v_logtab_fqn text;
152     v_sql text;
153     v_cols text;
154     v_pk_count int;
155     v_rec record;
156     v_old_pk_str text;
157 BEGIN
158
159     -- Notes:
160     --
161     -- The original implimentation doesn't allow fully qualified table 
162     -- references in table_log_restore_table;  You can get some milage 
163     -- out of search_path if required there. For this reason the plpgsql
164     -- version adds the following two optional parameters to those below:
165     --
166     --   - original table schema
167     --   - logging table schema
168     --
169     -- Comments from C implimentation:
170     --
171     -- restore a complete table based on the logging table
172     --
173     -- parameter:   
174     --   - original table name
175     --   - name of primary key in original table
176     --   - logging table
177     --   - name of primary key in logging table
178     --   - restore table name
179     --   - timestamp for restoring data
180     --   - primary key to restore (only this key will be restored) (optional)
181     --   - restore mode
182     --     0: restore from blank table (default)
183     --        needs a complete logging table
184     --     1: restore from actual table backwards
185     --   - dont create table temporarly
186     --     0: create restore table temporarly (default)
187     --     1: create restore table not temporarly
188     --   return:
189     --     not yet defined
190
191     IF origtab IS NULL THEN
192         RAISE NOTICE 'table_log_restore_table: missing original table name';
193     END IF;
194     IF origtab_pk IS NULL THEN
195         RAISE NOTICE 'table_log_restore_table: missing primary key name for original table';
196     END IF;
197     IF logtab IS NULL THEN
198         RAISE NOTICE 'table_log_restore_table: missing log table name';
199     END IF;
200     IF logtab_pk IS NULL THEN
201         RAISE NOTICE 'table_log_restore_table: missing primary key name for log table';
202     END IF;
203     IF restoretab IS NULL THEN
204         RAISE NOTICE 'table_log_restore_table: missing copy table name';
205     END IF;
206     IF to_timestamp IS NULL THEN
207         RAISE NOTICE 'table_log_restore_table: missing timestamp';
208     END IF;
209     IF (search_pk IS NOT NULL) THEN
210         RAISE NOTICE 'table_log_restore_table: will restore a single key';
211     END IF;
212     
213     IF origtab_pk = logtab_pk THEN 
214         RAISE EXCEPTION 'pkey of logging table cannot be the pkey of the original table: % <-> %', origtab_pk, logtab_pk;
215     END IF;
216     
217     v_origtab_fqn := coalesce(quote_ident(origtab_schema) || '.','') || quote_ident(origtab);
218     v_logtab_fqn := coalesce(quote_ident(logtab_schema) || '.','') || quote_ident(logtab);
219     
220     -- Check original table and get column list
221     SELECT string_agg(quote_ident(attname), ','), count(*), count(*) filter (where attname=origtab_pk)
222     INTO v_cols, v_origtab_cols, v_pk_count
223     FROM pg_catalog.pg_class c 
224     JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
225     JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
226     WHERE c.relname = origtab AND c.relkind='r' AND a.attnum > 0 
227     AND (origtab_schema IS NULL OR n.nspname = origtab_schema)
228     AND NOT attisdropped;
229                 
230     IF v_origtab_cols = 0 OR v_cols IS NULL THEN
231         RAISE EXCEPTION 'could not check relation: % (columns = %)', v_origtab_fqn, v_origtab_cols;
232     ELSIF v_pk_count != 1 THEN
233         RAISE EXCEPTION 'could not check relation: (missing pkey) % in table %', origtab_pk, v_origtab_fqn;
234     ELSE
235         RAISE NOTICE 'original table: OK (% columns)', v_origtab_cols;
236     END IF;
237         
238     -- Check log table    
239     SELECT count(*), count(*) filter (where attname=logtab_pk) 
240     INTO v_logtab_cols, v_pk_count
241     FROM pg_catalog.pg_class c 
242     JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
243     JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
244     WHERE c.relname = logtab AND c.relkind='r' AND a.attnum > 0
245     AND (logtab_schema IS NULL OR n.nspname = logtab_schema)
246     AND NOT attisdropped;   
247     
248     IF v_logtab_cols = 0 THEN
249         RAISE EXCEPTION 'could not check relation: % (columns = %)', v_logtab_fqn, v_logtab_cols;
250     ELSIF v_pk_count != 1 THEN
251         RAISE EXCEPTION 'could not check relation: (missing pkey) % in table %', logtab_pk, v_logtab_fqn;
252     ELSE
253         RAISE NOTICE 'log table: OK (% columns)', v_logtab_cols;
254     END IF;
255        
256     -- Check restore table
257     IF EXISTS(SELECT 1 FROM pg_catalog.pg_class
258               WHERE relname=restoretab AND relkind='r') THEN
259         RAISE EXCEPTION 'restore table already exists: %', restoretab;
260     ELSE
261         RAISE NOTICE 'restore table: OK (doesnt exists)';
262     END IF;
263     
264     -- create restore table 
265     v_sql := 'CREATE';
266     IF not_temporarly = 0 THEN
267         v_sql := v_sql || ' TEMPORARY';
268     END IF;    
269     v_sql := v_sql || format(' TABLE %I AS SELECT * FROM %s', restoretab, v_origtab_fqn);    
270     IF search_pk IS NOT NULL THEN
271         v_sql := v_sql || format(' WHERE %I = %L', origtab_pk, search_pk);
272     END IF;     
273     IF method = 0 THEN
274         RAISE NOTICE 'need logs from start to timestamp: %', to_timestamp;
275         v_sql := v_sql || ' LIMIT 0'; -- Create blank table to roll forward into (need all logs)
276     ELSE
277         RAISE NOTICE 'need logs from end to timestamp: %', to_timestamp;
278     END IF;
279     
280     -- RAISE NOTICE 'DDL: %', v_sql;
281     EXECUTE v_sql;
282   
283     -- now build query for getting logs
284     v_sql := format('SELECT * FROM %s WHERE ', v_logtab_fqn);
285     IF method = 0 THEN
286         v_sql := v_sql || format('trigger_changed <= %L', to_timestamp); -- ROLL FORWARD
287     ELSE
288         v_sql := v_sql || format('trigger_changed >= %L', to_timestamp); -- ROLL BACK
289     END IF;
290     
291     IF search_pk IS NOT NULL THEN
292         v_sql := v_sql || format(' AND %I = %L', origtab_pk, search_pk);
293     END IF;
294     
295     IF method = 0 THEN 
296         v_sql := v_sql || format(' ORDER BY %I ASC', logtab_pk);
297     ELSE
298         v_sql := v_sql || format(' ORDER BY %I DESC', logtab_pk);
299     END IF;
300     
301     -- RAISE NOTICE 'SQL: %', v_sql;
302     
303     FOR v_rec IN EXECUTE v_sql 
304     LOOP        
305         IF v_rec.trigger_mode = 'UPDATE' AND ((method = 0 AND v_rec.trigger_tuple = 'old') OR (method = 1 AND v_rec.trigger_tuple = 'new')) THEN
306             -- For previous update row versions we needn't apply anything; 
307             -- we just note the pk value for the quals when applying the 
308             -- next row change, i.e when rolling forward the old pk value, 
309             -- when rolling back the new pk value
310             EXECUTE format('SELECT ($1::text::%s).%I', v_logtab_fqn, origtab_pk) INTO v_old_pk_str USING v_rec;
311         ELSE
312             -- Apply the row changes from the log table, the following is
313             -- a mass of substitutions, but essentially we're selecting 
314             -- data out of the log table record and casting it into the 
315             -- restore table.
316
317             IF v_rec.trigger_mode = 'UPDATE' THEN 
318                 v_sql := format('UPDATE %I SET (%s) = (SELECT %s FROM (SELECT ($1::text::%s).*) t) WHERE %I = %L',
319                                 restoretab, v_cols, v_cols, v_logtab_fqn, origtab_pk, v_old_pk_str);                
320             ELSIF (v_rec.trigger_mode = 'INSERT' AND method = 0) OR (v_rec.trigger_mode = 'DELETE' AND method != 0) THEN            
321                 v_sql := format('INSERT INTO %I (%s) SELECT %s FROM (SELECT ($1::text::%s).*) t', 
322                                 restoretab, v_cols, v_cols, v_logtab_fqn);
323             ELSIF (v_rec.trigger_mode = 'INSERT' AND method != 0) OR (v_rec.trigger_mode = 'DELETE' AND method = 0) THEN
324                 v_sql := format('DELETE FROM %I WHERE %I = ($1::text::%s).%I', 
325                                 restoretab, origtab_pk, v_logtab_fqn, origtab_pk);
326             ELSE 
327                 RAISE EXCEPTION 'unknown trigger_mode: %', trigger_mode;
328             END IF;            
329             
330             -- RAISE NOTICE 'DML: %', v_sql;
331             EXECUTE v_sql USING v_rec;            
332         END IF;
333
334     END LOOP;
335
336     RETURN quote_ident(restoretab);
337 END;
338 $BODY$
339 LANGUAGE plpgsql VOLATILE;
340
341 -- tests
342
343 -- drop old trigger
344 DROP TRIGGER test_log_chg ON test; -- ignore any error
345
346 -- create demo table
347 DROP TABLE test; -- ignore any error
348 CREATE TABLE test (
349   id                    INT                 NOT NULL
350                                             PRIMARY KEY,
351   name                  VARCHAR(20)         NOT NULL
352 );
353
354 -- create the table without data from demo table
355 DROP TABLE test_log; -- ignore any error
356 SELECT * INTO test_log FROM test LIMIT 0;
357 ALTER TABLE test_log ADD COLUMN trigger_mode VARCHAR(10);
358 ALTER TABLE test_log ADD COLUMN trigger_tuple VARCHAR(5);
359 ALTER TABLE test_log ADD COLUMN trigger_changed TIMESTAMPTZ;
360 ALTER TABLE test_log ADD COLUMN trigger_id BIGINT;
361 CREATE SEQUENCE test_log_id;
362 SELECT SETVAL('test_log_id', 1, FALSE);
363 ALTER TABLE test_log ALTER COLUMN trigger_id SET DEFAULT NEXTVAL('test_log_id');
364
365 -- create trigger
366 CREATE TRIGGER test_log_chg AFTER UPDATE OR INSERT OR DELETE ON test FOR EACH ROW
367                EXECUTE PROCEDURE table_log_pl();
368
369 -- test trigger
370 INSERT INTO test VALUES (1, 'name');
371 SELECT * FROM test;
372 SELECT * FROM test_log;
373 UPDATE test SET name='other name' WHERE id=1;
374 SELECT * FROM test;
375 SELECT * FROM test_log;
376
377 -- create restore table
378 SELECT table_log_pl_restore_table('test', 'id', 'test_log', 'trigger_id', 'test_recover', NOW());
379 SELECT * FROM test_recover;