はじめに
前回、C#
で2つのテーブル間の差分をチェックするバッチ処理を作成したが、今度はPL/pgSQL
のストアドプロシージャで同じ処理を実装してみる。
環境
PostgreSQL 17
GitHub
今回作成したプロシージャのソースコードは以下のリポジトリで公開している。
https://github.com/katsuobushiFPGA/postgresql-procedure-dev
- 作成したテーブルとプロシージャ
プロシージャの動作について
このストアドプロシージャを実行すると、以下のような動作をする。
- トランザクションテーブル(
t_data
)と最新データテーブル(t_datalatest
)を比較 - 差分がないレコードのみをトランザクションテーブルから削除
- 差分があるレコードは最新データテーブルを更新(UPSERT)
- 新規レコードは最新データテーブルに挿入
- 処理結果をログ出力
例えば、以下のような流れになる。
-- 実行前
SELECT 'トランザクションテーブル' as table_name, COUNT(*) as count FROM t_data
UNION ALL
SELECT '最新データテーブル', COUNT(*) FROM t_datalatest;
※件数確認
-- プロシージャ実行
CALL sync_t_data_to_t_datalatest_upsert_correct();
NOTICE: 差分チェック処理を開始します...
NOTICE: 処理件数: 3件
NOTICE: UPSERT総処理件数: 2件
-- 実行後
SELECT 'トランザクションテーブル' as table_name, COUNT(*) as count FROM t_data
UNION ALL
SELECT '最新データテーブル', COUNT(*) FROM t_datalatest;
※件数確認
処理フローとシーケンスについて
処理フロー図
シーケンス図
テーブル構造
前提条件
-
PostgreSQL
データベースが準備されていること
テーブル作成SQL
-- テーブルA(トランザクションテーブル)
-- 随時データが入れ替わるテーブル
DROP TABLE IF EXISTS t_data CASCADE;
CREATE TABLE IF NOT EXISTS t_data (
pkey_1 VARCHAR(50) NOT NULL,
pkey_2 VARCHAR(50) NOT NULL,
pkey_3 VARCHAR(50) NOT NULL,
pkey_4 VARCHAR(50) NOT NULL,
column_1 TEXT,
column_2 TEXT,
column_3 TEXT,
column_4 TEXT,
column_5 TEXT,
column_6 TEXT,
column_7 TEXT,
column_8 TEXT,
column_9 TEXT,
column_10 TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT pk_t_data PRIMARY KEY (pkey_1, pkey_2, pkey_3, pkey_4)
);
-- テーブルB(最新状態保持テーブル)
-- 最新の状態を保持するテーブル
DROP TABLE IF EXISTS t_datalatest CASCADE;
CREATE TABLE IF NOT EXISTS t_datalatest (
pkey_1 VARCHAR(50) NOT NULL,
pkey_2 VARCHAR(50) NOT NULL,
pkey_3 VARCHAR(50) NOT NULL,
pkey_4 VARCHAR(50) NOT NULL,
column_1 TEXT,
column_2 TEXT,
column_3 TEXT,
column_4 TEXT,
column_5 TEXT,
column_6 TEXT,
column_7 TEXT,
column_8 TEXT,
column_9 TEXT,
column_10 TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT pk_t_datalatest PRIMARY KEY (pkey_1, pkey_2, pkey_3, pkey_4)
);
PL/pgSQLプロシージャの実装
差分検出・同期プロシージャ
-- テーブルAとテーブルBの差分更新プロシージャ
-- 要件:
-- ・AとBを比較して差分がないレコードのみをAから削除
-- ・差分があるレコードはBを更新(Aからは削除しない)
-- ・BにないレコードはBに挿入(Aからは削除しない)
CREATE OR REPLACE PROCEDURE sync_t_data_to_t_datalatest_upsert_correct()
LANGUAGE plpgsql
AS
$$
DECLARE
total_count INTEGER;
deleted_count INTEGER := 0;
upsert_count INTEGER := 0;
BEGIN
RAISE NOTICE '差分チェック処理を開始します...';
-- テーブルAのレコード数を取得
SELECT COUNT(*) INTO total_count FROM t_data;
-- 差分がない場合のみテーブルAからの削除
DELETE FROM t_data
USING t_datalatest
WHERE t_data.pkey_1 = t_datalatest.pkey_1
AND t_data.pkey_2 = t_datalatest.pkey_2
AND t_data.pkey_3 = t_datalatest.pkey_3
AND t_data.pkey_4 = t_datalatest.pkey_4
AND t_data.column_1 IS NOT DISTINCT FROM t_datalatest.column_1
AND t_data.column_2 IS NOT DISTINCT FROM t_datalatest.column_2
AND t_data.column_3 IS NOT DISTINCT FROM t_datalatest.column_3
AND t_data.column_4 IS NOT DISTINCT FROM t_datalatest.column_4
AND t_data.column_5 IS NOT DISTINCT FROM t_datalatest.column_5
AND t_data.column_6 IS NOT DISTINCT FROM t_datalatest.column_6
AND t_data.column_7 IS NOT DISTINCT FROM t_datalatest.column_7
AND t_data.column_8 IS NOT DISTINCT FROM t_datalatest.column_8
AND t_data.column_9 IS NOT DISTINCT FROM t_datalatest.column_9
AND t_data.column_10 IS NOT DISTINCT FROM t_datalatest.column_10;
GET DIAGNOSTICS deleted_count = ROW_COUNT;
-- UPSERT (INSERT ... ON CONFLICT DO UPDATE) で一括処理
-- 差分がある場合のみ更新、新規の場合は挿入
INSERT INTO t_datalatest (
pkey_1, pkey_2, pkey_3, pkey_4,
column_1, column_2, column_3, column_4, column_5,
column_6, column_7, column_8, column_9, column_10
)
SELECT
a.pkey_1, a.pkey_2, a.pkey_3, a.pkey_4,
a.column_1, a.column_2, a.column_3, a.column_4, a.column_5,
a.column_6, a.column_7, a.column_8, a.column_9, a.column_10
FROM t_data a
ON CONFLICT (pkey_1, pkey_2, pkey_3, pkey_4)
DO UPDATE SET
column_1 = EXCLUDED.column_1,
column_2 = EXCLUDED.column_2,
column_3 = EXCLUDED.column_3,
column_4 = EXCLUDED.column_4,
column_5 = EXCLUDED.column_5,
column_6 = EXCLUDED.column_6,
column_7 = EXCLUDED.column_7,
column_8 = EXCLUDED.column_8,
column_9 = EXCLUDED.column_9,
column_10 = EXCLUDED.column_10,
updated_at = CURRENT_TIMESTAMP
WHERE (
t_datalatest.column_1 IS DISTINCT FROM EXCLUDED.column_1 OR
t_datalatest.column_2 IS DISTINCT FROM EXCLUDED.column_2 OR
t_datalatest.column_3 IS DISTINCT FROM EXCLUDED.column_3 OR
t_datalatest.column_4 IS DISTINCT FROM EXCLUDED.column_4 OR
t_datalatest.column_5 IS DISTINCT FROM EXCLUDED.column_5 OR
t_datalatest.column_6 IS DISTINCT FROM EXCLUDED.column_6 OR
t_datalatest.column_7 IS DISTINCT FROM EXCLUDED.column_7 OR
t_datalatest.column_8 IS DISTINCT FROM EXCLUDED.column_8 OR
t_datalatest.column_9 IS DISTINCT FROM EXCLUDED.column_9 OR
t_datalatest.column_10 IS DISTINCT FROM EXCLUDED.column_10
);
GET DIAGNOSTICS upsert_count = ROW_COUNT;
-- 処理結果のサマリーを出力
RAISE NOTICE '処理件数: %件', total_count;
RAISE NOTICE 'UPSERT総処理件数: %件', upsert_count;
RAISE NOTICE 'Aから削除(差分なし): %件', deleted_count;
EXCEPTION
WHEN OTHERS THEN
RAISE EXCEPTION 'UPSERT版同期処理中にエラーが発生しました: %', SQLERRM;
END;
$$;
プロシージャの特徴
処理について
- 差分なしレコードの削除 トランザクションテーブルから完全一致レコードを削除
- UPSERT処理 残ったレコードを最新テーブルに挿入/更新
- 処理結果の出力 各段階の処理件数をログ出力
重要なSQL構文
構文 | 役割 |
---|---|
IS NOT DISTINCT FROM | NULL値の比較が可能な構文 |
INSERT ... ON CONFLICT | UPSERT構文 |
GET DIAGNOSTICS | 直前のSQL文で処理されたレコード数を取得 |
RAISE NOTICE | 処理状況をクライアントに通知 |
実行SQLの詳細解説
1. 差分なしレコードの削除SQL
DELETE FROM t_data
USING t_datalatest
WHERE t_data.pkey_1 = t_datalatest.pkey_1
AND t_data.pkey_2 = t_datalatest.pkey_2
-- 主キー4つすべて一致
AND t_data.column_1 IS NOT DISTINCT FROM t_datalatest.column_1
-- データカラム10個すべて一致(NULL値も考慮)
DELETE ... USING
構文で2つのテーブルを結合して条件に合うレコードを削除IS NOT DISTINCT FROM
により、NULL
とNULL
も「一致」として扱う- 主キー4つ + データカラム10個 = 計14項目すべてが一致する場合のみ削除
2. UPSERT処理のSQL
INSERT INTO t_datalatest (...)
SELECT ... FROM t_data a
ON CONFLICT (pkey_1, pkey_2, pkey_3, pkey_4)
DO UPDATE SET column_1 = EXCLUDED.column_1, ...
WHERE (
t_datalatest.column_1 IS DISTINCT FROM EXCLUDED.column_1 OR
t_datalatest.column_2 IS DISTINCT FROM EXCLUDED.column_2 OR
-- データ差分がある場合のみ更新実行
);
INSERT
で新規データの挿入を試行- 主キー重複時は
ON CONFLICT
で更新処理に移行 EXCLUDED
は挿入しようとしたデータを参照WHERE
句で各カラムを比較し、実際に差分がある場合のみ更新を実行
差分検出ロジック
処理 | 条件 | 動作 |
---|---|---|
削除 | 主キー+データ用のカラムが完全一致 | トランザクション(t_data )から該当レコードを削除 |
更新 | 主キー一致 + データ差分あり | 最新テーブル(t_datalatest )を更新 |
挿入 | 主キーが最新テーブルに存在しない | 最新テーブル(t_datalatest )に新規挿入 |
テストと実行結果
サンプルデータの準備
-- サンプルデータの挿入(テスト用)
INSERT INTO t_data (pkey_1, pkey_2, pkey_3, pkey_4, column_1, column_2, column_3, column_4, column_5, column_6, column_7, column_8, column_9, column_10)
VALUES
('A001', 'B001', 'C001', 'D001', 'データ1-1', 'データ1-2', 'データ1-3', 'データ1-4', 'データ1-5', 'データ1-6', 'データ1-7', 'データ1-8', 'データ1-9', 'データ1-10'),
('A002', 'B002', 'C002', 'D002', 'データ2-1', 'データ2-2', 'データ2-3', 'データ2-4', 'データ2-5', 'データ2-6', 'データ2-7', 'データ2-8', 'データ2-9', 'データ2-10'),
('A003', 'B003', 'C003', 'D003', 'データ3-1', 'データ3-2', 'データ3-3', 'データ3-4', 'データ3-5', 'データ3-6', 'データ3-7', 'データ3-8', 'データ3-9', 'データ3-10')
ON CONFLICT DO NOTHING;
INSERT INTO t_datalatest (pkey_1, pkey_2, pkey_3, pkey_4, column_1, column_2, column_3, column_4, column_5, column_6, column_7, column_8, column_9, column_10)
VALUES
('A001', 'B001', 'C001', 'D001', 'データ1-1', 'データ1-2', 'データ1-3', 'データ1-4', 'データ1-5', 'データ1-6', 'データ1-7', 'データ1-8', 'データ1-9', 'データ1-10'),
('A002', 'B002', 'C002', 'D002', '古いデータ2-1', '古いデータ2-2', '古いデータ2-3', '古いデータ2-4', '古いデータ2-5', '古いデータ2-6', '古いデータ2-7', '古いデータ2-8', '古いデータ2-9', '古いデータ2-10')
ON CONFLICT DO NOTHING;
実行例
実行前のテーブル状態:
-- 実行前確認
SELECT 'トランザクションテーブル件数' as label, COUNT(*) as count FROM t_data
UNION ALL
SELECT '最新データテーブル件数', COUNT(*) FROM t_datalatest;
label | count |
---|---|
トランザクションテーブル件数 | 3 |
最新データテーブル件数 | 2 |
プロシージャ実行:
CALL sync_t_data_to_t_datalatest_upsert_correct();
実行ログ:
NOTICE: 差分チェック処理を開始します...
NOTICE: 処理件数: 3件
NOTICE: UPSERT総処理件数: 2件
NOTICE: Aから削除(差分なし): 1件
CALL
実行後のテーブル状態:
-- 実行後確認
SELECT 'トランザクションテーブル件数' as label, COUNT(*) as count FROM t_data
UNION ALL
SELECT '最新データテーブル件数', COUNT(*) FROM t_datalatest;
label | count |
---|---|
トランザクションテーブル件数 | 2 |
最新データテーブル件数 | 3 |
処理結果の詳細
- A001 - 完全一致のため、
t_data
から削除 - A002 - データ差分ありのため、
t_datalatest
を更新 - A003 - 新規データのため、
t_datalatest
に挿入
参考
PostgreSQL
Documentation - PL/pgSQL
https://www.postgresql.org/docs/current/plpgsql.htmlPostgreSQL
Documentation - INSERT ON CONFLICT
https://www.postgresql.org/docs/current/sql-insert.html#SQL-ON-CONFLICTPostgreSQL
Documentation - IS DISTINCT FROM
https://www.postgresql.org/docs/current/functions-comparison.htmlGitHub Repository
- postgresql-procedure-dev
https://github.com/katsuobushiFPGA/postgresql-procedure-devSQLでNULLとの比較結果をTRUE/FALSEにしたいときはIS DISTINCT FROM演算子を使う | Qiita
https://qiita.com/gooddoog/items/ad8c20b1734d5811bf78
おわりに
今回は、PL/pgSQL
でテーブル間の差分検出・同期処理を実装してみた。
C#
の場合はエンティティなどを作成する必要があるが、プロシージャの場合はSQL
を組み合わせれば実装できるため比較的楽に感じる。
ただし、プロシージャのデバッグが大変なので、デバッグや単体テストについて良い方法がないかを今後調べてみる予定である。