--- /dev/null
+DROP FUNCTION IF EXISTS public.import_pc_paf(varchar, varchar);
+CREATE OR REPLACE FUNCTION public.import_pc_paf(in_edition varchar, in_data varchar)
+RETURNS boolean AS
+$BODY$
+
+DECLARE
+ v_data_root varchar;
+ v_data_main varchar;
+ v_data_postzon varchar;
+ v_data_alias varchar;
+ v_processed integer;
+ v_main_footer varchar;
+ v_std_footer varchar;
+ v_sql varchar;
+ v_table_created boolean;
+BEGIN
+ v_data_root := in_data || '/' || in_edition || '/';
+ v_data_main := v_data_root || 'PAF MAIN FILE/';
+ v_data_postzon := v_data_root || 'POSTZON 100M/';
+ v_data_alias := v_data_root || 'ALIAS/';
+
+ v_main_footer = ' %';
+ v_std_footer = '99999999%';
+
+ RAISE NOTICE '%: Import starting for edition % with data root %', clock_timestamp(), in_edition, v_data_root;
+ CREATE TEMPORARY TABLE data_stage (data text) ON COMMIT DROP;
+
+ -- 1) Localaties
+ TRUNCATE TABLE data_stage;
+
+ RAISE NOTICE '%: Begin staging localaties', clock_timestamp();
+
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'local.c01');
+ EXECUTE v_sql;
+
+ DELETE FROM data_stage WHERE data like '%LOCALITY' || in_edition || '%' OR data like v_std_footer;
+
+ RAISE NOTICE '%: Done staging localaties, importing', clock_timestamp();
+
+ IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_localities') THEN
+ RAISE NOTICE 'Table "public"."pc_paf_localities" already exists';
+ v_table_created := false;
+ ELSE
+ RAISE NOTICE 'Creating table "public"."pc_paf_localities"';
+ CREATE TABLE public.pc_paf_localities (
+ locality_key integer NOT NULL,
+ post_town varchar(30),
+ dependent_locality varchar(35),
+ double_dependent_locality varchar(35)
+ );
+ v_table_created := true;
+ END IF;
+
+ TRUNCATE TABLE public.pc_paf_localities;
+ INSERT INTO public.pc_paf_localities
+ SELECT substring(data,1,6)::integer,
+ nullif(trim(substring(data,52,30)),''),
+ nullif(trim(substring(data,82,35)),''),
+ nullif(trim(substring(data,117,35)),'')
+ FROM data_stage;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+
+ IF (v_table_created) THEN
+ ALTER TABLE public.pc_paf_localities ADD PRIMARY KEY (locality_key);
+ END IF;
+
+ RAISE NOTICE '%: Done importing localities (imported % records)', clock_timestamp(), v_processed;
+
+ -- 2) Thoroughfares
+ TRUNCATE TABLE data_stage;
+
+ RAISE NOTICE '%: Begin staging thoroughfares', clock_timestamp();
+
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'thfare.c01');
+ EXECUTE v_sql;
+
+ DELETE FROM data_stage WHERE data like '%THOROUGH' || in_edition || '%' OR data like v_std_footer;
+
+ RAISE NOTICE '%: Done staging thoroughfares, importing', clock_timestamp();
+
+ IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_thoroughfares') THEN
+ RAISE NOTICE 'Table "public"."pc_paf_thoroughfares" already exists';
+ v_table_created := false;
+ ELSE
+ RAISE NOTICE 'Creating table "public"."pc_paf_thoroughfares"';
+ CREATE TABLE public.pc_paf_thoroughfares (
+ thoroughfare_key integer NOT NULL,
+ thoroughfare_name varchar(60)
+ );
+ v_table_created := true;
+ END IF;
+
+ TRUNCATE TABLE public. pc_paf_thoroughfares;
+ INSERT INTO public. pc_paf_thoroughfares
+ SELECT substring(data,1,8)::integer,
+ nullif(trim(substring(data,9,60)),'')
+ FROM data_stage;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+
+ IF (v_table_created) THEN
+ ALTER TABLE public.pc_paf_thoroughfares ADD PRIMARY KEY (thoroughfare_key);
+ END IF;
+
+ RAISE NOTICE '%: Done importing thoroughfares (imported % records)', clock_timestamp(), v_processed;
+
+ -- 3) Thoroughfares Descriptor
+ TRUNCATE TABLE data_stage;
+
+ RAISE NOTICE '%: Begin staging thoroughfares descriptor', clock_timestamp();
+
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'thdesc.c01');
+ EXECUTE v_sql;
+
+ DELETE FROM data_stage WHERE data like '%THDESCRI' || in_edition || '%' OR data like v_std_footer;
+
+ RAISE NOTICE '%: Done staging thoroughfares descriptor, importing', clock_timestamp();
+
+ IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_thoroughfare_descriptor') THEN
+ RAISE NOTICE 'Table "public"."pc_paf_thoroughfare_descriptor" already exists';
+ v_table_created := false;
+ ELSE
+ RAISE NOTICE 'Creating table "public"."pc_paf_thoroughfare_descriptor"';
+ CREATE TABLE public.pc_paf_thoroughfare_descriptor (
+ thoroughfare_descriptor_key integer NOT NULL,
+ thoroughfare_descriptor varchar(20),
+ approved_abbreviation varchar(6)
+ );
+ v_table_created := true;
+ end if;
+
+ TRUNCATE TABLE public.pc_paf_thoroughfare_descriptor;
+ INSERT INTO public.pc_paf_thoroughfare_descriptor
+ SELECT substring(data,1,4)::integer,
+ nullif(trim(substring(data,5,20)),''),
+ nullif(trim(substring(data,25,6)),'')
+ FROM data_stage;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+
+ IF (v_table_created) THEN
+ ALTER TABLE public.pc_paf_thoroughfare_descriptor ADD PRIMARY KEY (thoroughfare_descriptor_key);
+ END IF;
+
+ RAISE NOTICE '%: Done importing thoroughfares descriptor (imported % records)', clock_timestamp(), v_processed;
+
+ -- 4) Building Names
+ TRUNCATE TABLE data_stage;
+
+ RAISE NOTICE '%: Begin staging building names', clock_timestamp();
+
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'bname.c01');
+ EXECUTE v_sql;
+
+ DELETE FROM data_stage WHERE data like '%BUILDING' || in_edition || '%' OR data like v_std_footer;
+
+ RAISE NOTICE '%: Done staging building names, importing', clock_timestamp();
+
+ IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_building_names') THEN
+ RAISE NOTICE 'Table "public"."pc_paf_building_names" already exists';
+ v_table_created := false;
+ ELSE
+ RAISE NOTICE 'Creating table "public"."pc_paf_building_names"';
+ CREATE TABLE public.pc_paf_building_names (
+ building_name_key integer NOT NULL,
+ building_name varchar(50)
+ );
+ v_table_created := true;
+ END IF;
+
+ TRUNCATE TABLE public.pc_paf_building_names;
+ INSERT INTO public.pc_paf_building_names
+ SELECT substring(data,1,8)::integer,
+ nullif(trim(substring(data,9,50)),'')
+ FROM data_stage;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+
+ IF (v_table_created) THEN
+ ALTER TABLE public.pc_paf_building_names ADD PRIMARY KEY (building_name_key);
+ END IF;
+
+ RAISE NOTICE '%: Done importing building names (imported % records)', clock_timestamp(), v_processed;
+
+ -- 5) Sub Building Names file (subbname.c01)
+ TRUNCATE TABLE data_stage;
+
+ RAISE NOTICE '%: Begin staging sub building names', clock_timestamp();
+
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'subbname.c01');
+ EXECUTE v_sql;
+
+ DELETE FROM data_stage WHERE data like '%SUBBUILD' || in_edition || '%' OR data like v_std_footer;
+
+ RAISE NOTICE '%: Done staging sub building names, importing', clock_timestamp();
+
+ IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_sub_building_names') THEN
+ RAISE NOTICE 'Table "public"."pc_paf_sub_building_names" already exists';
+ v_table_created := false;
+ ELSE
+ RAISE NOTICE 'Creating table "public"."pc_paf_sub_building_names"';
+ CREATE TABLE public.pc_paf_sub_building_names (
+ sub_building_name_key integer NOT NULL,
+ sub_building_name varchar(50)
+ );
+ v_table_created := true;
+ END IF;
+
+ TRUNCATE TABLE public.pc_paf_sub_building_names;
+ INSERT INTO public.pc_paf_sub_building_names
+ SELECT substring(data,1,8)::integer,
+ nullif(trim(substring(data,9,30)),'')
+ FROM data_stage;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+
+ IF (v_table_created) THEN
+ ALTER TABLE public.pc_paf_sub_building_names ADD PRIMARY KEY (sub_building_name_key);
+ END IF;
+
+ RAISE NOTICE '%: Done importing sub building names (imported % records)', clock_timestamp(), v_processed;
+
+ -- 6) Organisations
+ TRUNCATE TABLE data_stage;
+
+ RAISE NOTICE '%: Begin staging organisations', clock_timestamp();
+
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'org.c01');
+ EXECUTE v_sql;
+
+ DELETE FROM data_stage WHERE data like '%ORGANISA' || in_edition || '%' OR data like v_std_footer;
+
+ RAISE NOTICE '%: Done staging organisations, importing', clock_timestamp();
+
+ IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_organisations') THEN
+ RAISE NOTICE 'Table "public"."pc_paf_organisations" already exists';
+ v_table_created := false;
+ ELSE
+ RAISE NOTICE 'Creating table "public"."pc_paf_organisations"';
+ CREATE TABLE public.pc_paf_organisations (
+ organisation_key integer NOT NULL,
+ postcode_type varchar(1) NOT NULL,
+ organisation_name varchar(60),
+ department_name varchar(60)
+ );
+ v_table_created := true;
+ END IF;
+
+ TRUNCATE TABLE public.pc_paf_organisations;
+ INSERT INTO public.pc_paf_organisations
+ SELECT substring(data,1,8)::integer,
+ nullif(trim(substring(data,9,1)),''),
+ nullif(trim(substring(data,10,60)),''),
+ nullif(trim(substring(data,70,60)),'')
+ FROM data_stage;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+
+ IF (v_table_created) THEN
+ ALTER TABLE public.pc_paf_organisations ADD PRIMARY KEY (organisation_key, postcode_type);
+ COMMENT ON COLUMN public.pc_paf_organisations.organisation_key IS 'When postcode type is L organisation_key relates to address_key';
+ END IF;
+
+ RAISE NOTICE '%: Done importing organisations (imported % records)', clock_timestamp(), v_processed;
+
+ -- 7) Postzon with latlon generated using postgis
+ TRUNCATE TABLE data_stage;
+
+ RAISE NOTICE '%: Begin staging postzone', clock_timestamp();
+
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_postzon || 'pzone100.c01');
+ EXECUTE v_sql;
+
+ DELETE FROM data_stage WHERE data like '%PZONE100' || in_edition || '%' OR data like v_main_footer;
+
+ RAISE NOTICE '%: Done staging postzon, importing', clock_timestamp();
+
+ IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_postzon_100m') THEN
+ RAISE NOTICE 'Table "public"."pc_paf_postzon_100m" already exists';
+ v_table_created := false;
+ ELSE
+ RAISE NOTICE 'Creating table "public"."pc_paf_postzon_100m"';
+ CREATE TABLE public.pc_paf_postzon_100m (
+ postzon_100m_key serial NOT NULL,
+ outward_code varchar(4) NOT NULL,
+ inward_code varchar(3) NOT NULL,
+ introduction_date date,
+ grid_reference_east integer,
+ grid_reference_north integer,
+ country_code varchar(9),
+ area_code_county varchar(9),
+ area_code_district varchar(9),
+ ward_code varchar(9),
+ nhs_region varchar(9),
+ nhs_code varchar(9),
+ user_type smallint,
+ grid_status smallint,
+ latitude double precision,
+ longitude double precision
+ );
+ v_table_created := true;
+ END IF;
+
+ TRUNCATE TABLE public.pc_paf_postzon_100m;
+ INSERT INTO public.pc_paf_postzon_100m(outward_code, inward_code, introduction_date, grid_reference_east, grid_reference_north,
+ country_code, area_code_county, area_code_district, ward_code, nhs_region, nhs_code, user_type, grid_status, latitude, longitude)
+ SELECT nullif(trim(substring(data,1,4)),'') AS outward_code,
+ nullif(trim(substring(data,5,3)),'') AS inward_code,
+ to_date(substring(data,8,6), 'YYYYMM') AS introduction_date,
+ (nullif(trim(substring(data,14,5)),'') || '0')::integer AS grid_reference_east,
+ (CASE WHEN substring(nullif(trim(substring(data,19,5)),''),1,1) ~ '[\x4f-\x5a]' THEN '1' || translate(nullif(trim(substring(data,19,5)),''), 'POUTZY', '221100') ELSE nullif(trim(substring(data,19,5)),'') END || '0')::integer AS grid_reference_north,
+ nullif(trim(substring(data,24,9)),'') AS country_code,
+ nullif(trim(substring(data,33,9)),'') AS area_code_county,
+ nullif(trim(substring(data,42,9)),'') AS area_code_district,
+ nullif(trim(substring(data,51,9)),'') AS ward_code,
+ nullif(trim(substring(data,60,9)),'') AS nhs_region,
+ nullif(trim(substring(data,69,9)),'') AS nhs_code,
+ nullif(trim(substring(data,78,1)),'')::smallint AS user_type,
+ nullif(trim(substring(data,79,1)),'')::smallint AS grid_status,
+ ST_y(ST_transform(ST_GeomFromText('POINT('||(nullif(trim(substring(data,14,5)),'') || '0')||' '||(CASE WHEN substring(nullif(trim(substring(data,19,5)),''),1,1) ~ '[\x4f-\x5a]' THEN '1' || translate(nullif(trim(substring(data,19,5)),''), 'POUTZY', '221100') ELSE nullif(trim(substring(data,19,5)),'') END || '0')||')',CASE WHEN nullif(trim(substring(data,1,4)),'') LIKE 'BT%' THEN 29903 ELSE 27700 END),4326))::numeric(8,6) AS latitude,
+ ST_x(ST_transform(ST_GeomFromText('POINT('||(nullif(trim(substring(data,14,5)),'') || '0')||' '||(CASE WHEN substring(nullif(trim(substring(data,19,5)),''),1,1) ~ '[\x4f-\x5a]' THEN '1' || translate(nullif(trim(substring(data,19,5)),''), 'POUTZY', '221100') ELSE nullif(trim(substring(data,19,5)),'') END || '0')||')',CASE WHEN nullif(trim(substring(data,1,4)),'') LIKE 'BT%' THEN 29903 ELSE 27700 END),4326))::numeric(8,6) AS longitude
+ FROM data_stage;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+
+ IF (v_table_created) THEN
+ ALTER TABLE public.pc_paf_postzon_100m ADD PRIMARY KEY (postzon_100m_key),
+ ADD CONSTRAINT pc_paf_postzon_100m_unique UNIQUE (outward_code, inward_code);
+ CREATE INDEX pc_paf_postzon_100m_longitude ON public.pc_paf_postzon_100m (longitude);
+ CREATE INDEX pc_paf_postzon_100m_latitude ON public.pc_paf_postzon_100m (latitude);
+
+ COMMENT ON TABLE public.pc_paf_postzon_100m IS 'Geographical data from the Royal Mail with accuracy of 100m';
+ COMMENT ON COLUMN public.pc_paf_postzon_100m.grid_reference_east IS 'BT postcodes reflect the Irish Grid public (different origin and scale); subtract 17000 to approximate onto OS National Grid. For latlong do not approximate but do proper conversion';
+ COMMENT ON COLUMN public.pc_paf_postzon_100m.grid_reference_north IS 'BT postcodes reflect the Irish Grid public (different origin and scale); add 13000 to approximate onto OS National Grid. For latlong do not approximate but do proper conversion';
+ COMMENT ON COLUMN public.pc_paf_postzon_100m.user_type IS '0 Small User 1 Large User';
+ COMMENT ON COLUMN public.pc_paf_postzon_100m.grid_status IS '0 Status not supplied by OS \n
+ 1 Within the building of the matched address closest to the Postcode mean. \n
+ 2 Co-ordinates allocated by GROS during Postcode boundary creation to the building nearest the centre of the populated part of the Postcode (Scotland only) \n
+ 3 Approximate to within 50m of true position \n
+ 4 Postcode unit mean (direct copy from ADDRESS-POINT (GB) and COMPAS (NI) - mean of matched addresses with the same Postcode) \n
+ 5 Postcode imputed by ONS to 1 metre resolution \n
+ 6 Postcode sector mean - mainly PO Boxes \n
+ 9 No co-ordinates available';
+ END IF;
+
+ RAISE NOTICE '%: Done importing postzon (imported % records)', clock_timestamp(), v_processed;
+
+ -- 8) Mainfile
+ TRUNCATE TABLE data_stage;
+ RAISE NOTICE '%: Begin staging mainfile', clock_timestamp();
+
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c02');
+ EXECUTE v_sql;
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c03');
+ EXECUTE v_sql;
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c04');
+ EXECUTE v_sql;
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c05');
+ EXECUTE v_sql;
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c06');
+ EXECUTE v_sql;
+
+ DELETE FROM data_stage WHERE data like '%ADDRESS ' || in_edition || '%' OR data like v_main_footer;
+
+ RAISE NOTICE '%: Done staging mainfile, importing', clock_timestamp();
+
+ IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_mainfile') THEN
+ RAISE NOTICE 'Table "public"."pc_paf_mainfile" already exists';
+ v_table_created := false;
+ ELSE
+ RAISE NOTICE 'Creating table "public"."pc_paf_mainfile"';
+ CREATE TABLE public.pc_paf_mainfile (
+ paf_record_key serial NOT NULL,
+ outward_code varchar(4) NOT NULL,
+ inward_code varchar(3) NOT NULL,
+ address_key integer NOT NULL,
+ locality_key integer,
+ thoroughfare_key integer,
+ thoroughfare_descriptor_key integer,
+ dependent_thoroughfare_key integer,
+ dependent_thoroughfare_descriptor_key integer,
+ building_number integer,
+ building_name_key integer,
+ sub_building_name_key integer,
+ number_of_households integer,
+ organisation_key integer,
+ postcode_type varchar(1),
+ concatenation_indicator varchar(1),
+ delivery_point_suffix varchar(2),
+ small_user_organisation_indicator varchar(1),
+ po_box_number varchar(6)
+ );
+ v_table_created := true;
+ END IF;
+
+ TRUNCATE TABLE public.pc_paf_mainfile RESTART IDENTITY;
+ INSERT INTO public.pc_paf_mainfile(outward_code,inward_code,address_key,locality_key,thoroughfare_key,
+ thoroughfare_descriptor_key,dependent_thoroughfare_key,dependent_thoroughfare_descriptor_key,building_number,
+ building_name_key,sub_building_name_key,number_of_households,organisation_key,postcode_type,concatenation_indicator,
+ delivery_point_suffix,small_user_organisation_indicator,po_box_number)
+ SELECT nullif(trim(substring(data,1,4)),''),
+ nullif(trim(substring(data,5,3)),''),
+ nullif(substring(data,8,8)::integer,0),
+ nullif(substring(data,16,6)::integer,0),
+ nullif(substring(data,22,8)::integer,0),
+ nullif(substring(data,30,4)::integer,0),
+ nullif(substring(data,34,8)::integer,0),
+ nullif(substring(data,42,4)::integer,0),
+ nullif(substring(data,46,4)::integer,0),
+ nullif(substring(data,50,8)::integer,0),
+ nullif(substring(data,58,8)::integer,0),
+ nullif(substring(data,66,4)::integer,0),
+ nullif(substring(data,70,8)::integer,0),
+ nullif(trim(substring(data,78,1)),''),
+ nullif(trim(substring(data,79,1)),''),
+ nullif(trim(substring(data,80,2)),''),
+ nullif(trim(substring(data,82,1)),''),
+ nullif(trim(substring(data,83,6)),'')
+ FROM data_stage;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+
+ IF (v_table_created) THEN
+ -- Not all of the indexes below are required, however without these the update_pc_paf function will
+ -- take a while to commit as the deferred foreign key constraints are checked
+ CREATE INDEX pc_paf_mainfile_postcode ON public.pc_paf_mainfile USING btree (outward_code, inward_code);
+ CREATE INDEX pc_paf_mainfile_building_name_key ON public.pc_paf_mainfile USING BTREE (building_name_key);
+ CREATE INDEX pc_paf_mainfile_locality_key ON public.pc_paf_mainfile USING BTREE (locality_key);
+ CREATE INDEX pc_paf_mainfile_organisation_key ON public.pc_paf_mainfile USING BTREE (organisation_key, postcode_type);
+ CREATE INDEX pc_paf_mainfile_sub_building_name_key ON public.pc_paf_mainfile USING BTREE (sub_building_name_key);
+ CREATE INDEX pc_paf_mainfile_thoroughfare_key ON public.pc_paf_mainfile USING BTREE (thoroughfare_key);
+
+ ALTER TABLE public.pc_paf_mainfile
+ ADD PRIMARY KEY (paf_record_key),
+ ADD CONSTRAINT pc_paf_mainfile_unique UNIQUE (address_key, organisation_key, postcode_type),
+ ADD FOREIGN KEY (locality_key) REFERENCES public.pc_paf_localities(locality_key) DEFERRABLE INITIALLY IMMEDIATE,
+ ADD FOREIGN KEY (thoroughfare_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE,
+ ADD FOREIGN KEY (thoroughfare_descriptor_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE,
+ ADD FOREIGN KEY (building_name_key) REFERENCES public.pc_paf_building_names(building_name_key) DEFERRABLE INITIALLY IMMEDIATE,
+ ADD FOREIGN KEY (sub_building_name_key) REFERENCES public.pc_paf_sub_building_names(sub_building_name_key) DEFERRABLE INITIALLY IMMEDIATE,
+ ADD FOREIGN KEY (organisation_key, postcode_type) REFERENCES public.pc_paf_organisations(organisation_key, postcode_type) DEFERRABLE INITIALLY IMMEDIATE,
+ ADD FOREIGN KEY (outward_code,inward_code) REFERENCES public.pc_paf_postzon_100m(outward_code,inward_code) DEFERRABLE INITIALLY IMMEDIATE;
+ COMMENT ON COLUMN public.pc_paf_mainfile.organisation_key IS 'When postcode type is L then address_key relates to organisation_key - good work RM!';
+ END IF;
+
+ RAISE NOTICE '%: Done importing mainfile (imported % records)', clock_timestamp(), v_processed;
+
+ -- 9) Welsh Mainfile
+ TRUNCATE TABLE data_stage;
+ RAISE NOTICE '%: Begin staging welsh alternative mainfile', clock_timestamp();
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'wfmainfl.c06');
+ EXECUTE v_sql;
+
+ DELETE FROM data_stage WHERE data like '%ADDRESS ' || in_edition || '%' OR data like v_main_footer;
+
+ RAISE NOTICE '%: Done staging welsh alternative mainfile, importing', clock_timestamp();
+
+ IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_mainfile_welsh') THEN
+ RAISE NOTICE 'Table "public"."pc_paf_mainfile_welsh" already exists';
+ v_table_created := false;
+ ELSE
+ RAISE NOTICE 'Creating table "public"."pc_paf_mainfile_welsh"';
+ CREATE TABLE public.pc_paf_mainfile_welsh (
+ paf_record_key serial NOT NULL,
+ outward_code varchar(4) NOT NULL,
+ inward_code varchar(3) NOT NULL,
+ address_key integer NOT NULL,
+ locality_key integer,
+ thoroughfare_key integer,
+ thoroughfare_descriptor_key integer,
+ dependent_thoroughfare_key integer,
+ dependent_thoroughfare_descriptor_key integer,
+ building_number integer,
+ building_name_key integer,
+ sub_building_name_key integer,
+ number_of_households integer,
+ organisation_key integer,
+ postcode_type varchar(1),
+ concatenation_indicator varchar(1),
+ delivery_point_suffix varchar(2),
+ small_user_organisation_indicator varchar(1),
+ po_box_number varchar(6)
+ );
+ v_table_created := true;
+ END IF;
+
+ TRUNCATE TABLE public.pc_paf_mainfile_welsh RESTART IDENTITY;
+ INSERT INTO public.pc_paf_mainfile_welsh(outward_code,inward_code,address_key,locality_key,thoroughfare_key,
+ thoroughfare_descriptor_key,dependent_thoroughfare_key,dependent_thoroughfare_descriptor_key,building_number,
+ building_name_key,sub_building_name_key,number_of_households,organisation_key,postcode_type,concatenation_indicator,
+ delivery_point_suffix,small_user_organisation_indicator,po_box_number)
+ SELECT nullif(trim(substring(data,1,4)),''),
+ nullif(trim(substring(data,5,3)),''),
+ nullif(substring(data,8,8)::integer,0),
+ nullif(substring(data,16,6)::integer,0),
+ nullif(substring(data,22,8)::integer,0),
+ nullif(substring(data,30,4)::integer,0),
+ nullif(substring(data,34,8)::integer,0),
+ nullif(substring(data,42,4)::integer,0),
+ nullif(substring(data,46,4)::integer,0),
+ nullif(substring(data,50,8)::integer,0),
+ nullif(substring(data,58,8)::integer,0),
+ nullif(substring(data,66,4)::integer,0),
+ nullif(substring(data,70,8)::integer,0),
+ nullif(trim(substring(data,78,1)),''),
+ nullif(trim(substring(data,79,1)),''),
+ nullif(trim(substring(data,80,2)),''),
+ nullif(trim(substring(data,82,1)),''),
+ nullif(trim(substring(data,83,6)),'')
+ FROM data_stage;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+
+ IF (v_table_created) THEN
+ -- As above, these are required for faster commit on update_pc_paf
+ CREATE INDEX pc_paf_mainfile_welsh_postcode ON public.pc_paf_mainfile_welsh USING btree (outward_code, inward_code);
+ CREATE INDEX pc_paf_mainfile_welsh_building_name_key ON public.pc_paf_mainfile_welsh USING BTREE (building_name_key);
+ CREATE INDEX pc_paf_mainfile_welsh_locality_key ON public.pc_paf_mainfile_welsh USING BTREE (locality_key);
+ CREATE INDEX pc_paf_mainfile_welsh_organisation_key ON public.pc_paf_mainfile_welsh USING BTREE (organisation_key, postcode_type);
+ CREATE INDEX pc_paf_mainfile_welsh_sub_building_name_key ON public.pc_paf_mainfile_welsh USING BTREE (sub_building_name_key);
+ CREATE INDEX pc_paf_mainfile_welsh_thoroughfare_key ON public.pc_paf_mainfile_welsh USING BTREE (thoroughfare_key);
+
+ ALTER TABLE public.pc_paf_mainfile_welsh
+ ADD PRIMARY KEY (paf_record_key),
+ ADD CONSTRAINT pc_paf_mainfile_welsh_unique UNIQUE (address_key, organisation_key, postcode_type),
+ ADD FOREIGN KEY (locality_key) REFERENCES public.pc_paf_localities(locality_key) DEFERRABLE INITIALLY IMMEDIATE,
+ ADD FOREIGN KEY (thoroughfare_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE,
+ ADD FOREIGN KEY (thoroughfare_descriptor_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE,
+ ADD FOREIGN KEY (building_name_key) REFERENCES public.pc_paf_building_names(building_name_key) DEFERRABLE INITIALLY IMMEDIATE,
+ ADD FOREIGN KEY (sub_building_name_key) REFERENCES public.pc_paf_sub_building_names(sub_building_name_key) DEFERRABLE INITIALLY IMMEDIATE,
+ ADD FOREIGN KEY (organisation_key, postcode_type) REFERENCES public.pc_paf_organisations(organisation_key, postcode_type) DEFERRABLE INITIALLY IMMEDIATE,
+ ADD FOREIGN KEY (outward_code,inward_code) REFERENCES public.pc_paf_postzon_100m(outward_code,inward_code) DEFERRABLE INITIALLY IMMEDIATE;
+ COMMENT ON COLUMN public.pc_paf_mainfile_welsh.organisation_key IS 'When postcode type is L then address_key relates to organisation_key - good work RM!';
+ END IF;
+
+ RAISE NOTICE '%: Done importing welsh alternative mainfile (imported % records)', clock_timestamp(), v_processed;
+
+ -- 10) Alias file
+ TRUNCATE TABLE data_stage;
+ RAISE NOTICE '%: Begin staging alias file', clock_timestamp();
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_alias || 'aliasfle.c01');
+ EXECUTE v_sql;
+
+ DELETE FROM data_stage WHERE data like '%ALIASFLE' || in_edition || '%' OR data like v_std_footer;
+
+ RAISE NOTICE '%: Done staging alias file, importing', clock_timestamp();
+
+ IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_counties') THEN
+ RAISE NOTICE 'Table "public"."pc_paf_counties" already exists';
+ v_table_created := false;
+ ELSE
+ RAISE NOTICE 'Creating table "public"."pc_paf_counties"';
+ CREATE TABLE public.pc_paf_counties (
+ county_key integer NOT NULL,
+ county_name varchar(30),
+ county_type varchar(1)
+ );
+ v_table_created := true;
+ END IF;
+
+ TRUNCATE TABLE public.pc_paf_counties;
+ INSERT INTO public.pc_paf_counties
+ SELECT substring(data,2,4)::integer AS county_key,
+ trim(substring(data,6,30)) AS county_name,
+ trim(substring(data,36,1)) AS county_type
+ FROM data_stage
+ WHERE substring(data,1,1)::integer = 4;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+
+ IF (v_table_created) THEN
+ ALTER TABLE public.pc_paf_counties ADD PRIMARY KEY (county_key),
+ ADD CONSTRAINT pc_paf_counties_unique UNIQUE (county_name, county_type);
+ COMMENT ON COLUMN public.pc_paf_counties.county_type IS 'T (Traditional County), P (Former Postal County) or A (Administrative County)';
+ END IF;
+
+ RAISE NOTICE '%: Done importing counties (imported % records)', clock_timestamp(), v_processed;
+
+ IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_county_alias') THEN
+ RAISE NOTICE 'Table "public"."pc_paf_county_alias" already exists';
+ v_table_created := false;
+ ELSE
+ RAISE NOTICE 'Creating table "public"."pc_paf_county_alias"';
+ CREATE TABLE public.pc_paf_county_alias (
+ county_alias_key serial NOT NULL,
+ postcode varchar(7) NOT NULL,
+ former_postal_county integer,
+ traditional_county integer,
+ administrative_county integer
+ );
+ v_table_created := true;
+ END IF;
+
+ TRUNCATE TABLE public.pc_paf_county_alias;
+ INSERT INTO public.pc_paf_county_alias (postcode, former_postal_county, traditional_county, administrative_county)
+ SELECT trim(substring(data,2,7)) AS postcode,
+ nullif(substring(data,9,4)::integer,0) AS former_postal_county,
+ nullif(substring(data,13,4)::integer,0) AS traditional_county,
+ nullif(substring(data,17,4)::integer,0) AS administrative_county
+ FROM data_stage
+ WHERE substring(data,1,1)::integer = 5;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+
+ IF (v_table_created) THEN
+ ALTER TABLE public.pc_paf_county_alias ADD PRIMARY KEY (county_alias_key),
+ ADD CONSTRAINT pc_paf_county_alias_unique UNIQUE (postcode),
+ ADD FOREIGN KEY (former_postal_county) REFERENCES public.pc_paf_counties(county_key) DEFERRABLE INITIALLY IMMEDIATE,
+ ADD FOREIGN KEY (traditional_county) REFERENCES public.pc_paf_counties(county_key) DEFERRABLE INITIALLY IMMEDIATE,
+ ADD FOREIGN KEY (administrative_county) REFERENCES public.pc_paf_counties(county_key) DEFERRABLE INITIALLY IMMEDIATE;
+ END IF;
+
+ RAISE NOTICE '%: Done importing county_alias (imported % records)', clock_timestamp(), v_processed;
+
+ RAISE NOTICE '%: Completed', clock_timestamp();
+ RETURN true;
+END;
+
+$BODY$
+LANGUAGE 'plpgsql' VOLATILE;
+
+--
+
+DROP FUNCTION IF EXISTS public.update_pc_paf(varchar, varchar);
+CREATE OR REPLACE FUNCTION public.update_pc_paf(in_edition varchar, in_data varchar)
+RETURNS boolean AS
+$BODY$
+
+DECLARE
+ v_data_root varchar;
+ v_data_update varchar;
+ v_data_postzon varchar;
+ v_data_alias varchar;
+ v_processed integer;
+ v_main_footer varchar;
+ v_std_footer varchar;
+ v_sql varchar;
+ v_table_created boolean;
+BEGIN
+ v_data_root := in_data || '/' || in_edition || '_CHANGES/';
+ v_data_update := v_data_root || 'CONSOLIDATED CHANGES/';
+ v_data_postzon := v_data_root || 'POSTZON 100M/';
+ v_data_alias := v_data_root || 'ALIAS/';
+
+ v_main_footer = ' %';
+
+ RAISE NOTICE '%: Import starting for edition % with data root %', clock_timestamp(), in_edition, v_data_root;
+ CREATE TEMPORARY TABLE data_stage (data text) ON COMMIT DROP;
+
+ --changes1.c01 - Changes1 (Changes to satelite tables except Organisations) - A single changes file
+ RAISE NOTICE '%: Begin staging Changes1 file', clock_timestamp();
+
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_update || 'changes1.c01');
+ EXECUTE v_sql;
+ DELETE FROM data_stage WHERE data like '%CHANGES1' || in_edition || '%' OR data like v_main_footer;
+
+ RAISE NOTICE '%: Done staging Changes1 file',clock_timestamp();
+
+ SET CONSTRAINTS ALL DEFERRED;
+
+ --Record Type 1 - Localities
+ RAISE NOTICE '%: Preparing to update localities', clock_timestamp();
+
+ CREATE TEMPORARY TABLE tmp_localities ON COMMIT DROP AS
+ SELECT substring(data,2,8)::integer AS locality_key,
+ to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
+ nullif(trim(substring(data,24,1)),'') AS amendment_type,
+ nullif(trim(substring(data,70,30)),'') AS post_town,
+ nullif(trim(substring(data,100,35)),'') AS dependent_locality,
+ nullif(trim(substring(data,135,35)),'') AS double_dependent_locality
+ FROM data_stage
+ WHERE substring(data,1,1)::integer = 1
+ AND nullif(trim(substring(data,24,1)),'') IS NOT NULL
+ ORDER BY to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS');
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Prepared % records for update on localities',clock_timestamp(), v_processed;
+
+ DELETE FROM public.pc_paf_localities l
+ USING tmp_localities lt WHERE lt.locality_key = l.locality_key
+ AND lt.amendment_type IN ('D', 'B')
+ AND l.* = (lt.locality_key, lt.post_town::character varying(30), lt.dependent_locality::character varying(30), lt.double_dependent_locality::character varying(30));
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Removed % records from localities',clock_timestamp(), v_processed;
+
+ INSERT INTO public.pc_paf_localities
+ SELECT lt.locality_key, lt.post_town, lt.dependent_locality, lt.double_dependent_locality
+ FROM tmp_localities lt
+ WHERE lt.amendment_type IN ('I', 'C');
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Created % records in localities',clock_timestamp(), v_processed;
+
+ --Record Type 2 - Thoroughfares
+ RAISE NOTICE '%: Preparing to update thoroughfares', clock_timestamp();
+
+ CREATE TEMPORARY TABLE tmp_thoroughfares ON COMMIT DROP AS
+ SELECT substring(data,2,8)::integer AS thoroughfare_key,
+ to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
+ nullif(trim(substring(data,24,1)),'') AS amendment_type,
+ nullif(trim(substring(data,25,60)),'') AS thoroughfare_name
+ FROM data_stage
+ WHERE substring(data,1,1)::integer = 2 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Prepared % records for update on thoroughfares',clock_timestamp(), v_processed;
+
+ DELETE FROM public.pc_paf_thoroughfares t
+ USING tmp_thoroughfares tt WHERE tt.thoroughfare_key = t.thoroughfare_key
+ AND tt.amendment_type IN ('D', 'B')
+ AND t.* = (tt.thoroughfare_key, t.thoroughfare_name::character varying(60));
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Removed % records from thoroughfares',clock_timestamp(), v_processed;
+
+ INSERT INTO public.pc_paf_thoroughfares
+ SELECT tt.thoroughfare_key, tt.thoroughfare_name
+ FROM tmp_thoroughfares tt
+ WHERE tt.amendment_type IN ('I', 'C');
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Created % records in thoroughfares',clock_timestamp(), v_processed;
+
+ --Record Type 3 - Thoroughfare Descriptors
+ RAISE NOTICE '%: Preparing to update thoroughfare_descriptor', clock_timestamp();
+
+ CREATE TEMPORARY TABLE tmp_thoroughfare_descriptor ON COMMIT DROP AS
+ SELECT substring(data,2,8)::integer AS thoroughfare_descriptor_key ,
+ to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
+ nullif(trim(substring(data,24,1)),'') AS amendment_type,
+ nullif(trim(substring(data,25,20)),'') AS thoroughfare_descriptor,
+ nullif(trim(substring(data,45,6)),'') AS approved_abbreviation
+ FROM data_stage
+ WHERE substring(data,1,1)::integer = 3 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Prepared % records for update on thoroughfare_descriptor',clock_timestamp(), v_processed;
+
+ DELETE FROM public.pc_paf_thoroughfare_descriptor td
+ USING tmp_thoroughfare_descriptor tdt WHERE tdt.thoroughfare_descriptor_key = td.thoroughfare_descriptor_key
+ AND tdt.amendment_type IN ('D', 'B')
+ AND td.* = (tdt.thoroughfare_descriptor_key , tdt.thoroughfare_descriptor::character varying(20), tdt.approved_abbreviation::character varying(6));
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Removed % records from thoroughfare_descriptor',clock_timestamp(), v_processed;
+
+ INSERT INTO public.pc_paf_thoroughfare_descriptor
+ SELECT tdt.thoroughfare_descriptor_key , tdt.thoroughfare_descriptor, tdt.approved_abbreviation
+ FROM tmp_thoroughfare_descriptor tdt
+ WHERE tdt.amendment_type IN ('I', 'C');
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Created % records in thoroughfare_descriptor',clock_timestamp(), v_processed;
+
+ --Record Type 4 - Building Names
+ RAISE NOTICE '%: Preparing to update building_names', clock_timestamp();
+
+ CREATE TEMPORARY TABLE tmp_building_names ON COMMIT DROP AS
+ SELECT substring(data,2,8)::integer AS building_name_key,
+ to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
+ nullif(trim(substring(data,24,1)),'') AS amendment_type,
+ nullif(trim(substring(data,25,50)),'') AS building_name
+ FROM data_stage
+ WHERE substring(data,1,1)::integer = 4 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Prepared % records for update on building_names',clock_timestamp(), v_processed;
+
+ DELETE FROM public.pc_paf_building_names bn
+ USING tmp_building_names tbn WHERE bn.building_name_key = tbn.building_name_key
+ AND tbn.amendment_type IN ('D', 'B')
+ AND bn.* = (tbn.building_name_key, tbn.building_name::character varying(50));
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Removed % records from building_names',clock_timestamp(), v_processed;
+
+ INSERT INTO public.pc_paf_building_names
+ SELECT tbn.building_name_key, tbn.building_name
+ FROM tmp_building_names tbn
+ WHERE tbn.amendment_type IN ('I', 'C');
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Created % records in building_names',clock_timestamp(), v_processed;
+
+ --Record Type 5 - Sub Building Names
+ RAISE NOTICE '%: Preparing to update sub_building_names', clock_timestamp();
+
+ CREATE TEMPORARY TABLE tmp_sub_building_names ON COMMIT DROP AS
+ SELECT substring(data,2,8)::integer AS sub_building_name_key,
+ to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
+ nullif(trim(substring(data,24,1)),'') AS amendment_type,
+ nullif(trim(substring(data,25,30)),'') AS sub_building_name
+ FROM data_stage
+ WHERE substring(data,1,1)::integer = 5 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Prepared % records for update on sub_building_names',clock_timestamp(), v_processed;
+
+ DELETE FROM public.pc_paf_sub_building_names sbn
+ USING tmp_sub_building_names tsbn WHERE sbn.sub_building_name_key = tsbn.sub_building_name_key
+ AND tsbn.amendment_type IN ('D', 'B')
+ AND sbn.* = (tsbn.sub_building_name_key, tsbn.sub_building_name::character varying(50));
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Removed % records from sub_building_names',clock_timestamp(), v_processed;
+
+ INSERT INTO public.pc_paf_sub_building_names
+ SELECT tsbn.sub_building_name_key, tsbn.sub_building_name
+ FROM tmp_sub_building_names tsbn
+ WHERE tsbn.amendment_type IN ('I', 'C');
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Created % records in sub_building_names',clock_timestamp(), v_processed;
+
+ TRUNCATE TABLE data_stage;
+
+ -- fpchgsng.c01 - Changes2 (Changes to Mainfile and Organisations) -- fpchgsng.c01 = Single changes file, fpchngs2.c01 = Timeline changes file
+ RAISE NOTICE '%: Begin staging Changes2 file', clock_timestamp();
+
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_update || 'fpchgsng.c01');
+ EXECUTE v_sql;
+ DELETE FROM data_stage WHERE data like '%CHANGES2' || in_edition || '%' OR data like v_main_footer;
+
+ RAISE NOTICE '%: Done staging Changes2 file',clock_timestamp();
+
+ -- Mainfile
+ RAISE NOTICE '%: Preparing to update mainfile', clock_timestamp();
+
+ CREATE TEMPORARY TABLE tmp_mainfile ON COMMIT DROP AS
+ SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
+ nullif(trim(substring(data,15,4)),'') AS outward_code,
+ nullif(trim(substring(data,19,3)),'') AS inward_code,
+ nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix,
+ nullif(trim(substring(data,24,1)),'') AS postcode_type,
+ nullif(substring(data,25,8)::integer,0) AS address_key,
+ nullif(substring(data,33,8)::integer,0) AS organisation_key,
+ nullif(trim(substring(data,41,1)),'') AS amendment_type,
+ nullif(substring(data,42,1)::integer,0) AS record_type,
+ nullif(substring(data,43,8)::integer,0) AS locality_key,
+ nullif(substring(data,51,8)::integer,0) AS thoroughfare_key,
+ nullif(substring(data,59,8)::integer,0) AS thoroughfare_descriptor_key,
+ nullif(substring(data,67,8)::integer,0) AS dependent_thoroughfare_key,
+ nullif(substring(data,75,8)::integer,0) AS dependent_thoroughfare_descriptor_key,
+ nullif(substring(data,83,4)::integer,0) AS building_number,
+ nullif(substring(data,87,8)::integer,0) AS building_name_key,
+ nullif(substring(data,95,8)::integer,0) AS sub_building_name_key,
+ nullif(substring(data,103,4)::integer,0) AS number_of_households,
+ nullif(trim(substring(data,107,1)),'') AS concatenation_indicator,
+ nullif(trim(substring(data,108,6)),'') AS po_box_number,
+ nullif(trim(substring(data,114,1)),'') AS small_user_organisation_indicator,
+ CASE nullif(substring(data,115,2)::integer,0)
+ WHEN 1 THEN 'New'
+ WHEN 2 THEN 'Correction'
+ WHEN 3 THEN ''
+ WHEN 4 THEN 'Coding Revision'
+ WHEN 5 THEN 'Organisation Change'
+ WHEN 6 THEN 'Status Change'
+ WHEN 7 THEN 'Large User Deleted'
+ WHEN 8 THEN 'Building/Sub Building Change'
+ WHEN 9 THEN 'Large User Change'
+ END AS reason_for_amendment,
+ nullif(trim(substring(data,117,4)),'') AS new_outward_code,
+ nullif(trim(substring(data,121,3)),'') AS new_inward_code
+ FROM data_stage
+ WHERE nullif(substring(data,42,1)::integer,0) = 1;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Prepared % records for update on mainfile',clock_timestamp(), v_processed;
+
+ DELETE FROM public.pc_paf_mainfile m
+ USING tmp_mainfile tm WHERE tm.address_key = m.address_key AND tm.organisation_key IS NOT DISTINCT FROM m.organisation_key AND tm.postcode_type = m.postcode_type
+ AND tm.amendment_type IN ('D', 'B');
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Removed % records from mainfile',clock_timestamp(), v_processed;
+
+ INSERT INTO public.pc_paf_mainfile (outward_code, inward_code, address_key, locality_key, thoroughfare_key,
+ thoroughfare_descriptor_key, dependent_thoroughfare_key, dependent_thoroughfare_descriptor_key,
+ building_number, building_name_key, sub_building_name_key, number_of_households,
+ organisation_key, postcode_type, concatenation_indicator, delivery_point_suffix,
+ small_user_organisation_indicator, po_box_number)
+ SELECT COALESCE(tm.new_outward_code, tm.outward_code), COALESCE(tm.new_inward_code, tm.inward_code), tm.address_key, tm.locality_key, tm.thoroughfare_key,
+ tm.thoroughfare_descriptor_key, tm.dependent_thoroughfare_key, tm.dependent_thoroughfare_descriptor_key,
+ tm.building_number, tm.building_name_key, tm.sub_building_name_key, tm.number_of_households,
+ tm.organisation_key, tm.postcode_type, tm.concatenation_indicator, tm.delivery_point_suffix,
+ tm.small_user_organisation_indicator, tm.po_box_number
+ FROM (
+ SELECT cume_dist() OVER w, *
+ FROM tmp_mainfile
+ WINDOW w AS (PARTITION BY address_key, organisation_key, postcode_type ORDER BY timestamp)
+ ) tm WHERE tm.cume_dist = 1 AND tm.amendment_type IN ('I', 'C');
+
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Created % records in mainfile',clock_timestamp(), v_processed;
+
+ -- Organisations
+ RAISE NOTICE '%: Preparing to update organisations', clock_timestamp();
+
+ CREATE TEMPORARY TABLE tmp_organisations ON COMMIT DROP AS
+ SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
+ nullif(trim(substring(data,15,4)),'') AS outward_code,
+ nullif(trim(substring(data,19,3)),'') AS inward_code,
+ nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix,
+ nullif(trim(substring(data,24,1)),'') AS postcode_type,
+ nullif(substring(data,25,8)::integer,0) AS address_key,
+ nullif(substring(data,33,8)::integer,0) AS organisation_key,
+ nullif(trim(substring(data,41,1)),'') AS amendment_type,
+ nullif(substring(data,42,1)::integer,0) AS record_type,
+ nullif(trim(substring(data,43,60)),'') AS organisation_name,
+ nullif(trim(substring(data,103,60)),'') AS department_name,
+ nullif(trim(substring(data,163,6)),'') AS po_box_number
+ FROM data_stage
+ WHERE nullif(substring(data,42,1)::integer,0) IN (2,3);
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Prepared % records for update on organisations',clock_timestamp(), v_processed;
+
+ DELETE FROM public.pc_paf_organisations o
+ USING tmp_organisations tx WHERE COALESCE(tx.organisation_key, tx.address_key) = o.organisation_key AND tx.postcode_type = o.postcode_type
+ AND tx.amendment_type IN ('D', 'B');
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Removed % records from organisations',clock_timestamp(), v_processed;
+
+ INSERT INTO public.pc_paf_organisations
+ SELECT COALESCE(tx.organisation_key, tx.address_key), tx.postcode_type, tx.organisation_name, tx.department_name
+ FROM (
+ SELECT cume_dist() OVER w, *
+ FROM tmp_organisations
+ WINDOW w AS (PARTITION BY COALESCE(organisation_key, address_key), postcode_type ORDER BY timestamp)
+ ) tx WHERE tx.cume_dist = 1 AND tx.amendment_type IN ('I', 'C');
+
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Created % records in organisations',clock_timestamp(), v_processed;
+
+ -- wchanges.c01 -- Welsh changes (In same format as regular changes2 file (hence has timelined data, not single changes)
+ TRUNCATE TABLE data_stage;
+
+ RAISE NOTICE '%: Begin staging WChanges file', clock_timestamp();
+
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_update || 'wchanges.c01');
+ EXECUTE v_sql;
+ DELETE FROM data_stage WHERE data like '%WCHANGES' || in_edition || '%' OR data like v_main_footer;
+
+ RAISE NOTICE '%: Done staging WChanges2 file',clock_timestamp();
+
+ RAISE NOTICE '%: Preparing to update welsh mainfile', clock_timestamp();
+
+ CREATE TEMPORARY TABLE tmp_mainfile_welsh ON COMMIT DROP AS
+ SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
+ nullif(trim(substring(data,15,4)),'') AS outward_code,
+ nullif(trim(substring(data,19,3)),'') AS inward_code,
+ nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix,
+ nullif(trim(substring(data,24,1)),'') AS postcode_type,
+ nullif(substring(data,25,8)::integer,0) AS address_key,
+ nullif(substring(data,33,8)::integer,0) AS organisation_key,
+ nullif(trim(substring(data,41,1)),'') AS amendment_type,
+ nullif(substring(data,42,1)::integer,0) AS record_type,
+ nullif(substring(data,43,8)::integer,0) AS locality_key,
+ nullif(substring(data,51,8)::integer,0) AS thoroughfare_key,
+ nullif(substring(data,59,8)::integer,0) AS thoroughfare_descriptor_key,
+ nullif(substring(data,67,8)::integer,0) AS dependent_thoroughfare_key,
+ nullif(substring(data,75,8)::integer,0) AS dependent_thoroughfare_descriptor_key,
+ nullif(substring(data,83,4)::integer,0) AS building_number,
+ nullif(substring(data,87,8)::integer,0) AS building_name_key,
+ nullif(substring(data,95,8)::integer,0) AS sub_building_name_key,
+ nullif(substring(data,103,4)::integer,0) AS number_of_households,
+ nullif(trim(substring(data,107,1)),'') AS concatenation_indicator,
+ nullif(trim(substring(data,108,6)),'') AS po_box_number,
+ nullif(trim(substring(data,114,1)),'') AS small_user_organisation_indicator,
+ CASE nullif(substring(data,115,2)::integer,0)
+ WHEN 1 THEN 'New'
+ WHEN 2 THEN 'Correction'
+ WHEN 3 THEN ''
+ WHEN 4 THEN 'Coding Revision'
+ WHEN 5 THEN 'Organisation Change'
+ WHEN 6 THEN 'Status Change'
+ WHEN 7 THEN 'Large User Deleted'
+ WHEN 8 THEN 'Building/Sub Building Change'
+ WHEN 9 THEN 'Large User Change'
+ END AS reason_for_amendment,
+ nullif(trim(substring(data,117,4)),'') AS new_outward_code,
+ nullif(trim(substring(data,121,3)),'') AS new_inward_code
+ FROM data_stage
+ WHERE nullif(substring(data,42,1)::integer,0) = 1;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Prepared % records for update on welsh mainfile',clock_timestamp(), v_processed;
+
+ DELETE FROM public.pc_paf_mainfile_welsh mw
+ USING tmp_mainfile_welsh twm WHERE twm.address_key = mw.address_key AND twm.organisation_key IS NOT DISTINCT FROM mw.organisation_key AND twm.postcode_type = mw.postcode_type
+ AND twm.amendment_type IN ('D', 'B');
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Removed % records from welsh mainfile',clock_timestamp(), v_processed;
+
+ INSERT INTO public.pc_paf_mainfile_welsh (outward_code, inward_code, address_key, locality_key, thoroughfare_key,
+ thoroughfare_descriptor_key, dependent_thoroughfare_key, dependent_thoroughfare_descriptor_key,
+ building_number, building_name_key, sub_building_name_key, number_of_households,
+ organisation_key, postcode_type, concatenation_indicator, delivery_point_suffix,
+ small_user_organisation_indicator, po_box_number)
+ WITH twm AS (
+ SELECT cume_dist() OVER w, *
+ FROM tmp_mainfile_welsh
+ WINDOW w AS (PARTITION BY address_key, organisation_key, postcode_type ORDER BY timestamp)
+ )
+ SELECT COALESCE(twm.new_outward_code, twm.outward_code), COALESCE(twm.new_inward_code, twm.inward_code), twm.address_key, twm.locality_key, twm.thoroughfare_key,
+ twm.thoroughfare_descriptor_key, twm.dependent_thoroughfare_key, twm.dependent_thoroughfare_descriptor_key,
+ twm.building_number, twm.building_name_key, twm.sub_building_name_key, twm.number_of_households,
+ twm.organisation_key, twm.postcode_type, twm.concatenation_indicator, twm.delivery_point_suffix,
+ twm.small_user_organisation_indicator, twm.po_box_number
+ FROM twm
+ WHERE twm.cume_dist = 1 AND twm.amendment_type IN ('I', 'C');
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Created % records in welsh mainfile',clock_timestamp(), v_processed;
+
+ -- Organisations
+ RAISE NOTICE '%: Preparing to update welsh organisations', clock_timestamp();
+
+ CREATE TEMPORARY TABLE tmp_organisations_welsh ON COMMIT DROP AS
+ SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp,
+ nullif(trim(substring(data,15,4)),'') AS outward_code,
+ nullif(trim(substring(data,19,3)),'') AS inward_code,
+ nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix,
+ nullif(trim(substring(data,24,1)),'') AS postcode_type,
+ nullif(substring(data,25,8)::integer,0) AS address_key,
+ nullif(substring(data,33,8)::integer,0) AS organisation_key,
+ nullif(trim(substring(data,41,1)),'') AS amendment_type,
+ nullif(substring(data,42,1)::integer,0) AS record_type,
+ nullif(trim(substring(data,43,60)),'') AS organisation_name,
+ nullif(trim(substring(data,103,60)),'') AS department_name,
+ nullif(trim(substring(data,163,6)),'') AS po_box_number
+ FROM data_stage
+ WHERE nullif(substring(data,42,1)::integer,0) IN (2,3);
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Prepared % records for update on welsh organisations',clock_timestamp(), v_processed;
+
+ DELETE FROM public.pc_paf_organisations o
+ USING tmp_organisations_welsh txw WHERE COALESCE(txw.organisation_key, txw.address_key) = o.organisation_key AND txw.postcode_type = o.postcode_type
+ AND txw.amendment_type IN ('D', 'B');
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Removed % records from welsh organisations',clock_timestamp(), v_processed;
+
+ INSERT INTO public.pc_paf_organisations
+ WITH txw AS (
+ SELECT cume_dist() OVER w, *
+ FROM tmp_organisations_welsh
+ WINDOW w AS (PARTITION BY COALESCE(organisation_key, address_key), postcode_type ORDER BY timestamp)
+ )
+ SELECT COALESCE(txw.organisation_key, txw.address_key), txw.postcode_type, txw.organisation_name, txw.department_name
+ FROM txw
+ WHERE txw.cume_dist = 1 AND amendment_type IN ('I', 'C');
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Created % records in welsh organisations',clock_timestamp(), v_processed;
+
+ -- It would appear postzon is supplied as a full refresh rather than an update file
+ TRUNCATE TABLE data_stage;
+ RAISE NOTICE '%: Begin staging postzone', clock_timestamp();
+
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_postzon || 'pzone100.c01');
+ EXECUTE v_sql;
+ DELETE FROM data_stage WHERE data like '%PZONE100' || in_edition || '%' OR data like v_main_footer;
+
+ RAISE NOTICE '%: Done staging postzon file, now staging with lat long data', clock_timestamp();
+
+ CREATE TEMPORARY TABLE tmp_postzon_100m ON COMMIT DROP AS
+ SELECT nullif(trim(substring(data,1,4)),'') AS outward_code,
+ nullif(trim(substring(data,5,3)),'') AS inward_code,
+ to_date(substring(data,8,6), 'YYYYMM') AS introduction_date,
+ (nullif(trim(substring(data,14,5)),'') || '0')::integer AS grid_reference_east,
+ (CASE WHEN substring(nullif(trim(substring(data,19,5)),''),1,1) ~ '[\x4f-\x5a]' THEN '1' || translate(nullif(trim(substring(data,19,5)),''), 'POUTZY', '221100') ELSE nullif(trim(substring(data,19,5)),'') END || '0')::integer AS grid_reference_north,
+ nullif(trim(substring(data,24,9)),'') AS country_code,
+ nullif(trim(substring(data,33,9)),'') AS area_code_county,
+ nullif(trim(substring(data,42,9)),'') AS area_code_district,
+ nullif(trim(substring(data,51,9)),'') AS ward_code,
+ nullif(trim(substring(data,60,9)),'') AS nhs_region,
+ nullif(trim(substring(data,69,9)),'') AS nhs_code,
+ nullif(trim(substring(data,78,1)),'')::smallint AS user_type,
+ nullif(trim(substring(data,79,1)),'')::smallint AS grid_status,
+ ST_y(ST_transform(ST_GeomFromText('POINT('||(nullif(trim(substring(data,14,5)),'') || '0')||' '||(CASE WHEN substring(nullif(trim(substring(data,19,5)),''),1,1) ~ '[\x4f-\x5a]' THEN '1' || translate(nullif(trim(substring(data,19,5)),''), 'POUTZY', '221100') ELSE nullif(trim(substring(data,19,5)),'') END || '0')||')',CASE WHEN nullif(trim(substring(data,1,4)),'') LIKE 'BT%' THEN 29903 ELSE 27700 END),4326))::numeric(8,6) AS latitude,
+ ST_x(ST_transform(ST_GeomFromText('POINT('||(nullif(trim(substring(data,14,5)),'') || '0')||' '||(CASE WHEN substring(nullif(trim(substring(data,19,5)),''),1,1) ~ '[\x4f-\x5a]' THEN '1' || translate(nullif(trim(substring(data,19,5)),''), 'POUTZY', '221100') ELSE nullif(trim(substring(data,19,5)),'') END || '0')||')',CASE WHEN nullif(trim(substring(data,1,4)),'') LIKE 'BT%' THEN 29903 ELSE 27700 END),4326))::numeric(8,6) AS longitude
+ FROM data_stage;
+
+ RAISE NOTICE '%: Done staging postzon with lat long data, updating', clock_timestamp();
+
+ INSERT INTO public.pc_paf_postzon_100m (outward_code, inward_code, introduction_date, grid_reference_east, grid_reference_north,
+ country_code, area_code_county, area_code_district, ward_code, nhs_region, nhs_code, user_type, grid_status, latitude, longitude)
+ SELECT * FROM tmp_postzon_100m tp
+ WHERE NOT EXISTS (SELECT 1 FROM public.pc_paf_postzon_100m WHERE tp.outward_code = outward_code AND tp.inward_code = inward_code);
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Inserted % new records for update on postzon_100m',clock_timestamp(), v_processed;
+
+ UPDATE public.pc_paf_postzon_100m pz
+ SET (outward_code, inward_code, introduction_date, grid_reference_east, grid_reference_north,
+ country_code, area_code_county, area_code_district, ward_code, nhs_region, nhs_code, user_type, grid_status, latitude, longitude)
+ = (tp.outward_code, tp.inward_code, tp.introduction_date, tp.grid_reference_east, tp.grid_reference_north,
+ tp.country_code, tp.area_code_county, tp.area_code_district, tp.ward_code, tp.nhs_region, tp.nhs_code, tp.user_type, tp.grid_status, tp.latitude, tp.longitude)
+ FROM tmp_postzon_100m tp
+ WHERE tp.outward_code = pz.outward_code AND tp.inward_code = pz.inward_code
+ AND (pz.introduction_date, pz.grid_reference_east, pz.grid_reference_north,
+ pz.country_code, pz.area_code_county, pz.area_code_district, pz.ward_code, pz.nhs_region, pz.nhs_code, pz.user_type, pz.grid_status, pz.latitude, pz.longitude)
+ <> (tp.introduction_date, tp.grid_reference_east, tp.grid_reference_north,
+ tp.country_code, tp.area_code_county, tp.area_code_district, tp.ward_code, tp.nhs_region, tp.nhs_code, tp.user_type, tp.grid_status, tp.latitude, tp.longitude);
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Updated % records for update on postzon_100m',clock_timestamp(), v_processed;
+
+ DELETE FROM public.pc_paf_postzon_100m pz
+ WHERE NOT EXISTS (SELECT 1 FROM tmp_postzon_100m WHERE pz.outward_code = outward_code AND pz.inward_code = inward_code)
+ AND postzon_100m_key <> 0;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Deleted % records for update on postzon_100m',clock_timestamp(), v_processed;
+
+ -- alias is also supplied as a full refresh rather than an update file
+ TRUNCATE TABLE data_stage;
+ RAISE NOTICE '%: Begin staging alias file', clock_timestamp();
+ v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_alias || 'aliasfle.c01');
+ EXECUTE v_sql;
+ DELETE FROM data_stage WHERE data like '%ALIASFLE' || in_edition || '%' OR data like v_std_footer;
+
+ RAISE NOTICE '%: Done staging alias file', clock_timestamp();
+
+ CREATE TEMPORARY TABLE tmp_counties ON COMMIT DROP AS
+ SELECT substring(data,2,4)::integer AS county_key,
+ trim(substring(data,6,30)) AS county_name,
+ trim(substring(data,36,1)) AS county_type
+ FROM data_stage
+ WHERE substring(data,1,1)::integer = 4;
+
+ RAISE NOTICE '%: Done staging counties , updating', clock_timestamp();
+
+ INSERT INTO public.pc_paf_counties
+ SELECT * FROM tmp_counties tc
+ WHERE NOT EXISTS (SELECT 1 FROM public.pc_paf_counties WHERE tc.county_key = county_key);
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Inserted % new records for update on counties',clock_timestamp(), v_processed;
+
+ UPDATE public.pc_paf_counties pc
+ SET (county_name, county_type) = (tc.county_name, tc.county_type)
+ FROM tmp_counties tc
+ WHERE tc.county_key = pc.county_key
+ AND (pc.county_name, pc.county_type) <> (tc.county_name, tc.county_type);
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Updated % records for update on counties',clock_timestamp(), v_processed;
+
+ DELETE FROM public.pc_paf_counties pc
+ WHERE NOT EXISTS (SELECT 1 FROM tmp_counties WHERE pc.county_key = county_key)
+ AND pc.county_key <> 0;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Deleted % records for update on counties',clock_timestamp(), v_processed;
+
+ CREATE TEMPORARY TABLE tmp_county_alias ON COMMIT DROP AS
+ SELECT trim(substring(data,2,7)) AS postcode,
+ nullif(substring(data,9,4)::integer,0) AS former_postal_county,
+ nullif(substring(data,13,4)::integer,0) AS traditional_county,
+ nullif(substring(data,17,4)::integer,0) AS administrative_county
+ FROM data_stage
+ WHERE substring(data,1,1)::integer = 5;
+
+ RAISE NOTICE '%: Done staging county_alias , updating', clock_timestamp();
+
+ INSERT INTO public.pc_paf_county_alias (postcode, former_postal_county, traditional_county, administrative_county)
+ SELECT * FROM tmp_county_alias tca
+ WHERE NOT EXISTS (SELECT 1 FROM public.pc_paf_county_alias WHERE tca.postcode = postcode);
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Inserted % new records for update on county_alias',clock_timestamp(), v_processed;
+
+ UPDATE public.pc_paf_county_alias pca
+ SET (former_postal_county, traditional_county, administrative_county) =
+ (tca.former_postal_county, tca.traditional_county, tca.administrative_county)
+ FROM tmp_county_alias tca
+ WHERE tca.postcode = pca.postcode
+ AND (pca.former_postal_county, pca.traditional_county, pca.administrative_county) <> (tca.former_postal_county, tca.traditional_county, tca.administrative_county);
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Updated % records for update on county_alias',clock_timestamp(), v_processed;
+
+ DELETE FROM public.pc_paf_county_alias pca
+ WHERE NOT EXISTS (SELECT 1 FROM tmp_county_alias WHERE pca.postcode = postcode)
+ AND pca.county_alias_key <> 0;
+
+ GET DIAGNOSTICS v_processed = ROW_COUNT;
+ RAISE NOTICE '%: Deleted % records for update on county_alias',clock_timestamp(), v_processed;
+
+ RAISE NOTICE '%: Completed', clock_timestamp();
+
+ RETURN true;
+END;
+
+$BODY$
+LANGUAGE 'plpgsql' VOLATILE;
+
+-- Standard master view for postcode tables
+CREATE OR REPLACE VIEW public.pc_paf_master_view AS
+SELECT m.building_number,
+ b.building_name,
+ sb.sub_building_name,
+ COALESCE(o.organisation_name, lo.organisation_name) AS organisation_name,
+ COALESCE(o.department_name, lo.department_name) AS department_name,
+ m.po_box_number,
+ t.thoroughfare_name,
+ td.thoroughfare_descriptor,
+ td.approved_abbreviation,
+ l.post_town,
+ l.dependent_locality,
+ l.double_dependent_locality,
+ apc.county_name,
+ m.outward_code,
+ m.inward_code,
+ m.number_of_households,
+ m.postcode_type,
+ m.concatenation_indicator,
+ m.delivery_point_suffix,
+ m.small_user_organisation_indicator,
+ EXISTS(SELECT 1 FROM public.pc_paf_mainfile_welsh mw WHERE mw.address_key = m.address_key AND mw.organisation_key IS NOT DISTINCT FROM m.organisation_key AND mw.postcode_type IS NOT DISTINCT FROM m.postcode_type) AS welsh_alternative_available,
+ p.longitude,
+ p.latitude
+FROM public.pc_paf_mainfile m
+LEFT OUTER JOIN public.pc_paf_localities l USING (locality_key)
+LEFT OUTER JOIN public.pc_paf_thoroughfares t USING (thoroughfare_key)
+LEFT OUTER JOIN public.pc_paf_thoroughfare_descriptor td USING (thoroughfare_descriptor_key)
+LEFT OUTER JOIN public.pc_paf_building_names b USING (building_name_key)
+LEFT OUTER JOIN public.pc_paf_sub_building_names sb USING (sub_building_name_key)
+LEFT OUTER JOIN public.pc_paf_organisations o USING (organisation_key, postcode_type)
+LEFT OUTER JOIN public.pc_paf_organisations lo ON m.address_key = lo.organisation_key AND m.postcode_type = lo.postcode_type
+LEFT OUTER JOIN public.pc_paf_postzon_100m p USING (outward_code, inward_code)
+LEFT OUTER JOIN public.pc_paf_county_alias a ON a.postcode = (rpad(m.outward_code,4) || m.inward_code)
+LEFT OUTER JOIN public.pc_paf_counties apc ON apc.county_key = a.former_postal_county;
+
+-- Welsh master view for postcode tables
+CREATE OR REPLACE VIEW public.pc_paf_master_welsh_view AS
+SELECT mw.building_number,
+ b.building_name,
+ sb.sub_building_name,
+ COALESCE(o.organisation_name, lo.organisation_name) AS organisation_name,
+ COALESCE(o.department_name, lo.department_name) AS department_name,
+ mw.po_box_number,
+ t.thoroughfare_name,
+ td.thoroughfare_descriptor,
+ td.approved_abbreviation,
+ l.post_town,
+ l.dependent_locality,
+ l.double_dependent_locality,
+ apc.county_name,
+ mw.outward_code,
+ mw.inward_code,
+ mw.number_of_households,
+ mw.postcode_type,
+ mw.concatenation_indicator,
+ mw.delivery_point_suffix,
+ mw.small_user_organisation_indicator,
+ p.longitude,
+ p.latitude
+FROM public.pc_paf_mainfile_welsh mw
+LEFT OUTER JOIN public.pc_paf_localities l USING (locality_key)
+LEFT OUTER JOIN public.pc_paf_thoroughfares t USING (thoroughfare_key)
+LEFT OUTER JOIN public.pc_paf_thoroughfare_descriptor td USING (thoroughfare_descriptor_key)
+LEFT OUTER JOIN public.pc_paf_building_names b USING (building_name_key)
+LEFT OUTER JOIN public.pc_paf_sub_building_names sb USING (sub_building_name_key)
+LEFT OUTER JOIN public.pc_paf_organisations o USING (organisation_key, postcode_type)
+LEFT OUTER JOIN public.pc_paf_organisations lo ON mw.address_key = lo.organisation_key AND mw.postcode_type = lo.postcode_type
+LEFT OUTER JOIN public.pc_paf_postzon_100m p USING (outward_code, inward_code)
+LEFT OUTER JOIN public.pc_paf_county_alias a ON a.postcode = (rpad(mw.outward_code,4) || mw.inward_code)
+LEFT OUTER JOIN public.pc_paf_counties apc ON apc.county_key = a.former_postal_county;
\ No newline at end of file