DROP FUNCTION IF EXISTS public.import_pc_paf(varchar, varchar); CREATE OR REPLACE FUNCTION public.import_pc_paf(in_edition varchar, in_data varchar) RETURNS boolean AS $BODY$ DECLARE v_data_root varchar; v_data_main varchar; v_data_postzon varchar; v_data_alias varchar; v_processed integer; v_main_footer varchar; v_std_footer varchar; v_sql varchar; v_table_created boolean; BEGIN v_data_root := in_data || '/' || in_edition || '/'; v_data_main := v_data_root || 'PAF MAIN FILE/'; v_data_postzon := v_data_root || 'POSTZON 100M/'; v_data_alias := v_data_root || 'ALIAS/'; v_main_footer = ' %'; v_std_footer = '99999999%'; RAISE NOTICE '%: Import starting for edition % with data root %', clock_timestamp(), in_edition, v_data_root; CREATE TEMPORARY TABLE data_stage (data text) ON COMMIT DROP; -- 1) Localaties TRUNCATE TABLE data_stage; RAISE NOTICE '%: Begin staging localaties', clock_timestamp(); v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'local.c01'); EXECUTE v_sql; DELETE FROM data_stage WHERE data like '%LOCALITY' || in_edition || '%' OR data like v_std_footer; RAISE NOTICE '%: Done staging localaties, importing', clock_timestamp(); IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_localities') THEN RAISE NOTICE 'Table "public"."pc_paf_localities" already exists'; v_table_created := false; ELSE RAISE NOTICE 'Creating table "public"."pc_paf_localities"'; CREATE TABLE public.pc_paf_localities ( locality_key integer NOT NULL, post_town varchar(30), dependent_locality varchar(35), double_dependent_locality varchar(35) ); v_table_created := true; END IF; TRUNCATE TABLE public.pc_paf_localities CASCADE; INSERT INTO public.pc_paf_localities SELECT substring(data,1,6)::integer, nullif(trim(substring(data,52,30)),''), nullif(trim(substring(data,82,35)),''), nullif(trim(substring(data,117,35)),'') FROM data_stage; GET DIAGNOSTICS v_processed = ROW_COUNT; IF (v_table_created) THEN ALTER TABLE public.pc_paf_localities ADD PRIMARY KEY (locality_key); END IF; RAISE NOTICE '%: Done importing localities (imported % records)', clock_timestamp(), v_processed; -- 2) Thoroughfares TRUNCATE TABLE data_stage; RAISE NOTICE '%: Begin staging thoroughfares', clock_timestamp(); v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'thfare.c01'); EXECUTE v_sql; DELETE FROM data_stage WHERE data like '%THOROUGH' || in_edition || '%' OR data like v_std_footer; RAISE NOTICE '%: Done staging thoroughfares, importing', clock_timestamp(); IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_thoroughfares') THEN RAISE NOTICE 'Table "public"."pc_paf_thoroughfares" already exists'; v_table_created := false; ELSE RAISE NOTICE 'Creating table "public"."pc_paf_thoroughfares"'; CREATE TABLE public.pc_paf_thoroughfares ( thoroughfare_key integer NOT NULL, thoroughfare_name varchar(60) ); v_table_created := true; END IF; TRUNCATE TABLE public.pc_paf_thoroughfares CASCADE; INSERT INTO public. pc_paf_thoroughfares SELECT substring(data,1,8)::integer, nullif(trim(substring(data,9,60)),'') FROM data_stage; GET DIAGNOSTICS v_processed = ROW_COUNT; IF (v_table_created) THEN ALTER TABLE public.pc_paf_thoroughfares ADD PRIMARY KEY (thoroughfare_key); END IF; RAISE NOTICE '%: Done importing thoroughfares (imported % records)', clock_timestamp(), v_processed; -- 3) Thoroughfares Descriptor TRUNCATE TABLE data_stage; RAISE NOTICE '%: Begin staging thoroughfares descriptor', clock_timestamp(); v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'thdesc.c01'); EXECUTE v_sql; DELETE FROM data_stage WHERE data like '%THDESCRI' || in_edition || '%' OR data like v_std_footer; RAISE NOTICE '%: Done staging thoroughfares descriptor, importing', clock_timestamp(); IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_thoroughfare_descriptor') THEN RAISE NOTICE 'Table "public"."pc_paf_thoroughfare_descriptor" already exists'; v_table_created := false; ELSE RAISE NOTICE 'Creating table "public"."pc_paf_thoroughfare_descriptor"'; CREATE TABLE public.pc_paf_thoroughfare_descriptor ( thoroughfare_descriptor_key integer NOT NULL, thoroughfare_descriptor varchar(20), approved_abbreviation varchar(6) ); v_table_created := true; end if; TRUNCATE TABLE public.pc_paf_thoroughfare_descriptor CASCADE; INSERT INTO public.pc_paf_thoroughfare_descriptor SELECT substring(data,1,4)::integer, nullif(trim(substring(data,5,20)),''), nullif(trim(substring(data,25,6)),'') FROM data_stage; GET DIAGNOSTICS v_processed = ROW_COUNT; IF (v_table_created) THEN ALTER TABLE public.pc_paf_thoroughfare_descriptor ADD PRIMARY KEY (thoroughfare_descriptor_key); END IF; RAISE NOTICE '%: Done importing thoroughfares descriptor (imported % records)', clock_timestamp(), v_processed; -- 4) Building Names TRUNCATE TABLE data_stage; RAISE NOTICE '%: Begin staging building names', clock_timestamp(); v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'bname.c01'); EXECUTE v_sql; DELETE FROM data_stage WHERE data like '%BUILDING' || in_edition || '%' OR data like v_std_footer; RAISE NOTICE '%: Done staging building names, importing', clock_timestamp(); IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_building_names') THEN RAISE NOTICE 'Table "public"."pc_paf_building_names" already exists'; v_table_created := false; ELSE RAISE NOTICE 'Creating table "public"."pc_paf_building_names"'; CREATE TABLE public.pc_paf_building_names ( building_name_key integer NOT NULL, building_name varchar(50) ); v_table_created := true; END IF; TRUNCATE TABLE public.pc_paf_building_names CASCADE; INSERT INTO public.pc_paf_building_names SELECT substring(data,1,8)::integer, nullif(trim(substring(data,9,50)),'') FROM data_stage; GET DIAGNOSTICS v_processed = ROW_COUNT; IF (v_table_created) THEN ALTER TABLE public.pc_paf_building_names ADD PRIMARY KEY (building_name_key); END IF; RAISE NOTICE '%: Done importing building names (imported % records)', clock_timestamp(), v_processed; -- 5) Sub Building Names file (subbname.c01) TRUNCATE TABLE data_stage; RAISE NOTICE '%: Begin staging sub building names', clock_timestamp(); v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'subbname.c01'); EXECUTE v_sql; DELETE FROM data_stage WHERE data like '%SUBBUILD' || in_edition || '%' OR data like v_std_footer; RAISE NOTICE '%: Done staging sub building names, importing', clock_timestamp(); IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_sub_building_names') THEN RAISE NOTICE 'Table "public"."pc_paf_sub_building_names" already exists'; v_table_created := false; ELSE RAISE NOTICE 'Creating table "public"."pc_paf_sub_building_names"'; CREATE TABLE public.pc_paf_sub_building_names ( sub_building_name_key integer NOT NULL, sub_building_name varchar(50) ); v_table_created := true; END IF; TRUNCATE TABLE public.pc_paf_sub_building_names CASCADE; INSERT INTO public.pc_paf_sub_building_names SELECT substring(data,1,8)::integer, nullif(trim(substring(data,9,30)),'') FROM data_stage; GET DIAGNOSTICS v_processed = ROW_COUNT; IF (v_table_created) THEN ALTER TABLE public.pc_paf_sub_building_names ADD PRIMARY KEY (sub_building_name_key); END IF; RAISE NOTICE '%: Done importing sub building names (imported % records)', clock_timestamp(), v_processed; -- 6) Organisations TRUNCATE TABLE data_stage; RAISE NOTICE '%: Begin staging organisations', clock_timestamp(); v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'org.c01'); EXECUTE v_sql; DELETE FROM data_stage WHERE data like '%ORGANISA' || in_edition || '%' OR data like v_std_footer; RAISE NOTICE '%: Done staging organisations, importing', clock_timestamp(); IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_organisations') THEN RAISE NOTICE 'Table "public"."pc_paf_organisations" already exists'; v_table_created := false; ELSE RAISE NOTICE 'Creating table "public"."pc_paf_organisations"'; CREATE TABLE public.pc_paf_organisations ( organisation_key integer NOT NULL, postcode_type varchar(1) NOT NULL, organisation_name varchar(60), department_name varchar(60) ); v_table_created := true; END IF; TRUNCATE TABLE public.pc_paf_organisations CASCADE; INSERT INTO public.pc_paf_organisations SELECT substring(data,1,8)::integer, nullif(trim(substring(data,9,1)),''), nullif(trim(substring(data,10,60)),''), nullif(trim(substring(data,70,60)),'') FROM data_stage; GET DIAGNOSTICS v_processed = ROW_COUNT; IF (v_table_created) THEN ALTER TABLE public.pc_paf_organisations ADD PRIMARY KEY (organisation_key, postcode_type); COMMENT ON COLUMN public.pc_paf_organisations.organisation_key IS 'When postcode type is L organisation_key relates to address_key'; END IF; RAISE NOTICE '%: Done importing organisations (imported % records)', clock_timestamp(), v_processed; -- 7) Postzon with latlon TRUNCATE TABLE data_stage; RAISE NOTICE '%: Begin staging postzone', clock_timestamp(); v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_postzon || 'pzone100.c02'); EXECUTE v_sql; DELETE FROM data_stage WHERE data like '%PZONE100' || in_edition || '%' OR data like v_main_footer; RAISE NOTICE '%: Done staging postzon, importing', clock_timestamp(); IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_postzon_100m') THEN RAISE NOTICE 'Table "public"."pc_paf_postzon_100m" already exists'; v_table_created := false; ELSE RAISE NOTICE 'Creating table "public"."pc_paf_postzon_100m"'; CREATE TABLE public.pc_paf_postzon_100m ( postzon_100m_key serial NOT NULL, outward_code varchar(4) NOT NULL, inward_code varchar(3) NOT NULL, introduction_date date, grid_reference_east integer, grid_reference_north integer, country_code varchar(9), area_code_county varchar(9), area_code_district varchar(9), ward_code varchar(9), nhs_region varchar(9), nhs_code varchar(9), user_type smallint, grid_status smallint, latitude double precision, longitude double precision ); v_table_created := true; END IF; TRUNCATE TABLE public.pc_paf_postzon_100m CASCADE; INSERT INTO public.pc_paf_postzon_100m(outward_code, inward_code, introduction_date, grid_reference_east, grid_reference_north, country_code, area_code_county, area_code_district, ward_code, nhs_region, nhs_code, user_type, grid_status, latitude, longitude) SELECT nullif(trim(substring(data,1,4)),''), nullif(trim(substring(data,5,3)),''), to_date(substring(data,8,6), 'YYYYMM'), nullif(trim(substring(data,14,6)),'')::integer, nullif(trim(substring(data,20,7)),'')::integer, nullif(trim(substring(data,27,9)),''), nullif(trim(substring(data,36,9)),''), nullif(trim(substring(data,45,9)),''), nullif(trim(substring(data,54,9)),''), nullif(trim(substring(data,63,9)),''), nullif(trim(substring(data,72,9)),''), nullif(trim(substring(data,81,1)),'')::smallint, nullif(trim(substring(data,82,1)),'')::smallint, nullif(trim(substring(data,83,10)),'')::double precision, nullif(trim(substring(data,93,10)),'')::double precision FROM data_stage; GET DIAGNOSTICS v_processed = ROW_COUNT; IF (v_table_created) THEN ALTER TABLE public.pc_paf_postzon_100m ADD PRIMARY KEY (postzon_100m_key), ADD CONSTRAINT pc_paf_postzon_100m_unique UNIQUE (outward_code, inward_code); CREATE INDEX pc_paf_postzon_100m_longitude ON public.pc_paf_postzon_100m (longitude); CREATE INDEX pc_paf_postzon_100m_latitude ON public.pc_paf_postzon_100m (latitude); COMMENT ON TABLE public.pc_paf_postzon_100m IS 'Geographical data from the Royal Mail with accuracy of 100m'; COMMENT ON COLUMN public.pc_paf_postzon_100m.grid_reference_east IS 'BT postcodes reflect the Irish Grid public (different origin and scale); subtract 17000 to approximate onto OS National Grid. For latlong do not approximate but do proper conversion'; COMMENT ON COLUMN public.pc_paf_postzon_100m.grid_reference_north IS 'BT postcodes reflect the Irish Grid public (different origin and scale); add 13000 to approximate onto OS National Grid. For latlong do not approximate but do proper conversion'; COMMENT ON COLUMN public.pc_paf_postzon_100m.user_type IS '0 Small User 1 Large User'; COMMENT ON COLUMN public.pc_paf_postzon_100m.grid_status IS E'0 Status not supplied by OS \n 1 Within the building of the matched address closest to the Postcode mean. \n 2 Co-ordinates allocated by GROS during Postcode boundary creation to the building nearest the centre of the populated part of the Postcode (Scotland only) \n 3 Approximate to within 50m of true position \n 4 Postcode unit mean (direct copy from ADDRESS-POINT (GB) and COMPAS (NI) - mean of matched addresses with the same Postcode) \n 5 Postcode imputed by ONS to 1 metre resolution \n 6 Postcode sector mean - mainly PO Boxes \n 9 No co-ordinates available'; END IF; RAISE NOTICE '%: Done importing postzon (imported % records)', clock_timestamp(), v_processed; -- 8) Mainfile TRUNCATE TABLE data_stage; RAISE NOTICE '%: Begin staging mainfile', clock_timestamp(); v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c02'); EXECUTE v_sql; v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c03'); EXECUTE v_sql; v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c04'); EXECUTE v_sql; v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c05'); EXECUTE v_sql; v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c06'); EXECUTE v_sql; DELETE FROM data_stage WHERE data like '%ADDRESS ' || in_edition || '%' OR data like v_main_footer; RAISE NOTICE '%: Done staging mainfile, importing', clock_timestamp(); IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_mainfile') THEN RAISE NOTICE 'Table "public"."pc_paf_mainfile" already exists'; v_table_created := false; ELSE RAISE NOTICE 'Creating table "public"."pc_paf_mainfile"'; CREATE TABLE public.pc_paf_mainfile ( paf_record_key serial NOT NULL, outward_code varchar(4) NOT NULL, inward_code varchar(3) NOT NULL, address_key integer NOT NULL, locality_key integer, thoroughfare_key integer, thoroughfare_descriptor_key integer, dependent_thoroughfare_key integer, dependent_thoroughfare_descriptor_key integer, building_number integer, building_name_key integer, sub_building_name_key integer, number_of_households integer, organisation_key integer, postcode_type varchar(1), concatenation_indicator varchar(1), delivery_point_suffix varchar(2), small_user_organisation_indicator varchar(1), po_box_number varchar(6) ); v_table_created := true; END IF; TRUNCATE TABLE public.pc_paf_mainfile RESTART IDENTITY; INSERT INTO public.pc_paf_mainfile(outward_code,inward_code,address_key,locality_key,thoroughfare_key, thoroughfare_descriptor_key,dependent_thoroughfare_key,dependent_thoroughfare_descriptor_key,building_number, building_name_key,sub_building_name_key,number_of_households,organisation_key,postcode_type,concatenation_indicator, delivery_point_suffix,small_user_organisation_indicator,po_box_number) SELECT nullif(trim(substring(data,1,4)),''), nullif(trim(substring(data,5,3)),''), nullif(substring(data,8,8)::integer,0), nullif(substring(data,16,6)::integer,0), nullif(substring(data,22,8)::integer,0), nullif(substring(data,30,4)::integer,0), nullif(substring(data,34,8)::integer,0), nullif(substring(data,42,4)::integer,0), nullif(substring(data,46,4)::integer,0), nullif(substring(data,50,8)::integer,0), nullif(substring(data,58,8)::integer,0), nullif(substring(data,66,4)::integer,0), nullif(substring(data,70,8)::integer,0), nullif(trim(substring(data,78,1)),''), nullif(trim(substring(data,79,1)),''), nullif(trim(substring(data,80,2)),''), nullif(trim(substring(data,82,1)),''), nullif(trim(substring(data,83,6)),'') FROM data_stage; GET DIAGNOSTICS v_processed = ROW_COUNT; IF (v_table_created) THEN CREATE INDEX pc_paf_mainfile_postcode ON public.pc_paf_mainfile USING btree (outward_code, inward_code); ALTER TABLE public.pc_paf_mainfile ADD PRIMARY KEY (paf_record_key), ADD CONSTRAINT pc_paf_mainfile_unique UNIQUE (address_key, organisation_key, postcode_type), ADD FOREIGN KEY (locality_key) REFERENCES public.pc_paf_localities(locality_key) DEFERRABLE INITIALLY IMMEDIATE, ADD FOREIGN KEY (thoroughfare_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE, ADD FOREIGN KEY (thoroughfare_descriptor_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE, ADD FOREIGN KEY (building_name_key) REFERENCES public.pc_paf_building_names(building_name_key) DEFERRABLE INITIALLY IMMEDIATE, ADD FOREIGN KEY (sub_building_name_key) REFERENCES public.pc_paf_sub_building_names(sub_building_name_key) DEFERRABLE INITIALLY IMMEDIATE, ADD FOREIGN KEY (organisation_key, postcode_type) REFERENCES public.pc_paf_organisations(organisation_key, postcode_type) DEFERRABLE INITIALLY IMMEDIATE, ADD FOREIGN KEY (outward_code,inward_code) REFERENCES public.pc_paf_postzon_100m(outward_code,inward_code) DEFERRABLE INITIALLY IMMEDIATE; COMMENT ON COLUMN public.pc_paf_mainfile.organisation_key IS 'When postcode type is L then address_key relates to organisation_key - good work RM!'; END IF; RAISE NOTICE '%: Done importing mainfile (imported % records)', clock_timestamp(), v_processed; -- 9) Welsh Mainfile TRUNCATE TABLE data_stage; RAISE NOTICE '%: Begin staging welsh alternative mainfile', clock_timestamp(); v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'wfmainfl.c06'); EXECUTE v_sql; DELETE FROM data_stage WHERE data like '%ADDRESS ' || in_edition || '%' OR data like v_main_footer; RAISE NOTICE '%: Done staging welsh alternative mainfile, importing', clock_timestamp(); IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_mainfile_welsh') THEN RAISE NOTICE 'Table "public"."pc_paf_mainfile_welsh" already exists'; v_table_created := false; ELSE RAISE NOTICE 'Creating table "public"."pc_paf_mainfile_welsh"'; CREATE TABLE public.pc_paf_mainfile_welsh ( paf_record_key serial NOT NULL, outward_code varchar(4) NOT NULL, inward_code varchar(3) NOT NULL, address_key integer NOT NULL, locality_key integer, thoroughfare_key integer, thoroughfare_descriptor_key integer, dependent_thoroughfare_key integer, dependent_thoroughfare_descriptor_key integer, building_number integer, building_name_key integer, sub_building_name_key integer, number_of_households integer, organisation_key integer, postcode_type varchar(1), concatenation_indicator varchar(1), delivery_point_suffix varchar(2), small_user_organisation_indicator varchar(1), po_box_number varchar(6) ); v_table_created := true; END IF; TRUNCATE TABLE public.pc_paf_mainfile_welsh RESTART IDENTITY; INSERT INTO public.pc_paf_mainfile_welsh(outward_code,inward_code,address_key,locality_key,thoroughfare_key, thoroughfare_descriptor_key,dependent_thoroughfare_key,dependent_thoroughfare_descriptor_key,building_number, building_name_key,sub_building_name_key,number_of_households,organisation_key,postcode_type,concatenation_indicator, delivery_point_suffix,small_user_organisation_indicator,po_box_number) SELECT nullif(trim(substring(data,1,4)),''), nullif(trim(substring(data,5,3)),''), nullif(substring(data,8,8)::integer,0), nullif(substring(data,16,6)::integer,0), nullif(substring(data,22,8)::integer,0), nullif(substring(data,30,4)::integer,0), nullif(substring(data,34,8)::integer,0), nullif(substring(data,42,4)::integer,0), nullif(substring(data,46,4)::integer,0), nullif(substring(data,50,8)::integer,0), nullif(substring(data,58,8)::integer,0), nullif(substring(data,66,4)::integer,0), nullif(substring(data,70,8)::integer,0), nullif(trim(substring(data,78,1)),''), nullif(trim(substring(data,79,1)),''), nullif(trim(substring(data,80,2)),''), nullif(trim(substring(data,82,1)),''), nullif(trim(substring(data,83,6)),'') FROM data_stage; GET DIAGNOSTICS v_processed = ROW_COUNT; IF (v_table_created) THEN CREATE INDEX pc_paf_mainfile_welsh_postcode ON public.pc_paf_mainfile_welsh USING btree (outward_code, inward_code); ALTER TABLE public.pc_paf_mainfile_welsh ADD PRIMARY KEY (paf_record_key), ADD CONSTRAINT pc_paf_mainfile_welsh_unique UNIQUE (address_key, organisation_key, postcode_type), ADD FOREIGN KEY (locality_key) REFERENCES public.pc_paf_localities(locality_key) DEFERRABLE INITIALLY IMMEDIATE, ADD FOREIGN KEY (thoroughfare_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE, ADD FOREIGN KEY (thoroughfare_descriptor_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE, ADD FOREIGN KEY (building_name_key) REFERENCES public.pc_paf_building_names(building_name_key) DEFERRABLE INITIALLY IMMEDIATE, ADD FOREIGN KEY (sub_building_name_key) REFERENCES public.pc_paf_sub_building_names(sub_building_name_key) DEFERRABLE INITIALLY IMMEDIATE, ADD FOREIGN KEY (organisation_key, postcode_type) REFERENCES public.pc_paf_organisations(organisation_key, postcode_type) DEFERRABLE INITIALLY IMMEDIATE, ADD FOREIGN KEY (outward_code,inward_code) REFERENCES public.pc_paf_postzon_100m(outward_code,inward_code) DEFERRABLE INITIALLY IMMEDIATE; COMMENT ON COLUMN public.pc_paf_mainfile_welsh.organisation_key IS 'When postcode type is L then address_key relates to organisation_key - good work RM!'; END IF; RAISE NOTICE '%: Done importing welsh alternative mainfile (imported % records)', clock_timestamp(), v_processed; -- 10) Alias file TRUNCATE TABLE data_stage; RAISE NOTICE '%: Begin staging alias file', clock_timestamp(); v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_alias || 'aliasfle.c01'); EXECUTE v_sql; DELETE FROM data_stage WHERE data like '%ALIASFLEY' || in_edition || '%' OR data like v_std_footer; RAISE NOTICE '%: Done staging alias file, importing', clock_timestamp(); IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_counties') THEN RAISE NOTICE 'Table "public"."pc_paf_counties" already exists'; v_table_created := false; ELSE RAISE NOTICE 'Creating table "public"."pc_paf_counties"'; CREATE TABLE public.pc_paf_counties ( county_key integer NOT NULL, county_name varchar(30), county_type varchar(1) ); v_table_created := true; END IF; TRUNCATE TABLE public.pc_paf_counties CASCADE; INSERT INTO public.pc_paf_counties SELECT substring(data,2,4)::integer AS county_key, trim(substring(data,6,30)) AS county_name, trim(substring(data,36,1)) AS county_type FROM data_stage WHERE substring(data,1,1)::integer = 4; GET DIAGNOSTICS v_processed = ROW_COUNT; IF (v_table_created) THEN ALTER TABLE public.pc_paf_counties ADD PRIMARY KEY (county_key), ADD CONSTRAINT pc_paf_counties_unique UNIQUE (county_name, county_type); COMMENT ON COLUMN public.pc_paf_counties.county_type IS 'T (Traditional County), P (Former Postal County) or A (Administrative County)'; END IF; RAISE NOTICE '%: Done importing counties (imported % records)', clock_timestamp(), v_processed; IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_county_alias') THEN RAISE NOTICE 'Table "public"."pc_paf_county_alias" already exists'; v_table_created := false; ELSE RAISE NOTICE 'Creating table "public"."pc_paf_county_alias"'; CREATE TABLE public.pc_paf_county_alias ( county_alias_key serial NOT NULL, postcode varchar(7) NOT NULL, former_postal_county integer, traditional_county integer, administrative_county integer ); v_table_created := true; END IF; TRUNCATE TABLE public.pc_paf_county_alias CASCADE; INSERT INTO public.pc_paf_county_alias (postcode, former_postal_county, traditional_county, administrative_county) SELECT trim(substring(data,2,7)) AS postcode, nullif(substring(data,9,4)::integer,0) AS former_postal_county, nullif(substring(data,13,4)::integer,0) AS traditional_county, nullif(substring(data,17,4)::integer,0) AS administrative_county FROM data_stage WHERE substring(data,1,1)::integer = 5; GET DIAGNOSTICS v_processed = ROW_COUNT; IF (v_table_created) THEN ALTER TABLE public.pc_paf_county_alias ADD PRIMARY KEY (county_alias_key), ADD CONSTRAINT pc_paf_county_alias_unique UNIQUE (postcode), ADD FOREIGN KEY (former_postal_county) REFERENCES public.pc_paf_counties(county_key) DEFERRABLE INITIALLY IMMEDIATE, ADD FOREIGN KEY (traditional_county) REFERENCES public.pc_paf_counties(county_key) DEFERRABLE INITIALLY IMMEDIATE, ADD FOREIGN KEY (administrative_county) REFERENCES public.pc_paf_counties(county_key) DEFERRABLE INITIALLY IMMEDIATE; END IF; RAISE NOTICE '%: Done importing county_alias (imported % records)', clock_timestamp(), v_processed; RAISE NOTICE '%: Completed', clock_timestamp(); RETURN true; END; $BODY$ LANGUAGE 'plpgsql' VOLATILE; -- DROP FUNCTION IF EXISTS public.update_pc_paf(varchar, varchar); CREATE OR REPLACE FUNCTION public.update_pc_paf(in_edition varchar, in_data varchar) RETURNS boolean AS $BODY$ DECLARE v_data_root varchar; v_data_update varchar; v_processed integer; v_main_footer varchar; v_std_footer varchar; v_sql varchar; v_table_created boolean; BEGIN v_data_root := in_data || '/' || in_edition || '_CHANGES/'; v_data_update := v_data_root || 'CONSOLIDATED CHANGES/'; v_main_footer = ' %'; RAISE NOTICE '%: Import starting for edition % with data root %', clock_timestamp(), in_edition, v_data_root; CREATE TEMPORARY TABLE data_stage (data text) ON COMMIT DROP; RAISE NOTICE '%: Begin staging Changes1 file', clock_timestamp(); v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_update || 'changes1.c01'); EXECUTE v_sql; DELETE FROM data_stage WHERE data like '%CHANGES1' || in_edition || '%' OR data like v_main_footer; GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Done staging Changes1 file of % records',clock_timestamp(), v_processed; SET CONSTRAINTS ALL DEFERRED; --changes1.c01 - Changes1 (Changes to satelite tables except Organisations) --Record Type 1 - Localities RAISE NOTICE '%: Preparing to update localities', clock_timestamp(); CREATE TEMPORARY TABLE tmp_localities ON COMMIT DROP AS SELECT substring(data,2,8)::integer AS locality_key, to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, nullif(trim(substring(data,24,1)),'') AS amendment_type, nullif(trim(substring(data,70,30)),'') AS post_town, nullif(trim(substring(data,100,35)),'') AS dependent_locality, nullif(trim(substring(data,135,35)),'') AS double_dependent_locality FROM data_stage WHERE substring(data,1,1)::integer = 1 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL ORDER BY to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS'); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Prepared % records for update on localities',clock_timestamp(), v_processed; DELETE FROM public.pc_paf_localities l USING tmp_localities lt WHERE lt.locality_key = l.locality_key AND lt.amendment_type IN ('D', 'B') AND l.* = (lt.locality_key, lt.post_town::character varying(30), lt.dependent_locality::character varying(30), lt.double_dependent_locality::character varying(30)); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Removed % records from localities',clock_timestamp(), v_processed; INSERT INTO public.pc_paf_localities SELECT lt.locality_key, lt.post_town, lt.dependent_locality, lt.double_dependent_locality FROM tmp_localities lt WHERE lt.amendment_type IN ('I', 'C'); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Created % records in localities',clock_timestamp(), v_processed; --Record Type 2 - Thoroughfares RAISE NOTICE '%: Preparing to update thoroughfares', clock_timestamp(); CREATE TEMPORARY TABLE tmp_thoroughfares ON COMMIT DROP AS SELECT substring(data,2,8)::integer AS thoroughfare_key, to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, nullif(trim(substring(data,24,1)),'') AS amendment_type, nullif(trim(substring(data,25,60)),'') AS thoroughfare_name FROM data_stage WHERE substring(data,1,1)::integer = 2 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL; GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Prepared % records for update on thoroughfares',clock_timestamp(), v_processed; DELETE FROM public.pc_paf_thoroughfares t USING tmp_thoroughfares tt WHERE tt.thoroughfare_key = t.thoroughfare_key AND tt.amendment_type IN ('D', 'B') AND t.* = (tt.thoroughfare_key, t.thoroughfare_name::character varying(60)); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Removed % records from thoroughfares',clock_timestamp(), v_processed; INSERT INTO public.pc_paf_thoroughfares SELECT tt.thoroughfare_key, tt.thoroughfare_name FROM tmp_thoroughfares tt WHERE tt.amendment_type IN ('I', 'C'); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Created % records in thoroughfares',clock_timestamp(), v_processed; --Record Type 3 - Thoroughfare Descriptors RAISE NOTICE '%: Preparing to update thoroughfare_descriptor', clock_timestamp(); CREATE TEMPORARY TABLE tmp_thoroughfare_descriptor ON COMMIT DROP AS SELECT substring(data,2,8)::integer AS thoroughfare_descriptor_key , to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, nullif(trim(substring(data,24,1)),'') AS amendment_type, nullif(trim(substring(data,25,20)),'') AS thoroughfare_descriptor, nullif(trim(substring(data,45,6)),'') AS approved_abbreviation FROM data_stage WHERE substring(data,1,1)::integer = 3 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL; GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Prepared % records for update on thoroughfare_descriptor',clock_timestamp(), v_processed; DELETE FROM public.pc_paf_thoroughfare_descriptor td USING tmp_thoroughfare_descriptor tdt WHERE tdt.thoroughfare_descriptor_key = td.thoroughfare_descriptor_key AND tdt.amendment_type IN ('D', 'B') AND td.* = (tdt.thoroughfare_descriptor_key , tdt.thoroughfare_descriptor::character varying(20), tdt.approved_abbreviation::character varying(6)); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Removed % records from thoroughfare_descriptor',clock_timestamp(), v_processed; INSERT INTO public.pc_paf_thoroughfare_descriptor SELECT tdt.thoroughfare_descriptor_key , tdt.thoroughfare_descriptor, tdt.approved_abbreviation FROM tmp_thoroughfare_descriptor tdt WHERE tdt.amendment_type IN ('I', 'C'); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Created % records in thoroughfare_descriptor',clock_timestamp(), v_processed; --Record Type 4 - Building Names RAISE NOTICE '%: Preparing to update building_names', clock_timestamp(); CREATE TEMPORARY TABLE tmp_building_names ON COMMIT DROP AS SELECT substring(data,2,8)::integer AS building_name_key, to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, nullif(trim(substring(data,24,1)),'') AS amendment_type, nullif(trim(substring(data,25,50)),'') AS building_name FROM data_stage WHERE substring(data,1,1)::integer = 4 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL; GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Prepared % records for update on building_names',clock_timestamp(), v_processed; DELETE FROM public.pc_paf_building_names bn USING tmp_building_names tbn WHERE bn.building_name_key = tbn.building_name_key AND tbn.amendment_type IN ('D', 'B') AND bn.* = (tbn.building_name_key, tbn.building_name::character varying(50)); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Removed % records from building_names',clock_timestamp(), v_processed; INSERT INTO public.pc_paf_building_names SELECT tbn.building_name_key, tbn.building_name FROM tmp_building_names tbn WHERE tbn.amendment_type IN ('I', 'C'); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Created % records in building_names',clock_timestamp(), v_processed; --Record Type 5 - Sub Building Names RAISE NOTICE '%: Preparing to update sub_building_names', clock_timestamp(); CREATE TEMPORARY TABLE tmp_sub_building_names ON COMMIT DROP AS SELECT substring(data,2,8)::integer AS sub_building_name_key, to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, nullif(trim(substring(data,24,1)),'') AS amendment_type, nullif(trim(substring(data,25,30)),'') AS sub_building_name FROM data_stage WHERE substring(data,1,1)::integer = 5 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL; GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Prepared % records for update on sub_building_names',clock_timestamp(), v_processed; DELETE FROM public.pc_paf_sub_building_names sbn USING tmp_sub_building_names tsbn WHERE sbn.sub_building_name_key = tsbn.sub_building_name_key AND tsbn.amendment_type IN ('D', 'B') AND sbn.* = (tsbn.sub_building_name_key, tsbn.sub_building_name::character varying(50)); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Removed % records from sub_building_names',clock_timestamp(), v_processed; INSERT INTO public.pc_paf_sub_building_names SELECT tsbn.sub_building_name_key, tsbn.sub_building_name FROM tmp_sub_building_names tsbn WHERE tsbn.amendment_type IN ('I', 'C'); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Created % records in sub_building_names',clock_timestamp(), v_processed; -- fpchngs2.c01 - Changes2 (Changes to Mainfile and Organisations) TRUNCATE TABLE data_stage; RAISE NOTICE '%: Begin staging Changes2 file', clock_timestamp(); v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_update || 'fpchngs2.c01'); EXECUTE v_sql; DELETE FROM data_stage WHERE data like '%CHANGES2' || in_edition || '%' OR data like v_main_footer; GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Done staging Changes2 file of % records',clock_timestamp(), v_processed; -- Mainfile RAISE NOTICE '%: Preparing to update mainfile', clock_timestamp(); CREATE TEMPORARY TABLE tmp_mainfile ON COMMIT DROP AS SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, nullif(trim(substring(data,15,4)),'') AS outward_code, nullif(trim(substring(data,19,3)),'') AS inward_code, nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix, nullif(trim(substring(data,24,1)),'') AS postcode_type, nullif(substring(data,25,8)::integer,0) AS address_key, nullif(substring(data,33,8)::integer,0) AS organisation_key, nullif(trim(substring(data,41,1)),'') AS amendment_type, nullif(substring(data,42,1)::integer,0) AS record_type, nullif(substring(data,43,8)::integer,0) AS locality_key, nullif(substring(data,51,8)::integer,0) AS thoroughfare_key, nullif(substring(data,59,8)::integer,0) AS thoroughfare_descriptor_key, nullif(substring(data,67,8)::integer,0) AS dependent_thoroughfare_key, nullif(substring(data,75,8)::integer,0) AS dependent_thoroughfare_descriptor_key, nullif(substring(data,83,4)::integer,0) AS building_number, nullif(substring(data,87,8)::integer,0) AS building_name_key, nullif(substring(data,95,8)::integer,0) AS sub_building_name_key, nullif(substring(data,103,4)::integer,0) AS number_of_households, nullif(trim(substring(data,107,1)),'') AS concatenation_indicator, nullif(trim(substring(data,108,6)),'') AS po_box_number, nullif(trim(substring(data,114,1)),'') AS small_user_organisation_indicator, CASE nullif(substring(data,115,2)::integer,0) WHEN 1 THEN 'New' WHEN 2 THEN 'Correction' WHEN 3 THEN '' WHEN 4 THEN 'Coding Revision' WHEN 5 THEN 'Organisation Change' WHEN 6 THEN 'Status Change' WHEN 7 THEN 'Large User Deleted' WHEN 8 THEN 'Building/Sub Building Change' WHEN 9 THEN 'Large User Change' END AS reason_for_amendment, nullif(trim(substring(data,117,4)),'') AS new_outward_code, nullif(trim(substring(data,121,3)),'') AS new_inward_code FROM data_stage WHERE nullif(substring(data,42,1)::integer,0) = 1; GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Prepared % records for update on mainfile',clock_timestamp(), v_processed; DELETE FROM public.pc_paf_mainfile m USING tmp_mainfile tm WHERE tm.address_key = m.address_key AND tm.organisation_key = m.organisation_key AND tm.postcode_type = m.postcode_type AND tm.amendment_type IN ('D', 'B'); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Removed % records from mainfile',clock_timestamp(), v_processed; INSERT INTO public.pc_paf_mainfile (outward_code, inward_code, address_key, locality_key, thoroughfare_key, thoroughfare_descriptor_key, dependent_thoroughfare_key, dependent_thoroughfare_descriptor_key, building_number, building_name_key, sub_building_name_key, number_of_households, organisation_key, postcode_type, concatenation_indicator, delivery_point_suffix, small_user_organisation_indicator, po_box_number) SELECT tm.outward_code, tm.inward_code, tm.address_key, tm.locality_key, tm.thoroughfare_key, tm.thoroughfare_descriptor_key, tm.dependent_thoroughfare_key, tm.dependent_thoroughfare_descriptor_key, tm.building_number, tm.building_name_key, tm.sub_building_name_key, tm.number_of_households, tm.organisation_key, tm.postcode_type, tm.concatenation_indicator, tm.delivery_point_suffix, tm.small_user_organisation_indicator, tm.po_box_number FROM ( SELECT cume_dist() OVER w, * FROM tmp_mainfile WHERE amendment_type IN ('I', 'C') WINDOW w AS (PARTITION BY address_key, organisation_key, postcode_type ORDER BY timestamp) ) tm WHERE tm.cume_dist = 1 ; GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Created % records in mainfile',clock_timestamp(), v_processed; -- Organisations RAISE NOTICE '%: Preparing to update organisations', clock_timestamp(); CREATE TEMPORARY TABLE tmp_organisations ON COMMIT DROP AS SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, nullif(trim(substring(data,15,4)),'') AS outward_code, nullif(trim(substring(data,19,3)),'') AS inward_code, nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix, nullif(trim(substring(data,24,1)),'') AS postcode_type, nullif(substring(data,25,8)::integer,0) AS address_key, nullif(substring(data,33,8)::integer,0) AS organisation_key, nullif(trim(substring(data,41,1)),'') AS amendment_type, nullif(substring(data,42,1)::integer,0) AS record_type, nullif(trim(substring(data,43,60)),'') AS organisation_name, nullif(trim(substring(data,103,60)),'') AS department_name, nullif(trim(substring(data,163,6)),'') AS po_box_number FROM data_stage WHERE nullif(substring(data,42,1)::integer,0) IN (2,3); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Prepared % records for update on organisations',clock_timestamp(), v_processed; DELETE FROM public.pc_paf_organisations o USING tmp_organisations tx WHERE COALESCE(tx.organisation_key, tx.address_key) = o.organisation_key AND tx.postcode_type = o.postcode_type AND tx.amendment_type IN ('D', 'B'); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Removed % records from organisations',clock_timestamp(), v_processed; INSERT INTO public.pc_paf_organisations SELECT COALESCE(tx.organisation_key, tx.address_key), tx.postcode_type, tx.organisation_name, tx.department_name FROM ( SELECT cume_dist() OVER w, * FROM tmp_organisations WHERE amendment_type IN ('I', 'C') WINDOW w AS (PARTITION BY COALESCE(organisation_key, address_key), postcode_type ORDER BY timestamp) ) tx WHERE tx.cume_dist = 1 ; GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Created % records in organisations',clock_timestamp(), v_processed; -- wchanges.c01 -- Welsh changes TRUNCATE TABLE data_stage; RAISE NOTICE '%: Begin staging WChanges2 file', clock_timestamp(); v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_update || 'wchanges.c01'); EXECUTE v_sql; DELETE FROM data_stage WHERE data like '%WCHANGES' || in_edition || '%' OR data like v_main_footer; GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Done staging WChanges2 file of % records',clock_timestamp(), v_processed; RAISE NOTICE '%: Preparing to update welsh mainfile', clock_timestamp(); CREATE TEMPORARY TABLE tmp_mainfile_welsh ON COMMIT DROP AS SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, nullif(trim(substring(data,15,4)),'') AS outward_code, nullif(trim(substring(data,19,3)),'') AS inward_code, nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix, nullif(trim(substring(data,24,1)),'') AS postcode_type, nullif(substring(data,25,8)::integer,0) AS address_key, nullif(substring(data,33,8)::integer,0) AS organisation_key, nullif(trim(substring(data,41,1)),'') AS amendment_type, nullif(substring(data,42,1)::integer,0) AS record_type, nullif(substring(data,43,8)::integer,0) AS locality_key, nullif(substring(data,51,8)::integer,0) AS thoroughfare_key, nullif(substring(data,59,8)::integer,0) AS thoroughfare_descriptor_key, nullif(substring(data,67,8)::integer,0) AS dependent_thoroughfare_key, nullif(substring(data,75,8)::integer,0) AS dependent_thoroughfare_descriptor_key, nullif(substring(data,83,4)::integer,0) AS building_number, nullif(substring(data,87,8)::integer,0) AS building_name_key, nullif(substring(data,95,8)::integer,0) AS sub_building_name_key, nullif(substring(data,103,4)::integer,0) AS number_of_households, nullif(trim(substring(data,107,1)),'') AS concatenation_indicator, nullif(trim(substring(data,108,6)),'') AS po_box_number, nullif(trim(substring(data,114,1)),'') AS small_user_organisation_indicator FROM data_stage WHERE nullif(substring(data,42,1)::integer,0) = 1; GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Prepared % records for update on welsh mainfile',clock_timestamp(), v_processed; DELETE FROM public.pc_paf_mainfile_welsh mw USING tmp_mainfile_welsh twm WHERE twm.address_key = mw.address_key AND twm.organisation_key = mw.organisation_key AND twm.postcode_type = mw.postcode_type AND twm.amendment_type IN ('D', 'B'); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Removed % records from welsh mainfile',clock_timestamp(), v_processed; INSERT INTO public.pc_paf_mainfile_welsh (outward_code, inward_code, address_key, locality_key, thoroughfare_key, thoroughfare_descriptor_key, dependent_thoroughfare_key, dependent_thoroughfare_descriptor_key, building_number, building_name_key, sub_building_name_key, number_of_households, organisation_key, postcode_type, concatenation_indicator, delivery_point_suffix, small_user_organisation_indicator, po_box_number) SELECT twm.outward_code, twm.inward_code, twm.address_key, twm.locality_key, twm.thoroughfare_key, twm.thoroughfare_descriptor_key, twm.dependent_thoroughfare_key, twm.dependent_thoroughfare_descriptor_key, twm.building_number, twm.building_name_key, twm.sub_building_name_key, twm.number_of_households, twm.organisation_key, twm.postcode_type, twm.concatenation_indicator, twm.delivery_point_suffix, twm.small_user_organisation_indicator, twm.po_box_number FROM tmp_mainfile_welsh twm WHERE twm.amendment_type IN ('I', 'C'); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Created % records in welsh mainfile',clock_timestamp(), v_processed; -- Organisations RAISE NOTICE '%: Preparing to update welsh organisations', clock_timestamp(); CREATE TEMPORARY TABLE tmp_organisations_welsh ON COMMIT DROP AS SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, nullif(trim(substring(data,15,4)),'') AS outward_code, nullif(trim(substring(data,19,3)),'') AS inward_code, nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix, nullif(trim(substring(data,24,1)),'') AS postcode_type, nullif(substring(data,25,8)::integer,0) AS address_key, nullif(substring(data,33,8)::integer,0) AS organisation_key, nullif(trim(substring(data,41,1)),'') AS amendment_type, nullif(substring(data,42,1)::integer,0) AS record_type, nullif(trim(substring(data,43,60)),'') AS organisation_name, nullif(trim(substring(data,103,60)),'') AS department_name, nullif(trim(substring(data,163,6)),'') AS po_box_number FROM data_stage WHERE nullif(substring(data,42,1)::integer,0) IN (2,3); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Prepared % records for update on welsh organisations',clock_timestamp(), v_processed; DELETE FROM public.pc_paf_organisations o USING tmp_organisations_welsh txw WHERE COALESCE(txw.organisation_key, txw.address_key) = o.organisation_key AND txw.postcode_type = o.postcode_type AND txw.amendment_type IN ('D', 'B'); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Removed % records from welsh organisations',clock_timestamp(), v_processed; INSERT INTO public.pc_paf_organisations SELECT COALESCE(txw.organisation_key, txw.address_key), txw.postcode_type, txw.organisation_name, txw.department_name FROM tmp_organisations_welsh txw WHERE txw.amendment_type IN ('I', 'C'); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Created % records in welsh organisations',clock_timestamp(), v_processed; -- So far I haven't seen a changes file for postzon data, without which updates will most likely fail on -- the fk for pc_paf_postzon_100m. I've also not seen an alias changes file. -- If I did have a postzon changes file, it'd most likely be best to import the data directly and use postgis to do our conversion -- as the data is selected out of the data_stage table (rather than using the convert_paf.pl script). We could get those fields with -- something like: -- CASE WHEN substring(northing,1,1) ~ E'[\x4f-\x5a]' THEN '1' || translate(northing, 'POUTZY', '221100') ELSE northing END || '0' AS f_notrhing, -- ST_x(ST_transform(ST_GeomFromText('POINT('||easting||' '||f_northing||')',CASE WHEN postcode LIKE 'BT%' THEN 29903 ELSE 27700 END),4326)) AS longitude, -- ST_y(ST_transform(ST_GeomFromText('POINT('||easting||' '||f_northing||')',CASE WHEN postcode LIKE 'BT%' THEN 29903 ELSE 27700 END),4326)) AS latitude; RAISE NOTICE '%: Fudging postzon', clock_timestamp(); INSERT INTO public.pc_paf_postzon_100m (outward_code, inward_code) SELECT DISTINCT m.outward_code, m.inward_code FROM tmp_mainfile m WHERE NOT EXISTS (SELECT 1 FROM public.pc_paf_postzon_100m p WHERE p.outward_code = m.outward_code AND p.inward_code = m.inward_code); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Fudged % mainfile records in postzon',clock_timestamp(), v_processed; INSERT INTO public.pc_paf_postzon_100m (outward_code, inward_code) SELECT DISTINCT mw.outward_code, mw.inward_code FROM tmp_mainfile_welsh mw WHERE NOT EXISTS (SELECT 1 FROM public.pc_paf_postzon_100m p WHERE p.outward_code = mw.outward_code AND p.inward_code = mw.inward_code); GET DIAGNOSTICS v_processed = ROW_COUNT; RAISE NOTICE '%: Fudged % welsh mainfile records in postzon',clock_timestamp(), v_processed; RAISE NOTICE '%: Completed', clock_timestamp(); RETURN true; END; $BODY$ LANGUAGE 'plpgsql' VOLATILE; -- Standard master view for postcode tables CREATE OR REPLACE VIEW public.pc_paf_master_view AS SELECT m.building_number, b.building_name, sb.sub_building_name, COALESCE(o.organisation_name, lo.organisation_name) AS organisation_name, COALESCE(o.department_name, lo.department_name) AS department_name, m.po_box_number, t.thoroughfare_name, td.thoroughfare_descriptor, td.approved_abbreviation, l.post_town, l.dependent_locality, l.double_dependent_locality, apc.county_name, m.outward_code, m.inward_code, m.number_of_households, m.postcode_type, m.concatenation_indicator, m.delivery_point_suffix, m.small_user_organisation_indicator, EXISTS(SELECT 1 FROM public.pc_paf_mainfile_welsh mw WHERE mw.address_key = m.address_key AND mw.organisation_key IS NOT DISTINCT FROM m.organisation_key AND mw.postcode_type IS NOT DISTINCT FROM m.postcode_type) AS welsh_alternative_available, p.longitude, p.latitude FROM public.pc_paf_mainfile m LEFT OUTER JOIN public.pc_paf_localities l USING (locality_key) LEFT OUTER JOIN public.pc_paf_thoroughfares t USING (thoroughfare_key) LEFT OUTER JOIN public.pc_paf_thoroughfare_descriptor td USING (thoroughfare_descriptor_key) LEFT OUTER JOIN public.pc_paf_building_names b USING (building_name_key) LEFT OUTER JOIN public.pc_paf_sub_building_names sb USING (sub_building_name_key) LEFT OUTER JOIN public.pc_paf_organisations o USING (organisation_key, postcode_type) LEFT OUTER JOIN public.pc_paf_organisations lo ON m.address_key = lo.organisation_key AND m.postcode_type = lo.postcode_type AND m.postcode_type = 'L' LEFT OUTER JOIN public.pc_paf_postzon_100m p USING (outward_code, inward_code) LEFT OUTER JOIN public.pc_paf_county_alias a ON a.postcode = (rpad(m.outward_code,4) || m.inward_code) LEFT OUTER JOIN public.pc_paf_counties apc ON apc.county_key = a.former_postal_county; -- Welsh master view for postcode tables CREATE OR REPLACE VIEW public.pc_paf_master_welsh_view AS SELECT mw.building_number, b.building_name, sb.sub_building_name, COALESCE(o.organisation_name, lo.organisation_name) AS organisation_name, COALESCE(o.department_name, lo.department_name) AS department_name, mw.po_box_number, t.thoroughfare_name, td.thoroughfare_descriptor, td.approved_abbreviation, l.post_town, l.dependent_locality, l.double_dependent_locality, apc.county_name, mw.outward_code, mw.inward_code, mw.number_of_households, mw.postcode_type, mw.concatenation_indicator, mw.delivery_point_suffix, mw.small_user_organisation_indicator, p.longitude, p.latitude FROM public.pc_paf_mainfile_welsh mw LEFT OUTER JOIN public.pc_paf_localities l USING (locality_key) LEFT OUTER JOIN public.pc_paf_thoroughfares t USING (thoroughfare_key) LEFT OUTER JOIN public.pc_paf_thoroughfare_descriptor td USING (thoroughfare_descriptor_key) LEFT OUTER JOIN public.pc_paf_building_names b USING (building_name_key) LEFT OUTER JOIN public.pc_paf_sub_building_names sb USING (sub_building_name_key) LEFT OUTER JOIN public.pc_paf_organisations o USING (organisation_key, postcode_type) LEFT OUTER JOIN public.pc_paf_organisations lo ON mw.address_key = lo.organisation_key AND mw.postcode_type = lo.postcode_type AND mw.postcode_type = 'L' LEFT OUTER JOIN public.pc_paf_postzon_100m p USING (outward_code, inward_code) LEFT OUTER JOIN public.pc_paf_county_alias a ON a.postcode = (rpad(mw.outward_code,4) || mw.inward_code) LEFT OUTER JOIN public.pc_paf_counties apc ON apc.county_key = a.former_postal_county;