From befa09c0ea48bcacd6c457d188e6725a2a834ebe Mon Sep 17 00:00:00 2001 From: glyn Date: Thu, 27 Mar 2014 15:21:21 +0000 Subject: [PATCH 1/1] Initial commit --- .gitignore | 2 + os_convert.pl | 128 ++++ os_extract_areas.sh | 14 + os_postgresql_import.plpgsql | 246 +++++++ paf_convert.pl | 161 +++++ paf_postgresql_import.plpgsql | 1136 +++++++++++++++++++++++++++++++++ 6 files changed, 1687 insertions(+) create mode 100644 .gitignore create mode 100644 os_convert.pl create mode 100644 os_extract_areas.sh create mode 100644 os_postgresql_import.plpgsql create mode 100644 paf_convert.pl create mode 100644 paf_postgresql_import.plpgsql diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ff37810 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.DS_Store +Thumb.db diff --git a/os_convert.pl b/os_convert.pl new file mode 100644 index 0000000..6dc0417 --- /dev/null +++ b/os_convert.pl @@ -0,0 +1,128 @@ +#!/usr/bin/perl + +# Glyn Astill - 11/10/2012 +# Script to automate cs2cs transform from eastings/northings to latitudes/longitudes +# Usage: os_convert.pl -i <infile or glob pattern> -o <outfile> +# Every CSV row of every matching input file is converted with cs2cs (or Geo::Proj4 when $pipe_cs2cs is false) +# and written to the output CSV with the latitude/longitude columns added. + +use strict; +use warnings; +use Text::CSV; +use Geo::Proj4; +use Getopt::Long qw/GetOptions/; + +use constant false => 0; +use constant true => 1; + +my $csv = Text::CSV->new(); +my $error; +my $usage = '-i <infile> -o <outfile>'; +my $infile; +my $outfile; +my @latlong; +my @eastnorth; +my $linecount; +my @infiles; +my $start_run = time(); +my $end_run; +my $run_time; +my $pipe_cs2cs = true; +my $proj_ng; + +if (!$pipe_cs2cs) { + # I'm not positive that the below is setting the Bursa Wolf or another parameter correctly, hence loss in accuracy. + # it's possible to set the parameters individually i.e.
"Geo::Proj4->new(proj => "tmerc", ellps => "airy", lat_0 => -49)" which may solve + $proj_ng = Geo::Proj4->new("-f '%.7f' +proj=tmerc +lat_0=49 +lon_0=-2 +k=0.9996012717 +x_0=400000 +y_0=-100000 +ellps=airy +towgs84=446.448,-125.157,542.060,0.1502,0.2470,0.8421,-20.4894 +units=m +no_defs +to +proj=latlong +ellps=WGS84 +towgs84=0,0,0 +nodefs") +} + +# Handle command line options +Getopt::Long::Configure('no_ignore_case'); +use vars qw{%opt}; +die $usage unless GetOptions(\%opt, 'infile|i=s', 'outfile|o=s', ) and keys %opt and ! @ARGV; + +if (!defined($opt{infile})) { + print("Please specify an input file.\n"); + die $usage; +} +else { + $infile = $opt{infile}; +} +if (!defined($opt{outfile})) { + print("Please specify an output file.\n"); + die $usage; +} +else { + $outfile = $opt{outfile}; +} + +# Read the codepoint file with eastings and northings, convert and output to outfile +# Open our output file which will contain the eastings/northings and longitudes/latitudes and write a header +if (open(OUTCSV, ">", $outfile)) { + print OUTCSV ("postcode,easting,northing,latitude,longitude,country_code,admin_county_code,admin_district_code,admin_ward_code\n"); + + # get a list of our files + @infiles = glob($infile); + + foreach my $currfile (@infiles) { + # read in the file + $linecount = 0; + print ("Processing $currfile .."); + unless (open(INCSV, "<", $currfile)) { + print("ERROR: Could not open file:" . $! . 
".\n"); + # Nothing can be read from a handle that failed to open, so move on to the next file + next; + } + while (<INCSV>) { + if ($csv->parse($_)) { + @eastnorth=$csv->fields(); + + eval { + if ($pipe_cs2cs) { + # NOTE(review): the CSV fields are interpolated into a shell command unvalidated - only run this against trusted input + open(CS2CS,"echo $eastnorth[2] $eastnorth[3] | cs2cs -f '%.7f' +proj=tmerc +lat_0=49 +lon_0=-2 +k=0.9996012717 +x_0=400000 +y_0=-100000 +ellps=airy +towgs84=446.448,-125.157,542.060,0.1502,0.2470,0.8421,-20.4894 +units=m +no_defs +to +proj=latlong +ellps=WGS84 +towgs84=0,0,0 +nodefs |") + or die "Could not start cs2cs: $!\n"; + } + else { + @latlong = $proj_ng->inverse($eastnorth[2], $eastnorth[3]); + } + + if ($pipe_cs2cs) { + # cs2cs prints the pair in the opposite order to the CSV columns, hence the swapped indexes below + @latlong = split(' ', scalar <CS2CS>); + close CS2CS; + print OUTCSV ("$eastnorth[0],$eastnorth[2],$eastnorth[3],$latlong[1],$latlong[0],$eastnorth[4],$eastnorth[7],$eastnorth[8],$eastnorth[9]\n"); + } + else { + print OUTCSV ("$eastnorth[0],$eastnorth[2],$eastnorth[3],$latlong[0],$latlong[1],$eastnorth[4],$eastnorth[7],$eastnorth[8],$eastnorth[9]\n"); + + + + } + }; + if ($@) { + print("ERROR: Could not run command:" . $! . " Line = " . $_ . "\n"); + print("$eastnorth[2] $eastnorth[3]"); + } + + $linecount++; + if (($linecount%1000) == 0) { + print ("..$linecount"); + } + } + else { + $error = $csv->error_input; + print("\tFailed to parse line: $error\n"); + } + } + close (INCSV); + print ("..OK\n"); + } + + close (OUTCSV); +} +else { + print("ERROR: Could not open file:" . $! . ".\n"); +} + +$end_run = time(); +$run_time = (($end_run-$start_run)/60); + +print "Conversion took $run_time minutes\n"; +if ($pipe_cs2cs) { + print "To run quicker use Geo:proj4 (appears to be less accurate - a bug in my usage?)
or stream the data directly to cs2cs in one go rather than constantly calling and opening the commands output\n"; +} + +exit(0); diff --git a/os_extract_areas.sh b/os_extract_areas.sh new file mode 100644 index 0000000..589809f --- /dev/null +++ b/os_extract_areas.sh @@ -0,0 +1,14 @@ +#!/bin/sh + +# Usage: os_extract_areas.sh <data root> +# Runs ssconvert -S on <data root>/Doc/Codelist.xls to produce the Codelist_<sheet>.csv files, then for each area type +# appends the type code as a last column, turns every '|' into ',' and deletes the lines matching DET. +# The data root is quoted throughout so that a path containing spaces works. + +if [ -z "$1" ]; then + echo "Usage: $0 <data root>" >&2 + exit 1 +fi + +ssconvert -S "$1/Doc/Codelist.xls" "$1/Doc/Codelist_%s.csv" > /dev/null 2>&1 + +sed -e 's/$/,CTY/; s/|/,/g; /\(DET\)/d' -i "$1/Doc/Codelist_CTY.csv" +sed -e 's/$/,DIS/; s/|/,/g; /\(DET\)/d' -i "$1/Doc/Codelist_DIS.csv" +sed -e 's/$/,DIW/; s/|/,/g; /\(DET\)/d' -i "$1/Doc/Codelist_DIW.csv" +sed -e 's/$/,LBO/; s/|/,/g; /\(DET\)/d' -i "$1/Doc/Codelist_LBO.csv" +sed -e 's/$/,LBW/; s/|/,/g; /\(DET\)/d' -i "$1/Doc/Codelist_LBW.csv" +sed -e 's/$/,MTD/; s/|/,/g; /\(DET\)/d' -i "$1/Doc/Codelist_MTD.csv" +sed -e 's/$/,MTW/; s/|/,/g; /\(DET\)/d' -i "$1/Doc/Codelist_MTW.csv" +sed -e 's/$/,UTA/; s/|/,/g; /\(DET\)/d' -i "$1/Doc/Codelist_UTA.csv" +sed -e 's/$/,UTE/; s/|/,/g; /\(DET\)/d' -i "$1/Doc/Codelist_UTE.csv" +sed -e 's/$/,UTW/; s/|/,/g; /\(DET\)/d' -i "$1/Doc/Codelist_UTW.csv" \ No newline at end of file diff --git a/os_postgresql_import.plpgsql b/os_postgresql_import.plpgsql new file mode 100644 index 0000000..8b7dd19 --- /dev/null +++ b/os_postgresql_import.plpgsql @@ -0,0 +1,246 @@ +DROP FUNCTION IF EXISTS public.import_pc_opendata(varchar); +CREATE OR REPLACE FUNCTION public.import_pc_opendata(in_data varchar) +RETURNS boolean AS +$BODY$ + +DECLARE + v_data_root varchar; + v_data_main varchar; + v_data_areas varchar; + v_main_table_created boolean; + v_sql text; + +BEGIN + v_data_root := in_data || '/'; + v_data_main := v_data_root || 'Data/'; + v_data_areas := v_data_root || 'Doc/'; + + RAISE NOTICE '%: Import starting with data root %', clock_timestamp(), v_data_root; + + -- Create our tables if required + IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_opendata_types') THEN + RAISE NOTICE 'Table "public"."pc_opendata_types" already exists'; + ELSE +
RAISE NOTICE 'Creating table "public"."pc_opendata_types"'; + CREATE TABLE public.pc_opendata_types + ( + id bigserial NOT NULL PRIMARY KEY, + type character varying(3) NOT NULL UNIQUE, + description text NOT NULL + ); + END IF; + + IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_opendata_areas') THEN + RAISE NOTICE 'Table "public"."pc_opendata_areas" already exists'; + ELSE + RAISE NOTICE 'Creating table "public"."pc_opendata_areas"'; + CREATE TABLE public.pc_opendata_areas + ( + id bigserial NOT NULL PRIMARY KEY, + code character varying(9) NOT NULL UNIQUE, + type character varying(3) NOT NULL REFERENCES public.pc_opendata_types (type), + description text NOT NULL + ); + END IF; + + IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_opendata') THEN + RAISE NOTICE 'Table "public"."pc_opendata" already exists'; + v_main_table_created := false; + ELSE + RAISE NOTICE 'Creating table "public"."pc_opendata"'; + CREATE TABLE public.pc_opendata + ( + id bigserial NOT NULL PRIMARY KEY, + postcode character varying(7) NOT NULL, + easting integer NOT NULL, + northing integer NOT NULL, + latitude double precision NOT NULL, + longitude double precision NOT NULL, + country_code character varying(9) REFERENCES public.pc_opendata_areas (code), + admin_county_code character varying(9) REFERENCES public.pc_opendata_areas (code), + admin_district_code character varying(9) REFERENCES public.pc_opendata_areas (code), + admin_ward_code character varying(9) REFERENCES public.pc_opendata_areas (code) + ); + v_main_table_created := true; + END IF; + + -- Import data + + RAISE NOTICE '% : Importing "public"."pc_opendata_types"', clock_timestamp(); + v_sql := 'COPY public.pc_opendata_types (type, description) FROM ' || quote_literal(v_data_areas || 'Codelist_AREA_CODES.csv') || ' CSV'; + EXECUTE v_sql; + + -- Insert missing Country record + INSERT INTO public.pc_opendata_types (type, description)
VALUES ('CNY', 'Country'); + + RAISE NOTICE '% : Importing "public"."pc_opendata_areas"', clock_timestamp(); + v_sql := 'COPY public.pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_CTY.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY public.pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_DIS.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY public.pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_DIW.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY public.pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_LBO.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY public.pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_LBW.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY public.pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_MTD.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY public.pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_MTW.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY public.pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_UTA.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY public.pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_UTE.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY public.pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_UTW.csv') || ' CSV'; + EXECUTE v_sql; + + -- Insert missing Counties/Countries and missing Scilly Isles + INSERT INTO public.pc_opendata_areas (code, type, description) VALUES ('W92000004', 'CNY', 'Wales'); + INSERT INTO public.pc_opendata_areas (code, type, description) VALUES ('E92000001', 'CNY', 'England'); + INSERT INTO public.pc_opendata_areas (code, type,
description) VALUES ('S92000003', 'CNY', 'Scotland'); + INSERT INTO public.pc_opendata_areas (code, type, description) VALUES ('E05008322', 'DIW', 'Bryher Ward'); + INSERT INTO public.pc_opendata_areas (code, type, description) VALUES ('E05008323', 'DIW', 'St. Agnes Ward'); + -- Apostrophes are doubled: a backslash escape is not honoured in a plain literal when standard_conforming_strings is on + INSERT INTO public.pc_opendata_areas (code, type, description) VALUES ('E05008324', 'DIW', 'St. Martin''s Ward'); + INSERT INTO public.pc_opendata_areas (code, type, description) VALUES ('E05008325', 'DIW', 'St. Mary''s Ward'); + INSERT INTO public.pc_opendata_areas (code, type, description) VALUES ('E05008326', 'DIW', 'Tresco Ward'); + + -- Finally our postcode data + RAISE NOTICE '% : Importing "public"."pc_opendata"', clock_timestamp(); + v_sql := 'COPY public.pc_opendata (postcode,easting,northing,latitude,longitude,country_code,admin_county_code,admin_district_code,admin_ward_code) FROM ' + || quote_literal(v_data_main || 'all_areas_20140324.csv') || ' CSV HEADER'; + EXECUTE v_sql; + + if (v_main_table_created) THEN + CREATE UNIQUE INDEX pc_opendata_postcode ON public.pc_opendata USING btree (postcode); + CREATE INDEX pc_opendata_latitude ON public.pc_opendata USING btree (latitude); + CREATE INDEX pc_opendata_longitude ON public.pc_opendata USING btree (longitude); + END IF; + + RAISE NOTICE '%: Completed', clock_timestamp(); + RETURN true; +END; + +$BODY$ +LANGUAGE 'plpgsql' VOLATILE; + +-- + +DROP FUNCTION IF EXISTS public.update_pc_opendata(varchar); +CREATE OR REPLACE FUNCTION public.update_pc_opendata(in_data varchar) +RETURNS boolean AS +$BODY$ + +DECLARE + v_data_root varchar; + v_data_main varchar; + v_data_areas varchar; + v_sql text; + +BEGIN + v_data_root := in_data || '/'; + v_data_main := v_data_root || 'Data/'; + v_data_areas := v_data_root || 'Doc/'; + + RAISE NOTICE '%: Import starting with data root %', clock_timestamp(), v_data_root; + + RAISE NOTICE 'Creating temp table "tmp_pc_opendata_types"'; + CREATE TEMPORARY TABLE tmp_pc_opendata_types + ( + id bigserial NOT NULL
PRIMARY KEY, + type character varying(3) NOT NULL UNIQUE, + description text NOT NULL + ) ON COMMIT DROP; + + -- All three staging tables are ON COMMIT DROP so that a later call in the same session does not fail because they already exist + RAISE NOTICE 'Creating temp table "tmp_pc_opendata_areas"'; + CREATE TEMPORARY TABLE tmp_pc_opendata_areas + ( + id bigserial NOT NULL PRIMARY KEY, + code character varying(9) NOT NULL UNIQUE, + type character varying(3) NOT NULL REFERENCES tmp_pc_opendata_types (type), + description text NOT NULL + ) ON COMMIT DROP; + + RAISE NOTICE 'Creating temp table "tmp_pc_opendata"'; + CREATE TEMPORARY TABLE tmp_pc_opendata + ( + id bigserial NOT NULL PRIMARY KEY, + postcode character varying(7) NOT NULL, + easting integer NOT NULL, + northing integer NOT NULL, + latitude double precision NOT NULL, + longitude double precision NOT NULL, + country_code character varying(9) REFERENCES tmp_pc_opendata_areas (code), + admin_county_code character varying(9) REFERENCES tmp_pc_opendata_areas (code), + admin_district_code character varying(9) REFERENCES tmp_pc_opendata_areas (code), + admin_ward_code character varying(9) REFERENCES tmp_pc_opendata_areas (code) + ) ON COMMIT DROP; + + -- Import data + RAISE NOTICE '% : Importing "tmp_pc_opendata_types"', clock_timestamp(); + v_sql := 'COPY tmp_pc_opendata_types (type, description) FROM ' || quote_literal(v_data_areas || 'Codelist_AREA_CODES.csv') || ' CSV'; + EXECUTE v_sql; + + -- Insert missing Country record + INSERT INTO tmp_pc_opendata_types (type, description) VALUES ('CNY', 'Country'); + + RAISE NOTICE '% : Importing "tmp_pc_opendata_areas"', clock_timestamp(); + v_sql := 'COPY tmp_pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_CTY.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY tmp_pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_DIS.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY tmp_pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_DIW.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY tmp_pc_opendata_areas (description,
code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_LBO.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY tmp_pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_LBW.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY tmp_pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_MTD.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY tmp_pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_MTW.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY tmp_pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_UTA.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY tmp_pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_UTE.csv') || ' CSV'; + EXECUTE v_sql; + v_sql := 'COPY tmp_pc_opendata_areas (description, code, type) FROM ' || quote_literal(v_data_areas || 'Codelist_UTW.csv') || ' CSV'; + EXECUTE v_sql; + + -- Insert missing Counties/Countries and missing Scilly Isles + INSERT INTO tmp_pc_opendata_areas (code, type, description) VALUES ('W92000004', 'CNY', 'Wales'); + INSERT INTO tmp_pc_opendata_areas (code, type, description) VALUES ('E92000001', 'CNY', 'England'); + INSERT INTO tmp_pc_opendata_areas (code, type, description) VALUES ('S92000003', 'CNY', 'Scotland'); + INSERT INTO tmp_pc_opendata_areas (code, type, description) VALUES ('E05008322', 'DIW', 'Bryher Ward'); + INSERT INTO tmp_pc_opendata_areas (code, type, description) VALUES ('E05008323', 'DIW', 'St. Agnes Ward'); + INSERT INTO tmp_pc_opendata_areas (code, type, description) VALUES ('E05008324', 'DIW', 'St. Martin''s Ward'); + INSERT INTO tmp_pc_opendata_areas (code, type, description) VALUES ('E05008325', 'DIW', 'St.
Mary''s Ward'); + INSERT INTO tmp_pc_opendata_areas (code, type, description) VALUES ('E05008326', 'DIW', 'Tresco Ward'); + + -- Finally our postcode data + RAISE NOTICE '% : Importing "tmp_pc_opendata"', clock_timestamp(); + v_sql := 'COPY tmp_pc_opendata (postcode,easting,northing,latitude,longitude,country_code,admin_county_code,admin_district_code,admin_ward_code) FROM ' + || quote_literal(v_data_main || 'all_areas_20140324.csv') || ' CSV HEADER'; + EXECUTE v_sql; + + -- Now do the upgrade with a truncate (if the locking is an issue then it's better to diff and update/delete/insert) + RAISE NOTICE '% : Switching the data via truncate & reload', clock_timestamp(); + TRUNCATE TABLE public.pc_opendata RESTART IDENTITY; + TRUNCATE TABLE public.pc_opendata_areas RESTART IDENTITY CASCADE; + TRUNCATE TABLE public.pc_opendata_types RESTART IDENTITY CASCADE; + + -- Types must be reloaded from the staged types table; the staged areas table repeats each type and has no type descriptions + INSERT INTO public.pc_opendata_types (type, description) + SELECT type, description FROM tmp_pc_opendata_types; + + INSERT INTO public.pc_opendata_areas (code, type, description) + SELECT code, type, description FROM tmp_pc_opendata_areas; + + INSERT INTO public.pc_opendata (postcode,easting,northing,latitude,longitude,country_code,admin_county_code,admin_district_code,admin_ward_code) + SELECT postcode,easting,northing,latitude,longitude,country_code,admin_county_code,admin_district_code,admin_ward_code + FROM tmp_pc_opendata; + + RAISE NOTICE '%: Completed', clock_timestamp(); + RETURN true; +END; + +$BODY$ +LANGUAGE 'plpgsql' VOLATILE; \ No newline at end of file diff --git a/paf_convert.pl b/paf_convert.pl new file mode 100644 index 0000000..eb4a6ee --- /dev/null +++ b/paf_convert.pl @@ -0,0 +1,161 @@ +#!/usr/bin/perl +# Glyn Astill - 02/03/2014 +# Script to automate cs2cs transform from eastings/northings to latitudes/longitudes +# for PAF files from Royal Mail + +#use diagnostics; +use strict; +use warnings; +use Getopt::Long qw/GetOptions/; +use Geo::Proj4; + +use constant false => 0; +use constant true
=> 1; + +my $error; +my $usage = '-i <infile> -o <outfile>'; +my $infile; +my $outfile; +my @latlong; +my $linecount; +my @infiles; +my @fields_fixed; +my $start_run = time(); +my $end_run; +my $run_time; +my $pipe_cs2cs = true; +my $proj_ng; +my $proj_ig; + +# Handle command line options +Getopt::Long::Configure('no_ignore_case'); +use vars qw{%opt}; +die $usage unless GetOptions(\%opt, 'infile|i=s', 'outfile|o=s', ) and keys %opt and ! @ARGV; + +if (!defined($opt{infile})) { + print("Please specify an input file.\n"); + die $usage; +} +else { + $infile = $opt{infile}; +} +if (!defined($opt{outfile})) { + print("Please specify an output file.\n"); + die $usage; +} +else { + $outfile = $opt{outfile}; +} + +if (!$pipe_cs2cs) { + # I'm not positive that the below is setting the Bursa Wolf or another parameter correctly, hence loss in accuracy. + # it's possible to set the parameters individually i.e. "Geo::Proj4->new(proj => "tmerc", ellps => "airy", lat_0 => -49)" which may solve + $proj_ng = Geo::Proj4->new("-f '%.7f' +proj=tmerc +lat_0=49 +lon_0=-2 +k=0.9996012717 +x_0=400000 +y_0=-100000 +ellps=airy +towgs84=446.448,-125.157,542.060,0.1502,0.2470,0.8421,-20.4894 +units=m +no_defs +to +proj=latlong +ellps=WGS84 +towgs84=0,0,0 +nodefs") + or die "parameter error: ".Geo::Proj4->error. "\n"; + $proj_ig = Geo::Proj4->new("-f '%.7f' +proj=tmerc +lat_0=53.5 +lon_0=-8 +k=1.000035 +x_0=200000 +y_0=250000 +ellps=airy +towgs84=482.530,-130.596,564.557,-1.042,-0.214,-0.631,8.15 +units=m +no_defs +to +proj=latlong +ellps=WGS84 +towgs84=0,0,0 +nodefs") + or die "parameter error: ".Geo::Proj4->error.
"\n"; +} + +# Read the codepoint file with eastings and northings, convert and output to outfile +# Open our output file which will contain the eastings/northings and longitudes/latitudes +if (open(OUTFILE, ">", $outfile)) { + + # get a list of our files + $infile =~ s/ /\\ /g; + @infiles = glob("$infile"); + + foreach my $currfile (@infiles) { + # read in the file + $linecount = 0; + print ("Processing $currfile .."); + unless (open(INFILE, "<", $currfile)) { + print("ERROR: Could not open file:" . $! . ".\n"); + # Nothing can be read from a handle that failed to open, so move on to the next file + next; + } + while (<INFILE>) { + # Fixed-width record: the fields used below are 0 (tested for a BT prefix), 3 (easting) and 4 (northing) + @fields_fixed = unpack('a4 a3 a6 a5 a5 a9 a9 a9 a9 a9 a9 a1 a1', $_); + + eval { + # Deal with 6 digit northings + for ($fields_fixed[4]) { + s/[PO]/12/; + s/[UT]/11/; + s/[ZY]/10/; + } + if ($fields_fixed[4] =~ m/\s/) { + $fields_fixed[4] = sprintf("%-6s", $fields_fixed[4]) . " "; + $fields_fixed[3] .= ' '; + } + else { + $fields_fixed[4] = sprintf("%06d", $fields_fixed[4]) . "0"; + $fields_fixed[3] .= '0'; + } + + for (my $field = 0; $field <= 12; $field++) { + print OUTFILE ($fields_fixed[$field]); + } + + if ($fields_fixed[4] !~ m/\s/) { + # NOTE(review): the record fields are interpolated into a shell command unvalidated - only run this against trusted input + # Irish Grid + if ($fields_fixed[0] =~ m/^BT/i) { + if ($pipe_cs2cs) { + open(CS2CS,"echo $fields_fixed[3] $fields_fixed[4] | cs2cs -f '%.7f' +proj=tmerc +lat_0=53.5 +lon_0=-8 +k=1.000035 +x_0=200000 +y_0=250000 +ellps=airy +towgs84=482.530,-130.596,564.557,-1.042,-0.214,-0.631,8.15 +units=m +no_defs +to +proj=latlong +ellps=WGS84 +towgs84=0,0,0 +nodefs |") + or die "Could not start cs2cs: $!\n"; + } + else { + @latlong = $proj_ig->inverse($fields_fixed[3], $fields_fixed[4]); + } + } + # National Grid + else { + if ($pipe_cs2cs) { + open(CS2CS,"echo $fields_fixed[3] $fields_fixed[4] | cs2cs -f '%.7f' +proj=tmerc +lat_0=49 +lon_0=-2 +k=0.9996012717 +x_0=400000 +y_0=-100000 +ellps=airy +towgs84=446.448,-125.157,542.060,0.1502,0.2470,0.8421,-20.4894 +units=m +no_defs +to +proj=latlong +ellps=WGS84 +towgs84=0,0,0 +nodefs |") + or die "Could not start cs2cs: $!\n"; + } + else { + @latlong = $proj_ng->inverse($fields_fixed[3], $fields_fixed[4]); + } + } + + if ($pipe_cs2cs) { +
# cs2cs prints the pair in the opposite order to the output columns, hence the swapped indexes below + @latlong = split(' ', scalar <CS2CS>); + close CS2CS; + print OUTFILE (sprintf("%010f", $latlong[1]) . sprintf("%010f", $latlong[0])); + } + else { + print OUTFILE (sprintf("%010f", $latlong[0]) . sprintf("%010f", $latlong[1])); + } + + } + else { + print OUTFILE " "; + } + + print OUTFILE ("\n"); + $linecount++; + if (($linecount%10000) == 0) { + print ("..$linecount"); + } + }; + if ($@) { + print("ERROR: Could not run command:" . $! . " Line = " . $_ . "\n"); + print("$fields_fixed[3] $fields_fixed[4]"); + } + + } + close (INFILE); + print ("..OK\n"); + } + + close (OUTFILE); +} +else { + print("ERROR: Could not open file:" . $! . ".\n"); +} + +$end_run = time(); +$run_time = (($end_run-$start_run)/60); + +print "Conversion took $run_time minutes\n"; +if ($pipe_cs2cs) { + print "To run quicker use Geo:proj4 (appears to be less accurate - a bug in my usage?) or stream the data directly to cs2cs in one go rather than constantly calling and opening the commands output\n"; +} + +exit(0); \ No newline at end of file diff --git a/paf_postgresql_import.plpgsql b/paf_postgresql_import.plpgsql new file mode 100644 index 0000000..996ffa0 --- /dev/null +++ b/paf_postgresql_import.plpgsql @@ -0,0 +1,1136 @@ +DROP FUNCTION IF EXISTS public.import_pc_paf(varchar, varchar); +CREATE OR REPLACE FUNCTION public.import_pc_paf(in_edition varchar, in_data varchar) +RETURNS boolean AS +$BODY$ + +DECLARE + v_data_root varchar; + v_data_main varchar; + v_data_postzon varchar; + v_data_alias varchar; + v_processed integer; + v_main_footer varchar; + v_std_footer varchar; + v_sql varchar; + v_table_created boolean; +BEGIN + v_data_root := in_data || '/' || in_edition || '/'; + v_data_main := v_data_root || 'PAF MAIN FILE/'; + v_data_postzon := v_data_root || 'POSTZON 100M/'; + v_data_alias := v_data_root || 'ALIAS/'; + + v_main_footer = ' %'; + v_std_footer = '99999999%'; + + RAISE NOTICE '%: Import starting for edition % with data root %', clock_timestamp(), in_edition, v_data_root; + CREATE
TEMPORARY TABLE data_stage (data text) ON COMMIT DROP; + + -- 1) Localaties + TRUNCATE TABLE data_stage; + + RAISE NOTICE '%: Begin staging localaties', clock_timestamp(); + + v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'local.c01'); + EXECUTE v_sql; + + DELETE FROM data_stage WHERE data like '%LOCALITY' || in_edition || '%' OR data like v_std_footer; + + RAISE NOTICE '%: Done staging localaties, importing', clock_timestamp(); + + IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_localities') THEN + RAISE NOTICE 'Table "public"."pc_paf_localities" already exists'; + v_table_created := false; + ELSE + RAISE NOTICE 'Creating table "public"."pc_paf_localities"'; + CREATE TABLE public.pc_paf_localities ( + locality_key integer NOT NULL, + post_town varchar(30), + dependent_locality varchar(35), + double_dependent_locality varchar(35) + ); + v_table_created := true; + END IF; + + TRUNCATE TABLE public.pc_paf_localities; + INSERT INTO public.pc_paf_localities + SELECT substring(data,1,6)::integer, + nullif(trim(substring(data,52,30)),''), + nullif(trim(substring(data,82,35)),''), + nullif(trim(substring(data,117,35)),'') + FROM data_stage; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + + IF (v_table_created) THEN + ALTER TABLE public.pc_paf_localities ADD PRIMARY KEY (locality_key); + END IF; + + RAISE NOTICE '%: Done importing localities (imported % records)', clock_timestamp(), v_processed; + + -- 2) Thoroughfares + TRUNCATE TABLE data_stage; + + RAISE NOTICE '%: Begin staging thoroughfares', clock_timestamp(); + + v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'thfare.c01'); + EXECUTE v_sql; + + DELETE FROM data_stage WHERE data like '%THOROUGH' || in_edition || '%' OR data like v_std_footer; + + RAISE NOTICE '%: Done staging thoroughfares, importing', clock_timestamp(); + + IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 
'pc_paf_thoroughfares') THEN + RAISE NOTICE 'Table "public"."pc_paf_thoroughfares" already exists'; + v_table_created := false; + ELSE + RAISE NOTICE 'Creating table "public"."pc_paf_thoroughfares"'; + CREATE TABLE public.pc_paf_thoroughfares ( + thoroughfare_key integer NOT NULL, + thoroughfare_name varchar(60) + ); + v_table_created := true; + END IF; + + TRUNCATE TABLE public. pc_paf_thoroughfares; + INSERT INTO public. pc_paf_thoroughfares + SELECT substring(data,1,8)::integer, + nullif(trim(substring(data,9,60)),'') + FROM data_stage; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + + IF (v_table_created) THEN + ALTER TABLE public.pc_paf_thoroughfares ADD PRIMARY KEY (thoroughfare_key); + END IF; + + RAISE NOTICE '%: Done importing thoroughfares (imported % records)', clock_timestamp(), v_processed; + + -- 3) Thoroughfares Descriptor + TRUNCATE TABLE data_stage; + + RAISE NOTICE '%: Begin staging thoroughfares descriptor', clock_timestamp(); + + v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'thdesc.c01'); + EXECUTE v_sql; + + DELETE FROM data_stage WHERE data like '%THDESCRI' || in_edition || '%' OR data like v_std_footer; + + RAISE NOTICE '%: Done staging thoroughfares descriptor, importing', clock_timestamp(); + + IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_thoroughfare_descriptor') THEN + RAISE NOTICE 'Table "public"."pc_paf_thoroughfare_descriptor" already exists'; + v_table_created := false; + ELSE + RAISE NOTICE 'Creating table "public"."pc_paf_thoroughfare_descriptor"'; + CREATE TABLE public.pc_paf_thoroughfare_descriptor ( + thoroughfare_descriptor_key integer NOT NULL, + thoroughfare_descriptor varchar(20), + approved_abbreviation varchar(6) + ); + v_table_created := true; + end if; + + TRUNCATE TABLE public.pc_paf_thoroughfare_descriptor; + INSERT INTO public.pc_paf_thoroughfare_descriptor + SELECT substring(data,1,4)::integer, + nullif(trim(substring(data,5,20)),''), + 
nullif(trim(substring(data,25,6)),'') + FROM data_stage; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + + IF (v_table_created) THEN + ALTER TABLE public.pc_paf_thoroughfare_descriptor ADD PRIMARY KEY (thoroughfare_descriptor_key); + END IF; + + RAISE NOTICE '%: Done importing thoroughfares descriptor (imported % records)', clock_timestamp(), v_processed; + + -- 4) Building Names + TRUNCATE TABLE data_stage; + + RAISE NOTICE '%: Begin staging building names', clock_timestamp(); + + v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'bname.c01'); + EXECUTE v_sql; + + DELETE FROM data_stage WHERE data like '%BUILDING' || in_edition || '%' OR data like v_std_footer; + + RAISE NOTICE '%: Done staging building names, importing', clock_timestamp(); + + IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_building_names') THEN + RAISE NOTICE 'Table "public"."pc_paf_building_names" already exists'; + v_table_created := false; + ELSE + RAISE NOTICE 'Creating table "public"."pc_paf_building_names"'; + CREATE TABLE public.pc_paf_building_names ( + building_name_key integer NOT NULL, + building_name varchar(50) + ); + v_table_created := true; + END IF; + + TRUNCATE TABLE public.pc_paf_building_names; + INSERT INTO public.pc_paf_building_names + SELECT substring(data,1,8)::integer, + nullif(trim(substring(data,9,50)),'') + FROM data_stage; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + + IF (v_table_created) THEN + ALTER TABLE public.pc_paf_building_names ADD PRIMARY KEY (building_name_key); + END IF; + + RAISE NOTICE '%: Done importing building names (imported % records)', clock_timestamp(), v_processed; + + -- 5) Sub Building Names file (subbname.c01) + TRUNCATE TABLE data_stage; + + RAISE NOTICE '%: Begin staging sub building names', clock_timestamp(); + + v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'subbname.c01'); + EXECUTE v_sql; + + DELETE FROM data_stage WHERE data like '%SUBBUILD' || 
in_edition || '%' OR data like v_std_footer; + + RAISE NOTICE '%: Done staging sub building names, importing', clock_timestamp(); + + IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_sub_building_names') THEN + RAISE NOTICE 'Table "public"."pc_paf_sub_building_names" already exists'; + v_table_created := false; + ELSE + RAISE NOTICE 'Creating table "public"."pc_paf_sub_building_names"'; + CREATE TABLE public.pc_paf_sub_building_names ( + sub_building_name_key integer NOT NULL, + sub_building_name varchar(50) + ); + v_table_created := true; + END IF; + + TRUNCATE TABLE public.pc_paf_sub_building_names; + INSERT INTO public.pc_paf_sub_building_names + SELECT substring(data,1,8)::integer, + nullif(trim(substring(data,9,30)),'') + FROM data_stage; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + + IF (v_table_created) THEN + ALTER TABLE public.pc_paf_sub_building_names ADD PRIMARY KEY (sub_building_name_key); + END IF; + + RAISE NOTICE '%: Done importing sub building names (imported % records)', clock_timestamp(), v_processed; + + -- 6) Organisations + TRUNCATE TABLE data_stage; + + RAISE NOTICE '%: Begin staging organisations', clock_timestamp(); + + v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'org.c01'); + EXECUTE v_sql; + + DELETE FROM data_stage WHERE data like '%ORGANISA' || in_edition || '%' OR data like v_std_footer; + + RAISE NOTICE '%: Done staging organisations, importing', clock_timestamp(); + + IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_organisations') THEN + RAISE NOTICE 'Table "public"."pc_paf_organisations" already exists'; + v_table_created := false; + ELSE + RAISE NOTICE 'Creating table "public"."pc_paf_organisations"'; + CREATE TABLE public.pc_paf_organisations ( + organisation_key integer NOT NULL, + postcode_type varchar(1) NOT NULL, + organisation_name varchar(60), + department_name varchar(60) + ); + v_table_created := true; + 
END IF; + + TRUNCATE TABLE public.pc_paf_organisations; + INSERT INTO public.pc_paf_organisations + SELECT substring(data,1,8)::integer, + nullif(trim(substring(data,9,1)),''), + nullif(trim(substring(data,10,60)),''), + nullif(trim(substring(data,70,60)),'') + FROM data_stage; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + + IF (v_table_created) THEN + ALTER TABLE public.pc_paf_organisations ADD PRIMARY KEY (organisation_key, postcode_type); + COMMENT ON COLUMN public.pc_paf_organisations.organisation_key IS 'When postcode type is L organisation_key relates to address_key'; + END IF; + + RAISE NOTICE '%: Done importing organisations (imported % records)', clock_timestamp(), v_processed; + + -- 7) Postzon with latlon + TRUNCATE TABLE data_stage; + + RAISE NOTICE '%: Begin staging postzone', clock_timestamp(); + + v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_postzon || 'pzone100.c02'); + EXECUTE v_sql; + + DELETE FROM data_stage WHERE data like '%PZONE100' || in_edition || '%' OR data like v_main_footer; + + RAISE NOTICE '%: Done staging postzon, importing', clock_timestamp(); + + IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_postzon_100m') THEN + RAISE NOTICE 'Table "public"."pc_paf_postzon_100m" already exists'; + v_table_created := false; + ELSE + RAISE NOTICE 'Creating table "public"."pc_paf_postzon_100m"'; + CREATE TABLE public.pc_paf_postzon_100m ( + postzon_100m_key serial NOT NULL, + outward_code varchar(4) NOT NULL, + inward_code varchar(3) NOT NULL, + introduction_date date, + grid_reference_east integer, + grid_reference_north integer, + country_code varchar(9), + area_code_county varchar(9), + area_code_district varchar(9), + ward_code varchar(9), + nhs_region varchar(9), + nhs_code varchar(9), + user_type smallint, + grid_status smallint, + latitude double precision, + longitude double precision + ); + v_table_created := true; + END IF; + + TRUNCATE TABLE public.pc_paf_postzon_100m; + 
INSERT INTO public.pc_paf_postzon_100m(outward_code, inward_code, introduction_date, grid_reference_east, grid_reference_north, + country_code, area_code_county, area_code_district, ward_code, nhs_region, nhs_code, user_type, grid_status, latitude, longitude) + SELECT nullif(trim(substring(data,1,4)),''), + nullif(trim(substring(data,5,3)),''), + to_date(substring(data,8,6), 'YYYYMM'), + nullif(trim(substring(data,14,6)),'')::integer, + nullif(trim(substring(data,20,7)),'')::integer, + nullif(trim(substring(data,27,9)),''), + nullif(trim(substring(data,36,9)),''), + nullif(trim(substring(data,45,9)),''), + nullif(trim(substring(data,54,9)),''), + nullif(trim(substring(data,63,9)),''), + nullif(trim(substring(data,72,9)),''), + nullif(trim(substring(data,81,1)),'')::smallint, + nullif(trim(substring(data,82,1)),'')::smallint, + nullif(trim(substring(data,83,10)),'')::double precision, + nullif(trim(substring(data,93,10)),'')::double precision + FROM data_stage; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + + IF (v_table_created) THEN + ALTER TABLE public.pc_paf_postzon_100m ADD PRIMARY KEY (postzon_100m_key), + ADD CONSTRAINT pc_paf_postzon_100m_unique UNIQUE (outward_code, inward_code); + CREATE INDEX pc_paf_postzon_100m_longitude ON public.pc_paf_postzon_100m (longitude); + CREATE INDEX pc_paf_postzon_100m_latitude ON public.pc_paf_postzon_100m (latitude); + + COMMENT ON TABLE public.pc_paf_postzon_100m IS 'Geographical data from the Royal Mail with accuracy of 100m'; + COMMENT ON COLUMN public.pc_paf_postzon_100m.grid_reference_east IS 'BT postcodes reflect the Irish Grid public (different origin and scale); subtract 17000 to approximate onto OS National Grid. For latlong do not approximate but do proper conversion'; + COMMENT ON COLUMN public.pc_paf_postzon_100m.grid_reference_north IS 'BT postcodes reflect the Irish Grid public (different origin and scale); add 13000 to approximate onto OS National Grid. 
For latlong do not approximate but do proper conversion'; + COMMENT ON COLUMN public.pc_paf_postzon_100m.user_type IS '0 Small User 1 Large User'; + COMMENT ON COLUMN public.pc_paf_postzon_100m.grid_status IS '0 Status not supplied by OS \n + 1 Within the building of the matched address closest to the Postcode mean. \n + 2 Co-ordinates allocated by GROS during Postcode boundary creation to the building nearest the centre of the populated part of the Postcode (Scotland only) \n + 3 Approximate to within 50m of true position \n + 4 Postcode unit mean (direct copy from ADDRESS-POINT (GB) and COMPAS (NI) - mean of matched addresses with the same Postcode) \n + 5 Postcode imputed by ONS to 1 metre resolution \n + 6 Postcode sector mean - mainly PO Boxes \n + 9 No co-ordinates available'; + END IF; + + RAISE NOTICE '%: Done importing postzon (imported % records)', clock_timestamp(), v_processed; + + -- 8) Mainfile + TRUNCATE TABLE data_stage; + RAISE NOTICE '%: Begin staging mainfile', clock_timestamp(); + + v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c02'); + EXECUTE v_sql; + v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c03'); + EXECUTE v_sql; + v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c04'); + EXECUTE v_sql; + v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c05'); + EXECUTE v_sql; + v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'fpmainfl.c06'); + EXECUTE v_sql; + + DELETE FROM data_stage WHERE data like '%ADDRESS ' || in_edition || '%' OR data like v_main_footer; + + RAISE NOTICE '%: Done staging mainfile, importing', clock_timestamp(); + + IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_mainfile') THEN + RAISE NOTICE 'Table "public"."pc_paf_mainfile" already exists'; + v_table_created := false; + ELSE + RAISE NOTICE 'Creating table "public"."pc_paf_mainfile"'; + CREATE TABLE 
public.pc_paf_mainfile ( + paf_record_key serial NOT NULL, + outward_code varchar(4) NOT NULL, + inward_code varchar(3) NOT NULL, + address_key integer NOT NULL, + locality_key integer, + thoroughfare_key integer, + thoroughfare_descriptor_key integer, + dependent_thoroughfare_key integer, + dependent_thoroughfare_descriptor_key integer, + building_number integer, + building_name_key integer, + sub_building_name_key integer, + number_of_households integer, + organisation_key integer, + postcode_type varchar(1), + concatenation_indicator varchar(1), + delivery_point_suffix varchar(2), + small_user_organisation_indicator varchar(1), + po_box_number varchar(6) + ); + v_table_created := true; + END IF; + + TRUNCATE TABLE public.pc_paf_mainfile RESTART IDENTITY; + INSERT INTO public.pc_paf_mainfile(outward_code,inward_code,address_key,locality_key,thoroughfare_key, + thoroughfare_descriptor_key,dependent_thoroughfare_key,dependent_thoroughfare_descriptor_key,building_number, + building_name_key,sub_building_name_key,number_of_households,organisation_key,postcode_type,concatenation_indicator, + delivery_point_suffix,small_user_organisation_indicator,po_box_number) + SELECT nullif(trim(substring(data,1,4)),''), + nullif(trim(substring(data,5,3)),''), + nullif(substring(data,8,8)::integer,0), + nullif(substring(data,16,6)::integer,0), + nullif(substring(data,22,8)::integer,0), + nullif(substring(data,30,4)::integer,0), + nullif(substring(data,34,8)::integer,0), + nullif(substring(data,42,4)::integer,0), + nullif(substring(data,46,4)::integer,0), + nullif(substring(data,50,8)::integer,0), + nullif(substring(data,58,8)::integer,0), + nullif(substring(data,66,4)::integer,0), + nullif(substring(data,70,8)::integer,0), + nullif(trim(substring(data,78,1)),''), + nullif(trim(substring(data,79,1)),''), + nullif(trim(substring(data,80,2)),''), + nullif(trim(substring(data,82,1)),''), + nullif(trim(substring(data,83,6)),'') + FROM data_stage; + + GET DIAGNOSTICS v_processed = 
ROW_COUNT; + + IF (v_table_created) THEN + CREATE INDEX pc_paf_mainfile_postcode ON public.pc_paf_mainfile USING btree (outward_code, inward_code); + ALTER TABLE public.pc_paf_mainfile + ADD PRIMARY KEY (paf_record_key), + ADD CONSTRAINT pc_paf_mainfile_unique UNIQUE (address_key, organisation_key, postcode_type), + ADD FOREIGN KEY (locality_key) REFERENCES public.pc_paf_localities(locality_key) DEFERRABLE INITIALLY IMMEDIATE, + ADD FOREIGN KEY (thoroughfare_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE, + ADD FOREIGN KEY (thoroughfare_descriptor_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE, + ADD FOREIGN KEY (building_name_key) REFERENCES public.pc_paf_building_names(building_name_key) DEFERRABLE INITIALLY IMMEDIATE, + ADD FOREIGN KEY (sub_building_name_key) REFERENCES public.pc_paf_sub_building_names(sub_building_name_key) DEFERRABLE INITIALLY IMMEDIATE, + ADD FOREIGN KEY (organisation_key, postcode_type) REFERENCES public.pc_paf_organisations(organisation_key, postcode_type) DEFERRABLE INITIALLY IMMEDIATE, + ADD FOREIGN KEY (outward_code,inward_code) REFERENCES public.pc_paf_postzon_100m(outward_code,inward_code) DEFERRABLE INITIALLY IMMEDIATE; + COMMENT ON COLUMN public.pc_paf_mainfile.organisation_key IS 'When postcode type is L then address_key relates to organisation_key - good work RM!'; + END IF; + + RAISE NOTICE '%: Done importing mainfile (imported % records)', clock_timestamp(), v_processed; + + -- 9) Welsh Mainfile + TRUNCATE TABLE data_stage; + RAISE NOTICE '%: Begin staging welsh alternative mainfile', clock_timestamp(); + v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_main || 'wfmainfl.c06'); + EXECUTE v_sql; + + DELETE FROM data_stage WHERE data like '%ADDRESS ' || in_edition || '%' OR data like v_main_footer; + + RAISE NOTICE '%: Done staging welsh alternative mainfile, importing', clock_timestamp(); + + IF EXISTS (SELECT 1 FROM 
pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_mainfile_welsh') THEN + RAISE NOTICE 'Table "public"."pc_paf_mainfile_welsh" already exists'; + v_table_created := false; + ELSE + RAISE NOTICE 'Creating table "public"."pc_paf_mainfile_welsh"'; + CREATE TABLE public.pc_paf_mainfile_welsh ( + paf_record_key serial NOT NULL, + outward_code varchar(4) NOT NULL, + inward_code varchar(3) NOT NULL, + address_key integer NOT NULL, + locality_key integer, + thoroughfare_key integer, + thoroughfare_descriptor_key integer, + dependent_thoroughfare_key integer, + dependent_thoroughfare_descriptor_key integer, + building_number integer, + building_name_key integer, + sub_building_name_key integer, + number_of_households integer, + organisation_key integer, + postcode_type varchar(1), + concatenation_indicator varchar(1), + delivery_point_suffix varchar(2), + small_user_organisation_indicator varchar(1), + po_box_number varchar(6) + ); + v_table_created := true; + END IF; + + TRUNCATE TABLE public.pc_paf_mainfile_welsh RESTART IDENTITY; + INSERT INTO public.pc_paf_mainfile_welsh(outward_code,inward_code,address_key,locality_key,thoroughfare_key, + thoroughfare_descriptor_key,dependent_thoroughfare_key,dependent_thoroughfare_descriptor_key,building_number, + building_name_key,sub_building_name_key,number_of_households,organisation_key,postcode_type,concatenation_indicator, + delivery_point_suffix,small_user_organisation_indicator,po_box_number) + SELECT nullif(trim(substring(data,1,4)),''), + nullif(trim(substring(data,5,3)),''), + nullif(substring(data,8,8)::integer,0), + nullif(substring(data,16,6)::integer,0), + nullif(substring(data,22,8)::integer,0), + nullif(substring(data,30,4)::integer,0), + nullif(substring(data,34,8)::integer,0), + nullif(substring(data,42,4)::integer,0), + nullif(substring(data,46,4)::integer,0), + nullif(substring(data,50,8)::integer,0), + nullif(substring(data,58,8)::integer,0), + nullif(substring(data,66,4)::integer,0), + 
nullif(substring(data,70,8)::integer,0), + nullif(trim(substring(data,78,1)),''), + nullif(trim(substring(data,79,1)),''), + nullif(trim(substring(data,80,2)),''), + nullif(trim(substring(data,82,1)),''), + nullif(trim(substring(data,83,6)),'') + FROM data_stage; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + + IF (v_table_created) THEN + CREATE INDEX pc_paf_mainfile_welsh_postcode ON public.pc_paf_mainfile_welsh USING btree (outward_code, inward_code); + ALTER TABLE public.pc_paf_mainfile_welsh + ADD PRIMARY KEY (paf_record_key), + ADD CONSTRAINT pc_paf_mainfile_welsh_unique UNIQUE (address_key, organisation_key, postcode_type), + ADD FOREIGN KEY (locality_key) REFERENCES public.pc_paf_localities(locality_key) DEFERRABLE INITIALLY IMMEDIATE, + ADD FOREIGN KEY (thoroughfare_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE, + ADD FOREIGN KEY (thoroughfare_descriptor_key) REFERENCES public.pc_paf_thoroughfares(thoroughfare_key) DEFERRABLE INITIALLY IMMEDIATE, + ADD FOREIGN KEY (building_name_key) REFERENCES public.pc_paf_building_names(building_name_key) DEFERRABLE INITIALLY IMMEDIATE, + ADD FOREIGN KEY (sub_building_name_key) REFERENCES public.pc_paf_sub_building_names(sub_building_name_key) DEFERRABLE INITIALLY IMMEDIATE, + ADD FOREIGN KEY (organisation_key, postcode_type) REFERENCES public.pc_paf_organisations(organisation_key, postcode_type) DEFERRABLE INITIALLY IMMEDIATE, + ADD FOREIGN KEY (outward_code,inward_code) REFERENCES public.pc_paf_postzon_100m(outward_code,inward_code) DEFERRABLE INITIALLY IMMEDIATE; + COMMENT ON COLUMN public.pc_paf_mainfile_welsh.organisation_key IS 'When postcode type is L then address_key relates to organisation_key - good work RM!'; + END IF; + + RAISE NOTICE '%: Done importing welsh alternative mainfile (imported % records)', clock_timestamp(), v_processed; + + -- 10) Alias file + TRUNCATE TABLE data_stage; + RAISE NOTICE '%: Begin staging alias file', clock_timestamp(); + v_sql := 
'COPY data_stage FROM ' || quote_literal(v_data_alias || 'aliasfle.c01'); + EXECUTE v_sql; + + DELETE FROM data_stage WHERE data like '%ALIASFLEY' || in_edition || '%' OR data like v_std_footer; + + RAISE NOTICE '%: Done staging alias file, importing', clock_timestamp(); + + IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_counties') THEN + RAISE NOTICE 'Table "public"."pc_paf_counties" already exists'; + v_table_created := false; + ELSE + RAISE NOTICE 'Creating table "public"."pc_paf_counties"'; + CREATE TABLE public.pc_paf_counties ( + county_key integer NOT NULL, + county_name varchar(30), + county_type varchar(1) + ); + v_table_created := true; + END IF; + + TRUNCATE TABLE public.pc_paf_counties; + INSERT INTO public.pc_paf_counties + SELECT substring(data,2,4)::integer AS county_key, + trim(substring(data,6,30)) AS county_name, + trim(substring(data,36,1)) AS county_type + FROM data_stage + WHERE substring(data,1,1)::integer = 4; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + + IF (v_table_created) THEN + ALTER TABLE public.pc_paf_counties ADD PRIMARY KEY (county_key), + ADD CONSTRAINT pc_paf_counties_unique UNIQUE (county_name, county_type); + COMMENT ON COLUMN public.pc_paf_counties.county_type IS 'T (Traditional County), P (Former Postal County) or A (Administrative County)'; + END IF; + + RAISE NOTICE '%: Done importing counties (imported % records)', clock_timestamp(), v_processed; + + IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables WHERE schemaname = 'public' AND tablename = 'pc_paf_county_alias') THEN + RAISE NOTICE 'Table "public"."pc_paf_county_alias" already exists'; + v_table_created := false; + ELSE + RAISE NOTICE 'Creating table "public"."pc_paf_county_alias"'; + CREATE TABLE public.pc_paf_county_alias ( + county_alias_key serial NOT NULL, + postcode varchar(7) NOT NULL, + former_postal_county integer, + traditional_county integer, + administrative_county integer + ); + v_table_created := true; + END 
IF; + + TRUNCATE TABLE public.pc_paf_county_alias; + INSERT INTO public.pc_paf_county_alias (postcode, former_postal_county, traditional_county, administrative_county) + SELECT trim(substring(data,2,7)) AS postcode, + nullif(substring(data,9,4)::integer,0) AS former_postal_county, + nullif(substring(data,13,4)::integer,0) AS traditional_county, + nullif(substring(data,17,4)::integer,0) AS administrative_county + FROM data_stage + WHERE substring(data,1,1)::integer = 5; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + + IF (v_table_created) THEN + ALTER TABLE public.pc_paf_county_alias ADD PRIMARY KEY (county_alias_key), + ADD CONSTRAINT pc_paf_county_alias_unique UNIQUE (postcode), + ADD FOREIGN KEY (former_postal_county) REFERENCES public.pc_paf_counties(county_key) DEFERRABLE INITIALLY IMMEDIATE, + ADD FOREIGN KEY (traditional_county) REFERENCES public.pc_paf_counties(county_key) DEFERRABLE INITIALLY IMMEDIATE, + ADD FOREIGN KEY (administrative_county) REFERENCES public.pc_paf_counties(county_key) DEFERRABLE INITIALLY IMMEDIATE; + END IF; + + RAISE NOTICE '%: Done importing county_alias (imported % records)', clock_timestamp(), v_processed; + + RAISE NOTICE '%: Completed', clock_timestamp(); + RETURN true; +END; + +$BODY$ +LANGUAGE 'plpgsql' VOLATILE; + +-- + +DROP FUNCTION IF EXISTS public.update_pc_paf(varchar, varchar); +CREATE OR REPLACE FUNCTION public.update_pc_paf(in_edition varchar, in_data varchar) +RETURNS boolean AS +$BODY$ + +DECLARE + v_data_root varchar; + v_data_update varchar; + v_processed integer; + v_main_footer varchar; + v_std_footer varchar; + v_sql varchar; + v_table_created boolean; +BEGIN + v_data_root := in_data || '/' || in_edition || '_CHANGES/'; + v_data_update := v_data_root || 'CONSOLIDATED CHANGES/'; + + v_main_footer = ' %'; + + RAISE NOTICE '%: Import starting for edition % with data root %', clock_timestamp(), in_edition, v_data_root; + CREATE TEMPORARY TABLE data_stage (data text) ON COMMIT DROP; + + RAISE NOTICE '%: Begin 
staging Changes1 file', clock_timestamp(); + + v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_update || 'changes1.c01'); + EXECUTE v_sql; + + DELETE FROM data_stage WHERE data like '%CHANGES1' || in_edition || '%' OR data like v_main_footer; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Done staging Changes1 file of % records',clock_timestamp(), v_processed; + + SET CONSTRAINTS ALL DEFERRED; + + --changes1.c01 - Changes1 (Changes to satelite tables except Organisations) + --Record Type 1 - Localities + RAISE NOTICE '%: Preparing to update localities', clock_timestamp(); + + CREATE TEMPORARY TABLE tmp_localities ON COMMIT DROP AS + SELECT substring(data,2,8)::integer AS locality_key, + to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, + nullif(trim(substring(data,24,1)),'') AS amendment_type, + nullif(trim(substring(data,70,30)),'') AS post_town, + nullif(trim(substring(data,100,35)),'') AS dependent_locality, + nullif(trim(substring(data,135,35)),'') AS double_dependent_locality + FROM data_stage + WHERE substring(data,1,1)::integer = 1 + AND nullif(trim(substring(data,24,1)),'') IS NOT NULL + ORDER BY to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS'); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Prepared % records for update on localities',clock_timestamp(), v_processed; + + DELETE FROM public.pc_paf_localities l + USING tmp_localities lt WHERE lt.locality_key = l.locality_key + AND lt.amendment_type IN ('D', 'B') + AND l.* = (lt.locality_key, lt.post_town::character varying(30), lt.dependent_locality::character varying(30), lt.double_dependent_locality::character varying(30)); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Removed % records from localities',clock_timestamp(), v_processed; + + INSERT INTO public.pc_paf_localities + SELECT lt.locality_key, lt.post_town, lt.dependent_locality, lt.double_dependent_locality + FROM 
tmp_localities lt + WHERE lt.amendment_type IN ('I', 'C'); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Created % records in localities',clock_timestamp(), v_processed; + + --Record Type 2 - Thoroughfares + RAISE NOTICE '%: Preparing to update thoroughfares', clock_timestamp(); + + CREATE TEMPORARY TABLE tmp_thoroughfares ON COMMIT DROP AS + SELECT substring(data,2,8)::integer AS thoroughfare_key, + to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, + nullif(trim(substring(data,24,1)),'') AS amendment_type, + nullif(trim(substring(data,25,60)),'') AS thoroughfare_name + FROM data_stage + WHERE substring(data,1,1)::integer = 2 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Prepared % records for update on thoroughfares',clock_timestamp(), v_processed; + + DELETE FROM public.pc_paf_thoroughfares t + USING tmp_thoroughfares tt WHERE tt.thoroughfare_key = t.thoroughfare_key + AND tt.amendment_type IN ('D', 'B') + AND t.* = (tt.thoroughfare_key, tt.thoroughfare_name::character varying(60)); -- compare the stored row with the staged change record (tt), not with itself + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Removed % records from thoroughfares',clock_timestamp(), v_processed; + + INSERT INTO public.pc_paf_thoroughfares + SELECT tt.thoroughfare_key, tt.thoroughfare_name + FROM tmp_thoroughfares tt + WHERE tt.amendment_type IN ('I', 'C'); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Created % records in thoroughfares',clock_timestamp(), v_processed; + + --Record Type 3 - Thoroughfare Descriptors + RAISE NOTICE '%: Preparing to update thoroughfare_descriptor', clock_timestamp(); + + CREATE TEMPORARY TABLE tmp_thoroughfare_descriptor ON COMMIT DROP AS + SELECT substring(data,2,8)::integer AS thoroughfare_descriptor_key , + to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, + nullif(trim(substring(data,24,1)),'') AS amendment_type, +
nullif(trim(substring(data,25,20)),'') AS thoroughfare_descriptor, + nullif(trim(substring(data,45,6)),'') AS approved_abbreviation + FROM data_stage + WHERE substring(data,1,1)::integer = 3 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Prepared % records for update on thoroughfare_descriptor',clock_timestamp(), v_processed; + + DELETE FROM public.pc_paf_thoroughfare_descriptor td + USING tmp_thoroughfare_descriptor tdt WHERE tdt.thoroughfare_descriptor_key = td.thoroughfare_descriptor_key + AND tdt.amendment_type IN ('D', 'B') + AND td.* = (tdt.thoroughfare_descriptor_key , tdt.thoroughfare_descriptor::character varying(20), tdt.approved_abbreviation::character varying(6)); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Removed % records from thoroughfare_descriptor',clock_timestamp(), v_processed; + + INSERT INTO public.pc_paf_thoroughfare_descriptor + SELECT tdt.thoroughfare_descriptor_key , tdt.thoroughfare_descriptor, tdt.approved_abbreviation + FROM tmp_thoroughfare_descriptor tdt + WHERE tdt.amendment_type IN ('I', 'C'); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Created % records in thoroughfare_descriptor',clock_timestamp(), v_processed; + + --Record Type 4 - Building Names + RAISE NOTICE '%: Preparing to update building_names', clock_timestamp(); + + CREATE TEMPORARY TABLE tmp_building_names ON COMMIT DROP AS + SELECT substring(data,2,8)::integer AS building_name_key, + to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, + nullif(trim(substring(data,24,1)),'') AS amendment_type, + nullif(trim(substring(data,25,50)),'') AS building_name + FROM data_stage + WHERE substring(data,1,1)::integer = 4 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Prepared % records for update on building_names',clock_timestamp(), v_processed; + + DELETE 
FROM public.pc_paf_building_names bn + USING tmp_building_names tbn WHERE bn.building_name_key = tbn.building_name_key + AND tbn.amendment_type IN ('D', 'B') + AND bn.* = (tbn.building_name_key, tbn.building_name::character varying(50)); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Removed % records from building_names',clock_timestamp(), v_processed; + + INSERT INTO public.pc_paf_building_names + SELECT tbn.building_name_key, tbn.building_name + FROM tmp_building_names tbn + WHERE tbn.amendment_type IN ('I', 'C'); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Created % records in building_names',clock_timestamp(), v_processed; + + --Record Type 5 - Sub Building Names + RAISE NOTICE '%: Preparing to update sub_building_names', clock_timestamp(); + + CREATE TEMPORARY TABLE tmp_sub_building_names ON COMMIT DROP AS + SELECT substring(data,2,8)::integer AS sub_building_name_key, + to_timestamp(nullif(substring(data,10,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, + nullif(trim(substring(data,24,1)),'') AS amendment_type, + nullif(trim(substring(data,25,30)),'') AS sub_building_name + FROM data_stage + WHERE substring(data,1,1)::integer = 5 AND nullif(trim(substring(data,24,1)),'') IS NOT NULL; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Prepared % records for update on sub_building_names',clock_timestamp(), v_processed; + + DELETE FROM public.pc_paf_sub_building_names sbn + USING tmp_sub_building_names tsbn WHERE sbn.sub_building_name_key = tsbn.sub_building_name_key + AND tsbn.amendment_type IN ('D', 'B') + AND sbn.* = (tsbn.sub_building_name_key, tsbn.sub_building_name::character varying(50)); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Removed % records from sub_building_names',clock_timestamp(), v_processed; + + INSERT INTO public.pc_paf_sub_building_names + SELECT tsbn.sub_building_name_key, tsbn.sub_building_name + FROM tmp_sub_building_names tsbn + WHERE tsbn.amendment_type 
IN ('I', 'C'); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Created % records in sub_building_names',clock_timestamp(), v_processed; + + -- fpchngs2.c01 - Changes2 (Changes to Mainfile and Organisations) + TRUNCATE TABLE data_stage; + + RAISE NOTICE '%: Begin staging Changes2 file', clock_timestamp(); + + v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_update || 'fpchngs2.c01'); + EXECUTE v_sql; + + DELETE FROM data_stage WHERE data like '%CHANGES2' || in_edition || '%' OR data like v_main_footer; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Done staging Changes2 file of % records',clock_timestamp(), v_processed; + + -- Mainfile + RAISE NOTICE '%: Preparing to update mainfile', clock_timestamp(); + + CREATE TEMPORARY TABLE tmp_mainfile ON COMMIT DROP AS + SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, + nullif(trim(substring(data,15,4)),'') AS outward_code, + nullif(trim(substring(data,19,3)),'') AS inward_code, + nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix, + nullif(trim(substring(data,24,1)),'') AS postcode_type, + nullif(substring(data,25,8)::integer,0) AS address_key, + nullif(substring(data,33,8)::integer,0) AS organisation_key, + nullif(trim(substring(data,41,1)),'') AS amendment_type, + nullif(substring(data,42,1)::integer,0) AS record_type, + nullif(substring(data,43,8)::integer,0) AS locality_key, + nullif(substring(data,51,8)::integer,0) AS thoroughfare_key, + nullif(substring(data,59,8)::integer,0) AS thoroughfare_descriptor_key, + nullif(substring(data,67,8)::integer,0) AS dependent_thoroughfare_key, + nullif(substring(data,75,8)::integer,0) AS dependent_thoroughfare_descriptor_key, + nullif(substring(data,83,4)::integer,0) AS building_number, + nullif(substring(data,87,8)::integer,0) AS building_name_key, + nullif(substring(data,95,8)::integer,0) AS sub_building_name_key, + nullif(substring(data,103,4)::integer,0) AS 
number_of_households, + nullif(trim(substring(data,107,1)),'') AS concatenation_indicator, + nullif(trim(substring(data,108,6)),'') AS po_box_number, + nullif(trim(substring(data,114,1)),'') AS small_user_organisation_indicator, + CASE nullif(substring(data,115,2)::integer,0) + WHEN 1 THEN 'New' + WHEN 2 THEN 'Correction' + WHEN 3 THEN '' + WHEN 4 THEN 'Coding Revision' + WHEN 5 THEN 'Organisation Change' + WHEN 6 THEN 'Status Change' + WHEN 7 THEN 'Large User Deleted' + WHEN 8 THEN 'Building/Sub Building Change' + WHEN 9 THEN 'Large User Change' + END AS reason_for_amendment, + nullif(trim(substring(data,117,4)),'') AS new_outward_code, + nullif(trim(substring(data,121,3)),'') AS new_inward_code + FROM data_stage + WHERE nullif(substring(data,42,1)::integer,0) = 1; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Prepared % records for update on mainfile',clock_timestamp(), v_processed; + + DELETE FROM public.pc_paf_mainfile m + USING tmp_mainfile tm WHERE tm.address_key = m.address_key AND tm.organisation_key IS NOT DISTINCT FROM m.organisation_key AND tm.postcode_type = m.postcode_type -- organisation_key is loaded via nullif(..., 0) so it can be NULL; a plain "=" never matches those rows + AND tm.amendment_type IN ('D', 'B'); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Removed % records from mainfile',clock_timestamp(), v_processed; + + INSERT INTO public.pc_paf_mainfile (outward_code, inward_code, address_key, locality_key, thoroughfare_key, + thoroughfare_descriptor_key, dependent_thoroughfare_key, dependent_thoroughfare_descriptor_key, + building_number, building_name_key, sub_building_name_key, number_of_households, + organisation_key, postcode_type, concatenation_indicator, delivery_point_suffix, + small_user_organisation_indicator, po_box_number) + SELECT tm.outward_code, tm.inward_code, tm.address_key, tm.locality_key, tm.thoroughfare_key, + tm.thoroughfare_descriptor_key, tm.dependent_thoroughfare_key, tm.dependent_thoroughfare_descriptor_key, + tm.building_number, tm.building_name_key, tm.sub_building_name_key, tm.number_of_households, +
tm.organisation_key, tm.postcode_type, tm.concatenation_indicator, tm.delivery_point_suffix, + tm.small_user_organisation_indicator, tm.po_box_number + FROM ( + SELECT cume_dist() OVER w, * + FROM tmp_mainfile + WHERE amendment_type IN ('I', 'C') + WINDOW w AS (PARTITION BY address_key, organisation_key, postcode_type ORDER BY timestamp) + ) tm WHERE tm.cume_dist = 1 ; + + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Created % records in mainfile',clock_timestamp(), v_processed; + + -- Organisations + RAISE NOTICE '%: Preparing to update organisations', clock_timestamp(); + + CREATE TEMPORARY TABLE tmp_organisations ON COMMIT DROP AS + SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, + nullif(trim(substring(data,15,4)),'') AS outward_code, + nullif(trim(substring(data,19,3)),'') AS inward_code, + nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix, + nullif(trim(substring(data,24,1)),'') AS postcode_type, + nullif(substring(data,25,8)::integer,0) AS address_key, + nullif(substring(data,33,8)::integer,0) AS organisation_key, + nullif(trim(substring(data,41,1)),'') AS amendment_type, + nullif(substring(data,42,1)::integer,0) AS record_type, + nullif(trim(substring(data,43,60)),'') AS organisation_name, + nullif(trim(substring(data,103,60)),'') AS department_name, + nullif(trim(substring(data,163,6)),'') AS po_box_number + FROM data_stage + WHERE nullif(substring(data,42,1)::integer,0) IN (2,3); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Prepared % records for update on organisations',clock_timestamp(), v_processed; + + DELETE FROM public.pc_paf_organisations o + USING tmp_organisations tx WHERE COALESCE(tx.organisation_key, tx.address_key) = o.organisation_key AND tx.postcode_type = o.postcode_type + AND tx.amendment_type IN ('D', 'B'); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Removed % records from organisations',clock_timestamp(), v_processed; 
+ + INSERT INTO public.pc_paf_organisations + SELECT COALESCE(tx.organisation_key, tx.address_key), tx.postcode_type, tx.organisation_name, tx.department_name + FROM ( + SELECT cume_dist() OVER w, * + FROM tmp_organisations + WHERE amendment_type IN ('I', 'C') + WINDOW w AS (PARTITION BY COALESCE(organisation_key, address_key), postcode_type ORDER BY timestamp) + ) tx WHERE tx.cume_dist = 1 ; + + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Created % records in organisations',clock_timestamp(), v_processed; + + -- wchanges.c01 -- Welsh changes + TRUNCATE TABLE data_stage; + + RAISE NOTICE '%: Begin staging WChanges2 file', clock_timestamp(); + + v_sql := 'COPY data_stage FROM ' || quote_literal(v_data_update || 'wchanges.c01'); + EXECUTE v_sql; + DELETE FROM data_stage WHERE data like '%WCHANGES' || in_edition || '%' OR data like v_main_footer; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Done staging WChanges2 file of % records',clock_timestamp(), v_processed; + + RAISE NOTICE '%: Preparing to update welsh mainfile', clock_timestamp(); + + CREATE TEMPORARY TABLE tmp_mainfile_welsh ON COMMIT DROP AS + SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, + nullif(trim(substring(data,15,4)),'') AS outward_code, + nullif(trim(substring(data,19,3)),'') AS inward_code, + nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix, + nullif(trim(substring(data,24,1)),'') AS postcode_type, + nullif(substring(data,25,8)::integer,0) AS address_key, + nullif(substring(data,33,8)::integer,0) AS organisation_key, + nullif(trim(substring(data,41,1)),'') AS amendment_type, + nullif(substring(data,42,1)::integer,0) AS record_type, + nullif(substring(data,43,8)::integer,0) AS locality_key, + nullif(substring(data,51,8)::integer,0) AS thoroughfare_key, + nullif(substring(data,59,8)::integer,0) AS thoroughfare_descriptor_key, + nullif(substring(data,67,8)::integer,0) AS dependent_thoroughfare_key, 
+ nullif(substring(data,75,8)::integer,0) AS dependent_thoroughfare_descriptor_key, + nullif(substring(data,83,4)::integer,0) AS building_number, + nullif(substring(data,87,8)::integer,0) AS building_name_key, + nullif(substring(data,95,8)::integer,0) AS sub_building_name_key, + nullif(substring(data,103,4)::integer,0) AS number_of_households, + nullif(trim(substring(data,107,1)),'') AS concatenation_indicator, + nullif(trim(substring(data,108,6)),'') AS po_box_number, + nullif(trim(substring(data,114,1)),'') AS small_user_organisation_indicator + FROM data_stage + WHERE nullif(substring(data,42,1)::integer,0) = 1; + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Prepared % records for update on welsh mainfile',clock_timestamp(), v_processed; + + DELETE FROM public.pc_paf_mainfile_welsh mw + USING tmp_mainfile_welsh twm WHERE twm.address_key = mw.address_key AND twm.organisation_key IS NOT DISTINCT FROM mw.organisation_key AND twm.postcode_type = mw.postcode_type -- organisation_key is loaded via nullif(..., 0) so it can be NULL; a plain "=" never matches those rows + AND twm.amendment_type IN ('D', 'B'); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Removed % records from welsh mainfile',clock_timestamp(), v_processed; + + INSERT INTO public.pc_paf_mainfile_welsh (outward_code, inward_code, address_key, locality_key, thoroughfare_key, + thoroughfare_descriptor_key, dependent_thoroughfare_key, dependent_thoroughfare_descriptor_key, + building_number, building_name_key, sub_building_name_key, number_of_households, + organisation_key, postcode_type, concatenation_indicator, delivery_point_suffix, + small_user_organisation_indicator, po_box_number) + SELECT twm.outward_code, twm.inward_code, twm.address_key, twm.locality_key, twm.thoroughfare_key, + twm.thoroughfare_descriptor_key, twm.dependent_thoroughfare_key, twm.dependent_thoroughfare_descriptor_key, + twm.building_number, twm.building_name_key, twm.sub_building_name_key, twm.number_of_households, + twm.organisation_key, twm.postcode_type, twm.concatenation_indicator, twm.delivery_point_suffix, +
twm.small_user_organisation_indicator, twm.po_box_number + FROM tmp_mainfile_welsh twm + WHERE twm.amendment_type IN ('I', 'C'); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Created % records in welsh mainfile',clock_timestamp(), v_processed; + + -- Organisations + RAISE NOTICE '%: Preparing to update welsh organisations', clock_timestamp(); + + CREATE TEMPORARY TABLE tmp_organisations_welsh ON COMMIT DROP AS + SELECT to_timestamp(nullif(substring(data,1,14),'00000000000000'), 'YYYYMMDDHH24MISS') AS timestamp, + nullif(trim(substring(data,15,4)),'') AS outward_code, + nullif(trim(substring(data,19,3)),'') AS inward_code, + nullif(trim(substring(data,22,2)),'') AS delivery_point_suffix, + nullif(trim(substring(data,24,1)),'') AS postcode_type, + nullif(substring(data,25,8)::integer,0) AS address_key, + nullif(substring(data,33,8)::integer,0) AS organisation_key, + nullif(trim(substring(data,41,1)),'') AS amendment_type, + nullif(substring(data,42,1)::integer,0) AS record_type, + nullif(trim(substring(data,43,60)),'') AS organisation_name, + nullif(trim(substring(data,103,60)),'') AS department_name, + nullif(trim(substring(data,163,6)),'') AS po_box_number + FROM data_stage + WHERE nullif(substring(data,42,1)::integer,0) IN (2,3); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Prepared % records for update on welsh organisations',clock_timestamp(), v_processed; + + DELETE FROM public.pc_paf_organisations o + USING tmp_organisations_welsh txw WHERE COALESCE(txw.organisation_key, txw.address_key) = o.organisation_key AND txw.postcode_type = o.postcode_type + AND txw.amendment_type IN ('D', 'B'); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Removed % records from welsh organisations',clock_timestamp(), v_processed; + + INSERT INTO public.pc_paf_organisations + SELECT COALESCE(txw.organisation_key, txw.address_key), txw.postcode_type, txw.organisation_name, txw.department_name + FROM tmp_organisations_welsh txw + 
WHERE txw.amendment_type IN ('I', 'C'); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Created % records in welsh organisations',clock_timestamp(), v_processed; + + -- So far I haven't seen a changes file for postzon data, without which updates will most likely fail on + -- the fk for pc_paf_postzon_100m. I've also not seen an alias changes file. + + -- If I did have a postzon changes file, it'd most likely be best to import the data directly and use postgis to do our conversion + -- as the data is selected out of the data_stage table (rather than using the paf_convert.pl script). We could get those fields with + -- something like: + -- CASE WHEN substring(northing,1,1) ~ '[\x4f-\x5a]' THEN '1' || translate(northing, 'POUTZY', '221100') ELSE northing END || '0' AS f_northing, + -- ST_x(ST_transform(ST_GeomFromText('POINT('||easting||' '||f_northing||')',CASE WHEN postcode LIKE 'BT%' THEN 29903 ELSE 27700 END),4326)) AS longitude, + -- ST_y(ST_transform(ST_GeomFromText('POINT('||easting||' '||f_northing||')',CASE WHEN postcode LIKE 'BT%' THEN 29903 ELSE 27700 END),4326)) AS latitude; + + RAISE NOTICE '%: Fudging postzon', clock_timestamp(); + + INSERT INTO public.pc_paf_postzon_100m (outward_code, inward_code) + SELECT DISTINCT m.outward_code, m.inward_code + FROM tmp_mainfile m + WHERE NOT EXISTS (SELECT 1 FROM public.pc_paf_postzon_100m p WHERE p.outward_code = m.outward_code AND p.inward_code = m.inward_code); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Fudged % mainfile records in postzon',clock_timestamp(), v_processed; + + INSERT INTO public.pc_paf_postzon_100m (outward_code, inward_code) + SELECT DISTINCT mw.outward_code, mw.inward_code + FROM tmp_mainfile_welsh mw + WHERE NOT EXISTS (SELECT 1 FROM public.pc_paf_postzon_100m p WHERE p.outward_code = mw.outward_code AND p.inward_code = mw.inward_code); + + GET DIAGNOSTICS v_processed = ROW_COUNT; + RAISE NOTICE '%: Fudged % welsh mainfile records in 
postzon',clock_timestamp(), v_processed;
+
+    RAISE NOTICE '%: Completed', clock_timestamp();
+
+    RETURN true;
+END;
+
+$BODY$
+LANGUAGE 'plpgsql' VOLATILE;
+
+-- Standard master view for postcode tables.
+-- Starts from public.pc_paf_mainfile and LEFT JOINs every lookup table, so a
+-- mainfile row is still returned when a lookup has no match (the lookup's
+-- columns are then NULL).
+CREATE OR REPLACE VIEW public.pc_paf_master_view AS
+SELECT
+    main.building_number,
+    bname.building_name,
+    sbname.sub_building_name,
+    -- Prefer the organisation matched on organisation_key; otherwise fall back
+    -- to the organisation row whose key equals the mainfile address_key.
+    COALESCE(org.organisation_name, addr_org.organisation_name) AS organisation_name,
+    COALESCE(org.department_name, addr_org.department_name) AS department_name,
+    main.po_box_number,
+    thoro.thoroughfare_name,
+    tdesc.thoroughfare_descriptor,
+    tdesc.approved_abbreviation,
+    loc.post_town,
+    loc.dependent_locality,
+    loc.double_dependent_locality,
+    county.county_name,
+    main.outward_code,
+    main.inward_code,
+    main.number_of_households,
+    main.postcode_type,
+    main.concatenation_indicator,
+    main.delivery_point_suffix,
+    main.small_user_organisation_indicator,
+    -- True when the welsh mainfile holds a row with the same address_key;
+    -- organisation_key and postcode_type are compared NULL-safely.
+    EXISTS (
+        SELECT 1
+        FROM public.pc_paf_mainfile_welsh welsh
+        WHERE welsh.address_key = main.address_key
+          AND welsh.organisation_key IS NOT DISTINCT FROM main.organisation_key
+          AND welsh.postcode_type IS NOT DISTINCT FROM main.postcode_type
+    ) AS welsh_alternative_available,
+    pzon.longitude,
+    pzon.latitude
+FROM public.pc_paf_mainfile main
+LEFT JOIN public.pc_paf_localities loc USING (locality_key)
+LEFT JOIN public.pc_paf_thoroughfares thoro USING (thoroughfare_key)
+LEFT JOIN public.pc_paf_thoroughfare_descriptor tdesc USING (thoroughfare_descriptor_key)
+LEFT JOIN public.pc_paf_building_names bname USING (building_name_key)
+LEFT JOIN public.pc_paf_sub_building_names sbname USING (sub_building_name_key)
+LEFT JOIN public.pc_paf_organisations org USING (organisation_key, postcode_type)
+LEFT JOIN public.pc_paf_organisations addr_org
+       ON main.address_key = addr_org.organisation_key
+      AND main.postcode_type = addr_org.postcode_type
+LEFT JOIN public.pc_paf_postzon_100m pzon USING (outward_code, inward_code)
+-- The alias postcode is matched against the outward code right-padded to four
+-- characters followed by the inward code.
+LEFT JOIN public.pc_paf_county_alias calias
+       ON calias.postcode = (rpad(main.outward_code, 4) || main.inward_code)
+LEFT JOIN public.pc_paf_counties county
+       ON county.county_key = calias.former_postal_county;
+
+-- Welsh master view for postcode tables.
+-- Same column list and lookups as the standard master view, but driven from
+-- public.pc_paf_mainfile_welsh and without the welsh_alternative_available flag.
+CREATE OR REPLACE VIEW public.pc_paf_master_welsh_view AS
+SELECT
+    wmain.building_number,
+    bname.building_name,
+    sbname.sub_building_name,
+    -- Prefer the organisation matched on organisation_key; otherwise fall back
+    -- to the organisation row whose key equals the welsh mainfile address_key.
+    COALESCE(org.organisation_name, addr_org.organisation_name) AS organisation_name,
+    COALESCE(org.department_name, addr_org.department_name) AS department_name,
+    wmain.po_box_number,
+    thoro.thoroughfare_name,
+    tdesc.thoroughfare_descriptor,
+    tdesc.approved_abbreviation,
+    loc.post_town,
+    loc.dependent_locality,
+    loc.double_dependent_locality,
+    county.county_name,
+    wmain.outward_code,
+    wmain.inward_code,
+    wmain.number_of_households,
+    wmain.postcode_type,
+    wmain.concatenation_indicator,
+    wmain.delivery_point_suffix,
+    wmain.small_user_organisation_indicator,
+    pzon.longitude,
+    pzon.latitude
+FROM public.pc_paf_mainfile_welsh wmain
+LEFT JOIN public.pc_paf_localities loc USING (locality_key)
+LEFT JOIN public.pc_paf_thoroughfares thoro USING (thoroughfare_key)
+LEFT JOIN public.pc_paf_thoroughfare_descriptor tdesc USING (thoroughfare_descriptor_key)
+LEFT JOIN public.pc_paf_building_names bname USING (building_name_key)
+LEFT JOIN public.pc_paf_sub_building_names sbname USING (sub_building_name_key)
+LEFT JOIN public.pc_paf_organisations org USING (organisation_key, postcode_type)
+LEFT JOIN public.pc_paf_organisations addr_org
+       ON wmain.address_key = addr_org.organisation_key
+      AND wmain.postcode_type = addr_org.postcode_type
+LEFT JOIN public.pc_paf_postzon_100m pzon USING (outward_code, inward_code)
+-- The alias postcode is matched against the outward code right-padded to four
+-- characters followed by the inward code.
+LEFT JOIN public.pc_paf_county_alias calias
+       ON calias.postcode = (rpad(wmain.outward_code, 4) || wmain.inward_code)
+LEFT JOIN public.pc_paf_counties county
+       ON county.county_key = calias.former_postal_county;
\ No newline at end of file
-- 
2.39.5