Initial commit
[postgresql/geographic_data] / paf_postgresql_import.plpgsql
1 DROP FUNCTION IF EXISTS public.import_pc_paf(varchar, varchar); -- remove any existing two-varchar import_pc_paf before it is recreated below; IF EXISTS makes this a no-op when the function is not yet installed
2 CREATE OR REPLACE FUNCTION public.import_pc_paf(in_edition varchar, in_data varchar)
3 RETURNS boolean AS
4 $BODY$
5
6 DECLARE
7         v_data_root varchar;
8         v_data_main varchar;
9         v_data_postzon varchar;
10         v_data_alias varchar;
11         v_processed integer;
12         v_main_footer varchar;
13         v_std_footer varchar;
14         v_sql varchar;
15         v_table_created boolean;
16 BEGIN
17         v_data_root := in_data || '/' || in_edition || '/';
18         v_data_main := v_data_root || 'PAF MAIN FILE/';
19         v_data_postzon := v_data_root || 'POSTZON 100M/';
20         v_data_alias := v_data_root || 'ALIAS/';
21         
22         v_main_footer = '       %';
23         v_std_footer = '99999999%';
24         
25         RAISE NOTICE '%: Import starting for edition % with data root %', clock_timestamp(), in_edition, v_data_root;
26         CREATE TEMPORARY TABLE data_stage (data text) ON COMMIT DROP;
27         
28         -- 1) Localities
29         TRUNCATE TABLE data_stage;
30         
31         RAISE NOTICE '%: Begin staging localaties', clock_timestamp();
32
33         v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'local.c01');
34         EXECUTE v_sql;
35         
36         DELETE FROM data_stage WHERE data like '%LOCALITY'  || in_edition ||  '%' OR data like v_std_footer;
37
38         RAISE NOTICE '%: Done staging localaties, importing', clock_timestamp();
39         
40         IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_localities') THEN
41                 RAISE NOTICE 'Table "public"."pc_paf_localities" already exists';
42                 v_table_created := false;
43         ELSE
44                 RAISE NOTICE 'Creating table "public"."pc_paf_localities"';
45                 CREATE TABLE public.pc_paf_localities (
46                         locality_key integer NOT NULL,
47                         post_town varchar(30),
48                         dependent_locality varchar(35),
49                         double_dependent_locality  varchar(35)
50                 );
51                 v_table_created := true;
52         END IF;
53         
54         TRUNCATE TABLE public.pc_paf_localities;
55         INSERT INTO public.pc_paf_localities
56         SELECT substring(data,1,6)::integer,
57                 nullif(trim(substring(data,52,30)),''),
58                 nullif(trim(substring(data,82,35)),''),
59                 nullif(trim(substring(data,117,35)),'')
60         FROM data_stage;
61         
62         GET DIAGNOSTICS v_processed = ROW_COUNT;
63         
64         IF (v_table_created) THEN
65                 ALTER TABLE public.pc_paf_localities ADD PRIMARY KEY (locality_key);
66         END IF;
67         
68         RAISE NOTICE '%: Done importing localities (imported % records)', clock_timestamp(), v_processed;
69         
70         -- 2) Thoroughfares
71         TRUNCATE TABLE data_stage;
72                 
73         RAISE NOTICE '%: Begin staging thoroughfares', clock_timestamp();
74
75         v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'thfare.c01');
76         EXECUTE v_sql;
77         
78         DELETE FROM data_stage WHERE data like '%THOROUGH'  || in_edition ||  '%' OR data like v_std_footer;                          
79
80         RAISE NOTICE '%: Done staging thoroughfares, importing', clock_timestamp();
81         
82         IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_thoroughfares') THEN
83                 RAISE NOTICE 'Table "public"."pc_paf_thoroughfares" already exists';
84                 v_table_created := false;
85         ELSE
86                 RAISE NOTICE 'Creating table "public"."pc_paf_thoroughfares"';
87                 CREATE TABLE public.pc_paf_thoroughfares (
88                         thoroughfare_key integer NOT NULL,
89                         thoroughfare_name varchar(60)
90                 );
91                 v_table_created := true;
92         END IF;
93         
94         TRUNCATE TABLE public. pc_paf_thoroughfares;
95         INSERT INTO public. pc_paf_thoroughfares
96         SELECT substring(data,1,8)::integer,
97                 nullif(trim(substring(data,9,60)),'')
98         FROM data_stage;
99
100         GET DIAGNOSTICS v_processed = ROW_COUNT;
101         
102         IF (v_table_created) THEN
103                 ALTER TABLE public.pc_paf_thoroughfares ADD PRIMARY KEY (thoroughfare_key);
104         END IF;
105         
106         RAISE NOTICE '%: Done importing thoroughfares (imported % records)', clock_timestamp(), v_processed;
107         
108         -- 3) Thoroughfare descriptors
109         TRUNCATE TABLE data_stage;
110         
111         RAISE NOTICE '%: Begin staging thoroughfares descriptor', clock_timestamp();
112
113         v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'thdesc.c01');
114         EXECUTE v_sql;
115         
116         DELETE FROM data_stage WHERE data like '%THDESCRI'  || in_edition ||  '%' OR data like v_std_footer; 
117         
118         RAISE NOTICE '%: Done staging thoroughfares descriptor, importing', clock_timestamp();
119         
120         IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_thoroughfare_descriptor') THEN
121                 RAISE NOTICE 'Table "public"."pc_paf_thoroughfare_descriptor" already exists';
122                 v_table_created := false;
123         ELSE
124                 RAISE NOTICE 'Creating table "public"."pc_paf_thoroughfare_descriptor"';
125                 CREATE TABLE public.pc_paf_thoroughfare_descriptor (
126                         thoroughfare_descriptor_key integer NOT NULL,
127                         thoroughfare_descriptor varchar(20),
128                         approved_abbreviation varchar(6)
129                 );
130                 v_table_created := true;
131         end if;
132         
133         TRUNCATE TABLE public.pc_paf_thoroughfare_descriptor;
134         INSERT INTO public.pc_paf_thoroughfare_descriptor
135         SELECT substring(data,1,4)::integer,
136                 nullif(trim(substring(data,5,20)),''),
137                 nullif(trim(substring(data,25,6)),'')
138         FROM data_stage;
139         
140         GET DIAGNOSTICS v_processed = ROW_COUNT;
141         
142         IF (v_table_created) THEN
143                 ALTER TABLE public.pc_paf_thoroughfare_descriptor ADD PRIMARY KEY (thoroughfare_descriptor_key);
144         END IF;
145         
146         RAISE NOTICE '%: Done importing thoroughfares descriptor (imported % records)', clock_timestamp(), v_processed; 
147         
148         -- 4) Building Names
149         TRUNCATE TABLE data_stage;
150         
151         RAISE NOTICE '%: Begin staging building names', clock_timestamp();
152         
153         v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'bname.c01');
154         EXECUTE v_sql;
155
156         DELETE FROM data_stage WHERE data like '%BUILDING'  || in_edition ||  '%' OR data like v_std_footer; 
157
158         RAISE NOTICE '%: Done staging building names, importing', clock_timestamp();
159         
160         IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_building_names') THEN
161                 RAISE NOTICE 'Table "public"."pc_paf_building_names" already exists';
162                 v_table_created := false;
163         ELSE
164                 RAISE NOTICE 'Creating table "public"."pc_paf_building_names"';
165                 CREATE TABLE public.pc_paf_building_names (
166                         building_name_key integer NOT NULL,
167                         building_name varchar(50)
168                 );
169                 v_table_created := true;
170         END IF;
171
172         TRUNCATE TABLE public.pc_paf_building_names;
173         INSERT INTO public.pc_paf_building_names
174         SELECT substring(data,1,8)::integer,
175                 nullif(trim(substring(data,9,50)),'')
176         FROM data_stage;
177         
178         GET DIAGNOSTICS v_processed = ROW_COUNT;
179         
180         IF (v_table_created) THEN
181                 ALTER TABLE public.pc_paf_building_names ADD PRIMARY KEY (building_name_key);
182         END IF;
183         
184         RAISE NOTICE '%: Done importing building names (imported % records)', clock_timestamp(), v_processed;   
185         
186         -- 5) Sub Building Names file (subbname.c01)
187         TRUNCATE TABLE data_stage;
188         
189         RAISE NOTICE '%: Begin staging sub building names', clock_timestamp();
190         
191         v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'subbname.c01');
192         EXECUTE v_sql;
193
194         DELETE FROM data_stage WHERE data like '%SUBBUILD'  || in_edition ||  '%' OR data like v_std_footer; 
195
196         RAISE NOTICE '%: Done staging sub building names, importing', clock_timestamp();
197         
198         IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_sub_building_names') THEN
199                 RAISE NOTICE 'Table "public"."pc_paf_sub_building_names" already exists';
200                 v_table_created := false;
201         ELSE
202                 RAISE NOTICE 'Creating table "public"."pc_paf_sub_building_names"';
203                 CREATE TABLE public.pc_paf_sub_building_names (
204                         sub_building_name_key integer NOT NULL,
205                         sub_building_name varchar(50)
206                 );
207                 v_table_created := true;
208         END IF;
209
210         TRUNCATE TABLE public.pc_paf_sub_building_names;
211         INSERT INTO public.pc_paf_sub_building_names
212         SELECT substring(data,1,8)::integer,
213                 nullif(trim(substring(data,9,30)),'')
214         FROM data_stage;
215         
216         GET DIAGNOSTICS v_processed = ROW_COUNT;
217         
218         IF (v_table_created) THEN
219                 ALTER TABLE public.pc_paf_sub_building_names ADD PRIMARY KEY (sub_building_name_key);
220         END IF;
221         
222         RAISE NOTICE '%: Done importing sub building names (imported % records)', clock_timestamp(), v_processed;       
223         
224         -- 6) Organisations
225         TRUNCATE TABLE data_stage;
226         
227         RAISE NOTICE '%: Begin staging organisations', clock_timestamp();
228         
229         v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'org.c01');
230         EXECUTE v_sql;          
231
232         DELETE FROM data_stage WHERE data like '%ORGANISA'  || in_edition ||  '%' OR data like v_std_footer; 
233
234         RAISE NOTICE '%: Done staging organisations, importing', clock_timestamp();
235         
236         IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_organisations') THEN
237                 RAISE NOTICE 'Table "public"."pc_paf_organisations" already exists';
238                 v_table_created := false;
239         ELSE
240                 RAISE NOTICE 'Creating table "public"."pc_paf_organisations"';
241                 CREATE TABLE public.pc_paf_organisations (
242                         organisation_key integer NOT NULL,
243                         postcode_type varchar(1) NOT NULL,
244                         organisation_name varchar(60),
245                         department_name varchar(60)
246                 );
247                 v_table_created := true;
248         END IF;
249
250         TRUNCATE TABLE public.pc_paf_organisations;
251         INSERT INTO public.pc_paf_organisations
252         SELECT substring(data,1,8)::integer,
253                 nullif(trim(substring(data,9,1)),''),
254                 nullif(trim(substring(data,10,60)),''),
255                 nullif(trim(substring(data,70,60)),'')
256         FROM data_stage;
257
258         GET DIAGNOSTICS v_processed = ROW_COUNT;
259         
260         IF (v_table_created) THEN
261                 ALTER TABLE public.pc_paf_organisations ADD PRIMARY KEY (organisation_key, postcode_type);
262                 COMMENT ON COLUMN public.pc_paf_organisations.organisation_key IS 'When postcode type is L organisation_key relates to address_key';
263         END IF;
264         
265         RAISE NOTICE '%: Done importing organisations (imported % records)', clock_timestamp(), v_processed;
266         
267         -- 7) Postzon 100m (grid references with latitude/longitude)
268         TRUNCATE TABLE data_stage;
269         
270         RAISE NOTICE '%: Begin staging postzone', clock_timestamp();
271         
272         v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_postzon || 'pzone100.c02');
273         EXECUTE v_sql;  
274         
275         DELETE FROM data_stage WHERE data like '%PZONE100' || in_edition || '%' OR data like v_main_footer; 
276         
277         RAISE NOTICE '%: Done staging postzon, importing', clock_timestamp();
278         
279         IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_postzon_100m') THEN
280                 RAISE NOTICE 'Table "public"."pc_paf_postzon_100m" already exists';
281                 v_table_created := false;
282         ELSE
283                 RAISE NOTICE 'Creating table "public"."pc_paf_postzon_100m"';
284                 CREATE TABLE public.pc_paf_postzon_100m (
285                         postzon_100m_key serial NOT NULL,
286                         outward_code varchar(4) NOT NULL,
287                         inward_code varchar(3) NOT NULL,
288                         introduction_date date,
289                         grid_reference_east integer,
290                         grid_reference_north integer,
291                         country_code varchar(9),
292                         area_code_county varchar(9),
293                         area_code_district varchar(9),
294                         ward_code varchar(9),
295                         nhs_region varchar(9),
296                         nhs_code varchar(9),
297                         user_type smallint,
298                         grid_status smallint,
299                         latitude double precision,
300                         longitude double precision
301                 );
302                 v_table_created := true;
303         END IF;
304         
305         TRUNCATE TABLE public.pc_paf_postzon_100m;
306         INSERT INTO public.pc_paf_postzon_100m(outward_code, inward_code, introduction_date, grid_reference_east, grid_reference_north,
307         country_code, area_code_county, area_code_district, ward_code, nhs_region, nhs_code, user_type, grid_status, latitude, longitude)
308         SELECT nullif(trim(substring(data,1,4)),''),
309                 nullif(trim(substring(data,5,3)),''),
310                 to_date(substring(data,8,6), 'YYYYMM'),
311                 nullif(trim(substring(data,14,6)),'')::integer,
312                 nullif(trim(substring(data,20,7)),'')::integer,
313                 nullif(trim(substring(data,27,9)),''),
314                 nullif(trim(substring(data,36,9)),''),
315                 nullif(trim(substring(data,45,9)),''),
316                 nullif(trim(substring(data,54,9)),''),
317                 nullif(trim(substring(data,63,9)),''),
318                 nullif(trim(substring(data,72,9)),''),
319                 nullif(trim(substring(data,81,1)),'')::smallint,
320                 nullif(trim(substring(data,82,1)),'')::smallint,
321                 nullif(trim(substring(data,83,10)),'')::double precision,
322                 nullif(trim(substring(data,93,10)),'')::double precision
323         FROM data_stage;
324
325         GET DIAGNOSTICS v_processed = ROW_COUNT;
326         
327         IF (v_table_created) THEN
328                 ALTER TABLE public.pc_paf_postzon_100m ADD PRIMARY KEY (postzon_100m_key),
329                 ADD CONSTRAINT pc_paf_postzon_100m_unique UNIQUE (outward_code, inward_code);
330                 CREATE INDEX pc_paf_postzon_100m_longitude ON public.pc_paf_postzon_100m (longitude);
331                 CREATE INDEX pc_paf_postzon_100m_latitude ON public.pc_paf_postzon_100m (latitude);
332                 
333                 COMMENT ON TABLE public.pc_paf_postzon_100m IS 'Geographical data from the Royal Mail with accuracy of 100m';
334                 COMMENT ON COLUMN public.pc_paf_postzon_100m.grid_reference_east IS 'BT postcodes reflect the Irish Grid public (different origin and scale); subtract 17000 to approximate onto OS National Grid. For latlong do not approximate but do proper conversion';
335                 COMMENT ON COLUMN public.pc_paf_postzon_100m.grid_reference_north IS 'BT postcodes reflect the Irish Grid public (different origin and scale); add 13000 to approximate onto OS National Grid. For latlong do not approximate but do proper conversion';
336                 COMMENT ON COLUMN public.pc_paf_postzon_100m.user_type IS '0 Small User 1 Large User';
337                 COMMENT ON COLUMN public.pc_paf_postzon_100m.grid_status IS '0  Status not supplied by OS \n
338                         1  Within the building of the matched address closest to the Postcode mean. \n
339                         2  Co-ordinates allocated by GROS during Postcode boundary creation to the building nearest the centre of the populated part of the Postcode (Scotland only) \n
340                         3  Approximate to within 50m of true position \n
341                         4  Postcode unit mean (direct copy from ADDRESS-POINT (GB) and COMPAS (NI) - mean of matched addresses with the same Postcode) \n
342                         5  Postcode imputed by ONS to 1 metre resolution \n
343                         6  Postcode sector mean - mainly PO Boxes \n
344                         9  No co-ordinates available';
345         END IF;
346         
347         RAISE NOTICE '%: Done importing postzon (imported % records)', clock_timestamp(), v_processed;  
348         
349         -- 8) Mainfile
350         TRUNCATE TABLE data_stage;
351         RAISE NOTICE '%: Begin staging mainfile', clock_timestamp();
352         
353         v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c02');
354         EXECUTE v_sql;
355         v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c03');
356         EXECUTE v_sql;
357         v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c04');
358         EXECUTE v_sql;
359         v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c05');
360         EXECUTE v_sql;
361         v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c06');
362         EXECUTE v_sql;
363         
364         DELETE FROM data_stage WHERE data like '%ADDRESS '  || in_edition ||  '%' OR data like v_main_footer;
365
366         RAISE NOTICE '%: Done staging mainfile, importing', clock_timestamp();
367         
368         IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_mainfile') THEN
369            RAISE NOTICE 'Table "public"."pc_paf_mainfile" already exists';
370            v_table_created := false;
371         ELSE
372                 RAISE NOTICE 'Creating table "public"."pc_paf_mainfile"';
373                 CREATE TABLE public.pc_paf_mainfile (
374                         paf_record_key serial NOT NULL,
375                         outward_code varchar(4) NOT NULL,
376                         inward_code varchar(3) NOT NULL,
377                         address_key integer NOT NULL,
378                         locality_key integer,
379                         thoroughfare_key integer,
380                         thoroughfare_descriptor_key integer,
381                         dependent_thoroughfare_key integer,
382                         dependent_thoroughfare_descriptor_key integer,
383                         building_number integer,
384                         building_name_key integer,
385                         sub_building_name_key integer,
386                         number_of_households integer,
387                         organisation_key integer,
388                         postcode_type varchar(1),
389                         concatenation_indicator varchar(1),
390                         delivery_point_suffix varchar(2),
391                         small_user_organisation_indicator varchar(1),
392                         po_box_number varchar(6)
393                 );
394                 v_table_created := true;
395         END IF;
396         
397         TRUNCATE TABLE public.pc_paf_mainfile RESTART IDENTITY;
398         INSERT INTO public.pc_paf_mainfile(outward_code,inward_code,address_key,locality_key,thoroughfare_key,
399                 thoroughfare_descriptor_key,dependent_thoroughfare_key,dependent_thoroughfare_descriptor_key,building_number,
400                 building_name_key,sub_building_name_key,number_of_households,organisation_key,postcode_type,concatenation_indicator,
401                 delivery_point_suffix,small_user_organisation_indicator,po_box_number)
402         SELECT nullif(trim(substring(data,1,4)),''),
403                 nullif(trim(substring(data,5,3)),''),
404                 nullif(substring(data,8,8)::integer,0),
405                 nullif(substring(data,16,6)::integer,0),
406                 nullif(substring(data,22,8)::integer,0),
407                 nullif(substring(data,30,4)::integer,0),
408                 nullif(substring(data,34,8)::integer,0),
409                 nullif(substring(data,42,4)::integer,0),
410                 nullif(substring(data,46,4)::integer,0),
411                 nullif(substring(data,50,8)::integer,0),
412                 nullif(substring(data,58,8)::integer,0),
413                 nullif(substring(data,66,4)::integer,0),
414                 nullif(substring(data,70,8)::integer,0),
415                 nullif(trim(substring(data,78,1)),''),
416                 nullif(trim(substring(data,79,1)),''),
417                 nullif(trim(substring(data,80,2)),''),
418                 nullif(trim(substring(data,82,1)),''),
419                 nullif(trim(substring(data,83,6)),'')
420         FROM data_stage;
421         
422         GET DIAGNOSTICS v_processed = ROW_COUNT;
423         
424         IF (v_table_created) THEN
425                 CREATE INDEX pc_paf_mainfile_postcode ON public.pc_paf_mainfile USING btree (outward_code, inward_code);
426                 ALTER TABLE public.pc_paf_mainfile 
427                 ADD PRIMARY KEY (paf_record_key),
428                 ADD CONSTRAINT pc_paf_mainfile_unique UNIQUE (address_key, organisation_key, postcode_type),
429                 ADD FOREIGN KEY (locality_key) REFERENCES public.pc_paf_localities(locality_key) DEFERRABLE INITIALLY IMMEDIATE,
430                 ADD FOREIGN KEY (thoroughfare_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE, 
431                 ADD FOREIGN KEY (thoroughfare_descriptor_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE,
432                 ADD FOREIGN KEY (building_name_key) REFERENCES public.pc_paf_building_names(building_name_key) DEFERRABLE INITIALLY IMMEDIATE,
433                 ADD FOREIGN KEY (sub_building_name_key) REFERENCES public.pc_paf_sub_building_names(sub_building_name_key) DEFERRABLE INITIALLY IMMEDIATE,
434                 ADD FOREIGN KEY (organisation_key, postcode_type) REFERENCES public.pc_paf_organisations(organisation_key, postcode_type) DEFERRABLE INITIALLY IMMEDIATE,
435                 ADD FOREIGN KEY (outward_code,inward_code) REFERENCES public.pc_paf_postzon_100m(outward_code,inward_code) DEFERRABLE INITIALLY IMMEDIATE;
436                 COMMENT ON COLUMN public.pc_paf_mainfile.organisation_key IS 'When postcode type is L then address_key relates to organisation_key - good work RM!';
437         END IF;
438         
439         RAISE NOTICE '%: Done importing mainfile (imported % records)', clock_timestamp(), v_processed;
440         
441         -- 9) Welsh Mainfile
442         TRUNCATE TABLE data_stage;
443         RAISE NOTICE '%: Begin staging welsh alternative mainfile', clock_timestamp();
444         v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'wfmainfl.c06');
445         EXECUTE v_sql;
446         
447         DELETE FROM data_stage WHERE data like '%ADDRESS '  || in_edition ||  '%' OR data like v_main_footer;
448         
449         RAISE NOTICE '%: Done staging welsh alternative mainfile, importing', clock_timestamp();
450
451         IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_mainfile_welsh') THEN
452            RAISE NOTICE 'Table "public"."pc_paf_mainfile_welsh" already exists';
453            v_table_created := false;
454         ELSE
455                 RAISE NOTICE 'Creating table "public"."pc_paf_mainfile_welsh"';
456                 CREATE TABLE public.pc_paf_mainfile_welsh (
457                         paf_record_key serial NOT NULL,
458                         outward_code varchar(4) NOT NULL,
459                         inward_code varchar(3) NOT NULL,
460                         address_key integer NOT NULL,
461                         locality_key integer,
462                         thoroughfare_key integer,
463                         thoroughfare_descriptor_key integer,
464                         dependent_thoroughfare_key integer,
465                         dependent_thoroughfare_descriptor_key integer,
466                         building_number integer,
467                         building_name_key integer,
468                         sub_building_name_key integer,
469                         number_of_households integer,
470                         organisation_key integer,
471                         postcode_type varchar(1),
472                         concatenation_indicator varchar(1),
473                         delivery_point_suffix varchar(2),
474                         small_user_organisation_indicator varchar(1),
475                         po_box_number varchar(6)
476                 );
477                 v_table_created := true;
478         END IF;
479         
480         TRUNCATE TABLE public.pc_paf_mainfile_welsh RESTART IDENTITY;
481                 INSERT INTO public.pc_paf_mainfile_welsh(outward_code,inward_code,address_key,locality_key,thoroughfare_key,
482                         thoroughfare_descriptor_key,dependent_thoroughfare_key,dependent_thoroughfare_descriptor_key,building_number,
483                         building_name_key,sub_building_name_key,number_of_households,organisation_key,postcode_type,concatenation_indicator,
484                         delivery_point_suffix,small_user_organisation_indicator,po_box_number)
485                 SELECT nullif(trim(substring(data,1,4)),''),
486                 nullif(trim(substring(data,5,3)),''),
487                 nullif(substring(data,8,8)::integer,0),
488                 nullif(substring(data,16,6)::integer,0),
489                 nullif(substring(data,22,8)::integer,0),
490                 nullif(substring(data,30,4)::integer,0),
491                 nullif(substring(data,34,8)::integer,0),
492                 nullif(substring(data,42,4)::integer,0),
493                 nullif(substring(data,46,4)::integer,0),
494                 nullif(substring(data,50,8)::integer,0),
495                 nullif(substring(data,58,8)::integer,0),
496                 nullif(substring(data,66,4)::integer,0),
497                 nullif(substring(data,70,8)::integer,0),
498                 nullif(trim(substring(data,78,1)),''),
499                 nullif(trim(substring(data,79,1)),''),
500                 nullif(trim(substring(data,80,2)),''),
501                 nullif(trim(substring(data,82,1)),''),
502                 nullif(trim(substring(data,83,6)),'')
503         FROM data_stage;
504         
505         GET DIAGNOSTICS v_processed = ROW_COUNT;
506         
507         IF (v_table_created) THEN
508                 CREATE INDEX pc_paf_mainfile_welsh_postcode ON public.pc_paf_mainfile_welsh USING btree (outward_code, inward_code);
509                 ALTER TABLE public.pc_paf_mainfile_welsh 
510                 ADD PRIMARY KEY (paf_record_key),
511                 ADD CONSTRAINT pc_paf_mainfile_welsh_unique UNIQUE (address_key, organisation_key, postcode_type),
512                 ADD FOREIGN KEY (locality_key) REFERENCES public.pc_paf_localities(locality_key) DEFERRABLE INITIALLY IMMEDIATE,
513                 ADD FOREIGN KEY (thoroughfare_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE, 
514                 ADD FOREIGN KEY (thoroughfare_descriptor_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE,
515                 ADD FOREIGN KEY (building_name_key) REFERENCES public.pc_paf_building_names(building_name_key) DEFERRABLE INITIALLY IMMEDIATE,
516                 ADD FOREIGN KEY (sub_building_name_key) REFERENCES public.pc_paf_sub_building_names(sub_building_name_key) DEFERRABLE INITIALLY IMMEDIATE,
517                 ADD FOREIGN KEY (organisation_key, postcode_type) REFERENCES public.pc_paf_organisations(organisation_key, postcode_type) DEFERRABLE INITIALLY IMMEDIATE,
518                 ADD FOREIGN KEY (outward_code,inward_code) REFERENCES public.pc_paf_postzon_100m(outward_code,inward_code) DEFERRABLE INITIALLY IMMEDIATE;
519                 COMMENT ON COLUMN public.pc_paf_mainfile_welsh.organisation_key IS 'When postcode type is L then address_key relates to organisation_key - good work RM!';
520         END IF;
521         
522         RAISE NOTICE '%: Done importing welsh alternative mainfile (imported % records)', clock_timestamp(), v_processed;       
523
524         -- 10) Alias file
525         TRUNCATE TABLE data_stage;
526         RAISE NOTICE '%: Begin staging alias file', clock_timestamp();
527         v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_alias || 'aliasfle.c01');
528         EXECUTE v_sql;
529         
530         DELETE FROM data_stage WHERE data like '%ALIASFLEY'  || in_edition ||  '%' OR data like v_std_footer;
531         
532         RAISE NOTICE '%: Done staging alias file, importing', clock_timestamp();        
533
534         IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_counties') THEN
535                 RAISE NOTICE 'Table "public"."pc_paf_counties" already exists';
536                 v_table_created := false;
537         ELSE
538                 RAISE NOTICE 'Creating table "public"."pc_paf_counties"';
539                 CREATE TABLE public.pc_paf_counties (
540                         county_key integer NOT NULL,
541                         county_name varchar(30),
542                         county_type varchar(1)
543                 );
544                 v_table_created := true;
545         END IF;
546
547         TRUNCATE TABLE public.pc_paf_counties;
548         INSERT INTO public.pc_paf_counties
549         SELECT substring(data,2,4)::integer AS county_key,
550                 trim(substring(data,6,30)) AS county_name,
551                 trim(substring(data,36,1)) AS county_type
552         FROM data_stage
553         WHERE substring(data,1,1)::integer = 4;
554         
555         GET DIAGNOSTICS v_processed = ROW_COUNT;
556                 
557         IF (v_table_created) THEN
558                 ALTER TABLE public.pc_paf_counties ADD PRIMARY KEY (county_key),
559                 ADD CONSTRAINT pc_paf_counties_unique UNIQUE (county_name, county_type);
560                 COMMENT ON COLUMN public.pc_paf_counties.county_type IS 'T (Traditional County), P (Former Postal County) or A (Administrative County)';
561         END IF;
562         
563         RAISE NOTICE '%: Done importing counties (imported % records)', clock_timestamp(), v_processed; 
564         
565         IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_county_alias') THEN
566                 RAISE NOTICE 'Table "public"."pc_paf_county_alias" already exists';
567                 v_table_created := false;
568         ELSE
569                 RAISE NOTICE 'Creating table "public"."pc_paf_county_alias"';
570                 CREATE TABLE public.pc_paf_county_alias (
571                         county_alias_key serial NOT NULL,
572                         postcode varchar(7) NOT NULL,
573                         former_postal_county integer,
574                         traditional_county integer,
575                         administrative_county integer
576                 );
577                 v_table_created := true;
578         END IF; 
579         
580         TRUNCATE TABLE public.pc_paf_county_alias;
581         INSERT INTO public.pc_paf_county_alias (postcode, former_postal_county, traditional_county, administrative_county)
582         SELECT trim(substring(data,2,7)) AS postcode,
583                 nullif(substring(data,9,4)::integer,0) AS former_postal_county,
584                 nullif(substring(data,13,4)::integer,0) AS traditional_county,
585                 nullif(substring(data,17,4)::integer,0) AS administrative_county
586         FROM data_stage
587         WHERE substring(data,1,1)::integer = 5;
588         
589         GET DIAGNOSTICS v_processed = ROW_COUNT;
590
591         IF (v_table_created) THEN
592                 ALTER TABLE public.pc_paf_county_alias ADD PRIMARY KEY (county_alias_key),
593                 ADD CONSTRAINT pc_paf_county_alias_unique UNIQUE (postcode),
594                 ADD FOREIGN KEY (former_postal_county) REFERENCES public.pc_paf_counties(county_key) DEFERRABLE INITIALLY IMMEDIATE,
595                 ADD FOREIGN KEY (traditional_county) REFERENCES public.pc_paf_counties(county_key) DEFERRABLE INITIALLY IMMEDIATE,
596                 ADD FOREIGN KEY (administrative_county) REFERENCES public.pc_paf_counties(county_key) DEFERRABLE INITIALLY IMMEDIATE;
597         END IF;
598         
599         RAISE NOTICE '%: Done importing county_alias (imported % records)', clock_timestamp(), v_processed;     
600
601         RAISE NOTICE '%: Completed', clock_timestamp();
602         RETURN true;    
603 END;
604
605 $BODY$
606 LANGUAGE 'plpgsql' VOLATILE;
607
608 --
609
-- public.update_pc_paf(in_edition, in_data)
--
-- Applies a PAF "consolidated changes" edition to the existing pc_paf_* tables.
--
-- Parameters:
--   in_edition  edition identifier; used to build the data directory name
--               (<in_data>/<in_edition>_CHANGES/CONSOLIDATED CHANGES/) and to
--               recognise the header record of each changes file.
--   in_data     root directory holding the edition directories. COPY is
--               executed server side, so the path must be readable by the
--               PostgreSQL server process.
--
-- Returns: true on completion. Any error aborts the whole call.
--
-- Processing, per target table:
--   1. stage the raw fixed-width file into the temporary table data_stage;
--   2. parse the relevant record type into a temporary tmp_* table;
--   3. DELETE target rows matched by amendment types 'D' and 'B';
--   4. INSERT rows with amendment types 'I' and 'C'.
-- (presumably D = delete, B = before-image, I = insert, C = changed image --
--  confirm against the PAF changes file specification.)
--
-- All temporary tables are ON COMMIT DROP, so the function is meant to be
-- called once per transaction.
DROP FUNCTION IF EXISTS public.update_pc_paf(varchar, varchar);
CREATE OR REPLACE FUNCTION public.update_pc_paf(in_edition varchar, in_data varchar)
RETURNS boolean AS
$BODY$

DECLARE
    v_data_root varchar;
    v_data_update varchar;
    v_processed integer;
    v_main_footer varchar;
    v_sql varchar;
BEGIN
    v_data_root := in_data || '/' || in_edition || '_CHANGES/';
    v_data_update := v_data_root || 'CONSOLIDATED CHANGES/';

    -- LIKE pattern identifying the trailer record of each changes file.
    v_main_footer := '       %';

    RAISE NOTICE '%: Import starting for edition % with data root %', clock_timestamp(), in_edition, v_data_root;
    CREATE TEMPORARY TABLE data_stage (data text) ON COMMIT DROP;

    RAISE NOTICE '%: Begin staging Changes1 file', clock_timestamp();

    -- The file name is passed through quote_literal, so the dynamic COPY is
    -- safe against quoting problems in the supplied path.
    v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_update || 'changes1.c01');
    EXECUTE v_sql;

    -- Strip the header and trailer records.
    DELETE FROM data_stage WHERE data LIKE ('%CHANGES1' || in_edition || '%') OR data LIKE v_main_footer;

    -- Report the number of staged rows that remain; ROW_COUNT of the DELETE
    -- above would only be the number of header/trailer rows removed.
    SELECT count(*) INTO v_processed FROM data_stage;
    RAISE NOTICE '%: Done staging Changes1 file of % records',clock_timestamp(), v_processed;

    -- Defer deferrable constraints to the end of the transaction so that the
    -- delete/insert passes below may be applied in any table order.
    SET CONSTRAINTS ALL DEFERRED;

    --changes1.c01 - Changes1 (Changes to satellite tables except Organisations)
    --Record Type 1  - Localities
    RAISE NOTICE '%: Preparing to update localities', clock_timestamp();

    CREATE TEMPORARY TABLE tmp_localities ON COMMIT DROP AS
    SELECT substring(data,2,8)::integer AS locality_key,
        to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
        nullif(trim(substring(data,24,1)),'') AS amendment_type,
        nullif(trim(substring(data,70,30)),'') AS post_town,
        nullif(trim(substring(data,100,35)),'') AS dependent_locality,
        nullif(trim(substring(data,135,35)),'') AS double_dependent_locality
    FROM data_stage
    WHERE substring(data,1,1)::integer = 1
    AND nullif(trim(substring(data,24,1)),'') IS NOT NULL
    ORDER BY to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS');

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Prepared % records for update on localities',clock_timestamp(), v_processed;

    -- Whole-row comparison: only rows whose current content equals the staged
    -- image are removed. The casts presumably mirror the table's column types,
    -- which composite comparison needs -- verify against the table definition.
    DELETE FROM public.pc_paf_localities l
    USING tmp_localities lt WHERE lt.locality_key = l.locality_key
    AND lt.amendment_type IN ('D', 'B')
    AND l.* = (lt.locality_key, lt.post_town::character varying(30), lt.dependent_locality::character varying(30), lt.double_dependent_locality::character varying(30));

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Removed % records from localities',clock_timestamp(), v_processed;

    INSERT INTO public.pc_paf_localities
    SELECT lt.locality_key, lt.post_town, lt.dependent_locality, lt.double_dependent_locality
    FROM tmp_localities lt
    WHERE lt.amendment_type IN ('I', 'C');

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Created % records in localities',clock_timestamp(), v_processed;

    --Record Type 2  - Thoroughfares
    RAISE NOTICE '%: Preparing to update thoroughfares', clock_timestamp();

    CREATE TEMPORARY TABLE tmp_thoroughfares ON COMMIT DROP AS
    SELECT substring(data,2,8)::integer AS thoroughfare_key,
        to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
        nullif(trim(substring(data,24,1)),'') AS amendment_type,
        nullif(trim(substring(data,25,60)),'') AS thoroughfare_name
    FROM data_stage
    WHERE substring(data,1,1)::integer = 2 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL;

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Prepared % records for update on thoroughfares',clock_timestamp(), v_processed;

    -- Compare against the STAGED name (tt), as the other satellite tables do.
    -- Comparing t.thoroughfare_name with itself made the name check a no-op.
    DELETE FROM public.pc_paf_thoroughfares t
    USING tmp_thoroughfares tt WHERE tt.thoroughfare_key = t.thoroughfare_key
    AND tt.amendment_type IN ('D', 'B')
    AND t.* = (tt.thoroughfare_key, tt.thoroughfare_name::character varying(60));

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Removed % records from thoroughfares',clock_timestamp(), v_processed;

    INSERT INTO public.pc_paf_thoroughfares
    SELECT tt.thoroughfare_key, tt.thoroughfare_name
    FROM tmp_thoroughfares tt
    WHERE tt.amendment_type IN ('I', 'C');

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Created % records in thoroughfares',clock_timestamp(), v_processed;

    --Record Type 3  - Thoroughfare Descriptors
    RAISE NOTICE '%: Preparing to update thoroughfare_descriptor', clock_timestamp();

    CREATE TEMPORARY TABLE tmp_thoroughfare_descriptor ON COMMIT DROP AS
    SELECT substring(data,2,8)::integer AS thoroughfare_descriptor_key ,
        to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
        nullif(trim(substring(data,24,1)),'') AS amendment_type,
        nullif(trim(substring(data,25,20)),'') AS thoroughfare_descriptor,
        nullif(trim(substring(data,45,6)),'') AS approved_abbreviation
    FROM data_stage
    WHERE substring(data,1,1)::integer = 3 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL;

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Prepared % records for update on thoroughfare_descriptor',clock_timestamp(), v_processed;

    DELETE FROM public.pc_paf_thoroughfare_descriptor td
    USING tmp_thoroughfare_descriptor tdt WHERE tdt.thoroughfare_descriptor_key  = td.thoroughfare_descriptor_key
    AND tdt.amendment_type IN ('D', 'B')
    AND td.* = (tdt.thoroughfare_descriptor_key , tdt.thoroughfare_descriptor::character varying(20), tdt.approved_abbreviation::character varying(6));

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Removed % records from thoroughfare_descriptor',clock_timestamp(), v_processed;

    INSERT INTO public.pc_paf_thoroughfare_descriptor
    SELECT tdt.thoroughfare_descriptor_key , tdt.thoroughfare_descriptor, tdt.approved_abbreviation
    FROM tmp_thoroughfare_descriptor tdt
    WHERE tdt.amendment_type IN ('I', 'C');

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Created % records in thoroughfare_descriptor',clock_timestamp(), v_processed;

    --Record Type 4  - Building Names
    RAISE NOTICE '%: Preparing to update building_names', clock_timestamp();

    CREATE TEMPORARY TABLE tmp_building_names ON COMMIT DROP AS
    SELECT substring(data,2,8)::integer AS building_name_key,
        to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
        nullif(trim(substring(data,24,1)),'') AS amendment_type,
        nullif(trim(substring(data,25,50)),'') AS building_name
    FROM data_stage
    WHERE substring(data,1,1)::integer = 4 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL;

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Prepared % records for update on building_names',clock_timestamp(), v_processed;

    DELETE FROM public.pc_paf_building_names bn
    USING tmp_building_names tbn WHERE bn.building_name_key = tbn.building_name_key
    AND tbn.amendment_type IN ('D', 'B')
    AND bn.* = (tbn.building_name_key, tbn.building_name::character varying(50));

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Removed % records from building_names',clock_timestamp(), v_processed;

    INSERT INTO public.pc_paf_building_names
    SELECT tbn.building_name_key, tbn.building_name
    FROM tmp_building_names tbn
    WHERE tbn.amendment_type IN ('I', 'C');

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Created % records in building_names',clock_timestamp(), v_processed;

    --Record Type 5  - Sub Building Names
    RAISE NOTICE '%: Preparing to update sub_building_names', clock_timestamp();

    CREATE TEMPORARY TABLE tmp_sub_building_names ON COMMIT DROP AS
    SELECT substring(data,2,8)::integer AS sub_building_name_key,
        to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
        nullif(trim(substring(data,24,1)),'') AS amendment_type,
        nullif(trim(substring(data,25,30)),'') AS sub_building_name
    FROM data_stage
    WHERE substring(data,1,1)::integer = 5 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL;

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Prepared % records for update on sub_building_names',clock_timestamp(), v_processed;

    DELETE FROM public.pc_paf_sub_building_names sbn
    USING tmp_sub_building_names tsbn WHERE sbn.sub_building_name_key = tsbn.sub_building_name_key
    AND tsbn.amendment_type IN ('D', 'B')
    AND sbn.* = (tsbn.sub_building_name_key, tsbn.sub_building_name::character varying(50));

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Removed % records from sub_building_names',clock_timestamp(), v_processed;

    INSERT INTO public.pc_paf_sub_building_names
    SELECT tsbn.sub_building_name_key, tsbn.sub_building_name
    FROM tmp_sub_building_names tsbn
    WHERE tsbn.amendment_type IN ('I', 'C');

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Created % records in sub_building_names',clock_timestamp(), v_processed;

    -- fpchngs2.c01  - Changes2 (Changes to Mainfile and Organisations)
    TRUNCATE TABLE data_stage;

    RAISE NOTICE '%: Begin staging Changes2 file', clock_timestamp();

    v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_update || 'fpchngs2.c01');
    EXECUTE v_sql;

    DELETE FROM data_stage WHERE data LIKE ('%CHANGES2' || in_edition || '%') OR data LIKE v_main_footer;

    -- Count the staged rows rather than the header/trailer rows just deleted.
    SELECT count(*) INTO v_processed FROM data_stage;
    RAISE NOTICE '%: Done staging Changes2 file of % records',clock_timestamp(), v_processed;

    -- Mainfile (record type 1 of the Changes2 file)
    RAISE NOTICE '%: Preparing to update mainfile', clock_timestamp();

    CREATE TEMPORARY TABLE tmp_mainfile ON COMMIT DROP AS
    SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
        nullif(trim(substring(data,15,4)),'') AS outward_code,
        nullif(trim(substring(data,19,3)),'') AS inward_code,
        nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix,
        nullif(trim(substring(data,24,1)),'') AS postcode_type,
        nullif(substring(data,25,8)::integer,0) AS address_key,
        nullif(substring(data,33,8)::integer,0) AS organisation_key,
        nullif(trim(substring(data,41,1)),'') AS amendment_type,
        nullif(substring(data,42,1)::integer,0) AS record_type,
        nullif(substring(data,43,8)::integer,0) AS locality_key,
        nullif(substring(data,51,8)::integer,0) AS thoroughfare_key,
        nullif(substring(data,59,8)::integer,0) AS thoroughfare_descriptor_key,
        nullif(substring(data,67,8)::integer,0) AS dependent_thoroughfare_key,
        nullif(substring(data,75,8)::integer,0) AS dependent_thoroughfare_descriptor_key,
        nullif(substring(data,83,4)::integer,0) AS building_number,
        nullif(substring(data,87,8)::integer,0) AS building_name_key,
        nullif(substring(data,95,8)::integer,0) AS sub_building_name_key,
        nullif(substring(data,103,4)::integer,0) AS number_of_households,
        nullif(trim(substring(data,107,1)),'') AS concatenation_indicator,
        nullif(trim(substring(data,108,6)),'') AS po_box_number,
        nullif(trim(substring(data,114,1)),'') AS small_user_organisation_indicator,
        CASE nullif(substring(data,115,2)::integer,0)
            WHEN 1 THEN 'New'
            WHEN 2 THEN 'Correction'
            WHEN 3 THEN ''
            WHEN 4 THEN 'Coding Revision'
            WHEN 5 THEN 'Organisation Change'
            WHEN 6 THEN 'Status Change'
            WHEN 7 THEN 'Large User Deleted'
            WHEN 8 THEN 'Building/Sub Building Change'
            WHEN 9 THEN 'Large User Change'
        END AS reason_for_amendment,
        nullif(trim(substring(data,117,4)),'') AS new_outward_code,
        nullif(trim(substring(data,121,3)),'') AS new_inward_code
    FROM data_stage
    WHERE nullif(substring(data,42,1)::integer,0) = 1;

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Prepared % records for update on mainfile',clock_timestamp(), v_processed;

    -- organisation_key and postcode_type are loaded through nullif() and may
    -- be NULL; a plain "=" never matches NULL to NULL, which left addresses
    -- without an organisation undeleted. IS NOT DISTINCT FROM treats two
    -- NULLs as equal.
    DELETE FROM public.pc_paf_mainfile m
    USING tmp_mainfile tm WHERE tm.address_key = m.address_key
    AND tm.organisation_key IS NOT DISTINCT FROM m.organisation_key
    AND tm.postcode_type IS NOT DISTINCT FROM m.postcode_type
    AND tm.amendment_type IN ('D', 'B');

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Removed % records from mainfile',clock_timestamp(), v_processed;

    -- cume_dist() = 1 keeps only the row(s) with the latest timestamp within
    -- each (address_key, organisation_key, postcode_type) group, so repeated
    -- amendments to one address collapse to the most recent image.
    INSERT INTO public.pc_paf_mainfile (outward_code, inward_code, address_key, locality_key, thoroughfare_key,
        thoroughfare_descriptor_key, dependent_thoroughfare_key, dependent_thoroughfare_descriptor_key,
        building_number, building_name_key, sub_building_name_key, number_of_households,
        organisation_key, postcode_type, concatenation_indicator, delivery_point_suffix,
        small_user_organisation_indicator, po_box_number)
    SELECT tm.outward_code, tm.inward_code, tm.address_key, tm.locality_key, tm.thoroughfare_key,
        tm.thoroughfare_descriptor_key, tm.dependent_thoroughfare_key, tm.dependent_thoroughfare_descriptor_key,
        tm.building_number, tm.building_name_key, tm.sub_building_name_key, tm.number_of_households,
        tm.organisation_key, tm.postcode_type, tm.concatenation_indicator, tm.delivery_point_suffix,
        tm.small_user_organisation_indicator, tm.po_box_number
    FROM (
        SELECT cume_dist() OVER w, *
        FROM tmp_mainfile
        WHERE amendment_type IN ('I', 'C')
        WINDOW w AS (PARTITION BY address_key, organisation_key, postcode_type ORDER BY timestamp)
    ) tm WHERE tm.cume_dist = 1 ;

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Created % records in mainfile',clock_timestamp(), v_processed;

    -- Organisations (record types 2 and 3 of the Changes2 file)
    RAISE NOTICE '%: Preparing to update organisations', clock_timestamp();

    CREATE TEMPORARY TABLE tmp_organisations ON COMMIT DROP AS
    SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
        nullif(trim(substring(data,15,4)),'') AS outward_code,
        nullif(trim(substring(data,19,3)),'') AS inward_code,
        nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix,
        nullif(trim(substring(data,24,1)),'') AS postcode_type,
        nullif(substring(data,25,8)::integer,0) AS address_key,
        nullif(substring(data,33,8)::integer,0) AS organisation_key,
        nullif(trim(substring(data,41,1)),'') AS amendment_type,
        nullif(substring(data,42,1)::integer,0) AS record_type,
        nullif(trim(substring(data,43,60)),'') AS organisation_name,
        nullif(trim(substring(data,103,60)),'') AS department_name,
        nullif(trim(substring(data,163,6)),'') AS po_box_number
    FROM data_stage
    WHERE nullif(substring(data,42,1)::integer,0) IN (2,3);

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Prepared % records for update on organisations',clock_timestamp(), v_processed;

    -- When a record carries no organisation_key the address_key is used as
    -- the organisation's key.
    DELETE FROM public.pc_paf_organisations o
    USING tmp_organisations tx WHERE COALESCE(tx.organisation_key, tx.address_key) = o.organisation_key AND tx.postcode_type = o.postcode_type
    AND tx.amendment_type IN ('D', 'B');

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Removed % records from organisations',clock_timestamp(), v_processed;

    INSERT INTO public.pc_paf_organisations
    SELECT COALESCE(tx.organisation_key, tx.address_key), tx.postcode_type, tx.organisation_name, tx.department_name
    FROM (
        SELECT cume_dist() OVER w, *
        FROM tmp_organisations
        WHERE amendment_type IN ('I', 'C')
        WINDOW w AS (PARTITION BY COALESCE(organisation_key, address_key), postcode_type ORDER BY timestamp)
    ) tx WHERE tx.cume_dist = 1 ;

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Created % records in organisations',clock_timestamp(), v_processed;

    -- wchanges.c01  -- Welsh changes
    TRUNCATE TABLE data_stage;

    RAISE NOTICE '%: Begin staging WChanges2 file', clock_timestamp();

    v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_update || 'wchanges.c01');
    EXECUTE v_sql;
    DELETE FROM data_stage WHERE data LIKE ('%WCHANGES' || in_edition || '%') OR data LIKE v_main_footer;

    -- Count the staged rows rather than the header/trailer rows just deleted.
    SELECT count(*) INTO v_processed FROM data_stage;
    RAISE NOTICE '%: Done staging WChanges2 file of % records',clock_timestamp(), v_processed;

    RAISE NOTICE '%: Preparing to update welsh mainfile', clock_timestamp();

    CREATE TEMPORARY TABLE tmp_mainfile_welsh ON COMMIT DROP AS
    SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
        nullif(trim(substring(data,15,4)),'') AS outward_code,
        nullif(trim(substring(data,19,3)),'') AS inward_code,
        nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix,
        nullif(trim(substring(data,24,1)),'') AS postcode_type,
        nullif(substring(data,25,8)::integer,0) AS address_key,
        nullif(substring(data,33,8)::integer,0) AS organisation_key,
        nullif(trim(substring(data,41,1)),'') AS amendment_type,
        nullif(substring(data,42,1)::integer,0) AS record_type,
        nullif(substring(data,43,8)::integer,0) AS locality_key,
        nullif(substring(data,51,8)::integer,0) AS thoroughfare_key,
        nullif(substring(data,59,8)::integer,0) AS thoroughfare_descriptor_key,
        nullif(substring(data,67,8)::integer,0) AS dependent_thoroughfare_key,
        nullif(substring(data,75,8)::integer,0) AS dependent_thoroughfare_descriptor_key,
        nullif(substring(data,83,4)::integer,0) AS building_number,
        nullif(substring(data,87,8)::integer,0) AS building_name_key,
        nullif(substring(data,95,8)::integer,0) AS sub_building_name_key,
        nullif(substring(data,103,4)::integer,0) AS number_of_households,
        nullif(trim(substring(data,107,1)),'') AS concatenation_indicator,
        nullif(trim(substring(data,108,6)),'') AS po_box_number,
        nullif(trim(substring(data,114,1)),'') AS small_user_organisation_indicator
    FROM data_stage
    WHERE nullif(substring(data,42,1)::integer,0) = 1;

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Prepared % records for update on welsh mainfile',clock_timestamp(), v_processed;

    -- NULL-safe match on the nullable key columns, as for the main mainfile.
    DELETE FROM public.pc_paf_mainfile_welsh mw
    USING tmp_mainfile_welsh twm WHERE twm.address_key = mw.address_key
    AND twm.organisation_key IS NOT DISTINCT FROM mw.organisation_key
    AND twm.postcode_type IS NOT DISTINCT FROM mw.postcode_type
    AND twm.amendment_type IN ('D', 'B');

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Removed % records from welsh mainfile',clock_timestamp(), v_processed;

    -- NOTE(review): unlike the main mainfile insert, no "latest timestamp per
    -- key" filtering is applied here -- confirm the Welsh file never carries
    -- more than one 'I'/'C' image per address.
    INSERT INTO public.pc_paf_mainfile_welsh (outward_code, inward_code, address_key, locality_key, thoroughfare_key,
        thoroughfare_descriptor_key, dependent_thoroughfare_key, dependent_thoroughfare_descriptor_key,
        building_number, building_name_key, sub_building_name_key, number_of_households,
        organisation_key, postcode_type, concatenation_indicator, delivery_point_suffix,
        small_user_organisation_indicator, po_box_number)
    SELECT twm.outward_code, twm.inward_code, twm.address_key, twm.locality_key, twm.thoroughfare_key,
        twm.thoroughfare_descriptor_key, twm.dependent_thoroughfare_key, twm.dependent_thoroughfare_descriptor_key,
        twm.building_number, twm.building_name_key, twm.sub_building_name_key, twm.number_of_households,
        twm.organisation_key, twm.postcode_type, twm.concatenation_indicator, twm.delivery_point_suffix,
        twm.small_user_organisation_indicator, twm.po_box_number
    FROM tmp_mainfile_welsh twm
    WHERE twm.amendment_type IN ('I', 'C');

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Created % records in welsh mainfile',clock_timestamp(), v_processed;

    -- Organisations (record types 2 and 3 of the Welsh changes file)
    -- NOTE(review): these are applied to public.pc_paf_organisations, the
    -- same table the English organisation changes go to -- confirm intended.
    RAISE NOTICE '%: Preparing to update welsh organisations', clock_timestamp();

    CREATE TEMPORARY TABLE tmp_organisations_welsh ON COMMIT DROP AS
    SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
        nullif(trim(substring(data,15,4)),'') AS outward_code,
        nullif(trim(substring(data,19,3)),'') AS inward_code,
        nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix,
        nullif(trim(substring(data,24,1)),'') AS postcode_type,
        nullif(substring(data,25,8)::integer,0) AS address_key,
        nullif(substring(data,33,8)::integer,0) AS organisation_key,
        nullif(trim(substring(data,41,1)),'') AS amendment_type,
        nullif(substring(data,42,1)::integer,0) AS record_type,
        nullif(trim(substring(data,43,60)),'') AS organisation_name,
        nullif(trim(substring(data,103,60)),'') AS department_name,
        nullif(trim(substring(data,163,6)),'') AS po_box_number
    FROM data_stage
    WHERE nullif(substring(data,42,1)::integer,0) IN (2,3);

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Prepared % records for update on welsh organisations',clock_timestamp(), v_processed;

    DELETE FROM public.pc_paf_organisations o
    USING tmp_organisations_welsh txw WHERE COALESCE(txw.organisation_key, txw.address_key) = o.organisation_key AND txw.postcode_type = o.postcode_type
    AND txw.amendment_type IN ('D', 'B');

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Removed % records from welsh organisations',clock_timestamp(), v_processed;

    INSERT INTO public.pc_paf_organisations
    SELECT COALESCE(txw.organisation_key, txw.address_key), txw.postcode_type, txw.organisation_name, txw.department_name
    FROM tmp_organisations_welsh txw
    WHERE txw.amendment_type IN ('I', 'C');

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Created % records in welsh organisations',clock_timestamp(), v_processed;

    -- So far I haven't seen a changes file for postzon data, without which updates will most likely fail on
    -- the fk for pc_paf_postzon_100m.  I've also not seen an alias changes file.

    -- If I did have a postzon changes file, it'd most likely be best to import the data directly and use postgis to do our conversion
    -- as the data is selected out of the data_stage table (rather than using the convert_paf.pl script). We could get those fields with
    -- something like:
    -- CASE WHEN substring(northing,1,1) ~ '[\x4f-\x5a]' THEN '1' || translate(northing, 'POUTZY', '221100') ELSE northing END || '0' AS f_northing,
    -- ST_x(ST_transform(ST_GeomFromText('POINT('||easting||' '||f_northing||')',CASE WHEN postcode LIKE 'BT%' THEN 29903 ELSE 27700 END),4326)) AS longitude,
    -- ST_y(ST_transform(ST_GeomFromText('POINT('||easting||' '||f_northing||')',CASE WHEN postcode LIKE 'BT%' THEN 29903 ELSE 27700 END),4326)) AS latitude;

    -- Workaround: create placeholder postzon rows (postcode only, no
    -- coordinates) for every staged postcode that is not yet present.
    RAISE NOTICE '%: Fudging postzon', clock_timestamp();

    INSERT INTO public.pc_paf_postzon_100m (outward_code, inward_code)
    SELECT DISTINCT m.outward_code, m.inward_code
    FROM tmp_mainfile m
    WHERE NOT EXISTS (SELECT 1 FROM public.pc_paf_postzon_100m p WHERE p.outward_code = m.outward_code AND p.inward_code = m.inward_code);

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Fudged % mainfile records in postzon',clock_timestamp(), v_processed;

    INSERT INTO public.pc_paf_postzon_100m (outward_code, inward_code)
    SELECT DISTINCT mw.outward_code, mw.inward_code
    FROM tmp_mainfile_welsh mw
    WHERE NOT EXISTS (SELECT 1 FROM public.pc_paf_postzon_100m p WHERE p.outward_code = mw.outward_code AND p.inward_code = mw.inward_code);

    GET DIAGNOSTICS v_processed = ROW_COUNT;
    RAISE NOTICE '%: Fudged % welsh mainfile records in postzon',clock_timestamp(), v_processed;

    RAISE NOTICE '%: Completed', clock_timestamp();

    RETURN true;
END;

$BODY$
LANGUAGE 'plpgsql' VOLATILE;
1064
-- Standard master view for postcode tables.
-- One row per pc_paf_mainfile address, with every key column resolved to its
-- text through the satellite tables. All lookups are LEFT JOINs, so an
-- address is kept even when a lookup finds nothing.
CREATE OR REPLACE VIEW public.pc_paf_master_view AS
SELECT
    m.building_number,
    b.building_name,
    sb.sub_building_name,
    -- Prefer the organisation matched on organisation_key; otherwise fall
    -- back to the one whose key equals this address's address_key.
    COALESCE(o.organisation_name, ao.organisation_name) AS organisation_name,
    COALESCE(o.department_name, ao.department_name) AS department_name,
    m.po_box_number,
    t.thoroughfare_name,
    td.thoroughfare_descriptor,
    td.approved_abbreviation,
    l.post_town,
    l.dependent_locality,
    l.double_dependent_locality,
    fpc.county_name,
    m.outward_code,
    m.inward_code,
    m.number_of_households,
    m.postcode_type,
    m.concatenation_indicator,
    m.delivery_point_suffix,
    m.small_user_organisation_indicator,
    -- True when the Welsh mainfile holds a row for the same address;
    -- the nullable key columns are matched NULL-safely.
    EXISTS (
        SELECT 1
        FROM public.pc_paf_mainfile_welsh AS w
        WHERE w.address_key = m.address_key
            AND w.organisation_key IS NOT DISTINCT FROM m.organisation_key
            AND w.postcode_type IS NOT DISTINCT FROM m.postcode_type
    ) AS welsh_alternative_available,
    p.longitude,
    p.latitude
FROM public.pc_paf_mainfile AS m
LEFT JOIN public.pc_paf_localities AS l
    USING (locality_key)
LEFT JOIN public.pc_paf_thoroughfares AS t
    USING (thoroughfare_key)
LEFT JOIN public.pc_paf_thoroughfare_descriptor AS td
    USING (thoroughfare_descriptor_key)
LEFT JOIN public.pc_paf_building_names AS b
    USING (building_name_key)
LEFT JOIN public.pc_paf_sub_building_names AS sb
    USING (sub_building_name_key)
LEFT JOIN public.pc_paf_organisations AS o
    USING (organisation_key, postcode_type)
LEFT JOIN public.pc_paf_organisations AS ao
    ON m.address_key = ao.organisation_key
    AND m.postcode_type = ao.postcode_type
LEFT JOIN public.pc_paf_postzon_100m AS p
    USING (outward_code, inward_code)
-- The alias table is matched on the outward code padded to 4 characters
-- followed by the inward code.
LEFT JOIN public.pc_paf_county_alias AS ca
    ON ca.postcode = (rpad(m.outward_code, 4) || m.inward_code)
-- county_name is taken from the former postal county of the alias row.
LEFT JOIN public.pc_paf_counties AS fpc
    ON fpc.county_key = ca.former_postal_county;
1101
-- Welsh master view for postcode tables.
--
-- Flattens public.pc_paf_mainfile_welsh together with its lookup tables into
-- a single address-shaped row set. Every lookup is attached with a LEFT JOIN
-- and there is no WHERE clause, so Welsh mainfile rows are never filtered
-- out; columns from a lookup with no match are NULL.
--
-- Notable columns:
--   organisation_name / department_name
--       taken from the organisation row matched on
--       (organisation_key, postcode_type); when that is NULL, from the
--       organisation row whose organisation_key equals the mainfile
--       address_key (same postcode_type).
--       NOTE(review): the second lookup presumably covers records where the
--       address key doubles as the organisation key - confirm against the
--       PAF specification.
--   county_name
--       resolved through pc_paf_county_alias, keyed on the outward code
--       right-padded to 4 characters followed by the inward code, then via
--       its former_postal_county to pc_paf_counties.
--   longitude / latitude
--       from pc_paf_postzon_100m, matched on (outward_code, inward_code).
CREATE OR REPLACE VIEW public.pc_paf_master_welsh_view AS
SELECT
    welsh.building_number,
    bldg.building_name,
    sub_bldg.sub_building_name,
    COALESCE(org_by_key.organisation_name, org_by_addr.organisation_name) AS organisation_name,
    COALESCE(org_by_key.department_name, org_by_addr.department_name) AS department_name,
    welsh.po_box_number,
    thf.thoroughfare_name,
    thf_desc.thoroughfare_descriptor,
    thf_desc.approved_abbreviation,
    loc.post_town,
    loc.dependent_locality,
    loc.double_dependent_locality,
    county.county_name,
    welsh.outward_code,
    welsh.inward_code,
    welsh.number_of_households,
    welsh.postcode_type,
    welsh.concatenation_indicator,
    welsh.delivery_point_suffix,
    welsh.small_user_organisation_indicator,
    pz.longitude,
    pz.latitude
FROM public.pc_paf_mainfile_welsh AS welsh
LEFT JOIN public.pc_paf_localities AS loc
    USING (locality_key)
LEFT JOIN public.pc_paf_thoroughfares AS thf
    USING (thoroughfare_key)
LEFT JOIN public.pc_paf_thoroughfare_descriptor AS thf_desc
    USING (thoroughfare_descriptor_key)
LEFT JOIN public.pc_paf_building_names AS bldg
    USING (building_name_key)
LEFT JOIN public.pc_paf_sub_building_names AS sub_bldg
    USING (sub_building_name_key)
-- Organisation matched on the mainfile's own organisation key.
LEFT JOIN public.pc_paf_organisations AS org_by_key
    USING (organisation_key, postcode_type)
-- Fallback organisation matched on the mainfile's address key.
LEFT JOIN public.pc_paf_organisations AS org_by_addr
    ON welsh.address_key = org_by_addr.organisation_key
    AND welsh.postcode_type = org_by_addr.postcode_type
LEFT JOIN public.pc_paf_postzon_100m AS pz
    USING (outward_code, inward_code)
-- The alias table is keyed on outward code padded to 4 characters + inward code.
LEFT JOIN public.pc_paf_county_alias AS county_alias
    ON county_alias.postcode = (rpad(welsh.outward_code, 4) || welsh.inward_code)
LEFT JOIN public.pc_paf_counties AS county
    ON county.county_key = county_alias.former_postal_county;